• 【MQ简单模式】


    14天阅读挑战赛

    MQ 简单模式
    1 、模式介绍
    需求:使用简单模式完成消息传递
    步骤:
    ① 创建工程(生成者、消费者)
    ② 分别添加依赖
    ③ 编写生产者发送消息
    ④ 编写消费者接收消息

     

    在上图的模型中,有以下概念:
    P :生产者,也就是要发送消息的程序
    C :消费者:消息的接收者,会一直等待消息到来
    queue :消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生
    产者向其中投递消息,消费者从其中取出消息 2 、代码实现
    1 、生产者
    创建生产者项目 rabbitmq-producer
    pom.xml 依赖 :
    1. "1.0" encoding="UTF-8"?>
    2. <project xmlns="http://maven.apache.org/POM/4.0.0"
    3. xmlns:xsi="http://www.w3.org/2001/XMLSchemainstance"
    4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
    5. http://maven.apache.org/xsd/maven-4.0.0.xsd">
    6. <modelVersion>4.0.0modelVersion>
    7. <groupId>com.dxwgroupId>
    8. <artifactId>rabbitmq-producerartifactId>
    9. <version>1.0-SNAPSHOTversion>
    10. <dependencies>
    11. <dependency>
    12. <groupId>com.rabbitmqgroupId>
    13. <artifactId>amqp-clientartifactId>
    14. <version>5.6.0version>
    15. dependency>
    16. dependencies>
    17. project>
    生产者代码 Producer_HelloWorld:
    1. import com.rabbitmq.client.Channel;
    2. import com.rabbitmq.client.Connection;
    3. import com.rabbitmq.client.ConnectionFactory;
    4. import java.io.IOException;
    5. import java.util.concurrent.TimeoutException;
    6. /**
    7. * 生产者:发送消息
    8. */
    9. public class Producer_HelloWorld {
    10. public static void main(String[] args) throws
    11. IOException, TimeoutException {
    12. //1、创建连接工厂
    13. ConnectionFactory factory = new
    14. ConnectionFactory();
    15. //2、设置参数
    16. factory.setHost("localhost");//ip 默认localhost
    17. factory.setPort(5672);//端口 默认5672
    18. factory.setVirtualHost("/dxw");//虚拟机 默认/
    19. factory.setUsername("dxw");//用户名 默认guest
    20. factory.setPassword("1234");//密码 默认guest
    21. //3、创建连接
    22. Connection connection = factory.newConnection();
    23. //4、创建Channel
    24. Channel channel = connection.createChannel();
    25. //5、创建队列
    26. /*
    27. * 参数解释:
    28. * queueDeclare(String queue,
    29. * boolean durable,
    30. * boolean exclusive,
    31. * boolean autoDelete,
    32. * Map arguments)
    33. * 1. queue:队列名称
    34. * 如果没有一个名字叫hello_world的队列,则会创建该队
    35. 列,如果有则不会创建
    36. * 2. durable:是否持久化,当mq重启之后,队列中消息还在
    37. * 3. exclusive:
    38. * 是否独占。只能有一个消费者监听这队列
    39. * 当Connection关闭时,是否删除队列
    40. * 4. autoDelete:是否自动删除。当没有Consumer时,自动
    41. 删除掉
    42. * 5. arguments:参数。
    43. */
    44. channel.queueDeclare("hello_world",true,false,false,null)
    45. ;
    46. //6、发送消息
    47. /*
    48. * 参数解释:
    49. * basicPublish(String exchange,
    50. * String routingKey,
    51. * BasicProperties props,
    52. * byte[] body)
    53. * 1. exchange:交换机名称。简单模式下交换机会使用默认的
    54. ""
    55. * 2. routingKey:路由名称
    56. * 3. props:配置信息
    57. * 4. body:发送消息数据
    58. */
    59. String body = "hello rabbitmq~~~";
    60. channel.basicPublish("","hello_world",null,body.getBytes(
    61. ));
    62. //7、释放资源
    63. //channel.close();
    64. //connection.close();
    65. }
    66. }
    启动生产者后观察控制台 :

     

     

    2 、消费者
    创建消费者项目 rabbitmq-consumer
    pom.xml 依赖
    1. "1.0" encoding="UTF-8"?>
    2. <project xmlns="http://maven.apache.org/POM/4.0.0"
    3. xmlns:xsi="http://www.w3.org/2001/XMLSchemainstance"
    4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
    5. http://maven.apache.org/xsd/maven-4.0.0.xsd">
    6. <modelVersion>4.0.0modelVersion>
    7. <groupId>com.dxwgroupId>
    8. <artifactId>rabbitmq-consumerartifactId>
    9. <version>1.0-SNAPSHOTversion>
    10. <dependencies>
    11. <dependency>
    12. <groupId>com.rabbitmqgroupId>
    13. <artifactId>amqp-clientartifactId>
    14. <version>5.6.0version>
    15. dependency>
    16. dependencies>
    17. project>
    消费者代码 Consumer_HelloWorld:
    1. import com.rabbitmq.client.*;
    2. import java.io.IOException;
    3. import java.util.concurrent.TimeoutException;
    4. /**
    5. * 消费者:接收消息
    6. */
    7. public class Consumer_HelloWorld {
    8. public static void main(String[] args) throws
    9. IOException, TimeoutException {
    10. //1、创建连接工厂
    11. ConnectionFactory factory = new
    12. ConnectionFactory();
    13. //2. 设置参数
    14. factory.setHost("localhost");//ip 默认值 localhost
    15. factory.setPort(5672); //端口 默认值 5672
    16. factory.setVirtualHost("/dxw");//虚拟机 默认/
    17. factory.setUsername("dxw");//用户名 默认guest
    18. factory.setPassword("1234");//密码 默认guest
    19. //3. 创建连接 Connection
    20. Connection connection = factory.newConnection();
    21. //4. 创建Channel
    22. Channel channel = connection.createChannel();
    23. //5、创建队列
    24. /*
    25. * 参数解释:
    26. * queueDeclare(String queue,
    27. * boolean durable,
    28. * boolean exclusive,
    29. * boolean autoDelete,
    30. * Map arguments)
    31. * 1. queue:队列名称
    32. * 如果没有一个名字叫hello_world的队列,则会创建该
    33. 队列,如果有则不会创建
    34. * 2. durable:是否持久化,当mq重启之后,队列中消息还在
    35. * 3. exclusive:
    36. * 是否独占。只能有一个消费者监听这队列
    37. * 当Connection关闭时,是否删除队列
    38. * 4. autoDelete:是否自动删除。当没有Consumer时,自
    39. 动删除掉
    40. * 5. arguments:参数。
    41. */
    42. channel.queueDeclare("hello_world",true,false,false,null)
    43. ;
    44. //6、接收消息
    45. Consumer consumer = new DefaultConsumer(channel){
    46. /*
    47. 回调方法,当收到消息后,会自动执行该方法
    48. 1. consumerTag:标识
    49. 2. envelope:获取一些信息,交换机,路由key...
    50. 3. properties:配置信息
    51. 4. body:数据
    52. */
    53. @Override
    54. public void handleDelivery(String consumerTag,
    55. Envelope envelope, AMQP.BasicProperties properties, byte[]
    56. body) throws IOException {
    57. System.out.println("consumerTag:"+consumerTag);
    58. System.out.println("Exchange:"+envelope.getExchange());
    59. System.out.println("RoutingKey:"+envelope.getRoutingKey(
    60. ));
    61. System.out.println("properties:"+properties);
    62. System.out.println("body:"+new
    63. String(body));
    64. }
    65. };
    66. /*
    67. * 参数解释:
    68. * basicConsume(String queue, boolean autoAck,
    69. Consumer callback)
    70. * 1. queue:队列名称
    71. * 2. autoAck:是否自动确认
    72. * 3. callback:回调对象
    73. */
    74. channel.basicConsume("hello_world",true,consumer);
    75. //关闭资源?不要
    76. }
    77. }
    启动消费者后 , 可以接收到生产者发送的数据 :

     

     

  • 相关阅读:
    关于一些网络的概述
    Java 面试中遇到的坑
    ADB 操作命令详解及用法大全
    【无标题】
    高优先线程
    科技成果鉴定之鉴定测试报告
    deepspeed 训练多机多卡报错 ncclSystemError Last error
    WebExceptionHandler详解
    【APM】在Kubernetes中,使用Helm安装Grafana 9.5.1
    面试:Android广播相关
  • 原文地址:https://blog.csdn.net/m0_72254454/article/details/127908805