• 3. Exchange 交换机的使用


    二八佳人体似酥,腰间仗剑斩愚夫。虽然不见人头落,暗里教君骨髓枯。

    在上一节中,我们创建了一个工作队列。我们假设的是工作队列背后,每个任务都恰好交付给一个消
    费者(工作进程)。在这一部分中,我们将做一些完全不同的事情-我们将消息传达给多个消费者。这种模式 称为 ”发布/订阅”

    为了说明这种模式,我们将构建一个简单的日志系统。它将由两个程序组成:第一个程序将发出日志消 息,第二个程序是消费者。其中我们会启动三个消费者,其中一个消费者接收到消息后把日志存储在磁盘,另外一个消费者接收到消息后把消息打印在屏幕上,第三个假装存储在数据库中,事实上第一个程序发出的日志消息将广播给所有消费 者者

    Exchange

    概念

    RabbitMQ消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产
    者甚至都不知道这些消息传递传递到了哪些队列中。

    相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来
    自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消 息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。

    image.png

    交换机类型

    总共有以下类型:

    直接(direct), 主题(topic) ,标题(headers) , 扇出(fanout)

    无名 exchange

    在本教程的前面部分我们对 exchange 一无所知,但仍然能够将消息发送到队列。之前能实现的
    原因是因为我们使用的是默认交换,我们通过空字符串(“”)进行标识。

                channel.basicPublish("", "YJLA", MessageProperties.PERSISTENT_TEXT_PLAIN, "你好啊,岳泽霖".getBytes(StandardCharsets.UTF_8));
    
    • 1

    第一个参数是交换机的名称。空字符串表示默认或无名称交换机:消息能路由发送到队列中其实 是由 routingKey(bindingkey)绑定 key 指定的,如果它存在的话

    临时队列

    之前的章节我们使用的是具有特定名称的队列(还记得 hello 和 ack_queue 吗?)。
    队列的名称我对们来说至关重要-我们需要指定我们的消费者去消费哪个队列的消息。

    每当我们连接到 Rabbit 时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦我们断开了消费者的连 接,队列将被自动删除。

    创建临时队列的方式如下:

    String queueName = channel.queueDeclare().getQueue();
    
    • 1

    image.png

    绑定(bindings)

    什么是 bingding 呢,binding 其实是 exchange 和 queue 之间的桥梁,
    它告诉我们 exchange 和那个队列进行了绑定关系。比如说下面这张图告诉我们的就是 X 与 Q1和 Q2 进行了绑定

    image.png

    Fanout 扇出

    Fanout 这种类型非常简单。正如从名称中猜到的那样,它是将接收到的所有消息广播到它知道的
    所有队列中。系统中默认有些 exchange 类型

    image.png

    image.png

    生产者

    routkingKey 没有值

    public class FanoutMessageProducer {
    
        private static String EXCHANGE_NAME = "fanout_logs";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionFactoryUtil.createConnection();
            Channel channel = connection.createChannel();
            // 创建交换机, 交换机 名称是  logs
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT,
                    true, false, null);
    
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                // routingKey 为 空
                channel.basicPublish(EXCHANGE_NAME, "",
                        null, new String("发布广播消息,内容为:" + scanner.next()).getBytes("UTF-8"));
            }
            channel.close();
            connection.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    控制台消费者1

    public class FanoutMessageConsumer1 {
        private static String EXCHANGE_NAME = "fanout_logs";
        private static String QUEUE_NAME = "debug_console";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionFactoryUtil.createConnection();
            final Channel channel = connection.createChannel();
            //1. 创建交换器
            channel.exchangeDeclare(EXCHANGE_NAME,
                    BuiltinExchangeType.FANOUT, true, false, null);
            //2.  创建队列
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            //7. 绑定队列与 交换机。  其中 routingKey 为 ""
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
    
            // 打印到控制台
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println(QUEUE_NAME + ">>> 获取到消息 :" + new String(message.getBody()));
            };
    
            CancelCallback cancelCallback = (consumerTag) -> {
                System.out.println(QUEUE_NAME + ">>>>> 中断了消息接收 " + consumerTag);
            };
            channel.basicConsume(
                    QUEUE_NAME, true, deliverCallback, cancelCallback);
    
            //输入流等待
            System.in.read();
            //关闭
            channel.close();
            connection.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    文件消费者2

    public class FanoutMessageConsumer2 {
        private static String EXCHANGE_NAME = "fanout_logs";
        private static String QUEUE_NAME = "debug_file";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionFactoryUtil.createConnection();
            final Channel channel = connection.createChannel();
            //1. 创建交换器
            channel.exchangeDeclare(EXCHANGE_NAME,
                    BuiltinExchangeType.FANOUT, true, false, null);
            //2.  创建队列
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            //7. 绑定队列与 交换机。  其中 routingKey 为 ""
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
    
            // 打印到控制台
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                String textMessage = new String(message.getBody());
                // 将消息保存到 文件里面,追加形式。
                FileUtil.appendUtf8Lines(Collections.singletonList(textMessage), "D:\\rabbitMq\\debug.log");
    
                System.out.println(">>>> 追加信息到 文件 里面, 内容是:" + textMessage);
            };
    
            CancelCallback cancelCallback = (consumerTag) -> {
                System.out.println(QUEUE_NAME + ">>>>> 中断了消息接收 " + consumerTag);
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
            //输入流等待
            System.in.read();
            //关闭
            channel.close();
            connection.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    数据库消费者3

    public class FanoutMessageConsumer3 {
        private static String EXCHANGE_NAME = "fanout_logs";
        private static String QUEUE_NAME = "debug_db";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionFactoryUtil.createConnection();
            final Channel channel = connection.createChannel();
            //1. 创建交换器
            channel.exchangeDeclare(EXCHANGE_NAME,
                    BuiltinExchangeType.FANOUT, true, false, null);
            //2.  创建队列
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            //7. 绑定队列与 交换机。  其中 routingKey 为 ""
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
    
            // 打印到控制台
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                String textMessage = new String(message.getBody());
                // 假装存储到数据库里面
                System.out.println(">>>>> 插入到数据库中的信息是:" + textMessage);
            };
    
            CancelCallback cancelCallback = (consumerTag) -> {
                System.out.println(QUEUE_NAME + ">>>>> 中断了消息接收 " + consumerTag);
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
            //输入流等待
            System.in.read();
            //关闭
            channel.close();
            connection.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    验证

    启动生产者 ,依次启动3个消费者

    image.png

    image.png

    生产者依次发送3条消息

    发送消息1
    发送消息2
    发送消息3
    
    • 1
    • 2
    • 3

    查询控制台消费者

    debug_console>>> 获取到消息 :发布广播消息,内容为:发送消息1
    debug_console>>> 获取到消息 :发布广播消息,内容为:发送消息2
    debug_console>>> 获取到消息 :发布广播消息,内容为:发送消息3
    
    • 1
    • 2
    • 3

    查询文件消费者

    >>>> 追加信息到 文件 里面, 内容是:发布广播消息,内容为:发送消息1
    >>>> 追加信息到 文件 里面, 内容是:发布广播消息,内容为:发送消息2
    >>>> 追加信息到 文件 里面, 内容是:发布广播消息,内容为:发送消息3
    
    • 1
    • 2
    • 3

    查看数据库消费者

    >>>>> 插入到数据库中的信息是:发布广播消息,内容为:发送消息1
    >>>>> 插入到数据库中的信息是:发布广播消息,内容为:发送消息2
    >>>>> 插入到数据库中的信息是:发布广播消息,内容为:发送消息3
    
    • 1
    • 2
    • 3

    没有通过 routingKey 进行绑定, 三个消费者均可以收到。

    Direct exchange

    在上一节中,我们构建了一个简单的日志记录系统。我们能够向许多接收者广播日志消息。
    在本节我们将向其中添加一些特别的功能-比方说我们只让某个消费者订阅发布的部分消息。
    例如我们只把 较严重错误消息定向存储到日志文件(以节省磁盘空间),严重错误消息发送给数据库,
    同时仍然能够在控制台上打印所有日志消息。

    我们再次来回顾一下什么是 bindings,绑定是交换机和队列之间的桥梁关系。也可以这么理解:
    队列只对它绑定的交换机的消息感兴趣。绑定用参数:routingKey 来表示也可称该参数为 binding key,
    创建绑定我们用代码:channel.queueBind(queueName, EXCHANGE_NAME, “routingKey”);
    绑定之后的 意义由其交换类型决定。

    介绍

    上一节中的我们的日志系统将所有消息广播给所有消费者,对此我们想做一些改变,
    例如我们希望将 warn 和 error 级别的错误发送给 数据库, 将 info, warn, error 的发送给文件, 将 debug,info,warn,error 全部打印到控制台中。
    Fanout 这种交换类型并不能给我们带来很大的灵活性-它只能进行无意识的 广播,在这里我们将使用 direct 这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定的 routingKey 队列中去。

    image.png

    生产者

    /**
     * @ClassName:work
     * @Description 消息发布者
     * @Author 岳建立
     * @Date 2020/12/22 19:55
     * @Version 1.0
     * 

    * direct 直连, routingKey 路由 key 一致的,才进行发送。 *

    * console 接收 debug info warn error * file 接收 info warn error * db 接收 warn error * routingKey 有值。 **/ public class DirectMessageProducer { private static String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws Exception { Connection connection = ConnectionFactoryUtil.createConnection(); Channel channel = connection.createChannel(); // 创建交换机, 交换机 名称是 logs channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, null); // 定义一个路由 routingKey 和 信息 的map, 由 map 进行处理。 Map<String,String> messageMap = new HashMap<>(); messageMap.put("debug","一个 debug 消息 1"); messageMap.put("debug","一个 debug 消息 2"); messageMap.put("info","一个 info 消息 1"); messageMap.put("info","一个 info 消息 2"); messageMap.put("info","一个 info 消息3 "); messageMap.put("warn","一个 warn 消息 1"); messageMap.put("warn","一个 warn 消息 2"); messageMap.put("warn","一个 warn 消息 3"); messageMap.put("error","一个 error 消息 1"); messageMap.put("error","一个 error 消息 2"); messageMap.put("error","一个 error 消息 3"); messageMap.put("error","一个 error 消息 4"); messageMap.forEach((routingKey,message)->{ try { channel.basicPublish(EXCHANGE_NAME,routingKey, null,message.getBytes("UTF-8")); } catch (IOException e) { e.printStackTrace(); } }); //输入流等待 System.in.read(); channel.close(); connection.close(); // Scanner scanner = new Scanner(System.in); // // 传入的格式为 debug 一个 debug 消息 1 // while (scanner.hasNextLine()) { // // routingKey 为 空 // String inputMessage = scanner.nextLine(); // // String[] splitMessage = inputMessage.split("\\ "); // // channel.basicPublish(EXCHANGE_NAME, splitMessage[0], // null, splitMessage[1].getBytes("UTF-8")); // } // channel.close(); // connection.close(); } }

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75

    消费者1 控制台

    public class DirectMessageConsumer1 {
        private static String EXCHANGE_NAME = "direct_logs";
        private static String QUEUE_NAME = "log_console";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionFactoryUtil.createConnection();
            final Channel channel = connection.createChannel();
            //1. 创建交换器
            channel.exchangeDeclare(EXCHANGE_NAME,
                    BuiltinExchangeType.DIRECT, true, false, null);
            //2.  创建队列
            channel.queueDeclare(QUEUE_NAME, true, false, true, null);
            //3. 绑定队列与 交换机。  其中 routingKey 为 debug,info,warn,error  ,可以绑定多个。
    
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "debug");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warn");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
    
    
            // 打印到控制台
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println(QUEUE_NAME + "获取级别:" + message.getEnvelope().getRoutingKey() + ">>>  消息 :" + new String(message.getBody()));
            };
    
            CancelCallback cancelCallback = (consumerTag) -> {
                System.out.println(QUEUE_NAME + ">>>>> 中断了消息接收 " + consumerTag);
            };
            channel.basicConsume(
                    QUEUE_NAME, true, deliverCallback, cancelCallback);
    
            //输入流等待
            System.in.read();
            //关闭
            channel.close();
            connection.close();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    消费者2 文件

    public class DirectMessageConsumer2 {
        private static String EXCHANGE_NAME = "direct_logs";
        private static String QUEUE_NAME = "log_file";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionFactoryUtil.createConnection();
            final Channel channel = connection.createChannel();
            //1. 创建交换器
            channel.exchangeDeclare(EXCHANGE_NAME,
                    BuiltinExchangeType.DIRECT, true, false, null);
            //2.  创建队列
            channel.queueDeclare(QUEUE_NAME, true, false, true, null);
            //7. 绑定队列与 交换机。  其中 routingKey 为 info ,warn, error
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warn");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
    
            // 打印到控制台
            DeliverCallback deliverCallback = (consumerTag, message) -> {
    
                String fileMessage = QUEUE_NAME + "获取级别:" + message.getEnvelope().getRoutingKey() + ">>>  消息 :" + new String(message.getBody());
    
                FileUtil.appendUtf8Lines(Collections.singletonList(fileMessage), "D:\\rabbitMq\\log.log");
    
                System.out.println(">>>> 追加信息到 文件 里面, 内容是:" + fileMessage);
            };
    
            CancelCallback cancelCallback = (consumerTag) -> {
                System.out.println(QUEUE_NAME + ">>>>> 中断了消息接收 " + consumerTag);
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
            //输入流等待
            System.in.read();
            //关闭
            channel.close();
            connection.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    消费者3 数据库

    public class DirectMessageConsumer3 {
        private static String EXCHANGE_NAME = "direct_logs";
        private static String QUEUE_NAME = "log_db";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionFactoryUtil.createConnection();
            final Channel channel = connection.createChannel();
            //1. 创建交换器
            channel.exchangeDeclare(EXCHANGE_NAME,
                    BuiltinExchangeType.DIRECT, true, false, null);
            //2.  创建队列
            channel.queueDeclare(QUEUE_NAME, true, false, true, null);
            //7. 绑定队列与 交换机。  其中 routingKey 为 warn error
    
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warn");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
    
            // 打印到控制台
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                String fileMessage = QUEUE_NAME + "获取级别:" + message.getEnvelope().getRoutingKey() + ">>>  消息 :" + new String(message.getBody());
                // 假装存储到数据库里面
                System.out.println(">>>>> 插入到数据库中的信息是:" + fileMessage);
            };
    
            CancelCallback cancelCallback = (consumerTag) -> {
                System.out.println(QUEUE_NAME + ">>>>> 中断了消息接收 " + consumerTag);
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
            //输入流等待
            System.in.read();
            //关闭
            channel.close();
            connection.close();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    验证

    启动生产者, 依次启动 三个消费者

    消费者1 控制台打印:

    image.png

    消费者2 文件打印:

    image.png

    消费者3 数据库打印

    image.png

    Topics

    在上一个小节中,我们改进了日志记录系统。我们没有使用只能进行随意广播的 fanout 交换机,而是
    使用了 direct 交换机,从而有能实现有选择性地接收日志。
    尽管使用 direct 交换机改进了我们的系统,但是它仍然存在局限性-比方说我们想接收的日志类型有
    info.base 和 info.advantage,某个队列只想 info.base 的消息,那这个时候 direct 就办不到了。
    这个时候 就只能使用 topic 类型

    要求

    发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单
    词列表,以点号分隔开。这些单词可以是任意单词,比如说:“stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”.这种类型的。
    当然这个单词列表最多不能超过 255 个字节。
    在这个规则列表中,其中有两个替换符是大家需要注意的

    *(星号)可以代替一个单词

    #(井号)可以替代零个或多个单词

    匹配案例

    下图绑定关系如下

    Q1–>绑定的是 中间带 orange 带 3 个单词的字符串(.orange.)

    Q2–>绑定的是 最后一个单词是 rabbit 的 3 个单词(..rabbit) 第一个单词是 lazy 的多个单词(lazy.#)

    image.png
    上图是一个队列绑定关系图,我们来看看他们之间数据接收情况是怎么样的

    quick.orange.rabbit 被队列 Q1Q2接收到

    lazy.orange.elephant 被队列 Q1Q2接收到

    quick.orange.fox 被队列 Q1接收到

    lazy.brown.fox 被队列 Q2接收到

    lazy.pink.rabbit 虽然满足两个绑定但只被队列 Q2 接收一次

    quick.brown.fox 不匹配任何绑定不会被任何队列接收到会被丢弃

    quick.orange.male.rabbit 是四个单词不匹配任何绑定会被丢弃

    lazy.orange.male.rabbit 是四个单词但匹配 Q2

    当队列绑定关系是下列这种情况时需要引起注意

    当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了

    如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了

    生产者

    public class TopicMessageProducer {
        private static String EXCHANGE_NAME = "topic_logs";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionFactoryUtil.createConnection();
            Channel channel = connection.createChannel();
            // 创建交换机, 交换机 名称是  logs
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC,
                    true, false, null);
    
            // 定义一个路由 routingKey 和 信息 的map, 由 map 进行处理。
    
            Map<String, String> messageMap = new HashMap<>();
    
            messageMap.put("quick.orange.rabbit", "被队列 Q1Q2 接收到");
            messageMap.put("lazy.orange.elephant", "被队列 Q1Q2 接收到");
    
            messageMap.put("quick.orange.fox", "被队列 Q1 接收到");
            messageMap.put("lazy.brown.fox", "被队列 Q2 接收到");
            messageMap.put("info", "一个 info 消息3 ");
    
            messageMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列 Q2 接收一次");
            messageMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");
            messageMap.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃");
    
            messageMap.put("lazy.orange.male.rabbit", "是四个单词但匹配 Q2");
    
            messageMap.forEach((routingKey, message) -> {
                try {
                    channel.basicPublish(EXCHANGE_NAME, routingKey,
                            null, message.getBytes("UTF-8"));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
    
            //输入流等待
            System.in.read();
    
            channel.close();
            connection.close();
    
    
    //        Scanner scanner = new Scanner(System.in);
    //        while(scanner.hasNextLine()) {
    //            // routingKey 为 空
    //            String inputMessage = scanner.nextLine();
    //
    //            String[] splitMessage = inputMessage.split("\\ ");
    //
    //            channel.basicPublish(EXCHANGE_NAME,splitMessage[0],
    //                    null,splitMessage[1].getBytes("UTF-8"));
    //        }
    //        channel.close();
    //        connection.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57

    消费者1

    public class TopicMessageConsumer1 {
        private static String EXCHANGE_NAME = "topic_logs";
        private static String QUEUE_NAME = "topic_log_console";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionFactoryUtil.createConnection();
            final Channel channel = connection.createChannel();
            //1. 创建交换器
            channel.exchangeDeclare(EXCHANGE_NAME,
                    BuiltinExchangeType.TOPIC, true, false, null);
            //2.  创建队列
            channel.queueDeclare(QUEUE_NAME, true, false, true, null);
            //3. 绑定队列与 交换机。  其中 routingKey 为 debug,info,warn,error  ,可以绑定多个。
    
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.orange.*");
    
    
            // 打印到控制台
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println(QUEUE_NAME + "获取级别:" + message.getEnvelope().getRoutingKey() + ">>>  消息 :" + new String(message.getBody()));
            };
    
            CancelCallback cancelCallback = (consumerTag) -> {
                System.out.println(QUEUE_NAME + ">>>>> 中断了消息接收 " + consumerTag);
            };
            channel.basicConsume(
                    QUEUE_NAME, true, deliverCallback, cancelCallback);
    
            //输入流等待
            System.in.read();
            //关闭
            channel.close();
            connection.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    消费者2

    public class TopicMessageConsumer2 {
        private static String EXCHANGE_NAME = "topic_logs";
        private static String QUEUE_NAME = "topic_log_file";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionFactoryUtil.createConnection();
            final Channel channel = connection.createChannel();
            //1. 创建交换器
            channel.exchangeDeclare(EXCHANGE_NAME,
                    BuiltinExchangeType.TOPIC, true, false, null);
            //2.  创建队列
            channel.queueDeclare(QUEUE_NAME, true, false, true, null);
            //7. 绑定队列与 交换机。  其中 routingKey 为 info ,warn, error
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*.rabbit");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lazy.#");
    
            // 打印到控制台
            DeliverCallback deliverCallback = (consumerTag, message) -> {
    
                String fileMessage = QUEUE_NAME + "获取级别:" + message.getEnvelope().getRoutingKey() + ">>>  消息 :" + new String(message.getBody());
    
                FileUtil.appendUtf8Lines(Collections.singletonList(fileMessage), "D:\\rabbitMq\\log.log");
    
                System.out.println(">>>> 追加信息到 文件 里面, 内容是:" + fileMessage);
            };
    
            CancelCallback cancelCallback = (consumerTag) -> {
                System.out.println(QUEUE_NAME + ">>>>> 中断了消息接收 " + consumerTag);
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
            //输入流等待
            System.in.read();
            //关闭
            channel.close();
            connection.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    验证

    消费者1:

    image.png

    消费者2:

    image.png

  • 相关阅读:
    chatgpt赋能python:Python中的随机选择:介绍和应用
    金仓KFS数据集中场景(多对一)部署
    stm32f4xx-定时器
    基于HTML+CSS+JavaScript仿瓜子二手车官网【学生网页设计作业源码】
    10. Spring Boot2.5 实战 Docker 容器
    【C语言】二级指针的深度理解,峰值的寻找(每日小细节004)
    SQL Server实战一:创建、分离、附加、删除、备份数据库
    基于Java Web的在线教学质量评价系统的设计与实现
    《JAVA EE》内部类(下篇)&Lambda表达式
    Python之哈希表-字典
  • 原文地址:https://blog.csdn.net/yjltx1234csdn/article/details/128170778