• RocketMq快速入门(详解)


    目录

    Docker-compose安装RocketMQ

    一、docker目录下建立以下目录文件

    二、docker-compose.yml配置

    三、修改broker.conf

    四.开启端口

     五. 安装启动

     六. 查看是否启动

    七. 浏览器访问检验

    简单消息示例

    1.添加依赖 

    2.异步发送消息

    3.消费消息

    4.启动测试

     

    RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。

    RocketMQ主要有四大核心组成部分:NameServerBrokerProducer以及Consumer四部分。

    RocketMQ 优势 

    支持事务型消息(消息发送和 DB 操作保持两方的最终一致性,RabbitMQ 和 Kafka 不支持)
    支持结合 RocketMQ 的多个系统之间数据最终一致性(多方事务,二方事务是前提)
    支持 18 个级别的延迟消息(Kafka 不支持)
    支持指定次数和时间间隔的失败消息重发(Kafka 不支持,RabbitMQ 需要手动确认)
    支持 Consumer 端 Tag 过滤,减少不必要的网络传输(即过滤由MQ完成,而不是由消费者完成。RabbitMQ 和 Kafka 不支持)
    支持重复消费(RabbitMQ 不支持,Kafka 支持)
     

    Docker-compose安装RocketMQ

    一、docker目录下建立以下目录文件

    1. mkdir rocketmq
    2. mkdir conf
    3. mkdir logs
    4. mkdir store

     

    二、docker-compose.yml配置

    1. version: "3"
    2. services:
    3. mqnamesrv:
    4. image: foxiswho/rocketmq:4.7.0 #安装什么版本就写什么版本
    5. container_name: mqnamesrv
    6. ports:
    7. - 9876:9876
    8. environment:
    9. JAVA_OPT: -server -Xms256m -Xmx256m
    10. command: sh mqnamesrv
    11. mqbroker:
    12. image: foxiswho/rocketmq:4.7.0
    13. container_name: mqbroker
    14. ports:
    15. - 10911:10911
    16. - 10909:10909
    17. volumes:
    18. - ./conf/broker.conf:/usr/local/dockerCompose/rocketmq/conf/broker.conf
    19. environment:
    20. JAVA_OPT_EXT: -server -Xms256m -Xmx256m -Xmn128m
    21. NAMESRV_ADDR: mqnamesrv:9876
    22. command: sh mqbroker -n mqnamesrv:9876 -c /usr/local/dockerCompose/rocketmq/conf/broker.conf
    23. mqconsole:
    24. image: styletang/rocketmq-console-ng
    25. container_name: mqconsole
    26. ports:
    27. - 19876:8080
    28. environment:
    29. JAVA_OPTS: -Drocketmq.namesrv.addr=mqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=falses

    三、修改broker.conf

    vi broker.conf
    1. brokerClusterName = DefaultCluster
    2. brokerName = broker-a
    3. brokerId = 0
    4. deleteWhen = 04
    5. fileReservedTime = 48
    6. brokerRole = ASYNC_MASTER
    7. flushDiskType = ASYNC_FLUSH
    8. # 主机IP
    9. brokerIP1 = 自己的IP

     

    四.开启端口

     五. 安装启动

    docker-compose up -d

     六. 查看是否启动

    docker ps

     

    七. 浏览器访问检验

     

    简单消息示例

    1.添加依赖 

    1. <dependency>
    2. <groupId>org.apache.rocketmq</groupId>
    3. <artifactId>rocketmq-client</artifactId>
    4. <version>4.7.0</version>
    5. </dependency>

    2.异步发送消息

    1. public class AsyncProducer {
    2. public static void main(String[] args) throws Exception {
    3. // 创建指定分组名的生产者
    4. DefaultMQProducer producer = new DefaultMQProducer("qiu");
    5. //自己的服务器地址
    6. producer.setNamesrvAddr("106.52.242.189:9876");
    7. // 启动生产者
    8. producer.start();
    9. for (int i = 0; i < 128; i++)
    10. try {
    11. // 构建消息
    12. Message msg = new Message("TopicTest",
    13. "TagA",
    14. "OrderID188",
    15. "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
    16. // 同步发送
    17. SendResult sendResult = producer.send(msg);
    18. // 打印发送结果
    19. System.out.printf("%s%n", sendResult);
    20. } catch (Exception e) {
    21. e.printStackTrace();
    22. }
    23. producer.shutdown();
    24. }
    25. }

     

    3.消费消息

    1. public class Consumer {
    2. public static void main(String[] args) throws InterruptedException, MQClientException {
    3. // Instantiate with specified consumer group name.
    4. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("qiu");
    5. // Specify name server addresses.
    6. consumer.setNamesrvAddr("106.52.242.189:9876");
    7. // Subscribe one more more topics to consume.
    8. consumer.subscribe("TopicTest", "*");
    9. // Register callback to execute on arrival of messages fetched from brokers.
    10. consumer.registerMessageListener(new MessageListenerConcurrently() {
    11. @Override
    12. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
    13. ConsumeConcurrentlyContext context) {
    14. System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
    15. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    16. }
    17. });
    18. //Launch the consumer instance.
    19. consumer.start();
    20. System.out.printf("Consumer Started.%n");
    21. }
    22. }

     

    4.启动测试

    发送消息成功

     接收消息成功

     

    后台监控成功

     

  • 相关阅读:
    Reeds-Shepp曲线学习笔记及相关思考
    零基础学python之流程控制
    4.4.1-Testing_for_Credentials_Transported_over_an_Encrypted_Channel
    Spring注解驱动之@Bean注解指定初始化和销毁的方法
    JUC-不得不说的Java“锁”事
    第86步 时间序列建模实战:Transformer回归建模
    从Spring源码探究IOC初始化流程
    【LeetCode】【剑指offer】【从上到下打印二叉树(一)】
    【AI视野·今日Robot 机器人论文速览 第五十七期】Wed, 18 Oct 2023
    CBAM:Convolutional Block Attention Module--通道+空间混合注意力
  • 原文地址:https://blog.csdn.net/lu__lala/article/details/125457565