• 四、Mediasoup Js和 C++ 管道通信的过程


    整体分为两部分

    1、js对c++读写消息,通过consumerSocket和producerSocket

    2、c++对js读写消息,底层都是通过libuv的uv_read_start和uv_write完成

    目录

    一、js接收c++数据

    二、js主动调用c++,通过producerSocket.write

    三、C++底层读取JS数据

    四、c++写数据给js,主要两种形式


    一、js接收c++数据

    channel.js 构造函数注册监听器
    this._consumerSocket.on('data', (buffer) => {})

    //真正处理有效数据的逻辑其实在processMessage中

    1. processMessage(msg) {
    2.         // If a response, retrieve its associated request.
    3.         if (msg.id) {
    4.             const sent = this.#sents.get(msg.id);
    5.             if (!sent) {
    6.                 logger.error('received response does not match any sent request [id:%s]', msg.id);
    7.                 return;
    8.             }
    9.             if (msg.accepted) {
    10.                 logger.debug('request succeeded [method:%s, id:%s]', sent.method, sent.id);
    11.                 sent.resolve(msg.data);
    12.             }
    13.         else if (msg.targetId && msg.event) {
    14.             setImmediate(() => this.emit(String(msg.targetId), msg.event, msg.data));
    15.         }


    带msg.id的消息是对上层请求的response,会通过id找到对应的sent并回调给上层
    不带id但带有targetId和event的消息属于事件通知,最终会调用 this.emit(String(msg.targetId), msg.event, msg.data); 发送出去

    二、js主动调用c++,通过producerSocket.write

    1. async request(method, handlerId, data) {
    2.         this.#nextId < 4294967295 ? ++this.#nextId : (this.#nextId = 1);
    3.         const id = this.#nextId;
    4.         logger.debug('request() [method:%s, id:%s]', method, id);
    5.         if (this.#closed) {
    6.             throw new errors_1.InvalidStateError('Channel closed');
    7.         }
    8.         const request = `${id}:${method}:${handlerId}:${JSON.stringify(data)}`;
    9.         if (Buffer.byteLength(request) > MESSAGE_MAX_LEN) {
    10.             throw new Error('Channel request too big');
    11.         }
    12.         // This may throw if closed or remote side ended.
    13.         this.#producerSocket.write(Buffer.from(Uint32Array.of(Buffer.byteLength(request)).buffer));
    14.         this.#producerSocket.write(request);
    15.         //构建response的callback,添加id,在processMessage进行回调
    16.         return new Promise((pResolve, pReject) => {
    17.             const sent = {
    18.                 id: id,
    19.                 method: method,
    20.                 resolve: (data2) => {
    21.                     if (!this.#sents.delete(id)) {
    22.                         return;
    23.                     }
    24.                     pResolve(data2);
    25.                 },
    26.                 reject: (error) => {
    27.                     if (!this.#sents.delete(id)) {
    28.                         return;
    29.                     }
    30.                     pReject(error);
    31.                 },
    32.                 close: () => {
    33.                     pReject(new errors_1.InvalidStateError('Channel closed'));
    34.                 }
    35.             };
    36.             // Add sent stuff to the map.
    37.             this.#sents.set(id, sent);
    38.         });

    三、C++底层读取JS数据

    C++与js的读写就是通过
    libuv的uv_read_start和uv_write

    以下分析worker进程从启动,到处理信令消息的流程

    main方法 ,run_worker方法,运行worker进程

    1. worker.cpp 
    2. 1、run_worker 建立数据通道,创建c++socket
    3.  channel = new Channel::ChannelSocket(consumerChannelFd, producerChannelFd);  
    4. class ChannelSocket : public ConsumerSocket::Listener
    5. class ConsumerSocket : public ::UnixStreamSocket
    6. 底层读取数据
    7. void ConsumerSocket::UserOnUnixStreamRead()
    8.     {
    9.         MS_TRACE_STD();
    10.         // Be ready to parse more than a single message in a single chunk.
    11.         while (true)
    12.         {
    13.             if (IsClosed())
    14.                 return;
    15.             size_t readLen = this->bufferDataLen - this->msgStart;
    16.             char* msgStart = nullptr;
    17.             size_t msgLen;
    18.             int nsRet = netstring_read(  //读取数据
    19.               reinterpret_cast<char*>(this->buffer + this->msgStart), readLen, &msgStart, &msgLen);
    20.          }
    21. }
    22. 给bufferDataLen进行赋值
    23. inline void UnixStreamSocket::OnUvRead(
    24.         this->bufferDataLen += static_cast<size_t>(nread);
    25.     err = uv_read_start(
    26.           reinterpret_cast<uv_stream_t*>(this->uvHandle),
    27.           static_cast<uv_alloc_cb>(onAlloc),
    28.           static_cast<uv_read_cb>(onRead));
    29. 2、解析为Json数据,转为request进行分发
    30.     void ChannelSocket::OnConsumerSocketMessage(ConsumerSocket* /*consumerSocket*/, char* msg, size_t msgLen)
    31.     {
    32.     
    33.             auto* request = new Channel::ChannelRequest(this, msg, msgLen);
    34.     }
    35. 3、worker.cpp中HandleRequest方法
    36. inline void Worker::HandleRequest(Channel::ChannelRequest* request)
    37. {
    38.     MS_TRACE();
    39.     MS_DEBUG_DEV(
    40.       "Channel request received [method:%s, id:%" PRIu32 "]", request->method.c_str(), request->id);
    41.     INFO("[cxf]req id str:", (int)request->methodId, ":", request->method);
    42.     switch (request->methodId)
    43.     {
    44.         case Channel::ChannelRequest::MethodId::WORKER_CLOSE:
    45.         {
    46.             if (this->closed)
    47.                 return;
    48.             MS_DEBUG_DEV("Worker close request, stopping");
    49.             Close();
    50.             break;
    51.         }
    52.         case Channel::ChannelRequest::MethodId::WORKER_DUMP:
    53.         {
    54.             json data = json::object();
    55.             FillJson(data);
    56.             request->Accept(data);
    57.             break;
    58.         }


    四、c++写数据给js,主要两种形式


    1、Accept返回response
    2、给上层发送通知:使用Notifier类
     

    1. 1、Accept
    2.     void ChannelRequest::Accept()
    3.     {
    4.         MS_TRACE();
    5.         MS_ASSERT(!this->replied, "request already replied");
    6.         this->replied = true;
    7.         std::string response("{\"id\":");
    8.         response.append(std::to_string(this->id));
    9.         response.append(",\"accepted\":true}");
    10.         this->channel->Send(response);
    11.     }
    12.     inline void ChannelSocket::SendImpl(const uint8_t* payload, uint32_t payloadLen)
    13.     {
    14.     
    15.         if (this->channelWriteFn)
    16.         {
    17.             this->channelWriteFn(payload, payloadLen, this->channelWriteCtx);
    18.         }
    19.         else
    20.         {
    21.             std::memcpy(this->writeBuffer, &payloadLen, sizeof(uint32_t));
    22.             if (payloadLen != 0)
    23.             {
    24.                 std::memcpy(this->writeBuffer + sizeof(uint32_t), payload, payloadLen);
    25.             }
    26.             size_t len = sizeof(uint32_t) + payloadLen;
    27.             this->producerSocket->Write(this->writeBuffer, len);
    28.         }
    29.     }
    30. void UnixStreamSocket::Write(const uint8_t* data, size_t len)
    31. {
    32.     uv_buf_t buffer = uv_buf_init(reinterpret_cast<char*>(const_cast<uint8_t*>(data)), len);
    33.     int written     = uv_try_write(reinterpret_cast<uv_stream_t*>(this->uvHandle), &buffer, 1);
    34.     // All the data was written. Done.
    35.     if (written == static_cast<int>(len))
    36.     {
    37.         return;
    38.     }
    39.     // Cannot write any data at first time. Use uv_write().
    40.     else if (written == UV_EAGAIN || written == UV_ENOSYS)
    41.     {
    42.         // Set written to 0 so pendingLen can be properly calculated.
    43.         written = 0;
    44.     }
    45.     // Any other error.
    46.     else if (written < 0)
    47.     {
    48.         MS_ERROR_STD("uv_try_write() failed, trying uv_write(): %s", uv_strerror(written));
    49.         // Set written to 0 so pendingLen can be properly calculated.
    50.         written = 0;
    51.     }
    52.     const size_t pendingLen = len - written;
    53.     auto* writeData         = new UvWriteData(pendingLen);
    54.     writeData->req.data = static_cast<void*>(writeData);
    55.     std::memcpy(writeData->store, data + written, pendingLen);
    56.     buffer = uv_buf_init(reinterpret_cast<char*>(writeData->store), pendingLen);
    57.     const int err = uv_write
    58.     }
    59. 2、给上层发送通知
    60. Notifier类在run_worker函数里初始化
    61. Channel::ChannelNotifier::ClassInit(channel);
    62.   void ChannelNotifier::Emit(const std::string& targetId, const char* event)  //和下面区别就在于是否有数据,与Accept响应相似
    63.     {
    64.         MS_TRACE();
    65.         MS_ASSERT(ChannelNotifier::channel, "channel unset");
    66.         json jsonNotification = json::object();
    67.         jsonNotification["targetId"] = targetId;
    68.         jsonNotification["event"]    = event;
    69.         ChannelNotifier::channel->Send(jsonNotification);
    70.     }
    71. 举例
    72. inline void WebRtcTransport::OnIceServerCompleted(const RTC::IceServer* /*iceServer*/)  //状态发生变化,回调使用
    73.     {
    74.         MS_TRACE();
    75.         MS_DEBUG_TAG(ice, "ICE completed");
    76.         // Notify the Node WebRtcTransport.
    77.         json data = json::object();
    78.         data["iceState"] = "completed";  //设置Json数据
    79.         Channel::ChannelNotifier::Emit(this->id, "icestatechange", data);  //状态发生变化,上传通知到JS层
    80.       }

  • 相关阅读:
    redis实现用户签到以及签到统计
    更改Kali Linux系统语言以及安装zenmap
    2023最新SSM计算机毕业设计选题大全(附源码+LW)之java宠物寄养平台设计03zp5
    python使用hashlib库运行MD5哈希算法
    Java架构该如何进阶?还在东拼西凑的学习?这份进阶指南相信会对你有所帮助,十多位资深大佬独家秘籍一并传授!
    Codeforces Round #833 (Div. 2)
    一篇搞懂C++(万字总结)
    AdaBoost:提升机器学习的力量
    Sprites and textures
    Shapiro-Francia正态检验
  • 原文地址:https://blog.csdn.net/zrjliming/article/details/132817989