• 流媒体分析之srt 协议libsrt 实现


    1. srt协议概述

    SRT协议能够在不可预测的互联网环境下提供安全、可靠的数据传输,目前广泛应用在流媒体传输领域。理论上SRT可以传输任意类型的数据,但由于其特别针对实时音视频传输做了优化,目前的主要应用场景是跨越公共互联网点对点传输实时音视频数据

    SRT协议最初是一个私有协议,在2017年4月由SRT联盟将其开源,由于该协议良好的性能以及开源、应用灵活等特性,越来越多的厂商和设备开始支持SRT协议。在实际工作中,搭配使用不同厂商的SRT设备也能够实现高可靠、低延时的音视频传输,这对于用户来说非常方便和灵活。

    2. srt 协议工作流程

    SRT协议中最常用的工作模式为“呼叫-监听”(Caller-Listener)模式,监听方(Listener)会持续监听本方的固定UDP端口,呼叫方(Caller)通过访问监听方的公网IP地址和该固定端口来建立SRT连接。呼叫和监听的角色主要在SRT协议握手阶段起作用,无论是编码端还是解码端都可以担任呼叫者或监听者的角色。

    图1表示了SRT协议的工作流程,整个流程包括握手、参数交换、数据传输、连接关闭等步骤。另外在传输有效数据时,双方会发送控制数据来完成丢包恢复、连接保持等功能。

                                                            图1  SRT协议工作流程

    3.SRT数据包结构

    SRT协议根据 UDT协议(UDP-based Data Transfer Protocol) 改进而来,已经在2020年3月10日向IETF提交了RFC草案,这也表示SRT协议进入了比较稳定的发展轨道。

    众所周知,SRT的传统优势领域是点对点的实时音视频传输,而近两年,SRT协议在上行推流方面有了迅速的发展,很多主流平台和公司都支持使用SRT协议来代替RTMP协议进行上行推流,其中的关键点就是SRT的StreamID功能,而StreamID功能就包含在SRT握手数据包的配置扩展模块中。

    总的来说,SRT协议中包含两类数据包:信息数据包(Data Packet)和控制数据包(Control Packet),他们通过SRT首部的最高位(标志位)来区分,0代表信息数据包,1代表控制数据包。控制数据包又包含了 握手(Handshake)、肯定应答(ACK)、否定应答(NAK)、对肯定应答的应答(ACKACK),保持连接(Keepalive)、关闭连接(Shutdown) 等多种类型。

    3.1 信息数据包结构

    图2展示了SRT信息数据包的结构,其承载了需要传输的有效数据。SRT首部长度为16字节,最高位为标志位,SRT信息数据包首部包含四个区域:数据包序列号、报文序号、时间戳、目的地端套接字ID

    • 数据包序列号:SRT使用基于序列号的数据包发送机制,发送端每发送一个数据包,数据包序列号加1。
    • 报文序号:报文序号独立计数,在它之前设置了四个标志位(见图2)。
    • 时间戳:以连接建立时间点(StartTime)为基准的相对时间戳,单位为微秒。
    • 目的地端套接字ID:在多路复用时用来区分不同的SRT流。

     

    3.2 握手数据包结构

    握手数据包分为HSv4版本(SRT版本<1.3)和HSv5版本(SRT版本>=1.3),图3为HSv5版本握手数据包的结构,HSv5握手数据包主要包含五个区域:SRT首部、握手控制信息(cif.hsv5)、握手请求/响应扩展模块(hsreg/hsrsp)、加密扩展模块(kmreg/kmrsp)、配置扩展模块(config)。这里重点介绍前三个区域,握手数据包的结构参见图3:

     

    图3  HSv5握手数据包

    1. 所有SRT控制数据包的首部是基本相同的,均包含四个区域:控制类型和保留区域、附加信息、时间戳、目的地端套接字,其中控制类型字段为0代表握手数据包

    2. 握手控制信息区域(cif.hsv5)中比较重要的字段如下:

    • ISN:随机生成的数据包初始序列号,之后所有的信息数据包以此为基准计数。
    • 握手类型:该字段第一个作用是表示该握手数据包所处的握手阶段(以“呼叫-监听”模式为例,其握手分为诱导阶段Induction和结尾阶段Conclusion),第二个作用对于用户来说更为重要,在握手失败后“握手类型”字段会显示相应的错误码,错误码所对应的错误类型见表1。

    错误码

    错误类型

    错误码

    错误类型

    1000

    未知原因

    1008

    对端版本过旧

    1001

    系统功能错误

    1009

    集合模式套接字冲突

    1002

    对端拒绝

    1010

    密码错误

    1003

    资源分配问题

    1011

    需要密码

    1004

    握手中的错误数据

    1012

    Stream标志位冲突

    1005

    监听方Backlog溢出

    1013

    拥塞控制类型冲突

    1006

    内部程序错误

    1014

    包过滤器冲突

    1007

    该套接字已关闭

    1015

    组冲突

    表1 错误码和错误类型对应表1

    • SRT套接字ID:该字段需要和SRT首部中的目的地端套接字ID加以区分,该字段只作用于握手阶段,而目的地端套接字ID作用于数据传输全过程。
    • 同步cookie:在“呼叫-监听”模式下,出于防止DoS攻击的目的,只由监听方生成同步cookie,该cookie由监听方的主机、端口和当前时间生成,精确度为1分钟。

    3. 握手请求扩展模块(HSREG)中比较重要的字段如下:

    • SRT版本:只要有任何一方的SRT版本低于1.3,双方就会以HSv4版本握手方式来建立连接,HSv4方式握手会有三次或四次往返,而最新的HSv5握手只需要两次往返。出于兼容性的考虑,即使双方的SRT版本都高于1.3,第一个握手请求信息也是HSv4格式。
    • SRT标志位:共有8位标志位,来实现SRT的不同模式和功能。
    • 发送方向延时和接收方向延时:SRT协议1.3版本实现了双向传输功能,双向传输可以分别设定不同方向的固定延时。对于常规的单向传输,假设A向B发送数据,该方向的延时量Latency应该是A的发送方向延时(PeerLatency)和B的接收方向延时(RecLatency)的最大值,该延时量在握手阶段就已由双方协商确定。在单向传输时,有一些编解码器将它的PeerLatency和RecLatency设置成统一的值,这种简易设置方法并不会影响单向传输的工作。

    4. 加密扩展模块KMREQ和配置扩展模块CONFIG

    由于篇幅的原因,最后两个非必需的扩展模块不再详细讨论。其中加密扩展模块(KMREQ)主要负责SRT的AES128/AES192/AES256加密功能的实现。而配置扩展模块(CONFIG)包含了四种:SRT\_CMD\_SID、SRT\_CMD\_CONGESTION、SRT\_CMD\_FILTER、SRT\_CMD\_GROUP,其中SRT\_CMD\_SID扩展模块就是负责SRT上行推流中不可或缺的StreamID功能,有兴趣的朋友可以自行抓包查看。

    3.3 ACK数据包结构

    ACK数据包是由SRT接收端反馈给发送端的肯定应答,发送端收到ACK后便会认为相应数据包已经成功送达。ACK数据包中还包含了接收端估算的链路数据,可以作为发送端拥塞控制的参考。ACK数据包结构见图4,其中几个比较重要的字段如下:

    图4 ACK控制数据包

    • 控制类型:该字段等于2便表示ACK数据包。
    • 附加信息:其中包含了独立计数的ACK序列号,该序列号主要用于ACK包和ACKACK包的一一对应。
    • 最近一个已接收数据包的序列号+1:该字段的值等于最近一个已收到的信息数据包的序列号加1,例如ACK包中该字段为6,便表示前5个数据包均已收到,发送端可以将它们从缓冲区中踢出。需要注意本字段是和数据包序列号有关,与ACK序列号无关。
    • 往返时延RTT估值:通过ACK数据包和ACKACK数据包估算出的链路往返时延。
    • 往返时延RTT估值的变化量:该变化量能够衡量RTT的波动程度,数值越大表示链路RTT越不稳定。
    • 接收端可用缓冲数据:表示目前接收端缓冲区有多少缓冲数据可供解码,该数值越大越好,其最大值由延时量参数(Latency)决定。
    • 链路带宽估值:对本次链路带宽的估算值。
    • 接收速率估值:接收端下行网络带宽的估算值。

    3.4 NAK数据包结构

    当SRT接收端发现收到的数据包序列号不连续时,便会判断有数据包丢失,并立刻向发送方回复否定应答(NAK)数据包。此外SRT接收端还会以一定间隔发送周期NAK报告,其中包括了间隔期的所有丢失包序列号,这种重复发送NAK的机制主要为了防止NAK数据包在反向传输中丢失。NAK数据包结构见图5,其控制类型字段等于3,包内含有丢失数据包的序列号列表。

    图5 NAK控制数据包

    3.5 ACKACK数据包结构

    ACKACK的主要作用是用来计算链路的往返时延(RTT),而RTT作为重要的链路信息会包含在ACK数据包中,ACKACK数据包结构参见图6。首先ACK数据包和ACKACK数据包都包含有精准的时间戳和ACK序列号,当发送端传输给接收端ACK数据包时,接受端会立刻返回一个ACKACK数据包,之后发送端会根据“ACK序列号”将ACK包和ACKACK包一一对应起来,并通过将他们的时间戳相减从而得到链路的往返时延(RTT)。

    图6 ACKACK数据包结构

    3.6 连接保持和连接关闭数据包结构

    SRT中最后两个数据包类型是连接保持(Keepalive)数据包和连接关闭(Shutdown)数据包,它们的数据包结构参见图7和图8。

    图7 连接保持数据包结构

     

    图8 连接关闭数据包结构

    4. ffmpeg 实现对libsrt 库调用:

       1. srt  握手操作:

    1. static int libsrt_open(URLContext *h, const char *uri, int flags)
    2. {
    3. SRTContext *s = h->priv_data;
    4. const char * p;
    5. char buf[256];
    6. int ret = 0;
    7. if (srt_startup() < 0) {
    8. return AVERROR_UNKNOWN;
    9. }
    10. /* SRT options (srt/srt.h) */
    11. p = strchr(uri, '?');
    12. if (p) {
    13. if (av_find_info_tag(buf, sizeof(buf), "maxbw", p)) {
    14. s->maxbw = strtoll(buf, NULL, 0);
    15. }
    16. if (av_find_info_tag(buf, sizeof(buf), "pbkeylen", p)) {
    17. s->pbkeylen = strtol(buf, NULL, 10);
    18. }
    19. if (av_find_info_tag(buf, sizeof(buf), "passphrase", p)) {
    20. av_freep(&s->passphrase);
    21. s->passphrase = av_strndup(buf, strlen(buf));
    22. }
    23. #if SRT_VERSION_VALUE >= 0x010302
    24. if (av_find_info_tag(buf, sizeof(buf), "enforced_encryption", p)) {
    25. s->enforced_encryption = strtol(buf, NULL, 10);
    26. }
    27. if (av_find_info_tag(buf, sizeof(buf), "kmrefreshrate", p)) {
    28. s->kmrefreshrate = strtol(buf, NULL, 10);
    29. }
    30. if (av_find_info_tag(buf, sizeof(buf), "kmpreannounce", p)) {
    31. s->kmpreannounce = strtol(buf, NULL, 10);
    32. }
    33. #endif
    34. if (av_find_info_tag(buf, sizeof(buf), "mss", p)) {
    35. s->mss = strtol(buf, NULL, 10);
    36. }
    37. if (av_find_info_tag(buf, sizeof(buf), "ffs", p)) {
    38. s->ffs = strtol(buf, NULL, 10);
    39. }
    40. if (av_find_info_tag(buf, sizeof(buf), "ipttl", p)) {
    41. s->ipttl = strtol(buf, NULL, 10);
    42. }
    43. if (av_find_info_tag(buf, sizeof(buf), "iptos", p)) {
    44. s->iptos = strtol(buf, NULL, 10);
    45. }
    46. if (av_find_info_tag(buf, sizeof(buf), "inputbw", p)) {
    47. s->inputbw = strtoll(buf, NULL, 10);
    48. }
    49. if (av_find_info_tag(buf, sizeof(buf), "oheadbw", p)) {
    50. s->oheadbw = strtoll(buf, NULL, 10);
    51. }
    52. if (av_find_info_tag(buf, sizeof(buf), "latency", p)) {
    53. s->latency = strtol(buf, NULL, 10);
    54. }
    55. if (av_find_info_tag(buf, sizeof(buf), "tsbpddelay", p)) {
    56. s->latency = strtol(buf, NULL, 10);
    57. }
    58. if (av_find_info_tag(buf, sizeof(buf), "rcvlatency", p)) {
    59. s->rcvlatency = strtol(buf, NULL, 10);
    60. }
    61. if (av_find_info_tag(buf, sizeof(buf), "peerlatency", p)) {
    62. s->peerlatency = strtol(buf, NULL, 10);
    63. }
    64. if (av_find_info_tag(buf, sizeof(buf), "tlpktdrop", p)) {
    65. s->tlpktdrop = strtol(buf, NULL, 10);
    66. }
    67. if (av_find_info_tag(buf, sizeof(buf), "nakreport", p)) {
    68. s->nakreport = strtol(buf, NULL, 10);
    69. }
    70. if (av_find_info_tag(buf, sizeof(buf), "connect_timeout", p)) {
    71. s->connect_timeout = strtol(buf, NULL, 10);
    72. }
    73. if (av_find_info_tag(buf, sizeof(buf), "payload_size", p) ||
    74. av_find_info_tag(buf, sizeof(buf), "pkt_size", p)) {
    75. s->payload_size = strtol(buf, NULL, 10);
    76. }
    77. if (av_find_info_tag(buf, sizeof(buf), "mode", p)) {
    78. if (!strcmp(buf, "caller")) {
    79. s->mode = SRT_MODE_CALLER;
    80. } else if (!strcmp(buf, "listener")) {
    81. s->mode = SRT_MODE_LISTENER;
    82. } else if (!strcmp(buf, "rendezvous")) {
    83. s->mode = SRT_MODE_RENDEZVOUS;
    84. } else {
    85. ret = AVERROR(EINVAL);
    86. goto err;
    87. }
    88. }
    89. if (av_find_info_tag(buf, sizeof(buf), "sndbuf", p)) {
    90. s->sndbuf = strtol(buf, NULL, 10);
    91. }
    92. if (av_find_info_tag(buf, sizeof(buf), "rcvbuf", p)) {
    93. s->rcvbuf = strtol(buf, NULL, 10);
    94. }
    95. if (av_find_info_tag(buf, sizeof(buf), "lossmaxttl", p)) {
    96. s->lossmaxttl = strtol(buf, NULL, 10);
    97. }
    98. if (av_find_info_tag(buf, sizeof(buf), "minversion", p)) {
    99. s->minversion = strtol(buf, NULL, 0);
    100. }
    101. if (av_find_info_tag(buf, sizeof(buf), "streamid", p)) {
    102. av_freep(&s->streamid);
    103. s->streamid = av_strdup(buf);
    104. if (!s->streamid) {
    105. ret = AVERROR(ENOMEM);
    106. goto err;
    107. }
    108. }
    109. if (av_find_info_tag(buf, sizeof(buf), "smoother", p)) {
    110. av_freep(&s->smoother);
    111. s->smoother = av_strdup(buf);
    112. if(!s->smoother) {
    113. ret = AVERROR(ENOMEM);
    114. goto err;
    115. }
    116. }
    117. if (av_find_info_tag(buf, sizeof(buf), "messageapi", p)) {
    118. s->messageapi = strtol(buf, NULL, 10);
    119. }
    120. if (av_find_info_tag(buf, sizeof(buf), "transtype", p)) {
    121. if (!strcmp(buf, "live")) {
    122. s->transtype = SRTT_LIVE;
    123. } else if (!strcmp(buf, "file")) {
    124. s->transtype = SRTT_FILE;
    125. } else {
    126. ret = AVERROR(EINVAL);
    127. goto err;
    128. }
    129. }
    130. if (av_find_info_tag(buf, sizeof(buf), "linger", p)) {
    131. s->linger = strtol(buf, NULL, 10);
    132. }
    133. }
    134. ret = libsrt_setup(h, uri, flags);
    135. if (ret < 0)
    136. goto err;
    137. return 0;
    138. err:
    139. av_freep(&s->smoother);
    140. av_freep(&s->streamid);
    141. srt_cleanup();
    142. return ret;
    143. }

     libsrt_setup 

     1)srt_socket  函数创建socket 

     2)srt_bind  函数创建数据通道信息及收发缓存对立:

     3) libsrt_listen_connect 握手连接

    1. static int libsrt_setup(URLContext *h, const char *uri, int flags)
    2. {
    3. struct addrinfo hints = { 0 }, *ai, *cur_ai;
    4. int port, fd = -1, listen_fd = -1;
    5. SRTContext *s = h->priv_data;
    6. const char *p;
    7. char buf[256];
    8. int ret;
    9. char hostname[1024],proto[1024],path[1024];
    10. char portstr[10];
    11. int64_t open_timeout = 0;
    12. int eid;
    13. av_url_split(proto, sizeof(proto), NULL, 0, hostname, sizeof(hostname),
    14. &port, path, sizeof(path), uri);
    15. if (strcmp(proto, "srt"))
    16. return AVERROR(EINVAL);
    17. if (port <= 0 || port >= 65536) {
    18. av_log(h, AV_LOG_ERROR, "Port missing in uri\n");
    19. return AVERROR(EINVAL);
    20. }
    21. p = strchr(uri, '?');
    22. if (p) {
    23. if (av_find_info_tag(buf, sizeof(buf), "timeout", p)) {
    24. s->rw_timeout = strtol(buf, NULL, 10);
    25. }
    26. if (av_find_info_tag(buf, sizeof(buf), "listen_timeout", p)) {
    27. s->listen_timeout = strtol(buf, NULL, 10);
    28. }
    29. }
    30. if (s->rw_timeout >= 0) {
    31. open_timeout = h->rw_timeout = s->rw_timeout;
    32. }
    33. hints.ai_family = AF_UNSPEC;
    34. hints.ai_socktype = SOCK_DGRAM;
    35. snprintf(portstr, sizeof(portstr), "%d", port);
    36. if (s->mode == SRT_MODE_LISTENER)
    37. hints.ai_flags |= AI_PASSIVE;
    38. ret = getaddrinfo(hostname[0] ? hostname : NULL, portstr, &hints, &ai);
    39. if (ret) {
    40. av_log(h, AV_LOG_ERROR,
    41. "Failed to resolve hostname %s: %s\n",
    42. hostname, gai_strerror(ret));
    43. return AVERROR(EIO);
    44. }
    45. cur_ai = ai;
    46. eid = srt_epoll_create();
    47. if (eid < 0)
    48. return libsrt_neterrno(h);
    49. s->eid = eid;
    50. restart:
    51. fd = srt_socket(cur_ai->ai_family, cur_ai->ai_socktype, 0);
    52. if (fd < 0) {
    53. ret = libsrt_neterrno(h);
    54. goto fail;
    55. }
    56. if ((ret = libsrt_set_options_pre(h, fd)) < 0) {
    57. goto fail;
    58. }
    59. /* Set the socket's send or receive buffer sizes, if specified.
    60. If unspecified or setting fails, system default is used. */
    61. if (s->recv_buffer_size > 0) {
    62. srt_setsockopt(fd, SOL_SOCKET, SRTO_UDP_RCVBUF, &s->recv_buffer_size, sizeof (s->recv_buffer_size));
    63. }
    64. if (s->send_buffer_size > 0) {
    65. srt_setsockopt(fd, SOL_SOCKET, SRTO_UDP_SNDBUF, &s->send_buffer_size, sizeof (s->send_buffer_size));
    66. }
    67. if (libsrt_socket_nonblock(fd, 1) < 0)
    68. av_log(h, AV_LOG_DEBUG, "libsrt_socket_nonblock failed\n");
    69. if (s->mode == SRT_MODE_LISTENER) {
    70. // multi-client
    71. if ((ret = libsrt_listen(s->eid, fd, cur_ai->ai_addr, cur_ai->ai_addrlen, h, s->listen_timeout)) < 0)
    72. goto fail1;
    73. listen_fd = fd;
    74. fd = ret;
    75. } else {
    76. if (s->mode == SRT_MODE_RENDEZVOUS) {
    77. ret = srt_bind(fd, cur_ai->ai_addr, cur_ai->ai_addrlen);
    78. if (ret)
    79. goto fail1;
    80. }
    81. if ((ret = libsrt_listen_connect(s->eid, fd, cur_ai->ai_addr, cur_ai->ai_addrlen,
    82. open_timeout, h, !!cur_ai->ai_next)) < 0) {
    83. if (ret == AVERROR_EXIT)
    84. goto fail1;
    85. else
    86. goto fail;
    87. }
    88. }
    89. if ((ret = libsrt_set_options_post(h, fd)) < 0) {
    90. goto fail;
    91. }
    92. if (flags & AVIO_FLAG_WRITE) {
    93. int packet_size = 0;
    94. int optlen = sizeof(packet_size);
    95. ret = libsrt_getsockopt(h, fd, SRTO_PAYLOADSIZE, "SRTO_PAYLOADSIZE", &packet_size, &optlen);
    96. if (ret < 0)
    97. goto fail1;
    98. if (packet_size > 0)
    99. h->max_packet_size = packet_size;
    100. }
    101. h->is_streamed = 1;
    102. s->fd = fd;
    103. s->listen_fd = listen_fd;
    104. freeaddrinfo(ai);
    105. return 0;
    106. fail:
    107. if (cur_ai->ai_next) {
    108. /* Retry with the next sockaddr */
    109. cur_ai = cur_ai->ai_next;
    110. if (fd >= 0)
    111. srt_close(fd);
    112. if (listen_fd >= 0)
    113. srt_close(listen_fd);
    114. ret = 0;
    115. goto restart;
    116. }
    117. fail1:
    118. if (fd >= 0)
    119. srt_close(fd);
    120. if (listen_fd >= 0)
    121. srt_close(listen_fd);
    122. freeaddrinfo(ai);
    123. srt_epoll_release(s->eid);
    124. return ret;
    125. }

     发数据:

            libsrt_write  调用:

            srt_sendmsg

    1. static int libsrt_write(URLContext *h, const uint8_t *buf, int size)
    2. {
    3. SRTContext *s = h->priv_data;
    4. int ret;
    5. if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
    6. ret = libsrt_network_wait_fd_timeout(h, s->eid, s->fd, 1, h->rw_timeout, &h->interrupt_callback);
    7. if (ret)
    8. return ret;
    9. }
    10. ret = srt_sendmsg(s->fd, buf, size, -1, 0);
    11. if (ret < 0) {
    12. ret = libsrt_neterrno(h);
    13. }
    14. return ret;
    15. }

     收数据:

            libsrt_read

                      srt_recvmsg

    1. static int libsrt_read(URLContext *h, uint8_t *buf, int size)
    2. {
    3. SRTContext *s = h->priv_data;
    4. int ret;
    5. if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
    6. ret = libsrt_network_wait_fd_timeout(h, s->eid, s->fd, 0, h->rw_timeout, &h->interrupt_callback);
    7. if (ret)
    8. return ret;
    9. }
    10. ret = srt_recvmsg(s->fd, buf, size);
    11. if (ret < 0) {
    12. ret = libsrt_neterrno(h);
    13. }
    14. return ret;
    15. }

  • 相关阅读:
    互联网程序设计课程 第2讲 网络对话程序设计
    【数据结构】二叉树OJ练习
    2. 使用IDEA创建Spring Boot Hello项目并管理依赖——Maven入门指南
    【云原生 | Kubernetes 系列】K8s 实战 Kubernetes 声明式对象的 增 删 改 查
    ruoyi中xxl-job配置使用
    ESP32开发三_蓝牙开发
    nmap参数详解
    【0093】(.text+0xb29): undefined reference to `mxml_format_cb‘
    数据挖掘实战(3):如何对比特币走势进行预测?
    k8s快速入门教程-----8 secret and configmap
  • 原文地址:https://blog.csdn.net/u012794472/article/details/126782225