目录
C1和C2属于竞争关系,一个消息只有一个消费者可以取到。

代码部分只需要用两个消费者进程监听同一个队里即可。
两个消费者呈现竞争关系。
用一个生产者推送10条消息
- for(int i=0;i<10;i++)
- {
- String body=i+"hello rabbitmq!!!";
- channel.basicPublish("","work_queues",null,body.getBytes());
- }
两个监听的消费者接收情况如下。


一个生产者发送消息后有两个消费者可以收到消息。
生产者把消息发给交换机,交换机再把消息通过Routes路由分发给不同的队列。

- //发送消息
- public class producer_PubSub {
- public static void main(String[] args) throws IOException, TimeoutException {
- //1.创建连接工厂
- ConnectionFactory factory=new ConnectionFactory();
- //2.设置参数
- factory.setHost(""); //设置ip地址。默认为127.0.0.1
- factory.setPort(5672); //端口 默认值5672
- factory.setVirtualHost("/itcast"); //设置虚拟机 默认值/
- factory.setUsername("yhy"); //用户名,默认值guest
- factory.setPassword(""); //密码,默认值guest
- //3.创建连接Connection
- Connection connection = factory.newConnection();
- //4.创建Channel
- Channel channel = connection.createChannel();
-
- /*
- * exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map
arguments) - * 参数:
- * 1.exchange : 交换价名称
- * 2.type : 交换机类型 ,有四种
- * DIRECT("direct"), 定向
- FANOUT("fanout"), 扇形(广播),发送消息到每一个与之绑定队列
- TOPIC("topic"), 通配符的方式
- HEADERS("headers"); 参数匹配
- *3.durable :是否持久化
- * 4.autoDelete:是否自动删除
- * 5.internal: 内部使用。一般false
- * 6.arguments:参数
- * */
- //5.创建交换机
- String exchangeName="test_fanout";
- channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
- //6.创建队列
- String queue1Name="test_fanout_queue1";
- String queue2Name="test_fanout_queue2";
- channel.queueDeclare(queue1Name,true,false,false,null);
- channel.queueDeclare(queue2Name,true,false,false,null);
-
- /*
- * queueBind(String queue, String exchange, String routingKey)
- * 参数:
- * queue:队列名
- * exchange:交换机名称
- * routingKey:路由键,绑定规则
- * 如果交换机类型为fanout,routingKey设置为""
- * */
- //7.绑定队列和交换机
- channel.queueBind(queue1Name,exchangeName,"");
- channel.queueBind(queue2Name,exchangeName,"");
-
- String body="日志信息:调用了findAll方法";
- //8.发送消息
- channel.basicPublish(exchangeName,"",null,body.getBytes());
- //9.释放资源
- channel.close();
- connection.close();
- }
- }
运行之后两个队列里面就会多一条消息

两个消费者的代码大同小异,只是绑定的队列名不同,这里只给其中一个
- public class consumer_PubSub1 {
- public static void main(String[] args) throws IOException, TimeoutException {
- //1.创建连接工厂
- ConnectionFactory factory=new ConnectionFactory();
- //2.设置参数
- factory.setHost(""); //设置ip地址。默认为127.0.0.1
- factory.setPort(5672); //端口 默认值5672
- factory.setVirtualHost("/itcast"); //设置虚拟机 默认值/
- factory.setUsername("yhy"); //用户名,默认值guest
- factory.setPassword(""); //密码,默认值guest
- //3.创建连接Connection
- Connection connection = factory.newConnection();
- //4.创建Channel
- Channel channel = connection.createChannel();
-
- String queue1Name="test_fanout_queue1";
- String queue2Name="test_fanout_queue2";
- /*
- * basicConsume(String queue, boolean autoAck, Consumer callback)
- * 参数:
- * 1.队列名称
- * 2.autoAck:是否自动确认
- * 3.callback:回调对象
- * */
- //6.接收消息
- Consumer consumer=new DefaultConsumer(channel){
- /*
- * 回调方法,当收到消息后,会自动执行该方法
- * 1.consumerTag:标识
- * 2.envelope :获取一些信息,交换机,路由key...
- * 3.properties: 配置信息
- * 4.body: 数据
- * */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- // System.out.println("consumerTag:"+consumerTag);
- // System.out.println("Exchange:"+envelope.getExchange());
- // System.out.println("RoutingKey:"+envelope.getRoutingKey());
- // System.out.println("properties:"+properties);
- System.out.println("body:"+new String(body));
- System.out.println("将日志信息打印到控制台......");
- }
- };
- channel.basicConsume(queue1Name,true,consumer);
-
- //不需要关闭资源
- }
- }
控制台输出有



