之前写过一个关于websocket的博客,是看书时候做的一个demo。但是纸上得来终觉浅,这次实战后实打实的踩了不少坑,写个博客记录总结。
websocket接口调试,需要8.5以及以上版本的postman
先把以前的卸载,直接exe安装就好
点击这个连接可以下载,或者去官网下载
https://download.csdn.net/download/qq_35653822/88419296
Postman进行Websocket接口测试_postman websocket-CSDN博客
不是http开头 而是ws开头
如 ws://127.0.0.1:9999/test/3
websocket接口和普通接口不同,需要先建立连接,再发送消息
建立连接时候这个url可以是pathvariable那种
如 /test/{id}

1.@ServerEndpoint
这个注解算是websocket中极其重要的一个注解了
这里边要配置上前后端建立连接的url,后端返回前端内容的解析器。我这写了两个分别对应字符串和指定实体的解析器。以及websocket对应的config文件
2. @OnOpen @OnMessage
@OnOpen 前后端建立连接时走的方法,也就是postman点击connect按钮时候会走的方法
@OnMessage 前端给后端发送消息时走的方法,也就是postman点击send的时候会走的方法
3.session.getBasicRemote().sendObject();
后端给前端发送消息,此处根据类型不同,走不同的解析器。也就是@ServerEndpoint中的
encoders
4.注入bean
根据idea提示,需要用set方法注入
- package websocket;
-
- import com.fasterxml.jackson.databind.ObjectMapper;
- import com.valley.common.result.Result;
- import com.valley.system.enums.LinkType;
- import com.valley.system.pojo.bo.ConnectResultBO;
- import com.valley.system.pojo.form.SourceForm;
- import com.valley.system.service.SysDataSourceService;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.messaging.handler.annotation.MessageMapping;
- import org.springframework.messaging.handler.annotation.SendTo;
- import org.springframework.messaging.simp.annotation.SendToUser;
- import org.springframework.stereotype.Component;
- import org.springframework.web.bind.annotation.*;
-
- import javax.websocket.EncodeException;
- import javax.websocket.OnMessage;
- import javax.websocket.OnOpen;
- import javax.websocket.Session;
- import javax.websocket.server.PathParam;
- import javax.websocket.server.ServerEndpoint;
- import java.io.IOException;
- import java.security.Principal;
-
- /**
- * @Description: websocket url中得sourceId是为了指定websocket得连接
- */
-
- @ServerEndpoint(value = "/linkTest/{sourceId}", encoders = {ServerEncoder.class,StringEncoder.class},configurator = WebSocketConfig.class)
- @Component
- @Slf4j
- public class WebSocketController {
-
- private static SysDataSourceService sysDataSourceService;
-
- @Autowired
- public void setDeviceListenerService(SysDataSourceService sysDataSourceService) {
- WebSocketController.sysDataSourceService = sysDataSourceService;
- }
-
- /**
- * 实现服务器主动推送
- */
- public void sendMessage(String message, Session session) throws IOException {
-
- session.getBasicRemote().sendText(message);
- }
-
- /**
- * 连接建立成功调用的方法
- */
- @OnOpen
- public void onOpen(Session session, @PathParam("sourceId") String sourceId) throws EncodeException, IOException {
- log.info("连接成功,sourceId:{}", sourceId);
- session.getBasicRemote().sendObject(sourceId);
- }
-
- /**
- * 收到客户端消息后调用的方法
- *
- * @param message 客户端发送过来的消息
- */
- @OnMessage
- public void onMessage(String message, Session session) throws Exception {
- log.info("【websocket消息】收到客户端发来的消息:{}", message);
- // 创建ObjectMapper对象
- ObjectMapper objectMapper = new ObjectMapper();
-
- // 将JSON字符串解析为JSON对象
- SourceForm sourceForm = objectMapper.readValue(message, SourceForm.class);
- //根据type判断是否是心跳连接
- if(LinkType.HEARTBEAT.equals(sourceForm.getLinkType())){
- //前端约定返回“pong”
- session.getBasicRemote().sendObject("pong");
- return;
- }
- ConnectResultBO bo = sysDataSourceService.linkTest(sourceForm);
- try {
- Result
result = Result.success(bo); - session.getBasicRemote().sendObject(result);
- //session.getBasicRemote().sendText(message.toString());
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- }
1.需要配置在controller层的@ServerEndpoint中
@ServerEndpoint(value = "/api/v1/source/linkTest/{sourceId}", encoders = {ServerEncoder.class,StringEncoder.class},configurator = WebSocketConfig.class)
2. @Configuration修饰
因为要返回其他的bean
3.websocket的token上传返回
需要取header中的 Sec-WebSocket-Protocol 属性,并且返回给前端的时候也要在header中携带
- package websocket;
-
- import cn.hutool.core.lang.Assert;
- import com.valley.system.config.Constant;
- import jodd.util.StringUtil;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.messaging.simp.config.MessageBrokerRegistry;
- import org.springframework.security.oauth2.core.OAuth2AccessToken;
- import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;
- import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
- import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
- import org.springframework.web.socket.server.standard.ServerEndpointExporter;
-
- import javax.websocket.HandshakeResponse;
- import javax.websocket.server.HandshakeRequest;
- import javax.websocket.server.ServerEndpointConfig;
- import java.util.Collections;
- import java.util.List;
- import java.util.Map;
-
- /**
- * @Description: websocket配置
- */
- @Configuration
- public class WebSocketConfig extends ServerEndpointConfig.Configurator{
-
- @Bean
- public ServerEndpointExporter serverEndpointExporter(){
- return new ServerEndpointExporter();
- }
-
- /**
- * 建立握手时,连接前的操作
- */
- @Override
- public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
- // 这个userProperties 可以通过 session.getUserProperties()获取
- //获取请求头
- String token = request.getHeaders().get("Sec-WebSocket-Protocol").get(0);
- Assert.notNull(token,"token不可为空");
- //todo 之后需要对token进行解析校验
- //OAuth2AccessToken accessToken = tokenStore.readAccessToken(token);
- //前端会把空格传成 
- //token = token.replace(Constant.NBSP," ");
- //当Sec-WebSocket-Protocol请求头不为空时,需要返回给前端相同的响应
- response.getHeaders().put("Sec-WebSocket-Protocol", Collections.singletonList(token));
- super.modifyHandshake(sec, request, response);
- }
-
- /**
- * 初始化端点对象,也就是被@ServerEndpoint所标注的对象
- */
- @Override
- public
T getEndpointInstance(Class clazz) throws InstantiationException { - return super.getEndpointInstance(clazz);
- }
- }
-
1.如果不配置,在调用
session.getBasicRemote().sendObject(sourceId);返回方法的时候会报此类型解析器不存在的错误
2.解析器写完之后要加到controller层的@ServerEndpoint中
@ServerEndpoint(value = "/api/v1/source/linkTest/{sourceId}", encoders = {ServerEncoder.class,StringEncoder.class},configurator = WebSocketConfig.class)
- package websocket;
-
- import com.fasterxml.jackson.core.JsonProcessingException;
- import com.fasterxml.jackson.databind.json.JsonMapper;
- import com.valley.common.result.Result;
- import com.valley.system.pojo.bo.ConnectResultBO;
-
- import javax.websocket.Encoder;
- import javax.websocket.EndpointConfig;
-
- /**
- * @Description: websocket发送给前端 泛型中Result
是要发送的类型 - */
- public class ServerEncoder implements Encoder.Text
> { -
- @Override
- public void destroy() {
- // TODO Auto-generated method stub
- // 这里不重要
- }
-
- @Override
- public void init(EndpointConfig arg0) {
- // TODO Auto-generated method stub
- // 这里也不重要
-
- }
-
- @Override
- public String encode(Result
responseMessage) { - try {
- JsonMapper jsonMapper = new JsonMapper();
- return jsonMapper.writeValueAsString(responseMessage);
-
- } catch (JsonProcessingException e) {
- e.printStackTrace();
- return null;
- }
- }
- }
现在复盘来看,我这个解析器写的完全没必要,如果要发送字符串 把
session.getBasicRemote().sendObject改为
session.getBasicRemote().sendText就行了
- public class StringEncoder implements Encoder.Text
{ - @Override
- public String encode(String s){
- return s;
- }
-
- @Override
- public void init(EndpointConfig endpointConfig) {
-
- }
-
- @Override
- public void destroy() {
-
- }
- }
这块踩了很久的坑,我们项目是开源项目改的,然后自己拆成了好几个微服务。我开始的时候只专注去改auth项目中的,不管是忽略路径permitall,还是加切面,还是加filter。都不生效。
最后,在公共项目common中找到一个关于security的配置,改了permitall,才生效
但是还要注意哈,既然在这里把权限放开了,那在config中,就得把权限校验加上
此处放开是因为,上边说过前端只能把token放到header的Sec-WebSocket-Protocol属性中,而不能像平时一样放到Authorization属性中。
5.插曲在进行实体转换的时候,前端多给我传了参数,引起了报错
objectMapper.readValue(message, SourceForm.class);
为了让代码健壮一点,给实体加了个注解
@JsonIgnoreProperties(ignoreUnknown = true)
Unrecognized field , not marked as ignorable解决办法-CSDN博客
切换环境后,websocket接口报错了
我以前一直以为是环境问题或者是KONG这个组件问题,后来f12看了下,是前端url写的不对,还是原来环境的地址

下边的ping pong就是前后端约定好的上传返回 前端用这个来保持心跳

至于在收到数据之后是否还要保持心跳链接,就看后续处理还要不要用到,不用就可以关,用到就继续连着
再就是但是每个用户心跳检测时间最好不要一致 会产生高并发
比如这个用户隔3秒 那个用户隔五秒 弄一个随机数
这个没有亲测,但是看着比较靠谱。
之前也写过websocket广播相关博客,但实现方式不同
思路
1.用一个线程安全得set集合,放session对象
2.每次onOpen建立连接时,都把当前得session对象,放到集合中
3.每次需要发消息时,都遍历这个集合
4.提供给别人得方法,sendInfo(),不需要入参(看实际业务情况),遍历set,获取session发送消息
5.onclose方法:接收前端close通知,将指定session从set中移除
- @ServerEndpoint(value = "/webSocket")//主要是将目前的类定义成一个websocket服务器端, 注解的值将被用于监听用户连接的终端访问URL地址,客户端可以通过这个URL来连接到WebSocket服务器端
- @Component
- @EnableScheduling// cron定时任务
- @Data
- public class WebSocket {
-
- private static final Logger logger = LoggerFactory.getLogger(WebSocket.class);
-
- /**
- * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
- */
- private static int onlineCount = 0;
-
- /**
- * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
- */
- private static CopyOnWriteArraySet
webSocketSet = new CopyOnWriteArraySet<>(); -
- /**
- * 与某个客户端的连接会话,需要通过它来给客户端发送数据
- */
- private Session session;
-
- public static CopyOnWriteArraySet
getWebSocketSet() { - return webSocketSet;
- }
-
- public static void setWebSocketSet(CopyOnWriteArraySet
webSocketSet) { - WebSocket.webSocketSet = webSocketSet;
- }
-
- /**
- * 从数据库查询相关数据信息,可以根据实际业务场景进行修改
- */
- @Resource
- private IndexService indexService;
- private static IndexService indexServiceMapper;
-
- @PostConstruct
- public void init() {
- WebSocket.indexServiceMapper = this.indexService;
- }
-
- /**
- * 连接建立成功调用的方法
- *
- * @param session 会话
- */
- @OnOpen
- public void onOpen(Session session) throws Exception {
- this.session = session;
- webSocketSet.add(this);
- //查询当前在线人数
- int nowOnline = indexServiceMapper.nowOnline();
- this.sendMessage(JSON.toJSONString(nowOnline));
- }
-
- /**
- * 收到客户端消息后调用的方法
- *
- * @param message 客户端发送过来的消息
- */
- @OnMessage
- public void onMessage(String message, Session session) throws IOException {
- logger.info("参数信息:{}", message);
- //群发消息
- for (WebSocket item : webSocketSet) {
- try {
- item.sendMessage(JSON.toJSONString(message));
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
-
- /**
- * 连接关闭调用的方法
- */
- @OnClose
- public void onClose(Session session, CloseReason reason) {
- webSocketSet.remove(this);
- if (session != null) {
- try {
- session.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
-
- /**
- * 发生错误时调用
- *
- * @param session 会话
- * @param error 错误信息
- */
- @OnError
- public void onError(Session session, Throwable error) {
- logger.error("连接异常!");
- error.printStackTrace();
- }
-
- /**
- * 发送信息
- *
- * @param message 消息
- */
- public void sendMessage(String message) throws IOException {
- this.session.getBasicRemote().sendText(message);
- }
-
- /**
- * 自定义消息推送、可群发、单发
- *
- * @param message 消息
- */
- public static void sendInfo() throws IOException {
- for (WebSocket item : webSocketSet) {
- item.sendMessage(message);
- }
- }
- }
- @Slf4j
- @Component
- public class IndexScheduled {
-
- @Autowired
- private IndexMapper indexMapper;
-
- /**
- * 每3秒执行一次
- */
- //@Scheduled(cron = "0/3 * * * * ? ") //我这里暂时不需要运行这条定时任务,所以将注解注释了,朋友们运行时记得放开注释啊
- public void nowOnline() {
- System.err.println("********* 首页定时任务执行 **************");
-
- CopyOnWriteArraySet
webSocketSet = WebSocket.getWebSocketSet(); - int nowOnline = indexMapper.nowOnline();
- webSocketSet.forEach(c -> {
- try {
- c.sendMessage(JSON.toJSONString(nowOnline));
- } catch (IOException e) {
- e.printStackTrace();
- }
- });
-
- System.err.println("/n 首页定时任务完成.......");
- }
-
- }
需求如下:
跟前后端同事咨询过,一致觉得:
websocket只返回未读数量,然后点击闹钟之后,另外走一个接口,返回消息列表。
