• SpringBoot3集成RocketMq


    标签:RocketMq5.Dashboard;

    一、简介

    RocketMQ因其架构简单、业务功能丰富、具备极强可扩展性等特点被广泛应用,比如金融业务、互联网、大数据、物联网等领域的业务场景;

    二、环境部署

    1、编译打包

    1、下载5.0版本源码包
    rocketmq-all-5.0.0-source-release.zip
    
    2、解压后进入目录,编译打包
    mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U
    

    2、修改配置

    在distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/runserver.sh
    

    distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/runbroker.sh
    

    3、服务启动

    1、该目录下
    distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/
    
    2、启动NameServer
    sh mqnamesrv
    
    输出日志
    The Name Server boot success. serializeType=JSON
    
    3、启动Broker+Proxy
    sh mqbroker -n localhost:9876 --enable-proxy
    
    输出日志
    rocketmq-proxy startup successfully
    
    4、关闭服务
    sh mqshutdown namesrv
    Send shutdown request to mqnamesrv(18636) OK
    
    sh mqshutdown broker
    Send shutdown request to mqbroker with proxy enable OK(18647)
    

    4、控制台安装

    1、下载master源码包
    rocketmq-dashboard-master
    
    2、解压后进入目录,编译打包
    mvn clean package -Dmaven.test.skip=true
    
    3、启动服务
    java -jar target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar
    
    4、输出日志
    INFO main - Tomcat started on port(s): 8080 (http) with context path ''
    
    5、访问服务:localhost:8080
    

    三、工程搭建

    1、工程结构

    2、依赖管理

    rocketmq-starter组件中,实际上依赖的是rocketmq-client组件的5.0版本,由于两个新版框架间的兼容问题,需要添加相关配置解决该问题;

    <dependency>
        <groupId>org.apache.rocketmqgroupId>
        <artifactId>rocketmq-spring-boot-starterartifactId>
        <version>${rocketmq-starter.version}version>
    dependency>
    

    3、配置文件

    配置RocketMq服务地址,消息生产者和消费者;

    rocketmq:
      name-server: 127.0.0.1:9876
      # 生产者
      producer:
        group: boot_group_1
        # 消息发送超时时间
        send-message-timeout: 3000
        # 消息最大长度4M
        max-message-size: 4096
        # 消息发送失败重试次数
        retry-times-when-send-failed: 3
        # 异步消息发送失败重试次数
        retry-times-when-send-async-failed: 2
      # 消费者
      consumer:
        group: boot_group_1
        # 每次提取的最大消息数
        pull-batch-size: 5
    

    4、配置类

    在配置类中主要定义两个Bean的加载,即RocketMQTemplateDefaultMQProducer,主要是提供消息发送的能力,即生产消息;

    @Configuration
    public class RocketMqConfig {
    
        @Value("${rocketmq.name-server}")
        private String nameServer;
    
        @Value("${rocketmq.producer.group}")
        private String producerGroup;
    
        @Value("${rocketmq.producer.send-message-timeout}")
        private Integer sendMsgTimeout;
    
        @Value("${rocketmq.producer.max-message-size}")
        private Integer maxMessageSize;
    
        @Value("${rocketmq.producer.retry-times-when-send-failed}")
        private Integer retryTimesWhenSendFailed ;
    
        @Value("${rocketmq.producer.retry-times-when-send-async-failed}")
        private Integer retryTimesWhenSendAsyncFailed ;
    
        @Bean
        public RocketMQTemplate rocketMqTemplate(){
            RocketMQTemplate rocketMqTemplate = new RocketMQTemplate();
            rocketMqTemplate.setProducer(defaultMqProducer());
            return rocketMqTemplate;
        }
    
        @Bean
        public DefaultMQProducer defaultMqProducer() {
            DefaultMQProducer producer = new DefaultMQProducer();
            producer.setNamesrvAddr(this.nameServer);
            producer.setProducerGroup(this.producerGroup);
            producer.setSendMsgTimeout(this.sendMsgTimeout);
            producer.setMaxMessageSize(this.maxMessageSize);
            producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
            producer.setRetryTimesWhenSendAsyncFailed(this.retryTimesWhenSendAsyncFailed);
            return producer;
        }
    }
    

    四、基础用法

    1、消息生产

    编写一个生产者接口类,分别使用RocketMQTemplateDefaultMQProducer实现消息发送的功能,然后可以通过Dashboard控制面板查看消息详情;

    @RestController
    public class ProducerWeb {
        private static final Logger log = LoggerFactory.getLogger(ProducerWeb.class);
    
        @Autowired
        private RocketMQTemplate rocketMqTemplate;
    
        @GetMapping("/send/msg1")
        public String sendMsg1 (){
            try {
                // 构建消息主体
                JsonMapper jsonMapper = new JsonMapper();
                String msgBody = jsonMapper.writeValueAsString(new MqMsg(1,"boot_mq_msg"));
                // 发送消息
                rocketMqTemplate.convertAndSend("boot-mq-topic",msgBody);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return "OK" ;
        }
    
        @Autowired
        private DefaultMQProducer defaultMqProducer ;
    
        @GetMapping("/send/msg2")
        public String sendMsg2 (){
            try {
                // 构建消息主体
                JsonMapper jsonMapper = new JsonMapper();
                String msgBody = jsonMapper.writeValueAsString(new MqMsg(2,"boot_mq_msg"));
                // 构建消息对象
                Message message = new Message();
                message.setTopic("boot-mq-topic");
                message.setTags("boot-mq-tag");
                message.setKeys("boot-mq-key");
                message.setBody(msgBody.getBytes());
                // 发送消息,打印日志
                SendResult sendResult = defaultMqProducer.send(message);
                log.info("msgId:{},sendStatus:{}",sendResult.getMsgId(),sendResult.getSendStatus());
            } catch (Exception e) {
                e.printStackTrace();
            }
            return "OK" ;
        }
    }
    

    2、消息消费

    编写消息监听类,实现RocketMQListener接口,通过RocketMQMessageListener注解控制监听的具体信息;

    @Service
    @RocketMQMessageListener(consumerGroup = "boot_group_1",topic = "boot-mq-topic")
    public class ConsumerListener implements RocketMQListener {
    
        private static final Logger log = LoggerFactory.getLogger(ConsumerListener.class);
    
        @Override
        public void onMessage(String message) {
            log.info("\n=====\n message:{} \n=====\n",message);
        }
    }
    

    五、参考源码

    文档仓库:
    https://gitee.com/cicadasmile/butte-java-note
    
    源码仓库:
    https://gitee.com/cicadasmile/butte-spring-parent
    
  • 相关阅读:
    《计算机视觉中的多视图几何》笔记(4)
    AIDE:自动驾驶目标检测的自动数据引擎
    C语言字符指针
    第四节:Vben Admin登录对接后端getUserInfo接口
    计算机毕设(附源码)JAVA-SSM家庭安防系统
    使用node-cmd重启electron
    商业模式,淘宝,拼多多,京东,短视频商业模式
    悬浮工具球(仿 iphone 辅助触控)
    深度学习环境安装教程-anaconda-python-pytorch
    【数仓】数据质量监控
  • 原文地址:https://www.cnblogs.com/cicada-smile/p/17636729.html