• RabbitMQ: SpringBoot 整合 RabbitMQ


    一、生产者模块

    1.导入依赖

    重点是这个依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring-boot.version>2.2.9.RELEASE</spring-boot.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

    2.yml配置文件

    spring:
      rabbitmq:
        host: 8.140.244.227
        port: 6786
        username: test
        password: test
        virtual-host: /test
    server:
      port: 8081

    3.用接口方式实现生产者

    通过注入下面这个 RabbitTemplate 模板,调用其 convertAndSend 方法来发送消息:

    @Autowired
    RabbitTemplate rabbitTemplate;//这个模板
    1. package com.qf.bootmq2302.controller;
    2. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    3. import org.springframework.beans.factory.annotation.Autowired;
    4. import org.springframework.web.bind.annotation.GetMapping;
    5. import org.springframework.web.bind.annotation.RequestMapping;
    6. import org.springframework.web.bind.annotation.RestController;
    7. import java.util.HashMap;
    8. import java.util.Map;
    9. import java.util.TreeMap;
    10. @RestController
    11. public class TestController {
    12. @Autowired
    13. RabbitTemplate rabbitTemplate;
    14. @GetMapping("/test1")
    15. public String test1(String msg){
    16. System.out.println(msg);
    17. String exchangeName = "";//默认交换机
    18. String routingkey = "hello";//队列名字
    19. //生产者发送消息
    20. rabbitTemplate.convertAndSend(exchangeName,routingkey,msg);
    21. return "ok";
    22. }
    23. @GetMapping("/test2")
    24. public String test2(String name,Integer age){
    25. TreeMap map = new TreeMap();
    26. map.put("name",name);
    27. map.put("age",age);
    28. String exchangeName = "";//默认交换机
    29. String routingkey = "work";//队列名字
    30. //生产者发送消息
    31. rabbitTemplate.convertAndSend(exchangeName,routingkey,map);
    32. return "ok";
    33. }
    34. }

    4.主启动类

    1. package com.qf.bootmq2302;
    2. import org.springframework.boot.SpringApplication;
    3. import org.springframework.boot.autoconfigure.SpringBootApplication;
    4. @SpringBootApplication
    5. public class BootMqApp {
    6. public static void main(String[] args) {
    7. SpringApplication.run(BootMqApp.class,args);
    8. }
    9. }

    二、消费者模块

      1.导入依赖

                 与生产者模块的依赖相同(见“一、生产者模块”中的“1.导入依赖”)

       2.yml配置文件

    spring:
      rabbitmq:
        host: 8.140.244.227
        port: 6786
        username: test
        password: test
        virtual-host: /test
        #手动ACK
        listener:
          simple:
            acknowledge-mode: manual
            prefetch: 1 #等价于basicQos(1)

    3.通过注解绑定 队列名字

    1. package com.qf.bootconsumer.consumer;
    2. import com.rabbitmq.client.Channel;
    3. import org.springframework.amqp.core.Message;
    4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    5. import org.springframework.stereotype.Component;
    6. import java.io.IOException;
    7. import java.io.UnsupportedEncodingException;
    8. import java.util.HashMap;
    9. import java.util.Map;
    10. import java.util.TreeMap;
    11. @Component
    12. public class MyConsumer {
    13. // @RabbitListener(queues = "hello")
    14. // public void getMsg(Message message) throws UnsupportedEncodingException {
    15. // byte[] body = message.getBody();
    16. // String s = new String(body, "utf-8");
    17. // System.out.println(s);
    18. //
    19. // }
    20. // @RabbitListener(queues = "hello")
    21. // public void getMsg(String msg) throws UnsupportedEncodingException {
    22. //
    23. // System.out.println(msg);
    24. //
    25. // }
    26. @RabbitListener(queues = "hello")
    27. public void getMsg(Map message) throws UnsupportedEncodingException {
    28. System.out.println(message);
    29. }
    30. @RabbitListener(queues = "work")
    31. public void getMsg1(Map data, Channel channel,Message message) throws IOException {
    32. System.out.println(data);
    33. //手动ack//若开启手动ack,不给手动ack,就按照 prefetch: 1 #等价于basicQos(1)的量,就这么多,不会多给你了,因为你没有确认。确认一条,就给你一条
    34. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    35. }
    36. }

    4.通过配置类,创建队列,交换机,绑定队列给交换机和给予路由

     

    1. package com.qf.bootconsumer.config;
    2. import org.springframework.amqp.core.*;
    3. import org.springframework.context.annotation.Bean;
    4. import org.springframework.context.annotation.Configuration;
    5. /**
    6. * 在 该配置类中可以,通过@Bean 方法定义 mq内部的交换机和队列 ,及其绑定关系
    7. */
    8. @Configuration
    9. public class MyConfig {
    10. @Bean
    11. public TopicExchange exchange01(){
    12. return new TopicExchange("boot-exchange",true,false);
    13. }
    14. @Bean
    15. public Queue queue01(){
    16. /**
    17. * 第一个参数:队列名字
    18. * 第二个参数:true:代表服务重启后,此队列还存在
    19. * 第三个参数: true:排外,不能让其他连接来访问此队列,只有创建此队列的连接能访问消费此队列
    20. * 第四个参数: true:代表服务关闭时,RabbitMQ会自动把 此队列删除了。
    21. */
    22. Queue queue = new Queue("boot-Queue", true, false, false);
    23. return queue;
    24. }
    25. @Bean
    26. public Binding binding01(TopicExchange exchange01,Queue queue01){
    27. Binding binding = BindingBuilder.bind(queue01).to(exchange01).with("*.orange.*");
    28. return binding;
    29. }
    30. @Bean
    31. public Binding binding02(TopicExchange exchange01,Queue queue01){
    32. Binding binding = BindingBuilder.bind(queue01).to(exchange01).with("*.*.rabbit");
    33. return binding;
    34. }
    35. @Bean
    36. public FanoutExchange exchange02(){
    37. return new FanoutExchange("boot-fanout");
    38. }
    39. @Bean
    40. public Queue queue02(){
    41. return new Queue("boot-queue02",true,false,false);
    42. }
    43. @Bean
    44. public Queue queue03(){
    45. return new Queue("boot-queue03",true,false,false);
    46. }
    47. @Bean
    48. public Binding binding03(FanoutExchange exchange02,Queue queue02){
    49. Binding binding = BindingBuilder.bind(queue02).to(exchange02);
    50. return binding;
    51. }
    52. @Bean
    53. public Binding binding04(FanoutExchange exchange02,Queue queue03){
    54. Binding binding = BindingBuilder.bind(queue03).to(exchange02);
    55. return binding;
    56. }
    57. }

  • 相关阅读:
    java-php-python-医院挂号管理系统计算机毕业设计
    学生HTML个人网页作业作品:基于HTML实现教育培训机构网站模板毕业源码(8页)
    5.DApp-前端网页怎么连接MetaMask
    JAVA毕业设计Vue网上书籍购买商城登录计算机源码+lw文档+系统+调试部署+数据库
    插件器件小的引脚孔,制造过程中可能存在的一些问题
    Android - AsyncTask
    远程连接ubuntu的mysql服务报错10061的解决方案
    Qt5开发及实例V2.0-第十八章-Qt-MyselfQQ实例
    leetcode 50. Pow(x, n)
    HTTP学习——协议与术语、HTTP、缓存、Cookie
  • 原文地址:https://blog.csdn.net/qq_53374893/article/details/132743387