SRS流媒体服务器——服务器给RTMP拉流端转发数据
目录
- 从consumer中获取message并进行存储
- 将每个message通过SrsRtmpServer发送到拉流客户端
请按照此顺序阅读
- SRS流媒体服务器——RTMP端口监听逻辑分析
- SRS流媒体服务器——RTMP推流、拉流创建连接
- SRS流媒体服务器——服务器读取RTMP推流数据
- SRS流媒体服务器——服务器给RTMP拉流端转发数据
1. 服务器给RTMP拉流端转发数据
- 前文《RTMP推流、拉流创建连接》提到,SrsRtmpServer::identify_client会判断客户端是推流还是拉流,如果是拉流则会进入SrsRtmpConn::playing。
- SrsRtmpConn::playing会创建SrsLiveConsumer,并添加到SrsLiveSource对象下的保存SrsLiveConsumer的consumers(vector)中。
- 这样推流时就可以遍历consumers,将rtmp message转发给这些拉流客户端,见《服务器读取RTMP推流数据》。
- 如果开启了gop cache,SrsLiveConsumer创建完后会将对应的SrsLiveSource的gop cache发送到SrsLiveConsumer。
/**
 * Serve a play (pull) request on this RTMP connection.
 *
 * Steps, in order:
 *   1. Optional referer check of the request's pageUrl.
 *   2. For a non-edge connection with origin cluster enabled and an inactive
 *      source, query each configured coworker and try to redirect the client.
 *   3. Create a consumer on the source, dump the cached data into it, start the
 *      receive thread and run do_playing() until it returns.
 *
 * @param source The live source the client wants to play.
 * @return srs_success never escapes the redirect branch: that branch returns
 *         ERROR_CONTROL_REDIRECT after a successful redirect, or
 *         ERROR_OCLUSTER_REDIRECT when no coworker could take the client.
 *         Otherwise the result of do_playing(), or a wrapped error from any
 *         failed setup step.
 */
