• 云原生中间件RocketMQ-生产者核心解析、主从同步机制解析,生产者同步异步消息发送


    生产者核心参数

    在这里插入图片描述
    producerGroup: 组名
    createTopicKey:创建topic,实际生产实践不允许生产者创建top。
    defaultTopicQueueNums(默认为4):默认的topic关联的队列数量
    sendMsgTimeout(单位:ms):发送消息连接broker超时时间。
    compressMsgBodyOverHowmuch(默认压缩字节4096):消息体达到多少压缩。
    retryTimesWhenSendFailed (可配置):发送失败重试次数
    retryAnotherBrokerWhenNotStoreOK(默认false):发送broker存储失败换个broker发送。
    maxMessageSize(默认128K):消息最大可以设置多大。
    heartbeatBrokerInterval:与broker的心跳间隔(以微秒为单位,默认为30毫秒)

    Master - Slave主从同步机制解析

    同步的信息主要是数据内容和元数据信息

    数据内容同步

    实时进行同步,同步的是commitlog中的数据,对实时性要求高,并且丢失数据就无法恢复。使用原生socket进行同步。
    源码分析
    数据内容同步主要是在rocketmq-store中。主要涉及 HAConnectionHAServiceWaitNotifyObject三个类。并没有使用netty而是原生nio,是为了更加高效。
    在这里插入图片描述

    • 对于Master节点
      在这里插入图片描述
      HAService
      AcceptSocketService(内部类):接受slave节点连接。
      在这里插入图片描述
      HAConnection
      ReadSocketService(内部类):读来自Slave节点的数据。
      WriteSocketService(内部类):写往到Slave节点的数据。

    • 对于Slave节点

    HAService
    HAClient(内部类):对Master节点连接、读写数据。

    元数据信息同步

    broker判断如果是slave节点,那么会启动定时任务不断同步,如果丢失也可以从其他地方重试获取。包含topic信息和offset等。使用netty同步。
    源码分析
    元数据同步主要发生在broker,所以这部分代码主要在rocketmq-broker的模块中。是在handleSlaveSynchronize方法中通过定义了一个固定时间的定时任务,时间是10秒钟执行一次,当然前提条件是broker节点的角色是slave,而broker节点是master时,如果有定时任务会取消,因为master是不用同步元数据信息。
    在这里插入图片描述
    这个方法会有三个地方调用:

    • broker刚刚启动
    • master切换成slave
    • slave切换成master。
      在这里插入图片描述

    定时任务的逻辑是写在syncAll方法中。主要是需要同步4部分内容:

    • 同步topic配置信息
    • 同步消费者偏移量
    • 同步延时偏移量
    • 同步订阅组配置信息。

    在这里插入图片描述
    4个方法的内容实际上就是封装了netty做rpc的调用,对不同的操作都会对应到一个code。
    image.png
    image.png
    在这里插入图片描述

    通信协议

    Slave => Master:上报CommitLog已经同步到的物理位置。使用maxPhyOffset字段,代表CommitLog接受到的最大物理位置。
    Master => Slave:传输新的CommitLog数据。使用fromPhyOffset字段,代表CommitLog开始传输的物理位置。

    生产者消息发送

    生产者同步消息发送

    消息的同步发送:producer.send(msg)
    同步发送消息核心实现:DefaultMQProducerlmpl
    同步发送消息可以直接获取返回值:

    // 同步发送消息,直接获取发送结果
    SendResult sr = producer.send(message, new MessageQueueSelector() {
    
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            Integer queueNumber = (Integer)arg;
            return mqs.get(queueNumber);
        }
    }, 2);
    System.err.println(sr);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    生产者异步消息发送

    producer.send(Message msg, SendCallback sendCallback)
    异步发送消息核心实现:DefaultMQProducerlmpl
    异步发送消息需要通过回调函数获取返回值:

    // 异步发送消息,回调函数获取结果
    producer.send(message, new SendCallback() {
        // 可靠性消息投递
        @Override
        public void onSuccess(SendResult sendResult) {
            System.err.println("msgId: " + sendResult.getMsgId() + ", status: " + sendResult.getSendStatus());
        }
        @Override
        public void onException(Throwable e) {
            // 发送失败做异常处理
            e.printStackTrace();
            System.err.println("------发送失败");
        }
    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    本文内容到此结束了,
    如有收获欢迎点赞👍收藏💖关注✔️,您的鼓励是我最大的动力。
    如有错误❌疑问💬欢迎各位大佬指出。
    主页共饮一杯无的博客汇总👨‍💻

    保持热爱,奔赴下一场山海。🏃🏃🏃

    在这里插入图片描述

  • 相关阅读:
    node.js:《接口实现文件的上传和下载》
    Generalization
    android 切换系统键盘笔记
    jeecgBoot 路由配置和自定义路由配置
    前端项目中资源请求顺序和dom结构顺序不一致,资源启动器有(索引)解析器和脚本
    元宇宙之问:产业与资本为什么扎堆元宇宙
    qt QMutex 判断对象是否已经锁的状态
    手柄零件的工艺设计
    C语言简单题(4)方程求值、统计正负数及零的个数、区间乘积、平均值、奇数和、分数序列求和、使用函数求最值、数组交换输出
    Spring——三级缓存
  • 原文地址:https://blog.csdn.net/qq_35427589/article/details/126085594