采用 protobuf 实现数据的序列化和反序列化,再使用 UDP socket 进行通讯,实现类似 ROS topic 通讯的效果。
UDPAdapting.hpp
分别继承 google::protobuf::io::CopyingInputStream 和 google::protobuf::io::CopyingOutputStream,实现数据的读入和输出。
#ifndef UDPADAPTING_H
#define UDPADAPTING_H
#include
// socket
#include
#include
class UDPInputStream : public google::protobuf::io::CopyingInputStream
{
public:
UDPInputStream(int sockfd) : _sockfd(sockfd)
{
memset(&_servaddr, 0, sizeof(_servaddr));
_addrlen = sizeof(_servaddr);
}
int Read(void *buffer, int size)
{
size = recvfrom(_sockfd, buffer, size, 0, (struct sockaddr *)&_servaddr, &_addrlen);
if (size < 0)
{
std::cout << "recvfrom error: " << strerror(errno) << " " << errno
<< std::endl;
}
return size;
}
private:
int _sockfd;
struct sockaddr_in _servaddr;
socklen_t _addrlen;
};
class UDPOutputStream : public google::protobuf::io::CopyingOutputStream
{
public:
UDPOutputStream(int sockfd, struct sockaddr_in servaddr) : _sockfd(sockfd), _servaddr(servaddr) {}
bool Write(const void *buffer, int size)
{
sendto(_sockfd, buffer, size, 0, (struct sockaddr *)&_servaddr, sizeof(_servaddr));
return true;
}
private:
int _sockfd;
struct sockaddr_in _servaddr;
};
#endif
UDPClient.hpp
#ifndef UDPCLIENT_H
#define UDPCLIENT_H
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "UDPAdapting.hpp"
/// Publisher side: serializes protobuf messages (length-delimited) and sends
/// them over a UDP socket to a fixed destination.
///
/// Usage: construct, call setSockAddr() once, then sendData() per message.
/// The object owns its socket and its output-stream objects, so it cannot be
/// copied.
class UDPClient
{
private:
    int _sockfd = -1;
    struct sockaddr_in _servaddr;
    UDPOutputStream *_udp_outputStream = nullptr;
    google::protobuf::io::CopyingOutputStreamAdaptor *_cos_adp = nullptr;

    // Destroys the adaptor and the stream it wraps, leaving both pointers null.
    void resetStream()
    {
        // The adaptor refers to the stream, so it has to go first.
        delete _cos_adp;
        _cos_adp = nullptr;
        delete _udp_outputStream;
        _udp_outputStream = nullptr;
    }

public:
    /// Creates the UDP socket. A failure is reported on stdout.
    UDPClient()
    {
        memset(&_servaddr, 0, sizeof(_servaddr));
        // Create the datagram socket.
        _sockfd = ::socket(AF_INET, SOCK_DGRAM, 0);
        if (_sockfd == -1)
        {
            const int err = errno;
            std::cout << "create socket error: " << strerror(err) << " " << err
                      << std::endl;
        }
    }

    // Copying would make two objects delete the same streams and close the
    // same descriptor.
    UDPClient(const UDPClient &) = delete;
    UDPClient &operator=(const UDPClient &) = delete;

    ~UDPClient()
    {
        resetStream();
        if (_sockfd != -1)
        {
            ::close(_sockfd);
        }
    }

    /// Sets the destination and (re)creates the output stream.
    ///
    /// Invalid arguments are reported on stdout and leave any previous
    /// configuration untouched.
    ///
    /// @param destAddr  destination IPv4 address in dotted notation.
    /// @param destPort  destination UDP port, 0..65535.
    void setSockAddr(const char *destAddr, int destPort)
    {
        if (destAddr == nullptr)
        {
            std::cout << "setSockAddr error: null destination address" << std::endl;
            return;
        }
        if (destPort < 0 || destPort > 65535)
        {
            std::cout << "setSockAddr error: invalid port " << destPort << std::endl;
            return;
        }
        // inet_addr signals a parse failure with INADDR_NONE. That value is also
        // the encoding of 255.255.255.255, which this socket could not reach
        // anyway because SO_BROADCAST is never enabled on it.
        const in_addr_t ip = inet_addr(destAddr);
        if (ip == INADDR_NONE)
        {
            std::cout << "setSockAddr error: invalid IPv4 address " << destAddr << std::endl;
            return;
        }

        // Fill in the server address.
        memset(&_servaddr, 0, sizeof(_servaddr));
        _servaddr.sin_family = AF_INET;
        _servaddr.sin_addr.s_addr = ip;
        _servaddr.sin_port = htons(static_cast<uint16_t>(destPort));

        // Drop any stream from an earlier call before building the new one.
        resetStream();
        _udp_outputStream = new UDPOutputStream(_sockfd, _servaddr);
        _cos_adp = new google::protobuf::io::CopyingOutputStreamAdaptor(_udp_outputStream);
    }

