• 微信小程序与Netty实现的WebSocket聊天程序


    一、微信小程序实现WebSocket客户端程序

    1. 界面实现

    <input name="url" value="{{url}}" bindinput ="urlInput"/>
    <button size='mini' type="warn">断开连接button>
    <button size='mini' type="primary" bindtap="connectSocket">开启连接button>
    <textarea placeholder="输入发送内容" bindinput ="msgInput">textarea>
    <button size='mini' type="primary" bindtap="sendMsg">发送button>
    <view wx:for="{{msgs}}">{{index}}: {{item}}view>
    • 1
    • 2
    • 3
    • 4
    • 5

    界面效果:
    在这里插入图片描述

    2. WXS部分

    数据部分包含三个字段:
    url:字符串类型,代表WebSocket服务器地址;
    msgs:数组类型,用于存储发送和接收的消息;
    msg:字符串类型,用于保存发送的内容;

    另外还定义了三个函数:
    connectSocket:提供了连接WebSocket服务的功能;
    msgInput:获取用户输入的内容;
    sendMsg:实现了向远程WebSocket发送消息的功能;

    Page({
      data: {
        url: 'ws://localhost:8888/ws',
        msgs: [],
        msg: '',
      }
      // 连接WebSocket服务  
      connectSocket() {    
        let _this = this;    
        // 连接websocket服务    
        let task = wx.connectSocket({      
          url: _this.data.url    
        });    
        // 监听websocket消息,并将接收到的消息添加到消息数组msgs中   
        task.onMessage(function(res) {       
          _this.setData({        
            msgs: [..._this.data.msgs, "接收到消息 -> " + res.data]      
          });    
        });    
        // 保存websocket实例     
        _this.setData({       
          socketTask: task,       
          msgs: [..._this.data.msgs,"连接成功!"]    
        });  
      },    
      
      // 获取输入内容,并临时保存在msg中  
      msgInput(e) {    
        this.setData({       
          msg: e.detail.value    
        });  
      },    
      
      // 发送消息  
      sendMsg() {    
        // 1.获取输入内容    
        let msg = this.data.msg;    
        // 2.发送消息到WebSocket服务端    
        this.data.socketTask.send({      
          data: msg    
        });  
      }
    })
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    二、Netty实现WebSocket服务端程序

    在从HTTP或HTTPS协议切换到WebSocket时,将会使用一种称为升级握手的机制。因此我们的WebSocket服务端程序将始终以HTTP作为开始,然后再执行升级。其约定为:如果被请求的URL以/ws结尾,那么我们将会把该协议升级为WebSocket;否则,服务器将使用基本的HTTP。当连接升级完毕后,所有数据都将会使用WebSocket进行传输(如下图)。

    在这里插入图片描述

    1. 新建一个Maven工程,并引入Netty依赖

    • 项目目录结构:

    在这里插入图片描述

    • 引入Netty依赖:
    
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0modelVersion>
    
        <groupId>io.nettygroupId>
        <artifactId>NettyWebSocketartifactId>
        <version>1.0-SNAPSHOTversion>
    
        <dependencies>
            <dependency>
                <groupId>io.nettygroupId>
                <artifactId>netty-allartifactId>
                <version>4.1.48.Finalversion>
            dependency>
        dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.pluginsgroupId>
                    <artifactId>maven-compiler-pluginartifactId>
                    <configuration>
                        <source>1.8source>
                        <target>1.8target>
                    configuration>
                plugin>
            plugins>
        build>
    
    project>
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    2. 自定义处理器

    1)定义一个专门处理Http协议的处理器,当浏览器第一次连接时候会读取首页的html文件,并将html文件内容返回给浏览器展示。

    package io.netty.websocket;
    
    import io.netty.channel.*;
    import io.netty.handler.codec.http.*;
    import io.netty.handler.ssl.SslHandler;
    import io.netty.handler.stream.ChunkedNioFile;
    
    import java.io.File;
    import java.io.RandomAccessFile;
    import java.net.URISyntaxException;
    import java.net.URL;
    
    // 处理Http协议的Handler,该Handler只会在第一次客户端连接时候有用。
    public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
        private final String wsUri;
        private static final File INDEX;
    
        static {
            URL location = HttpRequestHandler.class.getProtectionDomain()
                    .getCodeSource().getLocation();
            try {
                String path = location.toURI() + "index.html";
                path = !path.contains("file:") ? path : path.substring(5);
                INDEX = new File(path);
            } catch (URISyntaxException e) {
                throw new IllegalStateException("Unable to locate index.html", e);
            }
        }
    
        public HttpRequestHandler(String wsUri) {
            this.wsUri = wsUri;
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
            // 如果被请求的 URL 以/ws 结尾,那么我们将会把该协议升级为 WebSocket。
            if (wsUri.equalsIgnoreCase(request.getUri())) {
                // 将请求传递给下一个ChannelHandler,即WebSocketServerProtocolHandler处理
                // request.retain()会增加引用计数器,以防止资源被释放
                ctx.fireChannelRead(request.retain());
                return;
            }
            handleHttpRequest(ctx, request);
        }
    
        /**
         * 该方法读取首页html文件内容,然后将内容返回给客户端展示
         * @param ctx
         * @param request
         * @throws Exception
         */
        private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
            // HTTP1.1协议允许客户端先判定服务器是否愿意接受客户端发来的消息主体,以减少由于服务器拒绝请求所带来的额外资源开销
            if (HttpHeaders.is100ContinueExpected(request)) {
                FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
                ctx.writeAndFlush(response);
            }
            // 从resources目录读取index.html文件
            RandomAccessFile file = new RandomAccessFile(INDEX, "r");
            // 准备响应头信息
            HttpResponse response = new DefaultHttpResponse(request.getProtocolVersion(), HttpResponseStatus.OK);
            response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/html; charset=UTF-8");
            boolean keepAlive = HttpHeaders.isKeepAlive(request);
            if (keepAlive) {
                response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, file.length());
                response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
            }
            ctx.write(response);
            // 输出html文件内容
            ctx.write(new ChunkedNioFile(file.getChannel()));
            // 最后发送一个LastHttpContent来标记响应的结束
            ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
            // 如果不是长链接,则在写操作完成后关闭Channel
            if (!keepAlive) {
                future.addListener(ChannelFutureListener.CLOSE);
            }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84

    2)定义一个处理器,负责处理所有委托管理的WebSocket帧类型以及升级握手本身。如果握手成功,则所需的ChannelHandler将会被添加到ChannelPipeline中,而那些不需要的ChannelHandler会被移除掉。

    • WebSocket升级前的ChannelPipeline:

    在这里插入图片描述

    • WebSocket升级后的ChannelPipeline:

    在这里插入图片描述

    WebSocket升级完成后,WebSocketServerProtocolHandler会把HttpRequestDecoder替换为WebSocketFrameDecoder,把HttpResponseEncoder替换为WebSocketFrameEncoder。为了性能最大化,WebSocketServerProtocolHandler会移除任何不再被WebSocket连接所需要的ChannelHandler,其中包括 HttpObjectAggregator 和 HttpRequestHandler。

    实现代码:

    package io.netty.websocket;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.group.ChannelGroup;
    import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
    
    // 处理WebSocket协议的Handler
    public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
        private final ChannelGroup channelGroup;
    
        public TextWebSocketFrameHandler(ChannelGroup channelGroup) {
            this.channelGroup = channelGroup;
        }
    
        // 用户事件监听,每次客户端连接时候自动触发
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            String content = "Client " + ctx.channel().remoteAddress().toString().substring(1) + " joined";
            System.out.println(content);
            // 如果是握手完成事件,则从Pipeline中删除HttpRequestHandler,并将当前channel添加到ChannelGroup中
            if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
                // 从Pipeline中删除HttpRequestHandler
                ctx.pipeline().remove(HttpRequestHandler.class);
                // 通知所有已连接的WebSocket客户端,新的客户端已经连接上了
                TextWebSocketFrame msg = new TextWebSocketFrame(content);
                channelGroup.writeAndFlush(msg);
                // 将WebSocket Channel添加到ChannelGroup中,以便可以它接收所有消息
                channelGroup.add(ctx.channel());
            } else {
                super.userEventTriggered(ctx, evt);
            }
        }
    
        // 每次客户端发送消息时执行
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame msg) throws Exception {
            System.out.println("读取到的消息:" + msg.retain());
            // 将读取到的消息写到ChannelGroup中所有已经连接的客户端
            channelGroup.writeAndFlush(msg.retain());
        }
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    上面userEventTriggered方法监听用户事件。当有客户端连接时候,会自动执行该方法。而channelRead0方法负责读取客户端发送过来的消息,然后通过channelGroup将消息输出到所有已连接的客户端。

    3. 定义初始化器

    定义一个ChannelInitializer的子类,其主要目的是在某个 Channel 注册到 EventLoop 后,对这个 Channel 执行一些初始化操作。

    package io.netty.websocket;
    
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.group.ChannelGroup;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpServerCodec;
    import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
    import io.netty.handler.stream.ChunkedWriteHandler;
    
    public class ChatServerInitializer extends ChannelInitializer<Channel> {
        private final ChannelGroup channelGroup;
    
        public ChatServerInitializer(ChannelGroup channelGroup) {
            this.channelGroup = channelGroup;
        }
    
        @Override
        protected void initChannel(Channel channel) throws Exception {
            ChannelPipeline pipeline = channel.pipeline();
            // 安装编解码器,以实现对HttpRequest、 HttpContent、LastHttp-Content与字节之间的编解码
            pipeline.addLast(new HttpServerCodec());
            // 专门处理写文件的Handler
            pipeline.addLast(new ChunkedWriteHandler());
            // Http聚合器,可以让pipeline中下一个Channel收到完整的HTTP信息
            pipeline.addLast(new HttpObjectAggregator(64 * 1024));
            // 处理Http协议的ChannelHandler,只会在客户端第一次连接时候有用
            pipeline.addLast(new HttpRequestHandler("/ws"));
            // 升级Websocket后,使用该 ChannelHandler 处理Websocket请求
            pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
            // 安装专门处理 Websocket TextWebSocketFrame 帧的处理器
            pipeline.addLast(new TextWebSocketFrameHandler(channelGroup));
        }
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    4. 创建启动类

    package io.netty.websocket;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.group.ChannelGroup;
    import io.netty.channel.group.DefaultChannelGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.util.concurrent.ImmediateEventExecutor;
    
    import java.net.InetSocketAddress;
    
    public class ChatServer {
    
        public void start() {
            ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChatServerInitializer(channelGroup));
                ChannelFuture future = bootstrap.bind(new InetSocketAddress(8888)).syncUninterruptibly();
                System.out.println("Starting ChatServer on port 8888 ...");
                future.channel().closeFuture().syncUninterruptibly();
            } finally {
                channelGroup.close();
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
            new ChatServer().start();
        }
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    5. 编写一个html文件

    该html文件提供网页版的WebSocket客户端页面。在src/main/resources目录下新建一个html文件。

    DOCTYPE html>
    <html>
    <head>
        <meta charset="UTF-8">
        <title>WebSocket Chattitle>
    head>
    <body>
    <form οnsubmit="return false;">
        <h3>WebSocket 聊天室:h3>
        <textarea id="responseText" style="width: 500px; height: 300px;">textarea><br/>
        <input type="text" name="message"  style="width: 300px" value="Hello Netty"/>
        <input type="button" value="发送消息" onclick="send(this.form.message.value)"/>
        <input type="button" value="清空聊天记录" onclick="clearScreen()"/>
    form>
    <script type="text/javascript">
        var socket;
        if (!window.WebSocket) {
            window.WebSocket = window.MozWebSocket;
        }
        if (window.WebSocket) {
            socket = new WebSocket("ws://localhost:8888/ws");
            // 注意:使用tls协议通信时候,协议名为wss
            // socket = new WebSocket("wss://localhost:8443/ws");
            socket.onopen = function(event) {
                var ta = document.getElementById('responseText');
                ta.value = "连接开启!";
            };
            socket.onclose = function(event) {
                var ta = document.getElementById('responseText');
                ta.value = ta.value + '\n' + "连接被关闭!";
            };
            socket.onmessage = function(event) {
                var ta = document.getElementById('responseText');
                ta.value = ta.value + '\n' + event.data;
            };
        } else {
            alert("你的浏览器不支持 WebSocket!");
        }
    
        function send(message) {
            if (!window.WebSocket) {
                return;
            }
            if (socket.readyState == WebSocket.OPEN) {
                socket.send(message);
            } else {
                alert("连接没有开启.");
            }
        }
    
        function clearScreen() {
            document.getElementById('responseText').value = "";
        }
    script>
    body>
    html>
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    界面效果:
    在这里插入图片描述
    最终效果:
    在这里插入图片描述

  • 相关阅读:
    基于振弦式轴力计和采集仪的安全监测解决方案
    docker 安装 redis 6.0.8 cluster 实战 (3主3从) 动态缩容
    本地化小程序运营 同城小程序开发
    几款免费的时序数据库对比
    Python基础 --(1)Python概述
    汽车驾驶 - 四梁六柱是什么
    项目管理逻辑:为什么职能部门官僚主义气息浓重?
    广州某机械制造企业生产工序管理系统解决方案
    jmeter做性能测试
    第三方App与Termux命令建立IO通道
  • 原文地址:https://blog.csdn.net/zhongliwen1981/article/details/127247243