一、前言:
????????上一次详细分析Framework Audio相关代码还是Android5的时候(2015年),那时候还没有HIDL,AudioFlinger直接使用xxx.so调用pcm_xxx()函数读写声卡节点的。AudioTrack,AudioRecord作为(client端) 和 AudiFlinger(server端)交互读写pcm数据网上很多博客分析,本文就不打算从新造轮子了,主要分析AudioFlinger作为(client端) 和 android.hardware.audio.service(server端)交互读写pcm数据的流程。
前面我们分析了AudioFlinger的初始化流程。这篇来看看AudioFlinger是如何读取pcm数据的。
Android 12 AudioFlinger 分析(RK3588)_Windra6的博客-CSDN博客_android audio flinger分析
二、Android Audio音频总体框架:
三、AudioFlinger读取pcm数据流程:
uml流程图:
?
?这里使用到FMQ机制。最终是调用到了audio_hw.c文件的pcm_read函数
确定read函数调用地方,相关日志:
audio_hw.h:
struct stream_in {
struct audio_stream_in stream;
}
audio_hw.c:
static int adev_open_input_stream(struct audio_hw_device *dev,
audio_io_handle_t handle,
audio_devices_t devices,
struct audio_config *config,
struct audio_stream_in **stream_in,
audio_input_flags_t flags,
const char *address __unused,
audio_source_t source __unused)
{
in->stream.read = in_read;
ALOGE("windra_E debug audio hal %s in->stream = %p , in->stream.read = %p", __func__, &(in->stream), in->stream.read);
}
audio.h:
typedef struct audio_stream_in audio_stream_in_t;
StreamIn.cpp:
class ReadThread : public Thread {
public:
// ReadThread's lifespan never exceeds StreamIn's lifespan.
ReadThread(std::atomic<bool>* stop, audio_stream_in_t* stream, StreamIn::CommandMQ* commandMQ,
StreamIn::DataMQ* dataMQ, StreamIn::StatusMQ* statusMQ, EventFlag* efGroup)
: Thread(false /*canCallJava*/),
mStop(stop),
mStream(stream),
mCommandMQ(commandMQ),
mDataMQ(dataMQ),
mStatusMQ(statusMQ),
mEfGroup(efGroup),
mBuffer(nullptr) {
ALOGE("windra_E debug audio hal %s mStream = %p, mStream->read = %p", __func__, mStream, mStream->read);
}
private:
audio_stream_in_t* mStream;
}
打开APP录音:
04-24 01:43:24.599 478 587 E AudioHardwareTiny: windra_E debug audio hal adev_open_input_stream in->stream = 0xf1682fe0 , in->stream.read = 0xf0a93d49
04-24 01:43:24.617 478 587 E StreamInHAL: windra_E debug audio hal ReadThread mStream = 0xf1682fe0, mStream->read = 0xf0a93d49
从上面日志可以看到?mStream->read = 0xf0a93d49,in->stream.read = 0xf0a93d49。 函数指针地址是一样的。
四、AudioFlinger写pcm数据流程:
类似读流程。涉及到的文件都差不多。
如:StreamOut.cpp ?
ssize_t writeResult = mStream->write(mStream, &mBuffer[0], availToRead);
五、FMQ介绍:
官方文档: https://source.android.google.cn/devices/architecture/hidl/fmq?hl=zh-cn FMQ 在用户神经网络中的爆发执行的应用 https://source.android.google.cn/devices/neural-networks?hl=zh-cn 代码位置: system/libfmq /system/libhidl/base/ ?
1. 主FMQ 的创建和初始化:
unsync 类型的block的队列
MessageQueue<int32_t, kUnsynchronizedWrite>* mFmqUnsynchronized = new (std::nothrow) MessageQueue<int32_t, kUnsynchronizedWrite> (queue_size, isBlock);
对应的实现为:
template <typename T, MQFlavor flavor>
MessageQueue<T, flavor>::MessageQueue(size_t numElementsInQueue, bool configureEventFlagWord) {
? ? //判断队列最大值 是否超标
? ? //SIZE_MAX == LONG_MAX==FFFF FFFF FFFF FFFF
? ? //numElementsInQueue * sizeof(T) ?为队列所占的内存大小
? ? if (numElementsInQueue > SIZE_MAX / sizeof(T)) {
? ? ? ? return;
? ? }
? ? // fmq 队列大小
? ? size_t kQueueSizeBytes = numElementsInQueue * sizeof(T);
? ??
? ? // typedef uint64_t RingBufferPosition;
? ? //16 个字节的buf, 用来存放fmq的读指针和写指针
? ? //存放在 共享内存的前16个字节
? ? //内存模型见Descriptor 的初始化
? ? size_t kMetaDataSize = 2 * sizeof(android::hardware::RingBufferPosition);
?? ?//如果是阻塞队列腾出4个字节?
?? ?//TODO
? ? if (configureEventFlagWord) {
? ? ? ? kMetaDataSize+= sizeof(std::atomic<uint32_t>);
? ? }
? ? ??
? ? ? ?//计算申请的共享内存的队列长度?
? ? ? ?// alignToWordBoundary 方法:
? ? ? ? // ? ? kQueueSizeBytes 如果是8 的倍数,结果依然是 kQueueSizeBytes
? ? ? ? // ?kQueueSizeBytes 如果不是8 的倍数, 根据kQueueSizeBytes计算出一个最小的大于kQueueSizeBytes且为8的倍数的数
? ? ? ? // kMetaDataSize ?:4 个字节
? ? ? ? //PAGE_SIZE ?: 4个字节
? ? ? ? // ?PAGE_SIZE - 1) & ?~(PAGE_SIZE - 1) : 将 kAshmemSizePageAligned 调整为PAGE_SIZE 的整数, ?和alignToWordBoundary类似
? ? ? ?size_t kAshmemSizePageAligned =
? ? ? ? ? ? (Descriptor::alignToWordBoundary(kQueueSizeBytes) + kMetaDataSize + PAGE_SIZE - 1) &
? ? ? ? ? ? ~(PAGE_SIZE - 1);
? ? //申请一块共享内存,设置匿名共享内存的保护位
? ? int ashmemFd = ashmem_create_region("MessageQueue", kAshmemSizePageAligned);
? ? ashmem_set_prot_region(ashmemFd, PROT_READ | PROT_WRITE);
? ? ?//创建一个native_handle_t
?/*
? ?typedef struct native_handle
? ? {
? ? ?? ??? ?int version; ? ? ? ?// sizeof(native_handle_t)
? ? ?? ??? ?int numFds; ? ? ? ? // number of file-descriptors at &data[0]?
? ? ?? ??? ?int numInts; ? ? ? ?// number of ints at &data[numFds]?
? ? ?? ??? ?int data[0]; ? ? ? ?// numFds + numInts ints?
? ? ?} native_handle_t;
? ? ?*/
? ? ?native_handle_t* mqHandle =
? ? ? ? ? ? native_handle_create(1 /* numFds */, 0 /* numInts */);
? ? mqHandle->data[0] = ashmemFd; // share memery 的fd放入mqHandle
? ? //创建Descriptor
? ? // typedef MQDescriptor<T, flavor> Descriptor;
? ? // kQueueSizeBytes: ? fmq 队列大小
? ? //mqHandler: ?共享内存fd的封装
? ? //sizeof(T): 单个元素的大小
? ? //configureEventFlagWord: 关于阻塞的配置
? ? mDesc = std::unique_ptr<Descriptor>(new (std::nothrow) Descriptor(kQueueSizeBytes,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? mqHandle,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? sizeof(T),
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? configureEventFlagWord));
? ?//初始化memory
? ? initMemory(true);
}
Descriptor 的初始化
template<typename T, MQFlavor flavor>
MQDescriptor<T, flavor>::MQDescriptor(size_t bufferSize, native_handle_t *nHandle,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?size_t messageSize, bool configureEventFlag)
? ? : mHandle(nHandle), mQuantum(messageSize), mFlags(flavor) {
/*
enum GrantorType : int { READPTRPOS = 0, WRITEPTRPOS, DATAPTRPOS, EVFLAGWORDPOS };
static constexpr int32_t kMinGrantorCount = DATAPTRPOS + 1;
static constexpr int32_t kMinGrantorCountForEvFlagSupport = EVFLAGWORDPOS + 1;
*/
? //阻塞队列, mGrantors 长度为4
? //非阻塞队列, mGrantors 长度为 3
? //一个数组, ?用于存储fmq队列内的
? //0元素 : 读指针
? //1元素: ?写指针
? //2元素: ?fmq 队列内容
? //3 元素: 阻塞队列有, ?非阻塞队列没有,用于存储队列flag
? ? mGrantors.resize(configureEventFlag? kMinGrantorCountForEvFlagSupport : kMinGrantorCount);
? ? size_t memSize[] = {
? ? ? ? sizeof(RingBufferPosition), ? ? //memory to be allocated for read pointer counter, ? ? ? fmq的读指针大小 8个字节
? ? ? ? sizeof(RingBufferPosition), ? ?// memory to be allocated for write pointer counter, ? ? ?fmq的写指针大小 8个字节
? ? ? ? bufferSize, ? ? ? ? ? ? ? ? ?// memory to be allocated for data buffer, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?fmq的数据域
? ? ? ? sizeof(std::atomic<uint32_t>) ?// memory to be allocated for EventFlag word ? ? ? ? ? ? ? TODO
? ? };
//初始化mGrantors ?数组
? ? for (size_t grantorPos = 0, offset = 0;
? ? ? ? ?grantorPos < mGrantors.size();
? ? ? ? ?offset += memSize[grantorPos++]) {
? ? ? ? mGrantors[grantorPos] = {
? ? ? ? ? ? 0 /* grantor flags */,
? ? ? ? ? ? 0 /* fdIndex */,
? ? ? ? ? ? static_cast<uint32_t>(alignToWordBoundary(offset)),
? ? ? ? ? ? memSize[grantorPos]
? ? ? ? };
?/*
?struct GrantorDescriptor {
? ? uint32_t flags __attribute__ ((aligned(4)));
? ? uint32_t fdIndex __attribute__ ((aligned(4)));
? ? uint32_t offset __attribute__ ((aligned(4)));
? ? uint64_t extent __attribute__ ((aligned(8)));
};
*/
? ? }
}
static inline size_t alignToWordBoundary(size_t length) {
? ? constexpr size_t kAlignmentSize = 64;
? ? if (length > SIZE_MAX - kAlignmentSize/8 + 1) {
? ? ? ? details::logAlwaysFatal("Queue size too large");
? ? }
? ? return (length + kAlignmentSize/8 - 1) & ~(kAlignmentSize/8 - 1U);
}
native_handle_t* native_handle_create(int numFds, int numInts) {
? ? size_t mallocSize = sizeof(native_handle_t) + (sizeof(int) * (numFds + numInts));
? ? native_handle_t* h = static_cast<native_handle_t*>(malloc(mallocSize));
? ? if (h) {
? ? ? ? h->version = sizeof(native_handle_t);
? ? ? ? h->numFds = numFds;
? ? ? ? h->numInts = numInts;
? ? }
? ? return h;
}
template <typename T, MQFlavor flavor>
void MessageQueue<T, flavor>::initMemory(bool resetPointers) {
?? ?//判断队列类型,同步队列还是非同步队列
? ? if (flavor == kSynchronizedReadWrite) {
?? ??? ?//同步队列
?? ??? ?//通过mapGrantorDescr 方法将, 将共享内存中的前8个字节, 映射到本进程的读指针上
? ? ? ? mReadPtr = reinterpret_cast<std::atomic<uint64_t>*>(mapGrantorDescr(Descriptor::READPTRPOS));
? ? } else {
?? ??? ?//非同步队列
? ? ? ? mReadPtr = new (std::nothrow) std::atomic<uint64_t>;
? ? }
?? ?//通过mapGrantorDescr 方法将, 将共享内存中的第8~16个字节, 映射到本进程的写指针
? ? mWritePtr = reinterpret_cast<std::atomic<uint64_t>*>(mapGrantorDescr(Descriptor::WRITEPTRPOS));
? ? if (resetPointers) {
?? ??? ?//初始化读写指针.
? ? ? ? mReadPtr->store(0, std::memory_order_release);
? ? ? ? mWritePtr->store(0, std::memory_order_release);
? ? } else if (flavor != kSynchronizedReadWrite) {
? ? ? ? // Always reset the read pointer.
? ? ? ? mReadPtr->store(0, std::memory_order_release);
? ? }
? ? //通过mapGrantorDescr 方法将, 将共享内存中的数据域, 映射到本进程的队列中
? ? mRing = reinterpret_cast<uint8_t*>(mapGrantorDescr(Descriptor::DATAPTRPOS));
? ? details::check(mRing != nullptr);
? ? // 映射出共享内存的 阻塞的flag
? ? mEvFlagWord = static_cast<std::atomic<uint32_t>*>(mapGrantorDescr(Descriptor::EVFLAGWORDPOS));
? ? if (mEvFlagWord != nullptr) {
?? ??? ?//如果是阻塞队列, 表示存在flag, 将mEventFlag 初始化指向mEvFlagWord
? ? ? ? android::hardware::EventFlag::createEventFlag(mEvFlagWord, &mEventFlag);
? ? }
}
template <typename T, MQFlavor flavor>
void* MessageQueue<T, flavor>::mapGrantorDescr(uint32_t grantorIdx) {
? ? const native_handle_t* handle = mDesc->handle();
? ? //获取mDesc中各个指针的配置
? ? auto grantors = mDesc->grantors();
?? ?
? ? int fdIndex = grantors[grantorIdx].fdIndex;
? ? //指针在共享内存中偏移量
? ? int mapOffset = (grantors[grantorIdx].offset / PAGE_SIZE) * PAGE_SIZE;
? ? //指针在共享内存中的长度
? ? int mapLength = grantors[grantorIdx].offset - mapOffset + grantors[grantorIdx].extent;
? ? //将共享内存 指针的内存, 映射到本地指针中
? ? void* address = mmap(0, mapLength, PROT_READ | PROT_WRITE, MAP_SHARED,handle->data[fdIndex], mapOffset);
? ??
? ? //返回指针
? ? return (address == MAP_FAILED)
? ? ? ? ? ? ? nullptr
? ? ? ? ? ? : reinterpret_cast<uint8_t*>(address) +
? ? ? ? ? ? (grantors[grantorIdx].offset - mapOffset);
}
status_t EventFlag::createEventFlag(std::atomic<uint32_t>* fwAddr, EventFlag** flag) {
? ? if (flag == nullptr) {
? ? ? ? return BAD_VALUE;
? ? }
? ? status_t status = NO_MEMORY;
? ? *flag ?= nullptr;
? ? EventFlag* evFlag = new (std::nothrow) EventFlag(fwAddr, &status);
? ? if (evFlag != nullptr) {
? ? ? ? if (status == NO_ERROR) {
? ? ? ? ? ? *flag = evFlag;
? ? ? ? } else {
? ? ? ? ? ? delete evFlag;
? ? ? ? }
? ? }
? ? return status;
}
EventFlag::EventFlag(std::atomic<uint32_t>* fwAddr, status_t* status) {
? ? *status = NO_ERROR;
? ? if (fwAddr == nullptr) {
? ? ? ? *status = BAD_VALUE;
? ? } else {
? ? ? ? mEfWordPtr = fwAddr;
? ? }
}
2. 附属FMQ 的创建和初始化:
Return<void> VehicleHalManager::getQueue(getQueue_cb _hidl_cb) {
? ? ? _hidl_cb(*mFmqUnsynchronized->getDesc());
? ? ?return Void();
}
拿到服务端的desc之后创建自己的fmq队列
template<typename T> using MQDescriptorUnsync = MQDescriptor<T, kUnsynchronizedWrite>;
typedef MessageQueue<uint16_t, kUnsynchronizedWrite> MessageQueueUnsync;?
?pVnet->getQueue([&fmq](const MQDescriptorUnsync<int32_t>& in){
? ? ?fmq = new (std::nothrow) MessageQueueUnsync(in);
?});
?MessageQueue(const Descriptor& Desc, bool resetPointers = true);
MessageQueue<T, flavor>::MessageQueue(const Descriptor& Desc, bool resetPointers) {
? ? mDesc = std::unique_ptr<Descriptor>(new (std::nothrow) Descriptor(Desc));
? ? initMemory(resetPointers);
}
initMemory的流程类似, 之后 双发进程,就共同存在,同一个mGrantor, vector 两个进程的读指针,写指针, 数据域, flag域也指向了共享内存的同一处3. FMQ 数据的写入 ?
? bool writeBlocking(const T* data, size_t count, uint32_t readNotification,
? ? ? ? ? ? ? ? ? ? ? ?uint32_t writeNotification, int64_t timeOutNanos = 0,
? ? ? ? ? ? ? ? ? ? ? ?android::hardware::EventFlag* evFlag = nullptr);
template <typename T, MQFlavor flavor>
bool MessageQueue<T, flavor>::writeBlocking(const T* data,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? size_t count,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? uint32_t readNotification,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? uint32_t writeNotification,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? int64_t timeOutNanos,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? android::hardware::EventFlag* evFlag) {
?? ?//向FMQ写入数据
? ? bool result = write(data, count);
? ? if (result) {
? ? ? ?//写入成功
? ? ? ? if (writeNotification) {
? ? ? ? ? ? //如果是阻塞方是, wake唤醒读指针
? ? ? ? ? ? evFlag->wake(writeNotification);
? ? ? ? }
? ? ? ? return result;
? ? }
? ? //空间不够,不满足写入条件时
? ? bool shouldTimeOut = timeOutNanos != 0;
? ? int64_t prevTimeNanos = shouldTimeOut ? android::elapsedRealtimeNano() : 0;
? ? while (true) {
? ? ? ? //write 超时的逻辑
? ? ? ? if (shouldTimeOut) {
? ? ? ? ? ? int64_t currentTimeNs = android::elapsedRealtimeNano();
? ? ? ? ? ? timeOutNanos -= currentTimeNs - prevTimeNanos;
? ? ? ? ? ? prevTimeNanos = currentTimeNs;
? ? ? ? ? ? if (timeOutNanos <= 0) {
? ? ? ? ? ? ? ? result = write(data, count);
? ? ? ? ? ? ? ? break;
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? uint32_t efState = 0;
? ? ? ? //等待read 唤醒.
? ? ? ? status_t status = evFlag->wait(readNotification,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?&efState,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?timeOutNanos,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?true /* retry on spurious wake */);
? ? ? ? if (status != android::TIMED_OUT && status != android::NO_ERROR) {
? ? ? ? ? ? details::logError("Unexpected error code from EventFlag Wait status " + std::to_string(status));
? ? ? ? ? ? break;
? ? ? ? }
? ? ? ? if (status == android::TIMED_OUT) {
? ? ? ? ? ? break;
? ? ? ? }
? ? ? ? if ((efState & readNotification) && write(data, count)) {
? ? ? ? ? ? result = true;
? ? ? ? ? ? break;
? ? ? ? }
? ? }
? ? if (result && writeNotification != 0) {
? ? ? ? evFlag->wake(writeNotification);
? ? }
? ? return result;
}
//向FMQ写入数据
template <typename T, MQFlavor flavor>
bool MessageQueue<T, flavor>::write(const T* data, size_t nMessages) {
? ? MemTransaction tx;
? ? return beginWrite(nMessages, &tx) &&
? ? ? ? ? ? tx.copyTo(data, 0 /* startIdx */, nMessages) &&
? ? ? ? ? ? commitWrite(nMessages);
}
template <typename T, MQFlavor flavor>
bool MessageQueue<T, flavor>::beginWrite(size_t nMessages, MemTransaction* result) const {
?? ?//表示当前操作都为原子操作
? ? auto writePtr = mWritePtr->load(std::memory_order_relaxed);
? ? //writeOffset: fmq 数据域使用的多少
? ? size_t writeOffset = writePtr % mDesc->getSize();
?? ?//getSize: fmq 数据域长度 - 使用长度/ sizeof(T) = 还可以存储的元素
? ? size_t contiguousMessages = (mDesc->getSize() - writeOffset) / sizeof(T);
? ? if (contiguousMessages < nMessages) {
?? ? ? ? //可以存储的元素 < 要存储的元素
? ? ? ? *result = MemTransaction(MemRegion(reinterpret_cast<T*>(mRing + writeOffset), contiguousMessages),
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?MemRegion(reinterpret_cast<T*>(mRing),
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?nMessages - contiguousMessages));
? ? } else {
?? ? ? ? //可以存储的元素 > 要存储的元素
?? ? ? ? //全部存储
?? ? ? ? //MemTransaction 中 的?
? ? ? ? *result = MemTransaction(MemRegion(reinterpret_cast<T*>(mRing + writeOffset), nMessages),
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?MemRegion());
? ? }
? ? return true;
}
将数据copy到FMQ 队列中
template <typename T, MQFlavor flavor>
bool MessageQueue<T, flavor>::MemTransaction::copyTo(const T* data,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?size_t startIdx,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?size_t nMessages) { ??
? ? size_t firstWriteCount = 0, secondWriteCount = 0;
? ? T * firstBaseAddress = nullptr, * secondBaseAddress = nullptr;
? ? //计算出两个指针
? ? //firstBaseAddress
? ? //secondBaseAddress
? ? if (getMemRegionInfo(startIdx,
? ? ? ? ? ? ? ? ? ? ? ? ?nMessages,
? ? ? ? ? ? ? ? ? ? ? ? ?firstWriteCount,
? ? ? ? ? ? ? ? ? ? ? ? ?secondWriteCount,
? ? ? ? ? ? ? ? ? ? ? ? ?&firstBaseAddress,
? ? ? ? ? ? ? ? ? ? ? ? ?&secondBaseAddress) == false) {
? ? ? ? return false;
? ? }
? ? //copy到fmq
? ? if (firstWriteCount != 0) {
? ? ? ? memcpy(firstBaseAddress, data, firstWriteCount * sizeof(T));
? ? }
? ?//copy到fmq
? ? if (secondWriteCount != 0) {
? ? ? ? memcpy(secondBaseAddress, ?data + firstWriteCount, ?secondWriteCount * sizeof(T));
? ? }
? ? return true;
}
//TODO
template <typename T, MQFlavor flavor>
bool MessageQueue<T, flavor>::MemTransaction::getMemRegionInfo(size_t startIdx,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?size_t nMessages,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?size_t& firstCount,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?size_t& secondCount,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?T** firstBaseAddress,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?T** secondBaseAddress) {
? ? ?//当前fmq写入多少数据
? ? size_t firstRegionLength = first.getLength();
? ? size_t secondRegionLength = second.getLength();
? ? firstCount = startIdx < firstRegionLength ? ?std::min(nMessages, firstRegionLength - startIdx) : 0;
? ? secondCount = nMessages - firstCount;
? ? if (firstCount != 0) {
? ? ? ? *firstBaseAddress = first.getAddress() + startIdx;
? ? }
? ? if (secondCount != 0) {
? ? ? ? size_t secondStartIdx = startIdx > firstRegionLength ? startIdx - firstRegionLength : 0;
? ? ? ? *secondBaseAddress = second.getAddress() + secondStartIdx;
? ? }
? ? return true;
}
计算写指针位置
template <typename T, MQFlavor flavor>
__attribute__((no_sanitize("integer")))
bool MessageQueue<T, flavor>::commitWrite(size_t nMessages) {
? ? size_t nBytesWritten = nMessages * sizeof(T);
? ? auto writePtr = mWritePtr->load(std::memory_order_relaxed);
? ? //写指针 = 写指针+ 当前写入的数据大小
? ? writePtr += nBytesWritten;
? ? mWritePtr->store(writePtr, std::memory_order_release);
? ? return true;
}
? ? struct MemRegion {
? ? ? ? MemRegion() : MemRegion(nullptr, 0) {}
? ? ? ? MemRegion(T* base, size_t size) : address(base), length(size) {}
? ? ? ? MemRegion& operator=(const MemRegion &other) {
? ? ? ? ? ? address = other.address;
? ? ? ? ? ? length = other.length;
? ? ? ? ? ? return *this;
? ? ? ? }
? ? ? ? inline T* getAddress() const { return address; }
? ? ? ? inline size_t getLength() const { return length; }
? ? ? ? inline size_t getLengthInBytes() const { return length * sizeof(T); }
? ? private:
? ? ? ? T* address;
? ? ? ? size_t length;
? ? };
struct MemTransaction {
? ? ? ?MemTransaction() : MemTransaction(MemRegion(), MemRegion()) {}
? ? ? ?MemTransaction(const MemRegion& regionFirst, const MemRegion& regionSecond) :
? ? ? ? ? ?first(regionFirst), second(regionSecond) {}
? ? ? ?MemTransaction& operator=(const MemTransaction &other) {
? ? ? ? ? ?first = other.first;
? ? ? ? ? ?second = other.second;
? ? ? ? ? ?return *this;
? ? ? ?}
? ? ? ?T* getSlot(size_t idx);
? ? ? ?bool copyTo(const T* data, size_t startIdx, size_t nMessages = 1);
? ? ? ?bool copyFrom(T* data, size_t startIdx, size_t nMessages = 1);
? ? ? ?inline const MemRegion& getFirstRegion() const { return first; }
? ? ? ?inline const MemRegion& getSecondRegion() const { return second; }
? ?private:
? ? ? ?bool inline getMemRegionInfo(size_t idx,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? size_t nMessages,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? size_t& firstCount,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? size_t& secondCount,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? T** firstBaseAddress,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? T** secondBaseAddress);
? ? ? ?MemRegion first;
? ? ? ?MemRegion second;
? ?};
4.FMQ读数据
bool readBlocking(T* data, size_t count, uint32_t readNotification,
? ? ? ? ? ? ? ? ? uint32_t writeNotification, int64_t timeOutNanos = 0,
? ? ? ? ? ? ? ? ? android::hardware::EventFlag* evFlag = nullptr);
bool readBlocking(T* data, size_t count, int64_t timeOutNanos = 0);
template <typename T, MQFlavor flavor>
bool MessageQueue<T, flavor>::readBlocking(T* data, size_t count, int64_t timeOutNanos) {
? ? return readBlocking(data, count, FMQ_NOT_FULL, FMQ_NOT_EMPTY, timeOutNanos);
}
template <typename T, MQFlavor flavor>
bool MessageQueue<T, flavor>::readBlocking(T* data,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?size_t count,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?uint32_t readNotification,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?uint32_t writeNotification,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?int64_t timeOutNanos,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?android::hardware::EventFlag* evFlag) {
? ? //读取fmq中数据
? ? bool result = read(data, count);
? ? //读取成功, notify出去
? ? if (result) {
? ? ? ? if (readNotification) {
? ? ? ? ? ? evFlag->wake(readNotification);
? ? ? ? }
? ? ? ? return result;
? ? }
? ? bool shouldTimeOut = timeOutNanos != 0;
? ? int64_t prevTimeNanos = shouldTimeOut ? android::elapsedRealtimeNano() : 0;
? ? while (true) {
? ? ? ? if (shouldTimeOut) {
? ? ? ? ? ?//读取超时的逻辑
? ? ? ? ? ? ?int64_t currentTimeNs = android::elapsedRealtimeNano();
? ? ? ? ? ? timeOutNanos -= currentTimeNs - prevTimeNanos;
? ? ? ? ? ? prevTimeNanos = currentTimeNs;
? ? ? ? ? ? if (timeOutNanos <= 0) {
? ? ? ? ? ? ? ? result = read(data, count);
? ? ? ? ? ? ? ? break;
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? uint32_t efState = 0;
? ? ? ? //没有数据或数据不满足时, wait 等待写入线程唤醒.?? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ?
? ? ? ? status_t status = evFlag->wait(writeNotification,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?&efState,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?timeOutNanos,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?true /* retry on spurious wake */);
? ? ? ? if (status != android::TIMED_OUT && status != android::NO_ERROR) {
? ? ? ? ? ? details::logError("Unexpected error code from EventFlag Wait status " + std::to_string(status));
? ? ? ? ? ? break;
? ? ? ? }
? ? ? ? if (status == android::TIMED_OUT) {
? ? ? ? ? ? break;
? ? ? ? }
? ? ? ? if ((efState & writeNotification) && read(data, count)) {
? ? ? ? ? ? result = true;
? ? ? ? ? ? break;
? ? ? ? }
? ? }
? ? if (result && readNotification != 0) {
? ? ? ? evFlag->wake(readNotification);
? ? }
? ? return result;
}
template <typename T, MQFlavor flavor>
bool MessageQueue<T, flavor>::read(T* data, size_t nMessages) {
? ? MemTransaction tx;
? ? return beginRead(nMessages, &tx) &&
? ? ? ? ? ? tx.copyFrom(data, 0 /* startIdx */, nMessages) &&
? ? ? ? ? ? commitRead(nMessages);
}
template <typename T, MQFlavor flavor>
__attribute__((no_sanitize("integer")))
bool MessageQueue<T, flavor>::beginRead(size_t nMessages, MemTransaction* result) const {
? ? *result = MemTransaction();
? ? auto writePtr = mWritePtr->load(std::memory_order_acquire);
? ? auto readPtr = mReadPtr->load(std::memory_order_relaxed);
? ? if (writePtr - readPtr > mDesc->getSize()) {
? ? ? ? mReadPtr->store(writePtr, std::memory_order_release);
? ? ? ? return false;
? ? }
? //要读取多少字节
? ? size_t nBytesDesired = nMessages * sizeof(T);
? ?//写指针-读指针 < 要读取的 数据
? ? //当前数据较少,满足不了获取的数据量
? ? if (writePtr - readPtr < nBytesDesired) {
? ? ? ? return false;
? ? }
? ? //读取了多少数据
? ? size_t readOffset = readPtr % mDesc->getSize();
? ? //fmq 剩余多少 数据单元的空间
? ? size_t contiguousMessages = (mDesc->getSize() - readOffset) / sizeof(T);
? ? if (contiguousMessages < nMessages) {
? ? ? ? //剩余的数据单元空间,小于nMessages,表示读取的数据过大
? ? ? ? *result = MemTransaction(MemRegion(reinterpret_cast<T*>(mRing + readOffset),
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?contiguousMessages),
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?MemRegion(reinterpret_cast<T*>(mRing),
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?nMessages - contiguousMessages));
? ? } else {
? ? ? ? *result = MemTransaction(MemRegion(reinterpret_cast<T*>(mRing + readOffset), nMessages),
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?MemRegion());
? ? }
? ? return true;
}
//读取数据,将数据从fmq中copy出来
template <typename T, MQFlavor flavor>
bool MessageQueue<T, flavor>::MemTransaction::copyFrom(T* data, size_t startIdx, size_t nMessages) {
? ? if (data == nullptr) {
? ? ? ? return false;
? ? }
? ? size_t firstReadCount = 0, secondReadCount = 0;
? ? T* firstBaseAddress = nullptr, * secondBaseAddress = nullptr;
? ? if (getMemRegionInfo(startIdx,
? ? ? ? ? ? ? ? ? ? ? ? ?nMessages,
? ? ? ? ? ? ? ? ? ? ? ? ?firstReadCount,
? ? ? ? ? ? ? ? ? ? ? ? ?secondReadCount,
? ? ? ? ? ? ? ? ? ? ? ? ?&firstBaseAddress,
? ? ? ? ? ? ? ? ? ? ? ? ?&secondBaseAddress) == false) {
? ? ? ? return false;
? ? }
? ? if (firstReadCount != 0) {
? ? ? ? memcpy(data, firstBaseAddress, firstReadCount * sizeof(T));
? ? }
? ? if (secondReadCount != 0) {
? ? ? ? memcpy(data + firstReadCount,
? ? ? ? ? ? ? ?secondBaseAddress,
? ? ? ? ? ? ? ?secondReadCount * sizeof(T));
? ? }
? ? return true;
}
5. event flag 通知机制 读写事件的同步主要用到了 futex机制: Linux进程同步机制-Futex https://www.linuxidc.com/Linux/2011-02/32229.htm https://www.cnblogs.com/feng9exe/p/7055710.html
wait的流程
status_t EventFlag::wait(uint32_t bitmask,
? ? ? ? ? ? ? ? ? ? ? ? ?uint32_t* efState,
? ? ? ? ? ? ? ? ? ? ? ? ?int64_t timeoutNanoSeconds,
? ? ? ? ? ? ? ? ? ? ? ? ?bool retry) {
? ? bool shouldTimeOut = timeoutNanoSeconds != 0;
? ? int64_t prevTimeNs = shouldTimeOut ? android::elapsedRealtimeNano() : 0;
? ? status_t status;
? ? while (true) {
? ? ? ? //是否添加超时机制
? ? ? ? if (shouldTimeOut) {
? ? ? ? ? ? int64_t currentTimeNs = android::elapsedRealtimeNano();
? ? ? ? ? ? timeoutNanoSeconds -= currentTimeNs - prevTimeNs;
? ? ? ? ? ? prevTimeNs = currentTimeNs;
? ? ? ? ? ? if (timeoutNanoSeconds <= 0) {
? ? ? ? ? ? ? ? status = -ETIMEDOUT;
? ? ? ? ? ? ? ? *efState = 0;
? ? ? ? ? ? ? ? break;
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? status = waitHelper(bitmask, efState, timeoutNanoSeconds);
? ? ? ? if ((status != -EAGAIN) && (status != -EINTR)) {
? ? ? ? ? ? break;
? ? ? ? }
? ? }
status_t EventFlag::waitHelper(uint32_t bitmask, uint32_t* efState, int64_t timeoutNanoSeconds) {
? ? status_t status = NO_ERROR;
? ? uint32_t old = std::atomic_fetch_and(mEfWordPtr, ~bitmask);
? ? uint32_t setBits = old & bitmask;
? ? uint32_t efWord = old & ~bitmask;
? ? int ret = 0;
? ? //挂起在 mEfWordPtr 指针等待被唤醒.
? ? if (timeoutNanoSeconds) {
? ? ? ? //有超时机制
? ? ? ? struct timespec waitTimeAbsolute;
? ? ? ? addNanosecondsToCurrentTime(timeoutNanoSeconds, &waitTimeAbsolute);
? ? ? ? ret = syscall(__NR_futex, mEfWordPtr, FUTEX_WAIT_BITSET,
? ? ? ? ? ? ? ? ? ? ? efWord, &waitTimeAbsolute, NULL, bitmask);
? ? } else {
? ? ? ? ?//无超时机制
? ? ? ? ret = syscall(__NR_futex, mEfWordPtr, FUTEX_WAIT_BITSET, efWord, NULL, NULL, bitmask);
? ? }
? ? return status;
}
wake流程
status_t EventFlag::wake(uint32_t bitmask) {
? ? status_t status = NO_ERROR;
? ? uint32_t old = std::atomic_fetch_or(mEfWordPtr, bitmask);
? ? //唤醒 挂在 mEfWordPtr 地址的 所有进程.
? ? if ((~old & bitmask) != 0) {
? ? ? ? int ret = syscall(__NR_futex, mEfWordPtr, FUTEX_WAKE_BITSET,
? ? ? ? ? ? ? ? ? ? ? ? ? INT_MAX, NULL, NULL, bitmask);
? ? }
? ? return status;
}
|