• RocketMQ消息存储机制


    目录

    消息存储概述

    CommitLog

    ConsumeQueue

    IndexFile

    刷盘机制

    源代码

    同步刷盘

    异步刷盘 


    消息存储概述

    RocketMQ默认的消息存储路径在/root/store/

    生产者每次投递的消息都存储在commitLog文件里,再开启一个线程(ReputMessageService)异步的生成consumerQueue和indexFile

    消费者根据queueOffset到conumerQueue找到对应消息的commitLogOffset,再到commitLog文件中找到具体的消息并返回

     

    CommitLog

    1. 默认大小是1G
    2. 文件名是以起始偏移量命名
    3. 顺序写入,随机读写
    4. 被消费的消息不会立即删除,是以删除策略进行删除

    ConsumeQueue

    可以将ConsumeQueue理解为CommitLog的索引,因为CommitLog存储了所有topic的消息,通过引入ConsumeQueue来提高消息消费的速度。默认大小是48M,ConsumeQueue 存储的条目是固定大小,只会存储 8 字节的 commitlog 物理偏移量,4 字节的消息长度和 8 字节 Tag 的哈希值,固定 20 字节。

    IndexFile

    IndexFile就是索引文件,提供了一种可以通过key或时间区间来查询消息的方法

    消息存储的核心代码主要在源码的CommitLog类中的putMessage方法,主要是干了将消息追加写入堆外内存中(对应方法mappedFile.appendMessage())和处理刷盘

    刷盘机制

    RocketMQ如果将消息只存在内存中的话,可靠性不高,虽然刷盘效率低但是可靠性高,并且保证断电后未删除的消息又能恢复使用。RocketMQ提供了两种写入方式,分别是同步刷盘和异步刷盘。

    源代码

    1. public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    2. // Synchronization flush
    3. if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
    4. final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
    5. if (messageExt.isWaitStoreMsgOK()) {
    6. GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
    7. service.putRequest(request);
    8. boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
    9. if (!flushOK) {
    10. log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
    11. + " client address: " + messageExt.getBornHostString());
    12. putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
    13. }
    14. } else {
    15. service.wakeup();
    16. }
    17. }
    18. // Asynchronous flush
    19. else {
    20. if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
    21. flushCommitLogService.wakeup();
    22. } else {
    23. commitLogService.wakeup();
    24. }
    25. }
    26. }

    同步刷盘

    同步刷盘是等消息写到磁盘时才返回ack给Producer

    优点:不会丢失数据,不会出现消息状态不一致的情况

    缺点:性能低

    流程:

    1. 将消息封装成GroupCommitRequest
    2. 将GroupCommitRequest放到List中,再唤醒GroupCommitService线程
    3. 循环将requestsRead刷到磁盘中
    4. 等待GroupCommitService线程将requestsRead刷完,超时则报错

    异步刷盘 

    异步刷盘是等消息写到堆外内存就返回ACK给Producer

    优点:性能高

    缺点:Master宕机或者磁盘损坏等情况会丢失少量数据

    流程:

    1. 如果距离上次刷盘间隔10s,则直接刷盘
    2. 等待500ms刷一次,默认一次刷4页数据,如果超过10s则直接刷
    3. 刷完之后记录刷盘位置
    4. 最后冲洗之后再退出

    如果将TransientStorePoolEnable设置为true,则会调用CommitRealTimeService线程来异步刷盘,该线程是用fileChannel.write将消息写入磁盘

    流程:

    1. 每次至少刷4页数据
    2. 如果距离上次大于200ms则直接刷
    3. 最后冲洗之后再退出

     

  • 相关阅读:
    阿里二面:SpringCloud 有几种服务调用方式?
    Windows安装C语言环境
    基于Hutool,文件导入读取,批量转换bean并且自定义校验
    Spring Boot的自动装配中的@ConditionalOnBean条件装配注解在Spring启动过程中,是如何保证处理顺序靠后的
    java计算机毕业设计校园快递联盟系统源码+系统+mysql数据库+lw文档
    【python】直方图正则化详解和示例
    73. 矩阵置零
    项目介绍:开源安防摄像机(嵌入式软件)
    Android NFC开发详解:NFC读卡实例解析及总结
    # ubuntu22下配置postgresql远程访问
  • 原文地址:https://blog.csdn.net/qq_35597828/article/details/125540355