• 【redis】springboot 用redis stream实现MQ消息队列 考虑异常ack重试场景


    redis stream是redis5引入的特性,一定程度上借鉴了kafka等MQ的设计,部署的redis版本必须 >= 5

    本文主要讲的是思路,结合简单的源码分析(放心,无需深入大量源码);讲述在redis stream文档缺乏,网上资料欠缺,gpt回答不上来的情况下,博主是如何用两三天的时间 从没接触过redis stream 到分析完成了redis stream mq功能。
    博主通过分析源码给spring提出了设计不足处,spring最终也承认了体验不好并计划修改源码 。博主始终认为 有明确的思路 才能知道什么代码是正确的 能复制拿来用,什么代码只是单纯跑起来demo的 绝对达不到生产级别。
    本文源自csdn博主:孟秋与你 ,博主虽才疏学浅 却也是在资料极少的情况下 ,辛苦研究源码、整理思路 撰写的本文,转载请声明出处。


    (本文基于springboot3.3 jdk17 redis6环境,
    理论上springboot2 redis5也是通用教程 可能会有细微的api差异 稍微分析一下源码方法都能处理)

    redisTemplate API的熟悉

    我们在操作redis的时候 通常是使用spring-data-redis提供的redisTemplate或者jedis 本文以redisTemplate为例。
    (实际业务场景可能需要考虑用jedis替换 因为mq通常在数据量、并发量都大的场景;redisTemplate的优势在于和springboot的完美集成,且不需要考虑通过连接池来管理线程安全问题)

    用过redisTemplate的同学应该都会自己封装一下工具类,因为redisTemplate封装的不够好,不管怎么样 我们都需要先看看这个类

    redisTemplate.opsForHash()redisTemplate.opsForValue()

    各位应该很熟悉了, stream是一种新引入的格式,那么我们直接在RedisTemplate类里面搜stream就好了,正常都会有对应API
    (没对应API那就是spring版本太老了 spring那个老版本出来的时候 redis还没出到5 )

    搜到了opsForStream()方法在这里插入图片描述 继续查看方法 如下图: 在这里插入图片描述

    这里说明一下,redis的streamKey就类似mq的topic, group是消费者组,cousumer是消费者,acknowledge即ack 应答机制 告诉mq已经成功消费了,claim是强制将消息转至其它消费者 通常用于消费失败/多次消费失败的场景,pending存放的是未ack的消息 就比如消费某个消息时 出现了异常 没能执行到ack 这些消息就会放在pending list 确保消息不丢失。

    通过api,加上我们掌握的mq基本知识,大概就能理解是怎么一回事了。demo搭建不难,但是代码要上生产,我们就必须考虑消息消费失败了怎么办 该如何重试,也就是说重点的api在acknowledge和pending上面。

    一个简单的封装

    	
    	@Component
    	public class RedisStreamUtil {
    	    @Autowired
    	    private RedisTemplate<String, Object> redisTemplate;
    	
    	    /**
    	     * 创建消费组
    	     *
    	     * @param key   键名称
    	     * @param group 组名称
    	     * @return {@link String}
    	     */
    	    public String createGroup(String key, String group) {
    	        return redisTemplate.opsForStream().createGroup(key, group);
    	    }
    	
    	    /**
    	     * 获取消费者信息
    	     *
    	     * @param key   键名称
    	     * @param group 组名称
    	     * @return {@link StreamInfo.XInfoConsumers}
    	     */
    	    public StreamInfo.XInfoConsumers queryConsumers(String key, String group) {
    	        return redisTemplate.opsForStream().consumers(key, group);
    	    }
    	
    	    /**
    	     * 查询组信息
    	     *
    	     * @param key 键名称
    	     * @return
    	     */
    	    public StreamInfo.XInfoGroups queryGroups(String key) {
    	        return redisTemplate.opsForStream().groups(key);
    	    }
    	
    	    /**
    	     * 添加Map消息
    	     * @param key
    	     * @param value
    	     */
    	    public String addMap(String key, Map<String, Object> value) {
    	        return redisTemplate.opsForStream().add(key, value).getValue();
    	    }
    	
    	    /**
    	     * 读取消息
    	     * @param key
    	     */
    	    public List<MapRecord<String, Object, Object>> read(String key) {
    	        return redisTemplate.opsForStream().read(StreamOffset.fromStart(key));
    	    }
    	
    	    /**
    	     * 确认消费
    	     * @param key
    	     * @param group
    	     * @param recordIds
    	     */
    	    public Long ack(String key, String group, String... recordIds) {
    	        return redisTemplate.opsForStream().acknowledge(key, group, recordIds);
    	    }
    	
    	    /**
    	     * 删除消息
    	     * 当一个节点的所有消息都被删除,那么该节点会自动销毁
    	     * @param key
    	     * @param recordIds
    	     */
    	    public Long del(String key, String... recordIds) {
    	        return redisTemplate.opsForStream().delete(key, recordIds);
    	    }
    	
    	    /**
    	     *  判断是否存在key
    	     * @param key
    	     */
    	    public boolean hasKey(String key) {
    	        Boolean flag= redisTemplate.hasKey(key);
    	        return flag != null && flag;
    	    }
    	
    	}
    	
    

    注意:会有循环依赖的问题,如果没有那就是springboot版本太低,低版本默认是开启允许循环依赖的,高版本默认不允许(2.7已经不允许了 具体版本不记得了)

    解决方法1: 在yml配置里面允许循环依赖

    server:
      port: 8586
    
    spring:
      application:
        name: springboot3-demo
      data:
        redis:
          port: 6579
          host: 192.168.1.1
          password: xxxxxxx
          database: 1
          lettuce:
            pool:
              max-wait: 5000ms
              max-active: 1000
    
      datasource:
        driver-class-name: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://localhost:3306/test?characterEncoding=utf8&serverTimezone=UTC&rewriteBatchedStatements=true
        type: com.alibaba.druid.pool.DruidDataSource
        username: root
        password: root
    # 允许循环依赖
      main:
        allow-circular-references: true
    
    

    解决方法2:该工具类不交给spring托管 代码如下图所示
    在spring bean初始化的时候 把redisTemplate bean赋值到工具类即可,工具类方法变成静态方法
    在这里插入图片描述

    配置

    redis mq config

    以下代码展示了如何配置多个生产者,也是这个代码最难写, 尤其是Subscription的创建 不能用spring官方文档里面提供的demo!

    package com.qiuhuanhen.springboot3demo.redis.mq.config;
    
    import cn.hutool.core.map.MapUtil;
    import cn.hutool.core.util.StrUtil;
    import com.fasterxml.jackson.annotation.JsonAutoDetect;
    import com.fasterxml.jackson.annotation.PropertyAccessor;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.qiuhuanhen.springboot3demo.redis.mq.consumer.RedisConsumer;
    import com.qiuhuanhen.springboot3demo.redis.mq.consumer.listener.RedisConsumersListener;
    import com.qiuhuanhen.springboot3demo.redis.utils.RedisStreamUtil;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.connection.RedisServerCommands;
    import org.springframework.data.redis.connection.stream.Consumer;
    import org.springframework.data.redis.connection.stream.MapRecord;
    import org.springframework.data.redis.connection.stream.ReadOffset;
    import org.springframework.data.redis.connection.stream.StreamOffset;
    import org.springframework.data.redis.core.RedisCallback;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
    import org.springframework.data.redis.serializer.StringRedisSerializer;
    import org.springframework.data.redis.stream.StreamListener;
    import org.springframework.data.redis.stream.StreamMessageListenerContainer;
    import org.springframework.data.redis.stream.Subscription;
    
    import javax.annotation.Resource;
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Objects;
    import java.util.Properties;
    import java.util.concurrent.ThreadPoolExecutor;
    
    /**
     * @author csdn:孟秋与你
     */
    @Configuration
    @Slf4j
    public class RedisStreamConfig{
    
        @Resource
        private RedisStreamUtil redisStreamUtil;
    
        @Autowired
        private ThreadPoolExecutor threadPoolExecutor;
    
        @Autowired
        private RedisMqProperties redisMqProperties;
    
        @Autowired
        private RedisConsumersListener redisConsumersListener;
    
        /**
         * 多个订阅者对应同一个key时 会几乎同时获取到同一条消息
         * 如果两个消费组共用一个监听 消息就会在该监听器并发出现两次 即消息重复问题
         * 如果有N个消费组共用监听 就会有N的并发 这对去重要求很高
         * 推荐做法是 key-group 1对1  不同业务去使用不同的key就好了
         * 如果硬要 1对N 这对代码的封装性会有破坏,几乎就是把业务代码分别直接/间接写在不同的监听器里面了
         * 不同监听器知道哪些消息是属于它的 可以用简单的map映射 如下
         */
    //    static Map reflect = new HashMap<>();
    //    static {
    //        // xxxListener implements StreamListener
    //        reflect.put("orderGroup", new OrderOrderListener());
    //        reflect.put("goodsGroup", new OrderGoodsListener());
    //        // ... put other
    //    }
    
        /**
         * redis序列化
         *
         * @param redisConnectionFactory
         * @return {@code RedisTemplate}
         */
        @Bean
        public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
            RedisTemplate<String, Object> template = new RedisTemplate<>();
            template.setConnectionFactory(redisConnectionFactory);
            ObjectMapper om = new ObjectMapper();
            om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            om.activateDefaultTyping(om.getPolymorphicTypeValidator(), ObjectMapper.DefaultTyping.NON_FINAL);
            Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(om,Object.class);
            StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
            template.setKeySerializer(stringRedisSerializer);
            template.setHashKeySerializer(stringRedisSerializer);
            template.setValueSerializer(jackson2JsonRedisSerializer);
            template.setHashValueSerializer(jackson2JsonRedisSerializer);
            template.afterPropertiesSet();
            return template;
        }
    
        @ConditionalOnProperty(value = "redis.stream.enable", havingValue = "true", matchIfMissing = false)
        @Bean(initMethod = "start", destroyMethod = "stop")
        public  StreamMessageListenerContainer<String, MapRecord<String,String, String>>  subscriptions(RedisConnectionFactory factory) {
    
            List<RedisMqConfig> configs = Objects.requireNonNull(redisMqProperties.getConfigs(), "config error: config is null");
    
            StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
                    StreamMessageListenerContainer
                            .StreamMessageListenerContainerOptions
                            .builder()
                            .executor(threadPoolExecutor)
                            // 每次从Redis Stream中读取消息的最大条数 (32为rocketmq的pullBatchSize默认数量)
                            .batchSize(32)
                            // 轮询拉取消息的时间 (如果流中没有消息,它会等待这么久的时间,然后再次检查。)
                            .pollTimeout(Duration.ofSeconds(1))
                            .errorHandler(throwable -> {
                                log.error("[redis MQ handler exception]", throwable);
                                throwable.printStackTrace();
                            })
                            .build();
    
            var listenerContainer = StreamMessageListenerContainer.create(factory, options);
            // 创建不同的订阅者
            List<Subscription> subscriptions = new ArrayList<>();
            for (int i = 0; i < configs.size(); i++) {
                subscriptions.add( createSubscription(listenerContainer, configs.get(i).getStreamName(), configs.get(i).getGroupName(), configs.get(i).getConsumerName()));
            }
            listenerContainer.start();
            return listenerContainer;
        }
    
        /**
         * @param listenerContainer
         * @param streamName   类似 topic
         * @param groupName    消费组是 Redis Streams 中的一个重要特性,它允许多个消费者协作消费同一个流中的消息。每个消费组可以有多个消费者。
         * @param consumerName 这是消费组中的具体消费者名称。每个消费者会从消费组中领取消息进行处理。
         * @return
         */
        private Subscription createSubscription(StreamMessageListenerContainer<String, MapRecord<String,String, String>> listenerContainer, String streamName, String groupName, String consumerName) {
    
            initStream(streamName, groupName);
    
    
    
            // 手动ask消息
    //        Subscription subscription = listenerContainer.receive(Consumer.from(groupName, consumerName),
    //                //  创建一个流的偏移量实例。 含义: 指定从哪个偏移量开始读取消息。ReadOffset.lastConsumed()表示从上次消费的位置开始。
    //                StreamOffset.create(streamName, ReadOffset.lastConsumed()), redisConsumersListener);
    
            // 自动ask消息
    //            Subscription subscription = listenerContainer.receiveAutoAck(Consumer.from(groupName, consumerName),
    //                    StreamOffset.create(streamName, ReadOffset.lastConsumed()), redisConsumersListener);
    
            // 手动创建 核心在于 cancelOnError(t -> false)  出现异常不退出
            StreamMessageListenerContainer.ConsumerStreamReadRequest<String> build = StreamMessageListenerContainer.StreamReadRequest.builder(StreamOffset.create(streamName, ReadOffset.lastConsumed()))
                    .consumer(Consumer.from(groupName, consumerName))
                    .autoAcknowledge(false)
                    // 重要!
                    .cancelOnError(t -> false).build();
    
            Subscription subscription = listenerContainer.register(build, redisConsumersListener);
    
            return subscription;
        }
    
        /**
         * 初始化流 保证stream流程是正常的
         *
         * @param key
         * @param group
         */
        private void initStream(String key, String group) {
            boolean hasKey = redisStreamUtil.hasKey(key);
            if (!hasKey) {
                Map<String, Object> map = new HashMap<>(1);
                map.put("field", "value");
                //创建主题
                String result = redisStreamUtil.addMap(key, map);
                //创建消费组
                redisStreamUtil.createGroup(key, group);
                //将初始化的值删除掉
                redisStreamUtil.del(key, result);
                log.info("stream:{}-group:{} initialize success", key, group);
            } else {
                List<String> existGroupList = new ArrayList<>();
                redisStreamUtil.queryGroups(key).forEach(
                        item -> existGroupList.add(item.groupName())
                );
                // 这里考虑的是同一个key需要不同消费组场景
                if (!existGroupList.contains(group)) {
                    //创建消费组
                    redisStreamUtil.createGroup(key, group);
                }
            }
        }
    
    
        /**
         * 可选方法: 校验 Redis 版本号,是否满足最低的版本号要求
         */
        private static void checkRedisVersion(RedisTemplate<String, ?> redisTemplate) {
            // 获得 Redis 版本
            Properties info = redisTemplate.execute((RedisCallback<Properties>) RedisServerCommands::info);
            String version = MapUtil.getStr(info, "redis_version");
            // 校验最低版本必须大于等于 5.0.0
            int majorVersion = Integer.parseInt(StrUtil.subBefore(version, '.', false));
            if (majorVersion < 5) {
                throw new IllegalStateException(StrUtil.format("您当前的 Redis 版本为 {},小于最低要求的 5.0.0 版本!", version));
            }
        }
    }
    
    
    

    我们简单阐述一下上面代码中的initStream方法:

      /**
         * 初始化流 保证stream流程是正常的
         *
         * @param key
         * @param group
         */
        private void initStream(String key, String group) {
            boolean hasKey = redisStreamUtil.hasKey(key);
            // 如果没有该key 则创建key和组
            if (!hasKey) {
            	// 随便放一个数据
                Map<String, Object> map = new HashMap<>(1);
                map.put("field", "value");
                // 将key和数据发布
                String result = redisStreamUtil.addMap(key, map);
                //创建消费组
                redisStreamUtil.createGroup(key, group);
                //将初始化没有的数据删除掉
                redisStreamUtil.del(key, result);
                // 以上做法主要是因为重复创建createGroup会报错
                // 直接对createGroup方法try catch也是可以的 
                // 但博主不喜欢报错 所以是加的判断
                log.info("stream:{}-group:{} initialize success", key, group);
            } else {
                List<String> existGroupList = new ArrayList<>();
                redisStreamUtil.queryGroups(key).forEach(
                        item -> existGroupList.add(item.groupName())
                );
                // 这里考虑的是同一个key需要不同消费组场景
                if (!existGroupList.contains(group)) {
                    //创建消费组
                    redisStreamUtil.createGroup(key, group);
                    // 同样是因为重复创建组会报错 所以需要判断一下
                }
            }
        }
    
    
    

    先创建了一对K-V, 接着创建了一个消费组,再把K-V删除,剩下的就是消费组了。因为我们在createSubscription的时候声明了消费组,redis stream mq机制如此 如果redis里面没有消费组会直接报错消费组不存在 而不会自动创建 (与rocketMq类似)

    那么有同学可能会问 直接createGroup不行吗?第一次创建当然是没问题的,但是后面项目再启动时 就会报错group已存在

    聪明的你可能会有疑惑,那先查询组是否存在 再创建不行吗?
    我们来看看redisTemplate的api:
    redisTemplate.opsForStream().groups(key)
    这个是查询消费组信息的api, 如果消费组不存在,会直接报错该消费组不存在。

    所以initSream方法是一个小技巧,有点类似于卡bug。

    当然 ,如果硬要只使用createGroup方法也不是不可以,加个try catch就好了,但这就相当于除了第一次初始化之外,之后每次启动项目 其实都会发生一次异常。

    监听器:

    (核心是实现StreamListener接口)

    package com.qiuhuanhen.springboot3demo.redis.mq.consumer.listener;
    
    import com.qiuhuanhen.springboot3demo.redis.mq.annotation.RedisStream;
    import com.qiuhuanhen.springboot3demo.redis.mq.consumer.RedisConsumer;
    import com.qiuhuanhen.springboot3demo.redis.mq.entity.RedisMsg;
    import com.qiuhuanhen.springboot3demo.redis.utils.RedisStreamUtil;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.aop.support.AopUtils;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.connection.stream.MapRecord;
    import org.springframework.data.redis.connection.stream.RecordId;
    import org.springframework.data.redis.stream.StreamListener;
    import org.springframework.stereotype.Component;
    
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Objects;
    
    @Component
    @Slf4j
    public class RedisConsumersListener implements StreamListener<String, MapRecord<String, String, String>> {
    
        @Autowired
        private Map<String, RedisConsumer> redisConsumer;
        @Autowired
        private RedisStreamUtil redisStreamUtil;
    
        /**
         * 监听器
         *
         * @param message
         */
        @Override
        public void onMessage(MapRecord<String, String, String> message) {
            System.out.println("监听测试--------------" + message.getId());
            // stream的key值
            String streamName = message.getStream();
            //消息ID
            RecordId recordId = message.getId();
            //消息内容
            Map<String, String> msg = message.getValue();
    
            Map<String,Map<String, String>> map = new LinkedHashMap<>();
            map.put(recordId.getValue(),msg);
    
            for (Map.Entry<String, RedisConsumer> redisConsumerEntry : redisConsumer.entrySet()) {
    
                RedisConsumer redisConsumer = redisConsumerEntry.getValue();
                // 获取目标类
                Class<?> targetClass = AopUtils.getTargetClass(redisConsumer);
                RedisStream redisStream =
                        targetClass.getAnnotation(RedisStream.class);
                if (Objects.isNull(redisStream)) {
                    continue;
                }
                String consumerStreamName = redisStream.streamName();
    
                if (!Objects.equals(streamName, consumerStreamName)) {
                    continue;
                }
    
                String consumerGroupName = redisStream.groupName();
    
                RedisMsg redisMsg = new RedisMsg();
                redisMsg.setStreamName(streamName);
                redisMsg.setMsg(map);
                try {
                    redisConsumer.getMessage(redisMsg);
                    //逻辑处理完成后,ack消息,删除消息,group为消费组名称
                    redisStreamUtil.ack(streamName, consumerGroupName, recordId.getValue());
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
                // 不能直接删除 需要业务层自己维护 等到所有消费组都消费完才删除 一般可以不考虑删除
    //            redisStreamUtil.del(streamName, recordId.getValue());
            }
            log.info("【streamName】= " + streamName + ",【recordId】= " + recordId + ",【msg】=" + msg);
    
        }
    }
    
    
    
    

    感兴趣可以看博主踩到的坑, 看完思路才能自行判断 代码是否能直接复制使用 (个人感觉这才是分析技术最精彩的地方 有正确的思路才能在使用新技术时披荆斩棘); 不感兴趣可以直接跳到下一目录

    ===== ====== ====== 踩坑start ===== ==== ===== =====
    一开始使用的是receive方法 (被注释的部分)

            // 手动ask消息
    //        Subscription subscription = listenerContainer.receive(Consumer.from(groupName, consumerName),
    //                //  创建一个流的偏移量实例。 含义: 指定从哪个偏移量开始读取消息。ReadOffset.lastConsumed()表示从上次消费的位置开始。
    //                StreamOffset.create(streamName, ReadOffset.lastConsumed()), redisConsumersListener);
    

    这也是网上使用最多的方法(因为spring给的文档demo就是这么创建的),但是它非常坑!
    在这里插入图片描述

    通过方法名我们可以判断出 receiveAutoAck是会自动ack的,不出异常还好,那如果出现异常呢 如何ack? 所以我们肯定是要手动控制的。
    在这里插入图片描述
    我们可以看看源码 它们的差异:
    在这里插入图片描述
    是的,就是一个是否自动ack的差别。

    既然引入了消息队列,那说明数据量是比较大的,所以肯定是需要考虑异常情况下 消息不能丢失的,于是博主在消费时,故意编写了异常模拟不触发ack的场景. 结果发现 一旦消费出现异常 没有ack时,pending list不再新增数据,在项目重启后数据又增加了,但是再次消息异常时 pending list又阻塞了,这种现象非常奇怪! 难道一个消息没ack redis stream就阻塞吗?这显然不符合设计。 反复思考后,看起来像是出现异常后就停止了轮询,这个mq就像极了是一次性的。
    但是和轮询相关的 也就一个pollTimeout参数,它能掀起多大的火花呢?

    于是继续看代码 配置redis mq时,都有哪些api. 使用receive方法后 返回的是一个Subscription ,Subscription类有isActive()方法 ,于是在定时器中打印subsciption.isActive() 发现它竟然为false

    于是我们追踪这个方法:
    在这里插入图片描述
    追踪到了StreamPollTask类
    在这里插入图片描述
    如果是task类 那么应该会有run方法 ,我们直接在里面搜run()

    在这里插入图片描述
    run方法里面主要就这两个方法
    this.pollState.running();
    this.doLoop();
    第一个running方法 一眼看到头,没什么东西 ;我们看doLoop() 这个方法看起来是循环执行,如果任务中断了 说明是loop出问题了
    在这里插入图片描述
    里面有行代码:

        if (this.cancelSubscriptionOnError.test(ex)) {
                        this.cancel();
                    }
    

    也就是说在cancelSubscriptionOnError.test为true的时候 会取消执行
    在这里插入图片描述

    还记得isActive()方法吗 它正是去判断该状态的.

    通过构造方法 可以看出 该参数是StreamMessageListenerContainer.StreamReadRequest streamRequest 传进来的
    在这里插入图片描述

    StreamMessageListenerContainer.StreamReadRequest在我们查看listenerContainer.receive源码时 有过一面之缘:
    在这里插入图片描述

    我们再看看StreamReadRequest.builder出来的StreamReadRequestBuilder类:
    在这里插入图片描述
    至此,分析完成了闭环,因为receive方法创建出来 默认是遇到异常就取消执行 这明显不符合实际使用,这个设计个人感觉非常欠佳。

    这便是为什么使用以下代码来创建的原因

         StreamMessageListenerContainer.ConsumerStreamReadRequest<String> build = StreamMessageListenerContainer.StreamReadRequest.builder(StreamOffset.create(streamName, ReadOffset.lastConsumed()))
                    .consumer(Consumer.from(groupName, consumerName))
                    .autoAcknowledge(false)
                    // 重要!
                    .cancelOnError(t -> false).build();
    

    博主在issue区说明了源码设计缺陷, spring redis data团队已经承认了确实体验不好,并将问题加入至enhancement标签 相信在不久的将来源码将会得到改善 感兴趣可自行查看issue #2919

    ===== ====== ====== 踩坑end ===== ==== ===== =====

    降级处理(定时处理pending list消息)

    tips:
    redis stream的设计: 被消费但是没有被ack的消息会进入到pending list
    Q: 如何界定消息被消费呢?会不会消息一发布就直接进入pending list?
    A: 调用了如xReadGroup命令之后才算被消费,不会直接进入pending list

    tips: 在spring data redis的设计中 我们可以简单看一下源码, 所以我们不需要显示去调用消费消息的命令,我们重写监听器 进入onMessage方法的时候就表示消息已经被消费了。
    在这里插入图片描述

    代码比较乱 注释代码比较多的原因 不是因为瞎写,而是那些api 在实际业务中可能会使用到,所以特地写在下面了

    
    import com.qiuhuanhen.springboot3demo.redis.mq.annotation.RedisStream;
    import com.qiuhuanhen.springboot3demo.redis.mq.config.RedisMqConfig;
    import com.qiuhuanhen.springboot3demo.redis.mq.config.RedisMqProperties;
    import com.qiuhuanhen.springboot3demo.redis.mq.consumer.RedisConsumer;
    import com.qiuhuanhen.springboot3demo.redis.mq.entity.RedisMsg;
    import com.qiuhuanhen.springboot3demo.redis.utils.RedisStreamUtil;
    import org.springframework.aop.support.AopUtils;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.data.domain.Range;
    import org.springframework.data.redis.connection.Limit;
    import org.springframework.data.redis.connection.stream.MapRecord;
    import org.springframework.data.redis.connection.stream.PendingMessage;
    import org.springframework.data.redis.connection.stream.PendingMessages;
    import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
    import org.springframework.data.redis.connection.stream.RecordId;
    import org.springframework.data.redis.core.StreamOperations;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    
    import java.util.List;
    import java.util.Map;
    import java.util.Objects;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentSkipListMap;
    
    /**
     * @author csdn:孟秋与你
     */
    @Component
    @ConditionalOnProperty(value = "redis.stream.enable", havingValue = "true", matchIfMissing = false)
    public class RedisPendingMessageHandler {
    
        @Autowired
        private RedisStreamUtil redisStreamUtil;
        @Autowired
        private StringRedisTemplate redisTemplate;
        @Autowired
        private Map<String, RedisConsumer> redisConsumerMap;
        @Autowired
        private RedisMqProperties redisMqProperties;
    
        /**
         * 定期处理 pending list 中的消息
         */
        @Scheduled(cron = "0/30 * * * * ?")
        public void processPendingMessages() {
    
            for (RedisMqConfig config : redisMqProperties.getConfigs()) {
                // Redis Stream 的键
                String streamName = config.getStreamName();
                // 消费者组的名称
                String groupName = config.getGroupName();
    
                StreamOperations<String, String, String> streamOps = redisTemplate.opsForStream();
    
                // 获取 pending list 中未确认的消息概要
                PendingMessagesSummary pendingSummary = streamOps.pending(streamName, groupName);
    
                // 所有pending消息的数量
                long totalPendingMessages = pendingSummary.getTotalPendingMessages();
    
                if (pendingSummary.getTotalPendingMessages() == 0L) {
                    continue;
                }
    
                // 消费组名称
                String groupName1 = pendingSummary.getGroupName();
    
                // pending队列中的最小ID
                String minMessageId = pendingSummary.minMessageId();
    
                // pending队列中的最大ID
                String maxMessageId = pendingSummary.maxMessageId();
    
                if (pendingSummary.getTotalPendingMessages() > 0) {
                    // 读取消费者pending队列的前10条记录,从ID=0的记录开始,一直到ID最大值
    //            PendingMessages pendingMessages = streamOps.pending(streamKey, Consumer.from(groupName, consumerName), Range.closed("0", "+"), 10);
    
                    // 获取 pending list 中具体的消息
                    PendingMessages pendingMessages = streamOps.pending(streamName, groupName, Range.unbounded(), 100);
                    int size = pendingMessages.size();
                    // 获取当前批次的消息
                    PendingMessage currentBatchMin = pendingMessages.get(0);
                    PendingMessage currentBatchMax = pendingMessages.get(size - 1);
    
                    ConcurrentHashMap<String, Long> readCountMap = new ConcurrentHashMap<>();
                    pendingMessages.forEach(
                            pendingMessage ->
                            {
                                // 消息被获取的次数 可以根据次数做不同业务 超过一定次数未消费 考虑是否要ack并del
                                long deliveryCount = pendingMessage.getTotalDeliveryCount();
                                readCountMap.put(pendingMessage.getId().getValue(), deliveryCount);
                            }
                    );
    
                    // 读取每个未确认的消息
    //                List> messages = streamOps.read(
    //                        StreamReadOptions.empty(),
    //                        StreamOffset.create(streamKey,ReadOffset.lastConsumed())
                            StreamOffset.create(streamKey,ReadOffset.from("0"))
    //                );
    
                    List<MapRecord<String, String, String>> messages = streamOps.range(streamName, Range.closed(currentBatchMin.getId().toString(), currentBatchMax.getId().toString()), Limit.limit().count(20));
    
                    try {
                        // 处理消息
                        processMessage(streamName, groupName, messages);
                    } catch (Exception e) {
                        // 处理异常情况
                        e.printStackTrace();
                    }
    
                }
            }
        }
    
    
        /**
         * 消息处理逻辑
         */
        private void processMessage(String streamName, String groupName, List<MapRecord<String, String, String>> messages) {
    
            Map<String, Map<String, String>> msg = new ConcurrentSkipListMap<>();
    
            for (MapRecord<String, String, String> message : messages) {
                RecordId id = message.getId();
                Map<String, String> value = message.getValue();
                msg.put(id.getValue(), value);
    
            }
            for (Map.Entry<String, RedisConsumer> entry : redisConsumerMap.entrySet()) {
    
                RedisConsumer redisConsumer = entry.getValue();
                // 获取目标类
                Class<?> targetClass = AopUtils.getTargetClass(redisConsumer);
                RedisStream redisStream =
                        targetClass.getAnnotation(RedisStream.class);
                if (Objects.isNull(redisStream)) {
                    continue;
                }
    
                if (!Objects.equals(streamName, redisStream.streamName())) {
                    continue;
                }
                if (!Objects.equals(groupName, redisStream.groupName())) {
                    continue;
                }
    
                RedisMsg redisMsg = new RedisMsg();
                redisMsg.setStreamName(streamName);
                redisMsg.setMsg(msg);
                // 模拟降级操作 捕获异常: 消息被读取 n 次 仍然失败 考虑删除
                redisConsumer.fallBack(redisMsg);
    
                Set<String> keySet = msg.keySet();
    
                String[] ids = keySet.toArray(new String[0]);
    
                //逻辑处理完成后,ack消息,group为消费组名称  ack是根据消费组来的 即A组ack 在B组的pending list还能看到
                redisStreamUtil.ack(streamName, groupName,ids);
                // 同样不能直接删除 需要业务层自己维护 等到所有消费组都消费完才删除 一般可以不考虑删除
    //            redisStreamUtil.del(streamName, recordId.getValue());
            }
    
    
        }
    }
    

    其它示例代码

    消费者

    仅供参考 消费者更多的是结合自己业务考虑想要做什么 如何设计,不建议直接照搬

    /**
     * @author github:qiuhuanhen csdn:孟秋与你
     */
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE})
    @Component
    public @interface RedisStream {
    
        /**
         * 消息主题
         * 不设计数组: 1. 接口应遵循单一职责 2.数组带来ack应答问题
         */
        String streamName() ;
    
        /**
         * 消费组
         * 不设计数组: 1. 接口应遵循单一职责 2.数组带来ack应答问题
         */
        String groupName();
    
        /**
         * 消费者 ack按组应答 不使用消费者概念 增大acK控制难度
         */
    
    
    }
    
    
    
    import lombok.Data;
    
    @Data
    public class ConsumerBase {
        private String msgId;
    }
    
    
    
    import com.qiuhuanhen.springboot3demo.redis.mq.entity.ConsumerBase;
    import lombok.Data;
    
    /**
     * @author qkj
     */
    @Data
    public class Oper extends ConsumerBase {
    
        private Long testId;
    
        private String testDesc;
    
        private String testXxx;
    
        private Integer ukKey;
    
        private Integer isDeleted;
    
        private Integer version;
    
        private Integer groupId;
    
    }
    
    
    @Data
    public class RedisMsg {
        private String streamName;
        private Map<String,Map<String, String>> msg;
    }
    

    这个根据自己业务 自定义封装 ,灵活性比较高 , 不能照搬代码

    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONArray;
    import com.qiuhuanhen.springboot3demo.redis.mq.entity.ConsumerBase;
    import com.qiuhuanhen.springboot3demo.redis.mq.entity.RedisMsg;
    
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;
    import java.util.Map;
    
    public interface RedisConsumer<T extends ConsumerBase> {
    
        /**
         * 转成实体类
         * 注: redis stream存取出来的数据有问题
         *
         * @param redisMsg 消息对象
         */
        default List<T> getMessage(RedisMsg redisMsg) {
    
            Map<String, Map<String, String>> map = redisMsg.getMsg();
    
            Collection<Map<String, String>> values = map.values();
            // 这里封装得一般 可以再优化
            List<T> res = new ArrayList<T>();
            for (Map<String, String> msg : values) {
                T obj = null;
                try {
                    // 遍历 Map 并尝试反序列化每个值
                    for (Map.Entry<String, String> entry : msg.entrySet()) {
    
                        String json = entry.getValue();
                        JSONArray jsonArray = JSONArray.parseArray(json);
                        // 动态加载实体类
                        Class<?> entityClass = Class.forName(String.valueOf(jsonArray.get(0)));
                        String jsonString = JSON.toJSONString(jsonArray.get(1));
                        // 将数据转换为实体类对象
                        obj = JSON.parseObject(jsonString, (Class<T>) entityClass);
                        // 设置消息id用于业务去重
                        //                    obj.setMsgId();
                        res.add(obj);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
    
    
            return res;
        }
    
        default List<T> fallBack(RedisMsg redisMsg) {
    
            Map<String, Map<String, String>> map = redisMsg.getMsg();
    
            Collection<Map<String, String>> values = map.values();
    
            List<T> res = new ArrayList<T>();
            for (Map<String, String> msg : values) {
                T obj = null;
                try {
                    // 遍历 Map 并尝试反序列化每个值
                    for (Map.Entry<String, String> entry : msg.entrySet()) {
    
                        String json = entry.getValue();
                        JSONArray jsonArray = JSONArray.parseArray(json);
                        // 动态加载实体类
                        Class<?> entityClass = Class.forName(String.valueOf(jsonArray.get(0)));
                        String jsonString = JSON.toJSONString(jsonArray.get(1));
                        // 将数据转换为实体类对象
                        obj = JSON.parseObject(jsonString, (Class<T>) entityClass);
                        res.add(obj);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
    
    
            return res;
        }
    }
    
    

    以下是实现类 为了方便调试博主就直接写在controller了,实际不建议这么写

    
    import com.qiuhuanhen.springboot3demo.bean.Oper;
    import com.qiuhuanhen.springboot3demo.mapper.RedisStreamBizMapper;
    import com.qiuhuanhen.springboot3demo.redis.mq.annotation.RedisStream;
    import com.qiuhuanhen.springboot3demo.redis.mq.consumer.RedisConsumer;
    import com.qiuhuanhen.springboot3demo.redis.mq.entity.RedisMsg;
    import com.qiuhuanhen.springboot3demo.redis.utils.RedisStreamUtil;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Lazy;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.time.LocalDateTime;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    @RedisStream(streamName = "orderStream",groupName = "orderGroup")
    @RestController
    @RequestMapping("/test/redis/order")
    public class OrderConsumerController implements RedisConsumer<Oper> {
    
        /**
         * 注意: redisMsg并不是一个bean
         * 只是记录个小技巧测试代码 没有用的 可以删除
         * 这里演示 通过指定required = false 或 指定懒加载 都可以解决项目启动时找不到bean报错的问题
         */
        @Autowired(required = false)
        @Lazy
        private RedisMsg redisMsg;
    
        @Autowired
        private RedisStreamUtil redisStreamUtil;
        @Autowired
        private RedisStreamBizMapper redisStreamBizMapper;
    
        @GetMapping("/stream")
        public String testStream() {
    
            String mystream = "";
            for (int i = 0; i < 1; i++) {
                Oper oper = new Oper();
                oper.setTestId(11111111L);
                oper.setTestDesc("订单消息队列");
                oper.setVersion(i);
                oper.setTestXxx(LocalDateTime.now().toString());
                Map<String, Object> map = new HashMap<>();
                map.put("oper", oper);
                mystream = redisStreamUtil.addMap("orderStream", map);
            }
    
            return String.valueOf(mystream);
        }
    
        /**
         * 模拟消费操作
         *
         * @param redisMsg 消息对象
         * @return
         */
        @Override
        public List<Oper> getMessage(RedisMsg redisMsg) {
            System.out.println("收到了订单消息============="+Thread.currentThread().getName()+"-------------- 当前线程");
            System.out.println(1 / 0);
            List<Oper> message = RedisConsumer.super.getMessage(redisMsg);
            System.out.println("order收到消息 准备ack:-----------------------------------------------------" + message);
            return message;
        }
    
        /**
         * 模拟降级操作 (从pending list获取消息)
         *
         * @param redisMsg
         * @return
         */
        @Override
        public List<Oper> fallBack(RedisMsg redisMsg) {
            List<Oper> message = RedisConsumer.super.fallBack(redisMsg);
            System.out.println("order完成了降级:-----------------------------------------------------" + message);
            return message;
        }
    }
    
    
    
    import com.qiuhuanhen.springboot3demo.bean.Oper;
    import com.qiuhuanhen.springboot3demo.mapper.RedisStreamBizMapper;
    import com.qiuhuanhen.springboot3demo.redis.mq.annotation.RedisStream;
    import com.qiuhuanhen.springboot3demo.redis.mq.consumer.RedisConsumer;
    import com.qiuhuanhen.springboot3demo.redis.mq.entity.RedisMsg;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.List;
    
    @RedisStream(streamName = "orderStream",groupName = "goodsGroup")
    @RestController
    @RequestMapping("/test/redis/goods")
    public class GoodsConsumerController implements RedisConsumer<Oper> {
    
        @Autowired
        private RedisStreamBizMapper redisStreamBizMapper;
    
        /**
         * 模拟消费操作
         *
         * @param redisMsg 消息对象
         * @return
         */
        @Override
        public List<Oper> getMessage(RedisMsg redisMsg) {
            System.out.println("商品消费者 收到了订单消息=============");
            List<Oper> message = RedisConsumer.super.getMessage(redisMsg);
            System.out.println("商品成功消费order 准备ack");
            return message;
        }
    
        /**
         * 模拟降级操作 (从pending list获取消息)
         *
         * @param redisMsg
         * @return
         */
        @Override
        public List<Oper> fallBack(RedisMsg redisMsg) {
            List<Oper> messageList = RedisConsumer.super.fallBack(redisMsg);
            System.out.println("商品消费者 order完成了降级:-----------------------------------------------------" + messageList);
            return messageList;
        }
    }
    
    
    
    import com.qiuhuanhen.springboot3demo.bean.Oper;
    import com.qiuhuanhen.springboot3demo.redis.mq.annotation.RedisStream;
    import com.qiuhuanhen.springboot3demo.redis.utils.RedisStreamUtil;
    import com.qiuhuanhen.springboot3demo.redis.mq.consumer.RedisConsumer;
    import com.qiuhuanhen.springboot3demo.redis.mq.entity.RedisMsg;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.time.LocalDateTime;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    
    @RedisStream(streamName = "productStream",groupName = "productGroup")
    @RestController
    @RequestMapping("/test/redis/product")
    public class ProductConsumerController implements RedisConsumer<Oper> {
    
        @Autowired
        private RedisStreamUtil redisStreamUtil;
    
    
        @GetMapping("/stream")
        public String testStream() {
    
            for (int i = 0; i < 100000; i++) {
                Oper oper = new Oper();
                oper.setTestId(11111111L);
                oper.setTestDesc("订单消息队列");
                oper.setVersion(i);
                oper.setTestXxx(LocalDateTime.now().toString());
                Map<String, Object> map = new HashMap<>();
                map.put("oper", oper);
                redisStreamUtil.addMap("productStream", map);
            }
            return "ok";
        }
    
        @Override
        public List<Oper> getMessage(RedisMsg redisMsg) {
            List<Oper> message = RedisConsumer.super.getMessage(redisMsg);
    
            System.out.println("product收到消息:-----------------------------------------------------" + message);
            return message;
        }
    }
    
    

    配置类

    @Data
    public class RedisMqConfig {
        private String streamName;
        private String groupName;
        private String consumerName;
    }
    
    
    
    @Data
    @Component
    @ConfigurationProperties(prefix = "redis.stream")
    public class RedisMqProperties {
    
        private boolean enable;
    
        private List<RedisMqConfig> configs;
    
    
    }
    
    

    yml:

    redis:
      stream:
        enable: true
        configs:
          - stream-name: orderStream
            group-name: orderGroup
            consumer-name: orderConsumer
    
          - stream-name: orderStream
            group-name: goodsGroup
            consumer-name: goodsConsumer
    
          - stream-name: productStream
            group-name: productGroup
            consumer-name: productConsumer
    

    优化方向

    1. 建立一个消费者抽象类,定义消费方法

    2. 建议一个降级处理抽象类,定义补偿方法(即消费失败时的处理)

    3. 定义spring的properties类 把生产者消费者字段写到里面
      (以上三点可以参照博主代码)

    4. redis需要部署集群,可在博主的主页搜索哨兵,有哨兵架构教程。

    5. 实际业务中,消费消息很可能是存入数据库,在入库完成之后 redis ack完成之前,如果这一瞬间突然宕机了,而数据量又非常大,可能会导致消费重复的情况,因为没有完成ack 下次还是会把该数据从pending list里面取出来。

      解决方案1 :考虑是加redisson
      解决方案2:数据库存入消息id字段并建立唯一索引
      (唯一索引的魅力体现出来了)

    6. 获取pending list数据 如果有其它更好的方式也可以用其它方式,如果是定时获取 建议用分布式调度平台 博主主页也有xxl job相关教程 搜索即可

    7. 阅读RedisStreamConfig类文档注释部分,理解为什么会有重复消息,博主能力有限 只想到了注释里的两个方法,应该是还可以有其它不同的优化方向

    至此,一份生产级别的redis stream mq架构思路成立,再次提醒 代码不建议照搬 必须有自己对redis mq的理解, 博主不保证代码没有bug 但是大体思路是这个方向。

  • 相关阅读:
    Aruba CX交换机VSX配置
    详解模板引擎一
    Arch Linux 的安装
    freertos之任务运行时间统计实验
    python 笔记(3)——request、爬虫、socket、多线程
    【微信小程序】自定义组件(二)
    批量导出 PPT的备注到一个txt文本中
    【JavaEE】Servlet(创建Maven、引入依赖、创建目录、编写及打包、部署和验证、smart Tomcat)
    [DDC]Deep Domain Confusion: Maximizing for Domain Invariance
    SAP-MM-收货操作报错123
  • 原文地址:https://blog.csdn.net/qq_36268103/article/details/140959645