• RabbitMQ: analyzing java.net.SocketException: Connection reset and MissedHeartbeatException


    1. Introduction

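    Our Android front end uses a RabbitMQ message queue for part of its business logic. Mobile networks are complex and the signal is often unstable, so the MQ connection can drop and reconnect frequently. The logs contain many entries like java.net.SocketException: Connection reset, and below we step through the client source in a debugger to analyze what can cause this error. MissedHeartbeatException, by contrast, is thrown when the client has repeatedly received nothing from the server and therefore concludes that the server is gone.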

    2. Analysis

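    Searching online for java.net.SocketException: Connection reset, the usual explanation is that the client still believes it is connected to MQ, but the server has already dropped the connection; when the client then performs a receive, this error is raised. At the TCP level it means the peer answered with a reset (RST) segment instead of data or a normal close.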
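    The plain-JDK sketch below reproduces that situation over loopback without RabbitMQ (the class and method names are made up for illustration). The "server" side aborts the connection with SO_LINGER set to 0, which makes close() send an RST, while the "client" side is still reading. On common JDK and OS combinations the read then fails with java.net.SocketException: Connection reset; the exact message can vary by platform.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

public class ConnectionResetDemo {

    /**
     * Opens a loopback connection, lets the "server" side abort it with a TCP RST,
     * then reads on the "client" side. Returns the exception raised (including any
     * setup failure), or null if the read returned normally.
     */
    public static IOException provokeReset() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback);
             Socket client = new Socket(loopback, server.getLocalPort())) {
            Socket serverSide = server.accept();
            try {
                // SO_LINGER on with a 0 s timeout: close() sends RST instead of the normal FIN
                serverSide.setSoLinger(true, 0);
            } finally {
                serverSide.close();
            }

            client.setSoTimeout(5_000);      // never block forever if no RST shows up
            client.getInputStream().read();  // the client is still "receiving data"
            return null;
        } catch (IOException e) {
            return e; // typically java.net.SocketException: Connection reset
        }
    }

    public static void main(String[] args) {
        System.out.println(provokeReset());
    }
}
```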

    3. The MQ heartbeat mechanism

    When the client creates a connection, it initializes and starts the heartbeat sender in initializeHeartbeatSender():

        private void initializeHeartbeatSender() {
            this._heartbeatSender = new HeartbeatSender(_frameHandler, heartbeatExecutor, threadFactory);
        }
    

    In the RabbitMQ client, the heartbeat timer fires every half heartbeat period:

        /**
         * Sets the heartbeat in seconds.
         */
        public void setHeartbeat(int heartbeatSeconds) {
            synchronized(this.monitor) {
                if(this.shutdown) {
                    return;
                }
    
                // cancel any existing heartbeat task
                if(this.future != null) {
                    this.future.cancel(true);
                    this.future = null;
                }
    
                if (heartbeatSeconds > 0) {
                    // wake every heartbeatSeconds / 2 to avoid the worst case
                    // where the last activity comes just after the last heartbeat
                    long interval = SECONDS.toNanos(heartbeatSeconds) / 2;
                    ScheduledExecutorService executor = createExecutorIfNecessary();
                    Runnable task = new HeartbeatRunnable(interval);
                    this.future = executor.scheduleAtFixedRate(
                        task, interval, interval, TimeUnit.NANOSECONDS);
                }
            }
        }
    

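    Why half the period? The small deterministic model below (class and method names are illustrative, not the client's API) reuses the interval computation from setHeartbeat and the idle check from HeartbeatRunnable. It assumes the timer ticks at interval, 2 * interval, and so on, and that nothing is written after the last activity, then measures how long the peer hears nothing before the first heartbeat frame goes out. The gap always falls in (H/2, H], so the peer never waits more than one full heartbeat H; if the timer ticked every H with the same idle check against H, the gap could reach 2H.

```java
import java.util.concurrent.TimeUnit;

public class HeartbeatTiming {

    /** Timer period: half the negotiated heartbeat, computed as in setHeartbeat. */
    public static long intervalNanos(int heartbeatSeconds) {
        return TimeUnit.SECONDS.toNanos(heartbeatSeconds) / 2;
    }

    /** The idle check from HeartbeatRunnable: send only after a full interval without writes. */
    public static boolean shouldSend(long now, long lastActivity, long intervalNanos) {
        return now > lastActivity + intervalNanos;
    }

    /**
     * Hypothetical model: the timer fires at interval, 2 * interval, ...; the last write happened
     * at lastActivity (>= 0) and nothing is written afterwards. Returns how long the peer hears
     * nothing before the first heartbeat frame goes out.
     */
    public static long silenceBeforeHeartbeat(long lastActivity, long intervalNanos) {
        long tick = intervalNanos;
        while (!shouldSend(tick, lastActivity, intervalNanos)) {
            tick += intervalNanos;
        }
        return tick - lastActivity;
    }

    public static void main(String[] args) {
        int heartbeatSeconds = 60;
        long interval = intervalNanos(heartbeatSeconds); // 30 s
        // last write exactly on a tick, just after a tick, just before a tick, and somewhere later
        long[] lastWrites = {0, 1, interval - 1, 5 * interval + 7};
        for (long a : lastWrites) {
            long silence = silenceBeforeHeartbeat(a, interval);
            System.out.printf("last write at %d ns -> silent for %.3f s (heartbeat %d s)%n",
                    a, silence / 1e9, heartbeatSeconds);
        }
    }
}
```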
    The heartbeat task itself. Note that an IOException thrown while sending is not handled here, only ignored:

        private final class HeartbeatRunnable implements Runnable {

            private final long heartbeatNanos;

            private HeartbeatRunnable(long heartbeatNanos) {
                this.heartbeatNanos = heartbeatNanos;
            }

            @Override
            public void run() {
                try {
                    LogUtils.log("Heartbeat timer fired"); // debug log added by the author
                    long now = System.nanoTime();

                    // send a heartbeat frame only if nothing was written for heartbeatNanos
                    if (now > (lastActivityTime + this.heartbeatNanos)) {
                        frameHandler.writeFrame(new Frame(AMQP.FRAME_HEARTBEAT, 0));
                        frameHandler.flush();
                    }
                } catch (IOException e) {
                    // ignore
                }
            }
        }
    

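    About the ignored IOException: whatever the original motivation, a property of ScheduledExecutorService makes that catch matter. If an exception escapes a task scheduled with scheduleAtFixedRate, all later executions are suppressed, so an unhandled write failure would stop the heartbeat timer for good. The plain-JDK sketch below (names illustrative) counts the runs of a task that fails every time, with and without swallowing the failure. Either way a failed heartbeat write is not reported by the sender, so a broken connection has to be detected on the read side, which is where both errors analyzed in this article surface.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ScheduledFailureDemo {

    /**
     * Schedules a task every 10 ms for about runMillis and returns how often it ran.
     * Every run "fails"; swallow decides whether the failure escapes the task.
     */
    public static int countRuns(boolean swallow, long runMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        Runnable task = () -> {
            runs.incrementAndGet();
            try {
                throw new IOException("simulated heartbeat write failure");
            } catch (IOException e) {
                if (!swallow) {
                    // Escapes the task: the executor suppresses every later execution
                    throw new UncheckedIOException(e);
                }
                // swallow == true: like HeartbeatRunnable's "// ignore", the next tick still runs
            }
        };
        executor.scheduleAtFixedRate(task, 0, 10, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(runMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("exception escapes: " + countRuns(false, 200) + " run(s)");
        System.out.println("exception ignored: " + countRuns(true, 200) + " run(s)");
    }
}
```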
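    According to the official documentation and the client source, the default heartbeat timeout is 60 seconds and the heartbeat timer fires every 30 seconds. If two consecutive heartbeats are missed, the peer is considered unreachable and the connection is closed.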

    3.1 Test

    In the test case, the heartbeat period is set to 30 seconds:

    import com.rabbitmq.client.*;

    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class HeartbeatTest {
        // LogUtils is the author's own logging helper (prints "timestamp=>message")
        public static void main(String[] args) {
            String queueName = "123456";
            ExecutorService executor = Executors.newFixedThreadPool(10);
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.0.11.211");
            factory.setPort(5672);
            factory.setUsername("admin");
            factory.setVirtualHost("/");
            factory.setPassword("admin");
            factory.setConnectionTimeout(5000);
            factory.setAutomaticRecoveryEnabled(false); // no automatic reconnect, so the failure stays visible
            factory.setTopologyRecoveryEnabled(false);
            factory.setRequestedHeartbeat(30);          // heartbeat period: 30 s
            executor.submit(() -> {
                try {
                    Connection connection = factory.newConnection();
                    LogUtils.log("Connection created");
                    connection.addShutdownListener(cause -> {
                        LogUtils.log("Connection shut down:" + cause.getMessage() + " msg=>:" + cause.getCause());
                    });
                    Channel channel = connection.createChannel();
                    LogUtils.log("Channel created:" + channel.getChannelNumber());
                    channel.basicQos(30);
                    channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
                        @Override
                        public void handleDelivery(String consumerTag, Envelope envelope,
                                                   AMQP.BasicProperties properties, byte[] body) {
                            try {
                                String message = new String(body, StandardCharsets.UTF_8);
                                LogUtils.log("Message:" + message);
                                channel.basicReject(envelope.getDeliveryTag(), false);
                            } catch (Exception e) {
                                LogUtils.log("Consumer error, e:" + e.getMessage() + " consumerTag:" + consumerTag);
                            }
                        }
                    });
                    channel.addShutdownListener(cause -> {
                        // getCause() can be null, so do not call toString() on it directly
                        LogUtils.log("Consumer channel shut down:" + cause.getMessage() + " msg=>:" + cause.getCause());
                    });
                } catch (Exception e) {
                    LogUtils.log("Exception:" + e);
                    e.printStackTrace();
                }
            });
        }
    }
    

    Then disable heartbeat sending by commenting out the two write calls in HeartbeatRunnable:

        private final class HeartbeatRunnable implements Runnable {

            private final long heartbeatNanos;

            private HeartbeatRunnable(long heartbeatNanos) {
                this.heartbeatNanos = heartbeatNanos;
            }

            @Override
            public void run() {
                try {
                    LogUtils.log("Heartbeat timer fired");
                    long now = System.nanoTime();

                    if (now > (lastActivityTime + this.heartbeatNanos)) {
                        // heartbeat sending disabled for this test:
                        // frameHandler.writeFrame(new Frame(AMQP.FRAME_HEARTBEAT, 0));
                        // frameHandler.flush();
                    }
                } catch (Exception e) {
                    // catch Exception rather than IOException: with the writes commented out, nothing
                    // in the try block throws IOException, and catching it would not compile
                }
            }
        }
    

    Running it gives:

    2023-09-25 09:35:02.976=>Connection created
    2023-09-25 09:35:02.987=>Channel created:1
    2023-09-25 09:35:17.948=>Heartbeat timer fired
    2023-09-25 09:35:32.948=>Heartbeat timer fired
    2023-09-25 09:35:47.949=>Heartbeat timer fired
    2023-09-25 09:36:02.949=>Heartbeat timer fired
    2023-09-25 09:36:17.948=>Heartbeat timer fired
    2023-09-25 09:36:32.948=>Heartbeat timer fired
    2023-09-25 09:36:32.960=>Consumer channel shut down:connection error msg=>:java.net.SocketException: Connection reset
    2023-09-25 09:36:32.962=>Connection shut down:connection error msg=>:java.net.SocketException: Connection reset
    

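    Analysis: the client's last traffic was the channel setup at about 09:35:03, and the reset arrived at 09:36:32, roughly 90 s later. Having received no heartbeat from the client for 3 heartbeat periods, the server assumed the client was offline and closed the connection, and the client's pending read then failed with Connection reset.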
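    The "three heartbeat periods" can be checked directly against the timestamps in the log. This small sketch (names illustrative) computes the gap between "Connection created" and the Connection reset. Since the client sent nothing after setting up the channel, this is roughly how long the server went without hearing from it.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

public class LogGap {

    // Timestamp format of the LogUtils output ("yyyy-MM-dd HH:mm:ss.SSS=>message")
    private static final DateTimeFormatter LOG_TIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    /** Milliseconds between two log timestamps. */
    public static long millisBetween(String from, String to) {
        return Duration.between(LocalDateTime.parse(from, LOG_TIME),
                                LocalDateTime.parse(to, LOG_TIME)).toMillis();
    }

    public static void main(String[] args) {
        // "Connection created" -> "Connection shut down ... Connection reset"
        long gapMillis = millisBetween("2023-09-25 09:35:02.976", "2023-09-25 09:36:32.962");
        int heartbeatSeconds = 30;
        // prints "89986 ms = 3.00 heartbeat periods"
        System.out.printf(Locale.ROOT, "%d ms = %.2f heartbeat periods%n",
                gapMillis, gapMillis / (heartbeatSeconds * 1000.0));
    }
}
```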

    4. MissedHeartbeatException analysis

    Once the client has connected to MQ, it starts reading data via this._frameHandler.initialize(this); with the NIO frame handler this leads to startIoLoops():

        private void startIoLoops() {
            if (executorService == null) {
                Thread nioThread = Environment.newThread(
                    threadFactory,
                    new NioLoop(socketChannelFrameHandlerFactory.nioParams, this),
                    "rabbitmq-nio"
                );
                nioThread.start();
            } else {
                this.executorService.submit(new NioLoop(socketChannelFrameHandlerFactory.nioParams, this));
            }
        }
    

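    The read loop's frame handler: a non-null frame resets the missed-heartbeat counter to 0; a null frame means nothing was received, and the miss is counted: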

        private void readFrame(Frame frame) throws IOException {
            LogUtils.log("Reading frame");
            if (frame != null) {
                // any frame from the server proves it is alive
                _missedHeartbeats = 0;
                if (frame.getType() == AMQP.FRAME_HEARTBEAT) {
                    // nothing else to do for a heartbeat frame
                    LogUtils.log("Read frame: heartbeat");
                } else {
                    if (frame.getChannel() == 0) { // the special channel
                        _channel0.handleFrame(frame);
                    } else {
                        if (isOpen()) {
                            ChannelManager cm = _channelManager;
                            if (cm != null) {
                                ChannelN channel;
                                try {
                                    channel = cm.getChannel(frame.getChannel());
                                } catch (UnknownChannelException e) {
                                    LOGGER.info("Received a frame on an unknown channel, ignoring it");
                                    return;
                                }
                                channel.handleFrame(frame);
                            }
                        }
                    }
                }
            } else {
                // null frame: the read timed out without receiving anything
                LogUtils.log("Reading frame: frame is null");
                handleSocketTimeout();
            }
        }
    

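    Where does a null frame come from? In the blocking-IO frame handler, a read that hits the socket's read timeout appears to be reported as a null frame rather than as an exception. The plain java.net sketch below imitates that idea (it does not use the RabbitMQ client; class and method names are illustrative): a SocketTimeoutException is turned into null, while a byte from the peer is returned normally.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

public class ReadTimeoutDemo {

    /** Reads one byte; a read that hits SO_TIMEOUT is reported as null, like a null frame. */
    static Integer readOrNull(InputStream in) throws IOException {
        try {
            int b = in.read();
            if (b < 0) {
                throw new IOException("peer closed the connection");
            }
            return b;
        } catch (SocketTimeoutException e) {
            return null;
        }
    }

    /** Connects over loopback; the peer sends one byte only if peerSends is true. */
    public static Integer readOnce(boolean peerSends, int timeoutMillis) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback);
             Socket client = new Socket(loopback, server.getLocalPort());
             Socket peer = server.accept()) {
            if (peerSends) {
                OutputStream out = peer.getOutputStream();
                out.write(8); // 8 = AMQP heartbeat frame type; only the first byte of a real frame
                out.flush();
            }
            client.setSoTimeout(timeoutMillis);
            return readOrNull(client.getInputStream());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("silent peer -> " + readOnce(false, 200)); // null after about 200 ms
        System.out.println("peer sent   -> " + readOnce(true, 2000)); // 8
    }
}
```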
    The timeout handling: every call increments _missedHeartbeats, and once it exceeds a threshold a MissedHeartbeatException is thrown:

        private void handleSocketTimeout() throws SocketTimeoutException {
            if (_inConnectionNegotiation) {
                throw new SocketTimeoutException("Timeout during Connection negotiation");
            }
            if (_heartbeat == 0) { // No heart-beating
                return;
            }
            LogUtils.log("handleSocketTimeout-------_missedHeartbeats:" + _missedHeartbeats);
            // threshold lowered to 1 for this test; the stock client compares against 2 * 4
            if (++_missedHeartbeats > (1)) {
                throw new MissedHeartbeatException("Heartbeat missing with heartbeat = " +
                                                   _heartbeat + " seconds, for " + this.getHostAddress());
            }
        }
    

    To make testing easier, the heartbeat is set to 10 s and the check is changed so that MissedHeartbeatException is thrown as soon as _missedHeartbeats exceeds 1.

    4.1 Test
    2023-09-25 10:21:16.565=>Reading frame
    2023-09-25 10:21:16.651=>Reading frame
    2023-09-25 10:21:16.658=>Reading frame
    2023-09-25 10:21:16.658=>Connection created
    2023-09-25 10:21:16.669=>Reading frame
    2023-09-25 10:21:16.670=>Channel created:1
    2023-09-25 10:21:16.671=>Reading frame
    2023-09-25 10:21:16.675=>Reading frame
    2023-09-25 10:21:19.177=>Reading frame
    2023-09-25 10:21:19.177=>Reading frame: frame is null
    2023-09-25 10:21:19.177=>handleSocketTimeout-------_missedHeartbeats:0
    2023-09-25 10:21:21.659=>Reading frame
    2023-09-25 10:21:21.659=>Read frame: heartbeat
    2023-09-25 10:21:24.160=>Reading frame
    2023-09-25 10:21:24.160=>Reading frame: frame is null
    2023-09-25 10:21:24.161=>handleSocketTimeout-------_missedHeartbeats:0
    2023-09-25 10:21:26.659=>Reading frame
    2023-09-25 10:21:26.660=>Read frame: heartbeat
    2023-09-25 10:21:29.161=>Reading frame
    2023-09-25 10:21:29.161=>Reading frame: frame is null
    2023-09-25 10:21:29.161=>handleSocketTimeout-------_missedHeartbeats:0
    2023-09-25 10:21:31.661=>Reading frame
    2023-09-25 10:21:31.661=>Read frame: heartbeat
    2023-09-25 10:21:34.161=>Reading frame
    2023-09-25 10:21:34.161=>Reading frame: frame is null
    2023-09-25 10:21:34.161=>handleSocketTimeout-------_missedHeartbeats:0
    2023-09-25 10:21:36.662=>Reading frame
    2023-09-25 10:21:36.662=>Reading frame: frame is null
    2023-09-25 10:21:36.662=>handleSocketTimeout-------_missedHeartbeats:1
    2023-09-25 10:21:36.668=>Consumer channel shut down:connection error msg=>:com.rabbitmq.client.MissedHeartbeatException: Heartbeat missing with heartbeat = 10 seconds, for 192.0.11.211
    2023-09-25 10:21:36.671=>Connection shut down:connection error msg=>:com.rabbitmq.client.MissedHeartbeatException: Heartbeat missing with heartbeat = 10 seconds, for 192.0.11.211
    
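    The counting logic of readFrame and handleSocketTimeout can be modeled without a broker. The class below is a hypothetical stand-in, not the client's code. Any frame resets the counter; each read timeout increments it and trips once it exceeds the threshold. Replaying the event sequence from the 4.1 log after the setup frames (T = frame is null, F = heartbeat frame read) with the modified threshold of 1 trips on the final timeout, matching the exception at 10:21:36.662. The stock threshold of 2 * 4 would need nine timeouts in a row.

```java
public class MissedHeartbeatCounter {

    private final int threshold; // 1 in the modified test client; 2 * 4 in the stock client
    private int missed;

    public MissedHeartbeatCounter(int threshold) {
        this.threshold = threshold;
    }

    /** Any frame from the server resets the counter, like "_missedHeartbeats = 0" in readFrame. */
    public void onFrame() {
        missed = 0;
    }

    /** A read timeout (null frame); true means the client would throw MissedHeartbeatException. */
    public boolean onReadTimeout() {
        return ++missed > threshold;
    }

    /**
     * Replays events ('T' = read timed out, 'F' = a frame arrived) and returns the index of the
     * event that trips the counter, or -1 if none does.
     */
    public static int replay(String events, int threshold) {
        MissedHeartbeatCounter counter = new MissedHeartbeatCounter(threshold);
        for (int i = 0; i < events.length(); i++) {
            char event = events.charAt(i);
            if (event == 'F') {
                counter.onFrame();
            } else if (event == 'T' && counter.onReadTimeout()) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // 4.1 log after the setup frames: null, heartbeat, null, heartbeat, null, heartbeat, null, null
        String log41 = "TFTFTFTT";
        System.out.println(replay(log41, 1));             // 7: the final timeout trips the modified check
        System.out.println(replay(log41, 2 * 4));         // -1: never more than 2 misses in a row
        System.out.println(replay("T".repeat(9), 2 * 4)); // 8: the ninth consecutive timeout
    }
}
```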

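    The log shows that the server sends the client a heartbeat every 5 s (half the 10 s heartbeat), and that the null frames come about 2.5 s apart, a quarter of the heartbeat, since each one marks a read that timed out. If the client receives nothing at all for long enough that _missedHeartbeats exceeds the threshold, it throws MissedHeartbeatException; in the unmodified client the threshold is 2 * 4 = 8.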
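    A rough estimate of how long the client stays silent before MissedHeartbeatException follows. It assumes, as the 2.5 s spacing of the null frames for a 10 s heartbeat suggests, that the socket read timeout is a quarter of the heartbeat and that the exception fires on the (threshold + 1)-th consecutive timeout. For the 4.1 test this gives 5 s, which matches the gap between the last heartbeat read at 10:21:31.661 and the exception at 10:21:36.662. With a 60 s heartbeat and the stock threshold of 8 it comes to about 135 s. The method name is illustrative, and real timings also depend on when each read started.

```java
public class MissedHeartbeatDelay {

    /**
     * Approximate silence (ms) before MissedHeartbeatException, assuming a read timeout of
     * heartbeat / 4 and an exception on the (threshold + 1)-th consecutive timeout.
     */
    public static long silenceMillis(int heartbeatSeconds, int threshold) {
        long readTimeoutMillis = heartbeatSeconds * 1000L / 4;
        return (threshold + 1) * readTimeoutMillis;
    }

    public static void main(String[] args) {
        System.out.println(silenceMillis(10, 1));     // 5000: the 4.1 test
        System.out.println(silenceMillis(60, 2 * 4)); // 135000: 60 s heartbeat, stock threshold
    }
}
```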

  • Original article: https://blog.csdn.net/u010520146/article/details/133266412