• WebSocket的那些事(5-Spring STOMP支持之连接外部消息代理)


    一、序言

    上节我们在 WebSocket的那些事(4-Spring中的STOMP支持详解) 中详细说明了通过Spring内置消息代理结合STOMP子协议进行Websocket通信,以及相关注解的使用及原理。

    但是Spring内置消息代理会有一些限制,比如只支持STOMP协议的一部分命令,像acksreceipts命令都是不支持的,还有由于内置消息代理把消息存储在内存,当应用不可用时,客户端也就订阅不到到后台推送的消息。

    这节我们将会使用支持STOMP协议的外部消息代理(RabbitMQ)进行Websocket通信。


    二、开启RabbitMQ外部消息代理

    服务端路由发送消息以及客户端订阅消息都要通过STOMP协议与RabbitMQ进行交互,由于RabbitMQ默认没有启动STOMP插件,因此我们需要先启用该插件。

    rabbitmq-plugins enable rabbitmq_stomp
    
    • 1

    启动该插件后,RabbitMQ中STOMP适配器默认会监听61613端口,如果是云服务器,需要把该端口在安全组中放开。

    关于该插件说明请参考:RabbitMQ中STOMP插件说明


    三、代码示例

    我们在 WebSocket的那些事(4-Spring中的STOMP支持详解)中写了一个简单的聊天Demo示例,下面我们对该聊天Demo示例进行改造,将Spring内置消息代理替换成RabbitMQ外部消息代理。

    1、Maven依赖项

    服务端和客户端与外部消息代理都是通过TCP进行通信,Spring底层默认使用的是NettyReactor,因此需要引入相关依赖项。

    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-websocketartifactId>
    dependency>
    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-reactor-nettyartifactId>
    dependency>
    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-thymeleafartifactId>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    2、相关实体

    (1) 请求消息参数

    @Data
    public class WebSocketMsgDTO {
    
    	private String name;
    
    	private String content;
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    (2) 响应消息内容

    @Data
    @Builder
    @AllArgsConstructor
    @NoArgsConstructor
    public class WebSocketMsgVO {
    
    	private String content;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    (3) 自定义认证用户信息

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public class StompAuthenticatedUser implements Principal {
    
    	/**
    	 * 用户唯一ID
    	 */
    	private String userId;
    
    	/**
    	 * 用户昵称
    	 */
    	private String nickName;
    
    	/**
    	 * 用于指定用户消息推送的标识
    	 * @return
    	 */
    	@Override
    	public String getName() {
    		return this.userId;
    	}
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    3、自定义用户认证拦截器

    @Slf4j
    public class UserAuthenticationChannelInterceptor implements ChannelInterceptor {
    
    	private static final String USER_ID = "User-ID";
    	private static final String USER_NAME = "User-Name";
    
    	@Override
    	public Message<?> preSend(Message<?> message, MessageChannel channel) {
    		StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
    		// 如果是连接请求,记录userId
    		if (StompCommand.CONNECT.equals(accessor.getCommand())) {
    			String userID = accessor.getFirstNativeHeader(USER_ID);
    			String username = accessor.getFirstNativeHeader(USER_NAME);
    
    			log.info("Stomp User-Related headers found, userID: {}, username:{}", userID, username);
    			accessor.setUser(new StompAuthenticatedUser(userID, username));
    		}
    
    		return message;
    	}
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    4、Websocket外部消息代理配置

    Spring中与外部消息代理通信的中间方被称之为Broker Relay,它会维护一个系统共享的单一TCP连接和外部消息代理进行通信,该TCP连接仅仅适用于服务端,用来发送消息,而不是接收消息,通过Broker RelaysystemLoginsystemPasscode属性可以设置该连接的认证信息。

    Broker Relay也会为每个连接的Websocket客户端创建一个TCP连接,该连接用来接收消息,通过clientLoginclientPasscode属性可以设置连接的认证信息。

    /**
     * Websocket连接外部消息代理配置
     * @author Nick Liu
     * @date 2023/9/6
     */
    @Configuration
    @EnableWebSocketMessageBroker
    public class WebsocketExternalMessageBrokerConfig implements WebSocketMessageBrokerConfigurer {
    
    	@Override
    	public void configureClientInboundChannel(ChannelRegistration registration) {
    		// 拦截器配置
    		registration.interceptors(new UserAuthenticationChannelInterceptor());
    	}
    
    	@Override
    	public void registerStompEndpoints(StompEndpointRegistry registry) {
    		registry.addEndpoint("/websocket") // WebSocket握手端口
    			.addInterceptors(new HttpSessionHandshakeInterceptor())
    			.setAllowedOriginPatterns("*") // 设置跨域
    			.withSockJS(); // 开启SockJS回退机制
    	}
    
    	@Override
    	public void configureMessageBroker(MessageBrokerRegistry registry) {
    		registry.setApplicationDestinationPrefixes("/app") // 发送到服务端目的地前缀
    			.enableStompBrokerRelay("/topic") // 开启外部消息代理,指定消息订阅前缀
    			.setRelayHost("localhost") // 外部消息代理Host
    			.setRelayPort(61613) // 外部消息代理STOMP端口
    			.setSystemLogin("admin")  // 共享系统连接用户名,该连接主要用来发送消息
    			.setSystemPasscode("admin") // 共享系统连接密码,该连接主要用来发送消息
    			.setClientLogin("admin") // 客户端连接用户名,该连接主要用来接收消息
    			.setClientPasscode("admin") // 客户端连接密码,该连接主要用来接收消息
    			.setVirtualHost("/stomp"); // RabbitMQ虚拟主机
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    备注:我们可以为服务端与客户端的连接设置不同的用户,针对客户端连接用户进行权限管控,保证系统的安全性,在这里为了方便测试我们统一用一个用户。

    5、ChatController

    STOMP协议并没有规定消息代理必须支持哪种类型的Destinations(目的地),但是RabbitMQ STOMP适配器只支持一些指定的目的地类型,如下图:
    在这里插入图片描述

    • /exchange:指定交换机和路由key,发送和订阅来自队列的消息。
    • /queue:发送和订阅受STOMP网关管理的队列的消息,最多只有一个订阅者能到消息。
    • /amq/queue:发送和订阅不受STOMP网关管理的队列的消息。
    • /topic:发送和订阅来自临时或者持久Topic的消息,多个订阅者都能接收到消息。
    • /temp-queue/:发送和订阅来自临时队列的消息。

    参考文档见:RabbitMQ中STOMP插件说明

    在下面的示例中,我们选用了/topic的开头的消息发送和订阅前缀,目的地格式只能为/topic/{routing-key}routing-key不能有斜杠,否则会报错。

    @Slf4j
    @Controller
    @RequiredArgsConstructor
    public class ChatController {
    
    	private final SimpUserRegistry simpUserRegistry;
    	private final SimpMessagingTemplate simpMessagingTemplate;
    
    	/**
    	 * 模板引擎为Thymeleaf,需要加上spring-boot-starter-thymeleaf依赖,
    	 * @return
    	 */
    	@GetMapping("/page/chat")
    	public ModelAndView turnToChatPage() {
    		return new ModelAndView("chat");
    	}
    
    	/**
    	 * 群聊消息处理
    	 * 这里我们通过@SendTo注解指定消息目的地为"/topic/chat/group",如果不加该注解则会自动发送到"/topic" + "/chat/group"
    	 * @param webSocketMsgDTO 请求参数,消息处理器会自动将JSON字符串转换为对象
    	 * @return 消息内容,方法返回值将会广播给所有订阅"/topic/chat/group"的客户端
    	 */
    	@MessageMapping("/chat/group")
    	@SendTo("/topic/chat-group")
    	public WebSocketMsgVO groupChat(WebSocketMsgDTO webSocketMsgDTO) {
    		log.info("Group chat message received: {}", FastJsonUtils.toJsonString(webSocketMsgDTO));
    		String content = String.format("来自[%s]的群聊消息: %s", webSocketMsgDTO.getName(), webSocketMsgDTO.getContent());
    		return WebSocketMsgVO.builder().content(content).build();
    	}
    
    	/**
    	 * 私聊消息处理
    	 * 这里我们通过@SendToUser注解指定消息目的地为"/topic/chat/private",发送目的地默认会拼接上"/user/"前缀
    	 * 实际发送目的地为"/user/topic/chat/private"
    	 * @param webSocketMsgDTO 请求参数,消息处理器会自动将JSON字符串转换为对象
    	 * @return 消息内容,方法返回值将会基于SessionID单播给指定用户
    	 */
    	@MessageMapping("/chat/private")
    	@SendToUser("/topic/chat-private")
    	public WebSocketMsgVO privateChat(WebSocketMsgDTO webSocketMsgDTO) {
    		log.info("Private chat message received: {}", FastJsonUtils.toJsonString(webSocketMsgDTO));
    		String content = "私聊消息回复:" + webSocketMsgDTO.getContent();
    		return WebSocketMsgVO.builder().content(content).build();
    	}
    
    	/**
    	 * 定时消息推送,这里我们会列举所有在线的用户,然后单播给指定用户。
    	 * 通过SimpMessagingTemplate实例可以在任何地方推送消息。
    	 */
    	@Scheduled(fixedRate = 10 * 1000)
    	public void pushMessageAtFixedRate() {
    		log.info("当前在线人数: {}", simpUserRegistry.getUserCount());
    		if (simpUserRegistry.getUserCount() <= 0) {
    			return;
    		}
    
    		// 这里的Principal为StompAuthenticatedUser实例
    		Set<StompAuthenticatedUser> users = simpUserRegistry.getUsers().stream()
    			.map(simpUser -> StompAuthenticatedUser.class.cast(simpUser.getPrincipal()))
    			.collect(Collectors.toSet());
    
    		users.forEach(authenticatedUser -> {
    			String userId = authenticatedUser.getUserId();
    			String nickName = authenticatedUser.getNickName();
    			WebSocketMsgVO webSocketMsgVO = new WebSocketMsgVO();
    			webSocketMsgVO.setContent(String.format("定时推送的私聊消息, 接收人: %s, 时间: %s", nickName, LocalDateTime.now()));
    
    			log.info("开始推送消息给指定用户, userId: {}, 消息内容:{}", userId, FastJsonUtils.toJsonString(webSocketMsgVO));
    			simpMessagingTemplate.convertAndSendToUser(userId, "/topic/chat-push", webSocketMsgVO);
    		});
    	}
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74

    6、前端页面chat.html

    DOCTYPE html>
    <html lang="en" xmlns:th="http://www.thymeleaf.org">
    <head>
        <meta charset="UTF-8">
        <title>chattitle>
        <script src="https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.6.1/sockjs.min.js">script>
        <script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js">script>
        <script src="https://cdnjs.cloudflare.com/ajax/libs/jquery/3.6.4/jquery.min.js">script>
        <style>
            #mainWrapper {
                width: 600px;
                margin: auto;
            }
        style>
    head>
    <body>
    <div id="mainWrapper">
        <div>
            <label for="username" style="margin-right: 5px">姓名:label><input id="username" type="text"/>
        div>
        <div id="msgWrapper">
            <p style="vertical-align: top">发送的消息:p>
            <textarea id="msgSent" style="width: 600px;height: 100px">textarea>
            <p style="vertical-align: top">收到的群聊消息:p>
            <textarea id="groupMsgReceived" style="width: 600px;height: 100px">textarea>
            <p style="vertical-align: top">收到的私聊消息:p>
            <textarea id="privateMsgReceived" style="width: 600px;height: 200px">textarea>
        div>
        <div style="margin-top: 5px;">
            <button onclick="connect()">连接button>
            <button onclick="sendGroupMessage()">发送群聊消息button>
            <button onclick="sendPrivateMessage()">发送私聊消息button>
            <button onclick="disconnect()">断开连接button>
        div>
    div>
    <script type="text/javascript">
        $(() => {
            $('#msgSent').val('');
            $("#groupMsgReceived").val('');
            $("#privateMsgReceived").val('');
        });
    
        let stompClient = null;
    
    
        // 连接服务器
        const connect = () => {
            const header = {"User-ID": new Date().getTime().toString(), "User-Name": $('#username').val()};
            const ws = new SockJS('http://localhost:8080/websocket');
            stompClient = Stomp.over(ws);
            stompClient.connect(header, () => subscribeTopic());
        }
    
        // 订阅主题
        const subscribeTopic = () => {
            alert("连接成功!");
    
            // 订阅广播消息
            stompClient.subscribe('/topic/chat-group', function (message) {
                    console.log(`Group message received : ${message.body}`);
                    const resp = JSON.parse(message.body);
                    const previousMsg = $("#groupMsgReceived").val();
                    $("#groupMsgReceived").val(`${previousMsg}${resp.content}\n`);
                }
            );
            // 订阅单播消息
            stompClient.subscribe('/user/topic/chat-private', message => {
                    console.log(`Private message received : ${message.body}`);
                    const resp = JSON.parse(message.body);
                    const previousMsg = $("#privateMsgReceived").val();
                    $("#privateMsgReceived").val(`${previousMsg}${resp.content}\n`);
                }
            );
            // 订阅定时推送的单播消息
            stompClient.subscribe(`/user/topic/chat-push`, message => {
                    console.log(`Private message received : ${message.body}`);
                    const resp = JSON.parse(message.body);
                    const previousMsg = $("#privateMsgReceived").val();
                    $("#privateMsgReceived").val(`${previousMsg}${resp.content}\n`);
                }
            );
        };
    
        // 断连
        const disconnect = () => {
            stompClient.disconnect(() => {
                $("#msgReceived").val('Disconnected from WebSocket server');
            });
        }
    
        // 发送群聊消息
        const sendGroupMessage = () => {
            const msg = {name: $('#username').val(), content: $('#msgSent').val()};
            stompClient.send('/app/chat/group', {}, JSON.stringify(msg));
        }
    
        // 发送私聊消息
        const sendPrivateMessage = () => {
            const msg = {name: $('#username').val(), content: $('#msgSent').val()};
            stompClient.send('/app/chat/private', {}, JSON.stringify(msg));
        }
    script>
    body>
    html>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104

    四、测试示例

    1、群聊、私聊、后台定时推送测试

    启动应用程序,日志打印显示系统连接建立成功,如下:

    在这里插入图片描述

    打开浏览器访问http://localhost:8080/page/chat可进入聊天页,同时打开两个窗口访问。
    在这里插入图片描述


    在这里插入图片描述

    2、登录RabbitMQ控制台查看队列信息

    在这里插入图片描述
    可以看到所有消息都发送到了amq.topic交换机上(Topic类型), RabbitMQ会为每个连接的客户端创建3个队列。

    因为我们在ChatController中定义了三个目的地,Routing Key分别是/topic/chat-group/topic/chat-private/topic/chat-push。群聊消息目的地/topic/chat-group绑定了两个队列,用于实现广播订阅,其它两个Routing Key分别绑定到了不同的队列上,实现唯一订阅。


    五、结语

    下一节我们将会详细说明RabbitMQ STOMP适配器支持的各种消息目的地类型的区别以及适用场景。

    在这里插入图片描述

  • 相关阅读:
    布隆过滤器的原理
    计算机网络 (中科大郑烇老师)笔记(一)概论
    2022 APMCM亚太数学建模竞赛 C题 全球是否变暖 思路及代码实现(持续更新中)
    Java项目:JSP手机商城管理系统包含前台
    EtherCAT IGH 的编译选项介绍
    application/octet-stream的问题
    聚丙烯酸负载小鼠血清白蛋白(MSA)/大鼠血清白蛋白(RSA)/小麦麦清白蛋白;PAA-MSA/RSA
    android开发布局知识
    3dMax2024中MAXScript的新增功能
    使用Http请求实现数据的批量导入
  • 原文地址:https://blog.csdn.net/lingbomanbu_lyl/article/details/132716636