• 【RabbitMQ】——主题模式(Topics)


    一、介绍

    主题模式采用 routingKey匹配的方式实现,只要生产者发送消息是设置的routingKey 满足消费者绑定时候设置的规则,消费者就能够接收到消息。

    生产者端: 声明一个BuiltinExchangeType.TOPIC类型的交换机,然后发送消息时候设置routingkey

     /**
      * 声明一个交换机
      */
     channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
    
    • 1
    • 2
    • 3
    • 4
    /**
     * 发送消息
     * param1 发送到哪个交换机
     * param2 routingKey
     * param3 其他参数信息
     * param4 发送的消息体
     */
    channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    消费者端:

    private static final String ROUTING_KEY_ORANGE = "*.orange.*";
    /**
     * 绑定交换机和队列
     * param1 队列名称
     * param2 交换机名称
     * param3 routingkey
     */
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_ORANGE);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    二、topic规范

    发送到类型是topic交换机的消息的routing_key不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:“stock.usd.nyse”, “nyse.xmw”,“quick.orange.rabbit”.这种类型的。当然这个单词列表最多不能超过255个字节

    在这个规则列表中,其中有两个替换符是大家需要注意的***(星号)可以代替一个单词,#(井号)可以替代零个或多个单词。**
    在这里插入图片描述
    在这里插入图片描述
    当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像fanout 了
    如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是direct了

    三、实现

    ###1. 生产者LogProducerTopic

    package com.rabbitmqDemo.rabbitmq.seven;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmqDemo.rabbitmq.utils.RabbitMqUtils;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Scanner;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 将消息发送到交换机
     */
    public class LogProducerTopic {
    
        private static final String EXCHANGE_NAME = "logs_topic_exchange";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = RabbitMqUtils.getChannel();
    
            Map<String, String> bindingKdyMap = new HashMap<>();
            bindingKdyMap.put("quick.orange.rabbit", "被队列Q1Q2接收到");
            bindingKdyMap.put("lazy.orange.elephant", "被队列Q1Q2接收到");
            bindingKdyMap.put("quick.orange.fox", "被队列Q1接收到");
            bindingKdyMap.put("lazy.brown.fox", "被队列Q2接收到");
            bindingKdyMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列Q2接收一次");
            bindingKdyMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");
            bindingKdyMap.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃");
            bindingKdyMap.put("lazy.orange.male.rabbit", "是四个单词但匹配Q2");
    
            for (Map.Entry<String, String> stringStringEntry : bindingKdyMap.entrySet()) {
                String routingKey = stringStringEntry.getKey();
                String message = stringStringEntry.getValue();
                /**
                 * 发送消息
                 * param1 发送到哪个交换机
                 * param2 routingKey
                 * param3 其他参数信息
                 * param4 发送的消息体
                 */
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
                System.out.println("message send end : " + message);
            }
    
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    ###2. 消费者Logworker05

    package com.rabbitmqDemo.rabbitmq.seven;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmqDemo.rabbitmq.utils.RabbitMqUtils;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    
    public class Logworker05 {
    
        private static final String EXCHANGE_NAME = "logs_topic_exchange";
        private static final String QUEUE_NAME = "logs_topic_q1";
        private static final String ROUTING_KEY_ORANGE = "*.orange.*";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = RabbitMqUtils.getChannel();
    
            /**
             * 声明一个交换机
             */
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
    
            /**
             * 声明一个队列
             */
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            /**
             * 绑定交换机和队列
             * param1 队列名称
             * param2 交换机名称
             * param3 routingkey
             */
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_ORANGE);
            System.out.println("wait receive message ,print message to console... ");
            //声明 消费者成功消费的回调
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println("Logworker05-message:" + new String(message.getBody(), "UTF-8") + ", queue name:" + QUEUE_NAME + " , routing key:" + message.getEnvelope().getRoutingKey());
            };
            //声明 取消消息时的回调
            CancelCallback cancelCallback = (consumerTag) -> {
                System.out.println("Logworker05-消息消费被中断-" + consumerTag);
            };
            /**
             * 消费者消费消息
             * param1 队列名称
             * param2 消费成功之后是否自动应答,true 代表自动应答,false表示不自动应答
             * param3 消费者成功消费的回调
             * param4 消费者取消消费回调
             */
            System.out.println("Logworker05等待接收消息......");
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57

    ###3. 消费者Logworker06

    package com.rabbitmqDemo.rabbitmq.seven;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmqDemo.rabbitmq.utils.RabbitMqUtils;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Logworker06 {
    
        private static final String EXCHANGE_NAME = "logs_topic_exchange";
        private static final String QUEUE_NAME = "logs_topic_q2";
        private static final String ROUTING_KEY_RABBIT = "*.*.rabbit";
        private static final String ROUTING_KEY_LAZY = "lazy.#";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = RabbitMqUtils.getChannel();
    
            /**
             * 声明一个交换机
             */
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
    
            /**
             * 声明一个队列
             */
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            /**
             * 绑定交换机和队列
             * param1 队列名称
             * param2 交换机名称
             * param3 routingkey
             */
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_RABBIT);
            /**
             * 多重绑定
             */
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_LAZY);
    
    
            System.out.println("wait receive message ,print message to console... ");
    
            //声明 消费者成功消费的回调
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println("Logworker06-message:" + new String(message.getBody(), "UTF-8") + ", queue name:" + QUEUE_NAME + " , routing key:" + message.getEnvelope().getRoutingKey());
    
            };
            //声明 取消消息时的回调
            CancelCallback cancelCallback = (consumerTag) -> {
                System.out.println("Logworker06-消息消费被中断-" + consumerTag);
            };
    
            /**
             * 消费者消费消息
             * param1 队列名称
             * param2 消费成功之后是否自动应答,true 代表自动应答,false表示不自动应答
             * param3 消费者成功消费的回调
             * param4 消费者取消消费回调
             */
            System.out.println("Logworker06等待接收消息......");
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
  • 相关阅读:
    【数据结构-进阶】二叉搜索树
    Docker背后的标准化容器执行引擎——runC
    企业数据管理数据备份与恢复
    Docker consul的容器服务更新与发现
    计算机毕业设计家校通微信小程序源码
    nacos微服务云开发,远程联调部署,内网穿透,frp部署
    CSS - 显示模式
    检索技术核心学习总结
    测试用例基础知识
    KEIL/MDK中的标准C库函数printf和malloc实现线程安全
  • 原文地址:https://blog.csdn.net/qq_42000631/article/details/126343716