• RocketMQ源码阅读(十四)延迟消息


    目录

    源码实现

    CommitLog#asyncPutMessage

    ScheduleMessageService

    load()

    start()

    DeliverDelayedMessageTimerTask


    当消息写入到Broker后,需要等待指定的时长后才可被消费处理的消息,称为延时消息,本文记录延迟消息在RocketMQ的实现

    在生产者端发送延时消息的代码如下,只需要为消息设置属性DelayTimeLevel,参数为延时级别,即可实现消息延时

    message.setDelayTimeLevel(3);

    RocketMQ不支持自定义延时时间,系统定义了18个延时级别如下

    org.apache.rocketmq.store.config.MessageStoreConfig#messageDelayLevel

    参数3表示该条消息延时10s,参数4表示延时30s,依次类推

    源码实现

    1. 生产者为消息设置延时标记
    2. Broker判断消息如果需要延时,则将该条消息暂存到名为SCHEDULE_TOPIC_XXXX的Topic
    3. Broker端ScheduleMessageService进行延时调度,当消息延时时间到期后,重新将消息发送到原Topic的消费队列,供消费者消费

    RocketMQ规定了延时消息的Topic为SCHEDULE_TOPIC_XXXX,其消息队列也根据延时级别分为18个消息队列,文件存储如下,只有三个文件夹是因为暂时只用到了3个隔离级别,根据下图也能发现延时级别和queueid的关系(queueid = 延时级别 - 1)

    CommitLog#asyncPutMessage

    将消息写入CommitLog时,判断是否需要延时,然后替换Topic和queueid

    再进行消息分发,将消息写入ConsumeQueue时,也有一点特殊处理:

    CommitLog#checkMessageAndReturnSize(java.nio.ByteBuffer, boolean, boolean)

    延时消息存入SCHEDULE_TOPIC_XXXX的消费队列时,tagsCode存储的是消息的到期时间

    ScheduleMessageService

    进行延时任务调度,到期后将消息重新写入CommitLog

    load()

    加载SCHEDULE_TOPIC_XXXX的所有队列的消费offset情况和延迟级别信息到内存Map

    start()

    启动各个队列的消息分发定时任务 

    DeliverDelayedMessageTimerTask

    每个队列的消息分发任务

    executeOnTimeup

    1. 根据延迟级别获取队列对象
    2. 根据offset获取队列中未消费的所有消息,循环获取tagsCode(存储的是消息到期时间)判断是否需要进行消息分发
    3. 如果消息到期就恢复原消息的所有信息重新写入CommitLog,如果未到期则延时一段时间再次执行当前方法executeOnTimeup

  • 相关阅读:
    P1966 [NOIP2013 提高组] 火柴排队
    基于springboot+vue的中小企业财务管理系统(源码+论文)
    Tomcat
    visual studio 中添加qt类报错问题
    【初识算法】-Day1
    【嵌入式——QT】QDockWidget
    医疗项目业务介绍
    深度学习_1 介绍;安装环境
    【LLM】解析pdf文档生成摘要 | 智能文档概览
    acwing算法提高之图论--单源最短路的综合应用
  • 原文地址:https://blog.csdn.net/xyjy11/article/details/126301407