• Websocket升级版


    之前写过一个关于websocket的博客,是看书时候做的一个demo。但是纸上得来终觉浅,这次实战后实打实的踩了不少坑,写个博客记录总结。

    1.安装postman

    websocket接口调试,需要8.5以及以上版本的postman

    先把以前的卸载,直接exe安装就好

    点击这个连接可以下载,或者去官网下载

    https://download.csdn.net/download/qq_35653822/88419296

    2.postman调试方法

    2.1参考这个博客的postman调试方法

    Postman进行Websocket接口测试_postman websocket-CSDN博客

    2.2请求路径

    不是http开头  而是ws开头

    如 ws://127.0.0.1:9999/test/3

    2.3测试方式

    websocket接口和普通接口不同,需要先建立连接,再发送消息

    建立连接时候这个url可以是pathvariable那种

    如 /test/{id}

    3.上代码

    3.1controller层

    1.@ServerEndpoint
    这个注解算是websocket中极其重要的一个注解了
    这里边要配置上前后端建立连接的url,后端返回前端内容的解析器。我这写了两个分别对应字符串和指定实体的解析器。以及websocket对应的config文件

    2. @OnOpen  @OnMessage

    @OnOpen 前后端建立连接时走的方法,也就是postman点击connect按钮时候会走的方法

    @OnMessage 前端给后端发送消息时走的方法,也就是postman点击send的时候会走的方法

    3.session.getBasicRemote().sendObject();

    后端给前端发送消息,此处根据类型不同,走不同的解析器。也就是@ServerEndpoint中的

    encoders

    4.注入bean

    根据idea提示,需要用set方法注入 

    1. package websocket;
    2. import com.fasterxml.jackson.databind.ObjectMapper;
    3. import com.valley.common.result.Result;
    4. import com.valley.system.enums.LinkType;
    5. import com.valley.system.pojo.bo.ConnectResultBO;
    6. import com.valley.system.pojo.form.SourceForm;
    7. import com.valley.system.service.SysDataSourceService;
    8. import lombok.extern.slf4j.Slf4j;
    9. import org.springframework.beans.factory.annotation.Autowired;
    10. import org.springframework.messaging.handler.annotation.MessageMapping;
    11. import org.springframework.messaging.handler.annotation.SendTo;
    12. import org.springframework.messaging.simp.annotation.SendToUser;
    13. import org.springframework.stereotype.Component;
    14. import org.springframework.web.bind.annotation.*;
    15. import javax.websocket.EncodeException;
    16. import javax.websocket.OnMessage;
    17. import javax.websocket.OnOpen;
    18. import javax.websocket.Session;
    19. import javax.websocket.server.PathParam;
    20. import javax.websocket.server.ServerEndpoint;
    21. import java.io.IOException;
    22. import java.security.Principal;
    23. /**
    24. * @Description: websocket url中得sourceId是为了指定websocket得连接
    25. */
    26. @ServerEndpoint(value = "/linkTest/{sourceId}", encoders = {ServerEncoder.class,StringEncoder.class},configurator = WebSocketConfig.class)
    27. @Component
    28. @Slf4j
    29. public class WebSocketController {
    30. private static SysDataSourceService sysDataSourceService;
    31. @Autowired
    32. public void setDeviceListenerService(SysDataSourceService sysDataSourceService) {
    33. WebSocketController.sysDataSourceService = sysDataSourceService;
    34. }
    35. /**
    36. * 实现服务器主动推送
    37. */
    38. public void sendMessage(String message, Session session) throws IOException {
    39. session.getBasicRemote().sendText(message);
    40. }
    41. /**
    42. * 连接建立成功调用的方法
    43. */
    44. @OnOpen
    45. public void onOpen(Session session, @PathParam("sourceId") String sourceId) throws EncodeException, IOException {
    46. log.info("连接成功,sourceId:{}", sourceId);
    47. session.getBasicRemote().sendObject(sourceId);
    48. }
    49. /**
    50. * 收到客户端消息后调用的方法
    51. *
    52. * @param message 客户端发送过来的消息
    53. */
    54. @OnMessage
    55. public void onMessage(String message, Session session) throws Exception {
    56. log.info("【websocket消息】收到客户端发来的消息:{}", message);
    57. // 创建ObjectMapper对象
    58. ObjectMapper objectMapper = new ObjectMapper();
    59. // 将JSON字符串解析为JSON对象
    60. SourceForm sourceForm = objectMapper.readValue(message, SourceForm.class);
    61. //根据type判断是否是心跳连接
    62. if(LinkType.HEARTBEAT.equals(sourceForm.getLinkType())){
    63. //前端约定返回“pong”
    64. session.getBasicRemote().sendObject("pong");
    65. return;
    66. }
    67. ConnectResultBO bo = sysDataSourceService.linkTest(sourceForm);
    68. try {
    69. Result result = Result.success(bo);
    70. session.getBasicRemote().sendObject(result);
    71. //session.getBasicRemote().sendText(message.toString());
    72. } catch (IOException e) {
    73. e.printStackTrace();
    74. }
    75. }
    76. }

    3.2config层

    1.需要配置在controller层的@ServerEndpoint中

    @ServerEndpoint(value = "/api/v1/source/linkTest/{sourceId}", encoders = {ServerEncoder.class,StringEncoder.class},configurator = WebSocketConfig.class)
    

    2. @Configuration修饰

    因为要返回其他的bean

    3.websocket的token上传返回

    需要取header中的 Sec-WebSocket-Protocol 属性,并且返回给前端的时候也要在header中携带

    1. package websocket;
    2. import cn.hutool.core.lang.Assert;
    3. import com.valley.system.config.Constant;
    4. import jodd.util.StringUtil;
    5. import org.springframework.context.annotation.Bean;
    6. import org.springframework.context.annotation.Configuration;
    7. import org.springframework.messaging.simp.config.MessageBrokerRegistry;
    8. import org.springframework.security.oauth2.core.OAuth2AccessToken;
    9. import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;
    10. import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
    11. import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
    12. import org.springframework.web.socket.server.standard.ServerEndpointExporter;
    13. import javax.websocket.HandshakeResponse;
    14. import javax.websocket.server.HandshakeRequest;
    15. import javax.websocket.server.ServerEndpointConfig;
    16. import java.util.Collections;
    17. import java.util.List;
    18. import java.util.Map;
    19. /**
    20. * @Description: websocket配置
    21. */
    22. @Configuration
    23. public class WebSocketConfig extends ServerEndpointConfig.Configurator{
    24. @Bean
    25. public ServerEndpointExporter serverEndpointExporter(){
    26. return new ServerEndpointExporter();
    27. }
    28. /**
    29. * 建立握手时,连接前的操作
    30. */
    31. @Override
    32. public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
    33. // 这个userProperties 可以通过 session.getUserProperties()获取
    34. //获取请求头
    35. String token = request.getHeaders().get("Sec-WebSocket-Protocol").get(0);
    36. Assert.notNull(token,"token不可为空");
    37. //todo 之后需要对token进行解析校验
    38. //OAuth2AccessToken accessToken = tokenStore.readAccessToken(token);
    39. //前端会把空格传成 
    40. //token = token.replace(Constant.NBSP," ");
    41. //当Sec-WebSocket-Protocol请求头不为空时,需要返回给前端相同的响应
    42. response.getHeaders().put("Sec-WebSocket-Protocol", Collections.singletonList(token));
    43. super.modifyHandshake(sec, request, response);
    44. }
    45. /**
    46. * 初始化端点对象,也就是被@ServerEndpoint所标注的对象
    47. */
    48. @Override
    49. public T getEndpointInstance(Class clazz) throws InstantiationException {
    50. return super.getEndpointInstance(clazz);
    51. }
    52. }

    3.3解析器

    1.如果不配置,在调用

    session.getBasicRemote().sendObject(sourceId);

    返回方法的时候会报此类型解析器不存在的错误

    2.解析器写完之后要加到controller层的@ServerEndpoint中

    @ServerEndpoint(value = "/api/v1/source/linkTest/{sourceId}", encoders = {ServerEncoder.class,StringEncoder.class},configurator = WebSocketConfig.class)
    

    3.3.1实体解析器

    1. package websocket;
    2. import com.fasterxml.jackson.core.JsonProcessingException;
    3. import com.fasterxml.jackson.databind.json.JsonMapper;
    4. import com.valley.common.result.Result;
    5. import com.valley.system.pojo.bo.ConnectResultBO;
    6. import javax.websocket.Encoder;
    7. import javax.websocket.EndpointConfig;
    8. /**
    9. * @Description: websocket发送给前端 泛型中Result是要发送的类型
    10. */
    11. public class ServerEncoder implements Encoder.Text> {
    12. @Override
    13. public void destroy() {
    14. // TODO Auto-generated method stub
    15. // 这里不重要
    16. }
    17. @Override
    18. public void init(EndpointConfig arg0) {
    19. // TODO Auto-generated method stub
    20. // 这里也不重要
    21. }
    22. @Override
    23. public String encode(Result responseMessage) {
    24. try {
    25. JsonMapper jsonMapper = new JsonMapper();
    26. return jsonMapper.writeValueAsString(responseMessage);
    27. } catch (JsonProcessingException e) {
    28. e.printStackTrace();
    29. return null;
    30. }
    31. }
    32. }

     3.3.2字符串解析器

    现在复盘来看,我这个解析器写的完全没必要,如果要发送字符串  把

    session.getBasicRemote().sendObject

    改为

    session.getBasicRemote().sendText

    就行了

    1. public class StringEncoder implements Encoder.Text{
    2. @Override
    3. public String encode(String s){
    4. return s;
    5. }
    6. @Override
    7. public void init(EndpointConfig endpointConfig) {
    8. }
    9. @Override
    10. public void destroy() {
    11. }
    12. }

    4.security权限过滤

    这块踩了很久的坑,我们项目是开源项目改的,然后自己拆成了好几个微服务。我开始的时候只专注去改auth项目中的,不管是忽略路径permitall,还是加切面,还是加filter。都不生效。

    最后,在公共项目common中找到一个关于security的配置,改了permitall,才生效

    但是还要注意哈,既然在这里把权限放开了,那在config中,就得把权限校验加上

    此处放开是因为,上边说过前端只能把token放到header的Sec-WebSocket-Protocol属性中,而不能像平时一样放到Authorization属性中。

     5.插曲

    在进行实体转换的时候,前端多给我传了参数,引起了报错

    objectMapper.readValue(message, SourceForm.class);

    为了让代码健壮一点,给实体加了个注解

    @JsonIgnoreProperties(ignoreUnknown = true)

    Unrecognized field , not marked as ignorable解决办法-CSDN博客

    6.F12怎么看websocket

    切换环境后,websocket接口报错了

    我以前一直以为是环境问题或者是KONG这个组件问题,后来f12看了下,是前端url写的不对,还是原来环境的地址

    7.关于心跳 

    下边的ping pong就是前后端约定好的上传返回 前端用这个来保持心跳

    至于在收到数据之后是否还要保持心跳链接,就看后续处理还要不要用到,不用就可以关,用到就继续连着

     再就是但是每个用户心跳检测时间最好不要一致 会产生高并发

    比如这个用户隔3秒 那个用户隔五秒 弄一个随机数

    8.广播

    这个没有亲测,但是看着比较靠谱。

    之前也写过websocket广播相关博客,但实现方式不同

    思路

    1.用一个线程安全得set集合,放session对象

    2.每次onOpen建立连接时,都把当前得session对象,放到集合中

    3.每次需要发消息时,都遍历这个集合

    4.提供给别人得方法,sendInfo(),不需要入参(看实际业务情况),遍历set,获取session发送消息

    5.onclose方法:接收前端close通知,将指定session从set中移除

    1. @ServerEndpoint(value = "/webSocket")//主要是将目前的类定义成一个websocket服务器端, 注解的值将被用于监听用户连接的终端访问URL地址,客户端可以通过这个URL来连接到WebSocket服务器端
    2. @Component
    3. @EnableScheduling// cron定时任务
    4. @Data
    5. public class WebSocket {
    6. private static final Logger logger = LoggerFactory.getLogger(WebSocket.class);
    7. /**
    8. * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
    9. */
    10. private static int onlineCount = 0;
    11. /**
    12. * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
    13. */
    14. private static CopyOnWriteArraySet webSocketSet = new CopyOnWriteArraySet<>();
    15. /**
    16. * 与某个客户端的连接会话,需要通过它来给客户端发送数据
    17. */
    18. private Session session;
    19. public static CopyOnWriteArraySet getWebSocketSet() {
    20. return webSocketSet;
    21. }
    22. public static void setWebSocketSet(CopyOnWriteArraySet webSocketSet) {
    23. WebSocket.webSocketSet = webSocketSet;
    24. }
    25. /**
    26. * 从数据库查询相关数据信息,可以根据实际业务场景进行修改
    27. */
    28. @Resource
    29. private IndexService indexService;
    30. private static IndexService indexServiceMapper;
    31. @PostConstruct
    32. public void init() {
    33. WebSocket.indexServiceMapper = this.indexService;
    34. }
    35. /**
    36. * 连接建立成功调用的方法
    37. *
    38. * @param session 会话
    39. */
    40. @OnOpen
    41. public void onOpen(Session session) throws Exception {
    42. this.session = session;
    43. webSocketSet.add(this);
    44. //查询当前在线人数
    45. int nowOnline = indexServiceMapper.nowOnline();
    46. this.sendMessage(JSON.toJSONString(nowOnline));
    47. }
    48. /**
    49. * 收到客户端消息后调用的方法
    50. *
    51. * @param message 客户端发送过来的消息
    52. */
    53. @OnMessage
    54. public void onMessage(String message, Session session) throws IOException {
    55. logger.info("参数信息:{}", message);
    56. //群发消息
    57. for (WebSocket item : webSocketSet) {
    58. try {
    59. item.sendMessage(JSON.toJSONString(message));
    60. } catch (IOException e) {
    61. e.printStackTrace();
    62. }
    63. }
    64. }
    65. /**
    66. * 连接关闭调用的方法
    67. */
    68. @OnClose
    69. public void onClose(Session session, CloseReason reason) {
    70. webSocketSet.remove(this);
    71. if (session != null) {
    72. try {
    73. session.close();
    74. } catch (IOException e) {
    75. e.printStackTrace();
    76. }
    77. }
    78. }
    79. /**
    80. * 发生错误时调用
    81. *
    82. * @param session 会话
    83. * @param error 错误信息
    84. */
    85. @OnError
    86. public void onError(Session session, Throwable error) {
    87. logger.error("连接异常!");
    88. error.printStackTrace();
    89. }
    90. /**
    91. * 发送信息
    92. *
    93. * @param message 消息
    94. */
    95. public void sendMessage(String message) throws IOException {
    96. this.session.getBasicRemote().sendText(message);
    97. }
    98. /**
    99. * 自定义消息推送、可群发、单发
    100. *
    101. * @param message 消息
    102. */
    103. public static void sendInfo() throws IOException {
    104. for (WebSocket item : webSocketSet) {
    105. item.sendMessage(message);
    106. }
    107. }
    108. }
    1. @Slf4j
    2. @Component
    3. public class IndexScheduled {
    4. @Autowired
    5. private IndexMapper indexMapper;
    6. /**
    7. * 每3秒执行一次
    8. */
    9. //@Scheduled(cron = "0/3 * * * * ? ") //我这里暂时不需要运行这条定时任务,所以将注解注释了,朋友们运行时记得放开注释啊
    10. public void nowOnline() {
    11. System.err.println("********* 首页定时任务执行 **************");
    12. CopyOnWriteArraySet webSocketSet = WebSocket.getWebSocketSet();
    13. int nowOnline = indexMapper.nowOnline();
    14. webSocketSet.forEach(c -> {
    15. try {
    16. c.sendMessage(JSON.toJSONString(nowOnline));
    17. } catch (IOException e) {
    18. e.printStackTrace();
    19. }
    20. });
    21. System.err.println("/n 首页定时任务完成.......");
    22. }
    23. }

    9.设计方式

    需求如下:

    跟前后端同事咨询过,一致觉得:

    websocket只返回未读数量,然后点击闹钟之后,另外走一个接口,返回消息列表。

  • 相关阅读:
    基于微信小程序的小说阅读系统(小程序+Nodejs)
    洛谷P5309 Ynoi 2011 初始化 题解
    网络代理技术的威力:保障安全、保护隐私
    Word2Vec的安装与使用
    [C++]AVL树怎么转
    CUDA initialization: The NVIDIA driver on your system is too old解决方案
    C语言数据结构-----单链表(无头单向不循环)
    四数之和 - 力扣中等
    typescript手记
    深入理解Spring注解机制(一):注解的搜索与处理机制
  • 原文地址:https://blog.csdn.net/qq_35653822/article/details/133794112