• RabbitMQ工作模式-主题模式


    主题模式

    官方文档参考:https://www.rabbitmq.com/tutorials/tutorial-five-python.html

    使用topic类型的交换器,队列绑定到交换器、bingingKey时使用通配符,交换器将消息路由转发到具体队列时,会根据消息routingKey模糊匹配,比较灵活。

    在Direct类型的交换器做到了根据日志级别的不同,将消息发送给了不同队列的。

    这里再加入一个需求,不仅想根据日志级别进行划分,还想根据日志的来源分日志,如何来做呢?

    使用topic类型的交换器, routingKey就不能随便写了,它必须是点分单词,单词可以随便写,一般按消息的特征,该点分单词字符串最长255字节。

    bindingKey也必须是这种形式。top类型的交换器背后原理跟direct类型类似只要队列的bingingkey的值与消息的routingKey的匹配,队列就可以收到该消息。有两个不同

    1. * (star)匹配一个单词。
    2. # 匹配0到多个单词。

    在这里插入图片描述

    上报的数据的RoutingKey,格式如下

    地区.业务.日志级别 如shanghai.busi.INFO 、 hangzhou.line.ERROR

    生产者

    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.ThreadLocalRandom;
    
    public class Product {
    
      private static final String[] ADDRESS_ARRAYS = {"shanghai", "suzhou", "hangzhou"};
    
      private static final String[] BUSI_NAMES = {"product", "user", "schedule"};
    
      private static final String[] LOG_LEVEL = {"ERROR", "WARN", "INFO"};
    
      public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@node1:5672/%2f");
    
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
    
        // 定义交换机
        channel.exchangeDeclare(
            "ex.busi.topic",
            BuiltinExchangeType.TOPIC,
            // 持久化标识
            false,
            // 是否自动删除
            false,
            // 属性信息
            null);
    
        for (int i = 0; i < 50; i++) {
    
          String level = LOG_LEVEL[ThreadLocalRandom.current().nextInt(0, LOG_LEVEL.length)];
          String busiName = BUSI_NAMES[ThreadLocalRandom.current().nextInt(0, BUSI_NAMES.length)];
          String address =
              ADDRESS_ARRAYS[ThreadLocalRandom.current().nextInt(0, ADDRESS_ARRAYS.length)];
          String routingKey = address + "." + busiName + "." + level;
    
          String pushMsg = "地址[" + address + "],业务[" + busiName + "],级别[" + level + "],消息:" + i;
    
          channel.basicPublish(
              "ex.busi.topic", routingKey, null, pushMsg.getBytes(StandardCharsets.UTF_8));
        }
      }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    上海的消费者

    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.ThreadLocalRandom;
    
    /**
     * 上海地区的消费都,获取所有的上海信息
     */
    public class ShangHaiConsumer {
    
      public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@node1:5672/%2f");
    
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
    
        // 定义交换机
        channel.exchangeDeclare(
            "ex.busi.topic",
            BuiltinExchangeType.TOPIC,
            // 持久化标识
            false,
            // 是否自动删除
            false,
            // 属性信息
            null);
    
        // 定义队列
        channel.queueDeclare(
            "shanghai.all.log",
            // 持久化存储
            true,
            // 排他
            false,
            // 自动删除
            true,
            // 属性
            null);
    
        // 将队列与交换机进行绑定
        channel.queueBind("shanghai.all.log", "ex.busi.topic", "shanghai.#", null);
    
        channel.basicConsume(
            "shanghai.all.log",
            (consumerTag, message) -> {
              String dataMsg = new String(message.getBody(), StandardCharsets.UTF_8);
              System.out.println("shanghai consumer 收到数据:" + dataMsg);
            },
            consumerTag -> {});
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    所有错误日志的消费者

    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import java.nio.charset.StandardCharsets;
    
    public class ErrorLogConsumer {
    
      public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@node1:5672/%2f");
    
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
    
        // 定义交换机
        channel.exchangeDeclare(
            "ex.busi.topic",
            BuiltinExchangeType.TOPIC,
            // 持久化标识
            false,
            // 是否自动删除
            false,
            // 属性信息
            null);
    
        // 定义队列
        channel.queueDeclare(
            "log.all.error",
            // 持久化存储
            true,
            // 排他
            false,
            // 自动删除
            true,
            // 属性
            null);
    
        // 将队列与交换机进行绑定
        channel.queueBind("log.all.error", "ex.busi.topic", "#.ERROR", null);
    
        channel.basicConsume(
            "log.all.error",
            (consumerTag, message) -> {
              String dataMsg = new String(message.getBody(), StandardCharsets.UTF_8);
              System.out.println("错误日志 consumer 收到数据:" + dataMsg);
            },
            consumerTag -> {});
      }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    苏州用户的消费者

    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import java.nio.charset.StandardCharsets;
    
    public class SuZhouUserConsumer {
    
      public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@node1:5672/%2f");
    
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
    
        // 定义交换机
        channel.exchangeDeclare(
            "ex.busi.topic",
            BuiltinExchangeType.TOPIC,
            // 持久化标识
            false,
            // 是否自动删除
            false,
            // 属性信息
            null);
    
        // 定义队列
        channel.queueDeclare(
            "suzhou.user.consumer",
            // 持久化存储
            true,
            // 排他
            false,
            // 自动删除
            true,
            // 属性
            null);
    
        // 将队列与交换机进行绑定
        channel.queueBind("suzhou.user.consumer", "ex.busi.topic", "suzhou.user.*", null);
    
        channel.basicConsume(
            "suzhou.user.consumer",
            (consumerTag, message) -> {
              String dataMsg = new String(message.getBody(), StandardCharsets.UTF_8);
              System.out.println("suzhou consumer 收到数据:" + dataMsg);
            },
            consumerTag -> {});
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    首先启动三个消费者,查看队列和交换器的信息

    [root@nullnull-os ~]# rabbitmqctl list_exchanges --formatter pretty_table
    Listing exchanges for vhost / ...
    ┌────────────────────┬─────────┐
    │ name               │ type    │
    ├────────────────────┼─────────┤
    │ amq.fanout         │ fanout  │
    ├────────────────────┼─────────┤
    │ ex.busi.topic      │ topic   │
    ├────────────────────┼─────────┤
    │ amq.rabbitmq.trace │ topic   │
    ├────────────────────┼─────────┤
    │ amq.headers        │ headers │
    ├────────────────────┼─────────┤
    │ amq.topic          │ topic   │
    ├────────────────────┼─────────┤
    │ amq.direct         │ direct  │
    ├────────────────────┼─────────┤
    │                    │ direct  │
    ├────────────────────┼─────────┤
    │ ex.routing         │ direct  │
    ├────────────────────┼─────────┤
    │ amq.match          │ headers │
    └────────────────────┴─────────┘
    [root@nullnull-os ~]# rabbitmqctl list_bindings --formatter pretty_table
    Listing bindings for vhost /...
    ┌───────────────┬─────────────┬──────────────────────┬──────────────────┬──────────────────────┬───────────┐
    │ source_name   │ source_kind │ destination_name     │ destination_kind │ routing_key          │ arguments │
    ├───────────────┼─────────────┼──────────────────────┼──────────────────┼──────────────────────┼───────────┤
    │               │ exchange    │ suzhou.user.consumer │ queue            │ suzhou.user.consumer │           │
    ├───────────────┼─────────────┼──────────────────────┼──────────────────┼──────────────────────┼───────────┤
    │               │ exchange    │ shanghai.all.log     │ queue            │ shanghai.all.log     │           │
    ├───────────────┼─────────────┼──────────────────────┼──────────────────┼──────────────────────┼───────────┤
    │               │ exchange    │ log.all.error        │ queue            │ log.all.error        │           │
    ├───────────────┼─────────────┼──────────────────────┼──────────────────┼──────────────────────┼───────────┤
    │ ex.busi.topic │ exchange    │ log.all.error        │ queue            │ #.ERROR              │           │
    ├───────────────┼─────────────┼──────────────────────┼──────────────────┼──────────────────────┼───────────┤
    │ ex.busi.topic │ exchange    │ shanghai.all.log     │ queue            │ shanghai.#           │           │
    ├───────────────┼─────────────┼──────────────────────┼──────────────────┼──────────────────────┼───────────┤
    │ ex.busi.topic │ exchange    │ suzhou.user.consumer │ queue            │ suzhou.user.*        │           │
    └───────────────┴─────────────┴──────────────────────┴──────────────────┴──────────────────────┴───────────┘
    [root@nullnull-os ~]# 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    观察可以发现,此队列与消息的绑定已经成功。接下使用生产者发送消息。观察控制台输出:

    错误日志消费者

    错误日志 consumer 收到数据:地址[suzhou],业务[schedule],级别[ERROR],消息:6
    错误日志 consumer 收到数据:地址[suzhou],业务[product],级别[ERROR],消息:8
    错误日志 consumer 收到数据:地址[shanghai],业务[product],级别[ERROR],消息:10
    错误日志 consumer 收到数据:地址[shanghai],业务[schedule],级别[ERROR],消息:12
    错误日志 consumer 收到数据:地址[suzhou],业务[schedule],级别[ERROR],消息:15
    错误日志 consumer 收到数据:地址[hangzhou],业务[user],级别[ERROR],消息:16
    错误日志 consumer 收到数据:地址[shanghai],业务[schedule],级别[ERROR],消息:17
    错误日志 consumer 收到数据:地址[shanghai],业务[product],级别[ERROR],消息:18
    错误日志 consumer 收到数据:地址[hangzhou],业务[user],级别[ERROR],消息:21
    错误日志 consumer 收到数据:地址[shanghai],业务[product],级别[ERROR],消息:22
    错误日志 consumer 收到数据:地址[shanghai],业务[product],级别[ERROR],消息:24
    错误日志 consumer 收到数据:地址[hangzhou],业务[product],级别[ERROR],消息:28
    错误日志 consumer 收到数据:地址[suzhou],业务[schedule],级别[ERROR],消息:33
    错误日志 consumer 收到数据:地址[hangzhou],业务[schedule],级别[ERROR],消息:39
    错误日志 consumer 收到数据:地址[suzhou],业务[user],级别[ERROR],消息:40
    错误日志 consumer 收到数据:地址[suzhou],业务[user],级别[ERROR],消息:43
    错误日志 consumer 收到数据:地址[shanghai],业务[schedule],级别[ERROR],消息:46
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    上海地区的消费者

    shanghai consumer 收到数据:地址[shanghai],业务[schedule],级别[WARN],消息:0
    shanghai consumer 收到数据:地址[shanghai],业务[user],级别[INFO],消息:1
    shanghai consumer 收到数据:地址[shanghai],业务[schedule],级别[INFO],消息:2
    shanghai consumer 收到数据:地址[shanghai],业务[schedule],级别[WARN],消息:5
    shanghai consumer 收到数据:地址[shanghai],业务[product],级别[ERROR],消息:10
    shanghai consumer 收到数据:地址[shanghai],业务[schedule],级别[ERROR],消息:12
    shanghai consumer 收到数据:地址[shanghai],业务[schedule],级别[ERROR],消息:17
    shanghai consumer 收到数据:地址[shanghai],业务[product],级别[ERROR],消息:18
    shanghai consumer 收到数据:地址[shanghai],业务[product],级别[ERROR],消息:22
    shanghai consumer 收到数据:地址[shanghai],业务[product],级别[ERROR],消息:24
    shanghai consumer 收到数据:地址[shanghai],业务[user],级别[INFO],消息:32
    shanghai consumer 收到数据:地址[shanghai],业务[schedule],级别[INFO],消息:35
    shanghai consumer 收到数据:地址[shanghai],业务[product],级别[INFO],消息:38
    shanghai consumer 收到数据:地址[shanghai],业务[schedule],级别[WARN],消息:41
    shanghai consumer 收到数据:地址[shanghai],业务[schedule],级别[ERROR],消息:46
    shanghai consumer 收到数据:地址[shanghai],业务[user],级别[INFO],消息:48
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    苏州用户的消费者

    suzhou consumer 收到数据:地址[suzhou],业务[user],级别[WARN],消息:37
    suzhou consumer 收到数据:地址[suzhou],业务[user],级别[ERROR],消息:40
    suzhou consumer 收到数据:地址[suzhou],业务[user],级别[ERROR],消息:43
    suzhou consumer 收到数据:地址[suzhou],业务[user],级别[WARN],消息:45
    
    • 1
    • 2
    • 3
    • 4

    至此topic模式操作成功。

  • 相关阅读:
    未来之路:互联网技术驱动汽车行业的创新浪潮
    【OpenGL】纹理(Texture)的使用
    怎样提取视频中的音频?分享一个一学就会的方法~
    [附源码]java毕业设计小区物业管理系统
    抢先看!中移创马大赛通信能力开放专题赛初赛评选结果新鲜出炉
    SSM处理过程
    Chapter9:Simulink建模与仿真
    聊天室(二)__ unipush 推送实现详细教程
    不可重复读和幻读区别
    centos LVM磁盘快照
  • 原文地址:https://blog.csdn.net/bug_null/article/details/132591482