srs_error_t SrsRtmpConn::playing(SrsLiveSource* source)
{
    srs_error_t err = srs_success;

    // Check the page url of the player against the vhost's play referer config.
    SrsRequest* req = info->req;
    if (_srs_config->get_refer_enabled(req->vhost)) {
        if ((err = refer->check(req->pageUrl, _srs_config->get_refer_play(req->vhost))) != srs_success) {
            return srs_error_wrap(err, "rtmp: referer check");
        }
    }

    // Origin cluster: the stream is not active here, so ask the coworkers where
    // it is and redirect the client there.
    if (!info->edge && _srs_config->get_vhost_origin_cluster(req->vhost) && source->inactive()) {
        vector<string> coworkers = _srs_config->get_vhost_coworkers(req->vhost);
        for (int i = 0; i < (int)coworkers.size(); i++) {
            string host; int port = 0; string coworker = coworkers.at(i);

            string url = "http://" + coworker + "/api/v1/clusters?"
                + "vhost=" + req->vhost + "&ip=" + req->host + "&app=" + req->app + "&stream=" + req->stream
                + "&coworker=" + coworker;
            if ((err = SrsHttpHooks::discover_co_workers(url, host, port)) != srs_success) {
                // A failed discovery is only fatal for the last coworker.
                if (i < (int)coworkers.size() - 1) {
                    // Release the error before moving on; the next assignment to
                    // err would otherwise overwrite and leak it.
                    srs_error_reset(err);
                    continue;
                }
                return srs_error_wrap(err, "discover coworkers, url=%s", url.c_str());
            }

            string rurl = srs_generate_rtmp_url(host, port, req->host, req->vhost, req->app, req->stream, req->param);
            srs_trace("rtmp: redirect in cluster, from=%s:%d, target=%s:%d, url=%s, rurl=%s",
                req->host.c_str(), req->port, host.c_str(), port, url.c_str(), rurl.c_str());

            // The coworker answered but gave no usable target; try the next one.
            if (host.empty() || port == 0) {
                continue;
            }

            // NOTE(review): `accepted` is filled by redirect() but not consulted here.
            bool accepted = false;
            if ((err = rtmp->redirect(req, rurl, accepted)) != srs_success) {
                // Drop the redirect failure and fall through to the next coworker.
                srs_error_reset(err);
            } else {
                return srs_error_new(ERROR_CONTROL_REDIRECT, "redirected");
            }
        }

        return srs_error_new(ERROR_OCLUSTER_REDIRECT, "no origin");
    }

    set_sock_options();

    // Create the consumer of the source; SrsAutoFree binds its cleanup to this scope.
    SrsLiveConsumer* consumer = NULL;
    SrsAutoFree(SrsLiveConsumer, consumer);
    if ((err = source->create_consumer(consumer)) != srs_success) {
        return srs_error_wrap(err, "rtmp: create consumer");
    }
    // Feed the data already cached by the source into the new consumer.
    if ((err = source->consumer_dumps(consumer)) != srs_success) {
        return srs_error_wrap(err, "rtmp: dumps consumer");
    }

    // Receive thread for messages coming from the player; do_playing() pumps
    // them as play control messages.
    SrsQueueRecvThread trd(consumer, rtmp, SRS_PERF_MW_SLEEP, _srs_context->get_id());
    if ((err = trd.start()) != srs_success) {
        return srs_error_wrap(err, "rtmp: start receive thread");
    }

    // Expose the consumer through `wakable` only while do_playing() runs
    // (presumably so another code path can wake it up -- confirm).
    wakable = consumer;
    err = do_playing(source, consumer, &trd);
    wakable = NULL;

    trd.stop();

    // Messages still queued in the receive thread are dropped.
    if (!trd.empty()) {
        srs_warn("drop the received %d messages", trd.size());
    }

    return err;
}
- SrsLiveSource::consumer_dumps会将正在推流的stream的metadata、sequence header和gop cache发送给新创建的consumer(即拉流客户端)。
/**
 * Prime a newly created consumer with the data the source already holds.
 *
 * @param consumer The consumer to configure and fill.
 * @param ds Forwarded to meta->dumps() as its last argument (presumably "dump
 *           sequence header" -- confirm against SrsMetaCache::dumps).
 * @param dm Forwarded to meta->dumps() (presumably "dump metadata" -- confirm).
 * @param dg Whether to dump the gop cache into the consumer.
 * @return srs_success, or a wrapped error from the meta or gop cache dump.
 */
