IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> JavaScript知识库 -> ZLMediakit 源码分析 -> 正文阅读

[JavaScript知识库]ZLMediakit 源码分析

基本概念:

twcc:接收端twcc主要功能就是通过fb通知发送端每一个rtp的接收时间。
基本逻辑:当接收端接收到每一个rtp包的时候,记录当前的接受时间和包序号,然后按一定策略定时的发送到发送端,然后发送端可以根据的发送,接收时间统计延迟,为后续估算提供数据点。

SSRC:在RTP协议中, 定义同步源(SSRC,Synchronization source)为RTP包流的源,用RTP报头中32位数值的SSRC标识符进行标识,使其不依赖于网络地址。通常麦克风,音频接口,摄像头,视频接口的变化,都会导致SSRC的变化。?

1 重点类

epoll管理多个socket连接

class EventPoller : public TaskExecutor, public AnyStorage, public std::enable_shared_from_this<EventPoller> {

public:

? ? using Ptr = std::shared_ptr<EventPoller>;

? ? friend class TaskExecutorGetterImp;

? ? ~EventPoller();

? ? /**

? ? ?* 获取EventPollerPool单例中的第一个EventPoller实例,

? ? ?* 保留该接口是为了兼容老代码

? ? ?* @return 单例

? ? ?*/

? ? static EventPoller &Instance();

? ? /**

? ? ?* 添加事件监听

? ? ?* @param fd 监听的文件描述符

? ? ?* @param event 事件类型,例如 Event_Read | Event_Write

? ? ?* @param eventCb 事件回调functional

? ? ?* @return -1:失败,0:成功

? ? ?*/

? ? int addEvent(int fd, int event, PollEventCB eventCb);

? ? /**

? ? ?* 删除事件监听

? ? ?* @param fd 监听的文件描述符

? ? ?* @param delCb 删除成功回调functional

? ? ?* @return -1:失败,0:成功

? ? ?*/

? ? int delEvent(int fd, PollDelCB delCb = nullptr);

? ? /**

? ? ?* 修改监听事件类型

? ? ?* @param fd 监听的文件描述符

? ? ?* @param event 事件类型,例如 Event_Read | Event_Write

? ? ?* @return -1:失败,0:成功

? ? ?*/

? ? int modifyEvent(int fd, int event);

? ? /**

? ? ?* 异步执行任务

? ? ?* @param task 任务

? ? ?* @param may_sync 如果调用该函数的线程就是本对象的轮询线程,那么may_sync为true时就是同步执行任务

? ? ?* @return 是否成功,一定会返回true

? ? ?*/

? ? Task::Ptr async(TaskIn task, bool may_sync = true) override ;

? ? /**

? ? ?* 同async方法,不过是把任务打入任务列队头,这样任务优先级最高

? ? ?* @param task 任务

? ? ?* @param may_sync 如果调用该函数的线程就是本对象的轮询线程,那么may_sync为true时就是同步执行任务

? ? ?* @return 是否成功,一定会返回true

? ? ?*/

? ? Task::Ptr async_first(TaskIn task, bool may_sync = true) override ;

? ? /**

? ? ?* 判断执行该接口的线程是否为本对象的轮询线程

? ? ?* @return 是否为本对象的轮询线程

? ? ?*/

? ? bool isCurrentThread();

? ? /**

? ? ?* 延时执行某个任务

? ? ?* @param delayMS 延时毫秒数

? ? ?* @param task 任务,返回值为0时代表不再重复任务,否则为下次执行延时,如果任务中抛异常,那么默认不重复任务

? ? ?* @return 可取消的任务标签

? ? ?*/

? ? DelayTask::Ptr doDelayTask(uint64_t delayMS, function<uint64_t()> task);

? ? /**

? ? ?* 获取当前线程关联的Poller实例

? ? ?*/

? ? static EventPoller::Ptr getCurrentPoller();

? ? /**

? ? ?* 获取当前线程下所有socket共享的读缓存

? ? ?*/

? ? BufferRaw::Ptr getSharedBuffer();

? ? /**

? ? ?* 获取poller线程id

? ? ?*/

? ? const thread::id& getThreadId() const;

private:

? ? /**

? ? ?* 本对象只允许在EventPollerPool中构造

? ? ?*/

? ? EventPoller(ThreadPool::Priority priority = ThreadPool::PRIORITY_HIGHEST);

? ? /**

? ? ?* 执行事件轮询

? ? ?* @param blocked 是否用执行该接口的线程执行轮询

? ? ?* @param regist_self 是否注册到全局map

? ? ?*/

? ? void runLoop(bool blocked , bool regist_self);

? ? /**

? ? ?* 内部管道事件,用于唤醒轮询线程用

? ? ?*/

? ? void onPipeEvent();

? ? /**

? ? ?* 切换线程并执行任务

? ? ?* @param task

? ? ?* @param may_sync

? ? ?* @param first

? ? ?* @return 可取消的任务本体,如果已经同步执行,则返回nullptr

? ? ?*/

? ? Task::Ptr async_l(TaskIn task, bool may_sync = true,bool first = false) ;

? ? /**

? ? ?* 阻塞当前线程,等待轮询线程退出;

? ? ?* 在执行shutdown接口时本函数会退出

? ? ?*/

? ? void wait() ;

? ? /**

? ? ?* 结束事件轮询

? ? ?* 需要指出的是,一旦结束就不能再次恢复轮询线程

? ? ?*/

? ? void shutdown();

? ? /**

? ? ?* 刷新延时任务

? ? ?*/

? ? uint64_t flushDelayTask(uint64_t now);

? ? /**

? ? ?* 获取select或epoll休眠时间

? ? ?*/

? ? uint64_t getMinDelay();

private:

? ? class ExitException : public std::exception{

? ? public:

? ? ? ? ExitException(){}

? ? ? ? ~ExitException(){}

? ? };

private:

? ? //标记loop线程是否退出

? ? bool _exit_flag;

? ? //当前线程下,所有socket共享的读缓存

? ? weak_ptr<BufferRaw> _shared_buffer;

? ? //线程优先级

? ? ThreadPool::Priority _priority;

? ? //正在运行事件循环时该锁处于被锁定状态

? ? mutex _mtx_runing;

? ? //执行事件循环的线程

? ? thread *_loop_thread = nullptr;

? ? //事件循环的线程id

? ? thread::id _loop_thread_id;

? ? //通知事件循环的线程已启动

? ? semaphore _sem_run_started;

? ? //内部事件管道

? ? PipeWrap _pipe;

? ? //从其他线程切换过来的任务

? ? mutex _mtx_task;

? ? List<Task::Ptr> _list_task;

? ? //保持日志可用

? ? Logger::Ptr _logger;

#if defined(HAS_EPOLL)

? ? //epoll相关

? ? int _epoll_fd = -1;

? ? unordered_map<int, std::shared_ptr<PollEventCB> > _event_map;

#else

? ? //select相关

? ? struct Poll_Record {

? ? ? ? using Ptr = std::shared_ptr<Poll_Record>;

? ? ? ? int event;

? ? ? ? int attach;

? ? ? ? PollEventCB callBack;

? ? };

? ? unordered_map<int, Poll_Record::Ptr> _event_map;

#endif //HAS_EPOLL

? ? //定时器相关

? ? multimap<uint64_t, DelayTask::Ptr> _delay_task_map;

};

