码农知识堂 - 1000bd
  •   Python
  •   PHP
  •   JS/TS
  •   JAVA
  •   C/C++
  •   C#
  •   GO
  •   Kotlin
  •   Swift
  • Spring Boot集成Redisson实现延迟队列


    项目场景:

       在电商、支付等领域,往往会有这样的场景,用户下单后放弃支付了,那这笔订单会在指定的时间段后进行关闭操作,细心的你一定发现了像某宝、某东都有这样的逻辑,而且时间很准确,误差在1s内;那他们是怎么实现的呢?

       一般实现的方法有几种:使用 redisson、rocketmq、rabbitmq等消息队列的延时投递功能。


    解决方案:

       一般项目集成redis的比较多,所以我这篇文章就说下redisson延迟队列,如果使用rocketmq或rabbitmq需要额外集成中间件,比较麻烦一点。

    1.集成redisson

    maven依赖

    1. org.redisson
    2. redisson-spring-boot-starter
    3. 3.21.1

    yml配置,单节点配置可以兼容redis的配置方式

    1. # redis配置
    2. spring:
    3. redis:
    4. database: 0
    5. host: 127.0.0.1
    6. password: redis@pass
    7. port: 6001

     更详细的配置参考:Spring Boot整合Redisson的两种方式-CSDN博客

    2.配置多线程

    因为延迟队列可能会多个任务同时执行,所以需要多线程处理。

    1. import org.springframework.context.annotation.Bean;
    2. import org.springframework.context.annotation.Configuration;
    3. import org.springframework.scheduling.annotation.EnableAsync;
    4. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    5. import java.util.concurrent.ThreadPoolExecutor;
    6. @Configuration
    7. @EnableAsync
    8. public class ExecutorConfig {
    9. /**
    10. * 异步任务自定义线程池
    11. */
    12. @Bean(name = "taskExecutor")
    13. public ThreadPoolTaskExecutor asyncServiceExecutor() {
    14. ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    15. //配置核心线程数
    16. executor.setCorePoolSize(50);
    17. //配置最大线程数
    18. executor.setMaxPoolSize(500);
    19. //配置队列大小
    20. executor.setQueueCapacity(300);
    21. //允许线程空闲时间
    22. executor.setKeepAliveSeconds(60);
    23. //配置线程池中的线程的名称前缀
    24. executor.setThreadNamePrefix("taskExecutor-");
    25. // rejection-policy:当pool已经达到max size的时候,如何处理新任务
    26. // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
    27. executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    28. //调用shutdown()方法时等待所有的任务完成后再关闭
    29. executor.setWaitForTasksToCompleteOnShutdown(true);
    30. //等待所有任务完成后的最大等待时间
    31. executor.setAwaitTerminationSeconds(60);
    32. return executor;
    33. }
    34. }

    3.具体业务

    比如消息通知、关闭订单等 ,这里加上了@Async注解,可以异步执行

    1. import org.springframework.scheduling.annotation.Async;
    2. import org.springframework.stereotype.Service;
    3. import java.text.SimpleDateFormat;
    4. import java.util.Date;
    5. @Service
    6. public class AsyncService {
    7. @Async
    8. public void executeQueue(Object value) {
    9. System.out.println();
    10. System.out.println("当前线程:"+Thread.currentThread().getName());
    11. System.out.println("执行任务:"+value);
    12. //打印时间方便查看
    13. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    14. System.out.println("执行任务的时间:"+sdf.format(new Date()));
    15. //自己的业务逻辑,可以根据id发送通知消息等
    16. //......
    17. }
    18. }

    4.延迟队列(关键代码)

    这里包括添加延迟队列,和消费延迟队列,@PostConstruct注解的意思是服务启动加载一次,参考

    Spring Boot项目启动时执行指定的方法-CSDN博客Spring Boot中多个PostConstruct注解执行顺序控制_多个postconstruct执行顺序-CSDN博客

    1. import org.redisson.api.RBlockingQueue;
    2. import org.redisson.api.RDelayedQueue;
    3. import org.redisson.api.RedissonClient;
    4. import org.springframework.beans.factory.annotation.Autowired;
    5. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    6. import org.springframework.stereotype.Service;
    7. import javax.annotation.PostConstruct;
    8. import javax.annotation.Resource;
    9. import java.text.SimpleDateFormat;
    10. import java.util.Date;
    11. import java.util.concurrent.TimeUnit;
    12. @Service
    13. public class TestService {
    14. @Resource
    15. private AsyncService asyncService;
    16. @Resource
    17. private ThreadPoolTaskExecutor executor;
    18. @Autowired
    19. private RedissonClient redissonClient;
    20. /**
    21. * 添加延迟任务
    22. */
    23. public void addQueue() {
    24. //获取延迟队列
    25. RBlockingQueue blockingQueue = redissonClient.getBlockingQueue("delayedQueue");
    26. RDelayedQueue delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
    27. for (int i = 1; i <= 10; i++) {
    28. long delayTime = 5+i; //延迟时间(秒)
    29. // long delayTime = 5; //这里时间统一,可以测试并发执行
    30. delayedQueue.offer("延迟任务"+i, delayTime, TimeUnit.SECONDS);
    31. }
    32. //打印时间方便查看
    33. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    34. System.out.println("添加任务的时间:"+sdf.format(new Date()));
    35. }
    36. /**
    37. * 服务启动时加载,开始消费延迟队列
    38. */
    39. @PostConstruct
    40. public void consumer() {
    41. System.out.println("服务启动时加载>>>>>>");
    42. //获取延迟队列
    43. RBlockingQueue delayedQueue = redissonClient.getBlockingQueue("delayedQueue");
    44. //启用一个线程来消费这个延迟队列
    45. executor.execute(() ->{
    46. while (true){
    47. try {
    48. // System.out.println("while中的线程:"+Thread.currentThread().getName());
    49. //获取延迟队列中的任务
    50. Object value = delayedQueue.poll();
    51. if(value == null){
    52. //如果没有任务就休眠1秒,休眠时间根据业务自己定义
    53. Thread.sleep(1000); //这里休眠时间越短,误差就越小
    54. continue;
    55. }
    56. //异步处理延迟队列中的消息
    57. asyncService.executeQueue(value);
    58. } catch (Exception e) {
    59. e.printStackTrace();
    60. }
    61. }
    62. });
    63. }
    64. }
    65. 5.测试接口 

      1. import com.test.service.TestService;
      2. import org.springframework.beans.factory.annotation.Autowired;
      3. import org.springframework.web.bind.annotation.GetMapping;
      4. import org.springframework.web.bind.annotation.RequestMapping;
      5. import org.springframework.web.bind.annotation.RestController;
      6. @RestController
      7. @RequestMapping("/test")
      8. public class TestController {
      9. @Autowired
      10. private TestService testService;
      11. /*
      12. * 添加延迟任务
      13. */
      14. @GetMapping(value = "/addQueue")
      15. public String addQueue() {
      16. testService.addQueue();
      17. return "success";
      18. }
      19. }

      6.测试结果


       总结:

      1. Redisson的的RDelayedQueue是基于Redis实现的,而Redis本身并不保证数据的持久性。如果Redis服务器宕机,那么所有在RDelayedQueue中的数据都会丢失。因此,我们需要在应用层面进行持久化设计,例如定期将RDelayedQueue中的数据持久化到数据库。
      2. 在设计延迟任务时,我们应该根据实际需求来合理设置延迟时间,避免设置过长的延迟时间导致内存占用过高。

      源码:https://download.csdn.net/download/u011974797/89225515 

    66. 相关阅读:
      Sentinel配置持久化到Nacos实现流控熔断
      石化人员定位方案:uBeacon+ibeacon融合定位特点
      主线程和子线程的关系(讨论主线程结束,子线程是否要回收)
      【b站-湖科大教书匠】1 计算机网络概述-计算机网络微课堂
      使用 Pygame 构建和可视化数独游戏
      Vue源码探秘(一)——Vue-Router原理实现
      LeetCode+ 71 - 75
      安卓逆向之雷电模拟器中控
      C++构造函数中调用虚函数为什么不会实现多态
      4. 线性回归以及基础优化算法
    67. 原文地址:https://blog.csdn.net/u011974797/article/details/138195387
      • 最新文章
      • 攻防演习之三天拿下官网站群
        数据安全治理学习——前期安全规划和安全管理体系建设
        企业安全 | 企业内一次钓鱼演练准备过程
        内网渗透测试 | Kerberos协议及其部分攻击手法
        0day的产生 | 不懂代码的"代码审计"
        安装scrcpy-client模块av模块异常,环境问题解决方案
        leetcode hot100【LeetCode 279. 完全平方数】java实现
        OpenWrt下安装Mosquitto
        AnatoMask论文汇总
        【AI日记】24.11.01 LangChain、openai api和github copilot
      • 热门文章
      • 十款代码表白小特效 一个比一个浪漫 赶紧收藏起来吧!!!
        奉劝各位学弟学妹们,该打造你的技术影响力了!
        五年了,我在 CSDN 的两个一百万。
        Java俄罗斯方块,老程序员花了一个周末,连接中学年代!
        面试官都震惊,你这网络基础可以啊!
        你真的会用百度吗?我不信 — 那些不为人知的搜索引擎语法
        心情不好的时候,用 Python 画棵樱花树送给自己吧
        通宵一晚做出来的一款类似CS的第一人称射击游戏Demo!原来做游戏也不是很难,连憨憨学妹都学会了!
        13 万字 C 语言从入门到精通保姆级教程2021 年版
        10行代码集2000张美女图,Python爬虫120例,再上征途
      Copyright © 2022 侵权请联系2656653265@qq.com    京ICP备2022015340号-1
      正则表达式工具 cron表达式工具 密码生成工具

      京公网安备 11010502049817号