• A Brief Look at RocketMQ: Message Rebuilding (Reput)


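    Message rebuilding (reput) is the process in which the broker, after receiving a message, dispatches it to the consumequeue and the index so that each of them appends a new entry.

    This article covers three parts:

    1. The rebuild service, ReputMessageService
    2. Appending to consume queue files: CommitLogDispatcherBuildConsumeQueue
    3. Appending to index files: CommitLogDispatcherBuildIndex

    I. ReputMessageService

    ReputMessageService extends ServiceThread and is therefore a Runnable. It is created when the DefaultMessageStore object is constructed and started together with the store.
    Its run method calls doReput roughly once every 1 ms.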

        class ReputMessageService extends ServiceThread {
            // Offset from which rebuilding starts
            private volatile long reputFromOffset = 0;
    
            private void doReput() {
                // getMinOffset returns the start offset of the oldest commitlog file that is still available
                if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
                    this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
                }
                // isCommitLogAvailable holds while reputFromOffset is below the commitlog's maximum offset
                for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
                    // Whether duplication is enabled; false by default
                    if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
                        && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
                        break;
                    }
                    // Fetch the readable data that starts at reputFromOffset
                    SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
                    if (result != null) {
                        try {
                            // Align reputFromOffset with the start offset of the returned data
                            this.reputFromOffset = result.getStartOffset();
    						
                            for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                                // checkMessageAndReturnSize reads one message from the SelectMappedBufferResult and wraps it in a DispatchRequest
                                DispatchRequest dispatchRequest =
                                    DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
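                                // If an extra step wrapped the message, the buffer size differs from the message size; by default it equals dispatchRequest.getMsgSize()
                                int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
                                // isSuccess() is true when the message was read successfully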
                                if (dispatchRequest.isSuccess()) {
                                    if (size > 0) {
                                        // Dispatch the message to the consumequeue and the index, which append their entries
                                        DefaultMessageStore.this.doDispatch(dispatchRequest);
                                        // On a master broker, notify the listeners that a message has arrived
                                        if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                                                && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
                                                && DefaultMessageStore.this.messageArrivingListener != null) {
                                            DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                                dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                                dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                                dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                                            notifyMessageArrive4MultiQueue(dispatchRequest);
                                        }
    
                                        this.reputFromOffset += size;
                                        readSize += size;
                                        ... // statistics bookkeeping omitted
    
                                    } else if (size == 0) {
                                        this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                                        readSize = result.getSize();
                                    }
                                } else if (!dispatchRequest.isSuccess()) {}
                            }
                        } finally {
                            // getData took a reference on the file; releasing it here works like an unlock
                            result.release();
                        }
                    } else {
                        // Nothing left to rebuild: exit the loop
                        doNext = false;
                    }
                }
            }
        }
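
    The scanning loop in doReput can be sketched in isolation. The following is a simplified illustration under stated assumptions, not RocketMQ's code: every record is assumed to begin with a 4-byte total length that counts the length field itself, and a length of 0 is assumed to mark the unused tail of a file (standing in for what checkMessageAndReturnSize reports). ReputLoopSketch and its methods are hypothetical names.

    ```java
    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical sketch of the doReput scan loop; not the RocketMQ implementation.
    final class ReputLoopSketch {

        // First offset of the next fixed-size file after the given offset.
        static long rollNextFile(long offset, int fileSize) {
            return offset + fileSize - offset % fileSize;
        }

        // Walks the records in buf, records each start offset in dispatched,
        // and returns the offset at which the next scan should resume.
        static long reput(ByteBuffer buf, long startOffset, int fileSize, List<Long> dispatched) {
            long reputFromOffset = startOffset;
            while (buf.remaining() >= Integer.BYTES) {
                int size = buf.getInt(buf.position()); // peek without moving the position
                if (size > 0 && size <= buf.remaining()) {
                    dispatched.add(reputFromOffset);   // stand-in for doDispatch(request)
                    buf.position(buf.position() + size);
                    reputFromOffset += size;
                } else if (size == 0) {
                    // Assumed end-of-file marker: continue from the next file.
                    return rollNextFile(reputFromOffset, fileSize);
                } else {
                    break; // truncated or invalid record: stop here
                }
            }
            return reputFromOffset;
        }

        public static void main(String[] args) {
            ByteBuffer file = ByteBuffer.allocate(32); // zero-filled, so the tail reads as size 0
            file.putInt(8).putInt(0xCAFE);             // a record of 8 bytes
            file.putInt(12).putLong(42L);              // a record of 12 bytes
            file.rewind();
            List<Long> dispatched = new ArrayList<>();
            long next = reput(file, 64L, 32, dispatched);
            System.out.println(dispatched + " next=" + next);
        }
    }
    ```

    With a 32-byte "file" that starts at offset 64, main prints [64, 72] next=96: two records are dispatched, and the zero length at position 20 moves the offset to the start of the next file.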
    
    1. getData

    getData works in two steps:

    1. Find the mappedFile that corresponds to the offset
        public SelectMappedBufferResult getData(final long offset) {
            return this.getData(offset, offset == 0);
        }
    
        public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {
            int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
            // Find the file that contains the offset
            MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);
            if (mappedFile != null) {
                // Compute the position inside the MappedFile
                int pos = (int) (offset % mappedFileSize);
                SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);
                return result;
            }
    
            return null;
        }
    
    2. The mappedFile locates the corresponding data and creates a buffer that shares its memory
        public SelectMappedBufferResult selectMappedBuffer(int pos) {
            // Read position: the end of the data that may be read
            int readPosition = getReadPosition();
            if (pos < readPosition && pos >= 0) {
                // Takes the reference that the release call shown earlier gives back
                if (this.hold()) {
                    ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
                    byteBuffer.position(pos);
                    int size = readPosition - pos;
                    ByteBuffer byteBufferNew = byteBuffer.slice();
                    byteBufferNew.limit(size);
                    return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
                }
            }
    
            return null;
        }
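
    The two getData steps can be tried out on a plain ByteBuffer. This is a minimal sketch, assuming fixed-size files and a buffer whose own position stays at 0; GetDataSketch and its method names are hypothetical, and the heap buffer only stands in for a memory-mapped file.

    ```java
    import java.nio.ByteBuffer;

    // Hypothetical sketch of the two getData steps; not the RocketMQ implementation.
    final class GetDataSketch {

        // Step 1: start offset of the fixed-size file that contains the global offset.
        static long fileFromOffset(long offset, int fileSize) {
            return offset - offset % fileSize;
        }

        // Step 1, continued: position of the global offset inside that file.
        static int positionInFile(long offset, int fileSize) {
            return (int) (offset % fileSize);
        }

        // Step 2: a view of [pos, readPosition) that shares memory with the mapped buffer.
        // Returns null when pos lies outside the readable range.
        static ByteBuffer select(ByteBuffer mapped, int pos, int readPosition) {
            if (pos < 0 || pos >= readPosition) {
                return null;
            }
            ByteBuffer view = mapped.slice();  // own position and limit, same bytes
            view.position(pos);
            ByteBuffer result = view.slice();  // index 0 of result is index pos of mapped
            result.limit(readPosition - pos);
            return result;
        }

        public static void main(String[] args) {
            int fileSize = 1024;
            long offset = 3 * 1024L + 100;
            System.out.println(fileFromOffset(offset, fileSize) + " / " + positionInFile(offset, fileSize));

            ByteBuffer mapped = ByteBuffer.allocate(fileSize);
            ByteBuffer data = select(mapped, 100, 200);
            mapped.put(100, (byte) 7); // a later write through the parent is visible in the view
            System.out.println(data.remaining() + " bytes, first=" + data.get(0));
        }
    }
    ```

    main prints 3072 / 100 and then 100 bytes, first=7. Because slice copies nothing, the result is cheap to create, which is also why the file has to stay referenced (hold/release) while the view is in use.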
    
    2. doDispatch

    doDispatch hands the wrapped request to each registered handler. At initialization, dispatcherList is populated with two CommitLogDispatcher implementations: CommitLogDispatcherBuildConsumeQueue and CommitLogDispatcherBuildIndex.

        public void doDispatch(DispatchRequest req) {
            for (CommitLogDispatcher dispatcher : this.dispatcherList) {
                dispatcher.dispatch(req);
            }
        }
    

    II. CommitLogDispatcherBuildConsumeQueue

    CommitLogDispatcherBuildConsumeQueue is mainly a relay; the actual work is done by putMessagePositionInfo.

        class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
            public void dispatch(DispatchRequest request) {
                final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
                switch (tranType) {
                    case MessageSysFlag.TRANSACTION_NOT_TYPE:
                    case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                        DefaultMessageStore.this.putMessagePositionInfo(request);
                        break;
                    case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                    case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                        break;
                }
            }
        }
        	
        public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
            // Find the queue to append to
            ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
            // Append the entry to the queue
            cq.putMessagePositionInfoWrapper(dispatchRequest, checkMultiDispatchQueue(dispatchRequest));
        }
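
    The switch on the transaction type can be reduced to a small predicate. The sketch below is an illustration only: the constant values are assumed to mirror MessageSysFlag (type stored in bits 2 and 3 of sysFlag), and the class and method names are hypothetical. buildsIndex reflects the switch in buildIndex shown in part III, which skips only rolled-back messages.

    ```java
    // Hypothetical sketch of the per-dispatcher transaction filter; not the RocketMQ implementation.
    final class TransactionFilterSketch {
        // Assumed to match MessageSysFlag: the transaction type occupies bits 2-3 of sysFlag.
        static final int TRANSACTION_NOT_TYPE = 0;
        static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
        static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
        static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;

        static int transactionValue(int sysFlag) {
            return sysFlag & TRANSACTION_ROLLBACK_TYPE; // the mask keeps only the two type bits
        }

        // Consume queue entries are written only for normal and committed messages.
        static boolean buildsConsumeQueue(int sysFlag) {
            int type = transactionValue(sysFlag);
            return type == TRANSACTION_NOT_TYPE || type == TRANSACTION_COMMIT_TYPE;
        }

        // Index entries are written for everything except rolled-back messages.
        static boolean buildsIndex(int sysFlag) {
            return transactionValue(sysFlag) != TRANSACTION_ROLLBACK_TYPE;
        }

        public static void main(String[] args) {
            int otherBit = 0x1; // an unrelated flag bit; it does not affect the type
            int prepared = otherBit | TRANSACTION_PREPARED_TYPE;
            System.out.println(buildsConsumeQueue(prepared)); // false: invisible to consumers
            System.out.println(buildsIndex(prepared));        // true: still searchable by key
        }
    }
    ```

    Skipping prepared (half) messages here is what keeps them out of the consume queue, and therefore away from consumers, until a commit record is dispatched.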
    
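    1. Finding the queue

    The lookup mirrors the way the files are laid out on disk: first the topic's directory, then the queue directory under it. No actual file path is created at this stage, though.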

        public ConsumeQueue findConsumeQueue(String topic, int queueId) {
            // Look up the topic's map of ConsumeQueues
            ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
            // Initialize it on first use
            if (null == map) {
                ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
                ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
                if (oldMap != null) {
                    map = oldMap;
                } else {
                    map = newMap;
                }
            }
            // Look up the queue
            ConsumeQueue logic = map.get(queueId);
            // Create the queue if it does not exist yet
            if (null == logic) {
                ConsumeQueue newLogic = new ConsumeQueue(
                    topic,
                    queueId,
                    StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
                    this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
                    this);
                ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
                if (oldLogic != null) {
                    logic = oldLogic;
                } else {
                    if (MixAll.isLmq(topic)) {
                        lmqConsumeQueueNum.getAndIncrement();
                    }
                    logic = newLogic;
                }
            }
    
            return logic;
        }
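
    The two-level get/putIfAbsent pattern above can be written more compactly with computeIfAbsent. Note that this is a different technique from the one in the source: putIfAbsent may construct an object that is then thrown away when another thread wins the race, whereas computeIfAbsent on a ConcurrentHashMap runs the factory at most once per key. QueueTableSketch and its nested Queue type are hypothetical stand-ins.

    ```java
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    // Hypothetical sketch of a topic -> queueId -> queue table; not the RocketMQ implementation.
    final class QueueTableSketch {

        // Stand-in for ConsumeQueue; it carries only its coordinates.
        static final class Queue {
            final String topic;
            final int queueId;

            Queue(String topic, int queueId) {
                this.topic = topic;
                this.queueId = queueId;
            }
        }

        private final ConcurrentMap<String, ConcurrentMap<Integer, Queue>> table =
            new ConcurrentHashMap<>();

        // Returns the existing queue, or creates and registers it atomically.
        Queue find(String topic, int queueId) {
            return table
                .computeIfAbsent(topic, t -> new ConcurrentHashMap<>())
                .computeIfAbsent(queueId, id -> new Queue(topic, id));
        }

        int topicCount() {
            return table.size();
        }

        public static void main(String[] args) {
            QueueTableSketch queues = new QueueTableSketch();
            Queue first = queues.find("TopicA", 0);
            Queue again = queues.find("TopicA", 0);
            System.out.println(first == again); // true: the second call reuses the instance
        }
    }
    ```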
    
    2. Appending data

        public void putMessagePositionInfoWrapper(DispatchRequest request, boolean multiQueue) {
            // Maximum number of retries
            final int maxRetries = 30;
            // Whether the consume queue is currently writable
            boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
            for (int i = 0; i < maxRetries && canWrite; i++) {
                long tagsCode = request.getTagsCode();
                // Whether extended information is enabled; false by default
                if (isExtWriteEnable()) {...}
                
                boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
                    request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
                if (result) {
                    ...
                    return;
                } else {
                    // The append failed: wait one second before retrying
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) { }
                }
            }
            this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
        }
    
        private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
            final long cqOffset) {
            // The consume queue entry may already have been built (duplicate rebuild)
            if (offset + size <= this.maxPhysicOffset) {
                return true;
            }
            // Stage the entry in the buffer; CQ_STORE_UNIT_SIZE = 20, so every entry written is 20 bytes
            this.byteBufferIndex.flip();
            this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
            this.byteBufferIndex.putLong(offset);
            this.byteBufferIndex.putInt(size);
            this.byteBufferIndex.putLong(tagsCode);
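            // Logical offset at which this entry is expected to be written
            final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
            // Find the file for the expected offset; if none exists, an actual file is created here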
            MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
            if (mappedFile != null) {
                // The file was just created as the first file of the queue
                if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
                    this.minLogicOffset = expectLogicOffset;
                    this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
                    this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
                    this.fillPreBlank(mappedFile, expectLogicOffset);
                }
                // Only currentLogicOffset == expectLogicOffset is the normal case
                if (cqOffset != 0) {
                    long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
                    if (expectLogicOffset < currentLogicOffset) {
                        return true;
                    }
                    if (expectLogicOffset != currentLogicOffset) {}
                }
                this.maxPhysicOffset = offset + size;
                // Append the entry to the mappedByteBuffer
                return mappedFile.appendMessage(this.byteBufferIndex.array());
            }
            return false;
        }
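
    The 20-byte entry layout follows directly from the three put calls above: an 8-byte commitlog offset, a 4-byte message size, and an 8-byte tags code. A minimal sketch of encoding and decoding one entry, with hypothetical class and method names:

    ```java
    import java.nio.ByteBuffer;

    // Hypothetical sketch of the consume queue entry layout; not the RocketMQ implementation.
    final class ConsumeQueueEntrySketch {
        static final int CQ_STORE_UNIT_SIZE = 20; // 8 (offset) + 4 (size) + 8 (tagsCode)

        // Serializes one entry in the same field order as putMessagePositionInfo.
        static byte[] encode(long commitLogOffset, int size, long tagsCode) {
            ByteBuffer buf = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
            buf.putLong(commitLogOffset).putInt(size).putLong(tagsCode);
            return buf.array();
        }

        // Logical offset of the entry with the given queue offset (entry number).
        static long expectLogicOffset(long cqOffset) {
            return cqOffset * CQ_STORE_UNIT_SIZE;
        }

        public static void main(String[] args) {
            ByteBuffer entry = ByteBuffer.wrap(encode(1000L, 200, 77L));
            System.out.println(entry.getLong() + " " + entry.getInt() + " " + entry.getLong());
            System.out.println(expectLogicOffset(3));
        }
    }
    ```

    main prints 1000 200 77 and then 60. Because every entry has the same size, the entry for queue offset n always sits at logical offset n * 20, which is what makes the expectLogicOffset comparison possible.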
    

    putMessagePositionInfo only appends the data to the mappedByteBuffer. So when is it written to disk?
    That is the job of FlushConsumeQueueService.

    3. FlushConsumeQueueService

    Like the commitlog, the consumequeue has its own flush service, FlushConsumeQueueService.
    By default it runs once every 1 s.

        class FlushConsumeQueueService extends ServiceThread {
            private static final int RETRY_TIMES_OVER = 3;
            private long lastFlushTimestamp = 0;
    
            private void doFlush(int retryTimes) {
                // Minimum number of dirty pages required before flushing; 2 by default
                int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();
                // When the service shuts down, one flush is forced
                if (retryTimes == RETRY_TIMES_OVER) {
                    flushConsumeQueueLeastPages = 0;
                }
    
                long logicsMsgTimestamp = 0;
                // Maximum interval between thorough flushes; 60 s by default
                int flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();
                long currentTimeMillis = System.currentTimeMillis();
                // No thorough flush for too long: force one
                if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) {
                    this.lastFlushTimestamp = currentTimeMillis;
                    flushConsumeQueueLeastPages = 0;
                    logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();
                }
    
                ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
                // Iterate over the queues of every topic and persist their data
                for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
                    for (ConsumeQueue cq : maps.values()) {
                        boolean result = false;
                        for (int i = 0; i < retryTimes && !result; i++) {
                            // The flush itself works like the commitlog flush covered in an earlier article, so it is not repeated here
                            result = cq.flush(flushConsumeQueueLeastPages);
                        }
                    }
                }
    
                if (0 == flushConsumeQueueLeastPages) {
                    if (logicsMsgTimestamp > 0) {
                        DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);
                    }
                    DefaultMessageStore.this.getStoreCheckpoint().flush();
                }
            }
    
     }
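
    The flush decision in doFlush boils down to choosing the page threshold passed to cq.flush, where 0 means "flush whatever is dirty". A minimal sketch of that rule as a pure function; the class name, method name, and parameter list are hypothetical:

    ```java
    // Hypothetical sketch of the doFlush threshold rule; not the RocketMQ implementation.
    final class FlushPolicySketch {
        static final int RETRY_TIMES_OVER = 3;

        // Returns the minimum number of dirty pages required for a flush; 0 forces a flush.
        static int leastPages(int configuredLeastPages, int retryTimes,
                              long nowMillis, long lastFlushMillis, long thoroughIntervalMillis) {
            if (retryTimes == RETRY_TIMES_OVER) {
                return 0; // shutdown path: flush everything
            }
            if (nowMillis >= lastFlushMillis + thoroughIntervalMillis) {
                return 0; // too long since the last thorough flush
            }
            return configuredLeastPages;
        }

        public static void main(String[] args) {
            System.out.println(leastPages(2, 1, 1_000L, 0L, 60_000L));  // 2: regular run
            System.out.println(leastPages(2, 1, 60_000L, 0L, 60_000L)); // 0: interval elapsed
            System.out.println(leastPages(2, 3, 1_000L, 0L, 60_000L));  // 0: shutdown
        }
    }
    ```

    The threshold trades durability for fewer small writes: under light traffic an entry may stay in the page cache until the thorough interval forces it out.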
    

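    III. CommitLogDispatcherBuildIndex

    CommitLogDispatcherBuildIndex relays the request to the index builder. The buildIndex logic is roughly:

    1. Find the Index file, creating a new one if none exists
    2. Append an entry for the unique key
    3. Append an entry for each user-specified key
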
        class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {
            public void dispatch(DispatchRequest request) {
                // Whether index building is enabled; true by default
                if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
                    DefaultMessageStore.this.indexService.buildIndex(request);
                }
            }
        }
    
        public void buildIndex(DispatchRequest req) {
            // Get the index file, creating a new one if none exists
            IndexFile indexFile = retryGetAndCreateIndexFile();
            if (indexFile != null) {
                long endPhyOffset = indexFile.getEndPhyOffset();
                DispatchRequest msg = req;
                String topic = msg.getTopic();
                String keys = msg.getKeys();
                // If the message's offset is below the index's end offset, it has probably been indexed already
                if (msg.getCommitLogOffset() < endPhyOffset) {
                    return;
                }
    
                final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
                switch (tranType) {
                    case MessageSysFlag.TRANSACTION_NOT_TYPE:
                    case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                    case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                        break;
                    case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                        return;
                }
                // Build the index entry for the unique key
                if (req.getUniqKey() != null) {
                    // buildKey produces topic#key
                    indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
                    if (indexFile == null) {
                        return;
                    }
                }
                // Build index entries for the ordinary keys (there may be several)
                if (keys != null && keys.length() > 0) {
                    String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
                    for (int i = 0; i < keyset.length; i++) {
                        String key = keyset[i];
                        if (key.length() > 0) {
                            indexFile = putKey(indexFile, msg, buildKey(topic, key));
                            if (indexFile == null) {
                                return;
                            }
                        }
                    }
                }
            } else {}
        }
    
    1. retryGetAndCreateIndexFile

    retryGetAndCreateIndexFile makes up to 3 attempts; each attempt first tries to take the index file from indexFileList.

        public IndexFile retryGetAndCreateIndexFile() {
            IndexFile indexFile = null;
            // If no index file can be obtained, retry at 1 s intervals, 3 attempts at most
            for (int times = 0; null == indexFile && times < MAX_TRY_IDX_CREATE; times++) {
                indexFile = this.getAndCreateLastIndexFile();
                if (null != indexFile)
                    break;
    
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {}
            }
    
            if (null == indexFile) {
                this.defaultMessageStore.getAccessRights().makeIndexFileError();
            }
    
            return indexFile;
        }
    
        public IndexFile getAndCreateLastIndexFile() {
            IndexFile indexFile = null;
            IndexFile prevIndexFile = null;
            long lastUpdateEndPhyOffset = 0;
            long lastUpdateIndexTimestamp = 0;
    
            {
                this.readWriteLock.readLock().lock();
                if (!this.indexFileList.isEmpty()) {
                    // An Index file exists: take the last one
                    IndexFile tmp = this.indexFileList.get(this.indexFileList.size() - 1);
                    if (!tmp.isWriteFull()) {
                        indexFile = tmp;
                    } else {
                        lastUpdateEndPhyOffset = tmp.getEndPhyOffset();
                        lastUpdateIndexTimestamp = tmp.getEndTimestamp();
                        prevIndexFile = tmp;
                    }
                }
    
                this.readWriteLock.readLock().unlock();
            }
            // No writable index file: create a new one
            if (indexFile == null) {
                try {
                    String fileName =
                        this.storePath + File.separator
                            + UtilAll.timeMillisToHumanString(System.currentTimeMillis());
                    // Constructing an IndexFile also creates the actual file on disk
                    indexFile =
                        new IndexFile(fileName, this.hashSlotNum, this.indexNum, lastUpdateEndPhyOffset,
                            lastUpdateIndexTimestamp);
                    this.readWriteLock.writeLock().lock();
                    this.indexFileList.add(indexFile);
                } catch (Exception e) { } finally {
                    this.readWriteLock.writeLock().unlock();
                }
                // Start a thread that flushes the previous file
                if (indexFile != null) {
                    final IndexFile flushThisFile = prevIndexFile;
                    Thread flushThread = new Thread(new Runnable() {
                        @Override
                        public void run() {
                            IndexService.this.flush(flushThisFile);
                        }
                    }, "FlushIndexFileThread");
    
                    flushThread.setDaemon(true);
                    flushThread.start();
                }
            }
    
            return indexFile;
        }
    
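    2. putKey

    putKey keeps retrying IndexFile's putKey method: whenever a call fails, it obtains an index file again through retryGetAndCreateIndexFile and tries once more.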

        private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) {
            for (boolean ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp()); !ok; ) {
                indexFile = retryGetAndCreateIndexFile();
                if (null == indexFile) {
                    return null;
                }
    
                ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp());
            }
    
            return indexFile;
        }
    

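    An index file consists of a header, the hash slots, and the index entries. Entries are only ever appended, while the header and the hash slots sit at fixed positions and are updated in place. Each hash slot stores the number of the most recent index entry written for that slot, and each entry stores the value its slot held before, which links the entries of one slot together.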

        public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
            if (this.indexHeader.getIndexCount() < this.indexNum) {
                // Hash of the key
                int keyHash = indexKeyHashMethod(key);
                // Slot number of the key
                int slotPos = keyHash % this.hashSlotNum;
                // Absolute position of that slot in the file
                int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
    
                FileLock fileLock = null;
    
                try {
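                    // Read the entry number currently stored in the slot
                    int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                    // If there is none, or the value is out of range, fall back to invalidIndex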
                    if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                        slotValue = invalidIndex;
                    }
    
                    long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
    
                    timeDiff = timeDiff / 1000;
    
                    if (this.indexHeader.getBeginTimestamp() <= 0) {
                        timeDiff = 0;
                    } else if (timeDiff > Integer.MAX_VALUE) {
                        timeDiff = Integer.MAX_VALUE;
                    } else if (timeDiff < 0) {
                        timeDiff = 0;
                    }
                    // Position of the new index entry; only this.indexHeader.getIndexCount() varies, the other terms are fixed
                    int absIndexPos =
                        IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                            + this.indexHeader.getIndexCount() * indexSize;
                    // Hash of the key
                    this.mappedByteBuffer.putInt(absIndexPos, keyHash);
                    // Offset in the commitlog
                    this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
                    this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
                    // Entry number previously stored in the slot (links entries of the same slot)
                    this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
    
                    this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
    
                    if (this.indexHeader.getIndexCount() <= 1) {
                        this.indexHeader.setBeginPhyOffset(phyOffset);
                        this.indexHeader.setBeginTimestamp(storeTimestamp);
                    }
                    // The slot was empty before, so count it as used
                    if (invalidIndex == slotValue) {
                        this.indexHeader.incHashSlotCount();
                    }
                    this.indexHeader.incIndexCount();
                    this.indexHeader.setEndPhyOffset(phyOffset);
                    this.indexHeader.setEndTimestamp(storeTimestamp);
    
                    return true;
                } catch (Exception e) { } finally {
                    if (fileLock != null) {
                        try {
                            fileLock.release();
                        } catch (IOException e) {}
                    }
                }
            } else {    }
    
            return false;
        }
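
    The way putKey chains entries through the hash slots is easier to see in an in-memory model. The sketch below is an illustration under stated assumptions, not the on-disk format: plain arrays replace the mapped file, timestamps are left out, and a query method (not shown in this article) is added to walk the chain. All names are hypothetical.

    ```java
    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical in-memory model of the slot/entry chaining; not the RocketMQ implementation.
    final class IndexChainSketch {
        private static final int INVALID_INDEX = 0; // 0 means "no entry"
        private final int[] slots;         // slot -> number of the newest entry in that slot
        private final int[] keyHashes;     // entry number -> hash of its key
        private final long[] phyOffsets;   // entry number -> commitlog offset
        private final int[] prevIndexes;   // entry number -> previous entry of the same slot
        private int indexCount = 1;        // entry numbers start at 1 so that 0 stays invalid

        IndexChainSketch(int slotNum, int maxEntries) {
            slots = new int[slotNum];
            keyHashes = new int[maxEntries + 1];
            phyOffsets = new long[maxEntries + 1];
            prevIndexes = new int[maxEntries + 1];
        }

        static int hash(String key) {
            int abs = Math.abs(key.hashCode());
            return abs < 0 ? 0 : abs; // Math.abs(Integer.MIN_VALUE) stays negative
        }

        boolean putKey(String key, long phyOffset) {
            if (indexCount >= keyHashes.length) {
                return false; // full: the caller would move on to a new file
            }
            int keyHash = hash(key);
            int slot = keyHash % slots.length;
            keyHashes[indexCount] = keyHash;
            phyOffsets[indexCount] = phyOffset;
            prevIndexes[indexCount] = slots[slot]; // link to the previous head of the chain
            slots[slot] = indexCount;              // the new entry becomes the head
            indexCount++;
            return true;
        }

        // Walks the chain of the key's slot, newest entry first.
        List<Long> query(String key) {
            int keyHash = hash(key);
            List<Long> offsets = new ArrayList<>();
            for (int i = slots[keyHash % slots.length]; i != INVALID_INDEX; i = prevIndexes[i]) {
                if (keyHashes[i] == keyHash) {
                    offsets.add(phyOffsets[i]);
                }
            }
            return offsets;
        }

        public static void main(String[] args) {
            IndexChainSketch index = new IndexChainSketch(1, 3); // one slot: every key collides
            index.putKey("TopicA#k1", 100L);
            index.putKey("TopicA#k2", 200L);
            index.putKey("TopicA#k1", 300L);
            System.out.println(index.query("TopicA#k1"));         // [300, 100]
            System.out.println(index.putKey("TopicA#k3", 400L));  // false: no room left
        }
    }
    ```

    Entries store only the hash of the key, so two different keys with the same hash would both be returned by query; a caller would still have to compare the key against the message read from the commitlog.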
    
    3. flush

    flush first updates the IndexHeader data. Note that the memory backing indexHeader is part of the mappedByteBuffer, so the force call also writes the header data to disk.

        public void flush() {
            long beginTime = System.currentTimeMillis();
            if (this.mappedFile.hold()) {
                this.indexHeader.updateByteBuffer();
                this.mappedByteBuffer.force();
                this.mappedFile.release();
            }
        }
    

    The shared memory comes from the IndexFile constructor, which contains the following step:

        public IndexFile(final String fileName, final int hashSlotNum, final int indexNum,
            final long endPhyOffset, final long endTimestamp) throws IOException {
            
    		...
            ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
    		...
        }
    
  • Original article: https://blog.csdn.net/qq_34789577/article/details/126930192