IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 数据结构与算法 -> ZLMediaKit接收ffmpeg rtmp推流 -> 正文阅读

[数据结构与算法]ZLMediaKit接收ffmpeg rtmp推流

目录

一 关键类

二 推流缓冲


webrtc拉流篇,可参考

https://mp.csdn.net/mp_blog/creation/editor/122743325

一 关键类

环形缓冲,聚合了_RingStorage

template<typename T>

class RingBuffer : public enable_shared_from_this<RingBuffer<T> > {

public:

? ? typedef std::shared_ptr<RingBuffer> Ptr;

? ? typedef _RingReader<T> RingReader;

? ? typedef _RingStorage<T> RingStorage;

? ? typedef _RingReaderDispatcher<T> RingReaderDispatcher;

? ? typedef function<void(int size)> onReaderChanged;

? ? RingBuffer(int max_size = 1024, const onReaderChanged &cb = nullptr) {

? ? ? ? _on_reader_changed = cb;

? ? ? ? _storage = std::make_shared<RingStorage>(max_size);

? ? }

? ? ~RingBuffer() {}

? ? void write(T in, bool is_key = true) {

? ? ? ? if (_delegate) {

? ? ? ? ? ? _delegate->onWrite(std::move(in), is_key);

? ? ? ? ? ? return;

? ? ? ? }

? ? ? ? LOCK_GUARD(_mtx_map);

? ? ? ? for (auto &pr : _dispatcher_map) {

? ? ? ? ? ? auto &second = pr.second;

? ? ? ? ? ? //切换线程后触发onRead事件

? ? ? ? ? ? pr.first->async([second, in, is_key]() {

? ? ? ? ? ? ? ? second->write(std::move(const_cast<T &>(in)), is_key);

? ? ? ? ? ? }, false);

? ? ? ? }

? ? ? ? _storage->write(std::move(in), is_key);

? ? }

? ? int readerCount() {

? ? ? ? return _total_count;

? ? }

? ? void clearCache(){

? ? ? ? LOCK_GUARD(_mtx_map);

? ? ? ? _storage->clearCache();

? ? ? ? for (auto &pr : _dispatcher_map) {

? ? ? ? ? ? auto &second = pr.second;

? ? ? ? ? ? //切换线程后清空缓存

? ? ? ? ? ? pr.first->async([second]() {

? ? ? ? ? ? ? ? second->clearCache();

? ? ? ? ? ? }, false);

? ? ? ? }

? ? }

private:

? ? struct HashOfPtr {

? ? ? ? std::size_t operator()(const EventPoller::Ptr &key) const {

? ? ? ? ? ? return (std::size_t) key.get();

? ? ? ? }

? ? };

private:

? ? mutex _mtx_map;

? ? atomic_int _total_count {0};

? ? typename RingStorage::Ptr _storage;

? ? typename RingDelegate<T>::Ptr _delegate;

? ? onReaderChanged _on_reader_changed;

? ? unordered_map<EventPoller::Ptr, typename RingReaderDispatcher::Ptr, HashOfPtr> _dispatcher_map;

};

存放ffmpeg rtmp推流,demuxer,解码,重新编码后的rtppacket数据

template<typename T>

class _RingStorage {

public:

? ? typedef std::shared_ptr<_RingStorage> Ptr;

? ? _RingStorage(int max_size) {

? ? ? ? //gop缓存个数不能小于32

? ? ? ? if(max_size < RING_MIN_SIZE){

? ? ? ? ? ? max_size = RING_MIN_SIZE;

? ? ? ? }

? ? ? ? _max_size = max_size;

? ? }

? ? ~_RingStorage() {}

? ? /**

? ? ?* 写入环形缓存数据

? ? ?* @param in 数据

? ? ?* @param is_key 是否为关键帧

? ? ?* @return 是否触发重置环形缓存大小

? ? ?*/

? ? ?void write(T in, bool is_key = true) {

? ? ? ? if (is_key) {

? ? ? ? ? ? //遇到I帧,那么移除老数据

? ? ? ? ? ? _size = 0;

? ? ? ? ? ? _have_idr = true;

? ? ? ? ? ? _data_cache.clear();

? ? ? ? }

? ? ? ? if (!_have_idr) {

? ? ? ? ? ? //缓存中没有关键帧,那么gop缓存无效

? ? ? ? ? ? return;

? ? ? ? }

? ? ? ? _data_cache.emplace_back(std::make_pair(is_key, std::move(in)));

? ? ? ? if (++_size > _max_size) {

? ? ? ? ? ? //GOP缓存溢出,清空关老数据

? ? ? ? ? ? _size = 0;

? ? ? ? ? ? _have_idr = false;

? ? ? ? ? ? _data_cache.clear();

? ? ? ? }

? ? }

? ? Ptr clone() const {

? ? ? ? Ptr ret(new _RingStorage());

? ? ? ? ret->_size = _size;

? ? ? ? ret->_have_idr = _have_idr;

? ? ? ? ret->_max_size = _max_size;

? ? ? ? ret->_data_cache = _data_cache;

? ? ? ? return ret;

? ? }

? ? const List<pair<bool, T> > &getCache() const {

? ? ? ? return _data_cache;

? ? }

? ? void clearCache(){

? ? ? ? _size = 0;

? ? ? ? _data_cache.clear();

? ? }

private:

? ? _RingStorage() = default;

private:

? ? bool _have_idr = false;

? ? int _size = 0;

? ? int _max_size;

? ? List<pair<bool, T> > _data_cache;

};

