• RabbitMQ消息中间件


    目录

    1.什么是MQ

    2.MQ的优点

    2.1 应用解耦

    2.2 异步提速

    ​2.3 削峰填谷

    3.MQ的缺点

    4.如何选择使用MQ

    5.MQ的种类

    6.安装RabbitMQ

    7.概述端口号

    8.rabbit的工作原理

    9.java程序连接RabbitMQ服务

    9.1 simple 简单模式

    9.1.1 依赖

    9.1.2 生产者

    9.1.3 消费者

    9.2 work工作模式

    9.2.1 生产者

    9.2.2 消费者(两者内容一样)

    9.3 public/Subscribe发布订阅模式

    9.3.1 生产者

    9.3.2 消费者(两者相似,只需要修改队列名)

    9.4 router路由模式

    9.4.1 生产者

    9.4.2 消费者(两者相似,只需要修改队列名)

    9.5 主题模式--topic

    9.5.1 生产者

    9.5.2 消费者(两者相似,只需要修改队列名)


    1.什么是MQ

            MQ全称 Message Queue (消息队列),是在消息的传输过程中 保存消息的容器 。多用于分
    布式 系统之间进行通信。
            
            思考: 原来服务与服务之间如何通信 ?
            Openfeign 服务与服务之间直接调用。

             我们也可以使用MQ完成系统与系统之间得调用。

    2.MQ的优点

    2.1 应用解耦

    2.2 异步提速

    2.3 削峰填谷

    3.MQ的缺点

    4.如何选择使用MQ

    既然MQ有优势也有劣势,那么使用MQ需要满足什么条件呢?

    (1)生产者不需要从消费者处获得反馈。引入消息队列之前的直接调用,其接口的返回值应该为

    空,这才让明明下层的动作还没做,上层却当成动作做完了继续往后走,即所谓异步成为了可能。

    (2)容许短暂的不一致性。

    (3)确实是用了有效果。即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本。

    5.MQ的种类

           (1) rabbitMQ
           (2) kafka
           (3) RocketMQ
           (4) ActiveMQ

    6.安装RabbitMQ

    查看前一篇的csdn。

    7.概述端口号

    8.rabbit的工作原理

    >1.Producer:生产者【生产消息】>2. Consumer:消费者(从队列中获取消息)

    >2. Connection:连接通道【生产者和消费者和Rabbit服务连接】

    >3.channel:信道--由于Connection采用TCP协议,会造成资源占用高。相当于小型connection。资源消息比较小。

    >4.Broker: RabbitMQ服务器

    >5.Virtual Host:虚拟主机,小型RabbitMQ服务器。很多项目可以连接各自的虚拟主机。

    >6.Exchange:交换机,把信息分发到相应的队列

    >7.Queue:队列,存放信息的地方

    9.java程序连接RabbitMQ服务

    RabbitMQ的官网:

    Messaging that just works — RabbitMQ

    提供了5种模式:

    (1)简单模式--Hello                                                         (2)工作者模式--work queues

                                      

    (3)发布订阅模式                                                            (4)路由模式

                                  

    (5)主题模式           

     

    9.1 simple 简单模式

    P: 一个生产者
    C: 一个消费者
    Q: 队列

    9.1.1 依赖

    1. com.rabbitmq
    2. amqp-client
    3. 5.13.1

    9.1.2 生产者

    1. public class SimpleProduct {
    2. public static void main(String[] args) throws Exception {
    3. ConnectionFactory factory=new ConnectionFactory();
    4. //设置rabbitMQ服务器的地址 默认localhost
    5. factory.setHost("192.168.227.175");
    6. //设置rabbitMQ的端口号 默认5672
    7. factory.setPort(5672);
    8. //设置账号和密码 默认guest
    9. factory.setUsername("guest");
    10. factory.setPassword("guest");
    11. //设置虚拟主机名 默认为 /
    12. factory.setVirtualHost("/");
    13. //获取连接通道
    14. Connection connection=factory.newConnection();
    15. //获取channel信道
    16. Channel channel = connection.createChannel();
    17. //创建队列
    18. /*如果该队列名不存在则自动创建,存在则不创建
    19. * String queue,队列名
    20. * boolean durable,是否持久化
    21. * boolean exclusive,(独占)声明队列同一时间只能保 证一个连接,且该队列只有被这一个连接使用。
    22. * boolean autoDelete,是否自动删除
    23. * Map arguments: 其他参数
    24. * */
    25. channel.queueDeclare("simple_queue",true,false,false,null);
    26. //发送消息到队列
    27. /* String exchange,把消息发给哪个交换机--简单模式没 有交换机""
    28. *String routingKey,消息绑定的路由key 如果为简单模 式 默认写为队列名称
    29. * BasicProperties props, 消息的属性
    30. * byte[] body: 消息的内容
    31. * */
    32. String msg="这是简单模式===================";
    33. channel.basicPublish("","simple_queue",null,msg.getBytes(StandardCharsets.UTF_8));
    34. channel.close();
    35. }

    9.1.3 消费者

    1. public class SimpleConsumer {
    2. public static void main(String[] args) throws Exception {
    3. ConnectionFactory factory=new ConnectionFactory();
    4. factory.setPort(5672);
    5. factory.setUsername("guest");
    6. factory.setPassword("guest");
    7. factory.setHost("192.168.227.175");
    8. Connection connection = factory.newConnection();
    9. Channel channel = connection.createChannel();
    10. //监听队列
    11. /* String queue,监听的队列名称
    12. *autoAck:是否自动确认消息
    13. * Consumer callback: 监听到消息后触发的回调函数
    14. */
    15. DefaultConsumer defaultConsumer=new DefaultConsumer(channel) {
    16. //一定有消息就会触发该方法
    17. // body:表示消息的内容
    18. @Override
    19. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    20. System.out.println("接受到的信息"+new String(body));
    21. }
    22. };
    23. channel.basicConsume("simple_queue",true,defaultConsumer);
    24. }
    25. }

    9.2 work工作模式

    P: 生产者
    C1: 消费者 1
    C2: 消费者 2
    Q: 队列
    消费者 1 和消费者 2 属于竞争关系,一个消息只会被一个消费者消费。

    9.2.1 生产者

    1. public class WorkProduct {
    2. public static void main(String[] args) throws Exception {
    3. ConnectionFactory factory=new ConnectionFactory();
    4. //设置rabbitMQ服务器的地址 默认localhost
    5. factory.setHost("192.168.227.175");
    6. //设置rabbitMQ的端口号 默认5672
    7. factory.setPort(5672);
    8. //设置账号和密码 默认guest
    9. factory.setPassword("guest");
    10. factory.setUsername("guest");
    11. //设置虚拟主机名 默认为 /
    12. factory.setVirtualHost("/");
    13. //获取连接通道
    14. Connection connection = factory.newConnection();
    15. //获取channel信道
    16. Channel channel = connection.createChannel();
    17. //创建队列
    18. /*如果该队列名不存在则自动创建,存在则不创建
    19. * String queue,队列名
    20. * boolean durable,是否持久化
    21. * boolean exclusive,(独占)声明队列同一时间只能保 证一个连接,且该队列只有被这一个连接使用。
    22. * boolean autoDelete,是否自动删除
    23. * Map arguments: 其他参数
    24. * */
    25. channel.queueDeclare("work_queue",true,false,false,null);
    26. for(int i=0;i<=10;i++){
    27. String msg="发送消息========="+i;
    28. //发送消息到队列
    29. /* String exchange,把消息发给哪个交换机--简单模式没 有交换机""
    30. *String routingKey,消息绑定的路由key 如果为简单模 式 默认写为队列名称
    31. * BasicProperties props, 消息的属性
    32. * byte[] body: 消息的内容
    33. * */
    34. channel.basicPublish("","work_queue",null,msg.getBytes(StandardCharsets.UTF_8));
    35. }
    36. connection.close();
    37. }
    38. }

    9.2.2 消费者(两者内容一样)

    1. public class WorkConsumer01 {
    2. public static void main(String[] args) throws Exception{
    3. ConnectionFactory factory=new ConnectionFactory();
    4. //设置rabbitMQ服务器的地址 默认localhost
    5. factory.setHost("192.168.227.175");
    6. //设置rabbitMQ的端口号 默认5672
    7. factory.setPort(5672);
    8. //设置账号和密码 默认guest
    9. factory.setUsername("guest");
    10. factory.setPassword("guest");
    11. //设置虚拟主机名 默认为 /
    12. factory.setVirtualHost("/");
    13. //获取连接通道
    14. Connection connection = factory.newConnection();
    15. //获取channel信道
    16. Channel channel = connection.createChannel();
    17. //监听队列
    18. /* String queue,监听的队列名称
    19. *autoAck:是否自动确认消息
    20. * Consumer callback: 监听到消息后触发的回调函数
    21. */
    22. DefaultConsumer defaultConsumer=new DefaultConsumer(channel){
    23. @Override
    24. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    25. System.out.println("监听的信息"+new String(body));
    26. }
    27. };
    28. channel.basicConsume("work_queue",true,defaultConsumer);
    29. }
    30. }

    9.3 public/Subscribe发布订阅模式

    p: producter 生产者
    x exchange 交换机
    Q: 队列
    C1 C2: 消费者

    9.3.1 生产者

    1. public class FaBuProduct {
    2. public static void main(String[] args) throws Exception{
    3. ConnectionFactory factory=new ConnectionFactory();
    4. //设置rabbitMQ服务器的地址 默认localhost
    5. factory.setHost("192.168.227.175");
    6. //设置rabbitMQ的端口号 默认5672
    7. factory.setPort(5672);
    8. //设置账号和密码 默认guest
    9. factory.setPassword("guest");
    10. factory.setUsername("guest");
    11. //设置虚拟主机名 默认为 /
    12. factory.setVirtualHost("/");
    13. //获取连接通道
    14. Connection connection = factory.newConnection();
    15. //获取channel信道
    16. Channel channel = connection.createChannel();
    17. //创建队列
    18. /*如果该队列名不存在则自动创建,存在则不创建
    19. * String queue,队列名
    20. * boolean durable,是否持久化
    21. * boolean exclusive,(独占)声明队列同一时间只能保 证一个连接,且该队列只有被这一个连接使用。
    22. * boolean autoDelete,是否自动删除
    23. * Map arguments: 其他参数
    24. * */
    25. channel.queueDeclare("faBu_queue01",true,false,false,null);
    26. channel.queueDeclare("faBu_queue02",true,false,false,null);
    27. //创建交换机
    28. /*String exchange,交换机的名称
    29. BuiltinExchangeType type,交换机的种类
    30. boolean durable:是否持久化
    31. */
    32. channel.exchangeDeclare("faBu_exchange",BuiltinExchangeType.FANOUT,true);
    33. //交换机和队列绑定
    34. /*String queue,队列名
    35. String exchange,交换机名
    36. String routingKey 路由key 如果为发布订阅模式则无需路由key
    37. */
    38. channel.queueBind("faBu_queue01","faBu_exchange","");
    39. channel.queueBind("faBu_queue02","faBu_exchange","");
    40. for (int i=0;i<10;i++){
    41. String msg="publicher msg============"+i;
    42. //发送消息到队列
    43. /* String exchange,把消息发给哪个交换机--简单模式没 有交换机""
    44. *String routingKey,消息绑定的路由key 如果为简单模 式 默认写为队列名称
    45. * BasicProperties props, 消息的属性
    46. * byte[] body: 消息的内容
    47. * */
    48. channel.basicPublish("faBu_exchange","",null,msg.getBytes(StandardCharsets.UTF_8));
    49. }
    50. connection.close();
    51. }
    52. }

    9.3.2 消费者(两者相似,只需要修改队列名)

    1. public class FaBuConsumer01 {
    2. public static void main(String[] args) throws Exception{
    3. ConnectionFactory factory=new ConnectionFactory();
    4. //设置rabbitMQ服务器的地址 默认localhost
    5. factory.setHost("192.168.227.175");
    6. //设置rabbitMQ的端口号 默认5672
    7. factory.setPort(5672);
    8. //设置账号和密码 默认guest
    9. factory.setPassword("guest");
    10. factory.setUsername("guest");
    11. //设置虚拟主机名 默认为 /
    12. factory.setVirtualHost("/");
    13. //获取连接通道
    14. Connection connection = factory.newConnection();
    15. //获取channel信道
    16. Channel channel = connection.createChannel();
    17. //监听队列
    18. /* String queue,监听的队列名称
    19. *autoAck:是否自动确认消息
    20. * Consumer callback: 监听到消息后触发的回调函数
    21. */
    22. DefaultConsumer defaultConsumer=new DefaultConsumer(channel){
    23. @Override
    24. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    25. System.out.println("接收到的信息"+new String(body));
    26. }
    27. };
    28. channel.basicConsume("faBu_queue01",true,defaultConsumer);
    29. }
    30. }

    9.4 router路由模式

    p:生产者

    x: 交换机---Direct (路由模式)

    c1和c2表示消费者

    Q:队列

    9.4.1 生产者

    1. public class RouterProduct {
    2. public static void main(String[] args) throws Exception{
    3. ConnectionFactory factory=new ConnectionFactory();
    4. //设置rabbitMQ服务器的地址 默认localhost
    5. factory.setHost("192.168.227.175");
    6. //设置rabbitMQ的端口号 默认5672
    7. factory.setPort(5672);
    8. //设置账号和密码 默认guest
    9. factory.setPassword("guest");
    10. factory.setUsername("guest");
    11. //设置虚拟主机名 默认为 /
    12. factory.setVirtualHost("/");
    13. //获取连接通道
    14. Connection connection = factory.newConnection();
    15. //获取channel信道
    16. Channel channel = connection.createChannel();
    17. //创建队列
    18. /**
    19. * 如果该队列名不存在则自动创建,存在则不创建
    20. * String queue,队列名
    21. * boolean durable,是否持久化
    22. * boolean exclusive,(独占)声明队列同一时间只能保证一个连接,且该队列只有被这一个连接使用。
    23. * boolean autoDelete,是否自动删除
    24. * Map arguments: 其他参数
    25. */
    26. channel.queueDeclare("router_queue01",true,false,false,null);
    27. channel.queueDeclare("router_queue02",true,false,false,null);
    28. //创建交换机
    29. /*
    30. String exchange,交换机的名称
    31. BuiltinExchangeType type,交换机的种类
    32. boolean durable:是否持久化
    33. */
    34. channel.exchangeDeclare("router_change", BuiltinExchangeType.DIRECT,true);
    35. //交换机和队列绑定
    36. /*
    37. String queue,队列名 String exchange,交换机名 String routingKey 路由key 如果为发布订阅模式则无需路由key
    38. */
    39. channel.queueBind("router_queue01","router_change","error");
    40. channel.queueBind("router_queue02","router_change","error");
    41. channel.queueBind("router_queue02","router_change","info");
    42. channel.queueBind("router_queue02","router_change","warning");
    43. //发送消息到队列
    44. /**
    45. * String exchange,把消息发给哪个交换机--简单模式没有交换机""
    46. * String routingKey,消息绑定的路由key 如果为简单模式 默认写为队列名称
    47. * BasicProperties props, 消息的属性
    48. * byte[] body: 消息的内容
    49. */
    50. String msg="router==========2";
    51. channel.basicPublish("router_change","error",null,msg.getBytes(StandardCharsets.UTF_8));
    52. connection.close();
    53. }
    54. }

    9.4.2 消费者(两者相似,只需要修改队列名)

    1. public class RouterConsumer01 {
    2. public static void main(String[] args) throws Exception{
    3. ConnectionFactory factory=new ConnectionFactory();
    4. //设置rabbitMQ服务器的地址 默认localhost
    5. factory.setHost("192.168.227.175");
    6. //设置rabbitMQ的端口号 默认5672
    7. factory.setPort(5672);
    8. //设置账号和密码 默认guest
    9. factory.setPassword("guest");
    10. factory.setUsername("guest");
    11. //设置虚拟主机名 默认为 /
    12. factory.setVirtualHost("/");
    13. //获取连接通道
    14. Connection connection = factory.newConnection();
    15. //获取channel信道
    16. Channel channel = connection.createChannel();
    17. DefaultConsumer defaultConsumer=new DefaultConsumer(channel){
    18. @Override
    19. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    20. System.out.println("接受的信息内容"+new String(body));
    21. }
    22. };
    23. channel.basicConsume("router_queue01",true,defaultConsumer);
    24. connection.close();
    25. }
    26. }

    9.5 主题模式--topic

     *: 统配一个单词

    #:统配零个或者n个单词

    9.5.1 生产者

    1. public class TopicProduct {
    2. public static void main(String[] args) throws Exception{
    3. ConnectionFactory factory=new ConnectionFactory();
    4. //设置rabbitMQ服务器的地址 默认localhost
    5. factory.setHost("192.168.227.175");
    6. //设置rabbitMQ的端口号 默认5672
    7. factory.setPort(5672);
    8. //设置账号和密码 默认guest
    9. factory.setPassword("guest");
    10. factory.setUsername("guest");
    11. //设置虚拟主机名 默认为 /
    12. factory.setVirtualHost("/");
    13. //获取连接通道
    14. Connection connection = factory.newConnection();
    15. //获取channel信道
    16. Channel channel = connection.createChannel();
    17. channel.queueDeclare("topic_queue01",true,false,false,null);
    18. channel.queueDeclare("topic_queue02",true,false,false,null);
    19. //创建交换机
    20. /*
    21. String exchange,交换机的名称
    22. BuiltinExchangeType type,交换机的种类
    23. boolean durable:是否持久化
    24. */
    25. channel.exchangeDeclare("topic_exchange", BuiltinExchangeType.TOPIC,true);
    26. //交换机和队列绑定
    27. /*
    28. String queue,队列名 String exchange,交换机名 String routingKey 路由key 如果为发布订阅模式则无需路由key
    29. */
    30. channel.queueBind("topic_queue01","topic_exchange",".orange.*");
    31. channel.queueBind("topic_queue02","topic_exchange","*.*.rabbit");
    32. channel.queueBind("topic_queue02","topic_exchange","lazy.#");
    33. //发送消息到队列
    34. /**
    35. * String exchange,把消息发给哪个交换机--简单模式没有交换机""
    36. * String routingKey,消息绑定的路由key 如果为简单模式 默认写为队列名称
    37. * BasicProperties props, 消息的属性
    38. * byte[] body: 消息的内容
    39. */
    40. String msg="topic=================";
    41. channel.basicPublish("topic_exchange",".orange.*",null,msg.getBytes(StandardCharsets.UTF_8));
    42. connection.close();
    43. }
    44. }

    9.5.2 消费者(两者相似,只需要修改队列名)

    1. public class TopicConsumer01 {
    2. public static void main(String[] args) throws Exception {
    3. ConnectionFactory factory=new ConnectionFactory();
    4. //设置rabbitMQ服务器的地址 默认localhost
    5. factory.setHost("192.168.227.175");
    6. //设置rabbitMQ的端口号 默认5672
    7. factory.setPort(5672);
    8. //设置账号和密码 默认guest
    9. factory.setPassword("guest");
    10. factory.setUsername("guest");
    11. //设置虚拟主机名 默认为 /
    12. factory.setVirtualHost("/");
    13. //获取连接通道
    14. Connection connection = factory.newConnection();
    15. //获取channel信道
    16. Channel channel = connection.createChannel();
    17. //监听队列
    18. /**
    19. * String queue,监听的队列名称
    20. * autoAck:是否自动确认消息
    21. * Consumer callback: 监听到消息后触发的回调函数
    22. */
    23. DefaultConsumer defaultConsumer=new DefaultConsumer(channel){
    24. //一定有消息就会触发该方法
    25. //body:表示消息的内容
    26. @Override
    27. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    28. System.out.println("接受的消息内容:"+new String(body));
    29. }
    30. };
    31. channel.basicConsume("topic_queue02",true,defaultConsumer);
    32. }
    33. }

  • 相关阅读:
    BP神经网络需要训练的参数,bp神经网络建模步骤
    SpringBoot多数据源
    成都聚华祥科技:店铺优化怎么做
    统计学习导论(ISLR) 第六章变量选择课后习题
    IE11 使用的 DOM API (MutationObserver)
    【无标题】
    C#/.net程序调用python
    龙讯LONTIUM LT8712EXI 国产芯片
    马可尼 光传输设备 全新原装板卡
    item_search - 根据关键词获取义乌购商品列表
  • 原文地址:https://blog.csdn.net/qq_50896786/article/details/126942661