这是最简单的一个模式了,一般在实际的生产环境中,大家应该都不会使用一个消费者。只做入门的介绍。
一个生产者,一个默认的交换机【图中没体现】,一个队列,一个消费者。
生产者产生消费发送至交换机,交换机路由至队列,队列再投递给消费者进行消费。

在进行代码开发前,我们先用模拟器进行模拟看下:
消息是直接发到了默认的Exchange,【Exchange不存储消息,只路由】,路由到对应的MQ中,接着进行了消费。

我们还采用小明洗澡的方式,进行模拟。水龙头模拟生产者,热水器模拟mq,小明洗澡模拟消费者。
首次执行代码的时候,我们会发现缺失对应的Queue,可以采用两种方式进行创建。
1.去mq的控制台,手动创建queue
2.执行queue创建的代码,这里我们放到了消费者-采用此方式可以先执行消费者
-
- /**
- * @author rabbit
- * @version 1.0.0
- * @Description MQ 简单模式:一个生产者,一个默认的交换机,一个队列,一个消费者
- * 1.创建生产者,创建一个channel,发送消息到默认的exchange
- * 2.打开控制台观察
- * 3.打开WireShark观察
- * 4.启动消费者
- * 5.打开控制台观察
- * 6.打开WireShark观察
- * @createTime 2022/07/27 19:34:00
- */
- public class WaterProducer {
- public static final String QUEUE_NAME = "SolarWaterHeater";
-
- //生产者
- public static void main(String[] args) throws Exception {
- //1、获取connection
- Connection connection = RabbitCommonConfig.getConnection();
- //2、创建channel
- Channel channel = connection.createChannel();
-
- for (int i = 1; i <= 1; i++) {
- sendMsg(channel, i);
- Thread.sleep(1000);
- }
- //4、关闭管道和连接
- channel.close();
- connection.close();
- }
-
- private static void sendMsg(Channel channel, int k) throws IOException {
- //3、发送消息到exchange
- String msg = k + "升";
- /**
- * 参数1:指定exchange,使用“”。默认的exchange
- * 参数2:指定路由的规则,使用具体的队列名称。exchange为""时,消息直接发送到队列中
- * 参数3:指定传递的消息携带的properties
- * 参数4:指定传递的消息,byte[]类型
- */
- channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
- System.out.println("水龙头放水成功!" + k + "升");
- }
-
- }
-
- /**
- * @author rabbit
- * @version 1.0.0
- * @Description MQ 简单模式:一个生产者,一个默认的交换机,一个队列,一个消费者
- * 创建一个消费者,创建一个channel,创建一个队列
- * @createTime 2022/07/27 19:36:00
- */
- public class XMShowerConsumer {
-
- public static final String QUEUE_NAME = "SolarWaterHeater";
-
- //消费者
- public static void main(String[] args) throws Exception {
- //1、获取连对象、
- Connection connection = RabbitCommonConfig.getConnection();
- //2、创建channel
- Channel channel = connection.createChannel();
- //3、创建队列
- /**
- * 参数1:queue 指定队列名称
- * 参数2:durable 是否开启持久化(true)
- * 参数3:exclusive 是否排外(当前队列只能被一个消费者消费)
- * 参数4:autoDelete 如果这个队列没有其他消费者在消费,队列自动删除
- * 参数5:arguments 指定队列携带的信息
- */
- channel.queueDeclare(QUEUE_NAME, true, false, false, null);
-
- //4.开启监听Queue
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("小明洗澡用水: " + new String(body, "UTF-8"));
- }
- };
- /**
- * 参数1:queue 指定消费哪个队列
- * 参数1:deliverCallback 指定是否ACK(true:收到消息会立即告诉RabbiMQ,false:手动告诉)
- * 参数1:cancelCallback 指定消费回调
- */
- channel.basicConsume(QUEUE_NAME, true, consumer);
- System.out.println("小明开始洗澡......");
-
- //5、键盘录入,让程序不结束!
- System.in.read();
-
- //6、释放资源
- channel.close();
- connection.close();
- }
-
- }
生产者:
水龙头放水成功!1升
mq控制台:

消费者:
- 小明开始洗澡......
- 小明洗澡用水: 1升
我们抓取所有关于5672端口的包,5672即mq的服务端端口,56639为本机端口。
1、首先看建立连接的部分。
前三条TCP连接有没有很熟悉,56639希望与5672建立连接,三次握手的形式建立好连接,为后续的AMQP协议的传输做好准备
2. 发送数据包部分
第四条为56639 发送给5672 一个数据,协议头 是 0-9-1 这就是我们的AMQP的版本。要准备AMQP交互了

3. AMQP协议交互
我们将TCP的包过滤掉,只看AMQP的,这里就完整呈现了,mq消息发送的整个过程。
Connection 创建、打开通道Tunne、打开虚机、打开信道Channel、发送消息、信道关闭、连接关闭。一目了然。

TCP连接不再赘述,看下消费的AMQP协议传输
基本与生产者一致,区别在于,我们在消费者中有创建Queue的操作,数据包体现在-Queue.Declare.
另外 消费者是需要消费的 Basic.Consume 即把消费者注册到了队列上。
Basic.Consume-OK 将消息推送过来,进行了消费。
我们看到Basic.Consume-OK 消息内容中有 x 即 exchange 。默认为空。
还有rk 即 routing key 路由键- 默认的为队列的名称
