目录
DeliverDelayedMessageTimerTask
当消息写入到Broker后,需要等待指定的时长后才可被消费处理的消息,称为延时消息,本文记录延迟消息在RocketMQ的实现
在生产者端发送延时消息的代码如下,只需要为消息设置属性DelayTimeLevel,参数为延时级别,即可实现消息延时
message.setDelayTimeLevel(3);
RocketMQ不支持自定义延时时间,系统定义了18个延时级别如下
org.apache.rocketmq.store.config.MessageStoreConfig#messageDelayLevel
![]()
参数3表示该条消息延时10s,参数4表示延时30s,依次类推
RocketMQ规定了延时消息的Topic为SCHEDULE_TOPIC_XXXX,其消息队列也根据延时级别分为18个消息队列,文件存储如下,只有三个文件夹是因为暂时只用到了3个隔离级别,根据下图也能发现延时级别和queueid的关系(queueid = 延时级别 - 1)

将消息写入CommitLog时,判断是否需要延时,然后替换Topic和queueid

再进行消息分发,将消息写入ConsumeQueue时,也有一点特殊处理:
CommitLog#checkMessageAndReturnSize(java.nio.ByteBuffer, boolean, boolean)
延时消息存入SCHEDULE_TOPIC_XXXX的消费队列时,tagsCode存储的是消息的到期时间

进行延时任务调度,到期后将消息重新写入CommitLog
加载SCHEDULE_TOPIC_XXXX的所有队列的消费offset情况和延迟级别信息到内存Map

启动各个队列的消息分发定时任务

每个队列的消息分发任务

executeOnTimeup

