• RocketMQ源码分析(九)之AllocateMappedFileService


    简介

    1. AllocateMappedFileService继承了ServiceThread,说明它是服务线程类。AllocateMappedFileService用于提前创建一个MappedFile和下一个MappedFile

    2. 核心属性

      public class AllocateMappedFileService extends ServiceThread {
          private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
          //等待创建MappedFile的超时时间,默认5秒
          private static int waitTimeOut = 1000 * 5;
          //用来保存当前所有待处理的分配请求,其中key是filePath,value是分配请求AllocateRequest。
          //如果分配请求被成功处理,即获取到映射文件则从请求会从requestTable中移除
          private ConcurrentMap<String, AllocateRequest> requestTable =
              new ConcurrentHashMap<String, AllocateRequest>();
          private PriorityBlockingQueue<AllocateRequest> requestQueue =
              new PriorityBlockingQueue<AllocateRequest>();
          //创建MappedFile是否有异常
          private volatile boolean hasException = false;
          private DefaultMessageStore messageStore;
          
          ...省略...
      }    
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
    3. 服务线程运行逻辑

      /**
       * 此线程在DefaultMessageStore创建时启动
       */
      public void run() {
          log.info(this.getServiceName() + " service started");
      
          while (!this.isStopped() && this.mmapOperation()) {
      
          }
          log.info(this.getServiceName() + " service end");
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
    4. AllocateMappedFileService有两个核心方法putRequestAndReturnMappedFilemmapOperation。两个方法配合实现MappedFile文件的的创建和预热MappedFile 在这里插入图片描述

    5. 流程图
      在这里插入图片描述

    AllocateRequest

    1. AllocateRequestAllocateMappedFileService的静态内部类,实现了Comparable接口,用于优先级队列

      static class AllocateRequest implements Comparable<AllocateRequest> {
          // Full file path
          private String filePath;
          private int fileSize;
          //为0表示MappedFile创建完成
          private CountDownLatch countDownLatch = new CountDownLatch(1);
          private volatile MappedFile mappedFile = null;
          /**
           * fileSize大的优先级高,文件大小相同,文件的offset越小优先级越高
           */
          public int compareTo(AllocateRequest other) {
              if (this.fileSize < other.fileSize)
                  return 1;
              else if (this.fileSize > other.fileSize) {
                  return -1;
              } else {
                  int mIndex = this.filePath.lastIndexOf(File.separator);
                  long mName = Long.parseLong(this.filePath.substring(mIndex + 1));
                  int oIndex = other.filePath.lastIndexOf(File.separator);
                  long oName = Long.parseLong(other.filePath.substring(oIndex + 1));
                  if (mName < oName) {
                      return -1;
                  } else if (mName > oName) {
                      return 1;
                  } else {
                      return 0;
                  }
              }
              // return this.fileSize < other.fileSize ? 1 : this.fileSize >
              // other.fileSize ? -1 : 0;
          }
      }  
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32

    putRequestAndReturnMappedFile

    1. putRequestAndReturnMappedFile是外部创建MappedFile的入口(MappedFileQueue#getLastMappedFile里调用,前提是allocateMappedFileService不为空),创建当前的MappedFile和下一个MappedFile

    2. putRequestAndReturnMappedFile只是将

      • 创建一个AllocateRequest,并放在待处理的缓存中(处理成功后会从缓存中移除)
      • 如果在CountDownLatch#await前已经有异常(hasException使用volatile修饰,具备可见性),表示mmapOperation已经执行完成,此时直接返回null
      • 执行CountDownLatch#await,默认等待5s。如果没有执行成功,直接返回null,但不移除requestTable(下次可以直接到wait这里)。如果执行成功,移除requestTable,直接返回创建好的MappedFile
    3. 源码

      /**
       * 提交MappedFile的创建请求。包含下一个和下下个MappedFile.
       * @param nextFilePath  下一个文件的路径
       * @param nextNextFilePath 下下个文件的路径
       * @param fileSize
       * @return
       */
      public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
          int canSubmitRequests = 2;
          if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
              //快速失败策略时
              if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
                  && BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
                  //计算TransientStorePool中剩余的buffer数量减去requestQueue中待分配的数量后,剩余的buffer数量
                  canSubmitRequests = this.messageStore.getTransientStorePool().remainBufferNumbs() - this.requestQueue.size();
              }
          }
      
          AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
          //查看是否已经存在
          boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
      
          if (nextPutOK) {
              //TransientStorePool 不足,不能创建,直接返回null
              if (canSubmitRequests <= 0) {
                  log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
                      "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());
                  this.requestTable.remove(nextFilePath);
                  return null;
              }
              /**
               * FIXME jannal 无界队列offer永远返回true,此处的判断毫无意义吧
               */
              boolean offerOK = this.requestQueue.offer(nextReq);
              if (!offerOK) {
                  log.warn("never expected here, add a request to preallocate queue failed");
              }
              canSubmitRequests--;
          }
      
          AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
          boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
          if (nextNextPutOK) {
              if (canSubmitRequests <= 0) {
                  log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
                      "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());
                  this.requestTable.remove(nextNextFilePath);
              } else {
                  boolean offerOK = this.requestQueue.offer(nextNextReq);
                  if (!offerOK) {
                      log.warn("never expected here, add a request to preallocate queue failed");
                  }
              }
          }
          // mmapOperation已经执行完成,并且创建MappedFile有异常
          if (hasException) {
              log.warn(this.getServiceName() + " service has exception. so return null");
              return null;
          }
      
          AllocateRequest result = this.requestTable.get(nextFilePath);
          try {
              if (result != null) {
                  //默认5s,等待run方法中的mmapOperation执行释放countDown
                  boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
                  // 超时直接返回null(此时不移除requestTable,下次可直接直接到wait这里,上面的缓存put无需再次执行)
                  if (!waitOK) {
                      log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
                      return null;
                  } else {
                      this.requestTable.remove(nextFilePath);
                      return result.getMappedFile();
                  }
              } else {
                  //FIXME 这里完全没有必要打log,先put,然后get,其他线程也没有remove,所以是必然可以拿到的
                  log.error("find preallocate mmap failed, this never happen");
              }
          } catch (InterruptedException e) {
              log.warn(this.getServiceName() + " service has exception. ", e);
          }
      
          return null;
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69
      • 70
      • 71
      • 72
      • 73
      • 74
      • 75
      • 76
      • 77
      • 78
      • 79
      • 80
      • 81
      • 82
      • 83

    mmapOperation

    1. mmapOperation方法主要做以下两件事情,而且除非线程被中断或者服务终止,否则这个过程一直进行

      • 初始化MappedFile
      • 预热MappedFile
    2. mmapOperation源码

      • 从优先级队列中获取AllocateRequest
      • 创建MappedFile
      • 根据配置是否预热MappedFile(填充0字节),将MappedFile放入到AllocateRequest
      • 如果出现IOException将AllocateRequest重新放入优先级队列
      • 调用AllocateRequest的CountDownLatch#countDown方法通知putRequestAndReturnMappedFile线程
    3. 源码逻辑

      /**
       * Only interrupted by the external thread, will return false
       */
      private boolean mmapOperation() {
          boolean isSuccess = false;
          AllocateRequest req = null;
          try {
              // 从优先级队列里获取AllocateRequest
              req = this.requestQueue.take();
              //从Map里获取AllocateRequest
              AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
              if (null == expectedRequest) {
                  log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "
                      + req.getFileSize());
                  return true;
              }
              //putRequestAndReturnMappedFile里map与优先级队列并不是强一致,是最终一致的
              if (expectedRequest != req) {
                  log.warn("never expected here,  maybe cause timeout " + req.getFilePath() + " "
                      + req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);
                  return true;
              }
      
              if (req.getMappedFile() == null) {
                  long beginTime = System.currentTimeMillis();
      
                  MappedFile mappedFile;
                  //堆外内存
                  if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                      try {
                          mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
                          mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                      } catch (RuntimeException e) {
                          log.warn("Use default implementation.");
                          mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                      }
                  } else {
                      mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
                  }
      
                  long eclipseTime = UtilAll.computeEclipseTimeMilliseconds(beginTime);
                  //创建MappedFile 花费大于10ms打印日志
                  if (eclipseTime > 10) {
                      int queueSize = this.requestQueue.size();
                      log.warn("create mappedFile spent time(ms) " + eclipseTime + " queue size " + queueSize
                          + " " + req.getFilePath() + " " + req.getFileSize());
                  }
      
                  // pre write mappedFile 默认warmMapedFileEnable=false,即默认不预热
                  if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
                      .getMapedFileSizeCommitLog()
                      &&
                      this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
                      mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
                          this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
                  }
      
                  req.setMappedFile(mappedFile);
                  this.hasException = false;
                  isSuccess = true;
              }
          } catch (InterruptedException e) {
              log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");
              this.hasException = true;
              return false;
          } catch (IOException e) {
              log.warn(this.getServiceName() + " service has exception. ", e);
              this.hasException = true;
              if (null != req) {
                  //重新插入请求到队列
                  requestQueue.offer(req);
                  try {
                      Thread.sleep(1);
                  } catch (InterruptedException ignored) {
                  }
              }
          } finally {
              //AllocateRequest计数器减一,表示MappedFile已经创建完成
              if (req != null && isSuccess)
                  req.getCountDownLatch().countDown();
          }
          return true;
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69
      • 70
      • 71
      • 72
      • 73
      • 74
      • 75
      • 76
      • 77
      • 78
      • 79
      • 80
      • 81
      • 82
      • 83
  • 相关阅读:
    Python深入分享之对象的属性
    【图像误差测量】测量 2 张图像之间的差异,并测量图像质量(Matlab代码实现)
    891. 子序列宽度之和(每日一难phase3-4)
    java毕业设计采购系统mybatis+源码+调试部署+系统+数据库+lw
    在CARLA中手动开车,添加双目相机stereo camera,激光雷达Lidar
    服务器防火墙状态怎么查看
    【HTML】HTML基础8.1(表单标签)
    【手写数据库toadb】表relation访问实现概述,分层设计再实践,表访问层与表操作层简化代码复杂度
    Qt扫盲-QTreeView 理论总结
    VGG 07
  • 原文地址:https://blog.csdn.net/usagoole/article/details/126394486