1. srt协议概述
SRT协议能够在不可预测的互联网环境下提供安全、可靠的数据传输,目前广泛应用在流媒体传输领域。理论上SRT可以传输任意类型的数据,但由于其特别针对实时音视频传输做了优化,目前的主要应用场景是跨越公共互联网点对点传输实时音视频数据。
SRT协议最初是一个私有协议,在2017年4月由SRT联盟将其开源,由于该协议良好的性能以及开源、应用灵活等特性,越来越多的厂商和设备开始支持SRT协议。在实际工作中,搭配使用不同厂商的SRT设备也能够实现高可靠、低延时的音视频传输,这对于用户来说非常方便和灵活。
2. srt 协议工作流程
SRT协议中最常用的工作模式为“呼叫-监听”(Caller-Listener)模式,监听方(Listener)会持续监听本方的固定UDP端口,呼叫方(Caller)通过访问监听方的公网IP地址和该固定端口来建立SRT连接。呼叫和监听的角色主要在SRT协议握手阶段起作用,无论是编码端还是解码端都可以担任呼叫者或监听者的角色。
图1表示了SRT协议的工作流程,整个流程包括握手、参数交换、数据传输、连接关闭等步骤。另外在传输有效数据时,双方会发送控制数据来完成丢包恢复、连接保持等功能。

图1 SRT协议工作流程
3.SRT数据包结构
SRT协议根据 UDT协议(UDP-based Data Transfer Protocol) 改进而来,已经在2020年3月10日向IETF提交了RFC草案,这也表示SRT协议进入了比较稳定的发展轨道。
众所周知,SRT的传统优势领域是点对点的实时音视频传输,而近两年,SRT协议在上行推流方面有了迅速的发展,很多主流平台和公司都支持使用SRT协议来代替RTMP协议进行上行推流,其中的关键点就是SRT的StreamID功能,而StreamID功能就包含在SRT握手数据包的配置扩展模块中。
总的来说,SRT协议中包含两类数据包:信息数据包(Data Packet)和控制数据包(Control Packet),他们通过SRT首部的最高位(标志位)来区分,0代表信息数据包,1代表控制数据包。控制数据包又包含了 握手(Handshake)、肯定应答(ACK)、否定应答(NAK)、对肯定应答的应答(ACKACK),保持连接(Keepalive)、关闭连接(Shutdown) 等多种类型。
3.1 信息数据包结构
图2展示了SRT信息数据包的结构,其承载了需要传输的有效数据。SRT首部长度为16字节,最高位为标志位,SRT信息数据包首部包含四个区域:数据包序列号、报文序号、时间戳、目的地端套接字ID。

3.2 握手数据包结构
握手数据包分为HSv4版本(SRT版本<1.3)和HSv5版本(SRT版本>=1.3),图3为HSv5版本握手数据包的结构,HSv5握手数据包主要包含五个区域:SRT首部、握手控制信息(cif.hsv5)、握手请求/响应扩展模块(hsreg/hsrsp)、加密扩展模块(kmreg/kmrsp)、配置扩展模块(config)。这里重点介绍前三个区域,握手数据包的结构参见图3:

