• 【无标题】Delayed延迟队列不工作


    背景

    项目中使用java 自带的延迟队列Delayed,只有添加进队列的消息,并没有被消费到

    版本

    jdk1.8

    问题原因

    上一个消费队列出现异常并且没有捕获,下一个队列就没有进行消费

    复现代码

    没有抛异常的情况下

    package com.ccb.core.config.delay;
    
    import lombok.Data;
    
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 防护舱延迟对象
     *
     * @author sz
     * @version 1.0
     * @date 2023-02-10 15:47
     */
    @Data
    public class TestDelay implements Delayed {
    
    
         private  String seqId;
        /**
        *过期时间
        */
        private Long expireTime;
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(this.expireTime - System.currentTimeMillis() , TimeUnit.MILLISECONDS);
        }
    
        @Override
        public int compareTo(Delayed other) {
            if (other == this){
                return 0;
            }
            if(other instanceof TestDelay){
                TestDelay otherRequest = (TestDelay)other;
                long otherStartTime = otherRequest.expireTime;
                return (int)(this.expireTime - otherStartTime);
            }
            return 0;
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    package com.ccb.core.config.delay;
    
    import com.ccb.core.common.util.DateUtil;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    import javax.annotation.Resource;
    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Executor;
    
    /**
     * @author sz
     * @version 1.0
     * @date 2023-10-08 19:12
     */
    @Slf4j
    @Component
    public class ProtectDelayTestComponent {
    
        private static DelayQueue<TestDelay> delayQueue = new DelayQueue<TestDelay>();
    
        @Resource(name = "poiExecutor")
        private Executor poiExecutor;
    
        /**
         * 系统启动时,预先加载的数据@PostConstruct
         */
        @PostConstruct
        public void init(){
    
            log.info("线程进入ProtectDelayTestComponent***************init");
    
            poiExecutor.execute(() -> {
    
                while(true){
                    try {
                        TestDelay protectDelay = delayQueue.take();
                        log.info("获取到的延迟队列信息:{}", protectDelay);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
    
        }
    
        /**
         * 加入延时队列
         *
         */
        public boolean addDelayQueue(TestDelay protectDelay){
            log.info("添加进延迟队列信息为{},对应的过期时间为:{}",protectDelay, DateUtil.convertTimeToString(protectDelay.getExpireTime(),"yyyy-MM-dd HH:mm:ss"));
            return delayQueue.add(protectDelay);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
     /**
         * poi线程池设置 由spring容器进行管理
         * @return
         */
        @Bean("poiExecutor")
        public Executor poiExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(8);
            executor.setMaxPoolSize(16);
            executor.setQueueCapacity(1000);
            executor.setKeepAliveSeconds(60);
            executor.setAllowCoreThreadTimeOut(true);
            executor.setThreadNamePrefix("poiExecutor-");
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            return executor;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    写TestController 测试正常的添加进入延迟队列

      @GetMapping("/test/testProtectDelay")
        @ApiModelProperty("测试批量死信队列")
        public RespData testProtectDelay(){
            TestDelay protectDelay =new TestDelay();
            String s = UuidUtils.generateUuid();
            log.info("testProtectDelay的参数:{}",s);
            protectDelay.setSeqId(s);
            protectDelay.setExpireTime(System.currentTimeMillis()+10*1000);
            protectDelayTestComponent.addDelayQueue(protectDelay);
            return RespData.success();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    正常运行结果
    在这里插入图片描述

    手动抛出异常后

    在这里插入图片描述

    解决方案

    在take中捕获Exception ,可以继续消费

  • 相关阅读:
    (C++)把字符串转换成整数
    【嵌入式C】栈内存与printf,代码正确运行,删掉 printf 代码就崩溃了??
    linux usb驱动移植(1)
    修改克隆虚拟机的静态ip地址
    Flutter的专属Skia引擎解析+用法原理
    【FPGA教程案例57】深度学习案例4——基于FPGA的CNN卷积神经网络之卷积层verilog实现
    前端第二天___NPM是什么?怎么使用?NPM命令运行项目详解
    μC/OS-II---时间管理(os_time.c)
    easywsclient的DEMO测试
    Golang中的New和Make:内存分配与初始化的区别
  • 原文地址:https://blog.csdn.net/qq_39684784/article/details/133688257