• 流媒体分析之srt 协议srs 服务器数据收发


    根据上一个章节的,我们知道了srt 每个链路创建SrsMpegtsSrtConn对象,SrsMpegtsSrtConn 调用do_cycle函数。do_sycle 函数推拉流调用不同处理函数。推流处理publishing及拉流处理playing

    1. 分析推流数据处理流程。

    1. srs_error_t SrsMpegtsSrtConn::publishing()
    2. {
    3. srs_error_t err = srs_success;
    4. // We must do stat the client before hooks, because hooks depends on it.
    5. SrsStatistic* stat = SrsStatistic::instance();
    6. if ((err = stat->on_client(_srs_context->get_id().c_str(), req_, this, SrsSrtConnPublish)) != srs_success) {
    7. return srs_error_wrap(err, "srt: stat client");
    8. }
    9. // We must do hook after stat, because depends on it.
    10. if ((err = http_hooks_on_publish()) != srs_success) {
    11. return srs_error_wrap(err, "srt: callback on publish");
    12. }
    13. if ((err = acquire_publish()) == srs_success) {
    14. err = do_publishing();
    15. release_publish();
    16. }
    17. http_hooks_on_unpublish();
    18. return err;
    19. }

    do_publishing  函数,调用srt_conn_->read 函数接收推流数据。

      on_srt_packet  函数处理数据。

    1. srs_error_t SrsMpegtsSrtConn::do_publishing()
    2. {
    3. srs_error_t err = srs_success;
    4. SrsPithyPrint* pprint = SrsPithyPrint::create_srt_publish();
    5. SrsAutoFree(SrsPithyPrint, pprint);
    6. int nb_packets = 0;
    7. // Max udp packet size equal to 1500.
    8. char buf[1500];
    9. while (true) {
    10. if ((err = trd_->pull()) != srs_success) {
    11. return srs_error_wrap(err, "srt: thread quit");
    12. }
    13. pprint->elapse();
    14. if (pprint->can_print()) {
    15. SrsSrtStat s;
    16. if ((err = s.fetch(srt_fd_, true)) != srs_success) {
    17. srs_freep(err);
    18. } else {
    19. srs_trace("<- " SRS_CONSTS_LOG_SRT_PUBLISH " Transport Stats # pktRecv=%" PRId64 ", pktRcvLoss=%d, pktRcvRetrans=%d, pktRcvDrop=%d",
    20. s.pktRecv(), s.pktRcvLoss(), s.pktRcvRetrans(), s.pktRcvDrop());
    21. }
    22. kbps_->sample();
    23. srs_trace("<- " SRS_CONSTS_LOG_SRT_PUBLISH " time=%" PRId64 ", packets=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d",
    24. srsu2ms(pprint->age()), nb_packets, kbps_->get_send_kbps(), kbps_->get_send_kbps_30s(), kbps_->get_send_kbps_5m(),
    25. kbps_->get_recv_kbps(), kbps_->get_recv_kbps_30s(), kbps_->get_recv_kbps_5m());
    26. nb_packets = 0;
    27. }
    28. ssize_t nb = 0;
    29. if ((err = srt_conn_->read(buf, sizeof(buf), &nb)) != srs_success) {
    30. return srs_error_wrap(err, "srt: recvmsg");
    31. }
    32. ++nb_packets;
    33. if ((err = on_srt_packet(buf, nb)) != srs_success) {
    34. return srs_error_wrap(err, "srt: process packet");
    35. }
    36. }
    37. return err;
    38. }

    on_srt_packet  创建SrsSrtPacket 函数.执行 srt_source_->on_packet 。

    1. srs_error_t SrsMpegtsSrtConn::on_srt_packet(char* buf, int nb_buf)
    2. {
    3. srs_error_t err = srs_success;
    4. // Ignore if invalid length.
    5. if (nb_buf <= 0) {
    6. return err;
    7. }
    8. // Check srt payload, mpegts must be N times of SRS_TS_PACKET_SIZE
    9. if ((nb_buf % SRS_TS_PACKET_SIZE) != 0) {
    10. return srs_error_new(ERROR_SRT_CONN, "invalid ts packet len=%d", nb_buf);
    11. }
    12. // Check srt payload, the first byte must be 0x47
    13. if (buf[0] != 0x47) {
    14. return srs_error_new(ERROR_SRT_CONN, "invalid ts packet first=%#x", (uint8_t)buf[0]);
    15. }
    16. SrsSrtPacket* packet = new SrsSrtPacket();
    17. SrsAutoFree(SrsSrtPacket, packet);
    18. packet->wrap(buf, nb_buf);
    19. if ((err = srt_source_->on_packet(packet)) != srs_success) {
    20. return srs_error_wrap(err, "on srt packet");
    21. }
    22. return err;
    23. }

     on_packet : srt 推流 给每个消费者 consumer->enqueue 推数据;

     bridge_->on_packet  :srt 转换其他协议的流数据。

    1. srs_error_t SrsSrtSource::on_packet(SrsSrtPacket* packet)
    2. {
    3. srs_error_t err = srs_success;
    4. for (int i = 0; i < (int)consumers.size(); i++) {
    5. SrsSrtConsumer* consumer = consumers.at(i);
    6. if ((err = consumer->enqueue(packet->copy())) != srs_success) {
    7. return srs_error_wrap(err, "consume ts packet");
    8. }
    9. }
    10. if ((err = bridge_->on_packet(packet)) != srs_success) {
    11. return srs_error_wrap(err, "bridge consume message");
    12. }
    13. return err;
    14. }

     2. srs服务器处理srt拉流处理。 

    1. srs_error_t SrsMpegtsSrtConn::playing()
    2. {
    3. srs_error_t err = srs_success;
    4. // We must do stat the client before hooks, because hooks depends on it.
    5. SrsStatistic* stat = SrsStatistic::instance();
    6. if ((err = stat->on_client(_srs_context->get_id().c_str(), req_, this, SrsSrtConnPlay)) != srs_success) {
    7. return srs_error_wrap(err, "rtmp: stat client");
    8. }
    9. // We must do hook after stat, because depends on it.
    10. if ((err = http_hooks_on_play()) != srs_success) {
    11. return srs_error_wrap(err, "rtmp: callback on play");
    12. }
    13. err = do_playing();
    14. http_hooks_on_stop();
    15. return err;
    16. }

     do_playing() 函数:

    consumer->dump_packet(&pkt);每个拉流消费者取出数据。

     srt_conn_->write   发送数据。 

    1. srs_error_t SrsMpegtsSrtConn::do_playing()
    2. {
    3. srs_error_t err = srs_success;
    4. SrsSrtConsumer* consumer = NULL;
    5. SrsAutoFree(SrsSrtConsumer, consumer);
    6. if ((err = srt_source_->create_consumer(consumer)) != srs_success) {
    7. return srs_error_wrap(err, "create consumer, ts source=%s", req_->get_stream_url().c_str());
    8. }
    9. srs_assert(consumer);
    10. // TODO: FIXME: Dumps the SPS/PPS from gop cache, without other frames.
    11. if ((err = srt_source_->consumer_dumps(consumer)) != srs_success) {
    12. return srs_error_wrap(err, "dumps consumer, url=%s", req_->get_stream_url().c_str());
    13. }
    14. SrsPithyPrint* pprint = SrsPithyPrint::create_srt_play();
    15. SrsAutoFree(SrsPithyPrint, pprint);
    16. SrsSrtRecvThread srt_recv_trd(srt_conn_);
    17. if ((err = srt_recv_trd.start()) != srs_success) {
    18. return srs_error_wrap(err, "start srt recv trd");
    19. }
    20. int nb_packets = 0;
    21. while (true) {
    22. if ((err = trd_->pull()) != srs_success) {
    23. return srs_error_wrap(err, "srt play thread");
    24. }
    25. if ((err = srt_recv_trd.get_recv_err()) != srs_success) {
    26. return srs_error_wrap(err, "srt play recv thread");
    27. }
    28. // Wait for amount of packets.
    29. SrsSrtPacket* pkt = NULL;
    30. SrsAutoFree(SrsSrtPacket, pkt);
    31. consumer->dump_packet(&pkt);
    32. if (!pkt) {
    33. // TODO: FIXME: We should check the quit event.
    34. consumer->wait(1, 1000 * SRS_UTIME_MILLISECONDS);
    35. continue;
    36. }
    37. ++nb_packets;
    38. // reportable
    39. pprint->elapse();
    40. if (pprint->can_print()) {
    41. SrsSrtStat s;
    42. if ((err = s.fetch(srt_fd_, true)) != srs_success) {
    43. srs_freep(err);
    44. } else {
    45. srs_trace("-> " SRS_CONSTS_LOG_SRT_PLAY " Transport Stats # pktSent=%" PRId64 ", pktSndLoss=%d, pktRetrans=%d, pktSndDrop=%d",
    46. s.pktSent(), s.pktSndLoss(), s.pktRetrans(), s.pktSndDrop());
    47. }
    48. kbps_->sample();
    49. srs_trace("-> " SRS_CONSTS_LOG_SRT_PLAY " time=%" PRId64 ", packets=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d",
    50. srsu2ms(pprint->age()), nb_packets, kbps_->get_send_kbps(), kbps_->get_send_kbps_30s(), kbps_->get_send_kbps_5m(),
    51. kbps_->get_recv_kbps(), kbps_->get_recv_kbps_30s(), kbps_->get_recv_kbps_5m());
    52. nb_packets = 0;
    53. }
    54. ssize_t nb_write = 0;
    55. if ((err = srt_conn_->write(pkt->data(), pkt->size(), &nb_write)) != srs_success) {
    56. return srs_error_wrap(err, "srt send, size=%d", pkt->size());
    57. }
    58. // Yield to another coroutines.
    59. // @see https://github.com/ossrs/srs/issues/2194#issuecomment-777542162
    60. // TODO: FIXME: Please check whether SRT sendmsg causing clock deviation, see srs_thread_yield of SrsUdpMuxSocket::sendto
    61. }
    62. return err;
    63. }

     

  • 相关阅读:
    微宏科技基于 KubeSphere 的微服务架构实践
    分组聚合不再难:Pandas groupby使用指南
    vue-waterfall2 实现瀑布流,及总结的问题
    JavaEE-多线程(基础篇三)线程安全
    QGIS添加在线底图
    基于高股息高分红优化的量化选股模型
    SecureCRT -- 使用说明
    1-5年Java面试者必备:一线名企各专题面试笔记+java核心宝典pdf
    Jupyter Notebook 怎么在虚拟环境之间切换
    【zlkmedia】20221019 带x264 和 openssl构建windows工程MediaServer
  • 原文地址:https://blog.csdn.net/u012794472/article/details/126947508