• RabbitMQ及各种模式


    目录

    一、MQ的基本概念

    1.1 MQ概述

    1.2 MQ的优势和劣势

    1.3 MQ的优势

    1.应用解耦

    2.异步提速 

    3.削峰填谷

    1.4 MQ的劣势

    小结

     1.5 常见的 MQ 产品

    1.6 RabbitMQ 简介

    1.7 JMS

    小结

    二、RabbitMQ管控台 

     三、Hello World简单模式

    ​编辑

    1、生产者

    ​编辑 2、消费者

    ​编辑 四、Work queues 工作队列模式

    1、生产者

    2、消费者

    启动两个消费者 

     启动生产者

     小结

    五、Pub/Sub订阅模式

     1、生产者

    2、消费者1

    消费者2 

    小结

    六、Routing 路由模式

    1、生产者 

    2、消费者1,2

    七、Topics 通配符模式 

    1、生产者

    2、消费者

    小结

    八、工作模式总结  

    1. 简单模式 HelloWorld

    2. 工作队列模式 Work Queue

    3. 发布订阅模式 Publish/subscribe

    4. 路由模式 Routing

    5. 通配符模式 Topic

    一、MQ的基本概念

    1.1 MQ概述

    MQ全称 M essage Q ueue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。MQ,消息队列,存储消息的中间件。发送方称为生产者,接收方称为消费者
    分布式系统通信两种方式:
    • 直接远程调用
    • 借助第三方完成间接通信

    1.2 MQ的优势和劣势

    • 优势:                                                            劣势:
      • 应用解耦                                               系统可用性降低
      • 异步提速                                               系统复杂度提高
      • 削峰填谷                                               ⚫ 一致性问题

    1.3 MQ的优势

    1.应用解耦

    用户点击下单,进入订单系统,订单系统通过远程调用去调用库存系统、支付系统、物流系统。这样这四个系统就会耦合在一起,可能出现第一个问题:当库存系统出现异常,订单系统链路走不通也会出问题,用户可能得到下单失败这个反馈,整个系统的容错率低;第二个问题:在下订单的过程中要增加一个X系统,就要修改订单系统然后再访问X系统,如果又要加Y系统不要X系统了,那么又要修改订单系统,整个系统的可维护性比较低。

    系统的耦合性越高,容错性就越低,可维护性就越低。

    用户点击下单,进入订单系统。订单系统只需要发送一条消息到MQ就可以了,可以个用户发送下单成功。库存系统、支付系统、物流系统只需要从MQ里取出消息消费就可以了。对于问题一:库存系统出现异常后,订单系统没有异常,因为订单系统和其他三个系统是隔离的,没有任何影响。库存系统异常是暂时的,修复之后再去MQ里取出消息消费,最终是正常的。系统容错性提高;对于问题二:增加X系统,与订单系统无关,不需要修改订单系统,直接增加新系统然后再去MQ里取出消息消费就可以了

    使用 MQ 使得应用间解耦,提升容错性和可维护性。

    2.异步提速 

    远程调用是个同步的方式,订单系统先调用库存返回后再调用支付返回后再调用物流,需要同步的去完成订单的整个链路的调用,没有问题后就会返回给用户下单成功。

    一个下单操作耗时:20 + 300 + 300 + 300 = 920ms
    用户点击完下单按钮后,需要等待920ms才能得到下单响应,太慢!

    用户下订单,订单系统保存到自己的数据库花费20ms,向MQ发送消息花费5ms,这时订单系统可以直接告诉用户下单成功了。后边的操作不管成功与否,取出消息消费即可,这就是异步的方式。

    用户点击完下单按钮后,只需等待25ms就能得到下单响应 (20 + 5 = 25ms)。
    提升用户体验和系统吞吐量(单位时间内处理请求的数目)。

    3.削峰填谷

    A系统每秒最大处理1000请求,现在10点秒杀活动,请求瞬间增多,每秒5000个请求,A系统承载不了这么大的并发,宕机系统不可用,用户的体验就太差了。

    可以使用MQ削峰

    5000个请求对接MQ,5000请求MQ完全可以承载,小意思,A系统再慢慢的从MQ每秒拉去1000个请求完成消费,A系统的稳定性就提高了很多

    使用了 MQ 之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在 MQ 中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“填谷”。
    使用MQ后,可以提高系统稳定性。
    小结:
    • 应用解耦:提高系统容错性和可维护性
    • 异步提速:提升用户体验和系统吞吐量
    • 削峰填谷:提高系统稳定性

    1.4 MQ的劣势

    系统可用性降低
    系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高可用?
    系统复杂度提高
    MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?
    一致性问题
    A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理失败。如何保证消息数据处理的一致性?

    小结

    既然 MQ 有优势也有劣势,那么使用 MQ 需要满足什么条件呢?
    • ① 生产者不需要从消费者处获得反馈。引入消息队列之前的直接调用,其接口的返回值应该为空,这才让明明下层的动作还没做,上层却当成动作做完了继续往后走,即所谓异步成为了可能。
    • ② 容许短暂的不一致性。
    • ③ 确实是用了有效果。即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本。

     1.5 常见的 MQ 产品

    目前业界有很多的 MQ 产品,例如 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等, 也有直接使用 Redis 充当消息队列的案例,而这些消息队列产品,各有侧重,在实际选型时,需要结合自身需求及 MQ 产品特征,综合考虑。

    1.6 RabbitMQ 简介

    AMQP Advanced Message Queuing Protocol (高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP 规范发布。 类比HTTP

    生产者发布消息到exchange,exchange通过不同的规则,把消息路由到不同的队列去存储,consumer监听从队列中拿走对应的消息消费

    2007年,Rabbit 技术公司基于 AMQP 标准开发的 RabbitMQ 1.0 发布。RabbitMQ 采用 Erlang 语言开发。Erlang 语言由 Ericson 设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。
    RabbitMQ 基础架构如下图:

    RabbitMQ 中的相关概念:
    • Broker接收和分发消息的应用,RabbitMQ Server就是 Message Broker
    • Virtual host出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
    • Connectionpublisher/consumer 和 broker 之间的 TCP 连接
    • Channel如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
    • Exchangemessage 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
    • Queue消息最终被送到这里等待 consumer 取走
    • Bindingexchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据
    RabbitMQ 提供了 6 种工作模式 简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算 MQ)。

    RabbitMQ Tutorials — RabbitMQ

    1.7 JMS

    • JMS 即 Java 消息服务(JavaMessage Service)应用程序接口,是一个 Java 平台中关于面向消息中间件的API
    • JMS 是 JavaEE 规范中的一种,类比JDBC
    • 很多消息中间件都实现了JMS规范,例如:ActiveMQ。RabbitMQ 官方没有提供 JMS 的实现包,但是开源社区有

    小结

    • 1. RabbitMQ 是基于 AMQP 协议使用 Erlang 语言开发的一款消息队列产品。
    • 2. RabbitMQ提供了6种工作模式,这边讲解5种。这是重点。
    • 3. AMQP 是协议,类比HTTP。
    • 4. JMS 是 API 规范接口,类比 JDBC。

    二、RabbitMQ管控台 

     结果:


     三、Hello World简单模式

    在上图的模型中,有以下概念:
    • P:生产者,也就是要发送消息的程序
    • C:消费者:消息的接收者,会一直等待消息到来
    • queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息

    需求:使用简单模式完成消息传递

    步骤:     ① 创建工程(生成者、消费者)
                    ② 分别添加依赖
                    ③ 编写生产者发送消息
                    ④ 编写消费者接收消息

    添加依赖:rabbitmq客户端,编译版本插件

    1. com.rabbitmq
    2. amqp-client
    3. 5.6.0
    4. org.apache.maven.plugins
    5. maven-compiler-plugin
    6. 3.8.0
    7. 1.8
    8. 1.8

    1、生产者

    1.创建连接工厂---2. 设置参数---3. 创建连接 Connection---4. 创建Channel---5. 创建队列Queue---6. 发送消息---7.释放资源

    1. import com.rabbitmq.client.Channel;
    2. import com.rabbitmq.client.Connection;
    3. import com.rabbitmq.client.ConnectionFactory;
    4. import java.io.IOException;
    5. import java.util.concurrent.TimeoutException;
    6. /**
    7. * 发送消息
    8. *
    9. */
    10. public class Producer_HelloWorld {
    11. public static void main(String[] args) throws IOException, TimeoutException {
    12. //1.创建连接工厂
    13. ConnectionFactory factory = new ConnectionFactory();
    14. //2. 设置参数
    15. factory.setHost("43.143.246.208");//ip 默认值 localhost
    16. factory.setPort(5672); //端口 默认值 5672
    17. factory.setVirtualHost("/itcast");//虚拟机 默认值 /
    18. factory.setUsername("root");//用户名 默认 guest
    19. factory.setPassword("root");//密码 默认值 guest
    20. factory.setConnectionTimeout(5000);//针对连接超时,延长我们的连接时间
    21. //3. 创建连接 Connection
    22. Connection connection = factory.newConnection();
    23. //4. 创建Channel
    24. Channel channel = connection.createChannel();
    25. //5. 创建队列Queue
    26. /*
    27. queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments)
    28. 参数:
    29. 1. queue:队列名称
    30. 2. durable:是否持久化,当mq重启之后,还在
    31. 3. exclusive:
    32. * 是否独占。只能有一个消费者监听这队列
    33. * 当Connection关闭时,是否删除队列
    34. * 一般设为false
    35. 4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
    36. 5. arguments:参数。配置一些怎么删的参数
    37. */
    38. //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
    39. channel.queueDeclare("hello_world",true,false,false,null);
    40. /*
    41. basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
    42. 参数:
    43. 1. exchange:交换机名称。简单模式下交换机会使用默认的空串 ""
    44. 2. routingKey:路由名称,使用默认的交换机路由名称要和队列名称一致
    45. 3. props:配置信息
    46. 4. body:字节数组,真实发送的消息数据
    47. */
    48. String body = "hello rabbitmq~~~";
    49. //6. 发送消息
    50. channel.basicPublish("","hello_world",null,body.getBytes());
    51. //7.释放资源
    52. // channel.close();
    53. // connection.close();
    54. }
    55. }

    连接不关闭,不释放资源

     2、消费者

    1.创建连接工厂---2. 设置参数---3. 创建连接 Connection---4. 创建Channel---5. 创建队列Queue---6. 接收消息

    1. import com.rabbitmq.client.*;
    2. import java.io.IOException;
    3. import java.util.concurrent.TimeoutException;
    4. public class Consumer_HelloWorld {
    5. public static void main(String[] args) throws IOException, TimeoutException {
    6. //1.创建连接工厂
    7. ConnectionFactory factory = new ConnectionFactory();
    8. //2. 设置参数
    9. factory.setHost("43.143.246.208");//ip 默认值 localhost
    10. factory.setPort(5672); //端口 默认值 5672
    11. factory.setVirtualHost("/itcast");//虚拟机 默认值/
    12. factory.setUsername("root");//用户名 默认 guest
    13. factory.setPassword("root");//密码 默认值 guest
    14. //3. 创建连接 Connection
    15. Connection connection = factory.newConnection();
    16. //4. 创建Channel
    17. Channel channel = connection.createChannel();
    18. //5. 创建队列Queue
    19. /*
    20. queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments)
    21. 参数:
    22. 1. queue:队列名称
    23. 2. durable:是否持久化,当mq重启之后,还在
    24. 3. exclusive:
    25. * 是否独占。只能有一个消费者监听这队列
    26. * 当Connection关闭时,是否删除队列
    27. *
    28. 4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
    29. 5. arguments:参数。
    30. */
    31. //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
    32. channel.queueDeclare("hello_world",true,false,false,null);
    33. /*
    34. basicConsume(String queue, boolean autoAck, Consumer callback)
    35. 参数:
    36. 1. queue:队列名称
    37. 2. autoAck:是否自动确认
    38. 3. callback:回调对象
    39. */
    40. // 接收消息
    41. Consumer consumer = new DefaultConsumer(channel){
    42. /*
    43. 回调方法,当收到消息后,会自动执行该方法
    44. 1. consumerTag:标识
    45. 2. envelope:获取一些信息,交换机,路由key...
    46. 3. properties:配置信息
    47. 4. body:数据
    48. */
    49. @Override
    50. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    51. System.out.println("consumerTag:"+consumerTag);
    52. System.out.println("Exchange:"+envelope.getExchange());
    53. System.out.println("RoutingKey:"+envelope.getRoutingKey());
    54. System.out.println("properties:"+properties);
    55. System.out.println("body:"+new String(body));
    56. }
    57. };
    58. channel.basicConsume("hello_world",true,consumer);
    59. //关闭资源?不要
    60. }
    61. }

     四、Work queues 工作队列模式

    Work Queues: 与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
    应用场景 :对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。比如队列里有1000条消息,C1只能处理500条消息,增加队友C2一起处理,理论上可以处理1000条消息了
    Work Queues 与入门程序的简单模式的代码几乎是一样的。可以完全复制,并多复制一个消费者进行多
    个消费者同时对消费消息的测试。

    1、生产者

    队列修改为work_queues
    //修改为循环输出10条语句
    for (int i = 1; i <= 10; i++) {
        String body = i+"hello rabbitmq~~~";
    
        //6. 发送消息
        channel.basicPublish("","work_queues",null,body.getBytes());
    }
    

    2、消费者

    增加为两个消费者
    Consumer_WorkQueues1
    Consumer_WorkQueues2
    队列修改为work_queues

    启动两个消费者 

     启动生产者

    消费者1 消费13579  ,消费者2 消费246810  ,两个是循环交替消费的

     小结

    1. 在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。
    2. Work Queues 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。例如:短信服务部署多个,
    只需要有一个节点成功发送即可。

    五、Pub/Sub订阅模式

     

    在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:
    • P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
    • C:消费者,消息的接收者,会一直等待消息到来
    • Queue:消息队列,接收消息、缓存消息
    • Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、 递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
                    ➢ Fanout:广播,将消息交给所有绑定到交换机的队列
                    ➢ Direct:定向,把消息交给符合指定routing key 的队列
                    ➢ Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
    Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合
    路由规则的队列,那么消息会丢失!

     1、生产者

    1.创建连接工厂---2. 设置参数---3. 创建连接 Connection---4. 创建Channel---5. 创建交换机---

    6. 创建队列Queue---7. 绑定队列和交换机---8. 发送消息---9. 释放资源

    1. public class Producer_PubSub {
    2. public static void main(String[] args) throws IOException, TimeoutException {
    3. //1.创建连接工厂
    4. ConnectionFactory factory = new ConnectionFactory();
    5. //2. 设置参数
    6. factory.setHost("43.143.246.208");//ip 默认值 localhost
    7. factory.setPort(5672); //端口 默认值 5672
    8. factory.setVirtualHost("/itcast");//虚拟机 默认值 /
    9. factory.setUsername("root");//用户名 默认 guest
    10. factory.setPassword("root");//密码 默认值 guest
    11. factory.setConnectionTimeout(5000);//针对连接超时,延长我们的连接时间
    12. //3. 创建连接 Connection
    13. Connection connection = factory.newConnection();
    14. //4. 创建Channel
    15. Channel channel = connection.createChannel();
    16. /*
    17. exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map arguments)
    18. 参数:
    19. 1. exchange:交换机名称
    20. 2. type:交换机类型
    21. DIRECT("direct"),:定向
    22. FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
    23. TOPIC("topic"),通配符的方式
    24. HEADERS("headers");参数匹配,用的比较少
    25. 3. durable:是否持久化
    26. 4. autoDelete:自动删除
    27. 5. internal:内部使用。 一般false
    28. 6. arguments:参数
    29. */
    30. String exchangeName = "test_fanout";
    31. //5. 创建交换机
    32. channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
    33. //6. 创建队列
    34. String queue1Name = "test_fanout_queue1";
    35. String queue2Name = "test_fanout_queue2";
    36. channel.queueDeclare(queue1Name,true,false,false,null);
    37. channel.queueDeclare(queue2Name,true,false,false,null);
    38. //7. 绑定队列和交换机
    39. /*
    40. queueBind(String queue, String exchange, String routingKey)
    41. 参数:
    42. 1. queue:队列名称
    43. 2. exchange:交换机名称
    44. 3. routingKey:路由键,绑定规则
    45. 如果交换机的类型为fanout ,routingKey设置为""
    46. */
    47. channel.queueBind(queue1Name,exchangeName,"");
    48. channel.queueBind(queue2Name,exchangeName,"");
    49. String body = "日志信息:张三调用了findAll方法...日志级别:info...";
    50. //8. 发送消息
    51. channel.basicPublish(exchangeName,"",null,body.getBytes());
    52. //9. 释放资源
    53. channel.close();
    54. connection.close();
    55. }
    56. }

    2、消费者1

    1. public class Consumer_PubSub1 {
    2. public static void main(String[] args) throws IOException, TimeoutException {
    3. //1.创建连接工厂
    4. ConnectionFactory factory = new ConnectionFactory();
    5. //2. 设置参数
    6. factory.setHost("43.143.246.208");//ip 默认值 localhost
    7. factory.setPort(5672); //端口 默认值 5672
    8. factory.setVirtualHost("/itcast");//虚拟机 默认值/
    9. factory.setUsername("root");//用户名 默认 guest
    10. factory.setPassword("root");//密码 默认值 guest
    11. //3. 创建连接 Connection
    12. Connection connection = factory.newConnection();
    13. //4. 创建Channel
    14. Channel channel = connection.createChannel();
    15. String queue1Name = "test_fanout_queue1";
    16. String queue2Name = "test_fanout_queue2";
    17. /*
    18. basicConsume(String queue, boolean autoAck, Consumer callback)
    19. 参数:
    20. 1. queue:队列名称
    21. 2. autoAck:是否自动确认
    22. 3. callback:回调对象
    23. */
    24. // 接收消息
    25. Consumer consumer = new DefaultConsumer(channel){
    26. /*
    27. 回调方法,当收到消息后,会自动执行该方法
    28. 1. consumerTag:标识
    29. 2. envelope:获取一些信息,交换机,路由key...
    30. 3. properties:配置信息
    31. 4. body:数据
    32. */
    33. @Override
    34. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    35. System.out.println("body:"+new String(body));//字节数组转成字符串
    36. System.out.println("将日志信息打印到控制台.....");
    37. }
    38. };
    39. channel.basicConsume(queue1Name,true,consumer);
    40. //消费者关闭资源?不要!
    41. }
    42. }

    消费者2 

    1. public class Consumer_PubSub2 {
    2. public static void main(String[] args) throws IOException, TimeoutException {
    3. //1.创建连接工厂
    4. ConnectionFactory factory = new ConnectionFactory();
    5. //2. 设置参数
    6. factory.setHost("43.143.246.208");//ip 默认值 localhost
    7. factory.setPort(5672); //端口 默认值 5672
    8. factory.setVirtualHost("/itcast");//虚拟机 默认值/
    9. factory.setUsername("root");//用户名 默认 guest
    10. factory.setPassword("root");//密码 默认值 guest
    11. //3. 创建连接 Connection
    12. Connection connection = factory.newConnection();
    13. //4. 创建Channel
    14. Channel channel = connection.createChannel();
    15. String queue1Name = "test_fanout_queue1";
    16. String queue2Name = "test_fanout_queue2";
    17. /*
    18. basicConsume(String queue, boolean autoAck, Consumer callback)
    19. 参数:
    20. 1. queue:队列名称
    21. 2. autoAck:是否自动确认
    22. 3. callback:回调对象
    23. */
    24. // 接收消息
    25. Consumer consumer = new DefaultConsumer(channel){
    26. /*
    27. 回调方法,当收到消息后,会自动执行该方法
    28. 1. consumerTag:标识
    29. 2. envelope:获取一些信息,交换机,路由key...
    30. 3. properties:配置信息
    31. 4. body:数据
    32. */
    33. @Override
    34. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    35. System.out.println("body:"+new String(body));//字节数组转成字符串
    36. System.out.println("将日志信息保存到数据库.....");
    37. }
    38. };
    39. channel.basicConsume(queue2Name,true,consumer);
    40. //消费者关闭资源?不要!
    41. }
    42. }

    小结

    1. 交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到,可以进行不同的处理
    2. 发布订阅模式与工作队列模式的区别:
    • 工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机
    • 发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)
    • 发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑定到默认的交换机

    六、Routing 路由模式

     

    模式说明:

    • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)
    • 消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey
    • Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的Routingkey 与消息的 Routing key 完全一致,才会接收到消息
    图解:
    • P:生产者,向 Exchange 发送消息,发送消息时,会指定一个routing key
    • X:Exchange(交换机),接收生产者的消息,然后把消息递交给与 routing key 完全匹配的队列
    • C1:消费者,其所在队列指定了需要 routing key 为 error 的消息
    • C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息

    1、生产者 

    修改内容:exchangeName = "test_direct"
    BuiltinExchangeType.DIRECT
    String queue1Name = "test_direct_queue1";
    String queue2Name = "test_direct_queue2";
    //队列1绑定 error
    channel.queueBind(queue1Name,exchangeName,"error");
    //队列2绑定 info error warning
    channel.queueBind(queue2Name,exchangeName,"info");
    channel.queueBind(queue2Name,exchangeName,"error");
    channel.queueBind(queue2Name,exchangeName,"warning");
    //8. 发送消息
    channel.basicPublish(exchangeName,"info",null,body.getBytes());//2收到
    //channel.basicPublish(exchangeName,"error",null,body.getBytes());//1,2都收到
    //channel.basicPublish(exchangeName,"warning",null,body.getBytes());//2收到
    

    2、消费者1,2

    修改内容
    String queue1Name = "test_direct_queue1";
    String queue2Name = "test_direct_queue2";
    

     

     Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key 的队列。

    七、Topics 通配符模式 

    Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用 通配符
    •  Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
    • 通配符规则:# 匹配一个或多个词,* 匹配不多不少恰好1个词,例如:item.# 能够匹配 item.insert.abc 或者 item.insert 而 item.* 只能匹配 item.insert
    图解:
    • 红色 Queue:绑定的是 usa.# ,因此凡是以 usa. 开头的 routing key 都会被匹配到
    • 黄色 Queue:绑定的是 #.news ,因此凡是以 .news 结尾的 routing key 都会被匹配

    1、生产者

    修改内容:
    exchangeName = "test_topic" 
    BuiltinExchangeType.TOPIC 
    String queue1Name = "test_topic_queue1"; 
    String queue2Name = "test_topic_queue2";
    //队列1绑定 error
    channel.queueBind(queue1Name,exchangeName,"*.orange.*");
    //队列2绑定 info error warning
    channel.queueBind(queue2Name,exchangeName,"*.*.rabbite");
    channel.queueBind(queue2Name,exchangeName,"lazy.#");
    //8. 发送消息
    channel.basicPublish(exchangeName,"lazy.orange.ra",null,body.getBytes());//1,2都有
    //channel.basicPublish(exchangeName,"lazy.orange",null,body.getBytes());//1,2都没有
    

    2、消费者

     修改内容:

    String queue1Name = "test_topic_queue1";
    String queue2Name = "test_topic_queue2";

     

     

    小结

    Topic 主题模式可以实现 Pub/Sub 发布与订阅模式和 Routing 路由模式的功能,只是 Topic 在配置routing key 的时候可以使用通配符,显得更加灵活。

    八、工作模式总结  

    1. 简单模式 HelloWorld

    一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)。

    2. 工作队列模式 Work Queue

    一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)。

    3. 发布订阅模式 Publish/subscribe

    需要设置类型为 fanout 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列。

    4. 路由模式 Routing

    需要设置类型为 direct 的交换机交换机和队列进行绑定,并且指定 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。

    5. 通配符模式 Topic

    需要设置类型为 topic 的交换机,交换机和队列进行绑定,并且指定通配符方式的 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。
  • 相关阅读:
    【网站架构】服务器弹性伸缩不能全自动,实际如何追加服务器
    Spring Framework IoC依赖注入-按Bean类型注入
    Cholesterol-PEG-Maleimide,CLS-PEG-MAL,胆固醇-聚乙二醇-马来酰亚胺供应
    linux中git暂存,提交,上传到github
    高级性能测试系列《24. 通过jdbc执行sql脚本》
    基于springboot+vue的大学生智能消费记账系统
    Java开发者的Golang进修指南:从0->1带你实现协程池
    Flink Log4j 2.x使用Filter过滤日志类型
    如何在导入的数据库查找api接口
    ubuntu 系统升级问题
  • 原文地址:https://blog.csdn.net/m0_62639288/article/details/132834386