目录
我们也可以使用MQ完成系统与系统之间得调用。




2.3 削峰填谷



既然MQ有优势也有劣势,那么使用MQ需要满足什么条件呢?
(1)生产者不需要从消费者处获得反馈。引入消息队列之前的直接调用,其接口的返回值应该为
空,这才让明明下层的动作还没做,上层却当成动作做完了继续往后走,即所谓异步成为了可能。
(2)容许短暂的不一致性。
(3)确实是用了有效果。即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本。
查看前一篇的csdn。


>1.Producer:生产者【生产消息】>2. Consumer:消费者(从队列中获取消息)
>2. Connection:连接通道【生产者和消费者和Rabbit服务连接】
>3.channel:信道--由于Connection采用TCP协议,会造成资源占用高。相当于小型connection。资源消息比较小。
>4.Broker: RabbitMQ服务器
>5.Virtual Host:虚拟主机,小型RabbitMQ服务器。很多项目可以连接各自的虚拟主机。
>6.Exchange:交换机,把信息分发到相应的队列
>7.Queue:队列,存放信息的地方
RabbitMQ的官网:
Messaging that just works — RabbitMQ
提供了5种模式:
(1)简单模式--Hello (2)工作者模式--work queues

(3)发布订阅模式 (4)路由模式

(5)主题模式


P: 一个生产者C: 一个消费者Q: 队列
-
-
com.rabbitmq -
amqp-client -
5.13.1 -
-
- public class SimpleProduct {
- public static void main(String[] args) throws Exception {
- ConnectionFactory factory=new ConnectionFactory();
- //设置rabbitMQ服务器的地址 默认localhost
- factory.setHost("192.168.227.175");
- //设置rabbitMQ的端口号 默认5672
- factory.setPort(5672);
- //设置账号和密码 默认guest
- factory.setUsername("guest");
- factory.setPassword("guest");
- //设置虚拟主机名 默认为 /
- factory.setVirtualHost("/");
-
- //获取连接通道
- Connection connection=factory.newConnection();
- //获取channel信道
- Channel channel = connection.createChannel();
- //创建队列
- /*如果该队列名不存在则自动创建,存在则不创建
- * String queue,队列名
- * boolean durable,是否持久化
- * boolean exclusive,(独占)声明队列同一时间只能保 证一个连接,且该队列只有被这一个连接使用。
- * boolean autoDelete,是否自动删除
- * Map
arguments: 其他参数 - * */
- channel.queueDeclare("simple_queue",true,false,false,null);
- //发送消息到队列
- /* String exchange,把消息发给哪个交换机--简单模式没 有交换机""
- *String routingKey,消息绑定的路由key 如果为简单模 式 默认写为队列名称
- * BasicProperties props, 消息的属性
- * byte[] body: 消息的内容
- * */
- String msg="这是简单模式===================";
- channel.basicPublish("","simple_queue",null,msg.getBytes(StandardCharsets.UTF_8));
- channel.close();
- }
- public class SimpleConsumer {
- public static void main(String[] args) throws Exception {
- ConnectionFactory factory=new ConnectionFactory();
- factory.setPort(5672);
- factory.setUsername("guest");
- factory.setPassword("guest");
- factory.setHost("192.168.227.175");
-
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- //监听队列
- /* String queue,监听的队列名称
- *autoAck:是否自动确认消息
- * Consumer callback: 监听到消息后触发的回调函数
- */
- DefaultConsumer defaultConsumer=new DefaultConsumer(channel) {
- //一定有消息就会触发该方法
- // body:表示消息的内容
-
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("接受到的信息"+new String(body));
- }
- };
- channel.basicConsume("simple_queue",true,defaultConsumer);
-
- }
- }

