• 分布式消息队列:RabbitMQ(1)


    目录

    一:中间件

    二:分布式消息队列 

    2.1:是消息队列

    2.1.1:消息队列的优势

    2.1.1.1:异步处理化

    2.1.1.2:削峰填谷

    2.2:分布式消息队列

    2.2.1:分布式消息队列的优势

    2.2.1.1:数据的持久化

    2.2.1.2:可扩展性

    2.2.1.3:应用解耦

    2.2.1.4:发送订阅 

    2.2.2:分布式消息队列的应用场景 

    三:Rabbitmq

    3.1:基本概念

    3.2:快速入门 

    3.2.1:引入消息队列Java客户端

    3.2.2:单消费开发生产者和消费者

     3.2.3:多消费开发生产者和消费者

    3.3.3:交换机

    3.3.3.1:交换机的类别

    a):fanout


    一:中间件

    连接多个系统,帮助多个系统紧密协作的技术(组件)

    二:分布式消息队列 

    2.1:是消息队列

    概念:存储消息的队列

    关键词:存储,消息,队列

    存储:存储数据

    消息:某种数据结构,比如l字符串,对象,二进制数据,json等

    队列:先进先出的数据结构

    作用:在不同的系统下,应用之间实现消息的传输,不需要考虑传输应用的编程语言,系统和,框架等等,实现应用解耦的作用。

            eg:可以让Java开发的应用发消息,让php开发的应用收消息。

     针对生产者来说:不需要关心消费者什么时候接受消息,什么时候消费,我只需要把我的工作完成就好了。生产者和消费者之间实现了解耦。

    针对上图,同样我们会发现,当小李要别的书籍的时候,小王也可以将别的书籍放到消息队列中。生产者和消费者从某一种程度上实现了解耦合。

    2.1.1:消息队列的优势

    2.1.1.1:异步处理

    生产者发送消息之后,可以继续去忙别的,消费者什么时候消费都可以,不产生阻塞。

    2.1.1.2:削峰填谷

    先把用户的请求放到消息队列种,消费者(实际执行操作的应用)可以按照自己的需求,慢慢去取。

    举个栗子:

    原本:

    12点时来了10万个请求,原本情况下,10万个请求都在系统内部立刻处理,很快系统压力过大宕机。

    现在:

    把10万个请求放到消息队列中,处理系统以自己的恒定速率(比如每秒1个)慢慢执行,稳定处理。

    2.2:分布式消息队列

    2.2.1:分布式消息队列的优势

    分布式消息队列继承于消息队列的优势,并进行了一部分的拓展。

    2.2.1.1:数据的持久化

    把消息集中存储在硬盘当中,服务器重启就不会丢失。

    2.2.1.2:可扩展性

    可以根据需求,随时增加(或减少)节点,继续保持稳定的服务。

    2.2.1.3:应用解耦

    可以连接不同语言(Java,PHP),框架开发的系统,让这些系统读取数据。

    示例:

    以前的项目:

    加了分布式消息队列之后的项目: 

    1:一个系统挂了,不影响另一个系统。

    2:系统挂了之后并恢复,仍然可以从消息队列中取消息

    3:只要发送消息到队列,就可以立即进行返回,不用同步调用所有系统,性能更高

    2.2.1.4:发送订阅 

    假设情景:当QQ进行了一部分改革之后,其他使用QQ的APP也应该处理
    这部分改革。
    QQ做了一个情景,要让其他系统知道,比如公告消息。如果QQ一次性给这些应用发消息,所引出的问题如下:
    1.每次发通知都要调用很多系统,很麻烦,很可能失败
    2.不知道哪个系统需要这些QQ的改革。
    解决方案:大的核心系统始终往消息队列发消息,其他的系统都去订阅这个消息队列的消息,用的时候进行取就OK。

    2.2.2:分布式消息队列的应用场景 

    1:耗时场景。

    2:高并发场景。

    3:分布式系统的协作。(跨团队,跨业务合作,应用解耦)

    4:强稳定的场景(金融业务,持久化,可靠性,削锋填谷)  

    三:Rabbitmq

    特点:生态好,易学习,易于理解,时效性强,支持不同语言的客户端,扩展性,可用性都很不错。

    3.1:基本概念

    AMPQ协议:Rabbitmq是遵循AMPQ协议的一种消息中间件。

    生产者:发消息到交换机

    消费者:收消息的,从某个队列中取消息

    交换机(exchange):负责把消息转发到对应的队列

    队列(Queue):存储消息的

    路由(Rountes):转发,怎么把一个消息从一个地方转发到另一个地方(比如生产者转发到某个队列)

    Rabbitmq:端口占用   5672:程序连接的端口 15672:管理界面端口

    Rabbitmq的安装:https://blog.csdn.net/qq_25919879/article/details/113055350

    管理器页面打不开:http://t.csdnimg.cn/6FqZl

    3.2:快速入门 

    3.2.1:引入消息队列Java客户端

    1. <dependency>
    2. <groupId>com.rabbitmqgroupId>
    3. <artifactId>amqp-clientartifactId>
    4. <version>5.17.0version>
    5. dependency>

    3.2.2:单消费开发生产者和消费者

    生产者端代码:

    1. public class SingeProducer {
    2. private final static String QUEUE_NAME = "hello";
    3. public static void main(String[] argv) throws Exception {
    4. //创建连接工厂
    5. ConnectionFactory factory = new ConnectionFactory();
    6. factory.setHost("localhost");
    7. try (Connection connection = factory.newConnection();
    8. //频道相当于客户端(jdbcClient,redisClient),提供了和消队列server建立通信,程序通过channel进行发送消息
    9. Channel channel = connection.createChannel()) {
    10. //创建消息队列,第二个参数(durable):是否开启持久化,第三个参数exclusiove:是否允许当前这个创建消息队列的
    11. //连接操作消息队列 第四个参数:没有人使用队列,是否需要删除
    12. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    13. //发送消息
    14. String message = "Hello World!";
    15. channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
    16. System.out.println(" [x] Sent '" + message + "'");
    17. }
    18. }
    19. }

    消费者代码:

    1. public class SingeConsumer {
    2. private final static String QUEUE_NAME = "hello";
    3. public static void main(String[] argv) throws Exception {
    4. //创建连接工厂
    5. ConnectionFactory factory = new ConnectionFactory();
    6. factory.setHost("localhost");
    7. //创建频道,提供通信
    8. Connection connection = factory.newConnection();
    9. Channel channel = connection.createChannel();
    10. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    11. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    12. //如何处理消息
    13. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    14. String message = new String(delivery.getBody(), "UTF-8");
    15. System.out.println(" [x] Received '" + message + "'");
    16. };
    17. channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    18. }
    19. }

     3.2.3:多消费开发生产者和消费者

    场景:一个生产者给队列里面发了一条消息,多个消费者进行消费。适用于多个机器同时去接收并处理任务(每个机器处理任务有限)

    队列持久化:

    durable:

    参数设置为true,服务器队列不丢失

     channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    

     消息持久化:

     指定MessageProperties.PERSISTENY_TEXT_PLAIN参数

    1. channel.basicPublish("", TASK_QUEUE_NAME,
    2. MessageProperties.PERSISTENT_TEXT_PLAIN,
    3. message.getBytes("UTF-8"));

    生产者端代码:

    1. public class MultiProducer {
    2. private static final String TASK_QUEUE_NAME = "multi_queue";
    3. public static void main(String[] argv) throws Exception {
    4. ConnectionFactory factory = new ConnectionFactory();
    5. factory.setHost("localhost");
    6. try (Connection connection = factory.newConnection();
    7. Channel channel = connection.createChannel()) {
    8. channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    9. Scanner scanner=new Scanner(System.in);
    10. while(scanner.hasNext()){
    11. String message = scanner.nextLine();
    12. channel.basicPublish("", TASK_QUEUE_NAME,
    13. MessageProperties.PERSISTENT_TEXT_PLAIN,
    14. message.getBytes("UTF-8"));
    15. System.out.println(" [x] Sent '" + message + "'");
    16. }
    17. }
    18. }
    19. }

    消费者代码: 

    在消费者代码中,如何测验一个消费者只能取一个任务,我们利用for循环来进行解决。

    指定确认某条消息:

    第一个参数:获取消息的信息

    第二个参数:如果是true,把所有的历史消息全都确认了。如果为false,取出当前的消息。

    1. //第二个参数:是否一次性取所有的消息。如果为true,则要取所有的挤压在消息队列中的消息
    2. //如果为false,则为一次性取一个消息
    3. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

    指定拒绝某条消息

    第一个参数:获取消息的信息

    第二个参数:如果是true,则代表是否要拒绝所有的历史消息。

    第三个参数:如果是false, 则代表失败的任务是否要重新入队。

      channel.basicNack(delivery.getEnvelope().getDeliveryTag(),false,false);
    1. public class MultiConsumer {
    2. private static final String TASK_QUEUE_NAME = "multi_queue";
    3. public static void main(String[] argv) throws Exception {
    4. //建立连接
    5. ConnectionFactory factory = new ConnectionFactory();
    6. factory.setHost("localhost");
    7. final Connection connection = factory.newConnection();
    8. for (int i = 0; i <= 2; i++) {
    9. final Channel channel = connection.createChannel();
    10. int finalI=i;
    11. //声明队列
    12. channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    13. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    14. //控制单个消费者的任务积压数:每个消费者最多处理一个任务,每个消费者智能处理一个任务
    15. channel.basicQos(1);
    16. //处理从队列中取的的消息
    17. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    18. String message = new String(delivery.getBody(), "UTF-8");
    19. try {
    20. //处理工作
    21. System.out.println(" [x] Received '" +"编号:"+finalI+ message + "'");
    22. //停20秒模拟一个机器处理工作能力有限
    23. Thread.sleep(20000);
    24. //第二个参数:是否一次性取所有的消息。如果为true,则要取所有的挤压在消息队列中的消息
    25. //如果为false,则为一次性取一个消息
    26. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    27. } catch (InterruptedException e) {
    28. throw new RuntimeException(e);
    29. } finally {
    30. System.out.println(" [x] Done");
    31. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    32. }
    33. };
    34. //开启消费监听
    35. channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
    36. }
    37. }
    38. }

    3.3.3:交换机

    一个生产者给多个队列发消息,一个生产者对多个队列。交换机:转发功能,怎么把消息转发到不同的队列上。

    3.3.3.1:交换机的类别
    a):fanout

    场景:很适用于发布订阅的场景。

    特点:消息会被转发到所有绑定到交换机的队列。

    生产者代码:当生产者发送消息后,由交换机放到消息队列中,消费者从消息队列中取。

    1. public class FonoutProducer {
    2. private static final String EXCHANGE_NAME = "1";
    3. public static void main(String[] argv) throws Exception {
    4. ConnectionFactory factory = new ConnectionFactory();
    5. factory.setHost("localhost");
    6. try (Connection connection = factory.newConnection();
    7. Channel channel = connection.createChannel()) {
    8. //创建交换机
    9. channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    10. Scanner scanner=new Scanner(System.in);
    11. while(scanner.hasNext()){
    12. String message = scanner.nextLine();
    13. channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
    14. System.out.println(" [x] Sent '" + message + "'");
    15. }
    16. }
    17. }
    18. }

    消费者代码:

    1. public class FonoutConsumer {
    2. private static final String EXCHANGE_NAME = "1";
    3. public static void main(String[] argv) throws Exception {
    4. ConnectionFactory factory = new ConnectionFactory();
    5. factory.setHost("localhost");
    6. Connection connection = factory.newConnection();
    7. Channel channel = connection.createChannel();
    8. Channel channel2= connection.createChannel();
    9. //声明交换机
    10. //创建队列,随机分配一个队列名称
    11. channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    12. String queueName="xiaowang";
    13. channel.queueDeclare(queueName,true,false,false,null);
    14. channel.queueBind(queueName, EXCHANGE_NAME, "");
    15. channel2.exchangeDeclare(EXCHANGE_NAME, "fanout");
    16. String queueName2="xiaoli";
    17. channel2.queueDeclare(queueName2,true,false,false,null);
    18. channel2.queueBind(queueName2,EXCHANGE_NAME,"");
    19. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    20. DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
    21. String message = new String(delivery.getBody(), "UTF-8");
    22. System.out.println(" [小王] Received '" + message + "'");
    23. };
    24. DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
    25. String message = new String(delivery.getBody(), "UTF-8");
    26. System.out.println(" [小李] Received '" + message + "'");
    27. };
    28. channel.basicConsume(queueName, true, deliverCallback1, consumerTag -> { });
    29. channel.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { });
    30. }
    31. }

    运行结果:

  • 相关阅读:
    【MySQL】SQL语句
    蓝桥杯每日一题2023.11.16
    奇迹mu 架设过程中可能会出现的问题及解决办法
    期权翻倍行情一个月会出现几次?
    代码随想录Day16 LeetCode T654 最大二叉树 T617 合并二叉树 T700 二叉搜索树中的搜索
    分类算法——ROC曲线与AUC指标(九)
    算法刷题 week4
    基于Java swing+mysql+eclipse的【水电费管理系统】
    《省级国土空间规划编制技术规程》国家标准(GB/T 43214-2023)原文下载
    LeetCode --- 1266. Minimum Time Visiting All Points 解题报告
  • 原文地址:https://blog.csdn.net/weixin_65492194/article/details/134063774