图3 HSv5握手数据包
1. 所有SRT控制数据包的首部是基本相同的,均包含四个区域:控制类型和保留区域、附加信息、时间戳、目的地端套接字,其中控制类型字段为0代表握手数据包。
2. 握手控制信息区域(cif.hsv5)中比较重要的字段如下:
| 错误码 | 错误类型 | 错误码 | 错误类型 |
| 1000 | 未知原因 | 1008 | 对端版本过旧 |
| 1001 | 系统功能错误 | 1009 | 集合模式套接字冲突 |
| 1002 | 对端拒绝 | 1010 | 密码错误 |
| 1003 | 资源分配问题 | 1011 | 需要密码 |
| 1004 | 握手中的错误数据 | 1012 | Stream标志位冲突 |
| 1005 | 监听方Backlog溢出 | 1013 | 拥塞控制类型冲突 |
| 1006 | 内部程序错误 | 1014 | 包过滤器冲突 |
| 1007 | 该套接字已关闭 | 1015 | 组冲突 |
表1 错误码和错误类型对应表1
3. 握手请求扩展模块(HSREG)中比较重要的字段如下:
4. 加密扩展模块KMREQ和配置扩展模块CONFIG
由于篇幅的原因,最后两个非必需的扩展模块不再详细讨论。其中加密扩展模块(KMREQ)主要负责SRT的AES128/AES192/AES256加密功能的实现。而配置扩展模块(CONFIG)包含了四种:SRT\_CMD\_SID、SRT\_CMD\_CONGESTION、SRT\_CMD\_FILTER、SRT\_CMD\_GROUP,其中SRT\_CMD\_SID扩展模块就是负责SRT上行推流中不可或缺的StreamID功能,有兴趣的朋友可以自行抓包查看。
3.3 ACK数据包结构
ACK数据包是由SRT接收端反馈给发送端的肯定应答,发送端收到ACK后便会认为相应数据包已经成功送达。ACK数据包中还包含了接收端估算的链路数据,可以作为发送端拥塞控制的参考。ACK数据包结构见图4,其中几个比较重要的
字段如下:
图4 ACK控制数据包
3.4 NAK数据包结构
当SRT接收端发现收到的数据包序列号不连续时,便会判断有数据包丢失,并立刻向发送方回复否定应答(NAK)数据包。此外SRT接收端还会以一定间隔发送周期NAK报告,其中包括了间隔期的所有丢失包序列号,这种重复发送NAK的机制主要为了防止NAK数据包在反向传输中丢失。NAK数据包结构见图5,其控制类型字段等于3,包内含有丢失数据包的序列号列表。

图5 NAK控制数据包
3.5 ACKACK数据包结构
ACKACK的主要作用是用来计算链路的往返时延(RTT),而RTT作为重要的链路信息会包含在ACK数据包中,ACKACK数据包结构参见图6。首先ACK数据包和ACKACK数据包都包含有精准的时间戳和ACK序列号,当发送端传输给接收端ACK数据包时,接受端会立刻返回一个ACKACK数据包,之后发送端会根据“ACK序列号”将ACK包和ACKACK包一一对应起来,并通过将他们的时间戳相减从而得到链路的往返时延(RTT)。

图6 ACKACK数据包结构
3.6 连接保持和连接关闭数据包结构
SRT中最后两个数据包类型是连接保持(Keepalive)数据包和连接关闭(Shutdown)数据包,它们的数据包结构参见图7和图8。

图7 连接保持数据包结构

