• SSM整合RabbitMQ,Spring4.x整合RabbitMQ


    前言

    SSM框架整合RabbitMQ【比较简单,复制粘贴可用】
    本人使用的Spring版本是4.x

    版本

    RabbitMQ相关
    erl10.0.1
    RabbitMQ3.7.9
    安装步骤参考:https://www.cnblogs.com/saryli/p/9729591.html

    相关依赖
    spring4.0.2.RELEASE
    spring-rabbit1.3.5.RELEASE

    实现

    目录参考

    这是我整合时的项目结构
    关键:rabbitmq文件包和rabbitmq.properties、spring-rabbitmq.xml、spring-mvc.xml
    在这里插入图片描述

    pom.xml依赖

    在现成的SSM项目中整合

    	
    	<dependency>
    	    <groupId>org.springframework.amqpgroupId>
    	    <artifactId>spring-rabbitartifactId>
    	    <version>1.3.5.RELEASEversion>
    	dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    rabbitmq.properties配置文件

    将 rabbitmq.properties配置文件添加到resources目录下

    mq.host=127.0.0.1
    mq.username=guest
    mq.password=guest
    mq.port=5672
    mq.virtual-host=/
    
    • 1
    • 2
    • 3
    • 4
    • 5

    spring-rabbitmq.xml

    将spring-rabbitmq.xml添加到resources目录下

    
    <beans 	xmlns="http://www.springframework.org/schema/beans"
           	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
           	xmlns:context="http://www.springframework.org/schema/context" 
           	xmlns:rabbit="http://www.springframework.org/schema/rabbit"
           	xsi:schemaLocation="http://www.springframework.org/schema/beans
    						   	http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
    							http://www.springframework.org/schema/context 
    							http://www.springframework.org/schema/context/spring-context-4.0.xsd
    						    http://www.springframework.org/schema/rabbit
    						    http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd">
    
    	
        <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">    
            <property name="location" value="classpath:rabbitmq.properties" />
        bean> 
        
        
        <rabbit:connection-factory id="connectionFactory"
        		username="${mq.username}"
    			password="${mq.password}" 
    			host="${mq.host}" 
    			port="${mq.port}"
          		virtual-host="${mq.virtual-host}" />
    
        
        <rabbit:admin id="connectAdmin" connection-factory="connectionFactory" />
    
        
        <rabbit:queue name="queueTest" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin" />
        <rabbit:queue name="queueTest1" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin" />
    
        
        <rabbit:direct-exchange name="exchangeTest" durable="true" auto-delete="false" declared-by="connectAdmin">
            <rabbit:bindings>
                <rabbit:binding queue="queueTest" key="queueTestKey">rabbit:binding>
                <rabbit:binding queue="queueTest1" key="queueTestKey1">rabbit:binding>
            rabbit:bindings>
        rabbit:direct-exchange>
    
        
        <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="exchangeTest" />
    
        
        <bean id="messageReceiver" class="com.rabbitmq.MessageConsumer">bean>
        <bean id="messageReceiver1" class="com.rabbitmq.MessageConsumer2">bean>
    
        
        <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="100">
            <rabbit:listener queues="queueTest" ref="messageReceiver" />
            <rabbit:listener queues="queueTest1" ref="messageReceiver1" />
        rabbit:listener-container>
    
    	
    	<context:component-scan base-package="com.rabbitmq" />
    
    beans>
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58

    spring-mvc.xml或applicationContext.xml

    我这里使用的spring-mvc.xml,根据自己配置文件使用

    <import resource="classpath:spring-rabbitmq.xml" />
    
    • 1

    将这个import引入添加到 spring-mvc.xml 里的最前面,如果不添加到前面可能会报错
    在这里插入图片描述

    rabbitmq目录下

    这个目录下的java文件已在spring-rabbitmq.xml中进行扫描注入

    MessageConsumer.java

    说明:MessageConsumer和MessageConsumer2其实都可以使用同一个类,修改xml指向即可,但是分开明了些

    package com.rabbitmq;
    
    import java.nio.charset.Charset;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
    
    /**
     * @Title消息消费者
     * @date 2023/10/8
     */
    public class MessageConsumer implements MessageListener {
    
        @Override
        public void onMessage(Message message) {
        	// 逻辑处理
            System.out.println("message------->:" + new String(message.getBody(), Charset.forName("utf-8")));
        }
        
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    MessageConsumer2.java

    package com.rabbitmq;
    
    import java.nio.charset.Charset;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
    
    /**
     * @Title消息消费者2
     * @date 2023/10/8
     */
    public class MessageConsumer2 implements MessageListener {
    
        @Override
        public void onMessage(Message message) {
        	// 逻辑处理
            System.out.println("message2------->:" + new String(message.getBody(), Charset.forName("utf-8")));
        }
        
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    MessageProducer.java

    package com.rabbitmq;
    
    import javax.annotation.Resource;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.stereotype.Service;
    
    /**
     * @Title 消息生产者
     * @date 2023/10/8
     */
    @Service
    public class MessageProducer {
    
        @Resource
        private AmqpTemplate amqpTemplate;
    
        
        public void sendMessage(String key, Object message){
            amqpTemplate.convertAndSend(key, message);
        }
        
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    MessageConstant.java

    package com.rabbitmq;
    
    /**
     * @Title 消息队列常量
     * @date 2023/10/8
     */
    public class MessageConstant{
    
    	public static String queueTestKey = "queueTestKey";
    	public static String queueTestKey1 = "queueTestKey1";
        
    	
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    测试调用

    比如这个下面在某个类里作为接口调用测试

    	@Autowired
        private MessageProducer messageProducer;
    
    	@RequestMapping(value = "/testMq")
        @ResponseBody
        public Result testMq(HttpServletRequest request) throws IOException {
            messageProducer.sendMessage(MessageConstant.queueTestKey, "登录");
            messageProducer.sendMessage(MessageConstant.queueTestKey1, "退出");
            return Result.success("测试成功");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    调用接口后打印结果
    在这里插入图片描述
    连接结果
    在这里插入图片描述
    以上即可!

    扩展

    包括消息手动确认,消息失败重新加入队列处理

    并发限制

    比如:同一个时间发了5000条记录,队列需要排队处理消息;rabbitmq1.x版本默认是队列长度1,rabbitmq2.x版本默认长度是250,我这里使用了1.x的,可以看到处理起来是比较慢的一条一条排队处理,所以要手动设置队列长度,如下

    在spring-rabbitmq.xml中加入prefetch属性值设置

        
        <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="100">
            <rabbit:listener queues="queueTest" ref="messageReceiver" />
            <rabbit:listener queues="queueTest1" ref="messageReceiver1" />
        rabbit:listener-container>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    消息重发

    SpringBoot版可在配置文件中设置,且异常后直接抛出即可

    方式一

    package com.rabbitmq;
    
    import java.nio.charset.Charset;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
    
    import com.rabbitmq.client.Channel;
    
    /**
     * @Title 消息消费者
     * @date 2023/10/8
     */
    public class MessageConsumer2 implements ChannelAwareMessageListener {
    	
    	private int aa = 1;
    
    	@Override
    	public void onMessage(Message message, Channel channel) throws Exception {
    		try {
    		
    			// 逻辑处理
    			if(aa == 1) {
    				aa = 2;
    				int a = 1/0;
    			}
    			
    			System.out.println("成功处理确认message2------->:" + new String(message.getBody(), Charset.forName("utf-8")));
    			// 消费者ack确认【消息处理成功确认】
    	        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    		}catch (Exception e) {
    			System.out.println("失败重新入队message2------->:" + new String(message.getBody(), Charset.forName("utf-8")));
    			// 消费者reject确认【消息失败重新加入队列-重发】
    			channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
    		}
    	}
        
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    方式二

    MessageConstant.java中加入

    	/** 重试次数 3 */
    	public static Integer RETRY_COUNT = 3;
    
    • 1
    • 2

    消息接收处理类

    package com.rabbitmq;
    
    import java.nio.charset.Charset;
    
    import org.apache.log4j.Logger;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
    import org.springframework.beans.factory.annotation.Autowired;
    
    import com.alibaba.fastjson.JSONObject;
    import com.bean.ConsumptionRequest;
    import com.rabbitmq.client.Channel;
    import com.service.ReceiveDormitoryService;
    
    /**
     * 宿舍mq消息处理
     * 
     * @author Administrator
     */
    public class MessageConsumerSuShe implements ChannelAwareMessageListener {
    	
    	private final Logger logger = Logger.getLogger(MessageConsumerSuShe.class);
    	
    	@Autowired
    	private ReceiveDormitoryService service;
    	
    
    	@Override
    	public void onMessage(Message message, Channel channel) throws Exception {
    		int retryCount = 0; // 重试机制
    		long deliveryTag = message.getMessageProperties().getDeliveryTag();
    		while(retryCount < MessageConstant.RETRY_COUNT) {
    			retryCount ++;
    			try {
    				// 逻辑处理
    				String s = new String(message.getBody(), Charset.forName("utf-8"));
    				ConsumptionRequest bean = JSONObject.parseObject(s, ConsumptionRequest.class);
    				//service.uploadData(bean, bean.getPath());
    				
    				//logger.info("【SUSHE_QUEUE_KEY宿舍队列成功】:" + new String(message.getBody(), Charset.forName("utf-8")));
    				// 消费者ack确认【消息处理成功确认】
    		        channel.basicAck(deliveryTag, false);
    		        return;
    			}catch (Exception e) {
    				logger.error("【SUSHE_QUEUE_KEY宿舍队列错误,重试"+retryCount+"】:" + new String(message.getBody(), Charset.forName("utf-8")));
    				// 0.5s重试一次
    				Thread.sleep(500);
    			}
    		}
    		// 重试3次后直接处理(这里设置为死信消息)
    		if(retryCount >= MessageConstant.RETRY_COUNT) {
    			channel.basicNack(deliveryTag, false, false);
    		}
    	}
        
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57

    多部署Tomcat下问题

    本人使用单个RabbitMQ服务
    测试两个Tomcat服务连接同一个交换机和队列进行发送消息,并没有造成两个Tomcat服务都推送处理这条消息,而是单个Tomcat处理了这条消息
    所以未造成多部署下一条消息多服务处理问题

    其他
    参考类似博客1:https://blog.csdn.net/u012988901/article/details/89499634
    参考类似博客2:https://blog.csdn.net/weixin_42654295/article/details/109006276

  • 相关阅读:
    基于MFC的MVC框架的编程实践
    【VyOS-开源篇-3】- container for vyos 搭建 Halo 博客-vyos-开源篇
    宇视雷达&雷视交付|问题定位(素材收集篇)
    这是什么代码帮我看看
    Docker知识总结 (二) Docker 底层原理
    多分类问题:初试手写数字识别
    2022年Java秋招面试必看的 | RabbitMQ 面试题
    数字孪生赋能实景三维中国建设分论坛成功举办
    【C语言】明解数组(1)
    PCIe系列专题之二:2.7 Flow Control的实现过程
  • 原文地址:https://blog.csdn.net/weixin_43992507/article/details/133694457