• RocketMq存储设计——MappedFile


    MappedFile

    RocketMq存储很多地方使用到了他,例如commit log(存储真正的消息数据),consume queue(真正的消息数据的索引),store check point等。

    是什么

    你可以把MappedFile理解成一个工具,当你需要把数据储存到磁盘,从磁盘快速查找数据,使用它会非常高效。
    它使用mmap和send file系统调用加速文件的存储与查找。mmap映射的就是os page cache,一页默认大小4kb,即使mq宕机了,只要机器还在运行,os会把page cache刷到磁盘,不会丢失数据。

    优化

    当消息过多的时候,频繁使用os page cache可能导致broker busy,于是引入了transientStorePool(堆外内存)来减轻os page cache的压力。写数据的时候先写入堆外内存,mq线程会定时将堆外内存刷到page cache,再由os 将page cache刷到磁盘,读数据的时候直接读page cache,这就是所谓的mq 内存级别的读写分离。

    • 优点:减轻page cache压力
    • 缺点:堆外内存的数据会随着mq的宕机而销毁(不稳定)
      在这里插入图片描述

    源码

    MappedFile常量变量

    	//操作系统页缓存一页大小
    	 public static final int OS_PAGE_SIZE = 1024 * 4;
    	 
        //提交指针,开启堆外内存后,由该指针指明堆外内存哪些数据被刷到page cache了
        protected final AtomicInteger committedPosition = new AtomicInteger(0);
        
        //写指针,page cache的指针
        protected final AtomicInteger wrotePosition = new AtomicInteger(0);
        
        //刷盘指针,指明page cache哪些数据刷到磁盘了
        private final AtomicInteger flushedPosition = new AtomicInteger(0);
        
        //文件大小
        protected int fileSize;
    
        //flushedPosition <= committedPosition <= wrotePosition <=fileSize
    
        protected FileChannel fileChannel;
    
        /**
         * 开启参数后,先写入堆外内存,再写入page cache
         * Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
         * 堆外内存,需要开启transientStorePoolEnable
         */
        protected ByteBuffer writeBuffer = null;
        protected TransientStorePool transientStorePool = null;
    
        private String fileName;
        private long fileFromOffset;
        private File file;
    
        //mmap的 page cache
        private MappedByteBuffer mappedByteBuffer;
    
        private volatile long storeTimestamp = 0;
        private boolean firstCreateInQueue = false;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    初始化

    	public void init(final String fileName, final int fileSize,final TransientStorePool transientStorePool) {
            init(fileName, fileSize);
            //开启了堆外内存
            this.writeBuffer = transientStorePool.borrowBuffer();
            this.transientStorePool = transientStorePool;
        }
    
        private void init(final String fileName, final int fileSize) throws IOException {
            this.fileName = fileName;
            this.fileSize = fileSize;
            this.file = new File(fileName);
            this.fileFromOffset = Long.parseLong(this.file.getName());
            ensureDirOK(this.file.getParent());
            try {
               	//send file
                this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
                //mmap 映射一块page cache
                this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
            } 
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    写入数据

    public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
                PutMessageContext putMessageContext) {
            
            //page cache的写指针位置
            int currentPos = this.wrotePosition.get();
    
            if (currentPos < this.fileSize) {
                //开启了堆外内存就用堆外内存,否则用page cache
                ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
                //定位到写指针位置
                byteBuffer.position(currentPos);
                
                //省略callback
                
                //写指针后移bytes位
                this.wrotePosition.addAndGet(result.getWroteBytes());
                this.storeTimestamp = result.getStoreTimestamp();
                return result;
            }
            
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    flush

    将page cahce刷到磁盘,page cache一是由os自己刷盘,另外mq会定时刷

     public int flush(final int flushLeastPages) {
            if (this.isAbleToFlush(flushLeastPages)) {
                if (this.hold()) {
                    //没开启对外内存使用写指针,否则使用提交指针
                    int value = getReadPosition();
    
                    try {
                        //We only append data to fileChannel or mappedByteBuffer, never both.
                        if (writeBuffer != null || this.fileChannel.position() != 0) {
                            //使用了堆外内存,则使用send file 刷盘
                            this.fileChannel.force(false);
                        } else {
                            //page cache 刷盘, mmap一般用于小文件映射,send file用于大文件,这里的mapper file都是使用mmap映射,对于commit log这种1G的大文件是否不合适呢?
                            this.mappedByteBuffer.force();
                        }
                    } 
                    //设置刷盘指针
                    this.flushedPosition.set(value);
                    this.release();
                } else {
                    this.flushedPosition.set(getReadPosition());
                }
            }
            return this.getFlushedPosition();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    commit

    commit操作只有在开启堆外内存才会执行,作用是把堆外内存刷到page cache

     protected void commit0() {
            int writePos = this.wrotePosition.get();
            int lastCommittedPosition = this.committedPosition.get();
    
            if (writePos - lastCommittedPosition > 0) {
                try {
                    //使用了堆外内存
                    ByteBuffer byteBuffer = writeBuffer.slice();
                    byteBuffer.position(lastCommittedPosition);
                    byteBuffer.limit(writePos);
                    //使用send file 写到堆外内存,并设置提交指针
                    this.fileChannel.position(lastCommittedPosition);
                    this.fileChannel.write(byteBuffer);
                    this.committedPosition.set(writePos);
                } catch (Throwable e) {
                    log.error("Error occurred when commit data to FileChannel.", e);
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
  • 相关阅读:
    如何用快解析自制IoT云平台
    jmeter发送post请求后台接收不到请求参数
    Django(8):请求对象和响应对象
    SpringCloud链路追踪SkyWalking-第二章-部署搭建及高可用
    【微服务36】分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信【云原生】
    uniapp 使用地图
    2309d用dmd重写dfmt
    这种动态规划你见过吗——状态机动态规划之股票问题(下)
    多维度深入剖析QLC SSD硬件延迟的来源
    barzilar_borwein算法微调函数的优化收敛
  • 原文地址:https://blog.csdn.net/LiuRenyou/article/details/125514027