• 【Solution】商品秒杀之Redis缓存与MQ异步优化以及超卖一人一单等问题的解决


    目录

    一、Demo开始前准备

    1、数据库准备

    2、项目准备

    3、全局唯一id生成器

    二、秒杀业务基本实现

    1、秒杀基本业务逻辑

    2、秒杀接口设计

    3、秒杀业务代码实现

    4、超卖问题产生

    三、保证线程安全解决超卖少卖问题

    1、超卖产生的原因

    2、加锁方案:乐观锁

    3、使用乐观锁少卖问题产生

    4、少卖问题产生原因

    5、解决少卖问题

    四、一人一单基本实现

    1、一人一单业务逻辑

    2、代码实现

    3、一人多买问题产生原因

    4、加锁解决一人多买问题时注意点

    5、事务未提交锁提前释放问题

    五、Redis缓存与MQ异步优化

    1、优化思路

    2、保证原子性

    3、封装Java调用Redis执行lua脚本API

    4、MQ相关配置

    1.配置文件

    2.配置类创建队列

    3.封装消费者

    4.封装生产者

    4、最终代码实现


    一、Demo开始前准备

    1、数据库准备

    1. create database super_mall;
    2. user super_mall;
    3. create table orders(
    4. id bigint not null primary key,
    5. user_id bigint not null,
    6. product_id bigint not null,
    7. pay_type int default 1 comment '支付方式 1:余额支付 2:支付宝支付 3:微信支付',
    8. status int not null default 1 comment '订单状态 1:未支付 2:已支付 3:已退款 4:已核销',
    9. pay_time timestamp default current_timestamp,
    10. use_time timestamp default current_timestamp,
    11. ref_time timestamp default current_timestamp,
    12. update_time timestamp default current_timestamp
    13. );
    14. create table product(
    15. id bigint not null primary key,
    16. shop_id bigint not null,
    17. stock int not null comment '商品库存',
    18. product varchar(1024) not null,
    19. start_time timestamp default current_timestamp,
    20. end_time timestamp default current_timestamp,
    21. status int not null default 1 comment '商品状态 1上架 2下架 3缺货',
    22. price bigint not null,
    23. photo varchar(255) default null
    24. );
    25. create table userInfo(
    26. id bigint not null primary key,
    27. openid varchar(255) not null,
    28. nickname varchar(255) not null,
    29. sex int not null,
    30. photo varchar(255) not null,
    31. status int default 1 comment '用户状态 1注册 2禁止'
    32. );
    33. insert into userInfo(id,openid,nickname,sex,photo) values(1,"1","用户222",1,"defualt.jpg");

    主要有三张表:用户表、商品表、订单表,将上述sql脚本执行一遍即可

    2、项目准备

    在准备好数据库之后,我们需要创建一个SpringBoot项目

    【Java】两张图帮你的社区版IDEA创建SpringBoot项目_idea社区版不支持springboot_西瓜霜润喉片的博客-CSDN博客icon-default.png?t=N7T8https://blog.csdn.net/qq_61903414/article/details/130174514?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522169771109016800227471663%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fblog.%2522%257D&request_id=169771109016800227471663&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~blog~first_rank_ecpm_v1~rank_v31_ecpm-1-130174514-null-null.nonecase&utm_term=%E7%A4%BE%E5%8C%BA&spm=1018.2226.3001.4450

    3、全局唯一id生成器

    它是一种在分布式系统下用来生成全局唯一ID的工具,它具有唯一性,高可用,高性能,递增性,安全性。如果我们使用数据库中的自增主键则不能保证安全性。如在订单系统中,我们在数据库中有订单表,如果在该订单表中使用数据库的自增主键,它的id规律性太明显且受单表数量的限制,如果订单数量日益增多,后续添加新的订单表时,他的主键又会重新开始。此处我们使用31位时间戳+32位递增数字组合而成,一个long类型8个字节刚好64比特,64位表示符合位,接下来31位表示时间戳最后32位拼接递增的数字,递增数字基于redis实现

    1. @Component
    2. public class RedisIdWorker {
    3. @Autowired
    4. private StringRedisTemplate stringRedisTemplate;
    5. public long nextId(String prefixKey) {
    6. // 1. 生成时间戳
    7. long timestamp = System.currentTimeMillis();;
    8. // 2. 生成序列号
    9. String day = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
    10. Long count = stringRedisTemplate.opsForValue().increment("icr:" + prefixKey + ":" + day);
    11. // 3.拼接后返回
    12. return timestamp << 32 | count;
    13. }
    14. }

    二、秒杀业务基本实现

    1、秒杀基本业务逻辑

    首先我们需要从前端传回的参数中获取要购买的商品id,然后根据商品id进行查询信息,看库存是否足够,如果足够则扣减库存、生成订单进行下单

    2、秒杀接口设计

    controller层代码

    1. @Api(tags = "商品API")
    2. @RestController
    3. @RequestMapping("/product")
    4. public class ProductController {
    5. @Autowired
    6. private ProductService productService;
    7. @ApiOperation(value = "秒杀")
    8. @PostMapping("/order")
    9. public Return order(@RequestParam("id") @NotNull Long id) {
    10. if (id <= 0) {
    11. return Return.fail(Code.REQUEST_FAIL);
    12. }
    13. return productService.order(id);
    14. }
    15. }

    3、秒杀业务代码实现

    1. @Slf4j
    2. @Service
    3. public class ProductService {
    4. @Autowired
    5. private TokenUtil tokenUtil;
    6. @Autowired
    7. private RabbitMqProduct rabbitMqProduct;
    8. @Autowired
    9. private OrderMapper orderMapper;
    10. @Autowired
    11. private ProductMapper productMapper;
    12. @Autowired
    13. private RedisIdWorker redisIdWorker;
    14. @Autowired
    15. private StringRedisTemplate stringRedisTemplate;
    16. @Autowired
    17. private ObjectMapper objectMapper;
    18. @Transactional
    19. public Return order(Long id) {
    20. // 1. 根据id查询商品信息
    21. Product product = productMapper.queryById(id);
    22. // 2. 判断库存是否足够
    23. Integer stock = product.getStock();
    24. if (stock <= 0) {
    25. return Return.fail(Code.ORDER_STOCK);
    26. }
    27. // 3. 扣减库存
    28. int subtract = productMapper.subtract(id);
    29. if (subtract != 1) {
    30. return Return.fail(Code.ORDER_FAIL);
    31. }
    32. // 4. 生成订单信息
    33. Order order = new Order();
    34. Long orderId = redisIdWorker.nextId("order");
    35. order.setId(orderId);
    36. order.setProductId(id);
    37. Long userId = 1L; // todo: 后续从会话中获取用户信息
    38. order.setUserId(userId);
    39. orderMapper.add(order);
    40. // 5. 返回订单号
    41. return Return.success(Code.ORDER_SUCCESS,orderId);
    42. }
    43. }

    mapper层:

    1. "1.0" encoding="UTF-8"?>
    2. mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
    3. <mapper namespace="com.example.demo.mapper.ProductMapper">
    4. <resultMap id="product" type="com.example.demo.pojo.entity.Product">
    5. <id property="id" column="id"/>
    6. <result property="photo" column="photo"/>
    7. <result property="price" column="price"/>
    8. <result property="product" column="product"/>
    9. <result property="shopId" column="shop_id"/>
    10. <result property="startTime" column="start_time"/>
    11. <result property="endTime" column="end_time"/>
    12. <result property="status" column="status"/>
    13. <result property="stock" column="stock"/>
    14. resultMap>
    15. <insert id="insert">
    16. insert into product(id,shop_id,stock,product,price,photo,start_time,end_time) values(#{id},#{shopId},#{stock},#{product},#{price},#{photo},#{startTime},#{endTime})
    17. insert>
    18. <select id="queryById" resultMap="product">
    19. select * from product where id=#{id}
    20. select>
    21. <update id="subtract">
    22. update product set stock=stock-1 where id=#{id}
    23. update>
    24. mapper>

    4、超卖问题产生

    在上面的代码中,如果商品A在某一时刻的库存仅为1了,此时多个用户的线程访问下单接口,第一个线程查询商品信息后发现库存足够,但是还没有进行扣减库存生成订单操作。这个时候另外的一些线程也去查询了商品信息发现库存足够,于是也去进行下单操作。于是使得库存为负,导致商品超卖

    三、保证线程安全解决超卖少卖问题

    1、超卖产生的原因

    由此可见上述产生线程安全问题是因为判断库存与扣减操作不是原子性的,那么该如何去解决呢?如果使用悲观锁,给判断与扣减库存操作进行加锁操作,那么所有的下单操作都是串行,该接口性能极差用户体验不佳。我们可以使用乐观锁

    2、加锁方案:乐观锁

    乐观锁主要有两种方式,首先可以使用版本号法,维护一个版本号,每次修改都使得版本号+1,在进行修改时判断一下版本号是否相同,如果不同则修改失败。比如有两个线程,第一个线程查询库存为1版本号为1可以进行扣减库存操作,于是在修改时判断一些版本号是否一致,此时发现都是1,于是扣减成功版本号+1变为2,这个时候第二个线程在第一个线程扣减之前查询到库存为1版本号为1,于是也去进行扣减操作,判断版本号时线程2查询的版本号为1但是由于被线程1修改了所以真正的版本号不再是1而是2于是扣减失败。还有一种就是CAS方法,与上述类似,在扣减库存操作时判断查询到的库存与数据库中的库存是否相同,相同的成功反之失败。比如此时有两个线程都查询到数据库中该商品额库存为1,此时线程1执行扣减库存操作,这个时候会比较他当时查询出来的库存1是否与数据库中库存1一样,此处一致则扣减成功,库存变为0,此时线程2再去进行扣减操作的时候进行比较,线程2查询时的库存为1但此时数据库中的库存已经为0了,于是扣减失败。这里我们实现时采用第二种方式,他不需要引入额外的字段:版本号。我们在实现时只需要将扣减库存的sql语句进行修改即可

    update product set stock=stock-1 where id=#{id} and stock=#{stock}

    3、使用乐观锁少卖问题产生

    在上述实现中我们解决了超卖问题,但是新的问题又来了,由于这个秒杀商品所以该接口一定会被大量的线程所访问,如果此时商品库存有200个或者刚开始秒杀。当两个用户访问该接口时,他们都同时查询到了库存为200于是都去进行扣减操作,线程1进行扣减操作时数据库中的库存200与查询出的库存200相同则扣减成功,库存变为199,这个时候线程2再去进行扣减操作时发现他查询出来的库存为200但是数据库中的库存确是199于是下单失败。由此可见库存足够却下单失败

    4、少卖问题产生原因

    在上述描述中我们可以了解到是由于乐观锁实现时导致库存足够却下单失败的原因

    5、解决少卖问题

    商品只要库存足够就可以进行下单,在这里我们可以对上述乐观锁进行修改,将条件判断不在是判断库存是否相同,而是判断库存此时是否大于0也就是是否足够,这个时候就能解决超卖少卖问题

    update product set stock=stock-1 where id=#{id} and stock > 0

    四、一人一单基本实现

    1、一人一单业务逻辑

    在上述秒杀代码的基础上我们需要对下单操作进行限制,一个人只能下单一次,所以我们需要在上述扣减库存操作之前进行判断,判断该用户是否已经下过单,如果已经下单则返回下单失败

    2、代码实现

    1. package com.example.demo.service;
    2. import com.example.demo.component.RedisIdWorker;
    3. import com.example.demo.enums.Code;
    4. import com.example.demo.mapper.OrderMapper;
    5. import com.example.demo.mapper.ProductMapper;
    6. import com.example.demo.pojo.entity.Order;
    7. import com.example.demo.pojo.entity.Product;
    8. import com.example.demo.util.Return;
    9. import org.springframework.beans.factory.annotation.Autowired;
    10. import org.springframework.stereotype.Service;
    11. import org.springframework.transaction.annotation.Transactional;
    12. import java.time.LocalDateTime;
    13. @Service
    14. public class ProductService {
    15. @Autowired
    16. private OrderMapper orderMapper;
    17. @Autowired
    18. private ProductMapper productMapper;
    19. @Autowired
    20. private RedisIdWorker redisIdWorker;
    21. @Transactional
    22. public Return order(Long id) {
    23. // 1. 查询商品
    24. Product product = productMapper.queryById(id);
    25. // 4. 判断库存是否足够
    26. if (product.getStock() <= 0) {
    27. return Return.fail(Code.ORDER_STOCK);
    28. }
    29. // 5. 判断订单是否存在(用户是否已下单)
    30. // 5.1 获取用户id
    31. Long userId = 1L; // TODO: 2023/10/14 后续从Token获取
    32. // 5.2 根据商品id与用户id查询订单表
    33. int count = orderMapper.queryByUserIdAndId(userId,product.getId());
    34. if (count != 0) {
    35. return Return.fail(Code.ORDER_TWO);
    36. }
    37. // 6. 扣减库存
    38. int subtract = productMapper.subtract(id);
    39. if (subtract != 1) {
    40. return Return.fail(Code.ORDER_FAIL);
    41. }
    42. // 7. 生成订单
    43. // 7.1 订单id
    44. Order order = new Order();
    45. order.setId(redisIdWorker.nextId("order"));
    46. // 7.2 用户id
    47. order.setUserId(userId);
    48. // 7.3 商品id
    49. order.setProductId(product.getId());
    50. // 7.4 入库
    51. int isSuccess = orderMapper.add(order);
    52. if (isSuccess != 1) {
    53. return Return.fail(Code.ORDER_FAIL);
    54. }
    55. // 9. 返回订单id
    56. return Return.success(Code.ORDER_SUCCESS,order.getId());
    57. }
    58. }

    3、一人多买问题产生原因

    上述代码的实现如果有用户的多个线程来访问该接口,此时同一个用户有两个线程来访问该接口,线程1查询完订单表没有该用户购买该商品订单信息去进行扣减库存生成订单操作之前,线程2也查询完订单表也没有该用户购买该商品的订单,于是也去进行扣减库存生成订单,于是同一个用户购买了多次,并没有达到一人一单的效果。产生这一问题是因为查询订单与生成订单操作并不是原子性的,于是这里我们可以采用加锁的办法去实现

    4、加锁解决一人多买问题时注意点

    那么我们如何去加锁呢?我们需要对查询订单信息与生成订单的代码进行加锁操作,那么锁对象如何是什么呢?这里如果直接使用类对象或者类属性进行加锁,那么不同用户的线程访问时也需要串行执行,所以我们不能无脑加锁,此处产生线程安全问题的原因是同一用户的不同线程,所以我们可以对该用户的id进行加锁,只有同一个用户的不同线程访问时才会有锁竞争。此处还要注意的是用户的id他是一个Long类型的数据,同一用户的不同线程每次访问时他的id在堆中的地址并不是一致的,每次都会发生变化,那么锁对象也就没有意义,我们可以将用户id转为字符串并使用intern()方法将他存入字符串常量池,这样同一个用户锁对象的地址就不会发送变化。此处我们将用户下单操作抽取为方法,在上述代码中进行完库存判断后直接调用该方法即可

    1. @Transactional
    2. private Return createOrder(Long id) {
    3. // 5. 判断订单是否存在(用户是否已下单)
    4. // 5.1 获取用户id
    5. Long userId = 1L; // TODO: 2023/10/14 后续从Token获取
    6. synchronized (userId.toString().intern()) {
    7. // 5.2 根据商品id与用户id查询订单表
    8. int count = orderMapper.queryByUserIdAndId(userId, id);
    9. if (count != 0) {
    10. return Return.fail(Code.ORDER_TWO);
    11. }
    12. // 6. 扣减库存
    13. int subtract = productMapper.subtract(id);
    14. if (subtract != 1) {
    15. return Return.fail(Code.ORDER_FAIL);
    16. }
    17. // 7. 生成订单
    18. // 7.1 订单id
    19. Order order = new Order();
    20. order.setId(redisIdWorker.nextId("order"));
    21. // 7.2 用户id
    22. order.setUserId(userId);
    23. // 7.3 商品id
    24. order.setProductId(id);
    25. // 7.4 入库
    26. int isSuccess = orderMapper.add(order);
    27. if (isSuccess != 1) {
    28. return Return.fail(Code.ORDER_FAIL);
    29. }
    30. // 9. 返回订单id
    31. return Return.success(Code.ORDER_SUCCESS, order.getId());
    32. }
    33. }

    5、事务未提交锁提前释放问题

    上述代码存在一个新的问题,当方法执行完成锁会被释放,但是此时事务还没有提交,数据库中还是没有订单信息,此时该用户的其他线程就会获取到锁,判断订单表中没有该用户购买该商品的信息,于是进行下单操作,产生 问题。我们只需要让事务提交发生在锁释放之前即可,将锁的粒度进行修改

    1. package com.example.demo.service;
    2. import com.example.demo.component.RedisIdWorker;
    3. import com.example.demo.enums.Code;
    4. import com.example.demo.mapper.OrderMapper;
    5. import com.example.demo.mapper.ProductMapper;
    6. import com.example.demo.pojo.entity.Order;
    7. import com.example.demo.pojo.entity.Product;
    8. import com.example.demo.util.Return;
    9. import org.springframework.aop.framework.AopContext;
    10. import org.springframework.beans.factory.annotation.Autowired;
    11. import org.springframework.stereotype.Service;
    12. import org.springframework.transaction.annotation.Transactional;
    13. import java.time.LocalDateTime;
    14. @Service
    15. public class ProductService {
    16. @Autowired
    17. private OrderMapper orderMapper;
    18. @Autowired
    19. private ProductMapper productMapper;
    20. @Autowired
    21. private RedisIdWorker redisIdWorker;
    22. public Return order(Long id) {
    23. // 1. 查询商品
    24. Product product = productMapper.queryById(id);
    25. // 2. 判断秒杀是否开始
    26. if (product.getStartTime().isAfter(LocalDateTime.now())) {
    27. return Return.fail(Code.ORDER_START);
    28. }
    29. // 5. 创建订单
    30. Long userId = 1L; // TODO: 2023/10/14 后续从Token获取
    31. synchronized (userId.toString().intern()) {
    32. ProductService proxy = (ProductService) AopContext.currentProxy();
    33. return proxy.createOrder(userId,id);
    34. }
    35. }
    36. @Transactional
    37. private Return createOrder(Long userId, Long id) {
    38. // 5. 判断订单是否存在(用户是否已下单)
    39. // 5.1 获取用户id
    40. // 5.2 根据商品id与用户id查询订单表
    41. int count = orderMapper.queryByUserIdAndId(userId, id);
    42. if (count != 0) {
    43. return Return.fail(Code.ORDER_TWO);
    44. }
    45. // 6. 扣减库存
    46. int subtract = productMapper.subtract(id);
    47. if (subtract != 1) {
    48. return Return.fail(Code.ORDER_FAIL);
    49. }
    50. // 7. 生成订单
    51. // 7.1 订单id
    52. Order order = new Order();
    53. order.setId(redisIdWorker.nextId("order"));
    54. // 7.2 用户id
    55. order.setUserId(userId);
    56. // 7.3 商品id
    57. order.setProductId(id);
    58. // 7.4 入库
    59. int isSuccess = orderMapper.add(order);
    60. if (isSuccess != 1) {
    61. return Return.fail(Code.ORDER_FAIL);
    62. }
    63. // 9. 返回订单id
    64. return Return.success(Code.ORDER_SUCCESS, order.getId());
    65. }
    66. }

    五、Redis缓存与MQ异步优化

    1、优化思路

    上述代码中我们解决了线程安全的问题,但是由于秒杀接口是一个被高并发访问的接口,而上述的实现中数据库读写操作太多,这样在高并发的情况下对数据库的压力太大,此时我们可以对该代码进行分析优化,上述代码其实主要分为两步:1.进行数据库读操作判断用户是否有下单的权限 2.如果有则进行数据库写操作扣减库存插入订单 这个时候我们可以将数据库读操作使用redis做缓存处理来减缓数据库的压力,将库存信息与订单信息进行缓存处理,请求到达服务器去查询缓存,如果有下单权限,我们可以采用MQ异步地进行数据库写操作来减缓数据库压力。

    首先我们需要思考在redis中我们需要做什么?首先是对商品库存的查询,判断商品的库存是否足够其次需要判断该用户是否已经下单。在商品库存查询时我们可以使用redis中的string类型来处理,那如何判断用户是否已经下过单,我们可以使用set数据类型,他的特点是value都是唯一的,我们可以以商品的id作为key的组成,以下单用户的id为value存入其中,我们只需要根据商品id去查询该set中是否有该用户的id如果有就是已经购买,则不能继续购买,没有则将redis缓存中的库存扣减并在set集合中添加该用户的id,那么在redis中判断库存是否足够、判断用户是否下单与扣减库存插入用户id四个命令不是原子性的,会存在线程安全问题。这个时候我们可以通过lua来保证这四个命令的原子性。最后我们需要通过MQ异步的将扣减库存与生成订单操作入库

    2、保证原子性

    1. -- 获取参数
    2. -- 1.商品id
    3. local productId = ARGV[1];
    4. -- 2.用户id
    5. local userId = ARGV[2];
    6. -- 构造缓存的key
    7. -- 1.订单key
    8. local orderKey = "order:order:" .. productId;
    9. -- 2.库存id
    10. local stockKey = "order:stock:" .. productId;
    11. -- 判断库存是否足够
    12. if (tonumber(redis.call('get', stockKey)) <= 0) then
    13. -- 库存不足 返回1
    14. return 1;
    15. end
    16. -- 判断是否下过单
    17. if (redis.call('sismember',orderKey,userId) == 1) then
    18. -- 存在 返回2
    19. return 2;
    20. end
    21. -- 满足扣减库存
    22. redis.call('incrby',stockKey,-1);
    23. -- 下单:缓存订单中加入该用户
    24. redis.call("sadd",orderKey,userId);
    25. -- 返回0
    26. return 0

    3、封装Java调用Redis执行lua脚本API

    1. /**
    2. * 封装Java调用redis执行lua脚本的API
    3. */
    4. public class LuaUtil {
    5. /**
    6. *
    7. * @param type 返回类型
    8. * @param luaScriptPath 脚本路径
    9. * @param stringRedisTemplate redisTemplate
    10. * @param keys lua脚本所需要的keys
    11. * @param args lua脚本所需要的args
    12. * @param 返回值T
    13. * @return 返回lua执行结果
    14. */
    15. public static T execute(Class type,
    16. String luaScriptPath,
    17. StringRedisTemplate stringRedisTemplate,
    18. List keys,Object... args) {
    19. // 1. 初始化DefaultRedisScript
    20. DefaultRedisScript redisScript = new DefaultRedisScript<>();
    21. redisScript.setResultType(type);
    22. redisScript.setLocation(new ClassPathResource(luaScriptPath));
    23. // 2. 执行lua脚本
    24. T result = stringRedisTemplate.execute(redisScript, keys, args);
    25. // 3. 返回结果
    26. return result;
    27. }

    4、MQ相关配置

    1.配置文件

    配置文件中开启confirm、return、ack模式确保消息可靠性

    rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: admin
        password: admin
        virtual-host: /super_mall
        publisher-confirm-type: correlated #??????
        publisher-returns: true            #??????
        listener:
          simple:
            acknowledge-mode: manual       #??????
            prefetch: 10                   #???????10????????????????10?
            retry:
              enabled: true                #????
              max-attempts: 4              #??????
              max-interval: 1000s          #??????

    2.配置类创建队列

    1. package com.example.demo.config;
    2. import org.springframework.amqp.core.*;
    3. import org.springframework.beans.factory.annotation.Qualifier;
    4. import org.springframework.context.annotation.Bean;
    5. import org.springframework.context.annotation.Configuration;
    6. @Configuration
    7. public class RabbitMQConfig {
    8. public static final String ORDER_KILL_QUEUE = "order:kill";
    9. public static final String ORDER_KILL_EXCHANGE = "order:change";
    10. public static final String ORDER_KILL_KEY = "order:kill:async";
    11. // 队列与交换机
    12. @Bean
    13. public Queue orderQueue() {
    14. // 创建队列,并设置持久化
    15. return QueueBuilder.durable(ORDER_KILL_QUEUE).build();
    16. }
    17. @Bean
    18. public DirectExchange orderExchange() {
    19. // 创建直连交换机,并设置持久化
    20. return ExchangeBuilder.directExchange(ORDER_KILL_EXCHANGE).durable(true).build();
    21. }
    22. // 绑定队列与交换机
    23. @Bean
    24. public Binding dlxBind(@Qualifier("orderQueue") Queue dlxQueue, @Qualifier("orderExchange") DirectExchange dlxExchange) {
    25. return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(ORDER_KILL_KEY);
    26. }
    27. }

    3.封装消费者

    消费者开启了ack模式

    1. @Slf4j
    2. @Component
    3. public class RabbitMqConsumer {
    4. @Autowired
    5. private ObjectMapper objectMapper;
    6. @Autowired
    7. private RabbitTemplate rabbitTemplate;
    8. @Autowired
    9. private OrderMapper orderMapper;
    10. @Autowired
    11. private ProductMapper productMapper;
    12. @RabbitListener(queues = RabbitMQConfig.ORDER_KILL_QUEUE)
    13. public void createOrder(Message message, Channel channel) throws IOException {
    14. long tag = message.getMessageProperties().getDeliveryTag();
    15. try {
    16. // 1. 获取消息
    17. Order order = objectMapper.readValue(message.getBody(), Order.class);
    18. if (order == null) {
    19. log.error("消息为空发送失败");
    20. throw new Exception("消息格式错误");
    21. }
    22. // 2. 消费消息
    23. int subtract = productMapper.subtract(order.getProductId());
    24. int add = orderMapper.add(order);
    25. if (subtract != 1 || add != 1) {
    26. throw new Exception("入库失败,消息重发");
    27. }
    28. // 3. 向MQ服务器发生ack
    29. log.info("订单创建成功:{}",order.toString());
    30. channel.basicAck(tag, true);
    31. } catch (Exception e) {
    32. // 4. 应答消息处理失败,允许重复投递
    33. channel.basicNack(tag, true, true);
    34. }
    35. }
    36. }

    4.封装生产者

    在这里需要注意可能会报出一下错误

    java.lang.IllegalStateException: Only one ConfirmCallback is supported by each RabbitTemplate
        at org.springframework.util.Assert.state(Assert.java:76) ~[spring-core-5.3.26.jar:5.3.26]
        at org.springframework.amqp.rabbit.core.RabbitTemplate.setConfirmCallback(RabbitTemplate.java:469) ~[spring-rabbit-2.4.11.jar:2.4.11]
        at com.example.demo.component.RabbitMqProduct.send(RabbitMqProduct.java:34) ~[classes/:na]
        at com.example.demo.service.ProductService.order(ProductService.java:68) ~[classes/:na]
        at com.example.demo.controller.ProductController.order(ProductController.java:23) ~[classes/:na]
        at sun.reflect.GeneratedMethodAccessor14.invoke(Unknown Source) ~[na:na]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_192]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_192]
     

    报错"每个RabbitTemplate只支持一个ConfirmCallback"的原因是在 `send()` 方法中多次设置了相同的 ConfirmCallback 实例给同一个 RabbitTemplate 对象。

    在每次调用 send()方法时,都会创建一个新的 ConfirmCallback实例并设置给 RabbitTemplate` ,这导致了多个 ConfirmCallback 被设置到同一个 RabbitTemplate上,从而触发了错误。

    为了解决这个问题,可以将 ConfirmCallback的设置提取到类的构造函数中,确保每次创建 RabbitMqProduct 对象时都会创建一个新的 ConfirmCallback实例,并将其设置给相应的 RabbitTemplate 对象。这样每个 RabbitMqProduct 对象都会有自己独立的 ConfirmCallback 。

    1. package com.example.demo.component;
    2. import com.example.demo.config.RabbitMQConfig;
    3. import com.fasterxml.jackson.databind.ObjectMapper;
    4. import lombok.SneakyThrows;
    5. import lombok.extern.slf4j.Slf4j;
    6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    7. import org.springframework.beans.factory.annotation.Autowired;
    8. import org.springframework.context.annotation.Scope;
    9. import org.springframework.stereotype.Component;
    10. import java.util.Map;
    11. @Component
    12. @Scope("prototype")
    13. @Slf4j
    14. public class RabbitMqProduct {
    15. private final RabbitTemplate rabbitTemplate;
    16. private final ObjectMapper objectMapper;
    17. private Object message;
    18. @Autowired
    19. public RabbitMqProduct(RabbitTemplate rabbitTemplate, ObjectMapper objectMapper) {
    20. this.rabbitTemplate = rabbitTemplate;
    21. this.objectMapper = objectMapper;
    22. // 创建一个 ConfirmCallback 实例
    23. RabbitTemplate.ConfirmCallback confirmCallback = (correlationData, ack, cause) -> {
    24. if (!ack) {
    25. // 如果消息发送失败,则重新发送
    26. send(RabbitMQConfig.ORDER_KILL_EXCHANGE, RabbitMQConfig.ORDER_KILL_KEY,message);
    27. }
    28. log.info("消息重送成功");
    29. };
    30. // 设置 ConfirmCallback
    31. this.rabbitTemplate.setConfirmCallback(confirmCallback);
    32. // 当消息无法路由时返回
    33. this.rabbitTemplate.setMandatory(true);
    34. this.rabbitTemplate.setReturnsCallback(returnedMessage -> {
    35. // 如果消息无法路由,则重新发送
    36. send(RabbitMQConfig.ORDER_KILL_EXCHANGE, RabbitMQConfig.ORDER_KILL_KEY,returnedMessage.getMessage());
    37. });
    38. }
    39. @SneakyThrows
    40. public void send(String exchange,String routingKey, T message) {
    41. // 将消息内容转化为JSON格式并发送
    42. String json = objectMapper.writeValueAsString(message);
    43. rabbitTemplate.convertAndSend(exchange, routingKey, json);
    44. }
    45. public void setMessage(Object message) {
    46. this.message = message;
    47. }
    48. }

    4、最终代码实现

    1. @SneakyThrows
    2. public Return order(Long id) {
    3. // 1. 执行lua脚本
    4. Long userId = 1L; // TODO: 2023/10/15 后续修改为会话获取
    5. Long result = LuaUtil.execute(Long.class, "./lua/order.lua",
    6. stringRedisTemplate, Collections.emptyList(),
    7. id.toString(), userId.toString());
    8. // 2. 判断lua鉴权结果
    9. int isSuccess = result.intValue();
    10. if (isSuccess != 0) {
    11. // 2.1 下单权限不足
    12. return Return.fail(isSuccess == 1 ? Code.ORDER_STOCK : Code.ORDER_TWO);
    13. }
    14. // 3. MQ异步入库
    15. // 3.1 构造Order对象
    16. Order order = new Order();
    17. order.setUserId(userId);
    18. order.setProductId(id);
    19. Long orderId = redisIdWorker.nextId("order");
    20. order.setId(orderId);
    21. product.send(RabbitMQConfig.ORDER_KILL_EXCHANGE,RabbitMQConfig.ORDER_KILL_KEY,order);
    22. log.info("下单成功,消息进入队列准备入库:{}",order.toString());
    23. // 4. 返回订单号
    24. return Return.success(Code.ORDER_SUCCESS,orderId);
    25. }

  • 相关阅读:
    第一个Vue程序
    html前端的几种加密/解密方式
    MaTiJi - MT3143 - 试管装液
    PDF转成清晰长图
    Docker 容器中运行 Kibana
    web自动化测试——跨平台设备管理方案Selenium Grid
    原生js html5 canvas制作flappy bird压扁小鸟游戏
    java Swing学生成绩管理系统【项目源码+数据库脚本】
    UrlBasedViewResolver类简介说明
    蓝皮书首发!车载毫米波雷达赛道进入「大变革周期」,技术/产品/市场并行升级
  • 原文地址:https://blog.csdn.net/qq_61903414/article/details/133932099