srs_error_t SrsLiveSource::consumer_dumps(SrsLiveConsumer* consumer, bool ds, bool dm, bool dg)
{
    srs_error_t err = srs_success;

    // Apply the vhost's configured queue length to the consumer.
    srs_utime_t queue_size = _srs_config->get_queue_length(req->vhost);
    consumer->set_queue_size(queue_size);

    // With atc on and a non-empty gop cache, stamp the metadata and both
    // sequence headers with the gop cache start time.
    if (atc && !gop_cache->empty()) {
        if (meta->data()) {
            meta->data()->timestamp = srsu2ms(gop_cache->start_time());
        }
        if (meta->vsh()) {
            meta->vsh()->timestamp = srsu2ms(gop_cache->start_time());
        }
        if (meta->ash()) {
            meta->ash()->timestamp = srsu2ms(gop_cache->start_time());
        }
    }

    // Only an active hub has something to dump.
    if (hub->active()) {
        if ((err = meta->dumps(consumer, atc, jitter_algorithm, dm, ds)) != srs_success) {
            return srs_error_wrap(err, "meta dumps");
        }

        if (dg && (err = gop_cache->dump(consumer, atc, jitter_algorithm)) != srs_success) {
            return srs_error_wrap(err, "gop cache dumps");
        }
    }

    if (dg) {
        // queue_size is an srs_utime_t, so it must not be passed to a floating
        // point conversion; log it in milliseconds like other durations here.
        srs_trace("create consumer, active=%d, queue_size=%dms, jitter=%d", hub->active(), srsu2msi(queue_size), jitter_algorithm);
    } else {
        srs_trace("create consumer, active=%d, ignore gop cache, jitter=%d", hub->active(), jitter_algorithm);
    }

    return err;
}
- SrsRtmpConn::do_playing会创建SrsMessageArray,SrsMessageArray是存储message的数组,数组默认长度为128。
- SrsMessageArray会从consumer中获取message并进行存储,见SrsLiveConsumer::dump_packets。
- 然后将每个message通过SrsRtmpServer发送到拉流客户端,见SrsRtmpServer::send_and_free_messages。
// Main send loop of a play (pull) connection: repeatedly takes messages from
// the consumer and sends them to the client, while handling the control
// messages received from the client by the receive thread.
//
// @param source The live source being played (not referenced in this body).
// @param consumer The consumer to take messages from; must not be NULL.
// @param rtrd The receive thread that queues messages sent by the client.
// @return A wrapped error when any step fails, or ERROR_RTMP_DURATION_EXCEED
//         once the duration requested by the client has been played. The loop
//         has no success exit.
srs_error_t SrsRtmpConn::do_playing(SrsLiveSource* source, SrsLiveConsumer* consumer, SrsQueueRecvThread* rtrd)
{
srs_error_t err = srs_success;
SrsRequest* req = info->req;
srs_assert(req);
srs_assert(consumer);
// Report this client to the statistic module.
SrsStatistic* stat = SrsStatistic::instance();
if ((err = stat->on_client(_srs_context->get_id().c_str(), req, this, info->type)) != srs_success) {
return srs_error_wrap(err, "rtmp: stat client");
}
// Rate limiter for the periodic play log below.
SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_play();
SrsAutoFree(SrsPithyPrint, pprint);
// Array that receives the message pointers dumped from the consumer.
SrsMessageArray msgs(SRS_PERF_MW_MSGS);
// A positive req->duration means the client asked to stop after that long.
bool user_specified_duration_to_stop = (req->duration > 0);
int64_t starttime = -1;
// Load the per-vhost send settings and size the socket buffer from mw_sleep.
realtime = _srs_config->get_realtime_enabled(req->vhost);
mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime);
mw_sleep = _srs_config->get_mw_sleep(req->vhost);
skt->set_socket_buffer(mw_sleep);
send_min_interval = _srs_config->get_send_min_interval(req->vhost);
srs_trace("start play smi=%dms, mw_sleep=%d, mw_msgs=%d, realtime=%d, tcp_nodelay=%d",
srsu2msi(send_min_interval), srsu2msi(mw_sleep), mw_msgs, realtime, tcp_nodelay);
while (true) {
// NOTE(review): `trd` is not the `rtrd` parameter; presumably a member of
// SrsRtmpConn declared outside this view -- confirm.
if ((err = trd->pull()) != srs_success) {
return srs_error_wrap(err, "rtmp: thread quit");
}
pprint->elapse();
// Handle every message the receive thread has queued from the client.
while (!rtrd->empty()) {
SrsCommonMessage* msg = rtrd->pump();
if ((err = process_play_control_msg(consumer, msg)) != srs_success) {
return srs_error_wrap(err, "rtmp: play control message");
}
}
// Stop when the receive thread itself has failed.
if ((err = rtrd->error_code()) != srs_success) {
return srs_error_wrap(err, "rtmp: recv thread");
}
#ifdef SRS_PERF_QUEUE_COND_WAIT
// Wait on the consumer, passing the configured mw_msgs and mw_sleep.
consumer->wait(mw_msgs, mw_sleep);
#endif
// In/out argument of dump_packets: 1 limits the dump to a single message
// when a minimal send interval is configured, 0 means up to msgs.max.
// On return it holds the number of messages actually dumped.
int count = (send_min_interval > 0)? 1 : 0;
if ((err = consumer->dump_packets(&msgs, count)) != srs_success) {
return srs_error_wrap(err, "rtmp: consumer dump packets");
}
// Periodic bitrate log.
if (pprint->can_print()) {
kbps->sample();
srs_trace("-> " SRS_CONSTS_LOG_PLAY " time=%d, msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d, mw=%d/%d",
(int)pprint->age(), count, kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(), srsu2msi(mw_sleep), mw_msgs);
}
// Nothing to send: sleep here only when the cond wait above is compiled out.
if (count <= 0) {
#ifndef SRS_PERF_QUEUE_COND_WAIT
srs_usleep(mw_sleep);
#endif
continue;
}
// Accumulate the played duration from the timestamp delta between
// consecutive messages; a timestamp going backwards restarts the baseline.
// This runs before the send because the messages are freed by the send.
if (user_specified_duration_to_stop) {
for (int i = 0; i < count; i++) {
SrsSharedPtrMessage* msg = msgs.msgs[i];
if (starttime < 0 || starttime > msg->timestamp) {
starttime = msg->timestamp;
}
duration += (msg->timestamp - starttime) * SRS_UTIME_MILLISECONDS;
starttime = msg->timestamp;
}
}
// Send the batch to the client; count is always positive here because of
// the `continue` above.
if (count > 0 && (err = rtmp->send_and_free_messages(msgs.msgs, count, info->res->stream_id)) != srs_success) {
return srs_error_wrap(err, "rtmp: send %d messages", count);
}
// Stop once the requested duration has been played.
if (user_specified_duration_to_stop) {
if (duration >= req->duration) {
return srs_error_new(ERROR_RTMP_DURATION_EXCEED, "rtmp: time %d up %d", srsu2msi(duration), srsu2msi(req->duration));
}
}
// Pace the sending when a minimal send interval is configured.
if (send_min_interval > 0) {
srs_usleep(send_min_interval);
}
srs_thread_yield();
}
// Not reached: the loop above only exits through a return.
return err;
}
1.1 从consumer中获取message并进行存储
- SrsLiveConsumer::dump_packets作用是将SrsLiveConsumer队列中的message拷贝到SrsMessageArray。
/**
 * Move queued messages of this consumer into the caller's message array.
 *
 * @param msgs Destination array; msgs->max must be positive.
 * @param count In: the maximum number of messages wanted, 0 for "as many as
 *              the array holds". Out: the number of messages written, which
 *              stays 0 while the consumer is paused.
 * @return srs_success, or the wrapped error of the underlying queue.
 */
