• RabbitMQ简单用法


    RabbitMQ中 交换机、队列、虚拟主机、连接、通道、路由键的概念以及它们之间的联系

    RabbitMQ 是一种消息代理软件,它支持消息队列、消息路由和发布/订阅模式等功能。下面是 RabbitMQ 中几个重要概念及它们之间的联系:

    1. 交换机(Exchange):用于接收生产者发送的消息并将其路由到一个或多个队列。交换机有四种类型:直连交换机、主题交换机、头交换机和扇形交换机。
    
    2. 队列(Queue):用于存储消息的容器。消费者从队列中获取消息并进行处理。
    
    3. 虚拟主机(Virtual Host):RabbitMQ 允许在一个物理服务器上创建多个虚拟主机,每个虚拟主机相当于一个独立的 RabbitMQ 服务器,拥有自己的交换机、队列和绑定等。
    
    4. 连接(Connection):生产者或消费者与 RabbitMQ 服务器之间的连接。
    
    5. 通道(Channel):在连接上创建的一个虚拟连接,用于发送和接收消息。一个连接可以有多个通道。
    
    6. 路由键(Routing Key):用于将消息从交换机路由到队列的规则。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    它们之间的联系如下:

    1. 生产者将消息发送到交换机,交换机根据路由键将消息路由到一个或多个队列。
    
    2. 消费者从队列中获取消息并进行处理。
    
    3. 虚拟主机可以将多个交换机、队列和绑定等逻辑上隔离开来,提高了系统的安全性和可靠性。
    
    4. 连接是生产者或消费者与 RabbitMQ 服务器之间的通信通道,一个连接可以创建多个通道。
    
    5. 通道是在连接上创建的虚拟连接,用于发送和接收消息。一个连接可以有多个通道。
    
    6. 路由键是将消息从交换机路由到队列的规则,可以根据不同的路由键将消息路由到不同的队列。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    ConnectionFactory

    ConnectionFactory类是RabbitMQ Java客户端库中的一个类,用于创建RabbitMQ连接。常用属性和方法如下:

    属性:

    - host:RabbitMQ服务器的主机名,默认为localhost。
    - port:RabbitMQ服务器的端口号,默认为5672- username:连接RabbitMQ服务器的用户名,默认为guest。
    - password:连接RabbitMQ服务器的密码,默认为guest。
    - virtualHost:连接RabbitMQ服务器的虚拟主机,默认为/- connectionTimeout:连接超时时间,默认为0(无限制)。
    - requestedHeartbeat:请求的心跳超时时间,默认为0(无限制)。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    方法:

    - newConnection():创建一个新的RabbitMQ连接。
    - createChannel():创建一个新的通道。
    
    • 1
    • 2

    Channel

    Channel类的常用方法包括:

    1. basicPublish:用于将消息发送到指定的交换机和路由键。
    	参数含义:
    	- exchange:消息发送到的交换机名称
    	- routingKey:消息发送到的队列名称
    	- props:消息的属性
    	- body:消息体
    	作用:将消息发送到指定的交换机上,等待被消费者消费。
    
    2. basicConsume:用于从指定队列中消费消息。
    	 参数含义:
    	- queue:要消费的队列名称
    	- autoAck:是否自动确认消息
    	- callback:消费者接收到消息后的回调函数
    	作用:订阅队列中的消息,等待被消费者消费。
    
    3. basicAck:用于确认已经处理完毕的消息。
    	参数含义:
    	- deliveryTag:消息标签
    	- multiple:是否批量确认
    	作用:确认消息已被消费,告诉RabbitMQ可以删除该消息。
    	
    4. basicNack:用于拒绝处理某个消息,并可以选择是否重新将消息放回队列。
    	参数含义:
    	- deliveryTag:消息标签
    	- multiple:是否批量拒绝
    	- requeue:是否重新入队列
    	作用:拒绝消息,并可选择是否重新入队列。
    
    5. basicReject:用于拒绝处理某个消息,并可以选择是否重新将消息放回队列。
    	参数含义:
    	- deliveryTag:消息标签
    	- requeue:是否重新入队列
    	作用:拒绝消息,并重新入队列。
    
    6. queueDeclare:用于声明一个队列。
    	参数含义:
    	- queue:队列名称
    	- durable:是否持久化
    	- exclusive:是否独占
    	- autoDelete:是否自动删除
    	- arguments:队列参数
    	作用:声明队列,如果队列不存在则创建。
    
    7 exchangeDeclare:用于声明一个交换机。
    	参数含义:
    	- exchange:交换机名称
    	- type:交换机类型
    	- durable:是否持久化
    	- autoDelete:是否自动删除
    	- internal:是否内部使用
    	- arguments:交换机参数
    	作用:声明交换机,如果交换机不存在则创建。
    
    8. queueBind:用于将队列绑定到指定的交换机和路由键。
    	参数含义:
    	- queue:队列名称
    	- exchange:交换机名称
    	- routingKey:路由键
    	- arguments:绑定参数
    	作用:将队列绑定到交换机上,等待被消费者消费。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    Channel类的作用是提供了与RabbitMQ服务器进行通信的通道,通过该通道可以进行消息的发送和接收,以及队列和交换机的声明和绑定等操作。同时,Channel类还提供了一些方法用于控制消息的确认和拒绝,以及消息的持久化等高级特性。


    DefaultConsumer

    DefaultConsumer是RabbitMQ客户端库中的一个类,它实现了Consumer接口,用于处理从RabbitMQ服务器接收到的消息。

    DefaultConsumer类的常用方法包括:

    1. handleDelivery:处理从RabbitMQ服务器接收到的消息。
    	void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException;
    	参数含义:
    	- consumerTag:消费者标签,用于标识消费者。
    	- envelope:消息的信封,包含了消息的元数据,如交换机、路由键等。
    	- properties:消息的属性,包含了消息的元数据,如消息ID、消息类型等。
    	- body:消息的内容,即消息体。
    
    2. handleShutdownSignal:处理与RabbitMQ服务器的连接关闭信号。
    	void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);
    	参数含义:
    	- consumerTag:消费者标签,用于标识消费者。
    	- sig:关闭信号异常,包含了关闭的原因和异常信息。
    	
    3. handleConsumeOk:处理与RabbitMQ服务器的消费者注册成功信号。
    	void handleConsumeOk(String consumerTag);
    	参数含义:
    	- consumerTag:消费者标签,用于标识消费者。
    	
    4. handleCancelOk:处理与RabbitMQ服务器的消费者取消注册成功信号。
    	void handleCancelOk(String consumerTag);
    	参数含义:
    	- consumerTag:消费者标签,用于标识消费者。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    在handleDelivery方法中,我们可以根据需要对消息进行处理,例如解析消息内容、存储消息等。

    生产者示例代码

    创建RabbitMQ连接、创建通道、声明交换机和发送消息的完整示例代码:

    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.ConnectionFactory;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class RabbitMQExample {
        private static final String QUEUE_NAME = "my_queue";
        private static final String EXCHANGE_NAME = "my_exchange";
        private static final String ROUTING_KEY = "my_routing_key";
    
        public static void main(String[] args) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("/");
    
            try {
                // 创建连接
                Connection connection = factory.newConnection();
    
                // 创建通道
                Channel channel = connection.createChannel();
    
                // 声明交换机
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
    
                // 发送消息
                String message = "Hello, RabbitMQ!";
                channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
                System.out.println("Sent message: " + message);
    
                // 关闭通道和连接
                channel.close();
                connection.close();
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    在这个示例中,使用了默认的本地RabbitMQ服务器,用户名和密码都是"guest",虚拟主机是"/"。创建了一个名为"my_exchange"的topic类型交换机,并发送了一条消息到该交换机上,使用了"my_routing_key"作为路由键。


    消费者示例代码

    创建RabbitMQ连接、创建通道、声明交换机、以及接收消息:

    import com.rabbitmq.client.*;
    
    public class RabbitMQExample {
        private final static String QUEUE_NAME = "my_queue";
        private final static String EXCHANGE_NAME = "my_exchange";
        private final static String ROUTING_KEY = "my_routing_key";
    
        public static void main(String[] args) throws Exception {
            // 创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("/");
    
            // 创建连接
            Connection connection = factory.newConnection();
    
            // 创建通道
            Channel channel = connection.createChannel();
    
            // 声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
    
            // 创建消费者
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("Received message: " + message);
                }
            };
    
            // 开始消费消息
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    以上示例代码演示了如何创建一个RabbitMQ连接、创建通道、声明一个类型为TOPIC的交换机、以及接收消息的过程。你可以根据自己的需求修改相关参数和逻辑。

    扩展:

    RabbitMQ交换机的四种类型:直连交换机、主题交换机、头交换机和扇形交换机

    1. 直连交换机(Direct Exchange):直连交换机根据消息的路由键(Routing Key)将消息发送到与之绑定的队列中。路由键与队列名完全匹配时,消息将被路由到该队列。直连交换机是最简单的交换机类型。
    
    2. 主题交换机(Topic Exchange):主题交换机将消息按照模式匹配的方式发送到与之绑定的队列中。主题交换机支持通配符,可以将路由键与模式进行匹配,例如“*”代表一个单词,“#”代表零个或多个单词。主题交换机可以实现灵活的消息路由。
    
    3. 头交换机(Headers Exchange):头交换机根据消息头(Header)中的键值对进行匹配,将消息发送到与之绑定的队列中。头交换机的路由规则比较复杂,性能也相对较低,一般不常用。
    
    4. 扇形交换机(Fanout Exchange):扇形交换机将消息发送到所有与之绑定的队列中,忽略路由键。扇形交换机是一种广播式的交换机,适用于将消息发送给多个消费者的场景。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    RabbitMQ交换机的四种类型:直连交换机、主题交换机、头交换机和扇形交换机的示例代码

    1. 直连交换机示例代码:
    // 创建连接和通道
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    
    // 定义交换机名称和类型
    String exchangeName = "direct_exchange";
    String exchangeType = "direct";
    
    // 声明交换机
    channel.exchangeDeclare(exchangeName, exchangeType);
    
    // 发送消息到交换机
    String message = "Hello, direct exchange!";
    channel.basicPublish(exchangeName, "direct_routing_key", null, message.getBytes());
    
    // 关闭连接和通道
    channel.close();
    connection.close();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    1. 主题交换机示例代码:
    // 创建连接和通道
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    
    // 定义交换机名称和类型
    String exchangeName = "topic_exchange";
    String exchangeType = "topic";
    
    // 声明交换机
    channel.exchangeDeclare(exchangeName, exchangeType);
    
    // 发送消息到交换机
    String message = "Hello, topic exchange!";
    channel.basicPublish(exchangeName, "topic.routing.key", null, message.getBytes());
    
    // 关闭连接和通道
    channel.close();
    connection.close();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    1. 头交换机示例代码:
    // 创建连接和通道
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    
    // 定义交换机名称和类型
    String exchangeName = "headers_exchange";
    String exchangeType = "headers";
    
    // 声明交换机
    channel.exchangeDeclare(exchangeName, exchangeType);
    
    // 设置消息头
    Map<String, Object> headers = new HashMap<>();
    headers.put("key1", "value1");
    headers.put("key2", "value2");
    
    // 发送消息到交换机
    String message = "Hello, headers exchange!";
    AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().headers(headers).build();
    channel.basicPublish(exchangeName, "", properties, message.getBytes());
    
    // 关闭连接和通道
    channel.close();
    connection.close();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    1. 扇形交换机示例代码:
    // 创建连接和通道
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    
    // 定义交换机名称和类型
    String exchangeName = "fanout_exchange";
    String exchangeType = "fanout";
    
    // 声明交换机
    channel.exchangeDeclare(exchangeName, exchangeType);
    
    // 发送消息到交换机
    String message = "Hello, fanout exchange!";
    channel.basicPublish(exchangeName, "", null, message.getBytes());
    
    // 关闭连接和通道
    channel.close();
    connection.close();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
  • 相关阅读:
    汇编语言实验7:子程序结构设计
    GIS、GPS、RS综合应用
    卫星星座1:Starlink星座
    Java安全—CommonsCollections7
    软件工程(Software Engineering)
    jersey跨域文件上传
    Go实现日志2——支持结构化和hook
    Open3D(C++) 点到平面的ICP算法实现点云精配准
    【Java篇】备战面试——你真的了解“数组”的吗?
    【面试题精讲】接口和抽象类有什么共同点和区别?
  • 原文地址:https://blog.csdn.net/weixin_42594143/article/details/133900005