线程池

class EventPollerPool : public std::enable_shared_from_this<EventPollerPool>, public TaskExecutorGetterImp {

public:

? ? using Ptr = std::shared_ptr<EventPollerPool>;

? ? ~EventPollerPool(){};

? ? /**

? ? ?* 获取单例

? ? ?* @return

? ? ?*/

? ? static EventPollerPool &Instance();

? ? /**

? ? ?* 设置EventPoller个数,在EventPollerPool单例创建前有效

? ? ?* 在不调用此方法的情况下,默认创建thread::hardware_concurrency()个EventPoller实例

? ? ?* @param size ?EventPoller个数,如果为0则为thread::hardware_concurrency()

? ? ?*/

? ? static void setPoolSize(size_t size = 0);

? ? /**

? ? ?* 获取第一个实例

? ? ?* @return

? ? ?*/

? ? EventPoller::Ptr getFirstPoller();

? ? /**

? ? ?* 根据负载情况获取轻负载的实例

? ? ?* 如果优先返回当前线程,那么会返回当前线程

? ? ?* 返回当前线程的目的是为了提高线程安全性

? ? ?* @param prefer_current_thread 是否优先获取当前线程

? ? ?*/

? ? EventPoller::Ptr getPoller(bool prefer_current_thread = true);

? ? /**

? ? ?* 设置 getPoller() 是否优先返回当前线程

? ? ?* 在批量创建Socket对象时,如果优先返回当前线程,

? ? ?* 那么将导致负载不够均衡,所以可以暂时关闭然后再开启

? ? ?* @param flag 是否优先返回当前线程

? ? ?*/

? ? void preferCurrentThread(bool flag = true);

private:

? ? EventPollerPool() ;

private:

? ? bool _preferCurrentThread = true;

};

