目录
RocketMQ默认的消息存储路径在/root/store/
生产者每次投递的消息都存储在commitLog文件里,再开启一个线程(ReputMessageService)异步的生成consumerQueue和indexFile
消费者根据queueOffset到conumerQueue找到对应消息的commitLogOffset,再到commitLog文件中找到具体的消息并返回

可以将ConsumeQueue理解为CommitLog的索引,因为CommitLog存储了所有topic的消息,通过引入ConsumeQueue来提高消息消费的速度。默认大小是48M,ConsumeQueue 存储的条目是固定大小,只会存储 8 字节的 commitlog 物理偏移量,4 字节的消息长度和 8 字节 Tag 的哈希值,固定 20 字节。
IndexFile就是索引文件,提供了一种可以通过key或时间区间来查询消息的方法
消息存储的核心代码主要在源码的CommitLog类中的putMessage方法,主要是干了将消息追加写入堆外内存中(对应方法mappedFile.appendMessage())和处理刷盘
RocketMQ如果将消息只存在内存中的话,可靠性不高,虽然刷盘效率低但是可靠性高,并且保证断电后未删除的消息又能恢复使用。RocketMQ提供了两种写入方式,分别是同步刷盘和异步刷盘。
- public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
- // Synchronization flush
- if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
- final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
- if (messageExt.isWaitStoreMsgOK()) {
- GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
- service.putRequest(request);
- boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
- if (!flushOK) {
- log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
- + " client address: " + messageExt.getBornHostString());
- putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
- }
- } else {
- service.wakeup();
- }
- }
- // Asynchronous flush
- else {
- if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
- flushCommitLogService.wakeup();
- } else {
- commitLogService.wakeup();
- }
- }
- }
同步刷盘是等消息写到磁盘时才返回ack给Producer

优点:不会丢失数据,不会出现消息状态不一致的情况
缺点:性能低
流程:
异步刷盘是等消息写到堆外内存就返回ACK给Producer

优点:性能高
缺点:Master宕机或者磁盘损坏等情况会丢失少量数据
流程:
如果将TransientStorePoolEnable设置为true,则会调用CommitRealTimeService线程来异步刷盘,该线程是用fileChannel.write将消息写入磁盘
流程: