• 6 张图告诉你 RocketMQ 是怎么保存偏移量的


    对消息队列来说,偏移量是一个非常重要的概念,如果偏移量保存失败,可能会造成消息丢失、消息重复消费等问题。今天来聊一聊 RocketMQ 是怎么保存消息偏移量的。

    1 消息拉取

    RocketMQ 客户端启动的时候,会启动重平衡线程 RebalanceService,在这里创建拉取消息的请求。下面 UML 类图展示了客户端启动重平衡线程的调用关系:

    业务入口是 MQClientInstance 中的 start 方法,start 方法中启动了 RebalanceService 线程,线程中的 run 方法又返回来调用了 MQClientInstance 中的 doRebalance 方法,最终调用到了 RebalanceImpl 中的 doRebalance 方法。

    RebalanceService 的 run 方法是一个死循环,不停地进行重平衡操作,代码如下:

    1. public void run() {
    2.  log.info(this.getServiceName() + " service started");
    3.  while (!this.isStopped()) {
    4.   this.waitForRunning(waitInterval);
    5.   this.mqClientFactory.doRebalance();
    6.  }
    7.  log.info(this.getServiceName() + " service end");
    8. }

    那重平衡操作具体是做什么呢?再看下面的代码:

    1. private void rebalanceByTopic(final String topic, final boolean isOrder) {
    2.  switch (messageModel) {
    3.   case BROADCASTING: {
    4.    Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
    5.    if (mqSet != null) {
    6.     boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
    7.    } else {}
    8.    break;
    9.   }
    10.   case CLUSTERING: {
    11.    Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
    12.    List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
    13.    if (mqSet != null && cidAll != null) {
    14.     List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
    15.     mqAll.addAll(mqSet);
    16.     List<MessageQueue> allocateResult = null;
    17.     //这里根据负载均衡策略进行获取分配给自己的 MessageQueue,逻辑省略
    18.     Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
    19.     if (allocateResult != null) {
    20.      allocateResultSet.addAll(allocateResult);
    21.     }
    22.     boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
    23.    }
    24.    break;
    25.   }
    26.   default:
    27.    break;
    28.  }
    29. }

    可以看到,无论是集群模式还是广播模式,updateProcessQueueTableInRebalance 方法最终都被调用了。这个方法封装了拉取消息的请求 PullRequest。这些请求被 put 到 PullMessageService 的 pullRequestQueue,然后 PullMessageService 使用死循环不停地从 pullRequestQueue 中 take 请求 发送到 Broker。这个处理的 UML 类图如下:

    偏移量这个参数封装在消息拉取请求 PullRequest 中,看一下封装拉取请求的代码:

    1. private boolean updateProcessQueueTableInRebalance(final String topic, final Set mqSet,
    2.  final boolean isOrder) {
    3.  boolean changed = false;
    4.     //省略掉判断
    5.  List pullRequestList = new ArrayList();
    6.  for (MessageQueue mq : mqSet) {
    7.   if (!this.processQueueTable.containsKey(mq)) {
    8.             //省略部分逻辑
    9.    long nextOffset = -1L;
    10.    try {
    11.     nextOffset = this.computePullFromWhereWithException(mq);
    12.    } //省略catch
    13.    if (nextOffset >= 0) {
    14.     ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
    15.     if (pre != null) {} else {
    16.      PullRequest pullRequest = new PullRequest();
    17.      pullRequest.setConsumerGroup(consumerGroup);
    18.      pullRequest.setNextOffset(nextOffset);
    19.      pullRequest.setMessageQueue(mq);
    20.      pullRequest.setProcessQueue(pq);
    21.      pullRequestList.add(pullRequest);
    22.      changed = true;
    23.     }
    24.    } //省略 else
    25.   }
    26.  }
    27.  this.dispatchPullRequest(pullRequestList);
    28.  return changed;
    29. }

    从上面代码可以看出,程序是通过 computePullFromWhereWithException 这个方法获取消息偏移量。下面着重看一下这个方法。

    2 偏移量处理

    首先,看一下获取偏移量的方法:

    1. public long computePullFromWhereWithException(MessageQueue mq) throws MQClientException {
    2.  long result = -1;
    3.  final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
    4.  final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
    5.  switch (consumeFromWhere) {
    6.   case CONSUME_FROM_LAST_OFFSET: {
    7.    long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
    8.    if (lastOffset >= 0) {
    9.     result = lastOffset;
    10.    }
    11.    // First start,no offset
    12.    else if (-1 == lastOffset) {
    13.     if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
    14.      result = 0L;
    15.     } else {
    16.      try {
    17.       result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
    18.      } catch (MQClientException e) {
    19.      }
    20.     }
    21.    } else {
    22.     result = -1;
    23.    }
    24.    break;
    25.   }
    26.   default:
    27.    break;
    28.  }
    29.  return result;
    30. }

    CONSUME_FROM_LAST_OFFSET 这个分支是默认的分支,其他情况一般不会用,为了不放大量代码,这里做了省略。

    上面的代码看出,偏移量是从 OffsetStore 中获取的。OffsetStore 是一个接口,实现类有两个,如下图:

    那获取偏移量的时候,这两个实现类选择哪一个呢?从 DefaultMQPushConsumerImpl 的 start 方法中可以看出:

    1. public synchronized void start() throws MQClientException {
    2.  switch (this.serviceState) {
    3.   case CREATE_JUST:
    4.    if (this.defaultMQPushConsumer.getOffsetStore() != null) {
    5.     this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
    6.    } else {
    7.     switch (this.defaultMQPushConsumer.getMessageModel()) {
    8.      case BROADCASTING:
    9.       this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
    10.       break;
    11.      case CLUSTERING:
    12.       this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
    13.       break;
    14.      default:
    15.       break;
    16.     }
    17.     this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
    18.    }
    19.    this.offsetStore.load();
    20.    break;
    21.   default:
    22.    break;
    23.  }
    24. }

    从上面的代码可以看到,OffsetStore 的两个实现类对应了广播模式和集群模式。从文件名也可以看到,LocalFileOffsetStore 是从本地读取偏移量,而 RemoteBrokerOffsetStore 则从 Broker 端请求偏移量

    OffsetStore 的两个实现类保存偏移量的数据结构是一样的,如下图:

    3 广播模式

    从前面的分析可以看到,广播模式的偏移量是保存在本地,分析源码可以看到,文件默认保存在:

    /home/${user}/.rocketmq_offsets/${clientId}/${groupName}/offsets.json
    

    可以通过参数【rocketmq.client.localOffsetStoreDir】进行配置,这样文件保存的路径就是:

    /${rocketmq.client.localOffsetStoreDir}/.rocketmq_offsets/${clientId}/${groupName}/offsets.json
    

    OffsetStore 的 load 方法读取上面文件,如果读取失败或者文件内容是空,就会读取备份文件,路径是上面的文件名后面加 .bak。

    3.1 加载偏移量

    load 方法读取这个 json 文件,然后把内容读取到 LocalFileOffsetStore 类的 offsetTable 这个数据结构中:

    1. //LocalFileOffsetStore类
    2. public void load() throws MQClientException {
    3.  OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
    4.  if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
    5.   offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());
    6.  }
    7. }

    上面调用的 readLocalOffset 方法代码如下:

    1. private OffsetSerializeWrapper readLocalOffset() throws MQClientException {
    2.  String content = null;
    3.  try {
    4.   content = MixAll.file2String(this.storePath);
    5.  } catch (IOException e) {
    6.   log.warn("Load local offset store file exception", e);
    7.  }
    8.  if (null == content || content.length() == 0) {
    9.      //读取失败或者文件内容是空,则从 .bak 文件获取
    10.   return this.readLocalOffsetBak();
    11.  } else {
    12.   OffsetSerializeWrapper offsetSerializeWrapper = null;
    13.   try {
    14.    offsetSerializeWrapper =
    15.     OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class);
    16.   } catch (Exception e) {
    17.    log.warn("readLocalOffset Exception, and try to correct", e);
    18.    return this.readLocalOffsetBak();
    19.   }
    20.   return offsetSerializeWrapper;
    21.  }
    22. }

    3.2 读取偏移量

    在拉取消息时,首先会封装 PullRequest 请求,PullRequest 中的 nextOffset 参数需要从 offsetTable 获取,代码如下:

    1. public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
    2.  if (mq != null) {
    3.   switch (type) {
    4.    //省略其他 case
    5.    case READ_FROM_STORE: {
    6.     OffsetSerializeWrapper offsetSerializeWrapper;
    7.     try {
    8.      offsetSerializeWrapper = this.readLocalOffset();
    9.     } catch (MQClientException e) {
    10.      return -1;
    11.     }
    12.     if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
    13.      AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
    14.      if (offset != null) {
    15.          //新读取到的偏移量添加到 offsetTable 中
    16.       this.updateOffset(mq, offset.get(), false);
    17.       return offset.get();
    18.      }
    19.     }
    20.    }
    21.    default:
    22.     break;
    23.   }
    24.  }
    25.  return -1;
    26. }

    上面的代码省略了 READ_FROM_MEMORY 这个 case,因为我跟踪了源代码,广播模式并不会走到这个 case。

    3.3 更新偏移量

    MQClientInstance 初始化时,会启动定时任务,每隔 5s 执行一次,把偏移量持久化到本地文件,代码如下:

    1. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    2.  @Override
    3.  public void run() {
    4.   try {
    5.    MQClientInstance.this.persistAllConsumerOffset();
    6.   } catch (Exception e) {
    7.    log.error("ScheduledTask persistAllConsumerOffset exception", e);
    8.   }
    9.  }
    10. }, 1000 * 10this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

    上面代码最终调用了 LocalFileOffsetStore 类的 persistAll 方法,这个方法把内存中 offsetTable 变量中保存的值写入本地 offsets.json 文件。

    写文件时分四步:

    1. 首先把内容写入到 offsets.json.tmp 文件;

    2. offsets.json 内容备份到 offsets.json.bak;

    3. 删除 offsets.json 文件;

    4. 把 offsets.json.tmp 改名为 offsets.json。

    总结,广播模式下,偏移量保存在消费者本地服务器。这是因为所有的消费者都要消费同一个队列,消费者维护偏移量会更加方便。

    4 集群模式

    前面分析过,集群模式客户端处理偏移量的类是 RemoteBrokerOffsetStore。

    4.1 加载偏移量

    集群模式下,偏移量是从 Broker 端获取,所以客户端 RemoteBrokerOffsetStore 中的 load 方法没有内容。

    在 Broker 初始化时,会加载本地的偏移量文件,调用关系的 UML 类图如下:

    BrokerController 初始化代码如下:

    1. //BrokerController.java
    2. public boolean initialize() throws CloneNotSupportedException {
    3.     //省略其他代码
    4.  result = result && this.consumerOffsetManager.load();
    5. }

    这里最终调用了 ConsumerOffsetManager 的 decode 方法,代码如下:

    1. public void decode(String jsonString) {
    2.  if (jsonString != null) {
    3.   ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class);
    4.   if (obj != null) {
    5.    this.offsetTable = obj.offsetTable;
    6.   }
    7.  }
    8. }

    从上面可以看到,Broker 上的偏移量最终保存在 offsetTable 这个变量上,数据结构如下图:

    从源码中可以看到,Broker 端偏移量文件如下:

    /home/${user}/store/config/consumerOffset.json
    

    4.2 获取偏移量

    获取 Broker 端偏移量的时候,会向 Broker 发送一个请求,请求码是 QUERY_CONSUMER_OFFSET,通过请求码就可以找到 Broker 处理的逻辑。代码如下:

    1. private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
    2.  throws RemotingCommandException {
    3.  //省略变量定义
    4.  //从 offsetTable 变量中查找
    5.  long offset =
    6.   this.brokerController.getConsumerOffsetManager().queryOffset(
    7.    requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
    8.  if (offset >= 0) {
    9.   responseHeader.setOffset(offset);
    10.   response.setCode(ResponseCode.SUCCESS);
    11.   response.setRemark(null);
    12.  } else {
    13.   long minOffset =
    14.    this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(),
    15.     requestHeader.getQueueId());
    16.   if (minOffset <= 0
    17.    && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
    18.    requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
    19.    responseHeader.setOffset(0L);
    20.    response.setCode(ResponseCode.SUCCESS);
    21.    response.setRemark(null);
    22.   } else {
    23.    response.setCode(ResponseCode.QUERY_NOT_FOUND);
    24.    response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first");
    25.   }
    26.  }
    27.  return response;
    28. }

    上面的处理逻辑如下:

    1. 首先从 offsetTable 变量中查找,如果找到了就直接返回给消费者;

    2. 如果没有找到,则从 Broker 上查找这个 MessageQueue 的最小偏移量,如果偏移量小于等于 0 并且没有被交换到磁盘上(保存在内存里),则返回偏移量是 0;

    3. 否则返回查找失败。

    4.3 更新偏移量

    在消费端,定时任务每 5s 向 Broker 发送更新消息偏移量的请求,请求码是 UPDATE_CONSUMER_OFFSET。

    Broker 收到后,ConsumerOffsetManager 类更新 offsetTable 变量。

    Broker 端也会用定时任务每 5s 从 offsetTable 变量刷到本地文件。逻辑跟消费端的保存逻辑一样,就不再介绍了。

    5 总结

    广播模式下,偏移量保存在消费者本地。这也是最合理的,因为每个消费者都要消费同一个 MessageQueue,自己维护自己的偏移量更简单。不过这可能也是广播模式下不支持消息重试的原因,因为如果一个消费者消费失败了,这批消息其他消费成功的消费者也需要重试,导致重复消费。

    集群模式下,偏移量保存在 Broker 服务器,消费者需要通过请求的方式来获取和维护偏移量。

     

     

  • 相关阅读:
    算法----LRU缓存机制
    数学建模学习视频及资料集(2022.08.10)
    接口自动化中cookies的处理技术
    振弦采集模块的信号检测与分析计算
    kubernetes && kuboard 端口
    基于resnet网络架构训练图像分类模型
    ThingsBoard教程(二七):设备批量导入,包含设备id,设备token
    产品经理进阶:阻碍产品上市的5个因素
    想兼职?学网络安全,钱赚到你手软
    AI 大战高考作文!实测 ChatGPT、文心一言、通义千问等 8 款“神器”
  • 原文地址:https://blog.csdn.net/m0_71777195/article/details/126345158