srs_error_t SrsLiveConsumer::dump_packets(SrsMessageArray* msgs, int& count)
{
    srs_assert(count >= 0);
    srs_assert(msgs->max > 0);

    // Cap the request by the array capacity; a zero request means "fill it".
    int limit = msgs->max;
    if (count != 0 && count < limit) {
        limit = count;
    }

    // From here on count reports how many messages were actually dumped.
    count = 0;

    // Log a pending source id change exactly once.
    if (should_update_source_id) {
        srs_trace("update source_id=%s/%s", source->source_id().c_str(), source->pre_source_id().c_str());
        should_update_source_id = false;
    }

    // A paused consumer hands out nothing.
    if (paused) {
        return srs_success;
    }

    srs_error_t err = queue->dump_packets(limit, msgs->msgs, count);
    if (err != srs_success) {
        return srs_error_wrap(err, "dump packets");
    }

    return srs_success;
}
/**
 * Pop up to max_count messages from the front of the queue.
 *
 * @param max_count Upper bound of messages to pop; must be positive when the
 *                  queue is not empty.
 * @param pmsgs Destination buffer that receives the message pointers.
 * @param count Set to the number of pointers written; left untouched when the
 *              queue is empty.
 * @return Always srs_success.
 */
srs_error_t SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count)
{
    const int pending = (int)msgs.size();
    if (pending <= 0) {
        return srs_success;
    }

    srs_assert(max_count > 0);
    const int taken = srs_min(max_count, pending);
    count = taken;

    // Hand the pointers over in one block copy.
    SrsSharedPtrMessage** head = msgs.data();
    memcpy(pmsgs, head, taken * sizeof(SrsSharedPtrMessage*));

    // The queue's start time becomes the timestamp of the last message taken.
    av_start_time = srs_utime_t(head[taken - 1]->timestamp * SRS_UTIME_MILLISECONDS);

    // Drop the entries that were handed over from the queue.
    if (taken < pending) {
        msgs.erase(msgs.begin(), msgs.begin() + taken);
    } else {
        msgs.clear();
    }

    return srs_success;
}
1.2 将每个message通过SrsRtmpServer发送到拉流客户端
- SrsRtmpServer::send_and_free_messages将每个message发送到拉流客户端,实际是通过SrsProtocol::send_and_free_messages(内部调用do_send_messages)进行发送。
/**
 * Send a batch of shared messages and free every one of them, whether or not
 * the send succeeded.
 *
 * @param msgs Array of message pointers; must not be NULL. NULL entries are
 *             tolerated. The entries are freed but the array slots are not
 *             cleared.
 * @param nb_msgs Number of entries in msgs; must be positive.
 * @param stream_id Stream id handed to SrsSharedPtrMessage::check().
 * @return srs_success, or the wrapped error of the send or of the manual
 *         response flush.
 */
