整体分为两部分
1、js对c++读写消息,通过_consumerSocket和producerSocket
2、c++对js读写消息,底层都是通过libuv的uv_read_start和uv_write
目录
二、js主动调用c++,通过producerSocket.write
channel.js 构造函数注册监听器
this._consumerSocket.on('data', (buffer) => {})
//真正处理有效数据的逻辑其实在processMessage中
- processMessage(msg) {
- // If a response, retrieve its associated request.
- if (msg.id) {
- const sent = this.#sents.get(msg.id);
- if (!sent) {
- logger.error('received response does not match any sent request [id:%s]', msg.id);
- return;
- }
- if (msg.accepted) {
- logger.debug('request succeeded [method:%s, id:%s]', sent.method, sent.id);
- sent.resolve(msg.data);
- }
- else if (msg.targetId && msg.event) {
- setImmediate(() => this.emit(String(msg.targetId), msg.event, msg.data));
- }
需要回调给上层的response会带msg.id
不带id的消息属于事件通知,最终会调用 this.emit(String(msg.targetId), msg.event, msg.data); 发送出去
- async request(method, handlerId, data) {
- this.#nextId < 4294967295 ? ++this.#nextId : (this.#nextId = 1);
- const id = this.#nextId;
- logger.debug('request() [method:%s, id:%s]', method, id);
- if (this.#closed) {
- throw new errors_1.InvalidStateError('Channel closed');
- }
- const request = `${id}:${method}:${handlerId}:${JSON.stringify(data)}`;
- if (Buffer.byteLength(request) > MESSAGE_MAX_LEN) {
- throw new Error('Channel request too big');
- }
- // This may throw if closed or remote side ended.
- this.#producerSocket.write(Buffer.from(Uint32Array.of(Buffer.byteLength(request)).buffer));
- this.#producerSocket.write(request);
- //构建response的callback,添加id,在processMessage进行回调
- return new Promise((pResolve, pReject) => {
- const sent = {
- id: id,
- method: method,
- resolve: (data2) => {
- if (!this.#sents.delete(id)) {
- return;
- }
- pResolve(data2);
- },
- reject: (error) => {
- if (!this.#sents.delete(id)) {
- return;
- }
- pReject(error);
- },
- close: () => {
- pReject(new errors_1.InvalidStateError('Channel closed'));
- }
- };
- // Add sent stuff to the map.
- this.#sents.set(id, sent);
- });
C++与js的读写就是通过
libuv的uv_read_start和uv_write
以下分析worker进程从启动到处理信令消息的流程
main方法调用run_worker方法,运行worker进程
- worker.cpp
- 1、run_worker 创建数据通道,创建c++ socket
-
- channel = new Channel::ChannelSocket(consumerChannelFd, producerChannelFd);
-
- class ChannelSocket : public ConsumerSocket::Listener
- class ConsumerSocket : public ::UnixStreamSocket
-
- 底层读取数据
- void ConsumerSocket::UserOnUnixStreamRead()
- {
- MS_TRACE_STD();
-
- // Be ready to parse more than a single message in a single chunk.
- while (true)
- {
- if (IsClosed())
- return;
-
- size_t readLen = this->bufferDataLen - this->msgStart;
- char* msgStart = nullptr;
- size_t msgLen;
- int nsRet = netstring_read( //读取数据
- reinterpret_cast<char*>(this->buffer + this->msgStart), readLen, &msgStart, &msgLen);
- }
- }
-
- 给bufferDataLen进行赋值
- inline void UnixStreamSocket::OnUvRead(
- this->bufferDataLen += static_cast<size_t>(nread);
- )
- err = uv_read_start(
- reinterpret_cast<uv_stream_t*>(this->uvHandle),
- static_cast<uv_alloc_cb>(onAlloc),
- static_cast<uv_read_cb>(onRead));
- 2、解析为Json数据,转为request进行分发
- void ChannelSocket::OnConsumerSocketMessage(ConsumerSocket* /*consumerSocket*/, char* msg, size_t msgLen)
- {
-
- auto* request = new Channel::ChannelRequest(this, msg, msgLen);
- }
- 3、worker.cpp中HandleRequest方法
- inline void Worker::HandleRequest(Channel::ChannelRequest* request)
- {
- MS_TRACE();
- MS_DEBUG_DEV(
- "Channel request received [method:%s, id:%" PRIu32 "]", request->method.c_str(), request->id);
- INFO("[cxf]req id str:", (int)request->methodId, ":", request->method);
- switch (request->methodId)
- {
- case Channel::ChannelRequest::MethodId::WORKER_CLOSE:
- {
- if (this->closed)
- return;
- MS_DEBUG_DEV("Worker close request, stopping");
- Close();
- break;
- }
- case Channel::ChannelRequest::MethodId::WORKER_DUMP:
- {
- json data = json::object();
- FillJson(data);
- request->Accept(data);
- break;
- }
1、Accept返回response
2、给上层发送通知:使用Notifier类
- 1、Accept
- void ChannelRequest::Accept()
- {
- MS_TRACE();
-
- MS_ASSERT(!this->replied, "request already replied");
-
- this->replied = true;
-
- std::string response("{\"id\":");
-
- response.append(std::to_string(this->id));
- response.append(",\"accepted\":true}");
-
- this->channel->Send(response);
- }
-
- inline void ChannelSocket::SendImpl(const uint8_t* payload, uint32_t payloadLen)
- {
-
- if (this->channelWriteFn)
- {
- this->channelWriteFn(payload, payloadLen, this->channelWriteCtx);
- }
- else
- {
- std::memcpy(this->writeBuffer, &payloadLen, sizeof(uint32_t));
-
- if (payloadLen != 0)
- {
- std::memcpy(this->writeBuffer + sizeof(uint32_t), payload, payloadLen);
- }
-
- size_t len = sizeof(uint32_t) + payloadLen;
-
- this->producerSocket->Write(this->writeBuffer, len);
- }
- }
-
-
- void UnixStreamSocket::Write(const uint8_t* data, size_t len)
- {
-
-
- uv_buf_t buffer = uv_buf_init(reinterpret_cast<char*>(const_cast<uint8_t*>(data)), len);
- int written = uv_try_write(reinterpret_cast<uv_stream_t*>(this->uvHandle), &buffer, 1);
-
- // All the data was written. Done.
- if (written == static_cast<int>(len))
- {
- return;
- }
- // Cannot write any data at first time. Use uv_write().
- else if (written == UV_EAGAIN || written == UV_ENOSYS)
- {
- // Set written to 0 so pendingLen can be properly calculated.
- written = 0;
- }
- // Any other error.
- else if (written < 0)
- {
- MS_ERROR_STD("uv_try_write() failed, trying uv_write(): %s", uv_strerror(written));
-
- // Set written to 0 so pendingLen can be properly calculated.
- written = 0;
- }
-
- const size_t pendingLen = len - written;
- auto* writeData = new UvWriteData(pendingLen);
-
- writeData->req.data = static_cast<void*>(writeData);
- std::memcpy(writeData->store, data + written, pendingLen);
-
- buffer = uv_buf_init(reinterpret_cast<char*>(writeData->store), pendingLen);
-
- const int err = uv_write
- }
-
-
- 2、给上层发送通知
- Notifier类在run_worker函数里初始化
- Channel::ChannelNotifier::ClassInit(channel);
-
- void ChannelNotifier::Emit(const std::string& targetId, const char* event) //和下面区别就在于是否有数据,与Accept响应相似
- {
- MS_TRACE();
-
- MS_ASSERT(ChannelNotifier::channel, "channel unset");
-
- json jsonNotification = json::object();
-
- jsonNotification["targetId"] = targetId;
- jsonNotification["event"] = event;
-
- ChannelNotifier::channel->Send(jsonNotification);
- }
-
-
- 举例
- inline void WebRtcTransport::OnIceServerCompleted(const RTC::IceServer* /*iceServer*/) //状态发生变化时的回调
- {
- MS_TRACE();
-
- MS_DEBUG_TAG(ice, "ICE completed");
-
- // Notify the Node WebRtcTransport.
- json data = json::object();
-
- data["iceState"] = "completed"; //设置Json数据
-
- Channel::ChannelNotifier::Emit(this->id, "icestatechange", data); //状态发生变化,上报通知到JS层
- }