• redis实现消息队列的几种方式及其优劣


    概述

    常用的消息队列有,rabbitMq、kafka、RocketMq、ActiveMq等。这些消息队列需要独立安装部署,作为一个中间件来提供服务,虽然有着高性能、高可靠的优点,但是额外部署这些中间件也会增加运维成本,和服务器成本。

    本篇文章探讨了一下如何使用redis实现消息队列。使用redis无需额外的部署,如果原先就有使用redis的话。此外redis更为轻量也更容易维护。但是redis实现消息队列有多种方案,这些方案有其优点也有其缺点,适用于不同的应用场景。以下从“实时性”、“可靠性”、“功能性”这几个维度做一些对比分析探讨。

    一、理论部分

    “消息队列”是在消息的传输过程中保存消息的容器。
    消息队列常被使用在“流量削峰”、“系统解耦”、“异步调用”这几个方面。
    消息队列主要面对的几个问题是,
    1、并发性能
    2、实时性
    3、如何防止消息丢失,保证可靠性

    从简单的讲,消息队列就是一个“队列”queue,生产者负责发送消息,消息队列存储消息,消费者则负责接收消息。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-T6PXcPYP-1651055823390)(https://www.hengyumo.cn/momoclouddisk/file/download?code=202204151015636_image.png)]

    在面对一些亿级流量场景,消息队列届的大哥kafka是如何保证高性能的呢?

    1. Kafka Reactor模型架构
    2. 页缓存技术+磁盘顺序写
    3. ZeroCopy:零拷贝技术
    4. 使用批量消息提升服务端处理能力
      https://www.hengyumo.cn/momoblog/detail/202204162051750

    那使用redis能否获得和kafka一样的高性能呢?答案是一定的。
    redis是如何实现高性能的呢?

    1. IO多路复用
    2. 单线程
    3. 基于内存存储
    4. 高效数据结构
    5. 写时拷贝(CopyOnWrite)
    6. 客户端管道批量命令
    7. 零拷贝技术
      https://www.hengyumo.cn/momoblog/detail/202204162116630

    二、实现消息队列

    2.1 基于list实现

    消息队列的基础结构是队列,而redis正好有相对应的数据结构:list。

    实现方式

    1. 生产者写消息

      lpush mq hello1
      lpush mq hello2
      lpush mq hello3
      lpush mq hello4
      lpush mq hello5

    lpush 命令向指定列表的左边推入元素,以上命令模拟了向mq这个消息队列列表中写入五条消息,分别是hello1 ~ hello5。

    同时写入多条也可以跟着多个,如

    lpush mq hello6 hello7
    
    • 1
    1. 消费者读取消息

    首先,基于list实现的消息队列是可以有效保证实时性的。消费者要如何检测到有新消息推送过来呢?

    • 要么是不停自旋调用llen mq获取队列的长度,如果不为0则读取。或者自旋调用rpop不停读取数据。虽然能保证高实时性,但是这会造成redis的性能浪费和消费者本身的性能浪费,严重时会导致系统崩溃。
    • 定时调用llen mq获取队列的长度。实时性取决于定时任务的频率,如果每100ms一次,则就有100ms的延迟。
    • brpop,brpop可以理解为rpop命令的阻塞升级版,brpop mq 1,会尝试阻塞读取mq 1秒时间,如果1秒内没有消息则会返回nil,如果有消息,会立即返回。

    生产者

    127.0.0.1:6379>
    127.0.0.1:6379> rpush mqb hello
    (integer) 1
    
    • 1
    • 2
    • 3

    消费者

    127.0.0.1:6379> brpop mqb 1
    1) "mqb"
    2) "hello"
    127.0.0.1:6379> brpop mqb 1
    (nil)
    (1.08s)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    基于list实现,读取消息可以通过两种方式,一种是rpop,从列表的右边读取并弹出元素。该操作是原子性的,并发下安全。

    rpop mq
    rpop mq
    
    • 1
    • 2

    依次弹出的是hello1,hello2,按照先进先出的顺序弹出。

    第二种方式是lrange,使用lrange可以实现消息的批量消费,lrange list start stop 读取list的从start - stop之间的元素。读取之后为了防止重复消费,需要使用ltrim start stop进行清除。因为期间需要进行两个操作,因此不是并发安全的,需要通过分布式锁来保证安全性。此外还存在着事务的问题,如果读取完消息之后进程挂掉,会导致之前已经读取的消息在下次运行时被重复消费。这种方式适用于对消息可靠性要求不高,但是要求处理性能高的情况,如处理大量的日志数据进行分析操作。除了使用ltrim之外也可以使用LREM key count value来删除已经消费的数据。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-isAj1MD5-1651055823391)(https://www.hengyumo.cn/momoclouddisk/file/download?code=202204241041249_image.png)]

    如图所示,假设mq中有7条消息,每次消费3条消息。那么第一条命令lrange mq -3 -1,读取倒数第3到倒数第1之间的所有元素。第二条命令ltrim mq 0 -4,保留未读取的零到倒数第4条消息,把已经读取的消息删除。

    以下演示了在redis-cli中的模拟:

    127.0.0.1:6379> lrange mq -3 -1
    1) "hello3"
    2) "hello2"
    3) "hello1"
    127.0.0.1:6379> ltrim mq 0 -4
    OK
    127.0.0.1:6379> lrange mq 0 -1
    1) "hello7"
    2) "hello6"
    3) "hello5"
    4) "hello4"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    1. 多生产者多消费者
    • 多生产者
      基于list的多生产者是没有问题的,多个生产者同时向mq中推送消息,仍然能保证消息有序。

    • 多消费者

      • rpop方式 多消费者下并发同样安全,不会出现消息被重复消费的情况
      • lrange + ltrim 方式 多消费者下并发不安全,需要使用分布式锁保证有序,否则会出现消息被重复消费的问题。同时不保证事务安全性。需要通过额外手段记录读取mq的位置,以保证宕机复位时不会出现消息重复读取的问题。
    1. 发布订阅方式

    发布订阅简单的理解是将一个消息广播给多个消费者,每个消费者针对该消息只消费一次。

    针对发布订阅有两种思路,一种是较为简单的,既然一对一消费可以通过一个list实现,那么一对多消费就使用多个list来一一对应各个消费者:

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kWu8BIMc-1651055823391)(https://www.hengyumo.cn/momoclouddisk/file/download?code=202204241058734_image.png)]

    这里只需要维护一个消费者和消息队列名称映射的列表,生产者发送消息时发送给所有的消费者对应的队列。
    消费者读取自己对应的消息队列。

    实现起来简单,但是存在两个问题:

    • 资源浪费,原本只需要一个列表存储,变成了几个消费者就需要几个列表,而且列表的数据都是相同的,这无疑造成了浪费。当数据量不大,消费者不多时可以不顾及这点。
    • 无法保证消息可靠的同步发送到各个队列上。如果生产者写入完mq1之后就宕机了,就好导致只有消费者1接收到了消息,而其它的消费者无法接收到消息,这就会导致数据不一致的情况出现。

    第二种实现方式,就是仍然只维护一个队列,这样使用方法1两个问题就没有了。但是要如何保证各个消费者能消费到数据呢?

    使用rpop肯定不行的,会导致消息丢失,所以还是得用lrange来读取数据。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-IEEY0gjI-1651055823392)(https://www.hengyumo.cn/momoclouddisk/file/download?code=202204241117840_image.png)]

    上图演示了实现的思路,

    首先是维护一个读取位置的map,保存每个消息队列读取到的位置。每个消费者读取数据时从自己上一次读取的位置继续读取。读取完成之后更新该位置。

    这种方式也实现了消息应答。可以保证消费者宕机的情况下,下次能回到前次消费的坐标进行消费,以防止消息丢失。

    当然这种方式也存在着问题

    • 消息未被清除,会一直堆积下去。这个可以通过一个定时线程清除已读取的消息并且更新所有消费者读取消息的位置。但是还要保证并发安全性,因此这个定时清除线程还需要加锁。这会造成一部分的性能损失。
    1. 消息应答

    消息应答机制是保证消费者可靠消费消息的有效手段。主要过程就是,消费者拿到数据之后,先进行消费的业务逻辑,消费完成之后回复消息队列一个ACK报文,之后消息队列才将消息移除掉。通过这种方式可以有效防止消费者宕机导致消息丢失的情况。

    在redis可以通过rpoplpush这个命令来实现ACK机制。

    Redis rpoplpush 命令用于移除列表的最后一个元素,并将该元素添加到另一个列表并返回。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-gYIXDvP1-1651055823392)(https://www.hengyumo.cn/momoclouddisk/file/download?code=202204241220718_image.png)]

    针对上图的点对点方式的消息队列,在不考虑并发消费的情况下,是可靠的。

    1. brpoplpush mq ack 1 阻塞读取mq的消息,之后写入ack列表
    2. lpop ack 清除ACK最左侧已经消费的数据,模拟推送ack报文操作

    假设这时消费者宕机,完成了步骤1,但是未完成步骤2,那么在ACK队列第一个元素之后就会备份这些未消费的数据。

    再启动一个线程,作为纠错服务。定时读取ACK队列第二个元素及其之后的元素,重新推入MQ,进行消费。这样可以有效的防止消息丢失。

    list方式的消息队列总结

    • 实时性 实时性较好,可以通过brpop方式阻塞获取新消息,有高实时性;或者通过定时任务方式监听队列,存在较低延迟。
    • 可靠性 综合可靠性一般,在点对点的消息推送机制下(lpush + rpop方案)不容易存在消息丢失,可以保证高可靠性。在其它场景可能存在并发安全性问题,但是可以通过加锁解决。存在事务失败问题,但是发生的概率较低。存在消息丢失问题,但是可以自己实现ack机制。
    • 功能性 功能性一般,通过自己的额外扩展可以满足多种不同的消息队列功能,如多对多、发布订阅模式。

    基于list的特点,可以推导出在两种常见场景下的实现方案:

    针对生产者都采用lpush的方式推送数据,

    针对消费者:

    1. 对实时性和可靠性要求高的情况,消费者使用brpop阻塞读取和消费数据
    2. 对实时性和可靠性要求不高的情况,但是推送数据量大,要求处理性能高,消费者使用定时任务+lrange+ltrim方式读取和消费数据

    2.2 基于pub-sub实现

    pub-sub 是redis官方支持的一种发布订阅模式。

    实现方式

    1. 生产者写消息

    首先需要着重注意的是,pub-sub机制和list不同,list是redis的数据结构,redis自身会保证其数据持久化的可靠性,而pub-sub则没有持久化机制,这意味着,如果发生消息时,消费者宕机,那么消息也就丢失了。所以该种方式是不满足可靠性要求的。

    生产者发生消息使用命令 publish channel message的方式,

    例如

    publish mq hello1
    publish mq hello2
    publish mq hello3
    
    • 1
    • 2
    • 3

    在发送消息时没有消费者订阅该频道的话,那么该消息就会被丢弃。

    1. 消费者读取消息

    消费者使用subscribe channel命令来订阅某个频道,其中channel代表频道的名称,执行该命令之后就会阻塞,直到有消息到来。

    例如

    • 消费者监听消息

      127.0.0.1:6379> subscribe mq
      Reading messages… (press Ctrl-C to quit)

      1. “subscribe”
      2. “mq”
      3. (integer) 1
    • 在生产者发送消息

      127.0.0.1:6379> publish mq hello1
      (integer) 1

    • 消费者监听到新消息

      127.0.0.1:6379> subscribe mq
      Reading messages… (press Ctrl-C to quit)

      1. “subscribe”
      2. “mq”
      3. (integer) 1
      4. “message”
      5. “mq”
      6. “hello1”

    此外还可以使用psubscribe channelLike命令来执行通配符匹配,满足名称匹配的频道的消息都会被接收。通过这种方式可以很简单的实现一个消费者订阅多个话题。

    例如

    • 消费者监听消息

      127.0.0.1:6379> psubscribe mq*
      Reading messages… (press Ctrl-C to quit)

      1. “psubscribe”
      2. “mq*”
      3. (integer) 1
    • 在生产者发送消息

      127.0.0.1:6379> publish mq1 hello1
      (integer) 1
      127.0.0.1:6379> publish mq2 hello2
      (integer) 1

    • 消费者监听到新消息

      127.0.0.1:6379> psubscribe mq*
      Reading messages… (press Ctrl-C to quit)

      1. “psubscribe”
      2. “mq*”
      3. (integer) 1
      4. “pmessage”
      5. “mq*”
      6. “mq1”
      7. “hello1”
      8. “pmessage”
      9. “mq*”
      10. “mq2”
      11. “hello2”
    1. 多生产者多消费者

    pub-sub实现多生产者是很简单的,和点对点模式基本没有区别。但是多消费者的情况下,会出现消息被重复消费的情况。

    多生产者

    多个生产者向一个频道写入消息:
    例如

    生产者1:

    127.0.0.1:6379> publish mq hello1
    (integer) 1
    
    • 1
    • 2

    生产者2:

    127.0.0.1:6379> publish mq hello2
    (integer) 1
    
    • 1
    • 2

    多消费者
    假设有一个服务,存在两个实例,作为消费者若使用subscribe它们是独立的两个消费者,但是作为功能而言,它们是同一个服务。如果出现多实例反复消费多次的情况下,数据的一致性就会出现问题。

    如果两个实例都订阅了mq这个频道。那么每个消息都会被重复消费一次。

    那要怎么保证并发消费时不被重复消费呢。pub-sub本身并没有解决方案。也无法判断收到的消息是否已经被消费过。只能绕开pub-sub。

    例如让每个实例都监听不同的频道,生产者采用轮询或者随机负载算法,来将消息写入某一个指定的频道,而指定的某个频道,只有一个消费者来消费它,依靠这样的流程来实现并发消费。这种方案性能有较高的保障,但是存在一旦某个实例宕机,某个频道的消息全部都无法被消费的情况。

    或者采用一个分布式锁的方式来调度这样的竞争访问。例如实例A读取到了消息,那么它申请一个锁,在这个锁有效期内,它可以任意的消费频道的消息。而其它竞争对手尝试获取锁失败之后,则不允许消费数据,虽然也接收到了消息,但是直接丢弃,不做处理。直到这个分布式锁释放被另一个实例占用。这种方式实现起来相对简单,但是存在着性能上的极大浪费。严重情况可能还不如单实例监听,但是多实例方案至少避免了单实例挂掉的情况下,所有消息都无法被消费的情况。

    1. 发布订阅方式

    pub-sub原生就是发布订阅模式的实现,因此不再叙述,可以参考上述内容。

    1. 消息应答

    以下通过引入一个纠错消费者的概念来实现ack机制。同时保证消息消费的可靠性。
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qdXPncKe-1651055823393)(https://www.hengyumo.cn/momoclouddisk/file/download?code=202204241531189_image.png)]

    针对每一条消息,生成一个唯一不重复的ID来标识消息。纠错消费者接受到消息之后将其存入本地持久化map中,此处可以使用redis或者数据库,考虑到性能采用redis zset的方式会更好一些。

    例如 message Msg27839:hello,Msg27839是该条消息的唯一标识。hello是实际的消息内容。

    纠错消费者获取到该消息之后,先将其存入redis中,分别存入三个key中,一个key是Msg27839,value是hello,用来存储原始的消息,设置时间ttl为1min。一个key是zset ack1:Msg27839,score是当前时间戳+30s的时间,一个key是zset ack2:Msg27839,score也是当前时间戳+30s的时间,分别代表ack1和ack2两个频道的ack接收确认,利用zset的有序性,可以很轻松的一次性读取出所有过期未收到ack报文的消息

    当消费者1消费完消息之后,向ack1频道发送消息,消息内容为消息ID,纠错消费者监听到消息之后,删除zset中的ack1:Msg27839。此时假设消费者2宕机,未能成功消费消息并且发送ack消息到ack2频道。

    纠错服务定时每半分钟读取一次zset,从zset取出分数小于当前时间戳的所有消息,将其重新写入到bak1或者bak2频道中。此时ack3:Msg27839消息未接收到ack报文,因此重新将其推送到频道bak2中进行二次消费,同时重新设置Msg27839这个存储原始消息的key的过期时间为1min。

    此处的30s读取一次zset,我们认为该消费正常被处理的时间是30s如果超过30s未返回ack则认为是失败

    此处的原始消息存储1min时间,是让其能自动过期,释放空间,不用手动删除。1min时间是30s的两倍,是为了保证纠错服务进行回写bak频道时消息仍然能被读取到。

    我们假设一种场景,如果消费者1和消费者2全部宕机的情况下。那么消息也会通过纠错服务一直持久化到redis中,不会导致消息丢失。一旦两个消费者恢复可用,之前的历史消息就能被纠错服务通过定时任务的方式推送给两个消费者,消息没有丢失。

    基于以上的实现逻辑我们总能保证消息都会被所有消费者消费到。并且也间接实现了pub-sub的持久化。

    pub-sub方式的消息队列总结

    • 实时性 实时性极好,实现的消息推送机制本身就是高实时性的。
    • 可靠性 可靠性较差。一旦消费者宕机消息就会直接被丢弃。
    • 功能性 功能性一般,实现简单的发布订阅还好,但是为了满足一些高可用性就需要增加很多额外的操作。

    基于pub-sub的特点,可以推导出在其常用场景:

    不在意消息丢失,不在意消息接收可靠性,需要发布订阅功能,需要高实时性的场景。
    这种场景一般常见于日志推送。

    2.3 基于redis5的stream实现

    https://www.runoob.com/redis/redis-stream.html

    Redis Stream 是 Redis 5.0 版本新增加的数据结构。

    Redis Stream 主要用于消息队列(MQ,Message Queue),Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。

    简单来说发布订阅 (pub/sub) 可以分发消息,但无法记录历史消息。

    而 Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。

    Redis Stream 的结构如下所示,它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容:

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-uijc8w1d-1651055823393)(https://www.hengyumo.cn/momoclouddisk/file/download?code=202204251319871_image.png)]

    其实redis stream是针对list的一个加强,底层的数据结构也是list。其实现原理和前文2.1介绍的list实现发布订阅类似,都是通过一个标志位记录当前consumer消费者访问的位置。

    对于一个stream可以对应多个consumer group(消费者组)。一个消费者组可以包含多个消费者。

    last_deliverd_id记录了该消费者组访问到list的位置,也就是游标。下次读取消息时从该位置继续往前读取。需要注意的是,记录的是消费者组的位置,消费者组中任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。

    pending_ids 记录了消费者(Consumer)的状态变量,作用是维护消费者的未确认的 id。 pending_ids 记录了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符)。通过这个实现了消息应答机制,保证了消息推送的可靠性。

    stream的部分命令和list一致,只是将L替换成为了X作为特殊标识,例如:

    XADD - 添加消息到末尾
    XTRIM - 对流进行修剪,限制长度
    XLEN - 获取流包含的元素数量,即消息长度
    XRANGE - 获取消息列表,会自动过滤已经删除的消息
    
    • 1
    • 2
    • 3
    • 4

    以上命令在list中都是存在的。

    而以下命令是stream特有的:

    XDEL - 删除消息
    XREVRANGE - 反向获取消息列表,ID 从大到小
    XREAD - 以阻塞或非阻塞方式获取消息列表
    
    XGROUP CREATE - 创建消费者组
    XREADGROUP GROUP - 读取消费者组中的消息
    XACK - 将消息标记为"已处理"
    XGROUP SETID - 为消费者组设置新的最后递送消息ID
    XGROUP DELCONSUMER - 删除消费者
    XGROUP DESTROY - 删除消费者组
    XPENDING - 显示待处理消息的相关信息
    XCLAIM - 转移消息的归属权
    XINFO - 查看流和消费者组的相关信息;
    XINFO GROUPS - 打印消费者组的信息;
    XINFO STREAM - 打印流信息
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    使用stream之前需要保证当前的redis版本>5:

    ./redis-server -v
    Redis server v=5.0.5 sha=00000000:0 malloc=jemalloc-5.1.0 bits=64 build=d448d45b28029e7
    
    • 1
    • 2

    需要注意的是windows的redis已经停止更新了,目前只到3:
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fD7QvqR7-1651055823393)(https://www.hengyumo.cn/momoclouddisk/file/download?code=202204251345256_image.png)]

    所以需要使用linux版本。

    实现方式

    1. 生产者写消息

    使用XADD key ID field value [field value ...]来创建并写消息:

    • key :队列名称,如果不存在就创建

    • ID :消息 id,我们使用 * 表示由 redis 生成,可以自定义,但是要自己保证递增性。

    • field value : 记录。

      127.0.0.1:6379> xadd mq * msg hello
      “1650865623927-0”
      127.0.0.1:6379> xadd mq * msg2 hello2 msg3 hello3
      “1650865654238-0”

    查看信息:

    # 查看长度
    127.0.0.1:6379> xlen mq
    (integer) 2
    
    # 查看stream信息
    127.0.0.1:6379> xinfo stream mq
     1) "length"
     2) (integer) 2
     3) "radix-tree-keys"
     4) (integer) 1
     5) "radix-tree-nodes"
     6) (integer) 2
     7) "groups"
     8) (integer) 0
     9) "last-generated-id"
    10) "1650865654238-0"
    11) "first-entry"
    12) 1) "1650865623927-0"
        2) 1) "msg"
           2) "hello"
    13) "last-entry"
    14) 1) "1650865654238-0"
        2) 1) "msg2"
           2) "hello2"
           3) "msg3"
           4) "hello3"
    
    # 查看数据 XRANGE key start end [COUNT count]
    127.0.0.1:6379> xrange mq 1650865623927-0 1650865654238-0
    1) 1) "1650865623927-0"
       2) 1) "msg"
          2) "hello"
    2) 1) "1650865654238-0"
       2) 1) "msg2"
          2) "hello2"
          3) "msg3"
          4) "hello3"
    
    # 使用- + 代表开始到结尾
    127.0.0.1:6379> xrange mq - +
    1) 1) "1650865623927-0"
       2) 1) "msg"
          2) "hello"
    2) 1) "1650865654238-0"
       2) 1) "msg2"
          2) "hello2"
          3) "msg3"
          4) "hello3"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    上述已经往stream中写入了两条数据。

    还可以使用 XTRIM 对流进行修剪,限制长度, 语法格式:

    XTRIM key MAXLEN [~] count

    • key :队列名称
    • MAXLEN :长度
    • count :数量****

    使用 XDEL 删除消息,语法格式:

    XDEL key ID [ID ...]

    • key:队列名称
    • ID :消息 ID

    使用 XREVRANGE 逆序获取消息列表,会自动过滤已经删除的消息 ,语法格式:

    XREVRANGE key end start [COUNT count]

    • key :队列名
    • end :结束值, + 表示最大值
    • start :开始值, - 表示最小值
    • count :数量

    XREAD
    使用 XREAD 以阻塞或非阻塞方式获取消息列表 ,语法格式:

    XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]

    • count :数量
    • milliseconds :可选,阻塞毫秒数,没有设置就是非阻塞模式
    • key :队列名
    • id :消息 ID 该ID代表从此处开始往后读,读取大于该ID的数据,不包含该ID的数据

    例如

    # 读取第一条数据
    127.0.0.1:6379> xread count 1 block 1000 streams mq 0-0
    1) 1) "mq"
       2) 1) 1) "1650865623927-0"
             2) 1) "msg"
                2) "hello"
    
    # 读取1650865623927-0后一条数据
    127.0.0.1:6379> xread count 1 block 1000 streams mq 1650865623927-0
    1) 1) "mq"
       2) 1) 1) "1650865654238-0"
             2) 1) "msg2"
                2) "hello2"
                3) "msg3"
                4) "hello3"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    1. 消费者读取消息

    创建消费者组

    XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]

    • key :队列名称,如果不存在就创建

    • groupname :组名。

    • id or $ : id代表从该ID之后开始消费。$表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略。

      创建了一个名为c的消费者组,从头部开始消费mq

      127.0.0.1:6379> xgroup create mq c 0-0
      OK

    创建消费者并消费数据

    XREADGROUP GROUP
    使用 XREADGROUP GROUP 读取消费组中的消息,语法格式:

    XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

    • group :消费组名

    • consumer :消费者名,没有会自动创建。

    • count : 读取数量。

    • milliseconds : 阻塞毫秒数。

    • key : 队列名。

    • ID : 消息 ID,> 代表从头部开始。

      为消费者组c创建消费者c1并从mq读取数据,从头部开始读取

      127.0.0.1:6379> xreadgroup group c c1 streams mq >

        1. “mq”
            1. “1650865623927-0”
              1. “msg”
              2. “hello”
            1. “1650865654238-0”
              1. “msg2”
              2. “hello2”
              3. “msg3”
              4. “hello3”

      读取1个、阻塞10s,从1650865623927-0之后读取

      127.0.0.1:6379> xreadgroup group c c1 count 1 block 10000 streams mq 1650865623927-0

        1. “mq”
            1. “1650865654238-0”
              1. “msg2”
              2. “hello2”
              3. “msg3”
              4. “hello3”

    我们使用ack命令标识stream的两条数据为已经消费完成

    127.0.0.1:6379> xack mq c 1650865623927-0 1650865654238-0
    (integer) 2
    
    • 1
    • 2

    之后再使用xreadgroup读取数据,因为此时队列的数据都已经消费过,所以会阻塞10s等待新的消息推送过来

    127.0.0.1:6379>  xreadgroup group c c1 count 1 block 10000 streams mq >
    (nil)
    (11.15s)
    
    • 1
    • 2
    • 3

    写入一条新的消息:

    127.0.0.1:6379> xadd mq * msg hi
    "1650868048238-0"
    
    • 1
    • 2

    消费者成功接收到消息:

    127.0.0.1:6379>  xreadgroup group c c1 count 1 block 10000 streams mq >
    1) 1) "mq"
       2) 1) 1) "1650868048238-0"
             2) 1) "msg"
                2) "hi"
    (3.62s)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    查看消费者状态:

    127.0.0.1:6379> xinfo groups mq
    1) 1) "name"
       2) "c"
       3) "consumers"
       4) (integer) 1
       5) "pending"
       6) (integer) 2
       7) "last-delivered-id"
       8) "1650868048238-0"
    
    127.0.0.1:6379> xinfo consumers mq c
    1) 1) "name"
       2) "c1"
       3) "pending"
       4) (integer) 2
       5) "idle"
       6) (integer) 270305
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    mq有一个消费者组c,c有一个消费者c1,c当前读取到1650868048238-0,可以看到该位置就是队列的末尾了。但是还有两个pending的,代表有两条消息没有收到ACK,而270305代表其等待的时间ms。

    127.0.0.1:6379> xrange mq - +
    1) 1) "1650865623927-0"
       2) 1) "msg"
          2) "hello"
    2) 1) "1650865654238-0"
       2) 1) "msg2"
          2) "hello2"
          3) "msg3"
          4) "hello3"
    3) 1) "1650867786477-0"
       2) 1) "msg"
          2) "hi"
    4) 1) "1650867796123-0"
       2) 1) "msg"
          2) "hi"
    5) 1) "1650867834732-0"
       2) 1) "msg"
          2) "hi"
    6) 1) "1650868048238-0"
       2) 1) "msg"
          2) "hi"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    使用xpending查看这两条数据:
    xpendign key groupName [start end count] [consumerName]

    127.0.0.1:6379> xpending mq c
    1) (integer) 2
    2) "1650867834732-0"
    3) "1650868048238-0"
    4) 1) 1) "c1"
          2) "2"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    原来是"1650867834732-0","1650868048238-0"没有被成功消费掉。

    发送ack,再查看已经没有pending的消息了。

    127.0.0.1:6379> xack mq c 1650867834732-0 1650868048238-0
    (integer) 2
    127.0.0.1:6379> xpending mq c
    1) (integer) 0
    2) (nil)
    3) (nil)
    4) (nil)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    这些pending的数据,同个消费者组下的其它消费者也是读取不到的。

    redis会对这些pending的消息做数据恢复吗?答案是不会,需要手动去处理它,使用xdel将其删除,然后重新写入mq。需要注意的是xdel不会删除pending的数据,需要在重新写入完成之后将其ack。

    例如当前有一条未ack的消息:

    127.0.0.1:6379> xpending mq c - + 10 c1
    1) 1) "1650868714168-0"
       2) "c1"
       3) (integer) 156886
       4) (integer) 1
    
    • 1
    • 2
    • 3
    • 4
    • 5

    我们可以这样操作,将其删除,重新写入mq,然后ack。消费者再次读取时就没有问题了。

    127.0.0.1:6379> xdel mq 1650868714168-0
    (integer) 1
    127.0.0.1:6379> xpending mq c
    1) (integer) 1
    2) "1650868714168-0"
    3) "1650868714168-0"
    4) 1) 1) "c1"
          2) "1"
    127.0.0.1:6379> xadd mq * msg hi
    "1650869436235-0"
    
    127.0.0.1:6379> xack mq c 1650868714168-0
    (integer) 1
    127.0.0.1:6379> xpending mq c
    1) (integer) 0
    2) (nil)
    3) (nil)
    4) (nil)
    127.0.0.1:6379>  xreadgroup group c c2 count 1 block 10000 streams mq >
    1) 1) "mq"
       2) 1) 1) "1650869436235-0"
             2) 1) "msg"
                2) "hi"
    127.0.0.1:6379> xack mq c 1650869436235-0
    (integer) 1
    127.0.0.1:6379> xpending mq c
    1) (integer) 0
    2) (nil)
    3) (nil)
    4) (nil)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    除了删除重新写入的场景之外,还可以使用xclaim来做消息转移。xclaim可以将消息转移到同消费者组的另一个消费者的pending队列中,同时其还会返回消息内容。如果某个消费者故障,转移到另一个消费者受理是一个不错的方式。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-aSKnxlGE-1651055823393)(https://www.hengyumo.cn/momoclouddisk/file/download?code=202204251524928_image.png)]

    解释:XCLAIM命令用于进行消息转移,当某个等待队列中的消息长时间没有被处理(没有ACK)的时候,可以用XCLAIM命令将其转移到其他消费者的等待列表中。

    key:表示消息队列的名称。
    group:表示消费者组名称。
    consumer:表示消费者名称。
    min-idle-time:表示消息空闲时长(表示消息已经读取,但还未处理)。
    ID [ID …]:可选参数,表示要转移的消息的消息ID,可传入多个消息ID。
    [IDLE ms]:可选参数,设置消息空闲时间(上一次读取消息的时间),如果未指定,这假定IDLE为0,即每次转移消息之后重置消息空闲时间。因为如果空闲时间一直累加的话,消息会一直转移。
    [TIME ms-unix-time]:可选参数,与IDLE参数相同,只是它将空闲时间设置为特定的Unix时间(以毫秒为单位),而不是相对的毫秒量。这对于重写生成XCLAIM命令的AOF文件非常有用。
    [RETRYCOUNT count]:可选参数,设置重试计数器的值,每次消息被读取时,该计数器都会递增。一般XCLAIM命令不需要修改重试计数器的值。
    [FORCE]:可选参数,即使指定要转移的消息的消息ID在其他等待列表中不存在,也强制将该消息ID加入到执行消费者的等待列表中。
    [JUSTID]:可选参数,仅返回要转移消息的消息ID,使用此参数意味着重试计数器不会递增。

    1. 多生产者多消费者

    多生产者
    只要都使用 xadd命令往同一个stream写入即可。

    多消费者
    创建一个消费者组,将多个消费者都包含进该消费者组中,即可。

    1. 发布订阅方式
      实现多个消费者组监听同一个stream即可。

    2. 消息应答
      使用strean的原生的ack机制即可。不过需要注意,对应pending的数据需要安排一个线程定时去处理和恢复。

    stream方式的消息队列总结

    • 实时性 实时性好,可以通过block方式阻塞获取新消息,有高实时性;
    • 可靠性 可靠性好,redis的自带的持久化机制可以防止消息丢失,但是相比kafka的磁盘写入还是略不可靠。自带的ack机制可以满足消息应答实现,防止消息丢失。但是需要注意对于死信息积压在pending区的数据需要定时去处理回收。此外,积压的消息会一直保存在stream中,哪怕已经ack过,还需要额外的一个线程定时将已经读取完的消息删除。
    • 功能性 功能性好,可以轻松实现多种不同的消息队列功能,如多对多、发布订阅模式。

    基于stream的特点,可以推导出在其常用场景:

    大部分使用消息队列的场景都可以使用stream替代。基于redis的高性能和使用内存的机制使得其的性能优于大部分消息队列。在小规模场景会有更出色的表现。但是针对大流量的场景不推荐使用stream,毕竟内存的大小是有限的,这也是所有redis实现的消息队列的局限之处。


    END

    写文不易,这篇文章断断续续写了一周时间,如果对您有帮助不如点个赞支持一下~

    []~( ̄▽ ̄)~*
    
    • 1

    关注我获取golang、架构相关博文更新。
    同时欢迎您访问我的博客www.hengyumo.cn阅读第一手博客。

  • 相关阅读:
    员工未回复群消息被罚200元,企业:符合公司规章制度
    【无标题】
    Linux 查看日志
    TWDS车辆轮对故障、尺寸动态检测系统
    P2241 统计方形(数据加强版)
    Java各种非空判断常用API
    网络协议--Traceroute程序
    神经网络中的知识蒸馏
    Junit执行源码分析,Junit是怎么跑起来的(二)
    threejs texture旋转后,看不到了
  • 原文地址:https://blog.csdn.net/m0_67401545/article/details/126040326