• 使用boost封装一个websocketserver类


    boost官网案例的基础上进行的修改,以下具体功能的实现方式应该根据具体应用场景而定

    boost版本:1.77
    功能:websocket服务器端,能够处理多个客户端,获取其客户端的url路径

    Websocket.h

    #pragma once
    
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include "WebServer.h"
    
    #define BUFF_SIZE_MAX 10*1024
    
    namespace beast = boost::beast;         // from 
    namespace http = beast::http;           // from 
    namespace websocket = beast::websocket; // from 
    namespace net = boost::asio;            // from 
    using tcp = boost::asio::ip::tcp;       // from 
    
    class WebSocketSession;
    // 客户端关闭回调
    typedef std::function<void(std::shared_ptr<WebSocketSession> ptr)> pCb_Close_;
    // 接收数据回调
    typedef std::function<void(std::string, std::string)> pRecvData; 
    
    // Echoes back all received WebSocket messages
    class WebSocketSession : public std::enable_shared_from_this<WebSocketSession>
    {
    public:
    	// Take ownership of the socket
    	explicit WebSocketSession(tcp::socket&& socket, pCb_LogInfo pLog, pRecvData ptr, pCb_Close_ pclose)
    		: ws_(std::move(socket)),
    		pLog_(pLog),
    		pMsg_(ptr),
    		pClose_(pclose),
    		bConnected_(true)
    	{
    		memset(&buffer_, 0, BUFF_SIZE_MAX);
    	}
    
    	std::string GetUrlPath() const;
    	bool GetStatus() const;
    	void SendJsonData(std::string data);
    
    	// Get on the correct executor
    	void run();
    
    	// Start the asynchronous operation
    	void on_run();
    	void on_accept(beast::error_code ec);
    	void do_read();
    	void on_read(beast::error_code ec, std::size_t bytes_transferred);
    	void on_write(beast::error_code ec, std::size_t bytes_transferred);
    private:
    	void Info(std::string msg);
    
    private:
    	websocket::stream<beast::tcp_stream> ws_;
    	pRecvData pMsg_;
    	pCb_LogInfo pLog_;
    	pCb_Close_ pClose_;
    	std::string urlPath_;              // url路径
    	std::atomic<bool> bConnected_;     // 连接标志位
    	char buffer_[BUFF_SIZE_MAX];
    };
    
    // Accepts incoming connections and launches the sessions
    class listener : public std::enable_shared_from_this<listener>
    {
    public:
    	listener(int port, pCb_LogInfo pLog, pRecvData pData);
    
    	// Start accepting incoming connections
    	void run();
    
    	bool SendMsg(std::string url, std::string data);
    
    private:
    	void do_accept();
    	void on_accept(beast::error_code ec, tcp::socket socket);
    
    	void Info(std::string msg);
    	void closeClient(std::shared_ptr<WebSocketSession> ptr);
    
    private:
    	typedef boost::shared_ptr<net::io_service> io_service_ptr;
    	typedef boost::shared_ptr<net::io_service::work> work_ptr;
    
    	io_service_ptr ios_ptr_;
    	work_ptr work_ptr_;
    	std::shared_ptr<std::thread> pthread_;
    
    	std::mutex mtxClients_;
    	std::unordered_set<std::shared_ptr<WebSocketSession>> sessions_;  // 客户端管理
    	std::shared_ptr<tcp::acceptor> pAcceptor_;
    	pRecvData pMsg_;      // 消息回调
    	pCb_LogInfo pLog_;    // 日志回调
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107

    Websocket.cpp

    #include "Websocket.h"
    
    std::string WebSocketSession::GetUrlPath() const
    {
    	return urlPath_;
    }
    
    
    bool WebSocketSession::GetStatus() const
    {
    	return bConnected_;
    }
    
    void WebSocketSession::SendJsonData(std::string data)
    {
    	if (!bConnected_)
    		return;
    	boost::beast::error_code ecode;
    	ws_.text(ws_.got_text());
    	ws_.write(boost::asio::buffer(data), ecode);
    	if (ecode)
    	{
    		std::cerr << "[WebSocketSession::SendJsonData] error" << ecode.message()<<std::endl;
    		bConnected_ = false;
    	}
    }
    
    void WebSocketSession::run()
    {
    	// We need to be executing within a strand to perform async operations
        // on the I/O objects in this session. Although not strictly necessary
    	// for single-threaded contexts, this example code is written to be
    	// thread-safe by default.
    	net::dispatch(ws_.get_executor(),
    		beast::bind_front_handler(
    			&WebSocketSession::on_run,
    			shared_from_this()));
    }
    
    void WebSocketSession::on_run()
    {
    	// Set suggested timeout settings for the websocket
    	ws_.set_option(
    		websocket::stream_base::timeout::suggested(
    			beast::role_type::server));
    
    	// Set a decorator to change the Server of the handshake
    	ws_.set_option(websocket::stream_base::decorator(
    		[](websocket::response_type& res)
    		{
    			res.set(http::field::server,
    				std::string(BOOST_BEAST_VERSION_STRING) +
    				" websocket-server-async");
    		}));
    
    	beast::flat_buffer buffer;
    
    	// Read the HTTP request ourselves
    	http::request<http::string_body> req;
    	http::read(ws_.next_layer(), buffer, req);
    
    	// See if its a WebSocket upgrade request
    	if (websocket::is_upgrade(req))
    	{
    		// Construct the stream, transferring ownership of the socket
    		//stream ws(std::move(sock));
    
    		// Clients SHOULD NOT begin sending WebSocket
    		// frames until the server has provided a response.
    		BOOST_ASSERT(buffer.size() == 0);
    
    		// Accept the upgrade request
    		ws_.async_accept(req,
    			beast::bind_front_handler(
    				&WebSocketSession::on_accept,
    				shared_from_this()));
    
    		// 获取客户端url路径
    		urlPath_ = std::string(req.target().data(), req.target().length()).data();
    	}
    }
    
    void WebSocketSession::on_accept(beast::error_code ec)
    {
    	if (ec)
    	{
    		std::cerr << "[WebSocketSession::on_accept] error info:"<<ec.message() << std::endl;
    		return;
    	}
    	
    	// Read a message
    	do_read();
    }
    
    void WebSocketSession::do_read()
    {
    	// Read a message into our buffer
    	memset(buffer_, 0, BUFF_SIZE_MAX);
    	ws_.async_read_some(
    		boost::asio::buffer(buffer_, BUFF_SIZE_MAX),
    		beast::bind_front_handler(
    			&WebSocketSession::on_read,
    			shared_from_this()));
    }
    
    void WebSocketSession::on_read(beast::error_code ec, std::size_t bytes_transferred)
    {
    	if (ec)
    	{
    		std::string msg = ec.message();
    		if (pClose_)
    			pClose_(shared_from_this());
    		return;
    	}
    	
    	if (pMsg_)
    		pMsg_(urlPath_, std::string(buffer_, bytes_transferred));
    	
    	do_read();
    }
    
    void WebSocketSession::on_write(beast::error_code ec, std::size_t bytes_transferred)
    {
    	boost::ignore_unused(bytes_transferred);
    
    	if (ec)
    	{
    		std::cerr << "error in write:" << ec.message() << std::endl;
    		Info("write:" + ec.message());
    		if (pClose_)
    			pClose_(shared_from_this());
    	}
    }
    
    void WebSocketSession::Info(std::string msg)
    {
    	if (pLog_)
    		pLog_("[WebSocketClient]" + urlPath_ + "," + msg);
    }
    
    
    
    listener::listener(int port, pCb_LogInfo pLog, pRecvData pData)
    	:pMsg_(pData),
    	pLog_(pLog)
    {
    	ios_ptr_.reset(new boost::asio::io_service);
    	work_ptr_.reset(new boost::asio::io_service::work(*ios_ptr_));
    	pthread_.reset(new std::thread(boost::bind(&boost::asio::io_service::run, ios_ptr_)));
    
    	tcp::endpoint ep(boost::asio::ip::tcp::v4(), port);
    	pAcceptor_.reset(new tcp::acceptor(*ios_ptr_));
    	beast::error_code ec;
    
    	// Open the acceptor
    	pAcceptor_->open(ep.protocol(), ec);
    	if (ec)
    	{
    		std::string msg = ec.message();
    		return;
    	}
    
    	// Allow address reuse
    	pAcceptor_->set_option(net::socket_base::reuse_address(true), ec);
    	if (ec)
    	{
    		//fail(ec, "set_option");
    		std::string msg = ec.message();
    		return;
    	}
    
    	// Bind to the server address
    	pAcceptor_->bind(ep, ec);
    	if (ec)
    	{
    		//	fail(ec, "bind");
    		std::string msg = ec.message();
    		return;
    	}
    
    	// Start listening for connections
    	pAcceptor_->listen(net::socket_base::max_listen_connections, ec);
    	if (ec)
    	{
    		//	fail(ec, "listen");
    		std::string msg = ec.message();
    		return;
    	}
    }
    
    void listener::run()
    {
    	do_accept();
    }
    
    bool listener::SendMsg(std::string url, std::string data)
    {
    	std::lock_guard<std::mutex> lock(mtxClients_);
    	if (data.empty() || url.empty() || 0 == sessions_.size())
    		return false;
    
    	for (auto ptr = sessions_.begin(); ptr != sessions_.end();)
    	{
    		if (!ptr->get()->GetStatus())
    		{
    			ptr = sessions_.erase(ptr);
    			if (ptr == sessions_.end())
    				break;
    			else
    				continue;
    		}
    		if (ptr->get()->GetUrlPath() == url)
    				ptr->get()->SendJsonData(data);
    		++ptr;
    	}
    
    	return true;
    }
    
    void listener::do_accept()
    {
    	// The new connection gets its own strand
    	pAcceptor_->async_accept(
    		net::make_strand(*ios_ptr_),
    		beast::bind_front_handler(
    			&listener::on_accept,
    			shared_from_this()));
    }
    
    void listener::on_accept(beast::error_code ec, tcp::socket socket)
    {
    	if (ec)
    	{
    		//	fail(ec, "accept");
    	}
    	else
    	{
    		// Create the session and run it
    		auto session = std::make_shared<WebSocketSession>(std::move(socket),
    			pLog_,
    			pMsg_,
    			std::bind(&listener::closeClient, this,
    				std::placeholders::_1));
    		session->run();
    		std::unique_lock<std::mutex> lock(mtxClients_);
    		sessions_.insert(session);
    	}
    
    	// Accept another connection
    	do_accept();
    }
    
    void listener::Info(std::string msg)
    {
    	if (pLog_)
    		pLog_("[WebSocketServer]" + msg);
    }
    
    void listener::closeClient(std::shared_ptr<WebSocketSession> ptr)
    {
    	std::unique_lock<std::mutex> lock(mtxClients_);
    	if (nullptr == ptr || sessions_.empty())
    		return;
    
    	for (auto it = sessions_.begin(); it != sessions_.end(); ++it)
    	{
    		if (*it == ptr)
    		{
    			sessions_.erase(it);
    			break;
    		}
    	}
    	Info("clients:" + std::to_string(sessions_.size()));
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252
    • 253
    • 254
    • 255
    • 256
    • 257
    • 258
    • 259
    • 260
    • 261
    • 262
    • 263
    • 264
    • 265
    • 266
    • 267
    • 268
    • 269
    • 270
    • 271
    • 272
    • 273
    • 274
  • 相关阅读:
    Hooks进阶--useEffect - 发送网络请求
    01.初识C语言1
    Codeforces Round 902 Div 1 (CF 1876)
    字符串与正则表达式(C#)
    python安装、输入输出、注释、中文编码、编码规范等基础语法
    泰山OFFICE技术讲座:缩放比例与打开文件
    大学生抗疫逆行者网页作业 感动人物HTML网页代码成品 最美逆行者dreamweaver网页模板 致敬疫情感动人物网页设计制作
    UE4 快速入门 1
    算力五力模型:一种衡量算力的综合方法
    模拟实现链式二叉树及其结构学习——【数据结构】
  • 原文地址:https://blog.csdn.net/seaeress/article/details/133684547