• RocketMQ源码阅读(十)消息消费-—消息队列负载均衡


    目录

    RebalanceService

    RebalanceImpl

    rebalanceByTopic

    AllocateMessageQueueStrategy

    updateProcessQueueTableInRebalance


    入口org.apache.rocketmq.client.impl.factory.MQClientInstance#start

    RocketMQ的消息消费流程分为三部分,一消息队列负载均衡,二消息的拉取,三消息的消费

    RebalanceService

    负载均衡定时任务的实现类

     org.apache.rocketmq.client.impl.factory.MQClientInstance#doRebalance

     org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#doRebalance

    RebalanceImpl

    org.apache.rocketmq.client.impl.consumer.RebalanceImpl#doRebalance

    rebalanceByTopic

    广播模式所有队列都需订阅, 不用负载均衡,下边只关注集群模式

    1、获取所有消息队列和消费者实例

    2、使用负载均衡算法进行分配

    3、根据分配后最新的队列信息,进行消息拉取或停止原队列消费 

    1. private void rebalanceByTopic(final String topic, final boolean isOrder) {
    2. switch (messageModel) {
    3. case BROADCASTING: {
    4. 。。。。
    5. case CLUSTERING: {
    6. // 从缓存表获取该Topic所有的消息队列
    7. Set mqSet = this.topicSubscribeInfoTable.get(topic);
    8. // 从Broker获取该消费者组所有消费者列表
    9. List cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
    10. if (mqSet != null && cidAll != null) {
    11. List mqAll = new ArrayList();
    12. mqAll.addAll(mqSet);
    13. Collections.sort(mqAll);
    14. Collections.sort(cidAll);
    15. // 负载均衡策略处理
    16. AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
    17. List allocateResult = null;
    18. try {
    19. allocateResult = strategy.allocate(
    20. this.consumerGroup,
    21. this.mQClientFactory.getClientId(),
    22. mqAll,
    23. cidAll);
    24. } catch (Throwable e) {
    25. log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
    26. e);
    27. return;
    28. }
    29. Set allocateResultSet = new HashSet();
    30. if (allocateResult != null) {
    31. allocateResultSet.addAll(allocateResult);
    32. }
    33. // 根据最新的队列信息,更新本地
    34. // 如果有队列被移除,停止消费原队列
    35. // 如果有队列新增,新增拉取消息的任务
    36. boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
    37. if (changed) {
    38. log.info(
    39. "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
    40. strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
    41. allocateResultSet.size(), allocateResultSet);
    42. this.messageQueueChanged(topic, mqSet, allocateResultSet);
    43. }
    44. }
    45. break;
    46. }
    47. default:
    48. break;
    49. }
    50. }

    AllocateMessageQueueStrategy

    分配队列的策略接口,实现类有如下,假设有8个消费队列,3个消费者ABC

    AllocateMessageQueueAveragely  默认使用的,分配后A123, B456, C67

    AllocateMessageQueueAveragelyByCircle  分配后A147     B258    C36

    AllocateMessageQueueConsistentHash   环形一致性hash算法,下节

    AllocateMessageQueueByConfig   自定义配置,为每个消费者自定义要订阅的队列

    AllocateMessageQueueByMachineRoom   也是自定义配置,每个消费者只负载自定义配置的机房中Broker的队列信息,BrokerName也需要规范命名

    AllocateMessageQueueAveragely#allocate  代码如下

    updateProcessQueueTableInRebalance

    如果有删除的队列,停止消费消息,移除消息队列

    如果有新增的队列,新增消息拉取任务

  • 相关阅读:
    C++-继承-单继承-多继承-虚函数-内存结构分析-类分析-无源码-逆向分析(三)
    Go 语言初探:从基础到实战
    性能测试流程注意事项(亲身经历希望能帮助到你)
    基于Springboot实现体质测试数据分析平台管理系统项目【项目源码+论文说明】计算机毕业设计
    vue3拖拽排序 使用 vuedraggable
    matlab串口读写
    shell脚本自学笔记
    手持式水质监测仪在污水处理中的应用
    文献学习-4-面向机器人手术的基于数据驱动控制的连续体腹腔镜器械跟踪控制方法
    Tomcat 与 JDK 对应版本关系
  • 原文地址:https://blog.csdn.net/xyjy11/article/details/126174547