• Enabling publisher confirms and manual consumer acknowledgements in RabbitMQ


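    This post enables two things in RabbitMQ: publisher confirms, where the broker acknowledges (ACK) each message the producer sends to it, and manual acknowledgement on the consumer side, where the consumer explicitly acks or discards each message it consumes.
    Setting publisher-confirm-type: correlated and publisher-returns: true in the configuration turns on confirms and returned-message notifications for the producer.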

    server:
      port: 8014
    
    spring:
      rabbitmq:
        username: admin
        password: 123456
        dynamic: true
    #    port: 5672
    #    host: 192.168.49.9
        addresses: 192.168.49.10:5672,192.168.49.9:5672,192.168.49.11:5672
        publisher-confirm-type: correlated
        publisher-returns: true
      application:
        name: shushan
      datasource:
          driver-class-name: com.mysql.cj.jdbc.Driver
          url: jdbc:mysql://ip/shushan
          username: root
          password: 
          hikari:
            minimum-idle: 10
            maximum-pool-size: 20
            idle-timeout: 50000
            max-lifetime: 540000
            connection-test-query: select 1
            connection-timeout: 600000
    

    RabbitConfig, which registers the confirm and returns callbacks on the RabbitTemplate:

    package com.kexuexiong.shushan.common.config;
    
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.ReturnedMessage;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Slf4j
    @Configuration
    public class RabbitConfig {
    
        @Bean
        public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
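            RabbitTemplate rabbitTemplate = new RabbitTemplate();
    
            rabbitTemplate.setConnectionFactory(connectionFactory);
    
            // mandatory=true makes the broker return unroutable messages instead of dropping them
            rabbitTemplate.setMandatory(true);
    
            // invoked when the broker confirms (ack=true) or refuses (ack=false) a published message
            rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
                log.info("confirmCallback  data: " + correlationData);
                log.info("confirmCallback ack :" + ack);
                log.info("confirmCallback cause :" + cause);
            });
    
            // invoked when the exchange accepted the message but could not route it to any queue
            rabbitTemplate.setReturnsCallback(returned -> log.info("returnsCallback msg : " + returned));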
    
            return rabbitTemplate;
        }
    }
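
The confirm callback above only logs its arguments. To tie a confirm back to a particular message, Spring AMQP lets the sender pass a CorrelationData with an id as an extra argument to convertAndSend; without it the callback receives null. The following is a minimal sketch of the bookkeeping that usually goes with that, written against the plain JDK. The class PendingConfirms and its method names are hypothetical and are not part of Spring AMQP.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper (not a Spring AMQP class): tracks messages awaiting a broker confirm. */
class PendingConfirms {

    // correlation id -> payload that was sent but has not been confirmed yet
    private final Map<String, String> pending = new ConcurrentHashMap<>();

    /** Call just before sending; the same id would be placed in the CorrelationData. */
    void register(String correlationId, String payload) {
        pending.put(correlationId, payload);
    }

    /**
     * Call from the confirm callback. Returns the payload to re-send when the
     * broker refused it (ack=false), or empty when nothing needs to be done.
     */
    Optional<String> onConfirm(String correlationId, boolean ack) {
        if (correlationId == null) {
            // the message was sent without correlation data, so there is nothing to look up
            return Optional.empty();
        }
        String payload = pending.remove(correlationId);
        return ack ? Optional.empty() : Optional.ofNullable(payload);
    }

    /** Number of messages still waiting for a confirm. */
    int size() {
        return pending.size();
    }
}
```

Inside the confirm callback one would call something like onConfirm(correlationData == null ? null : correlationData.getId(), ack) and decide whether to re-publish the returned payload. Retry limits and persistence of the pending map are left out of this sketch.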
    
    

    AckReceiver, the consumer that acknowledges messages manually:

    package com.kexuexiong.shushan.common.mq;
    
    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
    import org.springframework.stereotype.Component;
    
    import java.io.ByteArrayInputStream;
    import java.io.ObjectInputStream;
    import java.util.Map;
    import java.util.Objects;
    
    @Slf4j
    @Component
    public class AckReceiver implements ChannelAwareMessageListener {
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            byte[] messageBody = message.getBody();
            try (ObjectInputStream inputStream = new ObjectInputStream(new ByteArrayInputStream(messageBody));) {
    
                Map<String, String> msg = (Map<String, String>) inputStream.readObject();
                log.info(message.getMessageProperties().getConsumerQueue()+"-ack Receiver :" + msg);
                log.info("header msg :"+message.getMessageProperties().getHeaders());
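                if(Objects.equals(message.getMessageProperties().getConsumerQueue(),MqConstant.BUSINESS_QUEUE)){
                    // nack this single message without requeueing it; if the queue has a
                    // dead-letter exchange configured, the broker dead-letters the message
                    channel.basicNack(deliveryTag,false,false);
    
                }else if(Objects.equals(message.getMessageProperties().getConsumerQueue(),MqConstant.DEAD_LETTER_QUEUE)){
                    // multiple=true also acknowledges every earlier unacked delivery on this channel
                    channel.basicAck(deliveryTag, true);
                }else {
                    channel.basicAck(deliveryTag, true);
                }
    
            } catch (Exception e) {
                // reject without requeueing so a message that cannot be processed is not redelivered forever
                channel.basicReject(deliveryTag, false);
                log.error("failed to process message", e);
            }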
        }
    }
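
AckReceiver passes multiple=false to basicNack and multiple=true to basicAck. In AMQP, a delivery tag is scoped to a channel, and multiple=true settles every outstanding delivery on that channel up to and including the given tag. The sketch below is a toy model of that rule using only the JDK, intended to make the difference visible; UnackedDeliveries is a made-up class and does not talk to a broker or use the RabbitMQ client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Toy model of one channel's unacknowledged deliveries; this is not the RabbitMQ client. */
class UnackedDeliveries {

    // delivery tags that were handed to the consumer and are not settled yet
    private final TreeSet<Long> unacked = new TreeSet<>();

    void deliver(long deliveryTag) {
        unacked.add(deliveryTag);
    }

    /** Mirrors the meaning of basicAck(deliveryTag, multiple); returns the tags that were settled. */
    List<Long> ack(long deliveryTag, boolean multiple) {
        List<Long> settled = new ArrayList<>();
        if (multiple) {
            // headSet(tag, true) is a live view of all tags <= deliveryTag
            settled.addAll(unacked.headSet(deliveryTag, true));
            unacked.headSet(deliveryTag, true).clear();
        } else if (unacked.remove(deliveryTag)) {
            settled.add(deliveryTag);
        }
        return settled;
    }

    /** Tags still waiting to be settled, in ascending order. */
    List<Long> outstanding() {
        return new ArrayList<>(unacked);
    }
}
```

With tags 1, 2 and 3 outstanding, ack(2, false) settles only tag 2, while a following ack(3, true) settles both 1 and 3. When a listener handles one message at a time per channel and settles each of them, the two flags behave the same in practice; the difference matters once several deliveries are left unsettled.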
    
    

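    simpleMessageListenerContainer.setQueueNames(...) takes a variable number of queue names, so a single container can listen on several queues. The configuration below registers only MqConstant.DEAD_LETTER_QUEUE.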

    package com.kexuexiong.shushan.common.mq;
    
    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class MessageListenerConfig {
    
        @Autowired
        private CachingConnectionFactory connectionFactory;
    
        @Autowired
        private AckReceiver ackReceiver;
    
        @Bean
        public SimpleMessageListenerContainer simpleMessageListenerContainer() {
            SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
    
            simpleMessageListenerContainer.setConcurrentConsumers(2);
            simpleMessageListenerContainer.setMaxConcurrentConsumers(2);
            simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            //,MqConstant.demoDirectQueue, MqConstant.FANOUT_A, MqConstant.BIG_CAR_TOPIC
    
            simpleMessageListenerContainer.setQueueNames(MqConstant.DEAD_LETTER_QUEUE);
    
            simpleMessageListenerContainer.setMessageListener(ackReceiver);
    
            return simpleMessageListenerContainer;
        }
    
    }
    
    
    MqController, which exposes the two test endpoints:

    package com.kexuexiong.shushan.controller.mq;
    
    import cn.hutool.core.date.DateUtil;
    import cn.hutool.core.util.RandomUtil;
    import com.kexuexiong.shushan.common.mq.MqConstant;
    import com.kexuexiong.shushan.controller.BaseController;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.MessagePostProcessor;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.UUID;
    
    @Slf4j
    @RestController
    @RequestMapping("/mq/")
    public class MqController extends BaseController {
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        @GetMapping("/callback/sendDirectMessage")
        public String sendDirectMessageCallback(){
    
            String msgId = UUID.randomUUID().toString();
            String msg = "demo msg ,kexuexiong";
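            // yyyy is the calendar year; YYYY would be the week-based year
            String createTime = DateUtil.format(new Date(),"yyyy-MM-dd HH:mm:ss");
    
            Map<String,Object> map = new HashMap<>();
            map.put("msgId",msgId);
            map.put("msg",msg);
            map.put("createTime",createTime);
    
            // "noneDirectExchange" is deliberately not declared, so the publish fails with ack=false
            rabbitTemplate.convertAndSend("noneDirectExchange","demoDirectRouting",map);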
    
            return "ok";
        }
    
        @GetMapping("/callback/lonelyDirectExchange")
        public String lonelyDirectExchange(){
    
            String msgId = UUID.randomUUID().toString();
            String msg = "demo msg ,kexuexiong";
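            // yyyy is the calendar year; YYYY would be the week-based year
            String createTime = DateUtil.format(new Date(),"yyyy-MM-dd HH:mm:ss");
    
            Map<String,Object> map = new HashMap<>();
            map.put("msgId",msgId);
            map.put("msg",msg);
            map.put("createTime",createTime);
    
            // this exchange exists but no queue is bound for the routing key, so the message is returned
            rabbitTemplate.convertAndSend(MqConstant.lonelyDirectExchange,"demoDirectRouting",map);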
    
            return "ok";
        }
    }
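
One detail in the controller is easy to get wrong: the year letter in the date pattern used for createTime. In SimpleDateFormat-style patterns, lowercase yyyy is the calendar year and uppercase YYYY is the week-based year, and the two disagree on a few days around New Year. The sketch below shows the difference with java.time rather than hutool, on the assumption that hutool's DateUtil.format follows the same pattern letters. The class name YearPatternDemo and the sample date are chosen for illustration only.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

class YearPatternDemo {

    static String format(LocalDateTime time, String pattern) {
        // the locale is fixed because week-based-year rules depend on it
        return DateTimeFormatter.ofPattern(pattern, Locale.US).format(time);
    }

    public static void main(String[] args) {
        // Monday 30 December 2024 already belongs to the first week of 2025
        LocalDateTime t = LocalDateTime.of(2024, 12, 30, 10, 15, 0);
        System.out.println(format(t, "yyyy-MM-dd HH:mm:ss")); // prints 2024-12-30 10:15:00
        System.out.println(format(t, "YYYY-MM-dd HH:mm:ss")); // prints 2025-12-30 10:15:00
    }
}
```

For a message timestamp the calendar year is what is wanted, so yyyy is the safe choice.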
    
    

    Testing:

    Sending a direct message when the exchange cannot be found

    2023-10-10T17:04:58.492+08:00 ERROR 27232 --- [.168.49.10:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'noneDirectExchange' in vhost '/', class-id=60, method-id=40)
    2023-10-10T17:04:58.492+08:00  INFO 27232 --- [nectionFactory6] c.k.shushan.common.config.RabbitConfig   : confirmCallback  data: null
    2023-10-10T17:04:58.492+08:00  INFO 27232 --- [nectionFactory6] c.k.shushan.common.config.RabbitConfig   : confirmCallback ack :false
    2023-10-10T17:04:58.492+08:00  INFO 27232 --- [nectionFactory6] c.k.shushan.common.config.RabbitConfig   : confirmCallback cause :channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'noneDirectExchange' in vhost '/', class-id=60, method-id=40)
    
    

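    ack is false: the broker closes the channel with a 404 NOT_FOUND because the exchange does not exist, and the confirm callback receives that as the cause. The correlation data is null because the message was sent without a CorrelationData.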

    Sending a direct message when no queue can be found

    2023-10-10T17:05:55.851+08:00  INFO 27232 --- [nectionFactory5] c.k.shushan.common.config.RabbitConfig   : confirmCallback  data: null
    2023-10-10T17:05:55.852+08:00  INFO 27232 --- [nectionFactory5] c.k.shushan.common.config.RabbitConfig   : confirmCallback ack :true
    2023-10-10T17:05:55.852+08:00  INFO 27232 --- [nectionFactory5] c.k.shushan.common.config.RabbitConfig   : confirmCallback cause :null
    2023-10-10T17:05:55.865+08:00  INFO 27232 --- [nectionFactory6] c.k.shushan.common.config.RabbitConfig   : returnsCallback msg : ReturnedMessage [message=(Body:'[serialized object]' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=lonelyDirectExchange, routingKey=demoDirectRouting]
    
    

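    ack is true because the exchange accepted the message, but the returns callback also fires with replyCode=312 and replyText=NO_ROUTE, since no queue is bound to the exchange for that routing key.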
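
The two test runs can be condensed into a small decision rule: ack=false means the broker did not accept the publish, ack=true together with a returned message means it was accepted but could not be routed, and ack=true without a return means it reached at least one queue. The following sketch encodes that rule in plain Java. PublishOutcome and its enum are illustrative names, and how the two signals are collected for the same message (for example through a correlation id) is assumed and left out.

```java
class PublishOutcome {

    enum Result { NOT_CONFIRMED, UNROUTABLE, ROUTED }

    /**
     * @param ack      the ack flag passed to the confirm callback
     * @param returned whether the returns callback fired for the same message
     */
    static Result classify(boolean ack, boolean returned) {
        if (!ack) {
            // e.g. the 404 NOT_FOUND seen when the exchange does not exist
            return Result.NOT_CONFIRMED;
        }
        // ack=true only says the broker took the message; a return means no queue matched
        return returned ? Result.UNROUTABLE : Result.ROUTED;
    }

    public static void main(String[] args) {
        System.out.println(classify(false, false)); // first test case: missing exchange
        System.out.println(classify(true, true));   // second test case: NO_ROUTE
    }
}
```

The point to take away is that ack=true alone does not prove delivery to a queue; with mandatory set to true, the returns callback has to be checked as well.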

  • Original article: https://blog.csdn.net/qq_22744093/article/details/133751441