P: 生产者C1: 消费者 1C2: 消费者 2Q: 队列消费者 1 和消费者 2 属于竞争关系,一个消息只会被一个消费者消费。
- public class WorkProduct {
- public static void main(String[] args) throws Exception {
- ConnectionFactory factory=new ConnectionFactory();
- //设置rabbitMQ服务器的地址 默认localhost
- factory.setHost("192.168.227.175");
- //设置rabbitMQ的端口号 默认5672
- factory.setPort(5672);
- //设置账号和密码 默认guest
- factory.setPassword("guest");
- factory.setUsername("guest");
- //设置虚拟主机名 默认为 /
- factory.setVirtualHost("/");
-
- //获取连接通道
- Connection connection = factory.newConnection();
- //获取channel信道
- Channel channel = connection.createChannel();
-
- //创建队列
- /*如果该队列名不存在则自动创建,存在则不创建
- * String queue,队列名
- * boolean durable,是否持久化
- * boolean exclusive,(独占)声明队列同一时间只能保 证一个连接,且该队列只有被这一个连接使用。
- * boolean autoDelete,是否自动删除
- * Map
arguments: 其他参数 - * */
- channel.queueDeclare("work_queue",true,false,false,null);
- for(int i=0;i<=10;i++){
- String msg="发送消息========="+i;
- //发送消息到队列
- /* String exchange,把消息发给哪个交换机--简单模式没 有交换机""
- *String routingKey,消息绑定的路由key 如果为简单模 式 默认写为队列名称
- * BasicProperties props, 消息的属性
- * byte[] body: 消息的内容
- * */
- channel.basicPublish("","work_queue",null,msg.getBytes(StandardCharsets.UTF_8));
-
- }
- connection.close();
- }
- }
- public class WorkConsumer01 {
- public static void main(String[] args) throws Exception{
- ConnectionFactory factory=new ConnectionFactory();
- //设置rabbitMQ服务器的地址 默认localhost
- factory.setHost("192.168.227.175");
- //设置rabbitMQ的端口号 默认5672
- factory.setPort(5672);
- //设置账号和密码 默认guest
- factory.setUsername("guest");
- factory.setPassword("guest");
- //设置虚拟主机名 默认为 /
- factory.setVirtualHost("/");
-
- //获取连接通道
- Connection connection = factory.newConnection();
- //获取channel信道
- Channel channel = connection.createChannel();
- //监听队列
- /* String queue,监听的队列名称
- *autoAck:是否自动确认消息
- * Consumer callback: 监听到消息后触发的回调函数
- */
- DefaultConsumer defaultConsumer=new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("监听的信息"+new String(body));
- }
- };
-
- channel.basicConsume("work_queue",true,defaultConsumer);
- }
- }