对于特定级别的信息会发送到别的队列,如上图的error,在发送消息时也会有一个routing,只要和后面的队列对应上就可以发送到对应队列。
生产者代码:
- //发送消息
- public class producer_Routing {
- public static void main(String[] args) throws IOException, TimeoutException {
- //1.创建连接工厂
- ConnectionFactory factory=new ConnectionFactory();
- //2.设置参数
- factory.setHost(""); //设置ip地址。默认为127.0.0.1
- factory.setPort(5672); //端口 默认值5672
- factory.setVirtualHost("/itcast"); //设置虚拟机 默认值/
- factory.setUsername("yhy"); //用户名,默认值guest
- factory.setPassword(""); //密码,默认值guest
- //3.创建连接Connection
- Connection connection = factory.newConnection();
- //4.创建Channel
- Channel channel = connection.createChannel();
-
- /*
- * exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map
arguments) - * 参数:
- * 1.exchange : 交换价名称
- * 2.type : 交换机类型 ,有四种
- * DIRECT("direct"), 定向
- FANOUT("fanout"), 扇形(广播),发送消息到每一个与之绑定队列
- TOPIC("topic"), 通配符的方式
- HEADERS("headers"); 参数匹配
- *3.durable :是否持久化
- * 4.autoDelete:是否自动删除
- * 5.internal: 内部使用。一般false
- * 6.arguments:参数
- * */
- //5.创建交换机
- String exchangeName="test_direct";
- channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
- //6.创建队列
- String queue1Name="test_direct_queue1";
- String queue2Name="test_direct_queue2";
- channel.queueDeclare(queue1Name,true,false,false,null);
- channel.queueDeclare(queue2Name,true,false,false,null);
-
- /*
- * queueBind(String queue, String exchange, String routingKey)
- * 参数:
- * queue:队列名
- * exchange:交换机名称
- * routingKey:路由键,绑定规则
- * 如果交换机类型为fanout,routingKey设置为""
- * */
- //7.绑定队列和交换机
- //队列1绑定error
-
- channel.queueBind(queue1Name,exchangeName,"error");
-
- //队列2绑定error,info,warning
- channel.queueBind(queue2Name,exchangeName,"info");
- channel.queueBind(queue2Name,exchangeName,"error");
- channel.queueBind(queue2Name,exchangeName,"warning");
-
- String body="日志信息:调用了findAll方法,级别:info,error,warning";
- //8.发送消息
- channel.basicPublish(exchangeName,"error",null,body.getBytes());
- //9.释放资源
- channel.close();
- connection.close();
- }
- }
消费者代码(两个消费者就绑定队列名不一样):
- public class consumer_Routing1 {
- public static void main(String[] args) throws IOException, TimeoutException {
- //1.创建连接工厂
- ConnectionFactory factory=new ConnectionFactory();
- //2.设置参数
- factory.setHost(""); //设置ip地址。默认为127.0.0.1
- factory.setPort(5672); //端口 默认值5672
- factory.setVirtualHost("/itcast"); //设置虚拟机 默认值/
- factory.setUsername("yhy"); //用户名,默认值guest
- factory.setPassword(""); //密码,默认值guest
- //3.创建连接Connection
- Connection connection = factory.newConnection();
- //4.创建Channel
- Channel channel = connection.createChannel();
-
- String queue1Name="test_direct_queue1";
- String queue2Name="test_direct_queue2";
- /*
- * basicConsume(String queue, boolean autoAck, Consumer callback)
- * 参数:
- * 1.队列名称
- * 2.autoAck:是否自动确认
- * 3.callback:回调对象
- * */
- //6.接收消息
- Consumer consumer=new DefaultConsumer(channel){
- /*
- * 回调方法,当收到消息后,会自动执行该方法
- * 1.consumerTag:标识
- * 2.envelope :获取一些信息,交换机,路由key...
- * 3.properties: 配置信息
- * 4.body: 数据
- * */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- // System.out.println("consumerTag:"+consumerTag);
- // System.out.println("Exchange:"+envelope.getExchange());
- // System.out.println("RoutingKey:"+envelope.getRoutingKey());
- // System.out.println("properties:"+properties);
- System.out.println("body:"+new String(body));
- System.out.println("将日志信息存储到数据库");
- }
- };
- channel.basicConsume(queue1Name,true,consumer);
-
- //不需要关闭资源
- }
- }