webrtc从此处reader

template<typename T>

class _RingReader {

public:

? ? typedef std::shared_ptr<_RingReader> Ptr;

? ? friend class _RingReaderDispatcher<T>;

? ? _RingReader(const std::shared_ptr<_RingStorage<T> > &storage, bool use_cache) {

? ? ? ? _storage = storage;

? ? ? ? _use_cache = use_cache;

? ? }

? ? ~_RingReader() {}

? ? void setReadCB(const function<void(const T &)> &cb) {

? ? ? ? if (!cb) {

? ? ? ? ? ? _read_cb = [](const T &) {};

? ? ? ? } else {

? ? ? ? ? ? _read_cb = cb;

? ? ? ? ? ? flushGop();

? ? ? ? }

? ? }

? ? void setDetachCB(const function<void()> &cb) {

? ? ? ? if (!cb) {

? ? ? ? ? ? _detach_cb = []() {};

? ? ? ? } else {

? ? ? ? ? ? _detach_cb = cb;

? ? ? ? }

? ? }

private:

? ? void onRead(const T &data, bool is_key) {

? ? ? ? _read_cb(data);

? ? }

? ? void onDetach() const {

? ? ? ? _detach_cb();

? ? }

? ? void flushGop() {

? ? ? ? if (!_use_cache) {

? ? ? ? ? ? return;

? ? ? ? }

? ? ? ? _storage->getCache().for_each([&](const pair<bool, T> &pr) {

? ? ? ? ? ? onRead(pr.second, pr.first);

? ? ? ? });

? ? }

private:

? ? bool _use_cache;

? ? shared_ptr<_RingStorage<T> > _storage;

? ? function<void(void)> _detach_cb = []() {};

? ? function<void(const T &)> _read_cb = [](const T &) {};

};

二 推流缓冲

void EventPoller::runLoop(bool blocked,bool regist_self)

bool Socket::attachEvent(const SockFD::Ptr &sock, bool is_udp) /lambda表达式

ssize_t Socket::onRead(const SockFD::Ptr &sock, bool is_udp) noexcept

void TcpServer::onAcceptConnection(const Socket::Ptr &sock) /lambda表达式

void RtmpSession::onRecv(const Buffer::Ptr &buf)

void RtmpProtocol::onParseRtmp(const char *data, size_t size)

void HttpRequestSplitter::input(const char *data,size_t len)

const char *RtmpProtocol::onSearchPacketTail(const char *data,size_t len)

const char* RtmpProtocol::handle_C2(const char *data, size_t len)

const char* RtmpProtocol::handle_rtmp(const char *data, size_t len)

void RtmpProtocol::handle_chunk(RtmpPacket::Ptr packet)

void RtmpSession::onRtmpChunk(RtmpPacket::Ptr packet)?

void onWrite(RtmpPacket::Ptr pkt, bool = true) override

void RtmpDemuxer::inputRtmp(const RtmpPacket::Ptr &pkt)?

void H264RtmpDecoder::inputRtmp(const RtmpPacket::Ptr &pkt)?

inline void H264RtmpDecoder::onGetH264(const char* data, size_t len, uint32_t dts, uint32_t pts)?

bool FrameDispatcher ::?inputFrame(const Frame::Ptr &frame) override

bool H264Track::inputFrame(const Frame::Ptr &frame)?

bool H264Track::inputFrame_l(const Frame::Ptr &frame)

?bool FrameWriterInterfaceHelper::inputFrame(const Frame::Ptr &frame) override

bool MediaSink::addTrack(const Track::Ptr &track_in)/lambda表达式

bool MultiMediaSourceMuxer::onTrackFrame(const Frame::Ptr &frame_in)?

bool RtspMediaSourceMuxer::inputFrame(const Frame::Ptr &frame) override

bool RtspMuxer::inputFrame(const Frame::Ptr &frame)?

bool H264RtpEncoder::inputFrame(const Frame::Ptr &frame)?

bool H264RtpEncoder::inputFrame_l(const Frame::Ptr &frame, bool is_mark)

void H264RtpEncoder::packRtp(const char *ptr, size_t len, uint32_t pts, bool is_mark, bool gop_pos)

void H264RtpEncoder::packRtpFu(const char *ptr, size_t len, uint32_t pts, bool is_mark, bool gop_pos)

bool RtpRing::inputRtp(const RtpPacket::Ptr &rtp, bool key_pos)

void RingBuffer::write(T in, bool is_key = true)

ringbuffer超过maxsize 512之后,推流数据未被拉流的话,会清空。推流和拉流过程是独立进行,比如只ffmepg推流,webrtc不拉流,播放放。

  数据结构与算法 最新文章
【力扣106】 从中序与后续遍历序列构造二叉
leetcode 322 零钱兑换
哈希的应用:海量数据处理
动态规划|最短Hamilton路径
华为机试_HJ41 称砝码【中等】【menset】【
【C与数据结构】——寒假提高每日练习Day1
基础算法——堆排序
2023王道数据结构线性表--单链表课后习题部
LeetCode 之 反转链表的一部分
【题解】lintcode必刷50题<有效的括号序列
上一篇文章      下一篇文章      查看所有文章
加:2022-02-03 01:23:44  更:2022-02-03 01:26:17 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/10 10:53:11-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码