    /// Serializes `msg` with a length prefix and flushes it to the socket.
    ///
    /// Does nothing except log when setSockAddr() has not succeeded yet.
    /// Serialization and flush failures are reported on stdout.
    ///
    /// @param msg  protobuf message to publish.
    template <typename ProtoType>
    void sendData(const ProtoType &msg)
    {
        if (_cos_adp == nullptr)
        {
            std::cout << "sendData error: destination not set, call setSockAddr first" << std::endl;
            return;
        }
        if (!google::protobuf::util::SerializeDelimitedToZeroCopyStream(msg, _cos_adp))
        {
            std::cout << "sendData error: failed to serialize message" << std::endl;
        }
        // Flush even after a failed serialization so no partial data stays
        // buffered in front of the next message.
        if (!_cos_adp->Flush())
        {
            std::cout << "sendData error: failed to flush output stream" << std::endl;
        }
    }
};
#endif
UDPServer.hpp
#ifndef UDPSERVER_H
#define UDPSERVER_H
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "UDPAdapting.hpp"
/// Subscriber side: binds a UDP socket to a local port and parses
/// length-delimited protobuf messages from the datagrams it receives.
///
/// Usage: construct, call listen() once, then receive() per message.
/// The object owns its socket and its input-stream objects, so it cannot be
/// copied.
class UDPServer
{
private:
    int _sockfd = -1;
    struct sockaddr_in _servaddr;
    UDPInputStream *_udp_inputstream = nullptr;
    google::protobuf::io::CopyingInputStreamAdaptor *_cis_adp = nullptr;
    bool _clean_eof = true; // written by ParseDelimitedFromZeroCopyStream in receive()

    // Destroys the adaptor and the stream it wraps, leaving both pointers null.
    void resetStream()
    {
        // The adaptor refers to the stream, so it has to go first.
        delete _cis_adp;
        _cis_adp = nullptr;
        delete _udp_inputstream;
        _udp_inputstream = nullptr;
    }

public:
    /// Creates the UDP socket and enables SO_REUSEADDR on it. Failures are
    /// reported on stdout.
    UDPServer()
    {
        memset(&_servaddr, 0, sizeof(_servaddr));
        // Create the datagram socket.
        _sockfd = ::socket(AF_INET, SOCK_DGRAM, 0);
        if (_sockfd == -1)
        {
            const int err = errno;
            std::cout << "create socket error: " << strerror(err) << " " << err
                      << std::endl;
            return; // nothing to configure without a socket
        }
        // Keep the port from staying occupied after the socket is closed.
        int reuse = 1;
        if (::setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) == -1)
        {
            const int err = errno;
            std::cout << "setsockopt error: " << strerror(err) << " " << err
                      << std::endl;
        }
    }

    // Copying would make two objects delete the same streams and close the
    // same descriptor.
    UDPServer(const UDPServer &) = delete;
    UDPServer &operator=(const UDPServer &) = delete;

    ~UDPServer()
    {
        resetStream();
        if (_sockfd != -1)
        {
            ::close(_sockfd);
        }
    }

    /// Binds the socket to `port` on all local interfaces and creates the
    /// input stream used by receive().
    ///
    /// An invalid port or a failed bind is reported on stdout and leaves any
    /// existing stream untouched.
    ///
    /// @param port  local UDP port, 0..65535.
    void listen(int port)
    {
        if (port < 0 || port > 65535)
        {
            std::cout << "listen error: invalid port " << port << std::endl;
            return;
        }
        // Bind to the port on any local address.
        memset(&_servaddr, 0, sizeof(_servaddr));
        _servaddr.sin_family = AF_INET;
        _servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
        _servaddr.sin_port = htons(static_cast<uint16_t>(port));
        if (::bind(_sockfd, reinterpret_cast<struct sockaddr *>(&_servaddr), sizeof(_servaddr)) == -1)
        {
            const int err = errno;
            std::cout << "bind socket error: " << strerror(err) << " " << err
                      << std::endl;
            return; // an unbound socket would never deliver anything
        }
        // Replace any stream left over from an earlier call.
        resetStream();
        _udp_inputstream = new UDPInputStream(_sockfd);
        _cis_adp = new google::protobuf::io::CopyingInputStreamAdaptor(_udp_inputstream);
    }

    /// Reads one length-delimited message from the socket into `*data`.
    ///
    /// Does nothing except log when `data` is null or listen() has not
    /// succeeded yet. A parse failure is reported on stdout; in that case the
    /// content of `*data` should not be relied on.
    ///
    /// @param data  message object to fill.
    template <typename ProtoType>
    void receive(ProtoType *data)
    {
        if (data == nullptr)
        {
            std::cout << "receive error: null message pointer" << std::endl;
            return;
        }
        if (_cis_adp == nullptr)
        {
            std::cout << "receive error: not listening, call listen first" << std::endl;
            return;
        }
        if (!google::protobuf::util::ParseDelimitedFromZeroCopyStream(data, _cis_adp, &_clean_eof))
        {
            std::cout << "receive error: failed to parse message" << std::endl;
        }
    }
};
#endif
发布器
#include "include/UDPClient.hpp"
#include "test_values.pb.h"
#include
int main(int argc, char **argv)
{
test::test_2_msg msg;
msg.set_a(1.0);
msg.set_b(2.5);
std::shared_ptr<UDPClient> upd_client_ptr= std::make_shared<UDPClient>();
upd_client_ptr->setSockAddr((char*)"127.0.0.1", 7720);
Timer debug_t;
upd_client_ptr->sendData(msg);
printf("publish_msg %f ms\n", debug_t.getMs());
return 0;
}
接收器
#include "include/UDPServer.hpp"
#include "test_values.pb.h"
#include
int main(int argc, char **argv)
{
test::test_2_msg msg;
std::shared_ptr<UDPServer> upd_server_ptr = std::make_shared<UDPServer>();
upd_server_ptr->listen(7720);
int count = 0;
while (count < 10)
{
upd_server_ptr->receive(&msg);
std::cout << "receive data: " << msg.a() << "," << msg.b() << std::endl;
count++;
}
return 0;
}