• 五、RocketMQ发送顺序消息


    顺序消息的应用场景

    在有序事件处理、撮合交易、数据实时增量同步等场景下,异构系统间需要维持强一致的状态同步,上游的事件变更需要按照顺序传递到下游进行处理。

    例如需要保证一个订单的生成、付款和发货,这三件事情是被顺序执行的。

    如何消息的顺序性

    RocketMQ消息的顺序性分为两部分:生产顺序性和消费顺序性,只有同时满足了生产顺序性和消费顺序性才能达到消息整体的有序性

    生产的顺序性

    要保证发送消息的顺序性,就必须保证消息以下条件

    • 单一生产者:顺序消息必须由单一生产者产生,不同生产者分布在不同的系统,即使设置相同的分区键,不同生产者之间产生的消息也无法判定其先后顺序。
    • 串行发送:生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。

    总结:单一生产者需要制定消息的顺序性,并且需要将顺序消息根据分区键发送到一个队列上,在发送时,需要使用串行发送

    @Test
    public void sendOrder() throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");
        producer.setNamesrvAddr(RocketMQConfig.NAME_SERVER_ADDR);
        producer.start();
    
        // 分区key
        int orderId = 1;
        for (int i = 0; i < 1000; i++) {
            Message message = new Message(RocketMQConfig.TEST_TOPIC, ("顺序" + i).getBytes(Charset.defaultCharset()));
            // 发送顺序消息,需要传递分区键
            SendResult sendResult1 = producer.send(message, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    // 分区键
                    int orderId = (int) arg;
                    // 根据分区键取模
                    int index = orderId % mqs.size();
                    // 返回要发送到哪个队列中去
                    return mqs.get(index);
                }
            }, orderId);
            System.out.println(sendResult1.getSendStatus());
        }
        producer.shutdown();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    消息的顺序性

    消费者在push模式下,有两种消息方式

    • MessageListenerOrderly会启动多个线程处理消息,但是会加锁,实际上会转变为串行,进行实现消息的顺序性

      顺序消费的结果

    在这里插入图片描述

    • MessageListenerConcurrently会启动多个线程处理消费者,并且不保证加锁,不保证消息的顺序性

      非顺序消费的结果

    在这里插入图片描述

    因此为了保证消息的顺序性,需要使用MessageListenerOrderly来处理消息

    @Test
    public void consumerOrder() throws Exception{
        // 1.创建消费端,指明消费者属于哪个组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_group_order");
    
        // 2.注册NameServer地址
        consumer.setNamesrvAddr(RocketMQConfig.NAME_SERVER_ADDR);
    
        // 3.订阅topic,并且可以根据标签进行定向消费
        consumer.subscribe(RocketMQConfig.TEST_TOPIC, "*");
    
        // 4.注册监听器,broker推送消息后,处理顺序消息
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                System.out.println(Thread.currentThread().getName() + "收到的msg大小:" + msgs.size());
                for (MessageExt msg : msgs) {
                    String content = new String(msg.getBody());
                    System.out.println(Thread.currentThread().getName() + "收到的消息体:" + content);
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
    
        // 5.启动消费端
        consumer.start();
    
        // 防止主线程退出
        Thread.sleep(Integer.MAX_VALUE);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    注:在验证顺序消息时,当这两种消费模式在启动情况下,是都可以顺序消费的。

    只有先发送消息,在启动消费,并发消费才会出现乱序的情况

    猜测:

    1. 单生产者进行发送消息,并发量不高
    2. 发送的数据量太低
  • 相关阅读:
    mvn deploy tongweb-embed-7.0.E.5_P3 依赖上传
    面试官:字节流可以处理一切文件为什么还需要字符流呢?
    form表单内嵌套动态表格添加校验功能
    36、Java——一个案例学会三层架构对数据表的增删改查
    八、PL/SQL 记录
    QUIC不是TCP的替代品
    COCI2021-2022#1 Logičari
    peer_connection_interface参数
    自定义Lua解析器管理器-------演化脚本V0.5
    Java线程池maximumPoolSize和workQueue哪个先饱和?
  • 原文地址:https://blog.csdn.net/HueyLong/article/details/133843406