• [MQ] SpringBoot使用直连交换机Direct Exchange


    ✨个人主页:沫洺的主页

    📚📚系列专栏: 📖 JavaWeb专栏📖 JavaSE专栏 📖 Java基础专栏📖vue3专栏 

                               📖MyBatis专栏📖Spring专栏📖SpringMVC专栏📖SpringBoot专栏

                               📖Docker专栏📖Reids专栏📖MQ专栏📖SpringCloud专栏     

    💖💖如果文章对你有所帮助请留下三连✨✨

    1️⃣环境搭建

    创建SpringBoot项目,引入相关依赖

     application.properties配置rabbitmq配置信息

    1. spring.rabbitmq.host=192.168.0.109
    2. spring.rabbitmq.port=5670
    3. #如果使用的是/,可以不用配置,因为默认就是/
    4. spring.rabbitmq.virtual-host=/
    5. spring.rabbitmq.username=guest
    6. spring.rabbitmq.password=guest

    2️⃣消费者接收消息

    @RabbitHandler注解: 当spring扫到该注解,就当成消费者

    @RabbitListener注解: 绑定队列Queue与交换机Exchange
    durable持久化
    autoDelete自动删除
    type=ExchangeTypes.DIRECT交换机类型为直连交换机,不写默认直连
    key: 就是Routing key

    1. @Component
    2. public class DirectConsumer {
    3. @RabbitHandler
    4. @RabbitListener(bindings =
    5. @QueueBinding(value = @Queue(value = "211-DirectQueue-01", durable = "true", autoDelete = "false"),
    6. exchange = @Exchange(value = "211-DirectExchage-01", type = ExchangeTypes.DIRECT),
    7. key = "211-Direct-RoutingKey-01"))
    8. public void process2(Message message){
    9. String msg = new String(message.getBody());
    10. System.out.println("消费者1-1收到:"+msg+"");
    11. //这里写业务逻辑代码
    12. }
    13. }

    3️⃣生产者发送消息

    注入RabbitTemplate,调用convertAndSend方法发送消息

    参数一:交换机 参数二: Routing key 参数三: 消息

    1. @Component
    2. public class DirectProducer {
    3. @Autowired
    4. private RabbitTemplate rabbitTemplate;
    5. public void sendMessage(){
    6. rabbitTemplate.convertAndSend("211-DirectExchage-01","211-Direct-RoutingKey-01","生产者发送的第一条消息");
    7. //System.out.println("生产者第一条消息发送成功");
    8. }
    9. }

    启动类调用sendMessage发送

    1. @SpringBootApplication
    2. public class App {
    3. public static void main(String[] args) {
    4. ConfigurableApplicationContext context = SpringApplication.run(App.class, args);
    5. //调用生产者发送消息
    6. DirectProducer producer = context.getBean(DirectProducer.class);
    7. producer.sendMessage();
    8. }
    9. }

    💦多个消费者消费同一个队列

    当生产者发送多条消息时,同一个队列的多个消费者去接收消息

    生产者发送多条消息

    1. @Component
    2. public class DirectProducer {
    3. @Autowired
    4. private RabbitTemplate rabbitTemplate;
    5. public void sendMessage(){
    6. rabbitTemplate.convertAndSend("211-DirectExchage-01","211-Direct-RoutingKey-01","生产者发送的第一条消息");
    7. System.out.println("生产者第一条消息发送成功");
    8. rabbitTemplate.convertAndSend("211-DirectExchage-01","211-Direct-RoutingKey-01","生产者发送的第二条消息");
    9. System.out.println("生产者第二条消息发送成功");
    10. }
    11. }

    同一个队列的多个消费者去接收消息

    都是value = "211-DirectQueue-01"的队列

    1. @Component
    2. public class DirectConsumer {
    3. @RabbitHandler
    4. @RabbitListener(bindings =
    5. @QueueBinding(value = @Queue(value = "211-DirectQueue-01", durable = "true", autoDelete = "false"),
    6. exchange = @Exchange(value = "211-DirectExchage-01", type = ExchangeTypes.DIRECT),
    7. key = "211-Direct-RoutingKey-01"))
    8. public void process2(Message message){
    9. String msg = new String(message.getBody());
    10. System.out.println("消费者1-1收到:"+msg+"");
    11. //这里写业务逻辑代码
    12. }
    13. @RabbitHandler
    14. @RabbitListener(bindings =
    15. @QueueBinding(value = @Queue(value = "211-DirectQueue-01", durable = "true", autoDelete = "false"),
    16. exchange = @Exchange(value = "211-DirectExchage-01", type = ExchangeTypes.DIRECT),
    17. key = "211-Direct-RoutingKey-01"))
    18. public void process2(Message message){
    19. String msg = new String(message.getBody());
    20. System.out.println("消费者1-2收到:"+msg+"");
    21. //这里写业务逻辑代码
    22. }
    23. }

    可以理解为快递员送快递,只关注哪一家(队列)签收的快递,而不关注该家哪个成员(消费者)签收的快递,(只是在签收时的消费者是轮循的)

    像这种同一队列多个消费者的好处就是保障了高可用性,只要有一个消费者就能保障接收到消息

    💦多个消费者消费不同的队列

    当生产者发送多条消息时,不同队列的多个消费者去接收消息

    生产者发送多条消息

    1. @Component
    2. public class DirectProducer {
    3. @Autowired
    4. private RabbitTemplate rabbitTemplate;
    5. public void sendMessage(){
    6. rabbitTemplate.convertAndSend("211-DirectExchage-01","211-Direct-RoutingKey-01","生产者发送的第一条消息");
    7. System.out.println("生产者第一条消息发送成功");
    8. rabbitTemplate.convertAndSend("211-DirectExchage-01","211-Direct-RoutingKey-01","生产者发送的第二条消息");
    9. System.out.println("生产者第二条消息发送成功");
    10. }
    11. }

    不同队列的多个消费者去接收消息

    不同队列value = "211-DirectQueue-01"和value = "211-DirectQueue-02"

    1. @Component
    2. public class DirectConsumer {
    3. @RabbitHandler
    4. @RabbitListener(bindings =
    5. @QueueBinding(value = @Queue(value = "211-DirectQueue-01", durable = "true", autoDelete = "false"),
    6. exchange = @Exchange(value = "211-DirectExchage-01", type = ExchangeTypes.DIRECT),
    7. key = "211-Direct-RoutingKey-01"))
    8. public void process2(Message message){
    9. String msg = new String(message.getBody());
    10. System.out.println("消费者1收到:"+msg+"");
    11. //这里写业务逻辑代码
    12. }
    13. @RabbitHandler
    14. @RabbitListener(bindings =
    15. @QueueBinding(value = @Queue(value = "211-DirectQueue-02", durable = "true", autoDelete = "false"),
    16. exchange = @Exchange(value = "211-DirectExchage-01", type = ExchangeTypes.DIRECT),
    17. key = "211-Direct-RoutingKey-01"))
    18. public void process2(Message message){
    19. String msg = new String(message.getBody());
    20. System.out.println("消费者2收到:"+msg+"");
    21. //这里写业务逻辑代码
    22. }
    23. }

    同样的不是同一家(队列)的成员(消费者),在签收快递时,要签收自己家所有快递(消息)

    ⛅消费者重试机制/自动应答

    application.properties配置

    1. 配置消费者应答模式
    2. 配置rabbitmq重试配置信息
    1. #开启消费者应答模式为 auto自动应答 manual手动应答
    2. spring.rabbitmq.listener.direct.acknowledge-mode = auto
    3. #spring.rabbitmq.listener.simple.acknowledge-mode = auto
    4. #开启消费者自动重试机制,也就是消费者函数只要抛出异常,就会触发重试 false关闭
    5. spring.rabbitmq.listener.simple.retry.enabled=true
    6. #设置重试最大次数
    7. spring.rabbitmq.listener.simple.retry.max-attempts=5
    8. #设置重试时间最大间隔
    9. spring.rabbitmq.listener.simple.retry.max-interval= 8000ms
    10. #设置重试时间间隔
    11. spring.rabbitmq.listener.simple.retry.initial-interval=1000ms
    12. #设置重试时间间隔的倍数
    13. spring.rabbitmq.listener.simple.retry.multiplier=2

    当不配置重试机制时,消费者应答出现异常不处理时,就会出现死循环

    如下代码段

    1. @Component
    2. public class DirectConsumer {
    3. @RabbitHandler
    4. @RabbitListener(bindings =
    5. @QueueBinding(value = @Queue(value = "211-DirectQueue-01", durable = "true", autoDelete = "false"),
    6. exchange = @Exchange(value = "211-DirectExchage-01", type = ExchangeTypes.DIRECT),
    7. key = "211-Direct-RoutingKey-01"))
    8. public void process2(Message message){
    9. String msg = new String(message.getBody());
    10. System.out.println("消费者1-1收到:"+msg+"");
    11. //这里写业务逻辑代码
    12. int i = 1/0//会出现死循环
    13. }
    14. }

    配置后

    ⛅手动应答模式

    配置手动应答,取消重试机制

    1. #开启消费者应答模式为 auto自动应答 manual手动应答
    2. spring.rabbitmq.listener.direct.acknowledge-mode = manual
    3. #开启消费者自动重试机制,也就是消费者函数只要抛出异常,就会触发重试
    4. spring.rabbitmq.listener.simple.retry.enabled=false

    手动应答模式

    1. @Component
    2. public class DirectConsumer {
    3. @RabbitHandler
    4. @RabbitListener(bindings =
    5. @QueueBinding(value = @Queue(value = "211-DirectQueue-01", durable = "true", autoDelete = "false"),
    6. exchange = @Exchange(value = "211-DirectExchage-01", type = ExchangeTypes.DIRECT),
    7. key = "211-Direct-RoutingKey-01"))
    8. //手动应答
    9. public void process1(Message message, Channel channel) throws IOException {
    10. String msg = new String(message.getBody());
    11. System.out.println("消费者1-1收到:"+msg+ DateUtil.format(DateUtil.date(),"HH:ss"));
    12. //获取应答标签
    13. long deliveryTag = message.getMessageProperties().getDeliveryTag();
    14. try {
    15. int i = 1/0;//会出现死循环
    16. //手动应答 参数一:消息标识 参数二:true批量应答,false单个应答
    17. channel.basicAck(deliveryTag,false);
    18. } catch (Exception ex) {
    19. //把异常消息插入数据库
    20. System.out.println("消费者1-1收到:"+msg+"出现异常信息插入数据库");
    21. System.out.println("异常信息: "+ex.getMessage());
    22. channel.basicAck(deliveryTag,false);
    23. }
    24. }
    25. }

     出现异常,将异常消息插入数据库后,这样保障应答后在队列中不堵,然后再让生产者发送消息

    channel.basicAck正常应答

    channel.basicCancel取消应答

    channel.basicReject拒绝应答

    没有重试机制,需要自己去写,所以一般不会使用手动应答

    🧭投递业务对象

    一般生产的都是领域对象dto

    例如

    1. @Data
    2. @Builder
    3. @NoArgsConstructor
    4. @AllArgsConstructor
    5. public class UserRegisterOk {
    6. private String name;
    7. private String phone;
    8. }

    生产者生产对象

    1. @Component
    2. public class DirectProducer {
    3. @Autowired
    4. private RabbitTemplate rabbitTemplate;
    5. public void sendMessage(){
    6. UserRegisterOk userRegisterOk = UserRegisterOk.builder().name("张三").phone("123456").build();
    7. //要将对象序列化,转成字符串,使用消息转换器MessageConverter
    8. rabbitTemplate.convertAndSend("211-DirectExchage-01","211-Direct-RoutingKey-01",userRegisterOk);
    9. System.out.println("生产者生产对象发送成功");
    10. }
    11. }

    消费者接收对象

    1. @Component
    2. public class DirectConsumer {
    3. @RabbitHandler
    4. @RabbitListener(bindings =
    5. @QueueBinding(value = @Queue(value = "211-DirectQueue-01", durable = "true", autoDelete = "false"),
    6. exchange = @Exchange(value = "211-DirectExchage-01", type = ExchangeTypes.DIRECT),
    7. key = "211-Direct-RoutingKey-01"))
    8. public void process4(UserRegisterOk userRegisterOk){
    9. System.out.println("消费者收到:"+userRegisterOk.getName()+","+userRegisterOk.getPhone());
    10. }
    11. }

    运行后报错

     配置序列化

    1. @Configuration
    2. public class RabbitConfig {
    3. @Bean
    4. public MessageConverter messageConverter(){
    5. return new Jackson2JsonMessageConverter();
    6. }
    7. }

    💖补充写法

    1. @Component
    2. public class DirectConsumer {
    3. private static final String ENAME = "211-DirectExchage-01";
    4. private static final String QNAME = "211-DirectQueue-01";
    5. private static final String KEY = "211-Direct-RoutingKey-01";
    6. //定义一个交换机
    7. @Bean
    8. public DirectExchange directExchange(){
    9. return new DirectExchange(ENAME, true, false);
    10. }
    11. //定义一个队列
    12. @Bean
    13. public Queue directQueue(){
    14. return QueueBuilder.durable(QNAME).build();
    15. }
    16. //创建队列和交换机的绑定关系
    17. @Bean
    18. public Binding binding(){
    19. return BindingBuilder.bind(directQueue()).to(directExchange()).with(KEY);
    20. }
    21. @RabbitHandler
    22. @RabbitListener(queues =QNAME)
    23. public void process4(UserRegisterOk userRegisterOk){
    24. System.out.println("消费者收到:"+userRegisterOk.getName()+","+userRegisterOk.getPhone());
    25. }
    26. }
  • 相关阅读:
    Alibaba Fastjson的基本使用
    android列表下拉刷新上拉加载更多实现的几种文件组织方式
    【微服务】服务容错---Sentinel
    Go :测试终结器用于微小的组合分配(附完整源码)
    Python Opencv实践 - 视频文件写入(格式和分辨率修改)
    深度学习入门(十六)实战Kaggle比赛:预测房价
    使用vmware虚拟机安装centos7以及终端管理工具
    React-Hook 轮子公开课(从零开始)用最基础的方式写出一个简单的组件库——第二课【Button】组件
    黑塞矩阵-二阶偏导矩阵
    《两化融合 数字化转型 价值效益参考模型》国家标准全文
  • 原文地址:https://blog.csdn.net/HeyVIrBbox/article/details/127862804