发送消息时设定的routingkey会和后面的routingkey进行匹配。

生产者代码:
- //发送消息
- public class producer_Topic {
- public static void main(String[] args) throws IOException, TimeoutException {
- //1.创建连接工厂
- ConnectionFactory factory=new ConnectionFactory();
- //2.设置参数
- factory.setHost(""); //设置ip地址。默认为127.0.0.1
- factory.setPort(5672); //端口 默认值5672
- factory.setVirtualHost("/itcast"); //设置虚拟机 默认值/
- factory.setUsername("yhy"); //用户名,默认值guest
- factory.setPassword(""); //密码,默认值guest
- //3.创建连接Connection
- Connection connection = factory.newConnection();
- //4.创建Channel
- Channel channel = connection.createChannel();
-
- /*
- * exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map
arguments) - * 参数:
- * 1.exchange : 交换价名称
- * 2.type : 交换机类型 ,有四种
- * DIRECT("direct"), 定向
- FANOUT("fanout"), 扇形(广播),发送消息到每一个与之绑定队列
- TOPIC("topic"), 通配符的方式
- HEADERS("headers"); 参数匹配
- *3.durable :是否持久化
- * 4.autoDelete:是否自动删除
- * 5.internal: 内部使用。一般false
- * 6.arguments:参数
- * */
- //5.创建交换机
- String exchangeName="test_topic";
- channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
- //6.创建队列
- String queue1Name="test_topic_queue1";
- String queue2Name="test_topic_queue2";
- channel.queueDeclare(queue1Name,true,false,false,null);
- channel.queueDeclare(queue2Name,true,false,false,null);
-
- /*
- * queueBind(String queue, String exchange, String routingKey)
- * 参数:
- * queue:队列名
- * exchange:交换机名称
- * routingKey:路由键,绑定规则
- * 如果交换机类型为fanout,routingKey设置为""
- * */
- //7.绑定队列和交换机
- // routing key 系统的名称.日志的级别。
- //需求:所有error级别的日志存入数据库,所有order系统的日志存入数据库
- channel.queueBind(queue1Name,exchangeName,"#.error");
- channel.queueBind(queue1Name,exchangeName,"order.*");
- channel.queueBind(queue2Name,exchangeName,"*.*");
-
- String body="日志信息:调用了findAll方法";
- //8.发送消息
- channel.basicPublish(exchangeName,"goods.error",null,body.getBytes());
- //9.释放资源
- channel.close();
- connection.close();
- }
- }
消费者代码
- public class consumer_Topic1 {
- public static void main(String[] args) throws IOException, TimeoutException {
- //1.创建连接工厂
- ConnectionFactory factory=new ConnectionFactory();
- //2.设置参数
- factory.setHost(""); //设置ip地址。默认为127.0.0.1
- factory.setPort(5672); //端口 默认值5672
- factory.setVirtualHost("/itcast"); //设置虚拟机 默认值/
- factory.setUsername("yhy"); //用户名,默认值guest
- factory.setPassword(""); //密码,默认值guest
- //3.创建连接Connection
- Connection connection = factory.newConnection();
- //4.创建Channel
- Channel channel = connection.createChannel();
-
- String queue1Name="test_topic_queue1";
- String queue2Name="test_topic_queue2";
- /*
- * basicConsume(String queue, boolean autoAck, Consumer callback)
- * 参数:
- * 1.队列名称
- * 2.autoAck:是否自动确认
- * 3.callback:回调对象
- * */
- //6.接收消息
- Consumer consumer=new DefaultConsumer(channel){
- /*
- * 回调方法,当收到消息后,会自动执行该方法
- * 1.consumerTag:标识
- * 2.envelope :获取一些信息,交换机,路由key...
- * 3.properties: 配置信息
- * 4.body: 数据
- * */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- // System.out.println("consumerTag:"+consumerTag);
- // System.out.println("Exchange:"+envelope.getExchange());
- // System.out.println("RoutingKey:"+envelope.getRoutingKey());
- // System.out.println("properties:"+properties);
- System.out.println("body:"+new String(body));
- System.out.println("将日志信息存储到数据库");
- }
- };
- channel.basicConsume(queue1Name,true,consumer);
-
- //不需要关闭资源
- }
- }
