• RabbitMQ工作模式——Routing路由模式


    1.Routing路由模式

    在这里插入图片描述
    Routing生产者代码

    public class Producer_Routing {
    	public static void main(String[] args) throws IOException, TimeoutException {
    		//1.创建连接工厂
    		ConnectionFactory factory = new ConnectionFactory();
    		//2.设置参数
    		factory.setHost("172.16.98.133"); ip 默认值 localhost
    		factory.setPort(5672);//端口 默认值5672
    		factory.setVirtualHost("/itcast");//虚拟机 默认值
    		factory.setUsername("heima");//用户名 默认guest
    		factory.setPassword("heima");//密码 默认值 guest
    		//3.创建连接 Connection
    		Connection connection = factory.newConnection();
    		//4.创建Channel
    		Channel channel = connection.creatChannel();
    		/*
    		exchange(String exchange,String type,boolean durable,boolean autoDelete,boolean internal,Map arguments)
    		参数:
    		1.exchange:交换机名称
    		2.type:交换机类型
    			DIRECT("direct"),:定向
    			FANOUT("fanout"),:扇形(广播)发送消息到每一个与之绑定的队列
    			TOPIC("topic"),:通配符方式
    			HEADERS("headers");:参数匹配
    		3.durable:是否持久化
    		4.autoDelete:自动删除
    		5.internal:内部使用。一般为false
    		6.arguments:参数,一般设为null
    		*/
    		//5.创建交换机
    		String exchangeName = "test_direct";
    		channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT,true,false,false,null);
    		//6.创建队列
    		String queue1Name = "test_direct_queue1";
    		String queue2Name = "test_direct_queue2";
    		channel.queueDeclare(queue1Name,true,false,false,null);
    		channel.queueDeclare(queue2Name,true,false,false,null);
    		//7.绑定队列和交换机
    		/*
    		queueBind(String queue,String exchange,String routingKey)
    		参数:
    			1.queue:队列名称
    			2.exchange:交换机名称
    			3.routingKey:路由键,绑定规则
    				如果交换机的类型为:fanout,routingKey设置为空字符串
    		*/
    		//队列1的绑定 error
    		channel.queueBind(queue1Name,exchangeName,"error");
    		//队列2的绑定 info error warning
    		channel.queueBind(queue2Name,exchangeName,"info");
    		channel.queueBind(queue2Name,exchangeName,"error");
    		channel.queueBind(queue2Name,exchangeName,"warning");
    		//8.发送消息
    		String body = "日志信息,张三调用了findAll方法...日志级别:info...";	
    		channel.basicPublish(exchangeName,"info",null,body.getBytes());
    		//9.释放资源
    		channel.close();
    		connection.close();
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59

    Routing1消费者代码

    public class Consumer_Routing1 {
    	public static void main(String[] args) throws IOException, TimeoutException {
    		//1.创建连接工厂
    		ConnectionFactory factory = new ConnectionFactory();
    		//2.设置参数
    		factory.setHost("172.16.98.133"); ip 默认值 localhost
    		factory.setPort(5672);//端口 默认值5672
    		factory.setVirtualHost("/itcast");//虚拟机 默认值
    		factory.setUsername("heima");//用户名 默认guest
    		factory.setPassword("heima");//密码 默认值 guest
    		//3.创建连接 Connection
    		Connection connection = factory.newConnection();
    		//4.创建Channel
    		Channel channel = connection.creatChannel();
    		
    		String queue1Name = "test_direct_queue1";
    		String queue2Name = "test_direct_queue2";
    		
    		/*
    			basicConsume(String queue,boolean autoAck,Consumer callback)
    			参数:
    			1.queue:队列名称
    			2.autoAck:是否自动确认
    			3.callback:回调对象
    		*/
    		//接收消息
    		Consumer consumer = new DefaultConsumer(channel){
    			/*
    				回调方法,当收到消息后会自动执行该方法
    				1.consumerTag:标识
    				2.envelope:获取一些信息,交换机,路由key...
    				3.properties:配置信息
    				4.body:数据
    			*/
    			@Override
    			public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body){
    				System.out.println("consumerTag" + consumerTag);
    				System.out.println("Exchange" + envelope.getExchange());
    				System.out.println("RoutingKey" + envelope.getRoutingKey());
    				System.out.println("properties" + properties);
    				System.out.println("body" + new String(body));
    				System.out.println("将日志信息存储到数据库......");
    			}
    		};
    		channel.basicConsume("queue2Name",true,consumer);
    
    		//消费者不能关闭资源
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    Routing2消费者代码

    public class Consumer_Routing2 {
    	public static void main(String[] args) throws IOException, TimeoutException {
    		//1.创建连接工厂
    		ConnectionFactory factory = new ConnectionFactory();
    		//2.设置参数
    		factory.setHost("172.16.98.133"); ip 默认值 localhost
    		factory.setPort(5672);//端口 默认值5672
    		factory.setVirtualHost("/itcast");//虚拟机 默认值
    		factory.setUsername("heima");//用户名 默认guest
    		factory.setPassword("heima");//密码 默认值 guest
    		//3.创建连接 Connection
    		Connection connection = factory.newConnection();
    		//4.创建Channel
    		Channel channel = connection.creatChannel();
    		
    		String queue1Name = "test_direct_queue1";
    		String queue2Name = "test_direct_queue2";
    		
    		/*
    			basicConsume(String queue,boolean autoAck,Consumer callback)
    			参数:
    			1.queue:队列名称
    			2.autoAck:是否自动确认
    			3.callback:回调对象
    		*/
    		//接收消息
    		Consumer consumer = new DefaultConsumer(channel){
    			/*
    				回调方法,当收到消息后会自动执行该方法
    				1.consumerTag:标识
    				2.envelope:获取一些信息,交换机,路由key...
    				3.properties:配置信息
    				4.body:数据
    			*/
    			@Override
    			public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body){
    				System.out.println("consumerTag" + consumerTag);
    				System.out.println("Exchange" + envelope.getExchange());
    				System.out.println("RoutingKey" + envelope.getRoutingKey());
    				System.out.println("properties" + properties);
    				System.out.println("body" + new String(body));
    				System.out.println("将日志信息打印到控制台......");
    			}
    		};
    		channel.basicConsume("queue1Name",true,consumer);
    
    		//消费者不能关闭资源
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    在这里插入图片描述

  • 相关阅读:
    R语言使用lm函数构建带交互项的多元回归模型、使用step函数构建逐步回归模型筛选预测变量的最佳子集(step regression)
    智能驾驶功能软件平台设计规范第三部分:预测功能服务接口
    vue2中seo时使用vue-meta-info
    react_14
    STM32单片机-BKP和RTC
    鸿蒙(HarmonyOS)项目方舟框架(ArkUI)之Scroll容器组件
    JavaScript奇淫技巧:把JS编译成exe
    spring tool suit 安装 Lombok 步骤
    Mybatis plus无介绍快使用,MybatisPlus3.5版本设置批量插入附源码(十一)
    火炬之光无限-萌新记录
  • 原文地址:https://blog.csdn.net/weixin_44860226/article/details/133238286