• 结合protobuf和socket实现多进程通讯


    目标

    采用protobuf实现数据的序列化和反序列化,再使用socket(UDP)进行通讯,实现类似ROS topic通讯的效果。

    程序

    • UDPAdapting.hpp

      分别继承google::protobuf::io::CopyingInputStream和google::protobuf::io::CopyingOutputStream,实现数据的读入和输出

      #ifndef UDPADAPTING_H
      #define UDPADAPTING_H
      
      #include 
      
      // socket
      #include 
      #include 
      
      /**
       * Input side of the UDP <-> protobuf bridge.
       *
       * Each Read() performs one recvfrom() on the given socket and hands the
       * received bytes to the caller (normally a CopyingInputStreamAdaptor).
       * The socket is borrowed: this class never closes it.
       */
      class UDPInputStream : public google::protobuf::io::CopyingInputStream
      {
      public:
          /**
           * @param sockfd  Already-created datagram socket to read from.
           */
          explicit UDPInputStream(int sockfd) : _sockfd(sockfd)
          {
              memset(&_servaddr, 0, sizeof(_servaddr));
              _addrlen = sizeof(_servaddr);
          }

          /**
           * Receives one datagram into `buffer`.
           *
           * @param buffer  Destination buffer.
           * @param size    Capacity of `buffer` in bytes.
           * @return Number of bytes received, or a negative value when
           *         recvfrom() fails (the error is logged).
           *         NOTE(review): a zero-length datagram yields 0, which the
           *         protobuf adaptor presumably interprets as end-of-stream.
           */
          int Read(void *buffer, int size)
          {
              // The address length is a value-result argument: recvfrom()
              // overwrites it with the size of the sender address, so it has to
              // be restored to the full buffer size before every call.
              _addrlen = sizeof(_servaddr);

              const ssize_t received = recvfrom(_sockfd, buffer, size, 0,
                                                reinterpret_cast<struct sockaddr *>(&_servaddr),
                                                &_addrlen);
              if (received < 0)
              {
                  std::cout << "recvfrom error: " << strerror(errno) << " " << errno
                            << std::endl;
              }
              // A datagram never exceeds `size`, so the value fits in an int.
              return static_cast<int>(received);
          }

      private:
          int _sockfd;                  // borrowed socket descriptor
          struct sockaddr_in _servaddr; // address of the most recent sender
          socklen_t _addrlen;           // in/out length passed to recvfrom()
      };
      
      /**
       * Output side of the UDP <-> protobuf bridge.
       *
       * Each Write() sends the given bytes as one datagram to the destination
       * address supplied at construction. The socket is borrowed: this class
       * never closes it.
       */
      class UDPOutputStream : public google::protobuf::io::CopyingOutputStream
      {
      public:
          /**
           * @param sockfd    Already-created datagram socket to send on.
           * @param servaddr  Destination address (copied).
           */
          UDPOutputStream(int sockfd, struct sockaddr_in servaddr) : _sockfd(sockfd), _servaddr(servaddr) {}

          /**
           * Sends `size` bytes from `buffer` as a single datagram.
           *
           * @return true when the whole buffer was handed to the kernel,
           *         false when sendto() failed or sent fewer bytes (the
           *         failure is logged instead of being silently ignored).
           */
          bool Write(const void *buffer, int size)
          {
              const ssize_t sent = sendto(_sockfd, buffer, size, 0,
                                          reinterpret_cast<const struct sockaddr *>(&_servaddr),
                                          sizeof(_servaddr));
              if (sent < 0)
              {
                  std::cout << "sendto error: " << strerror(errno) << " " << errno
                            << std::endl;
                  return false;
              }
              if (sent != static_cast<ssize_t>(size))
              {
                  std::cout << "sendto error: short write " << sent << " of " << size
                            << std::endl;
                  return false;
              }
              return true;
          }

      private:
          int _sockfd;                  // borrowed socket descriptor
          struct sockaddr_in _servaddr; // destination of every datagram
      };
      
      #endif
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
    • UDPClient.hpp

      #ifndef UDPCLIENT_H
      #define UDPCLIENT_H
      
      #include 
      #include 
      #include 
      #include 
      #include 
      #include 
      #include 
      #include 
      #include 
      #include 
      
      #include 
      #include "UDPAdapting.hpp"
      
      /**
       * Publisher: serializes protobuf messages (length-delimited) and sends
       * them over a UDP socket to the address configured with setSockAddr().
       *
       * The object owns its socket and its output streams, so it is not
       * copyable.
       */
      class UDPClient
      {
      private:
          int _sockfd;
          struct sockaddr_in _servaddr;

          // Both stay null until setSockAddr() has been called.
          UDPOutputStream *_udp_outputStream = nullptr;
          google::protobuf::io::CopyingOutputStreamAdaptor *_cos_adp = nullptr;

          // Destroys the adaptor/stream pair (if any) and resets the pointers.
          void releaseStreams()
          {
              // The adaptor holds a pointer to the stream, so it goes first.
              delete _cos_adp;
              _cos_adp = nullptr;
              delete _udp_outputStream;
              _udp_outputStream = nullptr;
          }

      public:
          /** Creates the UDP socket; a failure is logged and leaves _sockfd at -1. */
          UDPClient()
          {
              memset(&_servaddr, 0, sizeof(_servaddr));

              // Create the datagram socket.
              _sockfd = socket(AF_INET, SOCK_DGRAM, 0);
              if (_sockfd == -1)
              {
                  std::cout << "create socket error: " << strerror(errno) << " " << errno
                            << std::endl;
              }
          }

          // Copying would lead to a double close()/delete of the owned resources.
          UDPClient(const UDPClient &) = delete;
          UDPClient &operator=(const UDPClient &) = delete;

          ~UDPClient()
          {
              releaseStreams();
              if (_sockfd != -1)
              {
                  close(_sockfd);
              }
          }

          /**
           * Sets the destination of subsequent sendData() calls and (re)creates
           * the output streams. May be called more than once; the previous
           * streams are released first.
           *
           * @param destAddr  Dotted-decimal IPv4 address, e.g. "127.0.0.1".
           * @param destPort  Destination UDP port (host byte order).
           */
          void setSockAddr(const char *destAddr, int destPort)
          {
              if (destAddr == nullptr)
              {
                  std::cout << "setSockAddr error: destAddr is null" << std::endl;
                  return;
              }

              // Fill in the server address.
              memset(&_servaddr, 0, sizeof(_servaddr));
              _servaddr.sin_family = AF_INET;
              _servaddr.sin_addr.s_addr = inet_addr(destAddr); // server IP
              _servaddr.sin_port = htons(destPort);            // port

              // (Re)initialize the OutputStreamAdaptor.
              releaseStreams();
              _udp_outputStream = new UDPOutputStream(_sockfd, _servaddr);
              _cos_adp = new google::protobuf::io::CopyingOutputStreamAdaptor(_udp_outputStream);
          }

          /**
           * Serializes `msg` in length-delimited form and flushes it to the
           * socket. Failures are logged; nothing is sent when setSockAddr()
           * has not been called yet.
           *
           * @tparam ProtoType  A protobuf message type.
           */
          template <typename ProtoType>
          void sendData(ProtoType &msg)
          {
              if (_cos_adp == nullptr)
              {
                  std::cout << "sendData error: setSockAddr() has not been called" << std::endl;
                  return;
              }

              const bool serialized =
                  google::protobuf::util::SerializeDelimitedToZeroCopyStream(msg, _cos_adp);
              // Flush unconditionally so buffered bytes never leak into the next message.
              const bool flushed = _cos_adp->Flush();
              if (!serialized || !flushed)
              {
                  std::cout << "sendData error: failed to serialize or send message" << std::endl;
              }
          }
      };
      
      #endif
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
    • UDPServer.hpp

      #ifndef UDPSERVER_H
      #define UDPSERVER_H
      
      #include 
      #include 
      #include 
      #include 
      #include 
      #include 
      #include 
      #include 
      #include 
      
      #include 
      #include "UDPAdapting.hpp"
      
      /**
       * Subscriber: binds a UDP socket to a local port and parses
       * length-delimited protobuf messages from the received datagrams.
       *
       * The object owns its socket and its input streams, so it is not
       * copyable.
       */
      class UDPServer
      {
      private:
        int _sockfd;
        struct sockaddr_in _servaddr;

        // Both stay null until listen() has succeeded.
        UDPInputStream *_udp_inputstream = nullptr;
        google::protobuf::io::CopyingInputStreamAdaptor *_cis_adp = nullptr;
        bool _clean_eof = true;

        // Destroys the adaptor/stream pair (if any) and resets the pointers.
        void releaseStreams()
        {
          // The adaptor holds a pointer to the stream, so it goes first.
          delete _cis_adp;
          _cis_adp = nullptr;
          delete _udp_inputstream;
          _udp_inputstream = nullptr;
        }

      public:
        /** Creates the UDP socket; a failure is logged and leaves _sockfd at -1. */
        UDPServer()
        {
          memset(&_servaddr, 0, sizeof(_servaddr));

          // Create the datagram socket.
          if ((_sockfd = socket(AF_INET, SOCK_DGRAM, 0)) == -1)
          {
            std::cout << "create socket error: " << strerror(errno) << " " << errno
                      << std::endl;
            return; // no descriptor to configure
          }

          // Allow the port to be reused right after the previous owner closed it.
          int reuse = 1;
          if (setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) == -1)
          {
            std::cout << "setsockopt error: " << strerror(errno) << " " << errno
                      << std::endl;
          }
        }

        // Copying would lead to a double close()/delete of the owned resources.
        UDPServer(const UDPServer &) = delete;
        UDPServer &operator=(const UDPServer &) = delete;

        ~UDPServer()
        {
          releaseStreams();
          if (_sockfd != -1)
          {
            close(_sockfd);
          }
        }

        /**
         * Binds the socket to `port` on all local interfaces and creates the
         * input streams. When bind() fails the error is logged and no streams
         * are created, so a later receive() reports the problem instead of
         * blocking on an unbound socket.
         *
         * @param port  Local UDP port (host byte order).
         */
        void listen(int port)
        {
          // Local address to bind to.
          memset(&_servaddr, 0, sizeof(_servaddr));
          _servaddr.sin_family = AF_INET;
          _servaddr.sin_addr.s_addr = htonl(INADDR_ANY); // any local interface
          _servaddr.sin_port = htons(port);              // port
          if (bind(_sockfd, reinterpret_cast<struct sockaddr *>(&_servaddr), sizeof(_servaddr)) == -1)
          {
            std::cout << "bind socket error: " << strerror(errno) << " " << errno
                      << std::endl;
            return;
          }

          // (Re)initialize the InputStreamAdaptor.
          releaseStreams();
          _udp_inputstream = new UDPInputStream(_sockfd);
          _cis_adp = new google::protobuf::io::CopyingInputStreamAdaptor(_udp_inputstream);
        }

        /**
         * Reads one length-delimited message into `*data`. Failures are
         * logged; `*data` is not touched when listen() has not succeeded.
         *
         * @tparam ProtoType  A protobuf message type.
         * @param data        Destination message; must not be null.
         */
        template <typename ProtoType>
        void receive(ProtoType *data)
        {
          if (data == nullptr || _cis_adp == nullptr)
          {
            std::cout << "receive error: listen() has not succeeded or data is null" << std::endl;
            return;
          }

          if (!google::protobuf::util::ParseDelimitedFromZeroCopyStream(data, _cis_adp, &_clean_eof))
          {
            std::cout << "receive error: failed to parse message (clean_eof=" << _clean_eof << ")"
                      << std::endl;
          }
        }
      };
      
      #endif
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69
      • 70
      • 71
      • 72
      • 73
      • 74

    应用

    • 发布器

      #include "include/UDPClient.hpp"
      #include "test_values.pb.h"
      
      #include 
      
      int main(int argc, char **argv)
      {
          test::test_2_msg msg;
          msg.set_a(1.0);
          msg.set_b(2.5);
      
          std::shared_ptr<UDPClient> upd_client_ptr= std::make_shared<UDPClient>();
          upd_client_ptr->setSockAddr((char*)"127.0.0.1", 7720);
          Timer debug_t;
          upd_client_ptr->sendData(msg);
          printf("publish_msg %f ms\n", debug_t.getMs());
      
          return 0;
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
    • 接收器

      #include "include/UDPServer.hpp"
      #include "test_values.pb.h"
      
      #include 
      
      int main(int argc, char **argv)
      {
        test::test_2_msg msg;
      
        std::shared_ptr<UDPServer> upd_server_ptr = std::make_shared<UDPServer>();
        upd_server_ptr->listen(7720);
        int count = 0;
        while (count < 10)
        {
          upd_server_ptr->receive(&msg);
          std::cout << "receive data: " << msg.a() << "," << msg.b() << std::endl;
          count++;
        }
        return 0;
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20

    参考

    UDP通讯之字节流与protobuf转换(C++版)

    zero_copy_stream_impl_lite

  • 相关阅读:
    RabbitMQ的 五种工作模型
    如何压缩数据与图像?
    Qt实战案例(53)——利用QDrag实现拖拽拼图功能
    移动app测试的7个关键点,建议收藏
    笔试面试相关记录(13)
    包含光栅的高NA显微系统
    07_ElasticSearch:倒排序索引与分词Analysis
    红黑树的性质与简单实现
    使用Spark读写Parquet文件验证Parquet自带表头的性质及NULL值来源【Java】
    C语言——扫雷小游戏
  • 原文地址:https://blog.csdn.net/Kalenee/article/details/126440049