Stream是Redis 5.0版本新增加的数据结构,主要用于消息队列(MQ,Message Queue)。
其实Redis本身有一个发布订阅(publish/subscribe)来实现消息队列的功能,但是有一个缺点就是消息无法持久化,如出现网络断开或者Redis宕机,那么消息则会被丢弃。总结就是发布订阅可以分发消息,但是无法记录历史消息。
而Stream则提供了消息的持久化和主备复制的功能,可以让任何客户端访问任何时刻的数据,并且还能记住每一个客户端的访问位置,保证消息不丢失。
它有一个消息链表,将所有加入的消息都串起来,每个消息都有唯一的 ID和对应内容:

每个Stream(MQ)都有唯一的名称,对应Redis中的key,你甚至可以利用
keys *
列出它们。
Xadd key ID field value [field value ...]
*,表示由redis自动生成,如果自定义请确保递增性。127.0.0.1:6379> Xadd test_queue * name xiaoming age 18
"1658053697953-0" # 返回由redis自动生成的消息ID
Xdel key ID [ID ...]
根据ID删除一个或者多个消息
127.0.0.1:6379> Xdel test_queue 1658053697953-0
(integer) 1
Xlen key
返回队列的消息个数
127.0.0.1:6379> Xlen test_queue
(integer) 0
127.0.0.1:6379> Xadd test_queue * name xiaoming age 18
"1658053868394-0"
127.0.0.1:6379> Xlen test_queue
(integer) 1
Xrange key start end [COUNT count]
-表示最小值+表示最大值127.0.0.1:6379> Xrange test_queue - +
1) 1) "1658053868394-0"
2) 1) "name"
2) "xiaoming"
3) "age"
4) "18"
127.0.0.1:6379> Xadd test_queue * name xiaoming2 age 20
"1658054319414-0"
127.0.0.1:6379> Xrange test_queue - +
1) 1) "1658053868394-0"
2) 1) "name"
2) "xiaoming"
3) "age"
4) "18"
2) 1) "1658054319414-0"
2) 1) "name"
2) "xiaoming2"
3) "age"
4) "20"
Xrevrange key end start [COUNT count]
不使用消费组,直接进行消息的读取
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
0-0表示用首部开始读Tips
STREAMS表示可以同时读取多个队列中的消息
为队列创建一个从某位置开始读取的消费组
XGROUP [CREATE key groupname id-or-$] [MKSTREAM]
Tips
如果队列中的消费组(同一队列中,消费组名不能重复)已经存在,则应该会产生异常。
127.0.0.1:6379> XGROUP CREATE test_queue test_group 0-0
OK # 为test_queue队列创建名为test_group的消费组,从头开始读
127.0.0.1:6379> XGROUP CREATE test_queue test_group 0-0
(error) BUSYGROUP Consumer Group name already exists # 消费组重复,抛出异常
0-0表示创建消费组时的游标指向此时的首部,切记切记切记
删除某队列中某消费组的某个消费者
XGROUP [DELCONSUMER key groupname consumername]
删除某队列中的某消费组
XGROUP [DESTROY key groupname]
指定消费组中的**消费者(consumer)**读取队列中的消息
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
Tips
消费组read后的消息会立马进入pengding,待消息被ack后,消息会从pending中消失
使用名为c1的消费组代表消费组test_group消费消息,由于创建消费组时,游标指向首部,所以使用>来表示消费首部的右边一个消息,即首个消息。此时消息进入pending
127.0.0.1:6379> XREADGROUP GROUP test_group c1 count 1 STREAMS test_queue >
1) 1) "test_queue"
2) 1) 1) "1658053868394-0"
2) 1) "name"
3) "age"
4) "18"
xpending key group [[IDLE min-idle-time] start end count [consumer]]
可以查看到刚刚c1消费者消费的消息进入了pending
127.0.0.1:6379> xpending test_queue test_group - + 99
1) 1) "1658053868394-0"
2) "c1"
3) (integer) 1073303
4) (integer) 1
待消息被ack后才会从pending列表中移除
可以将进入pending的消息转移给其他消费者(副作用:重置消息的空闲时间)如果两个消费者同时认领消息将永远不会成功,只有一个消费者能成功认领。
Xclaim key group consumer min-idle-time ID [ID ...] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [force] [justid]
127.0.0.1:6379> xpending test_queue test_group - + 10
1) 1) "1658053868394-0"
2) "c2"
3) (integer) 16611
4) (integer) 2
127.0.0.1:6379> Xclaim test_queue test_group c3 86400000 1658053868394-0
(empty array) # 尝试让c3认领消息,但是消息的闲置时间没有超过8640000
127.0.0.1:6379> Xclaim test_queue test_group c3 10000 1658053868394-0
1) 1) "1658053868394-0"
2) 1) "name"
2) "xiaoming"
3) "age"
4) "18"
127.0.0.1:6379> xpending test_queue test_group - + 99
1) 1) "1658053868394-0"
2) "c3"
3) (integer) 70866
4) (integer) 3
可以看到消息已经进入了c3的pending
确认某队列某消费组的消息
Xack key group ID [ID ...]
127.0.0.1:6379> xpending test_queue test_group - + 99
1) 1) "1658053868394-0"
2) "c3"
3) (integer) 70866
4) (integer) 3
127.0.0.1:6379> Xack test_queue test_group 1658053868394-0
(integer) 1
127.0.0.1:6379> xpending test_queue test_group - + 99
(empty array)
消息被ack之后,就会从pending中消失
Xinfo [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]
根据队列名称查看队列详情
Xinfo STREAM key
127.0.0.1:6379> Xinfo STREAM test_queue
1) "length"
2) (integer) 2
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "last-generated-id"
8) "1658054319414-0"
9) "groups"
10) (integer) 1
11) "first-entry"
12) 1) "1658053868394-0"
2) 1) "name"
2) "xiaoming"
3) "age"
4) "18"
13) "last-entry"
14) 1) "1658054319414-0"
2) 1) "name"
2) "xiaoming2"
3) "age"
4) "20"
查看队列中的所有消费组
Xinfo GROUPS key
127.0.0.1:6379> Xinfo GROUPS test_queue
1) 1) "name"
2) "test_group"
3) "consumers"
4) (integer) 3
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "1658053868394-0"
查看队列中某消费组中的所有消费者
Xinfo CONSUMERS key groupname
127.0.0.1:6379> Xinfo CONSUMERS test_queue test_group
1) 1) "name"
2) "c1"
3) "pending"
4) (integer) 0
5) "idle"
6) (integer) 164374831
2) 1) "name"
2) "c2"
3) "pending"
4) (integer) 0
5) "idle"
6) (integer) 1759377
3) 1) "name"
2) "c3"
3) "pending"
4) (integer) 0
5) "idle"
6) (integer) 1508038
Tips
idle是闲置时间,单位是毫秒。
参考链接
本笔记大量内容参考自https://www.runoob.com/redis/redis-stream.html