srs_error_t SrsProtocol::send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, int stream_id)
{
    srs_assert(msgs);
    srs_assert(nb_msgs > 0);

    // Call check() on the non-NULL messages in order, stopping at the first
    // one for which it returns true.
    for (int i = 0; i < nb_msgs; i++) {
        if (msgs[i] && msgs[i]->check(stream_id)) {
            break;
        }
    }

    srs_error_t err = do_send_messages(msgs, nb_msgs);

    // Free through a local copy, so the caller's array slots keep their values.
    for (SrsSharedPtrMessage** it = msgs; it != msgs + nb_msgs; ++it) {
        SrsSharedPtrMessage* doomed = *it;
        srs_freep(doomed);
    }

    // Report a send failure only after the messages have been released.
    if (err != srs_success) {
        return srs_error_wrap(err, "send messages");
    }

    err = manual_response_flush();
    if (err != srs_success) {
        return srs_error_wrap(err, "manual flush response");
    }

    print_debug_info();

    return err;
}
- SrsProtocol::do_send_messages按数组中的顺序,依次将每个message切分成chunk并发送到拉流客户端。
- TODO:具体发送过程后面写一篇文章分析。
/**
 * Write a batch of messages to the peer as chunks.
 *
 * Every message payload is cut into pieces of at most out_chunk_size bytes;
 * each piece is preceded by a chunk header built by
 * SrsSharedPtrMessage::chunk_header(). NULL messages and messages without a
 * payload are skipped. The messages are not freed here.
 *
 * With SRS_PERF_COMPLEX_SEND the header/payload pairs of the whole batch are
 * collected in out_iovs and flushed together; otherwise each chunk is written
 * with its own writev call.
 *
 * @param msgs Array of message pointers to send.
 * @param nb_msgs Number of entries in msgs.
 * @return srs_success, or the wrapped error of the failed write.
 */