?管道,进程通信

class PipeWrap {?

public:

? ? PipeWrap();

? ? ~PipeWrap();

? ? int write(const void *buf, int n);

? ? int read(void *buf, int n);

? ? int readFD() const {

? ? ? ? return _pipe_fd[0];

? ? }

? ? int writeFD() const {

? ? ? ? return _pipe_fd[1];

? ? }

private:

? ? int _pipe_fd[2] = { -1,-1 };

? ? void clearFD();

#if defined(_WIN32)

? ? int _listenerFd = -1;

#endif // defined(_WIN32)

};

2 webrtc play堆栈

void EventPoller::runLoop(bool blocked,bool regist_self)

bool Socket::attachEvent(const SockFD::Ptr &sock, bool is_udp) /lambda表达式

ssize_t Socket::onRead(const SockFD::Ptr &sock, bool is_udp) noexcept

void TcpServer::onAcceptConnection(const Socket::Ptr &sock) /lambda表达式

void HttpSession::onRecv(const Buffer::Ptr &pBuf)

void HttpRequestSplitter::input(const char *data,size_t len)

void HttpSession::onRecvContent(const char *data,size_t len)

void HttpSession::Handle_Req_POST(ssize_t &content_len) /lambda表达式

bool HttpSession::emitHttpEvent(bool doInvoke)

int NoticeCenter::emitEvent(const string &strEvent, ArgsType &&...args)

int EventDispatcher::emitEvent(ArgsType &&...args)

static inline void addHttpListener() /lambda表达式

static HttpApi toApi(const function<void(API_ARGS_STRING_ASYNC)> &cb)/lambda表达式

api_regist("/index/api/webrtc",[](API_ARGS_STRING_ASYNC)

void WebRtcPluginManager::getAnswerSdp(Session &sender, const string &type, const string &offer, const WebRtcArgs &args,const onCreateRtc &cb)

void play_plugin(Session &sender, const string &offer_sdp, const WebRtcArgs &args, const WebRtcPluginManager::onCreateRtc &cb)/lambda表达式

void MediaSource::findAsync(const MediaInfo &info, const std::shared_ptr<Session> &session, const function<void (const Ptr &)> &cb)?

static void findAsync_l(const MediaInfo &info, const std::shared_ptr<Session> &session, bool retry,const function<void(const MediaSource::Ptr &src)> &cb)

WebRtcPlayer::Ptr WebRtcPlayer::create(const EventPoller::Ptr &poller,const RtspMediaSource::Ptr &src,const MediaInfo &info)

void WebRtcTransportImp::onCreate()

WebRtcTransport::onCreate()/lambda表达式

const Session::Ptr& UdpServer::createSession(

static void emitSessionRecv(const Session::Ptr &session, const Buffer::Ptr &buf)

void WebRtcSession::onRecv(const Buffer::Ptr &buffer)

void WebRtcTransport::inputSockData(char *buf, int len, RTC::TransportTuple *tuple)?

void DtlsTransport::ProcessDtlsData(const uint8_t* data, size_t len)

inline bool DtlsTransport::CheckStatus(int returnCode)

inline bool DtlsTransport::ProcessHandshake()

inline void DtlsTransport::ExtractSrtpKeys(RTC::SrtpSession::CryptoSuite srtpCryptoSuite)

void WebRtcTransport::OnDtlsTransportConnected(

void WebRtcPlayer::onStartWebRTC()

void WebRtcTransportImp::onSendRtp(const RtpPacket::Ptr &rtp, bool flush, bool rtx)

void WebRtcTransport::sendRtpPacket(const char *buf, int len, bool flush, void *ctx)

void WebRtcTransportImp::onSendSockData(Buffer::Ptr buf, bool flush, RTC::TransportTuple *tuple)?

ssize_t SocketHelper::send(Buffer::Ptr buf)?

ssize_t Socket::send(Buffer::Ptr buf, struct sockaddr *addr, socklen_t addr_len, bool try_flush)?

ssize_t Socket::send_l(Buffer::Ptr buf, bool is_buf_sock, bool try_flush)?

  JavaScript知识库 最新文章
ES6的相关知识点
react 函数式组件 & react其他一些总结
Vue基础超详细
前端JS也可以连点成线(Vue中运用 AntVG6)
Vue事件处理的基本使用
Vue后台项目的记录 (一)
前后端分离vue跨域,devServer配置proxy代理
TypeScript
初识vuex
vue项目安装包指令收集
上一篇文章      下一篇文章      查看所有文章
加:2022-01-30 18:50:47  更:2022-01-30 18:52:43 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/9 16:03:02-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码