目录
连接多个系统,帮助多个系统紧密协作的技术(组件)

概念:存储消息的队列
关键词:存储,消息,队列
存储:存储数据
消息:某种数据结构,比如l字符串,对象,二进制数据,json等
队列:先进先出的数据结构
作用:在不同的系统下,应用之间实现消息的传输,不需要考虑传输应用的编程语言,系统和,框架等等,实现应用解耦的作用。
eg:可以让Java开发的应用发消息,让php开发的应用收消息。
针对生产者来说:不需要关心消费者什么时候接受消息,什么时候消费,我只需要把我的工作完成就好了。生产者和消费者之间实现了解耦。

针对上图,同样我们会发现,当小李要别的书籍的时候,小王也可以将别的书籍放到消息队列中。生产者和消费者从某一种程度上实现了解耦合。
生产者发送消息之后,可以继续去忙别的,消费者什么时候消费都可以,不产生阻塞。
先把用户的请求放到消息队列种,消费者(实际执行操作的应用)可以按照自己的需求,慢慢去取。
举个栗子:
原本:
12点时来了10万个请求,原本情况下,10万个请求都在系统内部立刻处理,很快系统压力过大宕机。
现在:
把10万个请求放到消息队列中,处理系统以自己的恒定速率(比如每秒1个)慢慢执行,稳定处理。
分布式消息队列继承于消息队列的优势,并进行了一部分的拓展。
把消息集中存储在硬盘当中,服务器重启就不会丢失。
可以根据需求,随时增加(或减少)节点,继续保持稳定的服务。
可以连接不同语言(Java,PHP),框架开发的系统,让这些系统读取数据。
示例:
以前的项目:

加了分布式消息队列之后的项目:
1:一个系统挂了,不影响另一个系统。
2:系统挂了之后并恢复,仍然可以从消息队列中取消息
3:只要发送消息到队列,就可以立即进行返回,不用同步调用所有系统,性能更高

假设情景:当QQ进行了一部分改革之后,其他使用QQ的APP也应该处理
这部分改革。
QQ做了一个情景,要让其他系统知道,比如公告消息。如果QQ一次性给这些应用发消息,所引出的问题如下:
1.每次发通知都要调用很多系统,很麻烦,很可能失败
2.不知道哪个系统需要这些QQ的改革。
解决方案:大的核心系统始终往消息队列发消息,其他的系统都去订阅这个消息队列的消息,用的时候进行取就OK。

1:耗时场景。
2:高并发场景。
3:分布式系统的协作。(跨团队,跨业务合作,应用解耦)
4:强稳定的场景(金融业务,持久化,可靠性,削锋填谷)
特点:生态好,易学习,易于理解,时效性强,支持不同语言的客户端,扩展性,可用性都很不错。
AMPQ协议:Rabbitmq是遵循AMPQ协议的一种消息中间件。
生产者:发消息到交换机
消费者:收消息的,从某个队列中取消息
交换机(exchange):负责把消息转发到对应的队列
队列(Queue):存储消息的
路由(Rountes):转发,怎么把一个消息从一个地方转发到另一个地方(比如生产者转发到某个队列)
Rabbitmq:端口占用 5672:程序连接的端口 15672:管理界面端口

Rabbitmq的安装:https://blog.csdn.net/qq_25919879/article/details/113055350
管理器页面打不开:http://t.csdnimg.cn/6FqZl

- <dependency>
- <groupId>com.rabbitmqgroupId>
- <artifactId>amqp-clientartifactId>
- <version>5.17.0version>
- dependency>
生产者端代码:
- public class SingeProducer {
-
- private final static String QUEUE_NAME = "hello";
-
- public static void main(String[] argv) throws Exception {
- //创建连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- try (Connection connection = factory.newConnection();
- //频道相当于客户端(jdbcClient,redisClient),提供了和消队列server建立通信,程序通过channel进行发送消息
- Channel channel = connection.createChannel()) {
- //创建消息队列,第二个参数(durable):是否开启持久化,第三个参数exclusiove:是否允许当前这个创建消息队列的
- //连接操作消息队列 第四个参数:没有人使用队列,是否需要删除
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- //发送消息
- String message = "Hello World!";
- channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
- System.out.println(" [x] Sent '" + message + "'");
- }
- }
- }
消费者代码:
- public class SingeConsumer {
-
- private final static String QUEUE_NAME = "hello";
-
- public static void main(String[] argv) throws Exception {
- //创建连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- //创建频道,提供通信
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
-
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
- //如何处理消息
- DeliverCallback deliverCallback = (consumerTag, delivery) -> {
- String message = new String(delivery.getBody(), "UTF-8");
- System.out.println(" [x] Received '" + message + "'");
- };
- channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
- }
- }
场景:一个生产者给队列里面发了一条消息,多个消费者进行消费。适用于多个机器同时去接收并处理任务(每个机器处理任务有限)

