目录
Redis Stream 是 Redis 5.0 版本新增加的数据结构。
Redis Stream 主要用于消息队列(MQ,Message Queue),Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有 个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。
简单来说发布订阅 (pub/sub) 可以分发消息,但无法记录历史消息。而 Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户
端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。
流(stream)中信息条目(一个 Stream 条目,由一个或多个键值对组成的)的 ID 必须是单调增的,为此,redis 采用时间戳+自增 id 这种方式 来保证,并且这两个数都是 64bit,不会有溢出问题,最后一点,redis 在增加信息条目时会检查当前 id 与上一条目的 id,自动纠正错误的情况, 一定要保证后面的 id 比前面大。
一个流中信息条目的 ID 必须是单调增的,这是流的基础
几个特殊的 ID:
- + :最小和最大可能出现的 Id
$ :当前流中最大的 id,可用于将要到来的信息
> :用于 XREADGROUP 命令中,表示迄今还没有发送给组中使用者的信息,会更新消费者组的最后 ID
* :用于 XADD 命令中,让系统自动生成 id

Consumer Group:消费组,使用 XGROUP CREATE 命令创建,一个消费组有多个消费者(Consumer)。
last_delivered_id:游标,每个消费组会有个 last_delivered_id,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。
pending_ids:消费者(Consumer)的状态变量,作用是维护消费者的未确认的 id。pending_ids 记录了当前已经被客户端读取的消息,但是还没 有 ack (Acknowledge character:确认字符)。
xadd mystream * id 1 name longge #生产消息
xdel mystream 1654521221581-0 #删除某条消息
xrange mystream - + #查看消息列表
XREAD count 1 streams mystream 0 #读取一条消息
xgroup create mystream group1 0 #创建消费者组
xreadgroup group group1 c1 count 1 streams mystream > #消费者读消息
xpending stream1 group1 - + 10 c1 #查看待确认的列表
XACK mystream group1 1654522733049-0 #确认消息
XADD - 添加消息到末尾
XTRIM - 对流进行修剪,限制长度
XDEL - 删除消息
XLEN - 获取流包含的元素数量,即消息长度
XRANGE - 获取消息列表,会自动过滤已经删除的消息
XREVRANGE - 反向获取消息列表,ID 从大到小
XREAD - 以阻塞或非阻塞方式获取消息列表
XGROUP CREATE - 创建消费者组
XACK - 将消息标记为"已处理"
XGROUP SETID - 为消费者组设置新的最后递送消息 ID
XGROUP DELCONSUMER - 删除消费者
XGROUP DESTROY - 删除消费者组
XPENDING - 显示待处理消息的相关信息
XCLAIM - 转移消息的归属权
XINFO - 查看流和消费者组的相关信息
XREADGROUP GROUP - 读取消费者组中的消息