• java netty 实现 websocket 服务端和客户端双向通信 实现心跳和断线重连 完整示例


    java netty 实现 websocket 服务端和客户端双向通信 实现心跳和断线重连 完整示例

    maven依赖

    <dependency>
        <groupId>io.nettygroupId>
        <artifactId>netty-allartifactId>
        <version>4.1.97.Finalversion>
    dependency>
    

    服务端

    服务端心跳事件处理 ServerHeartbeatHandler

    package com.sux.demo.websocket2;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.handler.timeout.IdleState;
    import io.netty.handler.timeout.IdleStateEvent;
    
    public class ServerHeartbeatHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent event = (IdleStateEvent) evt;
                if (event.state() == IdleState.READER_IDLE) { // 读空闲
                    System.out.println("断开与客户端的连接, channel id=" + ctx.channel().id());
                    ctx.channel().close();
                } else if (event.state() == IdleState.WRITER_IDLE) { // 写空闲
    
                } else if (event.state() == IdleState.ALL_IDLE) { // 读写空闲
    
                }
            }
        }
    }
    

    定时发送心跳的类 HeartbeatThread

    package com.sux.demo.websocket2;
    
    import io.netty.channel.Channel;
    import io.netty.channel.group.ChannelGroup;
    import io.netty.channel.group.DefaultChannelGroup;
    import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
    import io.netty.util.concurrent.GlobalEventExecutor;
    
    public class HeartbeatThread extends Thread {
        private ChannelGroup channelGroup;
        private boolean running = true;
    
        public HeartbeatThread(Channel channel) {
            channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
            channelGroup.add(channel);
        }
    
        public HeartbeatThread(ChannelGroup channelGroup) {
            this.channelGroup = channelGroup;
        }
    
        @Override
        public void run() {
            while (running) {
                try {
                    if (channelGroup.size() > 0) {
                        System.out.println("发送心跳");
                        for (Channel channel : channelGroup) {
                            channel.writeAndFlush(new PingWebSocketFrame());
                        }
                    }
    
                    Thread.sleep(2000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
        public void close() {
            running = false;
        }
    }
    

    服务端封装 WebSocketServer

    package com.sux.demo.websocket2;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpServerCodec;
    import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
    import io.netty.handler.timeout.IdleStateHandler;
    import io.netty.util.CharsetUtil;
    
    import java.util.concurrent.TimeUnit;
    
    public class WebSocketServer {
        private EventLoopGroup bossGroup;
        private EventLoopGroup workerGroup;
    
        private WebSocketServerHandler webSocketServerHandler;
    
        public WebSocketServer() {
            //创建两个线程组 boosGroup、workerGroup
            bossGroup = new NioEventLoopGroup();
            workerGroup = new NioEventLoopGroup();
    
            webSocketServerHandler = new WebSocketServerHandler();
        }
    
        public void start(int port, String name) {
            try {
                //创建服务端的启动对象,设置参数
                ServerBootstrap bootstrap = new ServerBootstrap();
                //设置两个线程组boosGroup和workerGroup
                bootstrap.group(bossGroup, workerGroup)
                        //设置服务端通道实现类型
                        .channel(NioServerSocketChannel.class)
                        //设置线程队列得到连接个数
                        .option(ChannelOption.SO_BACKLOG, 128)
                        //设置保持活动连接状态
                        .childOption(ChannelOption.SO_KEEPALIVE, true)
                        //使用匿名内部类的形式初始化通道对象
                        .childHandler(new ChannelInitializer() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                //给pipeline管道设置处理器
                                socketChannel.pipeline().addLast(new HttpServerCodec());
                                socketChannel.pipeline().addLast(new HttpObjectAggregator(65536));
                                socketChannel.pipeline().addLast(new WebSocketServerProtocolHandler("/websocket", null, false, 65536, false, false, false, 10000));
                                socketChannel.pipeline().addLast(new IdleStateHandler(5, 2, 0, TimeUnit.SECONDS));
                                socketChannel.pipeline().addLast(new ServerHeartbeatHandler());
                                socketChannel.pipeline().addLast(webSocketServerHandler);
                            }
                        });//给workerGroup的EventLoop对应的管道设置处理器
                //绑定端口号,启动服务端
                ChannelFuture channelFuture = bootstrap.bind(port).sync();
                System.out.println(name + " 已启动");
                // 定时发送心跳
                startHeartbeat();
                //对通道关闭进行监听
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
    
            }
        }
    
        public void startHeartbeat() {
            HeartbeatThread thread = new HeartbeatThread(webSocketServerHandler.getChannelGroup());
            thread.start();
        }
    
        public void send(String text) {
            for (Channel channel : webSocketServerHandler.getChannelGroup()) {
                channel.writeAndFlush(new TextWebSocketFrame(Unpooled.copiedBuffer(text, CharsetUtil.UTF_8)));
            }
        }
    
        public boolean hasClient() {
            return webSocketServerHandler.getChannelGroup().size() > 0;
        }
    
    }
    

    说明:new IdleStateHandler(5, 2, 0, TimeUnit.SECONDS)第一个参数5表示读空闲时间间隔是5秒,第二个参数2表示写空闲的时间间隔是2秒,第3个参数0表示不检测读写空闲。在ServerHeartbeatHandler代码中,读写空闲时断开与客户端的连接。

    服务端消息处理 WebSocketServerHandler

    package com.sux.demo.websocket2;
    
    import io.netty.channel.*;
    import io.netty.channel.group.ChannelGroup;
    import io.netty.channel.group.DefaultChannelGroup;
    import io.netty.handler.codec.http.websocketx.*;
    import io.netty.util.concurrent.GlobalEventExecutor;
    
    @ChannelHandler.Sharable
    public class WebSocketServerHandler extends SimpleChannelInboundHandler {
    
        private ChannelGroup channelGroup;
    
        public WebSocketServerHandler() {
            channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
        }
    
        public ChannelGroup getChannelGroup() {
            return channelGroup;
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
            if (msg instanceof PongWebSocketFrame) {
                System.out.println("收到客户端" + ctx.channel().remoteAddress() + "发来的心跳:PONG");
            }
    
            if (msg instanceof TextWebSocketFrame) {
                TextWebSocketFrame frame = (TextWebSocketFrame) msg;
                System.out.println("收到客户端" + ctx.channel().remoteAddress() + "发来的消息:" + frame.text());
    
                /* // 测试转发消息
                for (Channel channel : channelGroup) {
                    if (!ctx.channel().id().toString().equals(channel.id().toString())) {
                        channel.writeAndFlush(new TextWebSocketFrame(Unpooled.copiedBuffer(frame.text(), CharsetUtil.UTF_8)));
                        System.out.println("服务端向客户端 " + channel.id().toString() + " 转发消息:" + frame.text());
                    }
                } */
            }
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            channelGroup.add(ctx.channel());
            System.out.println("客户端" + ctx.channel().id().toString() + "已连接");
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) {
            channelGroup.remove(ctx.channel());
            System.out.println("客户端" + ctx.channel().id() + "已断开");
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
            ctx.flush();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            ctx.close();
        }
    }
    
    

    服务端测试主机 WebSocketServerHost

    package com.sux.demo.websocket2;
    
    public class WebSocketServerHost {
        public static void main(String[] args) {
            WebSocketServer webSocketServer = new WebSocketServer();
    
            SendDataToClientThread thread = new SendDataToClientThread(webSocketServer);
            thread.start();
    
            webSocketServer.start(40005, "WebSocket服务端");
        }
    }
    
    class SendDataToClientThread extends Thread {
        private WebSocketServer webSocketServer;
    
        private int index = 1;
    
        public SendDataToClientThread(WebSocketServer webSocketServer) {
            this.webSocketServer = webSocketServer;
        }
    
        @Override
        public void run() {
            try {
                while (index <= 5) {
                    if (webSocketServer.hasClient()) {
                        String msg = "服务端发送的测试消息, index = " + index;
                        webSocketServer.send(msg);
                        index++;
                    }
                    Thread.sleep(1000);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

    客户端

    客户端心跳事件处理 ClientHeartbeatHandler

    package com.sux.demo.websocket2;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.handler.timeout.IdleState;
    import io.netty.handler.timeout.IdleStateEvent;
    
    
    public class ClientHeartbeatHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent event = (IdleStateEvent) evt;
                if (event.state() == IdleState.READER_IDLE) { // 读空闲
                    System.out.println("断开与服务端的连接");
                    ctx.channel().close();
                } else if (event.state() == IdleState.WRITER_IDLE) { // 写空闲
    
                } else if (event.state() == IdleState.ALL_IDLE) { // 读写空闲
    
                }
            }
        }
    }
    

    客户端封装 WebSocketClient

    package com.sux.demo.websocket2;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.http.DefaultHttpHeaders;
    import io.netty.handler.codec.http.HttpClientCodec;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
    import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
    import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;
    import io.netty.handler.codec.http.websocketx.WebSocketVersion;
    import io.netty.handler.timeout.IdleStateHandler;
    
    import java.net.URI;
    import java.net.URISyntaxException;
    import java.util.concurrent.TimeUnit;
    
    public class WebSocketClient {
        private NioEventLoopGroup eventExecutors;
    
        private Channel channel;
    
        private HeartbeatThread heartbeatThread;
    
        public WebSocketClient() {
            eventExecutors = new NioEventLoopGroup();
        }
    
        public Channel getChannel() {
            return channel;
        }
    
        public void connect(String ip, int port, String name) {
            try {
                WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(
                        new URI("ws://" + ip + ":" + port + "/websocket"), WebSocketVersion.V13, null, false, new DefaultHttpHeaders());
                WebSocketClientHandler handler = new WebSocketClientHandler(handshaker);
                ClientHeartbeatHandler heartbeatHandler = new ClientHeartbeatHandler();
    
                //创建bootstrap对象,配置参数
                Bootstrap bootstrap = new Bootstrap();
                //设置线程组
                bootstrap.group(eventExecutors)
                        //设置客户端的通道实现类型
                        .channel(NioSocketChannel.class)
                        //使用匿名内部类初始化通道
                        .handler(new ChannelInitializer() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                //添加客户端通道的处理器
                                ch.pipeline().addLast(new HttpClientCodec());
                                ch.pipeline().addLast(new HttpObjectAggregator(65536));
                                ch.pipeline().addLast(new WebSocketClientProtocolHandler(handshaker, true, false));
                                ch.pipeline().addLast(new IdleStateHandler(5, 2, 0, TimeUnit.SECONDS));
                                ch.pipeline().addLast(heartbeatHandler);
                                ch.pipeline().addLast(handler);
                            }
                        });
    
                // 连接服务端
                ChannelFuture channelFuture = bootstrap.connect(ip, port);
    
                // 在连接关闭后尝试重连
                channelFuture.channel().closeFuture().addListener(future -> {
                    try {
                        if (heartbeatThread != null && heartbeatThread.isAlive()) {
                            System.out.println("停止发送心跳线程");
                            heartbeatThread.close();
                        }
                        Thread.sleep(2000);
                        System.out.println("重新连接服务端");
                        connect(ip, port, name); // 重新连接服务端
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
    
                channelFuture.sync();
    
                channel = channelFuture.channel();
                System.out.println(name + " 已启动");
    
                // 定时发送心跳
                heartbeatThread = new HeartbeatThread(channel);
                heartbeatThread.start();
    
                //对通道关闭进行监听
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException | URISyntaxException e) {
                e.printStackTrace();
            } finally {
    
            }
        }
    }
    

    说明:new IdleStateHandler(5, 2, 0, TimeUnit.SECONDS)第一个参数5表示读空闲时间间隔是5秒,第二个参数2表示写空闲的时间间隔是2秒,第3个参数0表示不检测读写空闲。在ClientHeartbeatHandler代码中,读写空闲时断开与服务端的连接。

    客户端消息处理 WebSocketClientHandler

    package com.sux.demo.websocket2;
    
    import io.netty.channel.*;
    import io.netty.handler.codec.http.FullHttpResponse;
    import io.netty.handler.codec.http.websocketx.*;
    
    @ChannelHandler.Sharable
    public class WebSocketClientHandler extends SimpleChannelInboundHandler {
        private WebSocketClientHandshaker handshaker;
        private ChannelPromise handshakeFuture;
    
        public WebSocketClientHandler(WebSocketClientHandshaker handshaker) {
            this.handshaker = handshaker;
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
            if (!handshaker.isHandshakeComplete()) {
                try {
                    handshaker.finishHandshake(ctx.channel(), (FullHttpResponse) msg);
                    handshakeFuture.setSuccess();
                } catch (WebSocketHandshakeException e) {
                    handshakeFuture.setFailure(e);
                }
                return;
            }
    
            if (msg instanceof PongWebSocketFrame) {
                System.out.println("收到服务端" + ctx.channel().remoteAddress() + "发来的心跳:PONG");
            }
    
            if (msg instanceof TextWebSocketFrame) {
                TextWebSocketFrame frame = (TextWebSocketFrame) msg;
                System.out.println("收到服务端" + ctx.channel().remoteAddress() + "发来的消息:" + frame.text()); // 接收服务端发送过来的消息
            }
        }
    
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) {
            handshakeFuture = ctx.newPromise();
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
    
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) {
            System.out.println("客户端下线");
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
            ctx.flush();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            ctx.close();
        }
    }
    
    

    客户端测试主机 WebSocketClientHost

    package com.sux.demo.websocket2;
    
    import io.netty.buffer.Unpooled;
    import io.netty.channel.Channel;
    import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
    import io.netty.util.CharsetUtil;
    
    public class WebSocketClientHost {
        public static void main(String[] args) {
            WebSocketClient webSocketClient = new WebSocketClient();
    
            SendDataToServerThread thread = new SendDataToServerThread(webSocketClient);
            thread.start();
    
            webSocketClient.connect("127.0.0.1", 40005, "WebSocket客户端");
        }
    }
    
    class SendDataToServerThread extends Thread {
        private WebSocketClient webSocketClient;
    
        private int index = 1;
    
        public SendDataToServerThread(WebSocketClient webSocketClient) {
            this.webSocketClient = webSocketClient;
        }
    
        @Override
        public void run() {
            try {
                while (index <= 5) {
                    Channel channel = webSocketClient.getChannel();
                    if (channel != null && channel.isActive()) {
                        String msg = "客户端发送的测试消息, index = " + index;
                        channel.writeAndFlush(new TextWebSocketFrame(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8)));
                        index++;
                    }
                    Thread.sleep(1000);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

    测试

    测试一

    步骤:先启动服务端,再启动客户端
    现象:客户端与服务端互发消息,消息发完后,互发心跳

    测试二

    步骤:先启动服务端,再启动客户端,然后关闭服务端,过一会再启动服务端
    现象:客户端断线重连,通信恢复,正常发消息和心跳

    测试三

    步骤:先启动客户端,过一会再启动服务端
    现象:服务端启动后,客户端连上服务端,正常通信,互发消息,消息发完互发心跳

    遇到的问题

    1. 心跳问题

    如果客户端想确认服务端是否在线,就向服务端发PING,如果服务端在线,服务端就会回一个PONG,客户端就能收到PONG,如果客户端长时间收不到服务端的消息,就会触发读空闲,然后断开与服务端的连接,触发断线重连。
    如果服务端想确认客户端是否在线,就向客户端发PING,如果客户端在线,客户端就会回一个PONG,服务端就能收到PONG,如果服务端长时间收不到客户端的消息,就会触发读空闲,然后断开与客户端的连接。
    WebSocketClientProtocolHandler和WebSocketServerProtocolHandler类都有一个dropPongFrames参数,默认为true,如果你在读空闲时写了断开连接的操作,要把dropPongFrames设置为false,这样才可以在消息处理handler中接收到PONG,否则即使对方回应了PONG,也会被丢弃掉,导致触发读空闲,导致连接被断开。

    2. 如何发心跳

    我在网上看到两种方式
    方式一:

    channel.writeAndFlush(new PingWebSocketFrame());
    

    方式二:

    channel.writeAndFlush(new TextWebSocketFrame("PING"));
    

    方式一,对方收到心跳,会自动回一个PONG,需要把dropPongFrames设置为false才能收到对方回应的PONG。
    方式二,对方收到心跳,需要手动回一个PONG,用TextWebSocketFrame包装,我方可以像收普通消息那样收到这个文本PONG。

    3. 在写空闲时发心跳需要注意

    如果我方一直向对方发业务数据,就不会触发写空闲,就不会发心跳,就收不到对方的回应PONG,如果对方又没有向我方发业务数据,就会触发我方的读空闲,如果在我方读空闲中写了断开逻辑,连接就会被主动断开,这种情况是错误的。所以本文的代码中,是在一个线程中一直定时发送PING,这样如果对方在线,也会一直定时回应PONG,只要对方在线就不会触发我方的读空闲,如果对方不在线,才会触发我方的读空闲。

  • 相关阅读:
    新华三路由器+华为交换机,实现华为交换机指定端口访问外网
    动态内存精讲
    数据结构day3
    Label Smoothing介绍及其代码实现
    TCP/UDP协议传输
    Selenium--常用元素操作方法
    23~49(构造函数+继承+类的本质+ES5中的新增方法)
    Mysql的基础架构
    线段树
    在十四届蓝桥杯开赛前一星期开始复习
  • 原文地址:https://www.cnblogs.com/s0611163/p/18194177