• RabbitMQ初步到精通-第四章-RabbitMQ工作模式-SIMPLE


    RabbitMQ工作模式-SIMPLE模式

    1.模式介绍

    这是最简单的一个模式了,一般在实际的生产环境中,大家应该都不会使用一个消费者。只做入门的介绍。

    一个生产者,一个默认的交换机【图中没体现】,一个队列,一个消费者。

    生产者产生消费发送至交换机,交换机路由至队列,队列再投递给消费者进行消费。

     在进行代码开发前,我们先用模拟器进行模拟看下:

    消息是直接发到了默认的Exchange,【Exchange不存储消息,只路由】,路由到对应的MQ中,接着进行了消费。

    2.模式相关代码

    我们还采用小明洗澡的方式,进行模拟。水龙头模拟生产者,热水器模拟mq,小明洗澡模拟消费者。

    2.1 生产者

    首次执行代码的时候,我们会发现缺失对应的Queue,可以采用两种方式进行创建。

    1.去mq的控制台,手动创建queue

    2.执行queue创建的代码,这里我们放到了消费者-采用此方式可以先执行消费者

    1. /**
    2. * @author rabbit
    3. * @version 1.0.0
    4. * @Description MQ 简单模式:一个生产者,一个默认的交换机,一个队列,一个消费者
    5. * 1.创建生产者,创建一个channel,发送消息到默认的exchange
    6. * 2.打开控制台观察
    7. * 3.打开WireShark观察
    8. * 4.启动消费者
    9. * 5.打开控制台观察
    10. * 6.打开WireShark观察
    11. * @createTime 2022/07/27 19:34:00
    12. */
    13. public class WaterProducer {
    14. public static final String QUEUE_NAME = "SolarWaterHeater";
    15. //生产者
    16. public static void main(String[] args) throws Exception {
    17. //1、获取connection
    18. Connection connection = RabbitCommonConfig.getConnection();
    19. //2、创建channel
    20. Channel channel = connection.createChannel();
    21. for (int i = 1; i <= 1; i++) {
    22. sendMsg(channel, i);
    23. Thread.sleep(1000);
    24. }
    25. //4、关闭管道和连接
    26. channel.close();
    27. connection.close();
    28. }
    29. private static void sendMsg(Channel channel, int k) throws IOException {
    30. //3、发送消息到exchange
    31. String msg = k + "升";
    32. /**
    33. * 参数1:指定exchange,使用“”。默认的exchange
    34. * 参数2:指定路由的规则,使用具体的队列名称。exchange为""时,消息直接发送到队列中
    35. * 参数3:指定传递的消息携带的properties
    36. * 参数4:指定传递的消息,byte[]类型
    37. */
    38. channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
    39. System.out.println("水龙头放水成功!" + k + "升");
    40. }
    41. }

    2.2 消费者

    1. /**
    2. * @author rabbit
    3. * @version 1.0.0
    4. * @Description MQ 简单模式:一个生产者,一个默认的交换机,一个队列,一个消费者
    5. * 创建一个消费者,创建一个channel,创建一个队列
    6. * @createTime 2022/07/27 19:36:00
    7. */
    8. public class XMShowerConsumer {
    9. public static final String QUEUE_NAME = "SolarWaterHeater";
    10. //消费者
    11. public static void main(String[] args) throws Exception {
    12. //1、获取连对象、
    13. Connection connection = RabbitCommonConfig.getConnection();
    14. //2、创建channel
    15. Channel channel = connection.createChannel();
    16. //3、创建队列
    17. /**
    18. * 参数1:queue 指定队列名称
    19. * 参数2:durable 是否开启持久化(true)
    20. * 参数3:exclusive 是否排外(当前队列只能被一个消费者消费)
    21. * 参数4:autoDelete 如果这个队列没有其他消费者在消费,队列自动删除
    22. * 参数5:arguments 指定队列携带的信息
    23. */
    24. channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    25. //4.开启监听Queue
    26. DefaultConsumer consumer = new DefaultConsumer(channel) {
    27. @Override
    28. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    29. try {
    30. Thread.sleep(1000);
    31. } catch (InterruptedException e) {
    32. e.printStackTrace();
    33. }
    34. System.out.println("小明洗澡用水: " + new String(body, "UTF-8"));
    35. }
    36. };
    37. /**
    38. * 参数1:queue 指定消费哪个队列
    39. * 参数1:deliverCallback 指定是否ACK(true:收到消息会立即告诉RabbiMQ,false:手动告诉)
    40. * 参数1:cancelCallback 指定消费回调
    41. */
    42. channel.basicConsume(QUEUE_NAME, true, consumer);
    43. System.out.println("小明开始洗澡......");
    44. //5、键盘录入,让程序不结束!
    45. System.in.read();
    46. //6、释放资源
    47. channel.close();
    48. connection.close();
    49. }
    50. }

    2.3 执行结果

    生产者:

    水龙头放水成功!1

    mq控制台:

     消费者:

    1. 小明开始洗澡......
    2. 小明洗澡用水: 1

    3.抓包分析

    3.1 生产者:

    我们抓取所有关于5672端口的包,5672即mq的服务端端口,56639为本机端口。

    1、首先看建立连接的部分。

    前三条TCP连接有没有很熟悉,56639希望与5672建立连接,三次握手的形式建立好连接,为后续的AMQP协议的传输做好准备

    2. 发送数据包部分

    第四条为56639 发送给5672 一个数据,协议头 是 0-9-1 这就是我们的AMQP的版本。要准备AMQP交互了

     3. AMQP协议交互

    我们将TCP的包过滤掉,只看AMQP的,这里就完整呈现了,mq消息发送的整个过程。

    Connection 创建、打开通道Tunne、打开虚机、打开信道Channel、发送消息、信道关闭、连接关闭。一目了然。

    3.2 消费者

    TCP连接不再赘述,看下消费的AMQP协议传输

    基本与生产者一致,区别在于,我们在消费者中有创建Queue的操作,数据包体现在-Queue.Declare.

    另外 消费者是需要消费的 Basic.Consume 即把消费者注册到了队列上。

    Basic.Consume-OK 将消息推送过来,进行了消费。

    我们看到Basic.Consume-OK 消息内容中有 x 即 exchange 。默认为空。

    还有rk 即 routing key 路由键- 默认的为队列的名称

  • 相关阅读:
    c#中使用Task.WhenAll
    stm32——hal库学习笔记(外部中断)
    【开发工具的那些故事】Git跨代码仓库合并代码
    学习笔记-FRIDA脚本系列(四)
    Acwing 829. 模拟队列
    如何学习java
    WTM 增加IOT 大屏展示界面页面
    一文深入springboot,springboot的实战实践文档(PDF)
    百日刷题计划 ———— DAY2
    东芝发布智能栅极驱动光电耦合器,简化功率器件外围电路的设计
  • 原文地址:https://blog.csdn.net/blucastle/article/details/127923690