• springcloud 整合 RabbitMQ 消息中间件


    以下是在 Spring Cloud 中整合 RabbitMQ 消息中间件的详细步骤、代码说明,以及分析和解决消息丢失和消息重复消费问题的示例:

    1. 依赖添加:

    在 Maven 项目的 pom.xml 文件中添加 RabbitMQ 和 Spring Cloud Stream 的依赖:

    
        
            org.springframework.cloud
            spring-cloud-stream
            3.2.5
        
        
            org.springframework.amqp
            spring-rabbit
            3.2.5
        
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    2. 配置 RabbitMQ:

    在 Spring Cloud 配置文件(例如 application.yml)中添加 RabbitMQ 的连接配置:

    spring:
      cloud:
        stream:
          bindings:
            output:
              destination: my-exchange
              binder: rabbitmq
          rabbitmq:
            binder:
              # RabbitMQ 服务器地址
              address: localhost
              # RabbitMQ 端口
              port: 5672
              # RabbitMQ 虚拟主机
              virtual-host: /my_vhost
              username: guest
              password: guest
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    3. 创建消息生产者:

    创建一个发送消息的 Spring Cloud Stream 组件(例如 MessageProducer),并配置输出通道和消息转换器:

    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Component;
    
    @Component
    public class MessageProducer {
    
        private final Source source;
    
        public MessageProducer(Source source) {
            this.source = source;
        }
    
        public void sendMessage(String message) {
            // 将消息发送到输出通道
            this.source.output().send(MessageBuilder.withPayload(message).build());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    4. 创建消息消费者:

    创建一个接收消息的 Spring Cloud Stream 组件(例如 MessageConsumer),并配置输入通道和消息处理器:

    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.stereotype.Component;
    
    @Component
    public class MessageConsumer {
    
        private final Sink sink;
    
        public MessageConsumer(Sink sink) {
            this.sink = sink;
        }
    
        public void consumeMessage(String message) {
            // 从输入通道接收消息
            this.sink.input().receive(MessageBuilder.withPayload(message).build());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    5. 解决消息丢失问题:
    消息丢失可能发生在生产者发送消息时或消费者处理消息时

    生产者端:
    生产者在发送消息时出现异常,导致消息未能成功发送到消息中间件
    生产者在发送消息后没有正确处理返回的确认信息,导致消息可能被丢弃。

    消息中间件(如 RabbitMQ)端:
    消息中间件在处理消息时出现故障,导致消息丢失。
    消息中间件的配置问题,例如缓冲区大小设置不合理,导致消息在缓冲区溢出时丢失。
    消息中间件在进行数据持久化时出现问题,导致消息未能正确存储。

    消费者端:
    消费者在处理消息时出现异常,导致消息未能被正确处理。
    消费者在确认消息已处理之前出现故障,导致消息可能被重新分配给其他消费者或丢失。

    为了确保消息不丢失,可以采取以下措施:
    • 在生产者端启用消息确认机制,确保消息成功到达 RabbitMQ 服务器。
    • 在消费者端启用手动确认机制,确保消息在处理完成后被确认。
    6. 解决消息重复消费问题:

    消息重复消费可能发生在消费者重启或网络故障等情况下。为了避免重复处理消息,可以采取以下措施:

    • 在消息处理逻辑中添加幂等性处理,确保相同的消息不会被重复处理。
    • 使用消息唯一标识(例如消息的 UUID)来避免重复处理相同的消息。

    请注意,上述示例代码中的 my-exchange 是 RabbitMQ 的交换器名称,你可以根据实际需求进行修改。另外,还需要确保在启动应用时,正确配置和启动 Spring Cloud Stream 和 RabbitMQ 相关的服务。

  • 相关阅读:
    阿里云ModelScope 是一个“模型即服务”(MaaS)平台
    美国NSC大规模数据泄露,涉及壳牌、戴尔、特斯拉等2000多家公司
    flash内存分配和使用注意事项
    Pytorch(GPU)环境安装
    Wi-Fi环境下基于注意力机制及深度学习的鲁棒被动感知技术
    人工智能学习:NMIST数据分类识别-CNN网络(3)
    【剧前爆米花--爪哇岛寻宝】面向对象的三大特性——封装、继承以及多态的详细剖析(中——多态)。
    Linux每日智囊
    HMS Core打造影音娱乐行业解决方案,助推视听新浪潮
    STM32与ZigBee无线通信技术在工业自动化中的应用
  • 原文地址:https://blog.csdn.net/momo_128/article/details/136437910