p: producter 生产者x : exchange 交换机Q: 队列C1 和 C2: 消费者
- public class FaBuProduct {
- public static void main(String[] args) throws Exception{
- ConnectionFactory factory=new ConnectionFactory();
- //设置rabbitMQ服务器的地址 默认localhost
- factory.setHost("192.168.227.175");
- //设置rabbitMQ的端口号 默认5672
- factory.setPort(5672);
- //设置账号和密码 默认guest
- factory.setPassword("guest");
- factory.setUsername("guest");
- //设置虚拟主机名 默认为 /
- factory.setVirtualHost("/");
-
- //获取连接通道
- Connection connection = factory.newConnection();
- //获取channel信道
- Channel channel = connection.createChannel();
-
- //创建队列
- /*如果该队列名不存在则自动创建,存在则不创建
- * String queue,队列名
- * boolean durable,是否持久化
- * boolean exclusive,(独占)声明队列同一时间只能保 证一个连接,且该队列只有被这一个连接使用。
- * boolean autoDelete,是否自动删除
- * Map
arguments: 其他参数 - * */
- channel.queueDeclare("faBu_queue01",true,false,false,null);
- channel.queueDeclare("faBu_queue02",true,false,false,null);
-
- //创建交换机
- /*String exchange,交换机的名称
- BuiltinExchangeType type,交换机的种类
- boolean durable:是否持久化
- */
- channel.exchangeDeclare("faBu_exchange",BuiltinExchangeType.FANOUT,true);
-
- //交换机和队列绑定
- /*String queue,队列名
- String exchange,交换机名
- String routingKey 路由key 如果为发布订阅模式则无需路由key
- */
- channel.queueBind("faBu_queue01","faBu_exchange","");
- channel.queueBind("faBu_queue02","faBu_exchange","");
-
- for (int i=0;i<10;i++){
- String msg="publicher msg============"+i;
- //发送消息到队列
- /* String exchange,把消息发给哪个交换机--简单模式没 有交换机""
- *String routingKey,消息绑定的路由key 如果为简单模 式 默认写为队列名称
- * BasicProperties props, 消息的属性
- * byte[] body: 消息的内容
- * */
- channel.basicPublish("faBu_exchange","",null,msg.getBytes(StandardCharsets.UTF_8));
- }
- connection.close();
-
- }
- }
- public class FaBuConsumer01 {
- public static void main(String[] args) throws Exception{
- ConnectionFactory factory=new ConnectionFactory();
- //设置rabbitMQ服务器的地址 默认localhost
- factory.setHost("192.168.227.175");
- //设置rabbitMQ的端口号 默认5672
- factory.setPort(5672);
- //设置账号和密码 默认guest
- factory.setPassword("guest");
- factory.setUsername("guest");
- //设置虚拟主机名 默认为 /
- factory.setVirtualHost("/");
-
- //获取连接通道
- Connection connection = factory.newConnection();
- //获取channel信道
- Channel channel = connection.createChannel();
-
- //监听队列
- /* String queue,监听的队列名称
- *autoAck:是否自动确认消息
- * Consumer callback: 监听到消息后触发的回调函数
- */
- DefaultConsumer defaultConsumer=new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("接收到的信息"+new String(body));
- }
- };
-
- channel.basicConsume("faBu_queue01",true,defaultConsumer);
-
- }
- }

p:生产者
x: 交换机---Direct (路由模式)
c1和c2表示消费者
Q:队列
- public class RouterProduct {
- public static void main(String[] args) throws Exception{
- ConnectionFactory factory=new ConnectionFactory();
- //设置rabbitMQ服务器的地址 默认localhost
- factory.setHost("192.168.227.175");
- //设置rabbitMQ的端口号 默认5672
- factory.setPort(5672);
- //设置账号和密码 默认guest
- factory.setPassword("guest");
- factory.setUsername("guest");
- //设置虚拟主机名 默认为 /
- factory.setVirtualHost("/");
-
- //获取连接通道
- Connection connection = factory.newConnection();
- //获取channel信道
- Channel channel = connection.createChannel();
-
- //创建队列
- /**
- * 如果该队列名不存在则自动创建,存在则不创建
- * String queue,队列名
- * boolean durable,是否持久化
- * boolean exclusive,(独占)声明队列同一时间只能保证一个连接,且该队列只有被这一个连接使用。
- * boolean autoDelete,是否自动删除
- * Map
arguments: 其他参数 - */
- channel.queueDeclare("router_queue01",true,false,false,null);
- channel.queueDeclare("router_queue02",true,false,false,null);
-
-
- //创建交换机
- /*
- String exchange,交换机的名称
- BuiltinExchangeType type,交换机的种类
- boolean durable:是否持久化
- */
- channel.exchangeDeclare("router_change", BuiltinExchangeType.DIRECT,true);
-
-
- //交换机和队列绑定
- /*
- String queue,队列名 String exchange,交换机名 String routingKey 路由key 如果为发布订阅模式则无需路由key
- */
- channel.queueBind("router_queue01","router_change","error");
- channel.queueBind("router_queue02","router_change","error");
- channel.queueBind("router_queue02","router_change","info");
- channel.queueBind("router_queue02","router_change","warning");
-
- //发送消息到队列
- /**
- * String exchange,把消息发给哪个交换机--简单模式没有交换机""
- * String routingKey,消息绑定的路由key 如果为简单模式 默认写为队列名称
- * BasicProperties props, 消息的属性
- * byte[] body: 消息的内容
- */
- String msg="router==========2";
- channel.basicPublish("router_change","error",null,msg.getBytes(StandardCharsets.UTF_8));
-
- connection.close();
- }
- }
- public class RouterConsumer01 {
- public static void main(String[] args) throws Exception{
- ConnectionFactory factory=new ConnectionFactory();
- //设置rabbitMQ服务器的地址 默认localhost
- factory.setHost("192.168.227.175");
- //设置rabbitMQ的端口号 默认5672
- factory.setPort(5672);
- //设置账号和密码 默认guest
- factory.setPassword("guest");
- factory.setUsername("guest");
- //设置虚拟主机名 默认为 /
- factory.setVirtualHost("/");
-
- //获取连接通道
- Connection connection = factory.newConnection();
- //获取channel信道
- Channel channel = connection.createChannel();
-
- DefaultConsumer defaultConsumer=new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("接受的信息内容"+new String(body));
- }
- };
- channel.basicConsume("router_queue01",true,defaultConsumer);
- connection.close();
- }
- }

