• RabbitMQ快速上手以及RabbitMQ交换机的四种模式


    Win10安装:

    ​win10下安装 RabbitMQ​_柚几哥哥的博客-CSDN博客

    Linux安装:

    Linux下载安装 RabbitMQ​_柚几哥哥的博客-CSDN博客

    一、基础使用

    1、导入依赖

    1. <dependency>
    2. <groupId>org.springframework.bootgroupId>
    3. <artifactId>spring-boot-starter-amqpartifactId>
    4. dependency>
    5. <dependency>
    6. <groupId>org.springframework.amqpgroupId>
    7. <artifactId>spring-rabbit-testartifactId>
    8. <scope>testscope>
    9. dependency>

    2、配置application.yml

    1. spring:
    2.  #RabbitMQ
    3. rabbitmq:
    4.    #服务器地址
    5.   host: 192.168.10.100
    6.    #用户名
    7.   username: guest
    8.    #密码
    9.   password: guest
    10.    #虚拟主机
    11.   virtual-host: /
    12.    #端口
    13.   port: 5672
    14.   listener:
    15.     simple:
    16.        #消费者最小数量
    17.       concurrency: 10
    18.        #消费者最大数量
    19.       max-concurrency: 10
    20.        #限制消费者每次只处理一条消息,处理完再继续下一条消息
    21.       prefetch: 1
    22.        #启动时是否默认启动容器,默认true
    23.       auto-startup: true
    24.        #被拒绝时重新进入队列
    25.       default-requeue-rejected: true
    26.   template:
    27.     retry:
    28.        #发布重试,默认false
    29.       enabled: true
    30.        #重试时间 默认1000ms
    31.       initial-interval: 1000
    32.        #重试最大次数,默认3次
    33.       max-attempts: 3
    34.        #重试最大间隔时间,默认10000ms
    35.       max-interval: 10000
    36.        #重试间隔的乘数。比如配2.0 第一次等10s,第二次等20s,第三次等40s
    37.       multiplier: 1.0

    3、编写配置类RabbitMQConfig.java

    1. package com.xxxx.seckill.config;
    2. import org.springframework.amqp.core.Queue;
    3. import org.springframework.context.annotation.Bean;
    4. import org.springframework.context.annotation.Configuration;
    5. /**
    6. * @author zhoubin
    7. * @since 1.0.0
    8. */
    9. @Configuration
    10. public class RabbitMQConfig {
    11.   @Bean
    12.   public Queue queue(){
    13.      return new Queue("queue",true);
    14.   }
    15. }

    4、编写发送者MQSender.java

    1. package com.xxxx.seckill.rabbitmq;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    4. import org.springframework.beans.factory.annotation.Autowired;
    5. import org.springframework.stereotype.Service;
    6. /**
    7. * @author zhoubin
    8. * @since 1.0.0
    9. */
    10. @Service
    11. @Slf4j
    12. public class MQSender {
    13.   @Autowired
    14.   private RabbitTemplate rabbitTemplate;
    15.   public void send(Object msg) {
    16.      log.info("发送消息:"+msg);
    17.      rabbitTemplate.convertAndSend("queue", msg);
    18.   }
    19. }

    5、编写接收者MQReceiver.java

    1. /**
    2. * @author zyw
    3. * @since 1.0.0
    4. */
    5. @Service
    6. @Slf4j
    7. public class MQReceiver {
    8. @RabbitListener(queues = "queue")
    9. public void receive(Object msg) {
    10. log.info("接受消息:" + msg);
    11. }
    12. }

    6、编写测试接口UserController.java

    1. /**
    2. * 测试发送RabbitMQ消息
    3. */
    4. @RequestMapping("/mq")
    5. @ResponseBody
    6. public void mq() {
    7.   mqSender.send("Hello");
    8. }

    7、结果

     二、RabbitMQ交换机

    Fanout模式

    不处理路由键,只需要简单的将队里绑定到交换机上
    发送到交换机的消息都会被转发到与该交换机绑定的所有队列上
    Fanout 交换机转发消息是最快的

    1、RabbitMQConfig.java

    1. package com.xxxx.seckill.config;
    2. import org.springframework.amqp.core.Binding;
    3. import org.springframework.amqp.core.BindingBuilder;
    4. import org.springframework.amqp.core.FanoutExchange;
    5. import org.springframework.amqp.core.Queue;
    6. import org.springframework.context.annotation.Bean;
    7. import org.springframework.context.annotation.Configuration;
    8. /**
    9. * @author zhoubin
    10. * @since 1.0.0
    11. */
    12. @Configuration
    13. public class RabbitMQConfig {
    14.   private static final String QUEUE01 = "queue_fanout01";
    15.   private static final String QUEUE02 = "queue_fanout02";
    16.   private static final String EXCHANGE = "fanoutExchange";
    17.   @Bean
    18.   public Queue queue01(){
    19.      return new Queue(QUEUE01);
    20.   }
    21.   @Bean
    22.   public Queue queue02(){
    23.      return new Queue(QUEUE02);
    24.   }
    25.   @Bean
    26.   public FanoutExchange fanoutExchange(){
    27.      return new FanoutExchange(EXCHANGE);
    28.   }
    29.   @Bean
    30.   public Binding binding01(){
    31.      return BindingBuilder.bind(queue01()).to(fanoutExchange());
    32.   }
    33.   @Bean
    34.   public Binding binding02(){
    35.      return BindingBuilder.bind(queue02()).to(fanoutExchange());
    36.   }
    37. }

    2、MQSender.java

    1. package com.xxxx.seckill.rabbitmq;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    4. import org.springframework.beans.factory.annotation.Autowired;
    5. import org.springframework.stereotype.Service;
    6. /**
    7. * @author zhoubin
    8. * @since 1.0.0
    9. */
    10. @Service
    11. @Slf4j
    12. public class MQSender {
    13.   @Autowired
    14.   private RabbitTemplate rabbitTemplate;
    15.   public void send(Object msg) {
    16.      log.info("发送消息:"+msg);
    17.      rabbitTemplate.convertAndSend("fanoutExchange","",msg);
    18.   }
    19. }

    3、MQReceiver.java

    1. package com.xxxx.seckill.rabbitmq;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    4. import org.springframework.stereotype.Service;
    5. /**
    6. * @author zhoubin
    7. * @since 1.0.0
    8. */
    9. @Service
    10. @Slf4j
    11. public class MQReceiver {
    12.   @RabbitListener(queues = "queue_fanout01")
    13.   public void receive01(Object msg) {
    14.      log.info("QUEUE01接受消息:" + msg);
    15.   }
    16.   @RabbitListener(queues = "queue_fanout02")
    17.   public void receive02(Object msg) {
    18.      log.info("QUEUE02接受消息:" + msg);
    19.   }
    20. }

    4、UserController.java

    1. /**
    2. * 测试发送RabbitMQ消息
    3. */
    4. @RequestMapping("/mq/fanout")
    5. @ResponseBody
    6. public void mq() {
    7.   mqSender.send("Hello");
    8. }

    5、测试

    调用 mq/direct01 接口,消息经由交换机转发到绑定该交换机的所有队列

    Direct模式

    所有发送到 Direct Exchange 的消息被转发到 RouteKey 中指定的 Queue
    注意: Direct 模式可以使用 RabbitMQ 自带的 Exchange default Exchange, 所以不需要将
    Exchange 进行任何绑定 (binding) 操作,消息传递时, RouteKey 必须完全匹配才会被队列接收,否
    则该消息会被抛弃。
    重点: routing key 与队列 queues key 保持一致,即可以路由到对应的 queue 中。

    1、RabbitMQConfig.java

    1. package com.xxxx.seckill.config;
    2. import org.springframework.amqp.core.Binding;
    3. import org.springframework.amqp.core.BindingBuilder;
    4. import org.springframework.amqp.core.DirectExchange;
    5. import org.springframework.amqp.core.Queue;
    6. import org.springframework.context.annotation.Bean;
    7. import org.springframework.context.annotation.Configuration;
    8. /**
    9. * @author zhoubin
    10. * @since 1.0.0
    11. */
    12. @Configuration
    13. public class RabbitMQConfig {
    14. private static final String QUEUE01 = "queue_direct01";
    15. private static final String QUEUE02 = "queue_direct02";
    16. private static final String EXCHANGE = "directExchange";
    17. private static final String ROUTINGKEY01 = "queue.red";
    18. private static final String ROUTINGKEY02 = "queue.green";
    19. @Bean
    20. public Queue queue01(){
    21. return new Queue(QUEUE01);
    22. }
    23. @Bean
    24. public Queue queue02(){
    25. return new Queue(QUEUE02);
    26. }
    27. @Bean
    28. public DirectExchange directExchange(){
    29. return new DirectExchange(EXCHANGE);
    30. }
    31. @Bean
    32. public Binding binding01(){
    33. return
    34. BindingBuilder.bind(queue01()).to(directExchange()).with(ROUTINGKEY01);
    35. }
    36. @Bean
    37. public Binding binding02(){
    38. return
    39. BindingBuilder.bind(queue02()).to(directExchange()).with(ROUTINGKEY02);
    40. }
    41. }

    2、MQSender.java

    1. package com.xxxx.seckill.rabbitmq;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    4. import org.springframework.beans.factory.annotation.Autowired;
    5. import org.springframework.stereotype.Service;
    6. /**
    7. * @author zhoubin
    8. * @since 1.0.0
    9. */
    10. @Service
    11. @Slf4j
    12. public class MQSender {
    13. @Autowired
    14. private RabbitTemplate rabbitTemplate;
    15. public void send01(Object msg) {
    16. log.info("发送red消息:"+msg);
    17. rabbitTemplate.convertAndSend("directExchange","queue.red",msg);
    18. }
    19. public void send02(Object msg) {
    20. log.info("发送green消息:"+msg);
    21. rabbitTemplate.convertAndSend("directExchange","queue.green",msg);
    22. }
    23. }

    3、MQReceiver.java

    1. package com.xxxx.seckill.rabbitmq;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    4. import org.springframework.stereotype.Service;
    5. /**
    6. * @author zhoubin
    7. * @since 1.0.0
    8. */
    9. @Service
    10. @Slf4j
    11. public class MQReceiver {
    12. @RabbitListener(queues = "queue_direct01")
    13. public void receive01(Object msg) {
    14. log.info("QUEUE01接受消息:" + msg);
    15. }
    16. @RabbitListener(queues = "queue_direct02")
    17. public void receive02(Object msg) {
    18. log.info("QUEUE02接受消息:" + msg);
    19. }
    20. }

    4、UserController.java

    1. /**
    2. * 测试发送RabbitMQ消息
    3. */
    4. @RequestMapping("/mq/direct01")
    5. @ResponseBody
    6. public void mq01() {
    7.   mqSender.send01("Hello,Red");
    8. }
    9. /**
    10. * 测试发送RabbitMQ消息
    11. */
    12. @RequestMapping("/mq/direct02")
    13. @ResponseBody
    14. public void mq02() {
    15.   mqSender.send02("Hello,Green");
    16. }

    5、测试

    调用 mq/direct01 接口,消息经由交换机绑定的 queue.red RoutingKey 转发到 queue_direct01

    调用 mq/direct02 接口,消息经由交换机绑定的 queue.green RoutingKey 转发到 queue_direct02
    队列

    Topic模式

    所有发送到 Topic Exchange 的消息被转发到所有管线 RouteKey 中指定 Topic Queue
    Exchange RouteKey 和某 Topic 进行模糊匹配 , 此时队列需要绑定一个 Topic
    对于 routing key 匹配模式定义规则举例如下 :
    routing key 为一个句点号 . 分隔的字符串(我们将被句点号 . 分隔开的每一段独立的字符串称为
    一个单词),如 “stock.usd.nyse” “nyse.vmw” “quick.orange.rabbit”
    routing key 中可以存在两种特殊字符 * # ,用于做模糊匹配,其中 * 用于匹配一个单词, #
    于匹配多个单词(可以是零个)

    1、RabbitMQConfig.java

    1. package com.xxxx.seckill.config;
    2. import org.springframework.amqp.core.Binding;
    3. import org.springframework.amqp.core.BindingBuilder;
    4. import org.springframework.amqp.core.Queue;
    5. import org.springframework.amqp.core.TopicExchange;
    6. import org.springframework.context.annotation.Bean;
    7. import org.springframework.context.annotation.Configuration;
    8. /**
    9. * @author zhoubin
    10. * @since 1.0.0
    11. */
    12. @Configuration
    13. public class RabbitMQConfig {
    14. private static final String QUEUE01 = "queue_topic01";
    15. private static final String QUEUE02 = "queue_topic02";
    16. private static final String EXCHANGE = "topicExchange";
    17. private static final String ROUTINGKEY01 = "#.queue.#";
    18. private static final String ROUTINGKEY02 = "*.queue.#";
    19. @Bean
    20. public Queue queue01(){
    21. return new Queue(QUEUE01);
    22. }
    23. @Bean
    24. public Queue queue02(){
    25. return new Queue(QUEUE02);
    26. }
    27. @Bean
    28. public TopicExchange topicExchange(){
    29. return new TopicExchange(EXCHANGE);
    30. }
    31. @Bean
    32. public Binding binding01(){
    33. return
    34. BindingBuilder.bind(queue01()).to(topicExchange()).with(ROUTINGKEY01);
    35. }
    36. @Bean
    37. public Binding binding02(){
    38. return
    39. BindingBuilder.bind(queue02()).to(topicExchange()).with(ROUTINGKEY02);
    40. }
    41. }

    2、MQSender.java

    1. package com.xxxx.seckill.rabbitmq;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    4. import org.springframework.beans.factory.annotation.Autowired;
    5. import org.springframework.stereotype.Service;
    6. /**
    7. * @author zhoubin
    8. * @since 1.0.0
    9. */
    10. @Service
    11. @Slf4j
    12. public class MQSender {
    13. @Autowired
    14. private RabbitTemplate rabbitTemplate;
    15. public void send01(Object msg) {
    16. log.info("发送消息(被01队列接受):"+msg);
    17. rabbitTemplate.convertAndSend("topicExchange","queue.red.message",msg);
    18. }
    19. public void send02(Object msg) {
    20. log.info("发送消息(被两个queue接受):"+msg);
    21. rabbitTemplate.convertAndSend("topicExchange","message.queue.green.abc",msg);
    22. }
    23. }

    3、MQReceiver.java

    1. package com.xxxx.seckill.rabbitmq;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    4. import org.springframework.stereotype.Service;
    5. /**
    6. * @author zhoubin
    7. * @since 1.0.0
    8. */
    9. @Service
    10. @Slf4j
    11. public class MQReceiver {
    12. @RabbitListener(queues = "queue_topic01")
    13. public void receive01(Object msg) {
    14. log.info("QUEUE01接受消息:" + msg);
    15. }
    16. @RabbitListener(queues = "queue_topic02")
    17. public void receive02(Object msg) {
    18. log.info("QUEUE02接受消息:" + msg);
    19. }
    20. }

    4、UserController.java

    1. /**
    2. * 测试发送RabbitMQ消息
    3. */
    4. @RequestMapping("/mq/topic01")
    5. @ResponseBody
    6. public void mq01() {
    7.   mqSender.send01("Hello,Red");
    8. }
    9. /**
    10. * 测试发送RabbitMQ消息
    11. */
    12. @RequestMapping("/mq/topic02")
    13. @ResponseBody
    14. public void mq02() {
    15.   mqSender.send02("Hello,Green");
    16. }

    5、测试

    调用 mq/topic01 接口,消息经由交换机绑定的 #.queue.# RoutingKey 转发到 queue_topic01 队列

    调用 mq/topic02 接口,消息经由交换机绑定的 *.queue.# #.queue.# RoutingKey 转发到
    queue_topic01 queue_topic02 队列

    Headers模式

    不依赖 routingkey ,使用发送消息时 basicProperties 对象中的 headers 匹配队列
    headers 是一个键值对类型,键值对的值可以是任何类型
    在队列绑定交换机时用 x-match 来指定, all 代表定义的多个键值对都要满足, any 则代表只要满足
    一个可以了

    1、RabbitMQConfig.java

    1. package com.xxxx.seckill.config;
    2. import org.springframework.amqp.core.Binding;
    3. import org.springframework.amqp.core.BindingBuilder;
    4. import org.springframework.amqp.core.HeadersExchange;
    5. import org.springframework.amqp.core.Queue;
    6. import org.springframework.context.annotation.Bean;
    7. import org.springframework.context.annotation.Configuration;
    8. import java.util.HashMap;
    9. import java.util.Map;
    10. /**
    11. * @author zhoubin
    12. * @since 1.0.0
    13. */
    14. @Configuration
    15. public class RabbitMQConfig {
    16. private static final String QUEUE01 = "queue_header01";
    17. private static final String QUEUE02 = "queue_header02";
    18. private static final String EXCHANGE = "headersExchange";
    19. @Bean
    20. public Queue queue01(){
    21. return new Queue(QUEUE01);
    22. }
    23. @Bean
    24. public Queue queue02(){
    25. return new Queue(QUEUE02);
    26. }
    27. @Bean
    28. public HeadersExchange headersExchange(){
    29. return new HeadersExchange(EXCHANGE);
    30. }
    31. @Bean
    32. public Binding binding01(){
    33. Map map = new HashMap<>();
    34. map.put("color","red");
    35. map.put("speed","low");
    36. return
    37. BindingBuilder.bind(queue01()).to(headersExchange()).whereAny(map).match();
    38. }
    39. @Bean
    40. public Binding binding02(){
    41. Map map = new HashMap<>();
    42. map.put("color","red");
    43. map.put("speed","fast");
    44. return
    45. BindingBuilder.bind(queue02()).to(headersExchange()).whereAll(map).match();
    46. }
    47. }

    2、MQSender.java

    1. package com.xxxx.seckill.rabbitmq;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.amqp.core.Message;
    4. import org.springframework.amqp.core.MessageProperties;
    5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    6. import org.springframework.beans.factory.annotation.Autowired;
    7. import org.springframework.stereotype.Service;
    8. /**
    9. * @author zhoubin
    10. * @since 1.0.0
    11. */
    12. @Service
    13. @Slf4j
    14. public class MQSender {
    15. @Autowired
    16. private RabbitTemplate rabbitTemplate;
    17. public void send01(String msg) {
    18. log.info("发送消息(被两个queue接受):" + msg);
    19. MessageProperties properties = new MessageProperties();
    20. properties.setHeader("color", "red");
    21. properties.setHeader("speed", "fast");
    22. Message message = new Message(msg.getBytes(), properties);
    23. rabbitTemplate.convertAndSend("headersExchange", "", message);
    24. }
    25. public void send02(String msg) {
    26. log.info("发送消息(被01队列接受):" + msg);
    27. MessageProperties properties = new MessageProperties();
    28. properties.setHeader("color", "red");
    29. properties.setHeader("speed", "normal");
    30. Message message = new Message(msg.getBytes(), properties);
    31. rabbitTemplate.convertAndSend("headersExchange", "", message);
    32. }
    33. }

    3、MQReceiver.java

    1. package com.xxxx.seckill.rabbitmq;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.amqp.core.Message;
    4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    5. import org.springframework.stereotype.Service;
    6. /**
    7. * @author zhoubin
    8. * @since 1.0.0
    9. */
    10. @Service
    11. @Slf4j
    12. public class MQReceiver {
    13. @RabbitListener(queues = "queue_header01")
    14. public void receive01(Message message) {
    15. log.info("QUEUE01接受Message对象:" + message);
    16. log.info("QUEUE01接受消息:" + new String(message.getBody()));
    17. }
    18. @RabbitListener(queues = "queue_header02")
    19. public void receive02(Message message) {
    20. log.info("QUEUE02接受Message对象:" + message);
    21. log.info("QUEUE02接受消息:" + new String(message.getBody()));
    22. }
    23. }

    4、UserController.java

    1. /**
    2. * 测试发送RabbitMQ消息
    3. */
    4. @RequestMapping("/mq/header01")
    5. @ResponseBody
    6. public void mq01() {
    7.   mqSender.send01("Hello,header01");
    8. }
    9. /**
    10. * 测试发送RabbitMQ消息
    11. */
    12. @RequestMapping("/mq/header02")
    13. @ResponseBody
    14. public void mq02() {
    15.   mqSender.send02("Hello,header02");
    16. }

    5、测试

    queue_header01 设置 x-match any queue_header02 设置 x-match all 。因此调用 mq/header01
    接口,可以匹配两个队列

    调用 mq/header02 接口,只能匹配 queue_header01 队列

  • 相关阅读:
    Linux 压缩、解压缩命令
    数据中心供配电及能效管理系统的设计要点
    LeetCode 0053. 最大子数组和:DP 或 递归(线段树入门题?)
    【MyBatis XML实现批量删除操作】
    IB心理学如何记住大量的内容?
    大一新生HTML期末作业 学生个人网页设计作业 HTML5响应式个人简历网站模板 web前端网页制作课作业
    个人付费专栏上线预热
    基于Freetype的文字渲染流程以及缓存策略
    《MLB棒球创造营》:走近棒球运动·华盛顿国民队
    趁着中秋节来临之际,学学如何做好团队管理
  • 原文地址:https://blog.csdn.net/Zyw907155124/article/details/128199513