• RocketMQ的可靠性传输


    整体

    分析:

    需确保一发一存一消费这些过程均无消息丢失

    利用ACK机制保证每个阶段需要执行的操作成功后,再往下一个阶段推动(放行)

    消息处理过程:

    由上图分析可知:

    消息丢失,可能发生在三个阶段,生产阶段、存储阶段、消费阶段

    如下,为每个阶段保证消息不丢失:

    消息生产阶段

    利用MQ的ack确认机制,在try-catch中处理好Broker的返回值,如果返回失败,则进行重试,若重试次数过多,则进行报警日志打印,排查解决问题

    消息存储阶段

    刷盘存储的消息进行多副本备份处理,从高可用角度取设计中间件,搭建集群;同时,中间件也会进行备份,至少两个节点以上备份成功之后才会给生产者返回ack确认消息

    消息消费阶段

    消费者从消费队列中拉去消息后,不是立马给Broker返回ack确认消息,而是等待业务代码顺利执行完成之后,再给Broker返回ack确认消息

    实现:

    Producer——>Broker

    • 发送方式

      • 同步发送

        • Producer向broker发送消息,会阻塞当前线程等待broker响应结果
        public class SyncProducer {
        	public static void main(String[] args) throws Exception {
            	// 实例化消息生产者Producer
                DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            	// 设置NameServer的地址
        	    	producer.setNamesrvAddr("localhost:9876");
            	// 启动Producer实例
                producer.start();
            	for (int i = 0; i < 100; i++) {
            	    // 创建消息,并指定Topic,Tag和消息体
            	    Message msg = new Message("TopicTest" /* Topic */,
                	"TagA" /* Tag */,
                	("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                	);
                	// 发送消息到一个Broker
                    SendResult sendResult = producer.send(msg);
                    // 通过sendResult返回消息是否成功送达
                    System.out.printf("%s%n", sendResult);
            	}
            	// 如果不再发送消息,关闭Producer实例。
            	producer.shutdown();
            }
        }
        
      • 异步发送

        • Producer首先构建一个向broker发送消息的任务,把该任务提交给线程池,等执行完该任务时,回调用户自定义的回调函数,执行处理结果
        public class AsyncProducer {
        	public static void main(String[] args) throws Exception {
            	// 实例化消息生产者Producer
                DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            	// 设置NameServer的地址
                producer.setNamesrvAddr("localhost:9876");
            	// 启动Producer实例
                producer.start();
                producer.setRetryTimesWhenSendAsyncFailed(0);
        	
        	int messageCount = 100;
                // 根据消息数量实例化倒计时计算器
        	final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
            	for (int i = 0; i < messageCount; i++) {
                        final int index = i;
                    	// 创建消息,并指定Topic,Tag和消息体
                        Message msg = new Message("TopicTest",
                            "TagA",
                            "OrderID188",
                            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                        // SendCallback接收异步返回结果的回调
                        producer.send(msg, new SendCallback() {
                            @Override
                            public void onSuccess(SendResult sendResult) {
                                countDownLatch.countDown();
                                System.out.printf("%-10d OK %s %n", index,
                                    sendResult.getMsgId());
                            }
                            @Override
                            public void onException(Throwable e) {
                                countDownLatch.countDown();
              	                System.out.printf("%-10d Exception %s %n", index, e);
              	                e.printStackTrace();
                            }
                    	});
            	}
        	// 等待5s
        	countDownLatch.await(5, TimeUnit.SECONDS);
            	// 如果不再发送消息,关闭Producer实例。
            	producer.shutdown();
            }
        }
        
      • Oneway

        • Oneway方式只负责发送请求,不等待应答,Producer只负责把请求发出去,不会处理响应结果
        public class OnewayProducer {
        	public static void main(String[] args) throws Exception{
            	// 实例化消息生产者Producer
                DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            	// 设置NameServer的地址
                producer.setNamesrvAddr("localhost:9876");
            	// 启动Producer实例
                producer.start();
            	for (int i = 0; i < 100; i++) {
                	// 创建消息,并指定Topic,Tag和消息体
                	Message msg = new Message("TopicTest" /* Topic */,
                        "TagA" /* Tag */,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                	);
                	// 发送单向消息,没有任何返回结果
                	producer.sendOneway(msg);
        
            	}
            	// 如果不再发送消息,关闭Producer实例。
            	producer.shutdown();
            }
        }
        
    • 推荐

      同步发送:

      • 同步发送会返回四个状态码
        • SEND_OK:消息发送成功
        • FLUSH_DISK_TIMEOUT:消息发送成功但是消息刷盘超时
        • FLUSH_SLAVE_TIMEOUT:消息发送成功但是消息同步到 slave 节点时超时
        • SLAVE_NOT_AVAILABLE:消息发送成功但是 broker 的 slave 节点不可用
      • 处理
        • 根据返回的状态码,进行消息重试,默认设置为3次,可以通过设置调整

          producer.setRetryTimesWhenSendFailed(重试次数);

      异步发送:

      • 在onException()方法中处理,如果发送失败,则在这里执行重试

      额外问题:

      • 如果Broker收到消息后,就因为某些原因宕机了,就算Producer再怎么重试都是无法解决消息丢失的问题,该如何处理?

      👉 利用多主模式,挂了一个,就换一个master继续消息发送

    总结:

    保证Producer——>Broker消息不丢失的方案

    Broker存储及备份

    • 刷盘

      • 同步刷盘

        • 消息写入内存后,立刻调用刷盘线程进行刷盘
        • 如果消息在约定的时间内未刷盘成功(默认5s),则返回FLUSH_DISK_TIMEOUT,Producer收到后进行重试
      • 异步刷盘(默认

        • 消息写入CommitLog时,不会直接写入磁盘,而是先写到PageCache缓存后返回成功
        • 启用后台线程异步将消息刷入磁盘
    • 高可用
      • 多主
        • 多个Master节点,防止单主宕机,丢失消息问题
      • 主从+双写
        • 主从的情况下(写入master成功后立即ACK给Producer),会发生,master——>slave时,主节点Broker宕机,同步失败,从而导致消息丢失
        • 开启双写,只有等master和slave都写入成功,即双写成功后才会ACK给Producer,否则,会触发Producer的重试机制

    总结

    保证Broker存储及备份阶段,消息不丢失

    Broker——>Consumer

    • 消息确认

      • 消费者从Broker中拉去消息后,不是立马给Broker返回ack确认消息,而是等待业务代码顺利执行完成之后,再给Broker返回ack确认消息
    • 消息重试

      • 消息消费失败后,需提供重试消息的能力,RocketMQ本身提供了重新消费的能力

      总结

      保证Broker——>Consumer阶段,消息不丢失

    最终方案:

  • 相关阅读:
    数字集成电路设计(四、Verilog HDL数字逻辑设计方法)(一)
    基础练习 十进制转十六进制
    Android的六大布局详解
    组织赋能,统一企业门户实现高效化、移动化协作
    【793. 阶乘函数后 K 个零】
    C# 中的那些锁,在内核态都是怎么保证同步的?
    虹科动态 | 8月23日,虹科诚邀您参加上海电子设计创新大会(EDICON Across China 2022上海站)
    原生小程序一键获取手机号
    11.9树的表示方法(孩子,父亲,孩子兄弟),树、森林的遍历,一些操作,决策树,前缀树
    项目中的Object划分
  • 原文地址:https://www.cnblogs.com/zhaorongbiao/p/15990277.html