• SpringBoot整合RocketMQ笔记


    SpringBoot版本为2.3.12.Release

    RocketMQ对比kafka

    在这里插入图片描述

    学习链接

    https://zhuanlan.zhihu.com/p/335216381 代码实战

    https://www.cnblogs.com/RedOrange/p/17401238.html Centos安装rocketmq

    https://blog.csdn.net/chuige2013/article/details/123783612 RocketMQ详细配置与使用详解

    https://rocketmq-1.gitbook.io/rocketmq-connector/quick-start/qian-qi-zhun-bei/ji-qun-huan-jing 官网学习地址

    前言

    淘宝内部的交易系统使用了淘宝自主研发的Notify消息中间件,使用MySQL作为消息存储媒介,支持水平扩容。为了进一步降低成本,阿里中间件团队认为Notify可进一步优化。

    2011年初,Linkedin开源了kafka, 阿里中间件团队在对kafka做了充分的review之后,被kafka的无限消息堆积能力、高效的持久化速度深深吸引,但同时发现kafka主要定位于日志传输,对于使用在淘宝交易、订单、充值等场景下,还有若干特性不满足。因此,阿里中间件团队基于Java重新编写了RocketMQ,定位于不仅限于日志场景的可靠消息传输。

    目前,RocketMQ在阿里集团被广泛应用于订单、充值、交易、流计算、消息推送、日志流式处理、binlog分发等场景。

    RocketMQ与kafka的不同

    1、数据可靠性
    RocketMQ:支持异步实时刷盘、同步刷盘、同步复制、异步复制。
    kafka:使用异步刷盘方式,异步复制/同步复制。

    总结:
    1、RocketMQ支持kafka所不具备的“同步刷盘”功能,在单机可靠性上比kafka更高,不会因为操作系统Crash而导致数据丢失。
    2、kafka的同步replication理论上性能低于RocketMQ的replication,这是因为kafka的数据以partition为单位,这样一个kafka实例上可能多上百个partition。而一个RocketMQ实例上只有一个partition,RocketMQ可以充分利用IO组的commit机制,批量传输数据。同步replication与异步replication相比,同步replication性能上损耗约20%-30%。

    一句话概括:RocketMQ新增了同步刷盘机制,保证了可靠性;一个RocketMQ实例只有一个partition, 在replication时性能更好。

    2、性能对比
    1、kafka单机写入TPS月在百万条/秒,消息大小为10个字节。
    2、RocketMQ单机写入TPS单实例约7万条/秒,若单机部署3个broker,可以跑到最高12万条/秒,消息大小为10个字节。

    总结:
    kafka的单机TPS能跑到每秒上百万,是因为Producer端将多个小消息合并,批量发向broker。

    那么RocketMQ为什么没有这样做呢?

    发送消息的Producer通常是用Java语言,缓存过多消息,GC是个很严重的问题。(问题:难道kafka用scala不需要GC?)
    Producer发送消息到broker, 若消息发送出去后,未达到broker,就通知业务消息发送成功,若此时Broker宕机,则会导致消息丢失,从而导致业务出错。
    Producer通常为分布式系统,且每台机器都是多线程发送,通常来说线上单Producer产生的消息数量不会过万。
    消息合并功能完全可由上层业务来做。
    一句话概括:RocketMQ写入性能上不如kafka, 主要因为kafka主要应用于日志场景,而RocketMQ应用于业务场景,为了保证消息必达牺牲了性能,且基于线上真实场景没有在RocketMQ层做消息合并,推荐在业务层自己做。

    3、单机支持的队列数
    1、kafka单机若超过了64个partition/队列,CPU load会发生明显飙高,partition越多,CPU load越高,发消息的响应时间变长。
    2、RocketMQ单机支持最高5万个队列,CPU load不会发生明显变化。

    队列多有什么好处呢?
    1、单机可以创建更多个topic, 因为每个topic都是有一组队列组成。
    2、消费者的集群规模和队列数成正比,队列越多,消费类集群可以越大。

    一句话概括:RocketMQ支持的队列数远高于kafka支持的partition数,这样RocketMQ可以支持更多的consumer集群。

    4、消息投递的实时性
    1、kafka采用短轮询的方式,实时性取决于轮询时间间隔,0.8以后版本支持长轮询。
    2、RocketMQ使用长轮询,同Push实时性一致,消息投递的延迟通常在几毫秒内,

    一句话:kafka与RocketMQ都支持长轮询,消息投递的延迟在几毫秒内。

    5、消费失败重试
    1、kafka不支持消费失败重试。
    2、RocketMQ消费失败支持定时重试,每次重试间隔时间顺延。

    总结:以充值类应用为例,若当前时刻调用运营商网管失败,可能运营商网关此时压力过大,稍后再调用就会成功。这里的重试指可靠的重试,即失败重试的消息不是因为consumer宕机而导致的消息丢失。

    一句话概括:RocketMQ支持消费失败重试功能,主要用于第一次调用不成功,后面可调用成功的场景。而kafka不支持消费失败重试。

    6、严格保证消息有序
    1、kafka可保证同一个partition上的消息有序,但一旦broker宕机,就会产生消息乱序。
    2、Rocket支持严格的消息顺序,一台broker宕机,发送消息会失败,但不会乱序。举例:MySQL的二进制日志分发需要保证严格的顺序。

    一句话概括:kafka不保证消息有序,RocketMQ可保证严格的消息顺序,即使单台Broker宕机,仅会造成消息发送失败,但不会消息乱序。

    7、定时消息
    1、kafka不支持定时消息
    2、开源版本的RocketMQ仅支持定时级别,定时级别用户可定制

    8、分布式事务消息
    1、kafka不支持分布式事务消息
    2、RocketMQ支持分布式事务消息。

    9、消息查询
    1、kafka不支持消息查询
    2、RocketMQ支持根据消息标识(发送消息时指定一个消息key, 任意字符串,如指定为订单编号)查询消息,也支持根据消息内容查询消息。

    总结:消息查询功能对于定位消息丢失问题非常有用,例如某个订单处理失败,可用此功能查询是消息没收到,还是收到了但处理出错了。

    一句话概括:RocketMQ支持按消息标识或消息内容查询消息,用于排查消息丢失问题;kafka不支持消息查询。

    10、消息回溯
    1、kafka可按照消息的offset来回溯消息
    2、RocketMQ支持按照时间来回溯消息,精度到毫秒,例如从一天的几点几分几秒几毫秒来重新消费消息。

    总结:RocketMQ按时间做回溯消息的典型应用场景为,consumer做订单分析,但是由于程序逻辑或依赖的系统发生故障等原因,导致今天处理
    的消息全部无效,需要从昨天的零点重新处理。

    11、消息并行度
    1、kafka的消息并行度,依赖于topic里配置的partition数,如果partition数为10,那么最多10台机器来消费,每台机器只能开启一个线程;或者一台机器消费,最多开启10个线程。消费的并行度与partition个数一致。
    2、RocketMQ并行消费分两种情况:
    1)顺序消费方式的并行度与kafka一致。
    2)乱序消费方式的并行度取决于consumer的线程数,如topic配置10个队列,10台机器消费,每台机器100个线程,那么并行度为1000。

    一句话概括:kafka的消费并行度等于partition数;RocketMQ的消费并行度等于消费的线程数,不受队列数限制。

    12、开发语言
    1、kafka采用scala开发
    2、RocketMQ采用Java开发

    13、消息堆积能力
    kafka比RocketMQ的消息堆积能力更强,不过RocketMQ单机也可支持亿级的消息积压能力,这个堆积能力也能够完全满足业务需求。

    14、开源社区活跃度
    1、kafka社区更新较慢
    2、RocketMQ的Github社区有250人,公司用户登记了联系方式,QQ群超过1000人,
    3、kafka原开发团队成立了新公司,暂时未看到相关产品。
    4、RocketMQ已在阿里云商业化,目前以云服务形式供外部商用,并向用户承诺99.99%的可靠性,同时彻底解决了用户自己搭建MQ产品的运维复杂性问题。

    15、应用领域成熟度
    1、kafka在日志领域比较成熟
    2、RocketMQ在阿里集团内部有大量的应用在使用,并顺利支持了多次天猫双十一的考验。

    总结

    kafka和RocketMQ的总体区别是,kafka设计初衷是用于日志传输,而RocketMQ的设计用于解决各类应用可靠的消息传输,阿里云官网承诺RocketMQ数据可靠性为10个9,服务可靠性为99.95%。

    kafka相比RocketMQ的优势
    1、单机吞吐量TPS可上百万,远高于RocketMQ的TPS7万每秒,适用于日志类消息。
    2、kafka支持多语言的客户端

    RocketMQ相比kafka的优势
    1、保证消息不丢( 数据可靠性达10个9)
    2、可严格保证消息有序
    3、支持分布式事务消息
    4、支持按时间做消息回溯(可精确到毫秒级)
    5、支持按标识和内容查询消息,用于排查丢消息
    6、支持消费失败重试
    7、可支持更多的partition, 即更多的消费线程数

    Linux快速安装RocketMQ

    (1)Apache仓库:https://archive.apache.org/dist/rocketmq/
    (2)官网:https://rocketmq.apache.org/zh/
    (3)上传到centos7
    (4)解压
    在这里插入图片描述
    在这里插入图片描述

    (5)安装说明:https://blog.csdn.net/weixin_67767103/article/details/127260319

    修改配置文件

    进入bin文件加中修改三个文件:runserver.sh、runbroker.sh、tools.sh,主要是为了调整jvm堆内存,防止启动失败

    进入bin目录,首先我们修改runsever.sh的。使用vim命令:vim runserver.sh

    JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=256m"
    
    • 1

    接着修改runbroker.sh文件,同样vim runbroker.sh

    JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
    
    • 1

    再接着,修改tools.sh,同样vim tools.sh

    JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=200m"
    
    • 1

    启动mq,linux记得关防火墙

    systemctl stop firewalld或者对9876端口开放访问

    使用命令
    nohup sh bin/mqnamesrv &

    查看启动日志
    tail -f ~/logs/rocketmqlogs/namesrv.log
    The Name Server boot success…
    在这里插入图片描述启动mqbroker服务
    nohup sh bin/mqbroker -n localhost:9876 &
    8976为rocketmq的默认端口

    查看日志
    tail -f ~/logs/rocketmqlogs/broker.log
    The broker[%s, 172.30.30.233:10911] boot success…

    在这里插入图片描述

    测试RocketMQ

    发送消息
    设置环境变量 export NAMESRV_ADDR=localhost:9876 #
    使用安装包的Demo发送消息 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
    在这里插入图片描述

    接收消息
    设置环境变量 export NAMESRV_ADDR=localhost:9876
    接收消息 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
    在这里插入图片描述

    关闭RocketMQ
    关闭NameServer sh bin/mqshutdown namesrv
    关闭Broker sh bin/mqshutdown broker

    各角色介绍

    Producer:消息的发送者;举例:发件者
    Consumer:消息接收者;举例:收件人
    Consumer Group:消费组;每一个 consumer 实例都属于一个 consumer group,每一条消息只会被同一个 consumer group 里的一个 consumer 实例消费。(不同consumer group可以同时消费同一条消息)
    Broker:暂存和传输消息;举例:快递公司
    NameServer:管理 Broker;举例:快递公司的管理机构
    Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个 Topic;一个消息的接收者可以订阅一个或者多个 Topic 消息
    Message Queue:相当于是 Topic 的分区;用于并行发送和接收消息

    可视化监控平台搭建

    下载地址:https://github.com/apache/rocketmq-dashboard
    拉下来配置maven和jdk,直接使用idea启动
    配置mq地址
    在这里插入图片描述
    在这里插入图片描述
    设置开机启动
    将启动脚本挂载到系统init文件下
    vim /etc/rc.d/rc.local

    测试用例

    Java语言的生产者消费者测试用例

    初始化生产者消费者

    入门建议手动创建连接。
    发送同步消息步骤

    1.创建DefaultMQProducer,输入组名
    2.设置mq地址
    3.启动producer
    4.编写消息,设置topic、tag、body
    5.发送mq消息
    6.关闭producer
    
    排查Topic是否已创建
    cat ~/logs/rocketmqlogs/broker.log  | grep topicName=自己的Topic
    
    启动broker时开启自动创建Topic或自己去手动创建
    nohup sh mqbroker -n localhost:9876 autoCreateTopicEnable=true > ../broker.log &
    手动创建topic:进入mq的安装目录 ,执行如下命令创建topic。
    
    mq安装的相对目录是rocketmq-all-4.4.0/distribution/target/apache-rocketmq/bin
    
    以下命令创建了一个名称是orderPay的topic
    
    sh mqadmin updateTopic -b localhost:10911 -t orderPay
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    涉及到的常量

    package club.xxx.mq.constants;
    
    public class MQAddrConst {
        /**
         * mq地址
         */
        public static final String IP1 = "192.168.202.14:9876";
    
    
        public static final String TOPIC1 = "Order_Transaction_TopicTest";
        public static final String GROUP_NAME1 = "my_first_mq_test_group_name";
        public static final String TAG1 = "my_first_mq_test_tag";
    }
    
    
    package club.xxx.mq.constants;
    
    public class OrderConst {
        public static final String ORDER1 = "ORDER_PREFIX:username:1702935568079523840";
        public static final String ORDER2 = "ORDER_PREFIX:username:1702935568083718144";
        public static final String ORDER3 = "ORDER_PREFIX:username:1702935568083718145";
        public static final String ORDER4 = "ORDER_PREFIX:username:1702935568083718146";
        public static final String ORDER5 = "ORDER_PREFIX:username:1702935568083718147";
        public static final String ORDER6 = "ORDER_PREFIX:username:1702935568083718148";
        public static final String ORDER7 = "ORDER_PREFIX:username:1702935568083718149";
        public static final String ORDER8 = "ORDER_PREFIX:username:1702935568083718150";
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    生产者代码

    package club.xxx.mq._02myrawmq;
    
    import club.xxx.mq.constants.MQAddrConst;
    import club.xxx.mq.constants.OrderConst;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    /**
     * 1、Producer端发送同步消息 这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
     */
    public class _01MySyncProducer {
        public static void main(String[] args) throws Exception {
            // 实例化消息生产者Producer
            DefaultMQProducer producer = new DefaultMQProducer(MQAddrConst.GROUP_NAME1);
            // 设置NameServer的地址
            producer.setNamesrvAddr(MQAddrConst.IP1);
            // 启动Producer实例
            producer.start();
            // 创建消息,并指定Topic,Tag和消息体
            Message msg = new Message(MQAddrConst.TOPIC1/* Topic */,
                    MQAddrConst.TAG1 /* Tag */,
                    (OrderConst.ORDER1).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
    //        msg.setKeys("KEY???");
            // 发送消息到一个Broker
            SendResult sendResult = producer.send(msg);
            // 通过sendResult返回消息是否成功送达
            System.out.printf("sync send ok: %s%n", sendResult);
            // 如果不再发送消息,关闭Producer实例。
            producer.shutdown();
        }
    }
    
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    消费者代码

    package club.xxx.mq._02myrawmq;
    
    import club.xxx.mq.constants.MQAddrConst;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.nio.charset.StandardCharsets;
    import java.util.List;
    
    /**
     * 消费者
     */
    public class _02MyConsumer {
    
        public static void main(String[] args) throws InterruptedException, MQClientException {
    
            // 实例化消费者
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(MQAddrConst.GROUP_NAME1);
    
            // 设置NameServer的地址
            consumer.setNamesrvAddr(MQAddrConst.IP1);
    
            // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
            consumer.subscribe(MQAddrConst.TOPIC1, MQAddrConst.TAG1); // subExpression <=> tags
            // 注册回调实现类来处理从broker拉取回来的消息
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    String topic = msgs.get(0).getTopic();
                    String tags = msgs.get(0).getTags();
                    String msgId = msgs.get(0).getMsgId();
                    String data = new String(msgs.get(0).getBody(), StandardCharsets.UTF_8);
    
                    System.out.printf("收到订阅消息: %s, %s, %s, %s, %s\r\n", Thread.currentThread().getName(), topic, tags, msgId, data);
                    // 手动标记消息已被消费/标记该消息已经被成功消费
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            // 启动消费者实例
            consumer.start();
            System.out.printf("Consumer Started.%n");
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    TCC分布式事务消息

    Java实现
    jdk1.8
    rocketmq-4.3.0
    生产者

    package club.xxx.mq._02myrawmq;
    
    import club.xxx.mq.constants.MQAddrConst;
    import club.xxx.mq.constants.OrderConst;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    /**
     * 

    rocketmq-4.3.0

    * 1、Producer端发送同步消息 这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。 */
    public class _01MySyncProducer { public static void main(String[] args) throws Exception { // 实例化消息生产者Producer DefaultMQProducer producer = new DefaultMQProducer(MQAddrConst.GROUP_NAME1); // 设置NameServer的地址 producer.setNamesrvAddr(MQAddrConst.IP1); // 启动Producer实例 producer.start(); // 创建消息,并指定Topic,Tag和消息体 Message msg = new Message(MQAddrConst.TOPIC1/* Topic */, MQAddrConst.TAG1 /* Tag */, (OrderConst.ORDER1).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); // msg.setKeys("KEY???"); // 发送消息到一个Broker SendResult sendResult = producer.send(msg); // 通过sendResult返回消息是否成功送达 System.out.printf("sync send ok: %s%n", sendResult); // 如果不再发送消息,关闭Producer实例。 producer.shutdown(); } }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    消费者

    package club.xxx.mq._02myrawmq;
    
    import club.xxx.mq.constants.MQAddrConst;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.nio.charset.StandardCharsets;
    import java.util.List;
    
    /**
     * 

    rocketmq-4.3.0

    * 消费者 */
    public class _02MyConsumer { public static void main(String[] args) throws InterruptedException, MQClientException { // 实例化消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(MQAddrConst.GROUP_NAME1); // 设置NameServer的地址 consumer.setNamesrvAddr(MQAddrConst.IP1); // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息 consumer.subscribe(MQAddrConst.TOPIC1, MQAddrConst.TAG1); // subExpression <=> tags // 注册回调实现类来处理从broker拉取回来的消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { String topic = msgs.get(0).getTopic(); String tags = msgs.get(0).getTags(); String msgId = msgs.get(0).getMsgId(); String data = new String(msgs.get(0).getBody(), StandardCharsets.UTF_8); System.out.printf("收到订阅消息: %s, %s, %s, %s, %s\r\n", Thread.currentThread().getName(), topic, tags, msgId, data); // 手动标记消息已被消费/标记该消息已经被成功消费 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动消费者实例 consumer.start(); System.out.printf("Consumer Started.%n"); } }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    事务消息生产者TCC简单实现

    package club.xxx.mq._02myrawmq;
    
    import club.xxx.mq.constants.MQAddrConst;
    import club.xxx.mq.constants.OrderConst;
    import org.apache.rocketmq.client.producer.LocalTransactionState;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.client.producer.TransactionListener;
    import org.apache.rocketmq.client.producer.TransactionMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    /**
     * 

    rocketmq-4.3.0

    * 1、Producer端发送同步消息 这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。 */
    public class _03MyTransactionProducer { public static void main(String[] args) throws Exception { // 实例化消息生产者Producer TransactionMQProducer producer = new TransactionMQProducer(MQAddrConst.GROUP_NAME1); // 设置NameServer的地址 producer.setNamesrvAddr(MQAddrConst.IP1); // 启动Producer实例 producer.start(); producer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { // 这里做自己的事务操作 return LocalTransactionState.COMMIT_MESSAGE; } @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { // 事务回滚操作 return LocalTransactionState.COMMIT_MESSAGE; } }); // 创建事务消息,并指定Topic,Tag和消息体 Message msg = new Message(MQAddrConst.TOPIC1/* Topic */, MQAddrConst.TAG1 /* Tag */, (OrderConst.ORDER1).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); // msg.setKeys("KEY???"); // 发送消息到一个Broker SendResult sendResult = producer.sendMessageInTransaction(msg, null); // 通过sendResult返回消息是否成功送达 System.out.printf("sync send ok: %s%n", sendResult); // 如果不再发送消息,关闭Producer实例。 producer.shutdown(); } }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
  • 相关阅读:
    【LeetCode】滑动窗口题解汇总
    JVM学习四
    实验5-1——本地yum源的配置
    【高等数学基础进阶】多元函数的极值与最值
    vue递归组件
    栈的生长方向不总是向下
    【Python基础篇009】那就浅浅回顾一下生成器吧
    SQL游戏行业实战案例5:玩家在线分布(自定义排序,条件求和)
    个人微信api
    Vue3 基础 – 快速上手 & 常用指令
  • 原文地址:https://blog.csdn.net/qq_27577113/article/details/132791664