• zlMediaKit 7 utils模块--ringbuffer&&发布订阅&&


    onceToken

    利用变量生命周期执行传入的构造和析构,保证代码执行的先后顺序

    class onceToken {
    public:
        using task = std::function<void(void)>;
    
        template<typename FUNC>
        onceToken(const FUNC &onConstructed, task onDestructed = nullptr) {
            onConstructed();
            _onDestructed = std::move(onDestructed);
        }
    
        onceToken(std::nullptr_t, task onDestructed = nullptr) {
            _onDestructed = std::move(onDestructed);
        }
    
        ~onceToken() {
            if (_onDestructed) {
                _onDestructed();
            }
        }
    
    private:
        onceToken() = delete;
        onceToken(const onceToken &) = delete;
        onceToken(onceToken &&) = delete;
        onceToken &operator=(const onceToken &) = delete;
        onceToken &operator=(onceToken &&) = delete;
    
    private:
        task _onDestructed;
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    应用

    socket->setOnErr([weak_self, weak_session, id](const SockException &err) {
                // 在本函数作用域结束时移除会话对象
                // 目的是确保移除会话前执行其 onError 函数
                // 同时避免其 onError 函数抛异常时没有移除会话对象
                onceToken token(nullptr, [&]() {
                    // 移除掉会话
                    auto strong_self = weak_self.lock();
                    if (!strong_self) {
                        return;
                    }
                    //从共享map中移除本session对象
                    lock_guard<std::recursive_mutex> lck(*strong_self->_session_mutex);
                    strong_self->_session_map->erase(id);
                });
    
                // 获取会话强应用
                if (auto strong_session = weak_session.lock()) {
                    // 触发 onError 事件回调
                    strong_session->onError(err);
                }
            });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    RingBuffer

    RingDelegate

    继承的子类需要实现onWrite函数

    virtual void onWrite(T in, bool is_key = true) = 0;

    _RingStorage

    在这里插入图片描述

    数据

    using GopType = List< List<std::pair<bool, T> > >;//链表中装着链表,次级链表是装着
    
        bool _have_idr;
        size_t _size;         //当前帧的数量 SUM(ervey gop size)
        size_t _max_size;     //最大记录帧数量 不能小于32。即最少最少也能存32帧
        size_t _max_gop_size; //最大gop个数(Group of Picture IBPBP)
        GopType _data_cache;  //链表套链表
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    构造

    设置 _max_size _max_gop_size

    clearCache清除主list,插入一个空的次级list

    popFrontGop 弹出最先插入的GOP

        void clearCache() {
            _size = 0;
            _have_idr = false;
            _data_cache.clear();
            _data_cache.emplace_back();
        }
    
    	void popFrontGop()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    write

    传过来的是I帧,就往新的GOP组里添加,

    ​ 传过来的是PB帧,

    ​ 之前的没有I帧则PB帧丢弃;

    ​ 之前已经有I帧到来,插入到对应的GOP组

    如果帧总数超过 _max_size,先尝试清除老的GOP缓存,还是大于最大缓冲限制,那么清空所有GOP

    /**
         * 写入环形缓存数据
         * @param in 数据
         * @param is_key 是否为关键帧
         * @return 是否触发重置环形缓存大小
         */
        void write(T in, bool is_key = true) {
            if (is_key) {
                _have_idr = true;
                if (!_data_cache.back().empty()) {
                    //当前gop列队还没收到任意缓存
                    _data_cache.emplace_back();
                }
                if (_data_cache.size() > _max_gop_size) {
                    // GOP个数超过限制,那么移除最早的GOP
                    popFrontGop();
                }
            }
    
            if (!_have_idr) {
                //缓存中没有关键帧,那么gop缓存无效
                return;
            }
            _data_cache.back().emplace_back(std::make_pair(is_key, std::move(in)));
            if (++_size > _max_size) {
                //GOP缓存溢出
                while (_data_cache.size() > 1) {
                    //先尝试清除老的GOP缓存
                    popFrontGop();
                }
                if (_size > _max_size) {
                    //还是大于最大缓冲限制,那么清空所有GOP
                    clearCache();
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    List

    继承了list,增加了append/for_each函数

    template<typename T>
    class List : public std::list<T> {
    public:
        template<typename ... ARGS>
        List(ARGS &&...args) : std::list<T>(std::forward<ARGS>(args)...) {};
    
        ~List() = default;
    
        void append(List<T> &other) {
            if (other.empty()) {
                return;
            }
            this->insert(this->end(), other.begin(), other.end());
            other.clear();
        }
    
        template<typename FUNC>
        void for_each(FUNC &&func) {
            for (auto &t : *this) {
                func(t);
            }
        }
    
        template<typename FUNC>
        void for_each(FUNC &&func) const {
            for (auto &t : *this) {
                func(t);
            }
        }
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    _RingReader

    环形缓存读取器,构造时即绑定了对应的_RingStorage

    自己注册读取和detach函数,flushGop一次性读取所有帧

    	std::function<void(void)> _detach_cb = []() {};
        std::function<void(const T &)> _read_cb = [](const T &) {};
    	
        void flushGop() {
            if (!_storage) {
                return;
            }
            _storage->getCache().for_each([this](const List<std::pair<bool, T > > &lst) {
                lst.for_each([this](const std::pair<bool, T> &pr) { onRead(pr.second, pr.first); });
            });
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    _RingReaderDispatcher

    环形事件派发器

    	std::atomic_int _reader_size;                         //?
        std::function<void(int, bool)> _on_size_changed;      //?
        typename RingStorage::Ptr _storage;                   //环形存储
        std::unordered_map<void *, std::weak_ptr<RingReader> > _reader_map;//本线程中,读取相同源的player的集合
    
    • 1
    • 2
    • 3
    • 4

    析构:_reader_map中的所有RingReader调用onDetach

    构造:private,初始化_storage,_reader_size,_on_size_changed

    write

    ,调用_reader_map中的每个RingReader的onRead,然后往环形存储中写

    rtp包来时,给每个拉流源调用onRead,同时往自己的_storage里写入缓存,当新的拉流源来时,根据useCache判断是否使用_storage

    void write(T in, bool is_key = true) {
        for (auto it = _reader_map.begin(); it != _reader_map.end();) { 
            auto reader = it->second.lock();
            if (!reader) {
                it = _reader_map.erase(it);
                --_reader_size;
                onSizeChanged(false);
                continue;
            }
            reader->onRead(in, is_key);
            ++it;
        }
        _storage->write(std::move(in), is_key);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    attach

    附加到poller上,构造RingReader,_reader_map++。可以选择是否使用存储的cache。

    std::shared_ptr<RingReader> attach(const EventPoller::Ptr &poller, bool use_cache) {
        if (!poller->isCurrentThread()) {
            throw std::runtime_error("必须在绑定的poller线程中执行attach操作");
        }
    
        std::weak_ptr<_RingReaderDispatcher> weakSelf = this->shared_from_this();
        auto on_dealloc = [weakSelf, poller](RingReader *ptr) {
            poller->async([weakSelf, ptr]() {
                auto strongSelf = weakSelf.lock();
                if (strongSelf && strongSelf->_reader_map.erase(ptr)) {
                    --strongSelf->_reader_size;
                    strongSelf->onSizeChanged(false);
                }
                delete ptr;
            });
        };
    
        std::shared_ptr<RingReader> reader(new RingReader(use_cache ? _storage : nullptr), on_dealloc);
        _reader_map[reader.get()] = reader;
        ++_reader_size;
        onSizeChanged(true);
        return reader;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    RingBuffer

        using Ptr = std::shared_ptr<RingBuffer>;
        using RingReader = _RingReader<T>;                     
        using RingStorage = _RingStorage<T>;
        using RingReaderDispatcher = _RingReaderDispatcher<T>;
        using onReaderChanged = std::function<void(int size)>; //
    
    	std::mutex _mtx_map;                               //?
        std::atomic_int _total_count { 0 };                //?_total_count是正在拉流的客户端的个数
        typename RingStorage::Ptr _storage;                //环形缓存
        typename RingDelegate<T>::Ptr _delegate;           //子类需要实现onWrite
        onReaderChanged _on_reader_changed;                //读取器变化的函数
        std::unordered_map<EventPoller::Ptr, typename RingReaderDispatcher::Ptr, HashOfPtr> _dispatcher_map;
        //不同的poller对应的不同的RingReaderDispatcher
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    构造

    初始化RingStorage

    attach

    如果**_dispatcher_map中没有对应的poller键,new一个RingReaderDispatcher**,使用clone的RingStorage

    write

    有_delegate,_delegate->onWrite

    否则 调用每个环形事件分发器中(其他线程)的write往自己线程的缓存写。往自己的线程中写

    本线程的对应的pusher数据来了,往storage中写,同时调用各个线程的读poller去读这数据

    void write(T in, bool is_key = true) {
            if (_delegate) {
                _delegate->onWrite(std::move(in), is_key);
                return;
            }
    
            LOCK_GUARD(_mtx_map);
            for (auto &pr : _dispatcher_map) {
                auto &second = pr.second;
                //切换线程后触发onRead事件
                pr.first->async([second, in, is_key]() { second->write(std::move(const_cast<T &>(in)), is_key); }, false);
            }
            _storage->write(std::move(in), is_key);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    发送包到player的调用栈

    EventPoller::runLoop//
    addEvent(_pipe.readFD(), Event_Read, [this](int event) { onPipeEvent(); }) //
    pr.first->async([second, in, is_key]() { second->write(std::move(const_cast<T &>(in)), is_key); }, false);//
    _RingReaderDispatcher::write --->reader->onRead(in, is_key);
    _RingReader::onRead--->_read_cb(data);
    
    _play_reader->setReadCB([weakSelf](const RtspMediaSource::RingDataType &pack) {
                auto strongSelf = weakSelf.lock();
                if (!strongSelf) {
                    return;
                }
                strongSelf->sendRtpPacket(pack);//发送rtp包
            });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    StrPrinter

    一个可以支持<<的字符串

    class _StrPrinter : public std::string {
    public:
        _StrPrinter() {}
    
        template<typename T>
        _StrPrinter& operator <<(T && data) {
            _stream << std::forward<T>(data);
            this->std::string::operator=(_stream.str());
            return *this;
        }
    
        std::string operator <<(std::ostream&(*f)(std::ostream&)) const {
            return *this;
        }
    
    private:
        std::stringstream _stream;
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    EventDispatcher/NoticeCenter

    NoticeCenter

    结构

    std::recursive_mutex _mtxListener;
    std::unordered_map<std::string, EventDispatcher::Ptr> _mapListener;
    
    • 1
    • 2

    private 获取和删除

        EventDispatcher::Ptr getDispatcher(const std::string &event, bool create = false) {
            std::lock_guard<std::recursive_mutex> lck(_mtxListener);
            auto it = _mapListener.find(event);
            if (it != _mapListener.end()) {
                return it->second;
            }
            if (create) {
                //如果为空则创建一个
                EventDispatcher::Ptr dispatcher(new EventDispatcher());
                _mapListener.emplace(event, dispatcher);
                return dispatcher;
            }
            return nullptr;
        }
    
    	void delDispatcher(const std::string &event, const EventDispatcher::Ptr &dispatcher) {
            std::lock_guard<std::recursive_mutex> lck(_mtxListener);
            auto it = _mapListener.find(event);
            if (it != _mapListener.end() && dispatcher == it->second) {
                //两者相同则删除
                _mapListener.erase(it);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    EventDispatcher

    在这里插入图片描述

    任意类型的FUNC

    template

    void addListener(void *tag, FUNC &&func) {

    ​ using funType = typename function_traits::stl_function_type;

    ​ std::shared_ptr pListener(new funType(std::forward(func)), [](void *ptr) {

    ​ funType *obj = (funType *) ptr;

    ​ delete obj;

    ​ });

    多参数

    template

    int emitEvent(ArgsType &&…args) {

    ​ using funType = std::function(args))…)>;

    ​ funType *obj = (funType *) (pr.second.get());

    ​ (*obj)(std::forward(args)…);

    class EventDispatcher {
    public:
        friend class NoticeCenter;
        using Ptr = std::shared_ptr<EventDispatcher>;
    
        ~EventDispatcher() = default;
    
    private:
        using MapType = std::unordered_multimap<void *, std::shared_ptr<void> >;
    
        EventDispatcher() = default;
    
        class InterruptException : public std::runtime_error {
        public:
            InterruptException() : std::runtime_error("InterruptException") {}
    
            ~InterruptException() {}
        };
    
        template<typename ...ArgsType>
        int emitEvent(ArgsType &&...args) {
            using funType = std::function<void(decltype(std::forward<ArgsType>(args))...)>;
            decltype(_mapListener) copy;
            {
                //先拷贝(开销比较小),目的是防止在触发回调时还是上锁状态从而导致交叉互锁
                std::lock_guard<std::recursive_mutex> lck(_mtxListener);
                copy = _mapListener;
            }
    
            int ret = 0;
            for (auto &pr : copy) {
                funType *obj = (funType *) (pr.second.get());
                try {
                    (*obj)(std::forward<ArgsType>(args)...);
                    ++ret;
                } catch (InterruptException &) {
                    ++ret;
                    break;
                }
            }
            return ret;
        }
    
        template<typename FUNC>
        void addListener(void *tag, FUNC &&func) {
            using funType = typename function_traits<typename std::remove_reference<FUNC>::type>::stl_function_type;
            std::shared_ptr<void> pListener(new funType(std::forward<FUNC>(func)), [](void *ptr) {
                funType *obj = (funType *) ptr;
                delete obj;
            });
            std::lock_guard<std::recursive_mutex> lck(_mtxListener);
            _mapListener.emplace(tag, pListener);
        }
    
        void delListener(void *tag, bool &empty) {
            std::lock_guard<std::recursive_mutex> lck(_mtxListener);
            _mapListener.erase(tag);
            empty = _mapListener.empty();
        }
    
    private:
        std::recursive_mutex _mtxListener;
        MapType _mapListener;
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64

    如果用C

    完美转发void*代替

    锁+map用静态的map和vector代替(非多线程)

    typedef int (*BG_RPC_MSG_HANDLER_FUNC)(void *rpc_msg);
    std::map<int, std::vector<BG_RPC_MSG_HANDLER_FUNC>> event_handles;
    
    void event_register(int id, BG_RPC_MSG_HANDLER_FUNC func)
    {
        auto& event_funcs = event_handles[id];
        for (const auto& each_func : event_funcs)
        {
            if (each_func == func)
            {
                error_log("id:%d already register!", id);
                return;
            }
        }
        event_handles[id].push_back(func);
    }
    
    void event_call(int id, void *msg)
    {
        auto it = event_handles.find(id);
        if (it == event_handles.end()) return;
        
        for (const auto &h : it->second)
        {
            h(msg);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    总结

    • onceToken

    • EventDispatcher 模板编程,std::unordered_multimap<**void ***, std::shared_ptr >的使用; std::forward, ArgsType &&…args

    • 如何多线程处理一个rtp数据包,且不涉及到拷贝

      每个RingBuffer有对应的map [EventPoller::Ptr, typename RingReaderDispatcher] 加锁,给对应的RingReaderDispatcher发rtp包指针,RingReaderDispatcher再给_reader_map中的不同客户端(但是是相同的poller管理)发rtp包。

  • 相关阅读:
    pycharm报错提示:无法加载文件\venv\Scripts\activate.ps1,因为在此系统上禁止运行脚本
    golang leetcode算法小抄
    ssm基于web的教务管理系统毕业设计源码261620
    Linux执行命令
    高德面试:为什么Map不能插入null?
    iPhone 的健康数据采用的是 FHIR 传输格式
    Hbase参数调优
    RocketMq部署-二主二从异步集群(安装实践)(未完成)
    android mk常用代码
    Java基础学习笔记记录第一天
  • 原文地址:https://blog.csdn.net/qq_41565920/article/details/127718719