- "1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchemainstance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
- http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0modelVersion>
- <groupId>com.dxwgroupId>
- <artifactId>rabbitmq-producerartifactId>
- <version>1.0-SNAPSHOTversion>
- <dependencies>
- <dependency>
- <groupId>com.rabbitmqgroupId>
- <artifactId>amqp-clientartifactId>
- <version>5.6.0version>
- dependency>
- dependencies>
- project>
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- /**
- * 生产者:发送消息
- */
- public class Producer_HelloWorld {
- public static void main(String[] args) throws
- IOException, TimeoutException {
- //1、创建连接工厂
- ConnectionFactory factory = new
- ConnectionFactory();
- //2、设置参数
- factory.setHost("localhost");//ip 默认localhost
- factory.setPort(5672);//端口 默认5672
- factory.setVirtualHost("/dxw");//虚拟机 默认/
- factory.setUsername("dxw");//用户名 默认guest
- factory.setPassword("1234");//密码 默认guest
- //3、创建连接
- Connection connection = factory.newConnection();
- //4、创建Channel
- Channel channel = connection.createChannel();
- //5、创建队列
- /*
- * 参数解释:
- * queueDeclare(String queue,
- * boolean durable,
- * boolean exclusive,
- * boolean autoDelete,
- * Map
arguments) - * 1. queue:队列名称
- * 如果没有一个名字叫hello_world的队列,则会创建该队
- 列,如果有则不会创建
- * 2. durable:是否持久化,当mq重启之后,队列中消息还在
- * 3. exclusive:
- * 是否独占。只能有一个消费者监听这队列
- * 当Connection关闭时,是否删除队列
- * 4. autoDelete:是否自动删除。当没有Consumer时,自动
- 删除掉
- * 5. arguments:参数。
- */
- channel.queueDeclare("hello_world",true,false,false,null)
- ;
- //6、发送消息
- /*
- * 参数解释:
- * basicPublish(String exchange,
- * String routingKey,
- * BasicProperties props,
- * byte[] body)
- * 1. exchange:交换机名称。简单模式下交换机会使用默认的
- ""
- * 2. routingKey:路由名称
- * 3. props:配置信息
- * 4. body:发送消息数据
- */
- String body = "hello rabbitmq~~~";
- channel.basicPublish("","hello_world",null,body.getBytes(
- ));
- //7、释放资源
- //channel.close();
- //connection.close();
- }
- }
- "1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchemainstance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
- http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0modelVersion>
- <groupId>com.dxwgroupId>
- <artifactId>rabbitmq-consumerartifactId>
- <version>1.0-SNAPSHOTversion>
- <dependencies>
- <dependency>
- <groupId>com.rabbitmqgroupId>
- <artifactId>amqp-clientartifactId>
- <version>5.6.0version>
- dependency>
- dependencies>
- project>
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- /**
- * 消费者:接收消息
- */
- public class Consumer_HelloWorld {
- public static void main(String[] args) throws
- IOException, TimeoutException {
- //1、创建连接工厂
- ConnectionFactory factory = new
- ConnectionFactory();
- //2. 设置参数
- factory.setHost("localhost");//ip 默认值 localhost
- factory.setPort(5672); //端口 默认值 5672
- factory.setVirtualHost("/dxw");//虚拟机 默认/
- factory.setUsername("dxw");//用户名 默认guest
- factory.setPassword("1234");//密码 默认guest
- //3. 创建连接 Connection
- Connection connection = factory.newConnection();
- //4. 创建Channel
- Channel channel = connection.createChannel();
- //5、创建队列
- /*
- * 参数解释:
- * queueDeclare(String queue,
- * boolean durable,
- * boolean exclusive,
- * boolean autoDelete,
- * Map
arguments) - * 1. queue:队列名称
- * 如果没有一个名字叫hello_world的队列,则会创建该
- 队列,如果有则不会创建
- * 2. durable:是否持久化,当mq重启之后,队列中消息还在
- * 3. exclusive:
- * 是否独占。只能有一个消费者监听这队列
- * 当Connection关闭时,是否删除队列
- * 4. autoDelete:是否自动删除。当没有Consumer时,自
- 动删除掉
- * 5. arguments:参数。
- */
- channel.queueDeclare("hello_world",true,false,false,null)
- ;
- //6、接收消息
- Consumer consumer = new DefaultConsumer(channel){
- /*
- 回调方法,当收到消息后,会自动执行该方法
- 1. consumerTag:标识
- 2. envelope:获取一些信息,交换机,路由key...
- 3. properties:配置信息
- 4. body:数据
- */
- @Override
- public void handleDelivery(String consumerTag,
- Envelope envelope, AMQP.BasicProperties properties, byte[]
- body) throws IOException {
- System.out.println("consumerTag:"+consumerTag);
- System.out.println("Exchange:"+envelope.getExchange());
- System.out.println("RoutingKey:"+envelope.getRoutingKey(
- ));
- System.out.println("properties:"+properties);
- System.out.println("body:"+new
- String(body));
- }
- };
- /*
- * 参数解释:
- * basicConsume(String queue, boolean autoAck,
- Consumer callback)
- * 1. queue:队列名称
- * 2. autoAck:是否自动确认
- * 3. callback:回调对象
- */
- channel.basicConsume("hello_world",true,consumer);
- //关闭资源?不要
- }
- }