图8 连接关闭数据包结构
4. ffmpeg 实现对libsrt 库调用:
1. srt 握手操作:
- static int libsrt_open(URLContext *h, const char *uri, int flags)
- {
- SRTContext *s = h->priv_data;
- const char * p;
- char buf[256];
- int ret = 0;
-
- if (srt_startup() < 0) {
- return AVERROR_UNKNOWN;
- }
-
- /* SRT options (srt/srt.h) */
- p = strchr(uri, '?');
- if (p) {
- if (av_find_info_tag(buf, sizeof(buf), "maxbw", p)) {
- s->maxbw = strtoll(buf, NULL, 0);
- }
- if (av_find_info_tag(buf, sizeof(buf), "pbkeylen", p)) {
- s->pbkeylen = strtol(buf, NULL, 10);
- }
- if (av_find_info_tag(buf, sizeof(buf), "passphrase", p)) {
- av_freep(&s->passphrase);
- s->passphrase = av_strndup(buf, strlen(buf));
- }
- #if SRT_VERSION_VALUE >= 0x010302
- if (av_find_info_tag(buf, sizeof(buf), "enforced_encryption", p)) {
- s->enforced_encryption = strtol(buf, NULL, 10);
- }
- if (av_find_info_tag(buf, sizeof(buf), "kmrefreshrate", p)) {
- s->kmrefreshrate = strtol(buf, NULL, 10);
- }
- if (av_find_info_tag(buf, sizeof(buf), "kmpreannounce", p)) {
- s->kmpreannounce = strtol(buf, NULL, 10);
- }
- #endif
- if (av_find_info_tag(buf, sizeof(buf), "mss", p)) {
- s->mss = strtol(buf, NULL, 10);
- }
- if (av_find_info_tag(buf, sizeof(buf), "ffs", p)) {
- s->ffs = strtol(buf, NULL, 10);
- }
- if (av_find_info_tag(buf, sizeof(buf), "ipttl", p)) {
- s->ipttl = strtol(buf, NULL, 10);
- }
- if (av_find_info_tag(buf, sizeof(buf), "iptos", p)) {
- s->iptos = strtol(buf, NULL, 10);
- }
- if (av_find_info_tag(buf, sizeof(buf), "inputbw", p)) {
- s->inputbw = strtoll(buf, NULL, 10);
- }
- if (av_find_info_tag(buf, sizeof(buf), "oheadbw", p)) {
- s->oheadbw = strtoll(buf, NULL, 10);
- }
- if (av_find_info_tag(buf, sizeof(buf), "latency", p)) {
- s->latency = strtol(buf, NULL, 10);
- }
- if (av_find_info_tag(buf, sizeof(buf), "tsbpddelay", p)) {
- s->latency = strtol(buf, NULL, 10);
- }
- if (av_find_info_tag(buf, sizeof(buf), "rcvlatency", p)) {
- s->rcvlatency = strtol(buf, NULL, 10);
- }
- if (av_find_info_tag(buf, sizeof(buf), "peerlatency", p)) {
- s->peerlatency = strtol(buf, NULL, 10);
- }
- if (av_find_info_tag(buf, sizeof(buf), "tlpktdrop", p)) {
- s->tlpktdrop = strtol(buf, NULL, 10);
- }
- if (av_find_info_tag(buf, sizeof(buf), "nakreport", p)) {
- s->nakreport = strtol(buf, NULL, 10);
- }
- if (av_find_info_tag(buf, sizeof(buf), "connect_timeout", p)) {
- s->connect_timeout = strtol(buf, NULL, 10);
- }
- if (av_find_info_tag(buf, sizeof(buf), "payload_size", p) ||
- av_find_info_tag(buf, sizeof(buf), "pkt_size", p)) {
- s->payload_size = strtol(buf, NULL, 10);
- }
- if (av_find_info_tag(buf, sizeof(buf), "mode", p)) {
- if (!strcmp(buf, "caller")) {
- s->mode = SRT_MODE_CALLER;
- } else if (!strcmp(buf, "listener")) {
- s->mode = SRT_MODE_LISTENER;
- } else if (!strcmp(buf, "rendezvous")) {
- s->mode = SRT_MODE_RENDEZVOUS;
- } else {
- ret = AVERROR(EINVAL);
- goto err;
- }
- }
- if (av_find_info_tag(buf, sizeof(buf), "sndbuf", p)) {
- s->sndbuf = strtol(buf, NULL, 10);
- }
- if (av_find_info_tag(buf, sizeof(buf), "rcvbuf", p)) {
- s->rcvbuf = strtol(buf, NULL, 10);
- }
- if (av_find_info_tag(buf, sizeof(buf), "lossmaxttl", p)) {
- s->lossmaxttl = strtol(buf, NULL, 10);
- }
- if (av_find_info_tag(buf, sizeof(buf), "minversion", p)) {
- s->minversion = strtol(buf, NULL, 0);
- }
- if (av_find_info_tag(buf, sizeof(buf), "streamid", p)) {
- av_freep(&s->streamid);
- s->streamid = av_strdup(buf);
- if (!s->streamid) {
- ret = AVERROR(ENOMEM);
- goto err;
- }
- }
- if (av_find_info_tag(buf, sizeof(buf), "smoother", p)) {
- av_freep(&s->smoother);
- s->smoother = av_strdup(buf);
- if(!s->smoother) {
- ret = AVERROR(ENOMEM);
- goto err;
- }
- }
- if (av_find_info_tag(buf, sizeof(buf), "messageapi", p)) {
- s->messageapi = strtol(buf, NULL, 10);
- }
- if (av_find_info_tag(buf, sizeof(buf), "transtype", p)) {
- if (!strcmp(buf, "live")) {
- s->transtype = SRTT_LIVE;
- } else if (!strcmp(buf, "file")) {
- s->transtype = SRTT_FILE;
- } else {
- ret = AVERROR(EINVAL);
- goto err;
- }
- }
- if (av_find_info_tag(buf, sizeof(buf), "linger", p)) {
- s->linger = strtol(buf, NULL, 10);
- }
- }
- ret = libsrt_setup(h, uri, flags);
- if (ret < 0)
- goto err;
- return 0;
-
- err:
- av_freep(&s->smoother);
- av_freep(&s->streamid);
- srt_cleanup();
- return ret;
- }
libsrt_setup
1)srt_socket 函数创建socket
2)srt_bind 函数创建数据通道信息及收发缓存对立:
3) libsrt_listen_connect 握手连接
- static int libsrt_setup(URLContext *h, const char *uri, int flags)
- {
- struct addrinfo hints = { 0 }, *ai, *cur_ai;
- int port, fd = -1, listen_fd = -1;
- SRTContext *s = h->priv_data;
- const char *p;
- char buf[256];
- int ret;
- char hostname[1024],proto[1024],path[1024];
- char portstr[10];
- int64_t open_timeout = 0;
- int eid;
-
- av_url_split(proto, sizeof(proto), NULL, 0, hostname, sizeof(hostname),
- &port, path, sizeof(path), uri);
- if (strcmp(proto, "srt"))
- return AVERROR(EINVAL);
- if (port <= 0 || port >= 65536) {
- av_log(h, AV_LOG_ERROR, "Port missing in uri\n");
- return AVERROR(EINVAL);
- }
- p = strchr(uri, '?');
- if (p) {
- if (av_find_info_tag(buf, sizeof(buf), "timeout", p)) {
- s->rw_timeout = strtol(buf, NULL, 10);
- }
- if (av_find_info_tag(buf, sizeof(buf), "listen_timeout", p)) {
- s->listen_timeout = strtol(buf, NULL, 10);
- }
- }
- if (s->rw_timeout >= 0) {
- open_timeout = h->rw_timeout = s->rw_timeout;
- }
- hints.ai_family = AF_UNSPEC;
- hints.ai_socktype = SOCK_DGRAM;
- snprintf(portstr, sizeof(portstr), "%d", port);
- if (s->mode == SRT_MODE_LISTENER)
- hints.ai_flags |= AI_PASSIVE;
- ret = getaddrinfo(hostname[0] ? hostname : NULL, portstr, &hints, &ai);
- if (ret) {
- av_log(h, AV_LOG_ERROR,
- "Failed to resolve hostname %s: %s\n",
- hostname, gai_strerror(ret));
- return AVERROR(EIO);
- }
-
- cur_ai = ai;
-
- eid = srt_epoll_create();
- if (eid < 0)
- return libsrt_neterrno(h);
- s->eid = eid;
-
- restart:
-
- fd = srt_socket(cur_ai->ai_family, cur_ai->ai_socktype, 0);
- if (fd < 0) {
- ret = libsrt_neterrno(h);
- goto fail;
- }
-
- if ((ret = libsrt_set_options_pre(h, fd)) < 0) {
- goto fail;
- }
-
- /* Set the socket's send or receive buffer sizes, if specified.
- If unspecified or setting fails, system default is used. */
- if (s->recv_buffer_size > 0) {
- srt_setsockopt(fd, SOL_SOCKET, SRTO_UDP_RCVBUF, &s->recv_buffer_size, sizeof (s->recv_buffer_size));
- }
- if (s->send_buffer_size > 0) {
- srt_setsockopt(fd, SOL_SOCKET, SRTO_UDP_SNDBUF, &s->send_buffer_size, sizeof (s->send_buffer_size));
- }
- if (libsrt_socket_nonblock(fd, 1) < 0)
- av_log(h, AV_LOG_DEBUG, "libsrt_socket_nonblock failed\n");
- if (s->mode == SRT_MODE_LISTENER) {
- // multi-client
- if ((ret = libsrt_listen(s->eid, fd, cur_ai->ai_addr, cur_ai->ai_addrlen, h, s->listen_timeout)) < 0)
- goto fail1;
- listen_fd = fd;
- fd = ret;
- } else {
- if (s->mode == SRT_MODE_RENDEZVOUS) {
- ret = srt_bind(fd, cur_ai->ai_addr, cur_ai->ai_addrlen);
- if (ret)
- goto fail1;
- }
- if ((ret = libsrt_listen_connect(s->eid, fd, cur_ai->ai_addr, cur_ai->ai_addrlen,
- open_timeout, h, !!cur_ai->ai_next)) < 0) {
- if (ret == AVERROR_EXIT)
- goto fail1;
- else
- goto fail;
- }
- }
- if ((ret = libsrt_set_options_post(h, fd)) < 0) {
- goto fail;
- }
- if (flags & AVIO_FLAG_WRITE) {
- int packet_size = 0;
- int optlen = sizeof(packet_size);
- ret = libsrt_getsockopt(h, fd, SRTO_PAYLOADSIZE, "SRTO_PAYLOADSIZE", &packet_size, &optlen);
- if (ret < 0)
- goto fail1;
- if (packet_size > 0)
- h->max_packet_size = packet_size;
- }
- h->is_streamed = 1;
- s->fd = fd;
- s->listen_fd = listen_fd;
- freeaddrinfo(ai);
- return 0;
- fail:
- if (cur_ai->ai_next) {
- /* Retry with the next sockaddr */
- cur_ai = cur_ai->ai_next;
- if (fd >= 0)
- srt_close(fd);
- if (listen_fd >= 0)
- srt_close(listen_fd);
- ret = 0;
- goto restart;
- }
- fail1:
- if (fd >= 0)
- srt_close(fd);
- if (listen_fd >= 0)
- srt_close(listen_fd);
- freeaddrinfo(ai);
- srt_epoll_release(s->eid);
- return ret;
- }
发数据:
libsrt_write 调用:
srt_sendmsg
-
- static int libsrt_write(URLContext *h, const uint8_t *buf, int size)
- {
- SRTContext *s = h->priv_data;
- int ret;
-
- if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
- ret = libsrt_network_wait_fd_timeout(h, s->eid, s->fd, 1, h->rw_timeout, &h->interrupt_callback);
- if (ret)
- return ret;
- }
-
- ret = srt_sendmsg(s->fd, buf, size, -1, 0);
- if (ret < 0) {
- ret = libsrt_neterrno(h);
- }
-
- return ret;
- }
收数据:
libsrt_read
srt_recvmsg
- static int libsrt_read(URLContext *h, uint8_t *buf, int size)
- {
- SRTContext *s = h->priv_data;
- int ret;
-
- if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
- ret = libsrt_network_wait_fd_timeout(h, s->eid, s->fd, 0, h->rw_timeout, &h->interrupt_callback);
- if (ret)
- return ret;
- }
-
- ret = srt_recvmsg(s->fd, buf, size);
- if (ret < 0) {
- ret = libsrt_neterrno(h);
- }
-
- return ret;
- }