队列持久化:
durable:
参数设置为true,服务器队列不丢失
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
消息持久化:
指定MessageProperties.PERSISTENY_TEXT_PLAIN参数
- channel.basicPublish("", TASK_QUEUE_NAME,
- MessageProperties.PERSISTENT_TEXT_PLAIN,
- message.getBytes("UTF-8"));
生产者端代码:
- public class MultiProducer {
- private static final String TASK_QUEUE_NAME = "multi_queue";
-
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- try (Connection connection = factory.newConnection();
- Channel channel = connection.createChannel()) {
- channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
- Scanner scanner=new Scanner(System.in);
- while(scanner.hasNext()){
- String message = scanner.nextLine();
- channel.basicPublish("", TASK_QUEUE_NAME,
- MessageProperties.PERSISTENT_TEXT_PLAIN,
- message.getBytes("UTF-8"));
- System.out.println(" [x] Sent '" + message + "'");
- }
-
- }
- }
- }

![]()
消费者代码:
在消费者代码中,如何测验一个消费者只能取一个任务,我们利用for循环来进行解决。
指定确认某条消息:
第一个参数:获取消息的信息
第二个参数:如果是true,把所有的历史消息全都确认了。如果为false,取出当前的消息。
- //第二个参数:是否一次性取所有的消息。如果为true,则要取所有的挤压在消息队列中的消息
- //如果为false,则为一次性取一个消息
- channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
指定拒绝某条消息
第一个参数:获取消息的信息
第二个参数:如果是true,则代表是否要拒绝所有的历史消息。
第三个参数:如果是false, 则代表失败的任务是否要重新入队。
channel.basicNack(delivery.getEnvelope().getDeliveryTag(),false,false);
- public class MultiConsumer {
- private static final String TASK_QUEUE_NAME = "multi_queue";
-
- public static void main(String[] argv) throws Exception {
- //建立连接
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- final Connection connection = factory.newConnection();
- for (int i = 0; i <= 2; i++) {
- final Channel channel = connection.createChannel();
- int finalI=i;
- //声明队列
- channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
- //控制单个消费者的任务积压数:每个消费者最多处理一个任务,每个消费者智能处理一个任务
- channel.basicQos(1);
- //处理从队列中取的的消息
- DeliverCallback deliverCallback = (consumerTag, delivery) -> {
- String message = new String(delivery.getBody(), "UTF-8");
- try {
- //处理工作
- System.out.println(" [x] Received '" +"编号:"+finalI+ message + "'");
-
- //停20秒模拟一个机器处理工作能力有限
- Thread.sleep(20000);
- //第二个参数:是否一次性取所有的消息。如果为true,则要取所有的挤压在消息队列中的消息
- //如果为false,则为一次性取一个消息
- channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
-
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- } finally {
- System.out.println(" [x] Done");
- channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
- }
- };
- //开启消费监听
- channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
-
- }
-
- }
-
- }
一个生产者给多个队列发消息,一个生产者对多个队列。交换机:转发功能,怎么把消息转发到不同的队列上。

场景:很适用于发布订阅的场景。
特点:消息会被转发到所有绑定到交换机的队列。
生产者代码:当生产者发送消息后,由交换机放到消息队列中,消费者从消息队列中取。
- public class FonoutProducer {
-
- private static final String EXCHANGE_NAME = "1";
-
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- try (Connection connection = factory.newConnection();
- Channel channel = connection.createChannel()) {
- //创建交换机
- channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
- Scanner scanner=new Scanner(System.in);
- while(scanner.hasNext()){
- String message = scanner.nextLine();
- channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
- System.out.println(" [x] Sent '" + message + "'");
- }
-
- }
- }
-
- }
消费者代码:
- public class FonoutConsumer {
-
-
- private static final String EXCHANGE_NAME = "1";
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- Channel channel2= connection.createChannel();
- //声明交换机
- //创建队列,随机分配一个队列名称
- channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
- String queueName="xiaowang";
- channel.queueDeclare(queueName,true,false,false,null);
- channel.queueBind(queueName, EXCHANGE_NAME, "");
-
- channel2.exchangeDeclare(EXCHANGE_NAME, "fanout");
- String queueName2="xiaoli";
- channel2.queueDeclare(queueName2,true,false,false,null);
- channel2.queueBind(queueName2,EXCHANGE_NAME,"");
-
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
-
- DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
- String message = new String(delivery.getBody(), "UTF-8");
- System.out.println(" [小王] Received '" + message + "'");
- };
- DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
- String message = new String(delivery.getBody(), "UTF-8");
- System.out.println(" [小李] Received '" + message + "'");
- };
- channel.basicConsume(queueName, true, deliverCallback1, consumerTag -> { });
- channel.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { });
- }
- }
-
运行结果:

