RabbitMQ 是一种消息代理软件,它支持消息队列、消息路由和发布/订阅模式等功能。下面是 RabbitMQ 中几个重要概念及它们之间的联系:
1. 交换机(Exchange):用于接收生产者发送的消息并将其路由到一个或多个队列。交换机有四种类型:直连交换机、主题交换机、头交换机和扇形交换机。
2. 队列(Queue):用于存储消息的容器。消费者从队列中获取消息并进行处理。
3. 虚拟主机(Virtual Host):RabbitMQ 允许在一个物理服务器上创建多个虚拟主机,每个虚拟主机相当于一个独立的 RabbitMQ 服务器,拥有自己的交换机、队列和绑定等。
4. 连接(Connection):生产者或消费者与 RabbitMQ 服务器之间的连接。
5. 通道(Channel):在连接上创建的一个虚拟连接,用于发送和接收消息。一个连接可以有多个通道。
6. 路由键(Routing Key):用于将消息从交换机路由到队列的规则。
它们之间的联系如下:
1. 生产者将消息发送到交换机,交换机根据路由键将消息路由到一个或多个队列。
2. 消费者从队列中获取消息并进行处理。
3. 虚拟主机可以将多个交换机、队列和绑定等逻辑上隔离开来,提高了系统的安全性和可靠性。
4. 连接是生产者或消费者与 RabbitMQ 服务器之间的通信通道,一个连接可以创建多个通道。
5. 通道是在连接上创建的虚拟连接,用于发送和接收消息。一个连接可以有多个通道。
6. 路由键是将消息从交换机路由到队列的规则,可以根据不同的路由键将消息路由到不同的队列。
ConnectionFactory类是RabbitMQ Java客户端库中的一个类,用于创建RabbitMQ连接。常用属性和方法如下:
属性:
- host:RabbitMQ服务器的主机名,默认为localhost。
- port:RabbitMQ服务器的端口号,默认为5672。
- username:连接RabbitMQ服务器的用户名,默认为guest。
- password:连接RabbitMQ服务器的密码,默认为guest。
- virtualHost:连接RabbitMQ服务器的虚拟主机,默认为/。
- connectionTimeout:连接超时时间,默认为0(无限制)。
- requestedHeartbeat:请求的心跳超时时间,默认为0(无限制)。
方法:
- newConnection():创建一个新的RabbitMQ连接。
- createChannel():创建一个新的通道。
Channel类的常用方法包括:
1. basicPublish:用于将消息发送到指定的交换机和路由键。
参数含义:
- exchange:消息发送到的交换机名称
- routingKey:消息发送到的队列名称
- props:消息的属性
- body:消息体
作用:将消息发送到指定的交换机上,等待被消费者消费。
2. basicConsume:用于从指定队列中消费消息。
参数含义:
- queue:要消费的队列名称
- autoAck:是否自动确认消息
- callback:消费者接收到消息后的回调函数
作用:订阅队列中的消息,等待被消费者消费。
3. basicAck:用于确认已经处理完毕的消息。
参数含义:
- deliveryTag:消息标签
- multiple:是否批量确认
作用:确认消息已被消费,告诉RabbitMQ可以删除该消息。
4. basicNack:用于拒绝处理某个消息,并可以选择是否重新将消息放回队列。
参数含义:
- deliveryTag:消息标签
- multiple:是否批量拒绝
- requeue:是否重新入队列
作用:拒绝消息,并可选择是否重新入队列。
5. basicReject:用于拒绝处理某个消息,并可以选择是否重新将消息放回队列。
参数含义:
- deliveryTag:消息标签
- requeue:是否重新入队列
作用:拒绝消息,并重新入队列。
6. queueDeclare:用于声明一个队列。
参数含义:
- queue:队列名称
- durable:是否持久化
- exclusive:是否独占
- autoDelete:是否自动删除
- arguments:队列参数
作用:声明队列,如果队列不存在则创建。
7 exchangeDeclare:用于声明一个交换机。
参数含义:
- exchange:交换机名称
- type:交换机类型
- durable:是否持久化
- autoDelete:是否自动删除
- internal:是否内部使用
- arguments:交换机参数
作用:声明交换机,如果交换机不存在则创建。
8. queueBind:用于将队列绑定到指定的交换机和路由键。
参数含义:
- queue:队列名称
- exchange:交换机名称
- routingKey:路由键
- arguments:绑定参数
作用:将队列绑定到交换机上,等待被消费者消费。
Channel类的作用是提供了与RabbitMQ服务器进行通信的通道,通过该通道可以进行消息的发送和接收,以及队列和交换机的声明和绑定等操作。同时,Channel类还提供了一些方法用于控制消息的确认和拒绝,以及消息的持久化等高级特性。
DefaultConsumer是RabbitMQ客户端库中的一个类,它实现了Consumer接口,用于处理从RabbitMQ服务器接收到的消息。
DefaultConsumer类的常用方法包括:
1. handleDelivery:处理从RabbitMQ服务器接收到的消息。
void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException;
参数含义:
- consumerTag:消费者标签,用于标识消费者。
- envelope:消息的信封,包含了消息的元数据,如交换机、路由键等。
- properties:消息的属性,包含了消息的元数据,如消息ID、消息类型等。
- body:消息的内容,即消息体。
2. handleShutdownSignal:处理与RabbitMQ服务器的连接关闭信号。
void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);
参数含义:
- consumerTag:消费者标签,用于标识消费者。
- sig:关闭信号异常,包含了关闭的原因和异常信息。
3. handleConsumeOk:处理与RabbitMQ服务器的消费者注册成功信号。
void handleConsumeOk(String consumerTag);
参数含义:
- consumerTag:消费者标签,用于标识消费者。
4. handleCancelOk:处理与RabbitMQ服务器的消费者取消注册成功信号。
void handleCancelOk(String consumerTag);
参数含义:
- consumerTag:消费者标签,用于标识消费者。
在handleDelivery方法中,我们可以根据需要对消息进行处理,例如解析消息内容、存储消息等。
创建RabbitMQ连接、创建通道、声明交换机和发送消息的完整示例代码:
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitMQExample {
private static final String QUEUE_NAME = "my_queue";
private static final String EXCHANGE_NAME = "my_exchange";
private static final String ROUTING_KEY = "my_routing_key";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
try {
// 创建连接
Connection connection = factory.newConnection();
// 创建通道
Channel channel = connection.createChannel();
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 发送消息
String message = "Hello, RabbitMQ!";
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
System.out.println("Sent message: " + message);
// 关闭通道和连接
channel.close();
connection.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
在这个示例中,使用了默认的本地RabbitMQ服务器,用户名和密码都是"guest",虚拟主机是"/"。创建了一个名为"my_exchange"的topic类型交换机,并发送了一条消息到该交换机上,使用了"my_routing_key"作为路由键。
创建RabbitMQ连接、创建通道、声明交换机、以及接收消息:
import com.rabbitmq.client.*;
public class RabbitMQExample {
private final static String QUEUE_NAME = "my_queue";
private final static String EXCHANGE_NAME = "my_exchange";
private final static String ROUTING_KEY = "my_routing_key";
public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
// 创建连接
Connection connection = factory.newConnection();
// 创建通道
Channel channel = connection.createChannel();
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
// 创建消费者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message);
}
};
// 开始消费消息
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
以上示例代码演示了如何创建一个RabbitMQ连接、创建通道、声明一个类型为TOPIC的交换机、以及接收消息的过程。你可以根据自己的需求修改相关参数和逻辑。
1. 直连交换机(Direct Exchange):直连交换机根据消息的路由键(Routing Key)将消息发送到与之绑定的队列中。路由键与队列名完全匹配时,消息将被路由到该队列。直连交换机是最简单的交换机类型。
2. 主题交换机(Topic Exchange):主题交换机将消息按照模式匹配的方式发送到与之绑定的队列中。主题交换机支持通配符,可以将路由键与模式进行匹配,例如“*”代表一个单词,“#”代表零个或多个单词。主题交换机可以实现灵活的消息路由。
3. 头交换机(Headers Exchange):头交换机根据消息头(Header)中的键值对进行匹配,将消息发送到与之绑定的队列中。头交换机的路由规则比较复杂,性能也相对较低,一般不常用。
4. 扇形交换机(Fanout Exchange):扇形交换机将消息发送到所有与之绑定的队列中,忽略路由键。扇形交换机是一种广播式的交换机,适用于将消息发送给多个消费者的场景。
// 创建连接和通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 定义交换机名称和类型
String exchangeName = "direct_exchange";
String exchangeType = "direct";
// 声明交换机
channel.exchangeDeclare(exchangeName, exchangeType);
// 发送消息到交换机
String message = "Hello, direct exchange!";
channel.basicPublish(exchangeName, "direct_routing_key", null, message.getBytes());
// 关闭连接和通道
channel.close();
connection.close();
// 创建连接和通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 定义交换机名称和类型
String exchangeName = "topic_exchange";
String exchangeType = "topic";
// 声明交换机
channel.exchangeDeclare(exchangeName, exchangeType);
// 发送消息到交换机
String message = "Hello, topic exchange!";
channel.basicPublish(exchangeName, "topic.routing.key", null, message.getBytes());
// 关闭连接和通道
channel.close();
connection.close();
// 创建连接和通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 定义交换机名称和类型
String exchangeName = "headers_exchange";
String exchangeType = "headers";
// 声明交换机
channel.exchangeDeclare(exchangeName, exchangeType);
// 设置消息头
Map<String, Object> headers = new HashMap<>();
headers.put("key1", "value1");
headers.put("key2", "value2");
// 发送消息到交换机
String message = "Hello, headers exchange!";
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().headers(headers).build();
channel.basicPublish(exchangeName, "", properties, message.getBytes());
// 关闭连接和通道
channel.close();
connection.close();
// 创建连接和通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 定义交换机名称和类型
String exchangeName = "fanout_exchange";
String exchangeType = "fanout";
// 声明交换机
channel.exchangeDeclare(exchangeName, exchangeType);
// 发送消息到交换机
String message = "Hello, fanout exchange!";
channel.basicPublish(exchangeName, "", null, message.getBytes());
// 关闭连接和通道
channel.close();
connection.close();