srs_error_t SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs)
{
    srs_error_t err = srs_success;

#ifdef SRS_PERF_COMPLEX_SEND
    // Cursor into the iovec array: two entries (header, payload) per chunk.
    int iov_index = 0;
    iovec* iovs = out_iovs + iov_index;

    // Cursor into the buffer that holds the chunk headers of this flush.
    int c0c3_cache_index = 0;
    char* c0c3_cache = out_c0c3_caches + c0c3_cache_index;

    for (int i = 0; i < nb_msgs; i++) {
        SrsSharedPtrMessage* msg = msgs[i];

        if (!msg) {
            continue;
        }

        // Nothing to send for an empty message.
        if (!msg->payload || msg->size <= 0) {
            continue;
        }

        char* p = msg->payload;
        char* pend = msg->payload + msg->size;

        while (p < pend) {
            // Build the chunk header in the remaining header buffer; the last
            // argument tells whether this is the first chunk of the message.
            int nb_cache = SRS_CONSTS_C0C3_HEADERS_MAX - c0c3_cache_index;
            int nbh = msg->chunk_header(c0c3_cache, nb_cache, p == msg->payload);
            srs_assert(nbh > 0);

            iovs[0].iov_base = c0c3_cache;
            iovs[0].iov_len = nbh;

            int payload_size = srs_min(out_chunk_size, (int)(pend - p));
            iovs[1].iov_base = p;
            iovs[1].iov_len = payload_size;

            p += payload_size;

            // Double the iovec array when the next pair would not fit. The
            // pair just written is preserved by realloc, and iovs is
            // re-derived from out_iovs right below.
            // NOTE(review): the result of realloc is not checked.
            if (iov_index >= nb_out_iovs - 2) {
                int ov = nb_out_iovs;
                nb_out_iovs = 2 * nb_out_iovs;
                int realloc_size = sizeof(iovec) * nb_out_iovs;
                out_iovs = (iovec*)realloc(out_iovs, realloc_size);
                srs_warn("resize iovs %d => %d, max_msgs=%d", ov, nb_out_iovs, SRS_PERF_MW_MSGS);
            }

            // Advance both cursors past the chunk just queued.
            iov_index += 2;
            iovs = out_iovs + iov_index;

            c0c3_cache_index += nbh;
            c0c3_cache = out_c0c3_caches + c0c3_cache_index;

            // The header buffer cannot hold another full-size header: flush
            // what is queued and start over at the beginning of both buffers.
            int c0c3_left = SRS_CONSTS_C0C3_HEADERS_MAX - c0c3_cache_index;
            if (c0c3_left < SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE) {
                // Warn only once.
                if (!warned_c0c3_cache_dry) {
                    srs_warn("c0c3 cache header too small, recoment to %d", SRS_CONSTS_C0C3_HEADERS_MAX + SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE);
                    warned_c0c3_cache_dry = true;
                }

                if ((err = do_iovs_send(out_iovs, iov_index)) != srs_success) {
                    return srs_error_wrap(err, "send iovs");
                }

                iov_index = 0;
                iovs = out_iovs + iov_index;

                c0c3_cache_index = 0;
                c0c3_cache = out_c0c3_caches + c0c3_cache_index;
            }
        }
    }

    // Every message was skipped, or the last chunk was already flushed.
    if (iov_index <= 0) {
        return err;
    }

    // Flush the chunks still queued.
    if ((err = do_iovs_send(out_iovs, iov_index)) != srs_success) {
        return srs_error_wrap(err, "send iovs");
    }

    return err;
#else
    for (int i = 0; i < nb_msgs; i++) {
        SrsSharedPtrMessage* msg = msgs[i];

        if (!msg) {
            continue;
        }

        // Nothing to send for an empty message.
        if (!msg->payload || msg->size <= 0) {
            continue;
        }

        char* p = msg->payload;
        char* pend = msg->payload + msg->size;

        while (p < pend) {
            // Each chunk reuses the start of both buffers, because it is
            // written out immediately.
            iovec* iovs = out_iovs;
            char* c0c3_cache = out_c0c3_caches;
            int nb_cache = SRS_CONSTS_C0C3_HEADERS_MAX;

            int nbh = msg->chunk_header(c0c3_cache, nb_cache, p == msg->payload);
            srs_assert(nbh > 0);

            iovs[0].iov_base = c0c3_cache;
            iovs[0].iov_len = nbh;

            // Cast like the complex-send branch, so both operands are int.
            int payload_size = srs_min(out_chunk_size, (int)(pend - p));
            iovs[1].iov_base = p;
            iovs[1].iov_len = payload_size;

            p += payload_size;

            // Write header and payload of this chunk in one call.
            if ((err = skt->writev(iovs, 2, NULL)) != srs_success) {
                return srs_error_wrap(err, "writev");
            }
        }
    }

    return err;
#endif
}
|