*: 统配一个单词
#:统配零个或者n个单词
- public class TopicProduct {
- public static void main(String[] args) throws Exception{
- ConnectionFactory factory=new ConnectionFactory();
- //设置rabbitMQ服务器的地址 默认localhost
- factory.setHost("192.168.227.175");
- //设置rabbitMQ的端口号 默认5672
- factory.setPort(5672);
- //设置账号和密码 默认guest
- factory.setPassword("guest");
- factory.setUsername("guest");
- //设置虚拟主机名 默认为 /
- factory.setVirtualHost("/");
-
- //获取连接通道
- Connection connection = factory.newConnection();
- //获取channel信道
- Channel channel = connection.createChannel();
-
- channel.queueDeclare("topic_queue01",true,false,false,null);
- channel.queueDeclare("topic_queue02",true,false,false,null);
-
- //创建交换机
- /*
- String exchange,交换机的名称
- BuiltinExchangeType type,交换机的种类
- boolean durable:是否持久化
- */
- channel.exchangeDeclare("topic_exchange", BuiltinExchangeType.TOPIC,true);
-
- //交换机和队列绑定
- /*
- String queue,队列名 String exchange,交换机名 String routingKey 路由key 如果为发布订阅模式则无需路由key
- */
- channel.queueBind("topic_queue01","topic_exchange",".orange.*");
- channel.queueBind("topic_queue02","topic_exchange","*.*.rabbit");
- channel.queueBind("topic_queue02","topic_exchange","lazy.#");
-
- //发送消息到队列
- /**
- * String exchange,把消息发给哪个交换机--简单模式没有交换机""
- * String routingKey,消息绑定的路由key 如果为简单模式 默认写为队列名称
- * BasicProperties props, 消息的属性
- * byte[] body: 消息的内容
- */
- String msg="topic=================";
- channel.basicPublish("topic_exchange",".orange.*",null,msg.getBytes(StandardCharsets.UTF_8));
- connection.close();
- }
- }
- public class TopicConsumer01 {
- public static void main(String[] args) throws Exception {
- ConnectionFactory factory=new ConnectionFactory();
- //设置rabbitMQ服务器的地址 默认localhost
- factory.setHost("192.168.227.175");
- //设置rabbitMQ的端口号 默认5672
- factory.setPort(5672);
- //设置账号和密码 默认guest
- factory.setPassword("guest");
- factory.setUsername("guest");
- //设置虚拟主机名 默认为 /
- factory.setVirtualHost("/");
-
- //获取连接通道
- Connection connection = factory.newConnection();
- //获取channel信道
- Channel channel = connection.createChannel();
-
-
- //监听队列
- /**
- * String queue,监听的队列名称
- * autoAck:是否自动确认消息
- * Consumer callback: 监听到消息后触发的回调函数
- */
- DefaultConsumer defaultConsumer=new DefaultConsumer(channel){
- //一定有消息就会触发该方法
- //body:表示消息的内容
-
-
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("接受的消息内容:"+new String(body));
- }
- };
- channel.basicConsume("topic_queue02",true,defaultConsumer);
- }
- }