
在京东下单,订单创建成功,等待支付,一般会给30分钟的时间,开始倒计时。如果在这段时间内用户没有支付,则默认订单取消。
该如何实现?
- SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss");
- Timer timer = new Timer();
- TimerTask timerTask = new TimerTask() {
- @Override
- public void run() {
- System.out.println("用户没有付款,交易取消:" + simpleDateFormat.format(new Date(System.currentTimeMillis())));
- timer.cancel();
- }
- };
- System.out.println("等待用户付款:" + simpleDateFormat.format(new Date(System.currentTimeMillis())));
- // 10秒后执行timerTask
- timer.schedule(timerTask, 10 * 1000);
- SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss");
- // 线程工厂
- ThreadFactory factory = Executors.defaultThreadFactory();
- // 使用线程池
- ScheduledExecutorService service = new ScheduledThreadPoolExecutor(10, factory);
- System.out.println("开始等待用户付款10秒:" + format.format(new Date()));
- service.schedule(new Runnable() {
- @Override
- public void run() {
- System.out.println("用户未付款,交易取消:" + format.format(new Date()));
- }// 等待10s 单位秒
- }, 10, TimeUnit.SECONDS);
TTL,Time to Live 的简称,即过期时间。
RabbitMQ 可以对消息和队列两个维度来设置TTL。
任何消息中间件的容量和堆积能力都是有限的,如果有一些消息总是不被消费掉,那么需要有一种过期的机制来做兜底。
目前有两种方法可以设置消息的TTL。
如果两种方法一起使用,则消息的TTL 以两者之间较小数值为准。通常来讲,消息在队列中的生存时间一旦超过设置的TTL 值时,就会变成“死信”(Dead Message),消费者默认就无法再收到该消息。当然,“死信”也是可以被取出来消费的
- package com.lagou.rabbitmq.demo;
-
- import com.rabbitmq.client.*;
-
- import java.nio.charset.StandardCharsets;
- import java.util.HashMap;
- import java.util.Map;
-
- public class Producer {
- public static void main(String[] args) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setUri("amqp://root:123456@192.168.80.121:5672/%2f");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
-
- Map
arguments = new HashMap<>(); - // 消息队列中消息的过期时间
- arguments.put("x-message-ttl", 10 * 1000);
- // 如果消息队列中没有消费者,则10s后消息过期,队列将会被自动删除
- arguments.put("x-expires", 60 * 1000);
-
- channel.queueDeclare("queue.ttl.waiting",
- true,
- false,
- false,
- arguments);
-
- channel.exchangeDeclare("ex.ttl.waiting",
- BuiltinExchangeType.DIRECT,
- true,
- false,
- false,
- null);
-
- channel.queueBind("queue.ttl.waiting", "ex.ttl.waiting", "key.ttl.waiting");
-
- AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
- .contentEncoding("utf-8")
- .deliveryMode(2) // 设置消息持久化,2代表设置消息持久化
- .build();
-
- channel.basicPublish("ex.ttl.waiting", "key.ttl.waiting", properties, "等待的订单号".getBytes(StandardCharsets.UTF_8));
-
- channel.close();
- connection.close();
- }
- }
此外,还可以通过命令行方式设置全局TTL,执行如下命令:
rabbitmqctl set_policy TTL ".*" '{"message-ttl":30000}' --apply-to queues
默认规则:
注意理解message-ttl 、x-expires 这两个参数的区别,有不同的含义。但是这两个参数属性都遵循上面的默认规则。一般TTL相关的参数单位都是毫秒(ms)
(1)pom.xml添加依赖
- <dependencies>
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-amqpartifactId>
- dependency>
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-webartifactId>
- dependency>
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-testartifactId>
- <scope>testscope>
- <exclusions>
- <exclusion>
- <groupId>org.junit.vintagegroupId>
- <artifactId>junit-vintage-engineartifactId>
- exclusion>
- exclusions>
- dependency>
- <dependency>
- <groupId>org.springframework.amqpgroupId>
- <artifactId>spring-rabbit-testartifactId>
- <scope>testscope>
- dependency>
- dependencies>
(2)application.properties添加rabbitmq连接信息
- spring.application.name=ttl
- spring.rabbitmq.host=node1
- spring.rabbitmq.virtual-host=/
- spring.rabbitmq.username=root
- spring.rabbitmq.password=123456
- spring.rabbitmq.port=5672
(3)主入口类
- package com.lagou.rabbitmq.demo;
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
- @SpringBootApplication
- public class RabbitmqDemo {
- public static void main(String[] args) {
- SpringApplication.run(RabbitmqDemo07.class, args);
- }
- }
(4)RabbitConfig类
- package com.lagou.rabbitmq.demo.config;
-
- import org.springframework.amqp.core.*;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- import java.util.HashMap;
- import java.util.Map;
-
- @Configuration
- public class RabbitConfig {
- @Bean
- public Queue queueTTLWaiting() {
- Map
props = new HashMap<>(); - // 对于该队列中的消息,设置都等待10s
- props.put("x-message-ttl", 10000);
- Queue queue = new Queue("q.pay.ttl-waiting", false, false, false, props);
- return queue;
- }
-
- @Bean
- public Queue queueWaiting() {
- Queue queue = new Queue("q.pay.waiting", false, false, false);
- return queue;
- }
-
- @Bean
- public Exchange exchangeTTLWaiting() {
- DirectExchange exchange = new DirectExchange("ex.pay.ttlwaiting", false, false);
- return exchange;
- }
-
- /**
- * 该交换器使用的时候,需要给每个消息设置有效期
- *
- * @return
- */
- @Bean
- public Exchange exchangeWaiting() {
- DirectExchange exchange = new DirectExchange("ex.pay.waiting", false, false);
- return exchange;
- }
-
- @Bean
- public Binding bindingTTLWaiting() {
- return BindingBuilder.bind(queueTTLWaiting()).to(exchangeTTLWaiting()).with("pay.ttl - waiting").noargs();
- }
-
- @Bean
- public Binding bindingWaiting() {
- return BindingBuilder.bind(queueWaiting()).to(exchangeWaiting()).with("pay.waiting").noargs();
- }
- }
(5)PayController类
- package com.lagou.rabbitmq.demo.controller;
-
- import org.springframework.amqp.core.AmqpTemplate;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.core.MessageProperties;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- import java.io.UnsupportedEncodingException;
- import java.util.HashMap;
- import java.util.Map;
-
- @RestController
- public class PayController {
- @Autowired
- private AmqpTemplate rabbitTemplate;
-
- @RequestMapping("/pay/queuettl")
- public String sendMessage() {
- rabbitTemplate.convertAndSend("ex.pay.ttl-waiting", "pay.ttlwaiting", "发送了TTL-WAITING-MESSAGE");
- return "queue-ttl-ok";
- }
-
- @RequestMapping("/pay/msgttl")
- public String sendTTLMessage() throws UnsupportedEncodingException {
- MessageProperties properties = new MessageProperties();
- properties.setExpiration("5000");
- Message message = new Message("发送了WAITINGMESSAGE".getBytes("utf-8"), properties);
- rabbitTemplate.convertAndSend("ex.pay.waiting", "pay.waiting", message);
- return "msg-ttl-ok";
- }
- }