目录
一 关键类
二 推流缓冲
webrtc拉流篇,可参考
https://mp.csdn.net/mp_blog/creation/editor/122743325
一 关键类
环形缓冲,聚合了_RingStorage template<typename T> class RingBuffer : public enable_shared_from_this<RingBuffer<T> > { public: ? ? typedef std::shared_ptr<RingBuffer> Ptr; ? ? typedef _RingReader<T> RingReader; ? ? typedef _RingStorage<T> RingStorage; ? ? typedef _RingReaderDispatcher<T> RingReaderDispatcher; ? ? typedef function<void(int size)> onReaderChanged; ? ? RingBuffer(int max_size = 1024, const onReaderChanged &cb = nullptr) { ? ? ? ? _on_reader_changed = cb; ? ? ? ? _storage = std::make_shared<RingStorage>(max_size); ? ? } ? ? ~RingBuffer() {} ? ? void write(T in, bool is_key = true) { ? ? ? ? if (_delegate) { ? ? ? ? ? ? _delegate->onWrite(std::move(in), is_key); ? ? ? ? ? ? return; ? ? ? ? } ? ? ? ? LOCK_GUARD(_mtx_map); ? ? ? ? for (auto &pr : _dispatcher_map) { ? ? ? ? ? ? auto &second = pr.second; ? ? ? ? ? ? //切换线程后触发onRead事件 ? ? ? ? ? ? pr.first->async([second, in, is_key]() { ? ? ? ? ? ? ? ? second->write(std::move(const_cast<T &>(in)), is_key); ? ? ? ? ? ? }, false); ? ? ? ? } ? ? ? ? _storage->write(std::move(in), is_key); ? ? } ? ? int readerCount() { ? ? ? ? return _total_count; ? ? } ? ? void clearCache(){ ? ? ? ? LOCK_GUARD(_mtx_map); ? ? ? ? _storage->clearCache(); ? ? ? ? for (auto &pr : _dispatcher_map) { ? ? ? ? ? ? auto &second = pr.second; ? ? ? ? ? ? //切换线程后清空缓存 ? ? ? ? ? ? pr.first->async([second]() { ? ? ? ? ? ? ? ? second->clearCache(); ? ? ? ? ? ? }, false); ? ? ? ? } ? ? } private: ? ? struct HashOfPtr { ? ? ? ? std::size_t operator()(const EventPoller::Ptr &key) const { ? ? ? ? ? ? return (std::size_t) key.get(); ? ? ? ? } ? ? }; private: ? ? mutex _mtx_map; ? ? atomic_int _total_count {0}; ? ? typename RingStorage::Ptr _storage; ? ? typename RingDelegate<T>::Ptr _delegate; ? ? onReaderChanged _on_reader_changed; ? ? unordered_map<EventPoller::Ptr, typename RingReaderDispatcher::Ptr, HashOfPtr> _dispatcher_map; }; |
存放ffmpeg rtmp推流,demuxer,解码,重新编码后的rtppacket数据 template<typename T> class _RingStorage { public: ? ? typedef std::shared_ptr<_RingStorage> Ptr; ? ? _RingStorage(int max_size) { ? ? ? ? //gop缓存个数不能小于32 ? ? ? ? if(max_size < RING_MIN_SIZE){ ? ? ? ? ? ? max_size = RING_MIN_SIZE; ? ? ? ? } ? ? ? ? _max_size = max_size; ? ? } ? ? ~_RingStorage() {} ? ? /** ? ? ?* 写入环形缓存数据 ? ? ?* @param in 数据 ? ? ?* @param is_key 是否为关键帧 ? ? ?* @return 是否触发重置环形缓存大小 ? ? ?*/ ? ? ?void write(T in, bool is_key = true) { ? ? ? ? if (is_key) { ? ? ? ? ? ? //遇到I帧,那么移除老数据 ? ? ? ? ? ? _size = 0; ? ? ? ? ? ? _have_idr = true; ? ? ? ? ? ? _data_cache.clear(); ? ? ? ? } ? ? ? ? if (!_have_idr) { ? ? ? ? ? ? //缓存中没有关键帧,那么gop缓存无效 ? ? ? ? ? ? return; ? ? ? ? } ? ? ? ? _data_cache.emplace_back(std::make_pair(is_key, std::move(in))); ? ? ? ? if (++_size > _max_size) { ? ? ? ? ? ? //GOP缓存溢出,清空关老数据 ? ? ? ? ? ? _size = 0; ? ? ? ? ? ? _have_idr = false; ? ? ? ? ? ? _data_cache.clear(); ? ? ? ? } ? ? } ? ? Ptr clone() const { ? ? ? ? Ptr ret(new _RingStorage()); ? ? ? ? ret->_size = _size; ? ? ? ? ret->_have_idr = _have_idr; ? ? ? ? ret->_max_size = _max_size; ? ? ? ? ret->_data_cache = _data_cache; ? ? ? ? return ret; ? ? } ? ? const List<pair<bool, T> > &getCache() const { ? ? ? ? return _data_cache; ? ? } ? ? void clearCache(){ ? ? ? ? _size = 0; ? ? ? ? _data_cache.clear(); ? ? } private: ? ? _RingStorage() = default; private: ? ? bool _have_idr = false; ? ? int _size = 0; ? ? int _max_size; ? ? List<pair<bool, T> > _data_cache; }; |
webrtc从此处reader template<typename T> class _RingReader { public: ? ? typedef std::shared_ptr<_RingReader> Ptr; ? ? friend class _RingReaderDispatcher<T>; ? ? _RingReader(const std::shared_ptr<_RingStorage<T> > &storage, bool use_cache) { ? ? ? ? _storage = storage; ? ? ? ? _use_cache = use_cache; ? ? } ? ? ~_RingReader() {} ? ? void setReadCB(const function<void(const T &)> &cb) { ? ? ? ? if (!cb) { ? ? ? ? ? ? _read_cb = [](const T &) {}; ? ? ? ? } else { ? ? ? ? ? ? _read_cb = cb; ? ? ? ? ? ? flushGop(); ? ? ? ? } ? ? } ? ? void setDetachCB(const function<void()> &cb) { ? ? ? ? if (!cb) { ? ? ? ? ? ? _detach_cb = []() {}; ? ? ? ? } else { ? ? ? ? ? ? _detach_cb = cb; ? ? ? ? } ? ? } private: ? ? void onRead(const T &data, bool is_key) { ? ? ? ? _read_cb(data); ? ? } ? ? void onDetach() const { ? ? ? ? _detach_cb(); ? ? } ? ? void flushGop() { ? ? ? ? if (!_use_cache) { ? ? ? ? ? ? return; ? ? ? ? } ? ? ? ? _storage->getCache().for_each([&](const pair<bool, T> &pr) { ? ? ? ? ? ? onRead(pr.second, pr.first); ? ? ? ? }); ? ? } private: ? ? bool _use_cache; ? ? shared_ptr<_RingStorage<T> > _storage; ? ? function<void(void)> _detach_cb = []() {}; ? ? function<void(const T &)> _read_cb = [](const T &) {}; }; |
二 推流缓冲
void EventPoller::runLoop(bool blocked,bool regist_self)
bool Socket::attachEvent(const SockFD::Ptr &sock, bool is_udp) /lambda表达式
ssize_t Socket::onRead(const SockFD::Ptr &sock, bool is_udp) noexcept
void TcpServer::onAcceptConnection(const Socket::Ptr &sock) /lambda表达式
void RtmpSession::onRecv(const Buffer::Ptr &buf)
void RtmpProtocol::onParseRtmp(const char *data, size_t size)
void HttpRequestSplitter::input(const char *data,size_t len)
const char *RtmpProtocol::onSearchPacketTail(const char *data,size_t len)
const char* RtmpProtocol::handle_C2(const char *data, size_t len)
const char* RtmpProtocol::handle_rtmp(const char *data, size_t len)
void RtmpProtocol::handle_chunk(RtmpPacket::Ptr packet)
void RtmpSession::onRtmpChunk(RtmpPacket::Ptr packet)?
void onWrite(RtmpPacket::Ptr pkt, bool = true) override
void RtmpDemuxer::inputRtmp(const RtmpPacket::Ptr &pkt)?
void H264RtmpDecoder::inputRtmp(const RtmpPacket::Ptr &pkt)?
inline void H264RtmpDecoder::onGetH264(const char* data, size_t len, uint32_t dts, uint32_t pts)?
bool FrameDispatcher ::?inputFrame(const Frame::Ptr &frame) override
bool H264Track::inputFrame(const Frame::Ptr &frame)?
bool H264Track::inputFrame_l(const Frame::Ptr &frame)
?bool FrameWriterInterfaceHelper::inputFrame(const Frame::Ptr &frame) override
bool MediaSink::addTrack(const Track::Ptr &track_in)/lambda表达式
bool MultiMediaSourceMuxer::onTrackFrame(const Frame::Ptr &frame_in)?
bool RtspMediaSourceMuxer::inputFrame(const Frame::Ptr &frame) override
bool RtspMuxer::inputFrame(const Frame::Ptr &frame)?
bool H264RtpEncoder::inputFrame(const Frame::Ptr &frame)?
bool H264RtpEncoder::inputFrame_l(const Frame::Ptr &frame, bool is_mark)
void H264RtpEncoder::packRtp(const char *ptr, size_t len, uint32_t pts, bool is_mark, bool gop_pos)
void H264RtpEncoder::packRtpFu(const char *ptr, size_t len, uint32_t pts, bool is_mark, bool gop_pos)
bool RtpRing::inputRtp(const RtpPacket::Ptr &rtp, bool key_pos)
void RingBuffer::write(T in, bool is_key = true)
ringbuffer超过maxsize 512之后,推流数据未被拉流的话,会清空。推流和拉流过程是独立进行,比如只ffmepg推流,webrtc不拉流,播放放。
|