• deepstream学习笔记(四):跟踪模块tracker接入与rtsp流异常问题解决


    引言

    本篇主要想就上一篇中还剩下的tracker模块进行补充说明,使整个流程更加完善。另外就是针对rtsp流异常问题,总结了一些在gstreamer中或者是其它工具的一些解决方案。

    Gst-nvtracker介绍

    Gst-nvtracker插件允许DeepStream 管道使用一个底层跟踪器来跟踪具有唯一ID的检测目标。它支持任何实现NvDsTracker API的底层库,包括三个参考实现:NvDCF、 KLT 和 IOU 跟踪器。作为这个 API 的一部分,插件查询底层库中关于输入格式和内存类型的功能和要求。然后,基于这些查询结果,插件将输入帧缓冲区转换为底层库请求的格式。例如,KLT 跟踪器使用 Luma 专用格式; NvDCF 和 DeepSORT 使用 NV12或 RGBA格式; IOU 不需要缓冲区。四个的跟踪器库支持不同的跟踪算法具体为:

    • KLT 跟踪器使用基于 cpu 的 Kanade-Lucas-Tomasi (KLT)跟踪器算法的实现。此库不需要配置文件。
    • IOU 跟踪器在两个连续帧之间的检测器边界框中使用 IOU 值来执行它们之间的关联或分配一个新的 ID。这个库接受一个可选的配置文件。
    • NvDCF跟踪器采用基于相关滤波器的在线鉴别学习算法作为视觉目标跟踪器,同时采用数据关联算法进行多目标跟踪。此库接受一个可选的配置文件。
    • DeepSORT:DeepSORT 跟踪器是官方 DeepSORT 跟踪器的重新实现,它使用深度余弦度量学习和 Re-ID 神经网络。只要 NVIDIA 的 TensorRT™ 框架支持,此实现允许用户使用任何 Re-ID 网络。

    这四种跟踪器的比较和权衡区别为:

    Tracker TypeGPU ComputeCPU Compute优点缺点最佳用例
    IOUX非常低轻量级- 没有用于匹配的视觉特征,因此容易出现频繁的跟踪器 ID 切换和故障。不适合快速移动的场景。- 对象位置稀疏,大小不同
    - 检测器预计每帧或非常频繁地运行(例如,每隔一帧)
    KLTX对于简单的场景效果相当好- 高 CPU 利用率。由于噪音和干扰,如阴影,非刚性变形,平面外旋转和部分遮挡,易改变视觉外观的。无法处理低纹理的对象。- 对象具有强大的纹理和简单的背景。
    - 理想的高 CPU 资源可用性。
    NvDCF中等- 对部分遮挡、阴影和其他瞬态视觉变化具有高度鲁棒性。
    - ID切换频率较低。
    - 可与 PGIE 间隔 > 0 一起使用,而不会显着降低精度 根据应用要求轻松调整参数以权衡精度和性能
    - 由于视觉特征提取的计算复杂度增加,比 IOU 慢- 多对象、复杂的场景,即使有部分遮挡 - PGIE 间隔 > 0
    DeepSORT- 允许自定义 Re-ID 模型进行视觉外观匹配
    - 高度区分取决于使用的 Re-ID 模型
    - 由于每个对象都需要推理,计算成本更高
    - 只有在检测器的 bbox 可用时才能执行跟踪
    - 除非切换 Re-ID 模型,否则无法轻松调整精度/性能级别
    - 与 NvDCF 相同(首选 PGIE 间隔=0 除外)

    上述内容来自deepstream sdk 6.1与5.1文档,其中6.1文档是将KLT 跟踪器去除,加入了deepsort说明。而动态库方面对于跟踪上更是巨大,下面是deepstream 5.1的/opt/nvidia/deepstream/deepstream-5.1/lib下包含的so文件:

    libnvds_tracker.so
    libnvds_mot_iou.so
    libnvds_mot_klt.so
    libnvds_nvdcf.so
    
    • 1
    • 2
    • 3
    • 4

    从名字上就能清楚它们隶属于哪种算法,而在deepstream 6.1同样目录,so文件只编译出了一个:

    libnvds_nvmultiobjecttracker.so
    
    • 1

    我也是因为一台机上装了两个docker镜像导致环境来回切发现的,至于为什么要来回切,那就是一个悲伤的故事了。。。

    从deepstream 6.0开始,nvidia将三种跟踪器算法(即 IOU、NvDCF 和 DeepSORT)统一在了一个架构中,支持批处理模式下的多流、多对象跟踪,可在 CPU 和 GPU 上进行高效处理。

    这里不再详述libnvds_nvmultiobjecttracker.so了,感兴趣可以去Gst-nvtracker 中的NvMultiObjectTracker库中的工作流程和核心模块一节查看它所支持的组建关系以及共享模块数据关联表格,因为我发现就目标跟踪来看,在deepstream-python-app下的测试历程仅有deepstream-test2使用了跟踪模块,并且配置文件还是基于DvDCF的yaml:
    在这里插入图片描述

    感觉py的例程没有考虑其它几种情况,而如果想深入原理的话,还是得看C端的deepstream-app,因为它的当前目录下包含了很全面的配置文件:

    root@$$:/opt/nvidia/deepstream/deepstream-6.1/samples/configs/deepstream-app# ls | grep config_tracker
    config_tracker_DeepSORT.yml
    config_tracker_IOU.yml
    config_tracker_NvDCF_accuracy.yml
    config_tracker_NvDCF_max_perf.yml
    config_tracker_NvDCF_perf.yml
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    所以tracker插件就介绍到这,下面开始接入部分。

    Gst-nvtracker接入流程

    这里我是以上一节deepstream-imagedata-multistream为模板,将test2中的tracker接入,其实很简单,就是把tracker这个element参数配置好后add进infer插件后,然后link住,具体我们可以先看参数配置。

    gst tracker 参数

    根据nvidia官网的介绍,跟踪模块的参数表格如下:

    PropertyMeaning类型和范围注释说明
    tracker-width跟踪器操作的帧宽度,以像素为单位。Integer,0 到 4,294,967,295tracker-width=640(为 32 的倍数)
    tracker-height跟踪器要运行的帧高度,以像素为单位。Integer,0 到 4,294,967,295tracker-height=384(为 32 的倍数)
    ll-lib-fileGst-nvtracker 要加载的跟踪器库的路径名。Stringll-lib-file=/opt/nvidia/deepstream/deepstream/lib/libnvds_nvmultiobjecttracker.so
    ll-config-file非必须,库的配置文件。配置文件的路径ll-config-file=config_tracker_NvDCF_perf.yml
    gpu-id要在其上分配设备/统一内存的 GPU 的 ID,以及要完成缓冲区复制/缩放的 GPU 的 ID。(仅限 dGPU。)Integer,0 到 4,294,967,295gpu-id=0
    enable-batch-process启用/禁用批处理模式。仅当底层库同时支持批处理和每个流处理时才有效。(可选)(默认值为 1)布尔值enable-batch-process=1
    enable-past-frame启用/禁用报告过去帧数据模式。只有在底层库支持时才有效。(可选)(默认值为 0)布尔值enable-past-frame=1
    tracking-surface-type设置跟踪的表面流类型。(默认值为 0)Integer,≥0tracking-surface-type=0
    display-tracking-id在 OSD 上启用跟踪 ID 显示。布尔值display-tracking-id=1
    compute-hw用于扩展的计算引擎。0 - 默认1 - 图形处理器2 - VIC(仅限 Jetson)Integer,0 到 2compute-hw=1
    tracking-id-reset-mode允许基于管道事件强制重置跟踪 ID。一旦启用跟踪 ID 重置并发生此类事件,跟踪 ID 的低 32 位将重置为 0
    0:当流重置或 EOS 事件发生时不重置跟踪 ID
    1:在流重置发生时终止所有现有跟踪器并为流分配新 ID(GST_NVEVENT_STREAM_RESET)2:在收到EOS事件后让tracking ID从0起(GST_NVEVENT_STREAM_EOS)(注:只有tracking ID的低32位从0开始)
    3:启用选项 1 和 2
    Integer,0 到 3tracking-id-reset-mode=0

    以上部分说明根据我自己的理解与翻译进行了一些修改。知道了大致的一些参数后,我们就能上一节中的config_infer_primary_yoloV5.txt文件,加入tracker类:

    [tracker]
    tracker-width=640
    tracker-height=384
    gpu-id=0
    ll-lib-file=/opt/nvidia/deepstream/deepstream/lib/libnvds_nvmultiobjecttracker.so
    # ll-lib-file = /opt/nvidia/deepstream/deepstream/lib/libnvds_mot_klt.so
    # ll-config-file=config_tracker_NvDCF_perf.yml
    enable-past-frame=1
    enable-batch-process=1
    display-tracking-id=1
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    这里要注意的是,官方的例程是没有最后的display-tracking-id参数的,而enable-past-frame参数也是关闭状态,其中前者是必要参数,后者我目前感觉没啥用处,可能为了提升效果吧,但我跑起来发现我没有开启。主要问题是当时调试了很久发现没有输出,明明跟踪都加载成功了,后来我才发现我少了个参数。。。width和height按照自己想要的来,最好是32的倍数。其它为默认或者不加也行。

    tracker代码接入

    首先,在main函数中,我们先初始化这样一个tacker Element:

        tracker = Gst.ElementFactory.make("nvtracker", "tracker")
        if not tracker:
            sys.stderr.write(" Unable to create tracker \n")
    
    • 1
    • 2
    • 3

    然后加载我们还刚写入txt中的配置信息:

        #Set properties of tracker
        config = configparser.ConfigParser()
        config.read('dstest2_tracker_config.txt')
        config.sections()
    
        for key in config['tracker']:
            if key == 'tracker-width' :
                tracker_width = config.getint('tracker', key)
                tracker.set_property('tracker-width', tracker_width)
            if key == 'tracker-height' :
                tracker_height = config.getint('tracker', key)
                tracker.set_property('tracker-height', tracker_height)
            if key == 'gpu-id' :
                tracker_gpu_id = config.getint('tracker', key)
                tracker.set_property('gpu_id', tracker_gpu_id)
            if key == 'll-lib-file' :
                tracker_ll_lib_file = config.get('tracker', key)
                tracker.set_property('ll-lib-file', tracker_ll_lib_file)
            if key == 'll-config-file' :
                tracker_ll_config_file = config.get('tracker', key)
                tracker.set_property('ll-config-file', tracker_ll_config_file)
            if key == 'enable-batch-process' :
                tracker_enable_batch_process = config.getint('tracker', key)
                tracker.set_property('enable_batch_process', tracker_enable_batch_process)
            if key == 'enable-past-frame' :
                tracker_enable_past_frame = config.getint('tracker', key)
                tracker.set_property('enable_past_frame', tracker_enable_past_frame)
            if key == 'display-tracking-id' :
                tracker_tracking_id = config.getint('tracker', key)
                tracker.set_property('display_tracking_id', tracker_tracking_id )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    做好了这个element后,就可以加入pipeline并link起来了:

    # add部分
    pipeline.add(tracker)
    
    
    # link部分
    pgie.link(tracker)
    tracker.link(nvvidconv)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    原推理模块是连接图像转换器,现在相当于在中间插入了tracker模块,其它不变。那到此,主函数就已经没问题了,我们就可以从探针中的回调函数里获取到tracker的info,这里整个获取数据的代码为:

    #past traking meta data
    if(past_tracking_meta[0]==1):
    	l_user=batch_meta.batch_user_meta_list
    	while l_user is not None:
    		try:
    			# Note that l_user.data needs a cast to pyds.NvDsUserMeta
    			# The casting is done by pyds.NvDsUserMeta.cast()
    			# The casting also keeps ownership of the underlying memory
    			# in the C code, so the Python garbage collector will leave
    			# it alone
    			user_meta=pyds.NvDsUserMeta.cast(l_user.data)
    		except StopIteration:
    			break
    		if(user_meta and user_meta.base_meta.meta_type==pyds.NvDsMetaType.NVDS_TRACKER_PAST_FRAME_META):
    			try:
    				# Note that user_meta.user_meta_data needs a cast to pyds.NvDsPastFrameObjBatch
    				# The casting is done by pyds.NvDsPastFrameObjBatch.cast()
    				# The casting also keeps ownership of the underlying memory
    				# in the C code, so the Python garbage collector will leave
    				# it alone
    				pPastFrameObjBatch = pyds.NvDsPastFrameObjBatch.cast(user_meta.user_meta_data)
    			except StopIteration:
    				break
    			for trackobj in pyds.NvDsPastFrameObjBatch.list(pPastFrameObjBatch):
    				print("streamId=",trackobj.streamID)
    				print("surfaceStreamID=",trackobj.surfaceStreamID)
    				for pastframeobj in pyds.NvDsPastFrameObjStream.list(trackobj):
    					print("numobj=",pastframeobj.numObj)
    					print("uniqueId=",pastframeobj.uniqueId)
    					print("classId=",pastframeobj.classId)
    					print("objLabel=",pastframeobj.objLabel)
    					for objlist in pyds.NvDsPastFrameObjList.list(pastframeobj):
    						print('frameNum:', objlist.frameNum)
    						print('tBbox.left:', objlist.tBbox.left)
    						print('tBbox.width:', objlist.tBbox.width)
    						print('tBbox.top:', objlist.tBbox.top)
    						print('tBbox.right:', objlist.tBbox.height)
    						print('confidence:', objlist.confidence)
    						print('age:', objlist.age)
    		try:
    			l_user=l_user.next
    		except StopIteration:
    			break
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    这部分代码跟infer基本一致,代码也是紧接着infer取数后来的,跟infer共用batch_meta,而batch_meta是deepstream从哈希buffer中拿到的所有的info。这里相当于就只多了个id,前面的推理数据就可以注释了。首行的past_tracking_meta判断可以直接给0,虽然说前面我在制作element的时候有加载进这个配置,该参数就相当于一个开关,程序启动后从用户输入中获取选择0或者1,我一般都给0并且跳过判断,虽然说目前我这边还没有上线,目前感觉作用不大。

    那到此为止,deepstream-imagedata-multistream的例程就改造完成,可以重新跑整个demo,并创建管道图,跑出来的图如下:
    在这里插入图片描述
    另外,还有一个现象是加入tracker会输出如下日志:

    gstnvtracker: Loading low-level lib at /opt/nvidia/deepstream/deepstream/lib/libnvds_nvmultiobjecttracker.so
    gstnvtracker: Optional NvMOT_ProcessPast not implemented
    gstnvtracker: Optional NvMOT_RemoveStreams not implemented
    gstnvtracker: Batch processing is OFF
    gstnvtracker: Past frame output is OFF
    
    • 1
    • 2
    • 3
    • 4
    • 5

    这是模型加载的时候爆出来的日志,仅仅是有些东西没有开,对整个结果可能只是精度上的影响,作为测试的话影响不大。

    介绍完这个问题后,我还想说明的一个问题就是rtsp流的事情。tracker深入的参数调优文档,可以看官方的说明:

    https://docs.nvidia.com/metropolis/deepstream/dev-guide/text/DS_plugin_NvMultiObjectTracker_parameter_tuning_guide.html

    rtsp流异常说明

    这个问题,是我长时间跑rtsp流遇到的一个bug,或者说当我测试gstreamer对于流断的异常处理,这就引起了我的一个思考,然而印了那张表情包,30分钟后,思考崩溃,能用就行,emmm。

    这个问题在C的源码里是不存在的,原因是C的bus_callback函数差不多写了2/3百行,大大小小所有情况都考虑清楚了,而python的,nvidia在每个版本里,都是定义在common中,为:

    import gi
    import sys
    gi.require_version('Gst', '1.0')
    from gi.repository import Gst
    def bus_call(bus, message, loop):
        t = message.type
        if t == Gst.MessageType.EOS:
            sys.stdout.write("End-of-stream\n")
            loop.quit()
        elif t==Gst.MessageType.WARNING:
            err, debug = message.parse_warning()
            sys.stderr.write("Warning: %s: %s\n" % (err, debug))
        elif t == Gst.MessageType.ERROR:
            err, debug = message.parse_error()
            sys.stderr.write("Error: %s: %s\n" % (err, debug))
            loop.quit()
        return True
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    这个程序说明,除了警告,主要遇到EOS与ERROR,程序就会退出,我尝试过将警告和EOS(EOS的意思可以理解为流媒体结束的一个标志,即EOS of stream)注释掉,但发现整个程序陷入了假死状态,因为pipeline已经无法分析了,FPS会变成0,然后我针对这个问题,进行了一些资料查找,当然,最好的办法是看懂C端的解决方案,但我发现我看完了还是没得办法,因为API不同步,比如C里有个reset_pipeline_xxx好像是这名字,在python中我并没有找到类似的,这种有很多,于是有了如下简单的解决方案:

    首先,我们可以从pipeline考虑,如果中间出现问题,比如说输入源这种,不是内部element报错,那么我们可以利用pipeline的特性,先暂停管道,然后运行完自定义事件,再重新开启:

    pipeline.set_state(Gst.State.NULL)
    //do your stuff for example, change some elements, remove some elements etc:
    pipeline.set_state(Gst.State.PLAYING)
    
    • 1
    • 2
    • 3

    这个过程可以参照stackoverflow中Sink restart on failure without stopping the pipeline的方案,为:

    def event_probe2(pad, info, *args):
        Gst.Pad.remove_probe(pad, info.id)
        tee.link(opusenc1)
        opusenc1.set_state(Gst.State.PLAYING)
        oggmux1.set_state(Gst.State.PLAYING)
        queue1.set_state(Gst.State.PLAYING)
        shout2send.set_state(Gst.State.PLAYING)
        return Gst.PadProbeReturn.OK
    
    def reconnect():
        pad = tee.get_static_pad('src_1')
        pad.add_probe(Gst.PadProbeType.BLOCK_DOWNSTREAM, event_probe2, None)
    
    def event_probe(pad, info, *args):
        Gst.Pad.remove_probe(pad, info.id)
        tee.unlink(opusenc1)
        opusenc1.set_state(Gst.State.NULL)
        oggmux1.set_state(Gst.State.NULL)
        queue1.set_state(Gst.State.NULL)
        shout2send.set_state(Gst.State.NULL)
        GLib.timeout_add_seconds(interval, reconnect)
        return Gst.PadProbeReturn.OK
    
    def message_handler(bus, message):
        if message.type == Gst.MessageType.ERROR:
            if message.src == shout2send:
                pad = tee.get_static_pad('src_1')
                pad.add_probe(Gst.PadProbeType.BLOCK_DOWNSTREAM, event_probe, None)
            else:
                print(message.parse_error())
                pipeline.set_state(Gst.State.NULL)
                exit(1)
        else:
            print(message.type)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    而这是错误发生时的解决,关于EOS,我找到的一个类似方案为Restarting/Reconnecting RTSP source on EOS

    其中部分代码为:

      msg_type = msg.type
    if msg_type == Gst.MessageType.EOS:
        ret = self.pipeline.set_state(Gst.State.PAUSED)
        self.loop.quit()
        Gst.debug_bin_to_dot_file(self.pipeline, Gst.DebugGraphDetails.ALL, "EOS")
        print("Setting Pipeline to Paused State")
        time.sleep(10)
        print("Trying to set back to playing state")
        if ret == Gst.StateChangeReturn.SUCCESS or ret == Gst.StateChangeReturn.NO_PREROLL:
            flush_start = self.pipeline.send_event(Gst.Event.new_flush_start())
            print("Managed to Flush Start: ", flush_start)
            flush_stop = self.pipeline.send_event(Gst.Event.new_flush_stop(True))
            print("Managed to Flush Stop: ", flush_stop)
            i = 0
            uri = configFile['source%u' % int(i)]['uri']
            padname = "sink_%u" % int(i)
            removed_state = self.remove_source_bin()
            if all(element == 1 for element in removed_state):
                self.nbin = self.create_source_bin(i, uri)
                added_state = self.pipeline.add(self.nbin)
                print("Added state: ", added_state)
                self.streammux_sinkpad = self.streammux.get_request_pad(padname)
                if not self.streammux_sinkpad:
                    sys.stderr.write("Unable to create sink pad bin \n")
                    print("Pad name: ", padname)
                self.srcpad = self.nbin.get_static_pad("src")
                self.srcpad.link(self.streammux_sinkpad)
                Gst.debug_bin_to_dot_file(self.pipeline, Gst.DebugGraphDetails.ALL, "Resetting_Source")
    
                self.bus = self.pipeline.get_bus()
                self.bus.add_signal_watch()
                self.bus.connect("message", self.bus_call, self.loop)
    
                self.pipeline.set_state(Gst.State.PLAYING)
    
                self.nbin.set_state(Gst.State.PLAYING)
                nbin_check = self.nbin.get_state(Gst.CLOCK_TIME_NONE)[0]
                if nbin_check == Gst.StateChangeReturn.SUCCESS or nbin_check == Gst.StateChangeReturn.NO_PREROLL:  
                    self.uri_decode_bin.set_state(Gst.State.PLAYING)
                    uridecodebin_check = self.uri_decode_bin.get_state(Gst.CLOCK_TIME_NONE)[0]
                    if uridecodebin_check == Gst.StateChangeReturn.SUCCESS or uridecodebin_check == Gst.StateChangeReturn.NO_PREROLL: 
                        self.streammux.set_state(Gst.State.PLAYING)
                        streammux_check = self.streammux.get_state(Gst.CLOCK_TIME_NONE)[0]
                        if streammux_check == Gst.StateChangeReturn.SUCCESS or streammux_check == Gst.StateChangeReturn.NO_PREROLL:  
                            self.pipeline.set_state(Gst.State.PLAYING)
                            pipeline_check = self.pipeline.get_state(Gst.CLOCK_TIME_NONE)[0]
                            if pipeline_check == Gst.StateChangeReturn.SUCCESS or pipeline_check == Gst.StateChangeReturn.NO_PREROLL:  
                                print("We did it boys!")
                                Gst.debug_bin_to_dot_file(self.pipeline, Gst.DebugGraphDetails.ALL, "Trying_Playing")
                            else:
                                print("pipeline failed us")
                        else:
                            print("streammux failed us")
                    else:
                        print("uridecodebin failed us")
                else:
                    print("nbin failed us")
    
                self.loop.run()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59

    我感觉很有参考意义,虽然我目前还没有处理EOS的问题,因为我是rtsp流,不过我后来改写的解决方案与上述类似,主要参考nvidia写得deepstream_rt_src_add_del例程,这个引用部分删除资源函数为:

    def stop_release_source(source_id):
        global g_num_sources
        global g_source_bin_list
        global streammux
        global pipeline
    
        #Attempt to change status of source to be released 
        state_return = g_source_bin_list[source_id].set_state(Gst.State.NULL)
    
        if state_return == Gst.StateChangeReturn.SUCCESS:
            print("STATE CHANGE SUCCESS\n")
            pad_name = "sink_%u" % source_id
            print(pad_name)
            #Retrieve sink pad to be released
            sinkpad = streammux.get_static_pad(pad_name)
            #Send flush stop event to the sink pad, then release from the streammux
            sinkpad.send_event(Gst.Event.new_flush_stop(False))
            streammux.release_request_pad(sinkpad)
            print("STATE CHANGE SUCCESS\n")
            #Remove the source bin from the pipeline
            pipeline.remove(g_source_bin_list[source_id])
            source_id -= 1
            g_num_sources -= 1
    
        elif state_return == Gst.StateChangeReturn.FAILURE:
            print("STATE CHANGE FAILURE\n")
        
        elif state_return == Gst.StateChangeReturn.ASYNC:
            state_return = g_source_bin_list[source_id].get_state(Gst.CLOCK_TIME_NONE)
            pad_name = "sink_%u" % source_id
            print(pad_name)
            sinkpad = streammux.get_static_pad(pad_name)
            sinkpad.send_event(Gst.Event.new_flush_stop(False))
            streammux.release_request_pad(sinkpad)
            print("STATE CHANGE ASYNC\n")
            pipeline.remove(g_source_bin_list[source_id])
            source_id -= 1
            g_num_sources -= 1
    
    def delete_sources(data):
        global loop
        global g_num_sources
        global g_eos_list
        global g_source_enabled
    
        #First delete sources that have reached end of stream
        for source_id in range(MAX_NUM_SOURCES):
            if (g_eos_list[source_id] and g_source_enabled[source_id]):
                g_source_enabled[source_id] = False
                stop_release_source(source_id)
    
        #Quit if no sources remaining
        if (g_num_sources == 0):
            loop.quit()
            print("All sources stopped quitting")
            return False
    
        #Randomly choose an enabled source to delete
        source_id = random.randrange(0, MAX_NUM_SOURCES)
        while (not g_source_enabled[source_id]):
            source_id = random.randrange(0, MAX_NUM_SOURCES)
        #Disable the source
        g_source_enabled[source_id] = False
        #Release the source
        print("Calling Stop %d " % source_id)
        stop_release_source(source_id)
    
        #Quit if no sources remaining
        if (g_num_sources == 0):
            loop.quit()
            print("All sources stopped quitting")
            return False
    
        return True
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74

    异常接收改为:

        elif t == Gst.MessageType.ELEMENT:
            struct = message.get_structure()
            #Check for stream-eos message
            if struct is not None and struct.has_name("stream-eos"):
                parsed, stream_id = struct.get_uint("stream-id")
                if parsed:
                    #Set eos status of stream to True, to be deleted in delete-sources
                    print("Got EOS from stream %d" % stream_id)
                    g_eos_list[stream_id] = True
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    我针对于此改写成我自己业务想要的样子,不过我目前发现还有点bug在于,rtsp流如果中途断开,我的整个pipeline会出现5s左右的延迟,只有当坏的流完全释放掉,才能恢复,我不清楚这个是哪里的问题,有bug,感觉还是要研究一下,之后解决了会在这里补充说明,现在先略过,如果有大佬会或者有啥好资料可以评论区教一手或者私信我,我将不胜感激。

    最后,是刚开始如果丢进去的流本身就是坏的怎么处理,这里的方案就很多了,但我没找到特别适配的,于是就自己写了,我使用ffprobe去过滤掉了连接不上的流,ffprobe函数如下:

    def get_rtsp_format(self,strFileName):
        strCmd = 'ffprobe -v quiet -print_format json -show_format -show_streams -i "{0}"' + format(strFileName)
        mystring = os.popen(strCmd).read()
        result = json.loads(mystring)
        return result["format"]
    
    • 1
    • 2
    • 3
    • 4
    • 5

    当然还有其它方法,上述也能换成ffmpeg和gstreamer的命令行,不过我是正好要对rtsp流做分析才用分析工具,或者最简单的,也能直接对流资源进行ping连接来作为判断条件。

    至此,本篇笔记结束。

  • 相关阅读:
    2024年保安员证考试题库
    ceph对象储存的使用
    PHP:类型转换
    二分算法(2)
    Python 潮流周刊#48:Python 3.14 的发布计划
    激活函数(机器学习)
    【编译原理+句柄+入栈顺序从右至左+系统调用+win api+程序安排+acm ieee usenix信息】答疑
    深度学习中的normalization总结(BN、LN、WN、IN、GN)
    在很多公司里面会使用打tag的方式保留版本
    Mybatis基础
  • 原文地址:https://blog.csdn.net/submarineas/article/details/126433341