• 消息队列——rabbitmq的不同工作模式


    目录

    Work queues 工作队列模式

     Pub/Sub 订阅模式

    Routing路由模式

    Topics通配符模式 

     工作模式总结


    Work queues 工作队列模式

    C1和C2属于竞争关系,一个消息只有一个消费者可以取到。

     代码部分只需要用两个消费者进程监听同一个队里即可。

    两个消费者呈现竞争关系。

    用一个生产者推送10条消息

    1. for(int i=0;i<10;i++)
    2. {
    3. String body=i+"hello rabbitmq!!!";
    4. channel.basicPublish("","work_queues",null,body.getBytes());
    5. }

    两个监听的消费者接收情况如下。 

     

     Pub/Sub 订阅模式

    一个生产者发送消息后有两个消费者可以收到消息。

    生产者把消息发给交换机,交换机再把消息通过Routes路由分发给不同的队列。

    1. //发送消息
    2. public class producer_PubSub {
    3. public static void main(String[] args) throws IOException, TimeoutException {
    4. //1.创建连接工厂
    5. ConnectionFactory factory=new ConnectionFactory();
    6. //2.设置参数
    7. factory.setHost(""); //设置ip地址。默认为127.0.0.1
    8. factory.setPort(5672); //端口 默认值5672
    9. factory.setVirtualHost("/itcast"); //设置虚拟机 默认值/
    10. factory.setUsername("yhy"); //用户名,默认值guest
    11. factory.setPassword(""); //密码,默认值guest
    12. //3.创建连接Connection
    13. Connection connection = factory.newConnection();
    14. //4.创建Channel
    15. Channel channel = connection.createChannel();
    16. /*
    17. * exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map arguments)
    18. * 参数:
    19. * 1.exchange : 交换价名称
    20. * 2.type : 交换机类型 ,有四种
    21. * DIRECT("direct"), 定向
    22. FANOUT("fanout"), 扇形(广播),发送消息到每一个与之绑定队列
    23. TOPIC("topic"), 通配符的方式
    24. HEADERS("headers"); 参数匹配
    25. *3.durable :是否持久化
    26. * 4.autoDelete:是否自动删除
    27. * 5.internal: 内部使用。一般false
    28. * 6.arguments:参数
    29. * */
    30. //5.创建交换机
    31. String exchangeName="test_fanout";
    32. channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
    33. //6.创建队列
    34. String queue1Name="test_fanout_queue1";
    35. String queue2Name="test_fanout_queue2";
    36. channel.queueDeclare(queue1Name,true,false,false,null);
    37. channel.queueDeclare(queue2Name,true,false,false,null);
    38. /*
    39. * queueBind(String queue, String exchange, String routingKey)
    40. * 参数:
    41. * queue:队列名
    42. * exchange:交换机名称
    43. * routingKey:路由键,绑定规则
    44. * 如果交换机类型为fanout,routingKey设置为""
    45. * */
    46. //7.绑定队列和交换机
    47. channel.queueBind(queue1Name,exchangeName,"");
    48. channel.queueBind(queue2Name,exchangeName,"");
    49. String body="日志信息:调用了findAll方法";
    50. //8.发送消息
    51. channel.basicPublish(exchangeName,"",null,body.getBytes());
    52. //9.释放资源
    53. channel.close();
    54. connection.close();
    55. }
    56. }

     运行之后两个队列里面就会多一条消息

    两个消费者的代码大同小异,只是绑定的队列名不同,这里只给其中一个

    1. public class consumer_PubSub1 {
    2. public static void main(String[] args) throws IOException, TimeoutException {
    3. //1.创建连接工厂
    4. ConnectionFactory factory=new ConnectionFactory();
    5. //2.设置参数
    6. factory.setHost(""); //设置ip地址。默认为127.0.0.1
    7. factory.setPort(5672); //端口 默认值5672
    8. factory.setVirtualHost("/itcast"); //设置虚拟机 默认值/
    9. factory.setUsername("yhy"); //用户名,默认值guest
    10. factory.setPassword(""); //密码,默认值guest
    11. //3.创建连接Connection
    12. Connection connection = factory.newConnection();
    13. //4.创建Channel
    14. Channel channel = connection.createChannel();
    15. String queue1Name="test_fanout_queue1";
    16. String queue2Name="test_fanout_queue2";
    17. /*
    18. * basicConsume(String queue, boolean autoAck, Consumer callback)
    19. * 参数:
    20. * 1.队列名称
    21. * 2.autoAck:是否自动确认
    22. * 3.callback:回调对象
    23. * */
    24. //6.接收消息
    25. Consumer consumer=new DefaultConsumer(channel){
    26. /*
    27. * 回调方法,当收到消息后,会自动执行该方法
    28. * 1.consumerTag:标识
    29. * 2.envelope :获取一些信息,交换机,路由key...
    30. * 3.properties: 配置信息
    31. * 4.body: 数据
    32. * */
    33. @Override
    34. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    35. // System.out.println("consumerTag:"+consumerTag);
    36. // System.out.println("Exchange:"+envelope.getExchange());
    37. // System.out.println("RoutingKey:"+envelope.getRoutingKey());
    38. // System.out.println("properties:"+properties);
    39. System.out.println("body:"+new String(body));
    40. System.out.println("将日志信息打印到控制台......");
    41. }
    42. };
    43. channel.basicConsume(queue1Name,true,consumer);
    44. //不需要关闭资源
    45. }
    46. }

     控制台输出有

    Routing路由模式

    对于特定级别的信息会发送到别的队列,如上图的error,在发送消息时也会有一个routing,只要和后面的队列对应上就可以发送到对应队列。 

     生产者代码:

    1. //发送消息
    2. public class producer_Routing {
    3. public static void main(String[] args) throws IOException, TimeoutException {
    4. //1.创建连接工厂
    5. ConnectionFactory factory=new ConnectionFactory();
    6. //2.设置参数
    7. factory.setHost(""); //设置ip地址。默认为127.0.0.1
    8. factory.setPort(5672); //端口 默认值5672
    9. factory.setVirtualHost("/itcast"); //设置虚拟机 默认值/
    10. factory.setUsername("yhy"); //用户名,默认值guest
    11. factory.setPassword(""); //密码,默认值guest
    12. //3.创建连接Connection
    13. Connection connection = factory.newConnection();
    14. //4.创建Channel
    15. Channel channel = connection.createChannel();
    16. /*
    17. * exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map arguments)
    18. * 参数:
    19. * 1.exchange : 交换价名称
    20. * 2.type : 交换机类型 ,有四种
    21. * DIRECT("direct"), 定向
    22. FANOUT("fanout"), 扇形(广播),发送消息到每一个与之绑定队列
    23. TOPIC("topic"), 通配符的方式
    24. HEADERS("headers"); 参数匹配
    25. *3.durable :是否持久化
    26. * 4.autoDelete:是否自动删除
    27. * 5.internal: 内部使用。一般false
    28. * 6.arguments:参数
    29. * */
    30. //5.创建交换机
    31. String exchangeName="test_direct";
    32. channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
    33. //6.创建队列
    34. String queue1Name="test_direct_queue1";
    35. String queue2Name="test_direct_queue2";
    36. channel.queueDeclare(queue1Name,true,false,false,null);
    37. channel.queueDeclare(queue2Name,true,false,false,null);
    38. /*
    39. * queueBind(String queue, String exchange, String routingKey)
    40. * 参数:
    41. * queue:队列名
    42. * exchange:交换机名称
    43. * routingKey:路由键,绑定规则
    44. * 如果交换机类型为fanout,routingKey设置为""
    45. * */
    46. //7.绑定队列和交换机
    47. //队列1绑定error
    48. channel.queueBind(queue1Name,exchangeName,"error");
    49. //队列2绑定error,info,warning
    50. channel.queueBind(queue2Name,exchangeName,"info");
    51. channel.queueBind(queue2Name,exchangeName,"error");
    52. channel.queueBind(queue2Name,exchangeName,"warning");
    53. String body="日志信息:调用了findAll方法,级别:info,error,warning";
    54. //8.发送消息
    55. channel.basicPublish(exchangeName,"error",null,body.getBytes());
    56. //9.释放资源
    57. channel.close();
    58. connection.close();
    59. }
    60. }

    消费者代码(两个消费者就绑定队列名不一样):

    1. public class consumer_Routing1 {
    2. public static void main(String[] args) throws IOException, TimeoutException {
    3. //1.创建连接工厂
    4. ConnectionFactory factory=new ConnectionFactory();
    5. //2.设置参数
    6. factory.setHost(""); //设置ip地址。默认为127.0.0.1
    7. factory.setPort(5672); //端口 默认值5672
    8. factory.setVirtualHost("/itcast"); //设置虚拟机 默认值/
    9. factory.setUsername("yhy"); //用户名,默认值guest
    10. factory.setPassword(""); //密码,默认值guest
    11. //3.创建连接Connection
    12. Connection connection = factory.newConnection();
    13. //4.创建Channel
    14. Channel channel = connection.createChannel();
    15. String queue1Name="test_direct_queue1";
    16. String queue2Name="test_direct_queue2";
    17. /*
    18. * basicConsume(String queue, boolean autoAck, Consumer callback)
    19. * 参数:
    20. * 1.队列名称
    21. * 2.autoAck:是否自动确认
    22. * 3.callback:回调对象
    23. * */
    24. //6.接收消息
    25. Consumer consumer=new DefaultConsumer(channel){
    26. /*
    27. * 回调方法,当收到消息后,会自动执行该方法
    28. * 1.consumerTag:标识
    29. * 2.envelope :获取一些信息,交换机,路由key...
    30. * 3.properties: 配置信息
    31. * 4.body: 数据
    32. * */
    33. @Override
    34. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    35. // System.out.println("consumerTag:"+consumerTag);
    36. // System.out.println("Exchange:"+envelope.getExchange());
    37. // System.out.println("RoutingKey:"+envelope.getRoutingKey());
    38. // System.out.println("properties:"+properties);
    39. System.out.println("body:"+new String(body));
    40. System.out.println("将日志信息存储到数据库");
    41. }
    42. };
    43. channel.basicConsume(queue1Name,true,consumer);
    44. //不需要关闭资源
    45. }
    46. }

    Topics通配符模式 

    发送消息时设定的routingkey会和后面的routingkey进行匹配。

    生产者代码:

    1. //发送消息
    2. public class producer_Topic {
    3. public static void main(String[] args) throws IOException, TimeoutException {
    4. //1.创建连接工厂
    5. ConnectionFactory factory=new ConnectionFactory();
    6. //2.设置参数
    7. factory.setHost(""); //设置ip地址。默认为127.0.0.1
    8. factory.setPort(5672); //端口 默认值5672
    9. factory.setVirtualHost("/itcast"); //设置虚拟机 默认值/
    10. factory.setUsername("yhy"); //用户名,默认值guest
    11. factory.setPassword(""); //密码,默认值guest
    12. //3.创建连接Connection
    13. Connection connection = factory.newConnection();
    14. //4.创建Channel
    15. Channel channel = connection.createChannel();
    16. /*
    17. * exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map arguments)
    18. * 参数:
    19. * 1.exchange : 交换价名称
    20. * 2.type : 交换机类型 ,有四种
    21. * DIRECT("direct"), 定向
    22. FANOUT("fanout"), 扇形(广播),发送消息到每一个与之绑定队列
    23. TOPIC("topic"), 通配符的方式
    24. HEADERS("headers"); 参数匹配
    25. *3.durable :是否持久化
    26. * 4.autoDelete:是否自动删除
    27. * 5.internal: 内部使用。一般false
    28. * 6.arguments:参数
    29. * */
    30. //5.创建交换机
    31. String exchangeName="test_topic";
    32. channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
    33. //6.创建队列
    34. String queue1Name="test_topic_queue1";
    35. String queue2Name="test_topic_queue2";
    36. channel.queueDeclare(queue1Name,true,false,false,null);
    37. channel.queueDeclare(queue2Name,true,false,false,null);
    38. /*
    39. * queueBind(String queue, String exchange, String routingKey)
    40. * 参数:
    41. * queue:队列名
    42. * exchange:交换机名称
    43. * routingKey:路由键,绑定规则
    44. * 如果交换机类型为fanout,routingKey设置为""
    45. * */
    46. //7.绑定队列和交换机
    47. // routing key 系统的名称.日志的级别。
    48. //需求:所有error级别的日志存入数据库,所有order系统的日志存入数据库
    49. channel.queueBind(queue1Name,exchangeName,"#.error");
    50. channel.queueBind(queue1Name,exchangeName,"order.*");
    51. channel.queueBind(queue2Name,exchangeName,"*.*");
    52. String body="日志信息:调用了findAll方法";
    53. //8.发送消息
    54. channel.basicPublish(exchangeName,"goods.error",null,body.getBytes());
    55. //9.释放资源
    56. channel.close();
    57. connection.close();
    58. }
    59. }

     消费者代码

    1. public class consumer_Topic1 {
    2. public static void main(String[] args) throws IOException, TimeoutException {
    3. //1.创建连接工厂
    4. ConnectionFactory factory=new ConnectionFactory();
    5. //2.设置参数
    6. factory.setHost(""); //设置ip地址。默认为127.0.0.1
    7. factory.setPort(5672); //端口 默认值5672
    8. factory.setVirtualHost("/itcast"); //设置虚拟机 默认值/
    9. factory.setUsername("yhy"); //用户名,默认值guest
    10. factory.setPassword(""); //密码,默认值guest
    11. //3.创建连接Connection
    12. Connection connection = factory.newConnection();
    13. //4.创建Channel
    14. Channel channel = connection.createChannel();
    15. String queue1Name="test_topic_queue1";
    16. String queue2Name="test_topic_queue2";
    17. /*
    18. * basicConsume(String queue, boolean autoAck, Consumer callback)
    19. * 参数:
    20. * 1.队列名称
    21. * 2.autoAck:是否自动确认
    22. * 3.callback:回调对象
    23. * */
    24. //6.接收消息
    25. Consumer consumer=new DefaultConsumer(channel){
    26. /*
    27. * 回调方法,当收到消息后,会自动执行该方法
    28. * 1.consumerTag:标识
    29. * 2.envelope :获取一些信息,交换机,路由key...
    30. * 3.properties: 配置信息
    31. * 4.body: 数据
    32. * */
    33. @Override
    34. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    35. // System.out.println("consumerTag:"+consumerTag);
    36. // System.out.println("Exchange:"+envelope.getExchange());
    37. // System.out.println("RoutingKey:"+envelope.getRoutingKey());
    38. // System.out.println("properties:"+properties);
    39. System.out.println("body:"+new String(body));
    40. System.out.println("将日志信息存储到数据库");
    41. }
    42. };
    43. channel.basicConsume(queue1Name,true,consumer);
    44. //不需要关闭资源
    45. }
    46. }

     工作模式总结

  • 相关阅读:
    【漏洞复现】Weblogic 任意文件上传漏洞(CVE-2018-2894)
    docker基础知识
    一次Actuator未授权访问利用
    关于本地项目连接git远程仓库以及git设置ignore文件
    第九章《字符串》第3节:String类对象的存储方式
    idea模板设置
    opencv:从0到实现人脸识别
    为了让线上代码可追溯, 我开发了这个vite插件
    数据结构:选择题+编程题(每日一练)
    使用 Java 操作 Redis
  • 原文地址:https://blog.csdn.net/m0_62327332/article/details/131729883