• RocketMQ的简单使用


    大家好,我是Leo!今天来和大家分享RocketMQ的一些用法。

    领域模型介绍

    Producer: 用于生产消息的运行实体。

    Topic: 主题,用于消息传输和存储的分组容器。

    MessageQueue: 消息传输和存储的实际单元容器。

    Message: 消息传输的最小单元。

    ConsumerGroup: 消费者组。

    Consumer: 消费者。

    Subscription: 订阅关系,发布订阅模式中消息过滤、重试、消费进度的规则配置。

    MQ的优势

    MQ的明显优势有3个。

    应用解耦: 以多服务为例,用户下单,需要通知订单服务和库存服务,我们可以通过MQ消息来解除下单和库存系统的耦合。

    异步提速: 以秒杀为例,我们可以先返回秒杀结果,后续再通过MQ异步消息去插入记录和扣减库存等,减少调用的链路长度。

    削峰填谷: 将某一时间内的请求量分摊到更多时间处理,比如系统A一秒只能处理10000个请求,但是我有100000个请求需要处理,我可以将请求发到MQ中,再分成10秒去消费这些请求。

    当然MQ也有劣势系统可用性降低系统复杂度提高一致性问题

    RocketMQ的主要角色

    主要包括Producer、Broker、Consumer、NameServer Cluster。

    一对多

    可以通过设置不同的消费者组

    不同组通过不同的消费者组既可以实现同时收到一样数量的消息,那同一个消费者组需要怎样才能收到同样数量的消息呢?

    // 消费者消费模式
    consumer.setMessageModel(MessageModel.BROADCASTING);
    

    默认是集群模式CLUSTERING,设置成广播模式

    既可以实现一对多的发送。

    同步消息(普通消息)

    同步消息需要阻塞等待消息发送结果的返回

    public class ProducerDemo {
    
        public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
    
            DefaultMQProducer producer = new DefaultMQProducer("group1");
    
            producer.setNamesrvAddr("localhost:9876");
            producer.start();
            Message message = new Message();
            message.setTopic("MQLearn");
            message.setTags("1.0.0");
            message.setBody("Hello MQ!".getBytes(StandardCharsets.UTF_8));
            SendResult result = producer.send(message);
            if (result.getSendStatus().equals(SendStatus.SEND_OK)) {
                System.out.println(result);
                System.out.println("发送成功:" + message);
            }
    
            producer.shutdown();
        }
    }
    

    异步消息

    异步消息需要实现发送成功和失败的回调函数。

    public class Producer {
    
        public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
    
            DefaultMQProducer producer = new DefaultMQProducer("group1");
    
            producer.setNamesrvAddr("192.168.246.140:9876");
            producer.start();
            // 异步消息
            for (int i = 0; i < 10; i++) {
                Message message = new Message();
                message.setTopic("topic7");
                message.setTags("1.0.0");
    
                message.setBody(("Hello World !" + i).getBytes(StandardCharsets.UTF_8));
                producer.send(message, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        // 发送成功的回调方法
                        System.out.println(sendResult);
                    }
    
                    @Override
                    public void onException(Throwable e) {
                        // 发送失败的回调方法
                        System.out.println(e);
                    }
                });
            }
            TimeUnit.SECONDS.sleep(10);
            System.out.println("异步发送完成!");
        }
    }
    

    单向消息

    单向消息就类似UDP,只顾单向发送,不管是否发送成功,常用于日志收集等场景。

    public class SingleDirectionProducer {
    
        public static void main(String[] args) throws Exception {
            DefaultMQProducer producer = new DefaultMQProducer("group1");
    
            producer.setNamesrvAddr("localhost:9876");
            producer.start();
            // 单向消息
            for (int i = 0; i < 10; i++) {
                Message message = new Message();
                message.setTopic("topic8");
                message.setTags("1.0.0");
                message.setBody(("Hello World !" + i).getBytes(StandardCharsets.UTF_8));
                producer.sendOneway(message);
            }
            System.out.println("带向发送完成!");
        }
    }
    

    延时(定时)消息

    RocketMQ提供的定时消息并不能指定在什么时间点去投递消息。而是根据设定的等待时间,起到延时到达的缓冲作用在RocketMQ中,延时消息的delayTimeLevel支持以下级别:

    1 1s 2 5s 3 10s 4 30s 5 1m 6 2m 7 3m 8 4m 9 5m 10 6m 11 7m 12 8m 13 9m 14 10m 15 20m 16 30m 17 1h 18 2h

    // 设置消息延时级别
    message.setDelayTimeLevel(3);
    

    批量消息

    批量消息支持一次发送多条消息。

    注意:

    • 批量消息需要有相同的topic

    • 不能是延时消息

    • 消息内容不能超过4M,可以通过producer.setMaxMessageSize()和broker进行设置设置(可以通过拆分多次发送)

    public class BatchProducer {
    
        public static void main(String[] args) throws Exception {
            DefaultMQProducer producer = new DefaultMQProducer("group1");
    
            producer.setNamesrvAddr("localhost:9876");
            producer.start();
            List list = new ArrayList<>();
            // 批量消息
            for (int i = 0; i < 10; i++) {
                Message message = new Message();
                message.setTopic("topic10");
                message.setTags("1.0.0");
                message.setBody(("Hello World !" + i).getBytes(StandardCharsets.UTF_8));
                list.add(message);
            }
            SendResult result = producer.send(list);
            System.out.println(result);
            TimeUnit.SECONDS.sleep(2);
            System.out.println("发送完成!");
        }
    }
    

    顺序消息

    顺序消息支持按照消息的发送消息先后获取消息。

    比如:我的一笔订单有多个流程需要处理,比如创建->付款->推送->完成。

    通过同一笔订单放到一个队列中,这样就可以解决消费的无序问题。

    通过实现MessageQueueSelector来选择一个队列。

    public class Producer {
    
        public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
    
            DefaultMQProducer producer = new DefaultMQProducer("group1");
    
            producer.setNamesrvAddr("localhost:9876");
            producer.start();
    
                Message message = new Message();
                // 模拟业务ID
                int step = 10;
                message.setTopic("topic12");
                message.setTags("1.0.0");
                message.setBody(("Hello World !").getBytes(StandardCharsets.UTF_8));
                producer.send(message, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List mqs, Message msg, Object arg) {
                        // 队列数
                        int size = mqs.size();
                        // 取模
                        int orderId = step;
                        return mqs.get(orderId % size);
                    }
                }, null);
    
            System.out.println("发送完成!");
        }
    }
    
    public class Consumer {
    
        public static void main(String[] args) throws Exception{
    
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
            consumer.setConsumerGroup("group1");
            consumer.setNamesrvAddr("localhost:9876");
            consumer.subscribe("topic12", "*");
            // 消费者,起一个顺序监听,一个线程,只监听一个队列
            consumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {
                    for (MessageExt msg : msgs) {
                        System.out.println(msg);
                        byte[] body = msg.getBody();
                        System.out.println(new String(body));
                    }
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
            consumer.start();
            System.out.println("消费者启动了!");
    
        }
    }
    

    事务消息

    RocketMQ中的事务消息支持在分布式场景下消息生产和本地事务的最终一致性。

    大致流程为,

    1. 生产者先将消息发送至RocketMQ。

    2. RocketMQBroker将消息持久化成功后,向生产者返回ACK消息确认已经返回成功,消息状态为暂时不能投递状态。

    3. 执行本地事务逻辑。

    4. 生产者根据事务执行结果向Broker提交commit或者rollback结果。

    5. 如果在断网或者重启情况下,未收到4的结果,或者返回Unknown未知状态,在固定时间对消息进行回查。

    6. 生产者收到消息回查后,需要本地事务执行的最终结果。

    7. 生产者对本地事务状态进行二次提交或确认。

    public class Producer {
    
        public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
            TransactionMQProducer producer = new TransactionMQProducer("group1");
            producer.setNamesrvAddr("localhost:9876");
            // 设置事务监听
            producer.setTransactionListener(new TransactionListener() {
                // 正常事务监听
                @Override
                public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                    // 把消息保存到mysql数据库
                    boolean ok = false;
    
                    if (ok) {
                        System.out.println("正常执行事务过程");
                        return LocalTransactionState.COMMIT_MESSAGE;
                    } else {
                        System.out.println("事务补偿过程");
                        return LocalTransactionState.UNKNOW;
                        //return LocalTransactionState.ROLLBACK_MESSAGE;
                    }
    
                }
                // 事务补偿事务
                @Override
                public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                    System.out.println("事务补偿过程");
                    // sql select
                    if (true) {
    
                    } else {
    
                    }
                    return LocalTransactionState.COMMIT_MESSAGE;
                }
            });
            producer.start();
            String msg = "Hello Transaction Message!";
            Message message = new Message("topic13", "tag", msg.getBytes(StandardCharsets.UTF_8));
            TransactionSendResult transactionSendResult = producer.sendMessageInTransaction(message, null);
            TimeUnit.SECONDS.sleep(2);
            System.out.println(transactionSendResult);
            System.out.println("发送完成!");
        }
    }
    

    消息的过滤

    在RocketMQ中的消息过滤功能能通过生产者和消费者对消息的属性和Tag进行定义,在消费端可以根据过滤条件进行筛选匹配,将符合条件的消息投递给消费者进行消费。

    支持两种方式:Tag标签过滤和SQL属性过滤。

    Message message = new Message();
    message.setTopic("topic11");
    message.setTags("tag");
    message.setBody(("Hello World !" + "tag").getBytes(StandardCharsets.UTF_8));
    message.putUserProperty("name", "zhangsan");
    message.putUserProperty("age", "16");
    SendResult result = producer.send(message);
    

    subscribe方法subExpression参数也支持Tag过滤

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
    consumer.setConsumerGroup("group1");
    consumer.setNamesrvAddr("112.74.125.184:9876");
    consumer.subscribe("topic11", MessageSelector.bySql("age > 16"));
    

    SpringBoot整合RocketMQ的使用

    在SpringBoot项目中主要通过RocketMQTemplate进行消息的发送。

    // 普通消息
    rocketMQTemplate.convertAndSend("topic10", user);
    rocketMQTemplate.send("topic10", MessageBuilder.withPayload(user).
    SendResult result = rocketMQTemplate.syncSend("topic10", user);
    // 异步消息
    rocketMQTemplate.asyncSend("topic10", user, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println("成功!");
        }
        @Override
        public void onException(Throwable e) {
            System.out.println(e);
        }
    }, 1000L);
    // 单向消息
    rocketMQTemplate.sendOneWay("topic10", user);
    // 延时消息
    rocketMQTemplate.syncSend("topic10", MessageBuilder.withPayload(user).build(), 2000L, 3); 
    // 批量消息
    rocketMQTemplate.syncSend("topic10", list, 1000);
    

    消费者:在注解中可以实现根据Tag和SQL进行属性的过滤。

    @Service
    //@RocketMQMessageListener(
    //        consumerGroup = "group1",
    //        topic = "topic10",
    //        selectorExpression = "tag1 || tag2"
    //)
    @RocketMQMessageListener(
            consumerGroup = "group1",
            topic = "topic10",
            selectorType = SelectorType.SQL92,
            selectorExpression = "age > 16",
            messageModel = MessageModel.BROADCASTING
    )
    public class UserConsumer implements RocketMQListener {
    
    
        @Override
        public void onMessage(User message) {
    
        }
    }
    

    总结

    今天主要分享了一下RocketMQ的一些基础使用,包括各种类型的消息的使用,偏向于代码实现部分,对于原理篇没有过多涉及。

  • 相关阅读:
    OneNote 教程,如何在 OneNote 中创建更多空间?
    趣味问题《寻人启事》的Python程序解决
    线程和进程区别
    TCP通信实现:多发多收消息、实现可以同时接收多个客户端
    【Linux实验】fork()创建子进程的一系列测试【进程管理】
    HashMap 源码逐行分析,j + oldCap 桶位置重分配公式手写验证
    【vue2高德地图api】02-npm引入插件,在页面中展示效果
    浅谈OAuth 2.0与JWT
    AWS无服务器 应用程序开发—第九章 权限(Amazon IAM)
    SSM整合redis及redis的注解式开发和解决Redis缓存问题
  • 原文地址:https://www.cnblogs.com/Leo-coding/p/17367503.html