• SpringBoot整合RabbitMQ及其原理分析


    上一篇:RabbitMQ基础知识

    1、相关依赖

    这里无需指定版本号,让其跟着SpringBoot版本走。本示例使用SpringBoot版本号为2.7.10。

    1. <dependency>
    2. <groupId>org.springframework.bootgroupId>
    3. <artifactId>spring-boot-starter-amqpartifactId>
    4. dependency>
    5. <dependency>
    6. <groupId>org.projectlombokgroupId>
    7. <artifactId>lombokartifactId>
    8. <optional>trueoptional>
    9. dependency>

    2、生产者、消费者

    创建两个SpringBoot应用,模拟消息生产者与消费者【publisher、consumer】。

    2-1生产者

    编写配置文件,用户名和密码等自行修改 这里虚拟机的名称是上一篇文章中新建的。

    1. server.port=8082
    2. #rabbitmq服务器ip
    3. spring.rabbitmq.host=localhost
    4. #rabbitmq的端口
    5. spring.rabbitmq.port=5672
    6. #用户名
    7. spring.rabbitmq.username=用户名
    8. #密码
    9. spring.rabbitmq.password=密码
    10. #配置虚拟机
    11. spring.rabbitmq.virtual-host=demo

    声明交换机、队列并绑定:

    1. @Configuration
    2. public class RabbitMqConfig {
    3. @Bean
    4. public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
    5. RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    6. rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
    7. return rabbitTemplate;
    8. }
    9. @Bean
    10. public MessageConverter jackson2JsonMessageConverter() {
    11. return new Jackson2JsonMessageConverter();
    12. }
    13. @Bean
    14. public DirectExchange getExchange(){
    15. return new DirectExchange("directExchange",false,false);
    16. }
    17. @Bean
    18. public Queue getQueue(){
    19. return new Queue("publisher.addUser",true,false,false);
    20. }
    21. @Bean
    22. public Binding getBinding(DirectExchange exchange,Queue queue){
    23. return BindingBuilder.bind(queue).to(exchange).with("publisher.addUser");
    24. }
    25. }

    新建User实体类

    1. @Data
    2. public class User {
    3. private Long id;
    4. private String name;
    5. private String desc;
    6. }

    在方法中使用RabbitTemplate来发送消息:

    1. public interface PublisherService {
    2. /**
    3. * 添加用户
    4. * @param user 用户信息
    5. */
    6. void addUser(User user);
    7. }
    1. @RequiredArgsConstructor
    2. @Service
    3. public class PublisherServiceImpl implements PublisherService{
    4. private final RabbitTemplate rabbitTemplate;
    5. @Override
    6. public void addUser(User user) {
    7. rabbitTemplate.convertAndSend("directExchange","publisher.addUser",user);
    8. }
    9. }

    以上需要注意的就是交换机的名称队列名routingKey。示例中使用的是直连交换机,routingKey需要和队列名保持一致。不懂的可以查看上一篇文章。

    controller:

    1. @RequiredArgsConstructor
    2. @RestController
    3. @RequestMapping("/user")
    4. public class UserController {
    5. private final PublisherService publisherService;
    6. @PostMapping("/add")
    7. public void add(){
    8. User user = new User();
    9. user.setId(1000L);
    10. user.setName("黄忠");
    11. user.setDesc("老兵不死,只是逐渐凋零");
    12. publisherService.addUser(user);
    13. }
    14. }

    2-2消费者

    消费者的配置和生产者一样,不赘述了,直接看代码:

    1. @Service
    2. @Slf4j
    3. public class ConsumerService {
    4. @RabbitListener(queues ="publisher.addUser")
    5. public void addUser(String userStr){
    6. User user = JSONObject.parseObject(userStr,User.class);
    7. log.info(user.toString());
    8. }
    9. }

    @RabbitListener 注解是指定某方法作为消息消费的方法,指定队列名称。@RabbitListener 如果标注在类上,需配合 @RabbitHandler 注解一起使用,根据接受的参数类型进入具体的方法中。

    2-3测试

    消费端在启动时可能会报找不到交换机或队列,只需要让生产者发送一次消息,从控制台就可以看到相关的交换机和队列等信息了。

    可以看到消费者成功消费了消息:

    3、消费流程

    通过上述操作,我们已经会简单地使用RabbitMQ了,接下来了解一下它的整个流程。如此可以让我们掌握的更牢固。

    生产者:

    • 生产者连接到Message Broker【也就是RabbitMQ服务】,建立一个连接( Connection)开启一个信道(Channel)。
    • 生产者声明一个交换机,并设置相关属性,比如交换机类型、是否持久化等。
    • 生产者声明一个队列并设置相关属性。
    • 生产者通过路由键【Routing Key】将交换机和队列绑定。
    • 生产者发送消息至RabbitMQ Broker,其中包含路由键、交换器等信息。
    • 相应的交换机根据接收到的路由键查找相匹配的队列。
    • 如果找到,则将从生产者发送过来的消息存入相应的队列中。
    • 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
    • 关闭信道。
    • 关闭连接。

    消费者:

    • 消费者连接到RabbitMQ Broker ,建立一个连接(Connection),开启一个信道(Channel) 。
    • 消费者向RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数,
    • 等待RabbitMQ Broker 回应并投递相应队列中的消息,消费者接收消息。
    • 消费者确认(ack) 接收到的消息。
    • RabbitMQ 从队列中删除相应己经被确认的消息。
    • 关闭信道。
    • 关闭连接。

  • 相关阅读:
    bash 脚本字符串截取表达式详细说明
    计算机毕业设计springboot+vue基本微信小程序的码高教育课后在线小程序
    web安全之XSS攻击
    【MySQL】面试题
    mybatis中#与$的区别
    基于nodejs+vue 衣服穿搭推荐系统
    264_BOOST中的Json库解析_BOOST_AUTO(itrpromodel, doc.FindMember(“productmodel“));
    Excel数据可视化—波士顿矩阵图【四象限图】
    SSM学习42:SpringMVC入门案例(重点)
    idea2021版本创建一个javaweb项目(含额外知识--添加tomcat相关jar包)
  • 原文地址:https://blog.csdn.net/QingXu1234/article/details/130842270