• RabbitMQ之TTL机制


            在京东下单,订单创建成功,等待支付,一般会给30分钟的时间,开始倒计时。如果在这段时间内用户没有支付,则默认订单取消。 

    该如何实现?

    • 定期轮询(数据库等)
      • 用户下单成功,将订单信息放入数据库,同时将支付状态放入数据库,用户付款更改数据库状态。定期轮询数据库支付状态,如果超过30分钟就将该订单取消。
      • 优点:设计实现简单
      • 缺点:需要对数据库进行大量的IO操作,效率低下。
    • Timer
      1. SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss");
      2. Timer timer = new Timer();
      3. TimerTask timerTask = new TimerTask() {
      4. @Override
      5. public void run() {
      6. System.out.println("用户没有付款,交易取消:" + simpleDateFormat.format(new Date(System.currentTimeMillis())));
      7. timer.cancel();
      8. }
      9. };
      10. System.out.println("等待用户付款:" + simpleDateFormat.format(new Date(System.currentTimeMillis())));
      11. // 10秒后执行timerTask
      12. timer.schedule(timerTask, 10 * 1000);
      • 缺点
        • Timers没有持久化机制
        • Timers不灵活 (只可以设置开始时间和重复间隔,对等待支付貌似够用)
        • Timers 不能利用线程池,一个timer一个线程
        • Timers没有真正的管理计划
    • ScheduledExecutorService
      1. SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss");
      2. // 线程工厂
      3. ThreadFactory factory = Executors.defaultThreadFactory();
      4. // 使用线程池
      5. ScheduledExecutorService service = new ScheduledThreadPoolExecutor(10, factory);
      6. System.out.println("开始等待用户付款10秒:" + format.format(new Date()));
      7. service.schedule(new Runnable() {
      8. @Override
      9. public void run() {
      10. System.out.println("用户未付款,交易取消:" + format.format(new Date()));
      11. }// 等待10s 单位秒
      12. }, 10, TimeUnit.SECONDS);
      • 优点:可以多线程执行,一定程度上避免任务间互相影响,单个任务异常不影响其它任务。
      • 在高并发的情况下,不建议使用定时任务去做,因为太浪费服务器性能,不建议。
    • RabbitMQ:使用TTL
    • Quartz
    • Redis Zset
    • JCronTab
    • SchedulerX
    • 。。。

    TTL,Time to Live 的简称,即过期时间。

    RabbitMQ 可以对消息和队列两个维度来设置TTL。

       任何消息中间件的容量和堆积能力都是有限的,如果有一些消息总是不被消费掉,那么需要有一种过期的机制来做兜底。

    目前有两种方法可以设置消息的TTL。

    1. 通过Queue属性设置,队列中所有消息都有相同的过期时间。
    2. 对消息自身进行单独设置,每条消息的TTL 可以不同。

            如果两种方法一起使用,则消息的TTL 以两者之间较小数值为准。通常来讲,消息在队列中的生存时间一旦超过设置的TTL 值时,就会变成“死信”(Dead Message),消费者默认就无法再收到该消息。当然,“死信”也是可以被取出来消费的

    1、原生API案例

    1. package com.lagou.rabbitmq.demo;
    2. import com.rabbitmq.client.*;
    3. import java.nio.charset.StandardCharsets;
    4. import java.util.HashMap;
    5. import java.util.Map;
    6. public class Producer {
    7. public static void main(String[] args) throws Exception {
    8. ConnectionFactory factory = new ConnectionFactory();
    9. factory.setUri("amqp://root:123456@192.168.80.121:5672/%2f");
    10. Connection connection = factory.newConnection();
    11. Channel channel = connection.createChannel();
    12. Map arguments = new HashMap<>();
    13. // 消息队列中消息的过期时间
    14. arguments.put("x-message-ttl", 10 * 1000);
    15. // 如果消息队列中没有消费者,则10s后消息过期,队列将会被自动删除
    16. arguments.put("x-expires", 60 * 1000);
    17. channel.queueDeclare("queue.ttl.waiting",
    18. true,
    19. false,
    20. false,
    21. arguments);
    22. channel.exchangeDeclare("ex.ttl.waiting",
    23. BuiltinExchangeType.DIRECT,
    24. true,
    25. false,
    26. false,
    27. null);
    28. channel.queueBind("queue.ttl.waiting", "ex.ttl.waiting", "key.ttl.waiting");
    29. AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
    30. .contentEncoding("utf-8")
    31. .deliveryMode(2) // 设置消息持久化,2代表设置消息持久化
    32. .build();
    33. channel.basicPublish("ex.ttl.waiting", "key.ttl.waiting", properties, "等待的订单号".getBytes(StandardCharsets.UTF_8));
    34. channel.close();
    35. connection.close();
    36. }
    37. }

    此外,还可以通过命令行方式设置全局TTL,执行如下命令:

    rabbitmqctl set_policy TTL ".*" '{"message-ttl":30000}' --apply-to queues

    默认规则:

    • 如果不设置TTL,则表示此消息不会过期;
    • 如果TTL设置为0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃;

            注意理解message-ttlx-expires 这两个参数的区别,有不同的含义。但是这两个参数属性都遵循上面的默认规则。一般TTL相关的参数单位都是毫秒(ms)

    2、springboot案例

    (1)pom.xml添加依赖

    1. <dependencies>
    2. <dependency>
    3. <groupId>org.springframework.bootgroupId>
    4. <artifactId>spring-boot-starter-amqpartifactId>
    5. dependency>
    6. <dependency>
    7. <groupId>org.springframework.bootgroupId>
    8. <artifactId>spring-boot-starter-webartifactId>
    9. dependency>
    10. <dependency>
    11. <groupId>org.springframework.bootgroupId>
    12. <artifactId>spring-boot-starter-testartifactId>
    13. <scope>testscope>
    14. <exclusions>
    15. <exclusion>
    16. <groupId>org.junit.vintagegroupId>
    17. <artifactId>junit-vintage-engineartifactId>
    18. exclusion>
    19. exclusions>
    20. dependency>
    21. <dependency>
    22. <groupId>org.springframework.amqpgroupId>
    23. <artifactId>spring-rabbit-testartifactId>
    24. <scope>testscope>
    25. dependency>
    26. dependencies>

    (2)application.properties添加rabbitmq连接信息

    1. spring.application.name=ttl
    2. spring.rabbitmq.host=node1
    3. spring.rabbitmq.virtual-host=/
    4. spring.rabbitmq.username=root
    5. spring.rabbitmq.password=123456
    6. spring.rabbitmq.port=5672

    (3)主入口类

    1. package com.lagou.rabbitmq.demo;
    2. import org.springframework.boot.SpringApplication;
    3. import org.springframework.boot.autoconfigure.SpringBootApplication;
    4. @SpringBootApplication
    5. public class RabbitmqDemo {
    6. public static void main(String[] args) {
    7. SpringApplication.run(RabbitmqDemo07.class, args);
    8. }
    9. }

    (4)RabbitConfig类

    1. package com.lagou.rabbitmq.demo.config;
    2. import org.springframework.amqp.core.*;
    3. import org.springframework.context.annotation.Bean;
    4. import org.springframework.context.annotation.Configuration;
    5. import java.util.HashMap;
    6. import java.util.Map;
    7. @Configuration
    8. public class RabbitConfig {
    9. @Bean
    10. public Queue queueTTLWaiting() {
    11. Map props = new HashMap<>();
    12. // 对于该队列中的消息,设置都等待10s
    13. props.put("x-message-ttl", 10000);
    14. Queue queue = new Queue("q.pay.ttl-waiting", false, false, false, props);
    15. return queue;
    16. }
    17. @Bean
    18. public Queue queueWaiting() {
    19. Queue queue = new Queue("q.pay.waiting", false, false, false);
    20. return queue;
    21. }
    22. @Bean
    23. public Exchange exchangeTTLWaiting() {
    24. DirectExchange exchange = new DirectExchange("ex.pay.ttlwaiting", false, false);
    25. return exchange;
    26. }
    27. /**
    28. * 该交换器使用的时候,需要给每个消息设置有效期
    29. *
    30. * @return
    31. */
    32. @Bean
    33. public Exchange exchangeWaiting() {
    34. DirectExchange exchange = new DirectExchange("ex.pay.waiting", false, false);
    35. return exchange;
    36. }
    37. @Bean
    38. public Binding bindingTTLWaiting() {
    39. return BindingBuilder.bind(queueTTLWaiting()).to(exchangeTTLWaiting()).with("pay.ttl - waiting").noargs();
    40. }
    41. @Bean
    42. public Binding bindingWaiting() {
    43. return BindingBuilder.bind(queueWaiting()).to(exchangeWaiting()).with("pay.waiting").noargs();
    44. }
    45. }

    (5)PayController类

    1. package com.lagou.rabbitmq.demo.controller;
    2. import org.springframework.amqp.core.AmqpTemplate;
    3. import org.springframework.amqp.core.Message;
    4. import org.springframework.amqp.core.MessageProperties;
    5. import org.springframework.beans.factory.annotation.Autowired;
    6. import org.springframework.web.bind.annotation.RequestMapping;
    7. import org.springframework.web.bind.annotation.RestController;
    8. import java.io.UnsupportedEncodingException;
    9. import java.util.HashMap;
    10. import java.util.Map;
    11. @RestController
    12. public class PayController {
    13. @Autowired
    14. private AmqpTemplate rabbitTemplate;
    15. @RequestMapping("/pay/queuettl")
    16. public String sendMessage() {
    17. rabbitTemplate.convertAndSend("ex.pay.ttl-waiting", "pay.ttlwaiting", "发送了TTL-WAITING-MESSAGE");
    18. return "queue-ttl-ok";
    19. }
    20. @RequestMapping("/pay/msgttl")
    21. public String sendTTLMessage() throws UnsupportedEncodingException {
    22. MessageProperties properties = new MessageProperties();
    23. properties.setExpiration("5000");
    24. Message message = new Message("发送了WAITINGMESSAGE".getBytes("utf-8"), properties);
    25. rabbitTemplate.convertAndSend("ex.pay.waiting", "pay.waiting", message);
    26. return "msg-ttl-ok";
    27. }
    28. }
  • 相关阅读:
    基于JAVA图书管理系统计算机毕业设计源码+数据库+lw文档+系统+部署
    【机器学习】python实现随机森林
    MES生产管理系统的五个关键组件
    数据库事务到底是什么?
    java在cmd中乱码的问题解决
    CSS盒子模型
    LLVM学习笔记(57)
    2022/09/04 day01:Linux背景
    用动图详细讲解——栈
    百趣代谢组学资讯: 帕金森和肠道菌群有关系?
  • 原文地址:https://blog.csdn.net/weixin_52851967/article/details/128127805