• Stream入门


    Redis Stream

    什么是Stream

    Stream是Redis 5.0版本新增加的数据结构,主要用于消息队列(MQ,Message Queue)。

    其实Redis本身有一个发布订阅(publish/subscribe)来实现消息队列的功能,但是有一个缺点就是消息无法持久化,如出现网络断开或者Redis宕机,那么消息则会被丢弃。总结就是发布订阅可以分发消息,但是无法记录历史消息

    而Stream则提供了消息的持久化主备复制的功能,可以让任何客户端访问任何时刻的数据,并且还能记住每一个客户端的访问位置,保证消息不丢失。

    Stream的结构

    它有一个消息链表,将所有加入的消息都串起来,每个消息都有唯一的 ID和对应内容:

    在这里插入图片描述

    每个Stream(MQ)都有唯一的名称,对应Redis中的key,你甚至可以利用

    keys *
    
    • 1

    列出它们。

    • Consumer Group(消费组):消费message的组,一个消费组可以有0到n个消费者(Consumer)
    • last_delivered_id(游标):每个消费组都会有个游标,任意一个消费者读取了消息都会使得游标向前移动。
    • pending_ids(消费者未确认的消息的id):记录当前已经被客户端读取的消息,但是还没有被ack(Acknowledge character:确认字符)的消息的id

    消息(message)相关命令

    Xadd 添加消息到末尾

    Xadd key ID field value [field value ...]
    
    • 1
    • key:队列名称,如果不存在则创建队列
    • ID:消息id,推荐使用*,表示由redis自动生成,如果自定义请确保递增性。
    • field value:记录,key-value键值对的方式存储
    127.0.0.1:6379> Xadd test_queue * name xiaoming age 18
    "1658053697953-0" # 返回由redis自动生成的消息ID
    
    • 1
    • 2

    Xdel 删除消息

    Xdel key ID [ID ...]
    
    • 1

    根据ID删除一个或者多个消息

    127.0.0.1:6379> Xdel test_queue 1658053697953-0
    (integer) 1
    
    • 1
    • 2

    Xlen 获取stream包含的消息数量

    Xlen key
    
    • 1

    返回队列的消息个数

    127.0.0.1:6379> Xlen test_queue
    (integer) 0
    127.0.0.1:6379> Xadd test_queue * name xiaoming age 18
    "1658053868394-0"
    127.0.0.1:6379> Xlen test_queue
    (integer) 1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    Xrange 列出消息列表(不包括已经删除的消息)

    Xrange key start end [COUNT count]
    
    • 1
    • start:开始值,-表示最小值
    • end:结束值,+表示最大值
    • count:数量,默认是列出所有
    127.0.0.1:6379> Xrange test_queue - +
    1) 1) "1658053868394-0"
       2) 1) "name"
          2) "xiaoming"
          3) "age"
          4) "18"
    127.0.0.1:6379> Xadd test_queue * name xiaoming2 age 20
    "1658054319414-0"
    127.0.0.1:6379> Xrange test_queue - +
    1) 1) "1658053868394-0"
       2) 1) "name"
          2) "xiaoming"
          3) "age"
          4) "18"
    2) 1) "1658054319414-0"
       2) 1) "name"
          2) "xiaoming2"
          3) "age"
          4) "20"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    Xrevrange 反向迭代消息列表

    Xrevrange key end start [COUNT count]
    
    • 1

    Xread 以阻塞或非阻塞的方式读取消息

    不使用消费组,直接进行消息的读取

    XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
    
    • 1
    • milliseconds(可选参数):阻塞毫秒数,默认是非阻塞模式
    • id:可以指定确切的消息id,也可以使用0-0表示用首部开始读

    Tips

    STREAMS表示可以同时读取多个队列中的消息

    消费组(Xgroup)常用命令

    Xgroup create 创建消费组

    为队列创建一个从某位置开始读取的消费组

    XGROUP [CREATE key groupname id-or-$] [MKSTREAM]
    
    • 1
    • key:队列名称
    • groupname:消费组名
    • id-or- : i d 表示从确切的地方开始消费, :id表示从确切的地方开始消费, id表示从确切的地方开始消费,从尾部开始开始消费(即,只接受新消息)
    • MKSTREAM:创建队列。如果在创建消费组的时候key不存在(队列不存在),则同时会创建队列

    Tips

    如果队列中的消费组(同一队列中,消费组名不能重复)已经存在,则应该会产生异常。

    127.0.0.1:6379> XGROUP CREATE test_queue test_group 0-0
    OK # 为test_queue队列创建名为test_group的消费组,从头开始读
    127.0.0.1:6379> XGROUP CREATE test_queue test_group 0-0
    (error) BUSYGROUP Consumer Group name already exists # 消费组重复,抛出异常
    
    • 1
    • 2
    • 3
    • 4

    0-0表示创建消费组时的游标指向此时的首部,切记切记切记

    Xgroup delconsumer 删除组中消费者

    删除某队列中某消费组的某个消费者

    XGROUP [DELCONSUMER key groupname consumername]
    
    • 1
    • key:队列名称
    • groupname:消费组名
    • consumername:消费者名

    Xgroup destroy 销毁消费组

    删除某队列中的某消费组

    XGROUP [DESTROY key groupname]
    
    • 1
    • key:队列名称
    • groupname:消费组名

    消费组读取(Xreadgroup)

    指定消费组中的**消费者(consumer)**读取队列中的消息

    XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
    
    • 1
    • group:消费组名
    • consumer:消费者名
    • count:读取消息的数量
    • milliseconds:阻塞毫秒数
    • key:队列名
    • ID:消息ID

    Tips

    消费组read后的消息会立马进入pengding,待消息被ack后,消息会从pending中消失

    使用名为c1的消费组代表消费组test_group消费消息,由于创建消费组时,游标指向首部,所以使用>来表示消费首部的右边一个消息,即首个消息。此时消息进入pending

    127.0.0.1:6379> XREADGROUP GROUP test_group c1 count 1 STREAMS test_queue >
    1) 1) "test_queue"
       2) 1) 1) "1658053868394-0"
             2) 1) "name"
                3) "age"
                4) "18"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    待确认消息组(Xpending)

    xpending key group [[IDLE min-idle-time] start end count [consumer]]
    
    • 1
    • min-idle-time:限制时间过滤器
    • start:从什么地方开始
    • end:从什么地方结束
    • count:显示数量
    • consumer:可选,消费者名

    可以查看到刚刚c1消费者消费的消息进入了pending

    127.0.0.1:6379> xpending test_queue test_group - + 99
    1) 1) "1658053868394-0"
       2) "c1"
       3) (integer) 1073303
       4) (integer) 1
    
    • 1
    • 2
    • 3
    • 4
    • 5

    待消息被ack后才会从pending列表中移除

    转移消息归属权(Xclaim)

    可以将进入pending的消息转移给其他消费者(副作用:重置消息的空闲时间)如果两个消费者同时认领消息将永远不会成功,只有一个消费者能成功认领。

    Xclaim key group consumer min-idle-time ID [ID ...] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [force] [justid] 
    
    • 1
    • key:队列名称
    • group:消费组名称
    • consumer:消费者名称
    • min-idle-time:最小闲置时间(单位:毫秒),即只会尝试认领超过该闲置时间的消息。如:消息1的闲置时间是1000,消息2的闲置时间是500,min-idle-time设置为700,那么只有消息1会被重新认领,消息2则不会被重新认领。
    • ID:消息的id
    127.0.0.1:6379> xpending test_queue test_group - + 10
    1) 1) "1658053868394-0"
       2) "c2"
       3) (integer) 16611
       4) (integer) 2
    127.0.0.1:6379> Xclaim test_queue test_group c3 86400000 1658053868394-0
    (empty array)	# 尝试让c3认领消息,但是消息的闲置时间没有超过8640000
    127.0.0.1:6379> Xclaim test_queue test_group c3 10000 1658053868394-0
    1) 1) "1658053868394-0"
       2) 1) "name"
          2) "xiaoming"
          3) "age"
          4) "18"
    127.0.0.1:6379> xpending test_queue test_group - + 99
    1) 1) "1658053868394-0"
       2) "c3"
       3) (integer) 70866
       4) (integer) 3
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    可以看到消息已经进入了c3的pending

    确认消息(Xack)

    确认某队列某消费组的消息

    Xack key group ID [ID ...]
    
    • 1
    127.0.0.1:6379> xpending test_queue test_group - + 99
    1) 1) "1658053868394-0"
       2) "c3"
       3) (integer) 70866
       4) (integer) 3
    127.0.0.1:6379> Xack test_queue test_group 1658053868394-0
    (integer) 1
    127.0.0.1:6379> xpending test_queue test_group - + 99
    (empty array)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    消息被ack之后,就会从pending中消失

    详情(Xinfo)常用命令

    Xinfo [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]
    
    • 1

    stream详情

    根据队列名称查看队列详情

    Xinfo STREAM key
    
    • 1
    127.0.0.1:6379> Xinfo STREAM test_queue
     1) "length"
     2) (integer) 2
     3) "radix-tree-keys"
     4) (integer) 1
     5) "radix-tree-nodes"
     6) (integer) 2
     7) "last-generated-id"
     8) "1658054319414-0"
     9) "groups"
    10) (integer) 1
    11) "first-entry"
    12) 1) "1658053868394-0"
        2) 1) "name"
           2) "xiaoming"
           3) "age"
           4) "18"
    13) "last-entry"
    14) 1) "1658054319414-0"
        2) 1) "name"
           2) "xiaoming2"
           3) "age"
           4) "20"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • “length”:消息的长度(个数)
    • “groups”:队列中消费组的个数
    • “first-entry”:首消息
    • “last-entry”:尾消息

    groups详情

    查看队列中的所有消费组

    Xinfo GROUPS key
    
    • 1
    127.0.0.1:6379> Xinfo GROUPS test_queue
    1) 1) "name"
       2) "test_group"
       3) "consumers"
       4) (integer) 3
       5) "pending"
       6) (integer) 0
       7) "last-delivered-id"
       8) "1658053868394-0"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    CONSUMERS详情

    查看队列中某消费组中的所有消费者

    Xinfo CONSUMERS key groupname
    
    • 1
    127.0.0.1:6379> Xinfo CONSUMERS test_queue test_group
    1) 1) "name"
       2) "c1"
       3) "pending"
       4) (integer) 0
       5) "idle"
       6) (integer) 164374831
    2) 1) "name"
       2) "c2"
       3) "pending"
       4) (integer) 0
       5) "idle"
       6) (integer) 1759377
    3) 1) "name"
       2) "c3"
       3) "pending"
       4) (integer) 0
       5) "idle"
       6) (integer) 1508038
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    Tips

    idle是闲置时间,单位是毫秒。

    参考链接

    本笔记大量内容参考自https://www.runoob.com/redis/redis-stream.html

  • 相关阅读:
    【Mybatis】动态SQL之choose标签
    详解设计模式:模版方法模式
    阿里云优惠券如何领取(阿里云在哪领取优惠券)
    【华为OD机试真题 python】数轴上两个点集距离
    计算机毕业设计ssm社区疫情防控系统3j56g系统+程序+源码+lw+远程部署
    某医疗机构:建立S-SDLC安全开发流程,保障医疗前沿科技应用高质量发展
    外文论文的格式规范要求有哪些?
    【前端基础小案例】HTML+CSS实现酷狗音乐热榜效果
    [ansible] playbook运用
    114页5万字字智能交通大数据综合服务平台建设方案
  • 原文地址:https://blog.csdn.net/weixin_45747080/article/details/125876687