• java集成WebSocket


    package com.xzst.kafka.websocket;

    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import org.apache.commons.lang.StringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Component;

    import javax.websocket.*;
    import javax.websocket.server.PathParam;
    import javax.websocket.server.ServerEndpoint;
    import java.io.IOException;
    import java.util.concurrent.ConcurrentHashMap;

    /**

    • WebSocket 服务器

    • @author liukexin

    • @date 2024/06/12
      */
      @ServerEndpoint(“/websocketServer/{userId}”)
      @Component
      public class WebSocketServer {

      private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);

      /**

      • 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
        /
        private static int onlineCount = 0;
        /
        *
      • concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
        /
        private static final ConcurrentHashMap WEB_SOCKET_MAP = new ConcurrentHashMap<>();
        /
        *
      • 与某个客户端的连接会话,需要通过它来给客户端发送数据
        /
        private Session session;
        /
        *
      • 接收userId
        */
        private String userId = “”;

      /**

      • 连接建立成功调用的方法
        */
        @OnOpen
        public void onOpen(Session session, @PathParam(“userId”) String userId) {
        this.session = session;
        this.userId = userId;
        WEB_SOCKET_MAP.put(userId, this);
        //在线数加1
        addOnlineCount();
        log.info(“用户连接:{},当前在线人数为:{}”, userId, getOnlineCount());
        try {
        sendMessage(“连接成功”);
        } catch (IOException e) {
        log.error(“用户:{},网络异常!!!”, userId);
        }
        }

      /**

      • 连接关闭调用的方法
        */
        @OnClose
        public void onClose() {
        WEB_SOCKET_MAP.remove(userId);
        //从set中删除
        subOnlineCount();
        log.info(“用户退出:{},当前在线人数为:{}”, userId, getOnlineCount());
        }

      /**

      • 收到客户端消息后调用的方法
      • @param message 客户端发送过来的消息
        */
        @OnMessage
        public void onMessage(String message, Session session) {
        log.info(“用户消息:{},报文:{}”, userId, message);
        //可以群发消息
        //消息保存到数据库、redis
        if (StringUtils.isNotBlank(message)) {
        try {
        //解析发送的报文
        JSONObject jsonObject = JSON.parseObject(message);
        //追加发送人(防止串改)
        jsonObject.put(“fromUserId”, this.userId);
        String toUserId = jsonObject.getString(“toUserId”);
        //传送给对应toUserId用户的websocket
        if (StringUtils.isNotBlank(toUserId) && WEB_SOCKET_MAP.containsKey(toUserId)) {
        WEB_SOCKET_MAP.get(toUserId).sendMessage(jsonObject.toJSONString());
        } else {
        log.error(“请求的userId:{}不在该服务器上”, toUserId);
        //否则不在这个服务器上,发送到mysql或者redis
        }
        } catch (Exception e) {
        e.printStackTrace();
        }
        }
        }

      /**

      • 出错时
      • @param session 会话
      • @param error 错误
        */
        @OnError
        public void onError(Session session, Throwable error) {
        log.error(“用户错误:{},原因:{}”, this.userId, error.getMessage());
        error.printStackTrace();
        }

      /**

      • 实现服务器主动推送
        */
        public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
        }

      /**

      • 发送自定义消息
        */
        public static void sendInfo(String message, @PathParam(“userId”) String userId) throws IOException {
        log.info(“发送消息到:{},报文:{}”, userId, message);
        if (StringUtils.isNotBlank(userId) && WEB_SOCKET_MAP.containsKey(userId)) {
        WEB_SOCKET_MAP.get(userId).sendMessage(message);
        } else {
        log.error(“用户{},不在线!”, userId);
        }
        }

      public static synchronized int getOnlineCount() {
      return onlineCount;
      }

      public static synchronized void addOnlineCount() {
      WebSocketServer.onlineCount++;
      }

      public static synchronized void subOnlineCount() {
      WebSocketServer.onlineCount–;
      }

      /**

      • @param userId 用户id
      • @param message 发送的消息
      • @Title: sendMessageToUser
      • @Description: 发送消息给用户下的所有终端
        */
        public Boolean sendMessageToUser(String userId, String message) {
        if (WEB_SOCKET_MAP.containsKey(userId)) {
        log.debug(“给用户id为:{}的所有终端发送消息:{}”, userId, message);
        WebSocketServer wS = WEB_SOCKET_MAP.get(userId);
        log.debug(“sessionId为:{}”, wS.session.getId());
        try {
        wS.session.getBasicRemote().sendText(message);
        return true;
        } catch (IOException e) {
        e.printStackTrace();
        log.debug(" 给用户id为:{}发送消息失败", userId);
        return false;
        }
        }
        log.debug(“发送错误:当前连接不包含id为:{}的用户”, userId);
        return false;
        }
        }
  • 相关阅读:
    术语与定义
    双十一数据背后: 电商助力实体经济数字化转型才是未来方向
    Java 18 新功能介绍
    中科大给师生们发了一封钓鱼邮件 结果3000多人上当了
    TS(五):装饰器
    极光笔记 | 大语言模型插件
    pcigo图床插件的简单开发
    关于fifo和ram时序验证
    Java基础--阳光总在风雨后,请相信彩虹
    每天一点python——day63
  • 原文地址:https://blog.csdn.net/qq_36858666/article/details/139631869