• mq配置、springboot+rabbitmq实现生产消费


    1)引用包

    1. org.springframework.boot
    2. spring-boot-starter-amqp

    2)配置

    1. spring:
    2. rabbitmq:
    3. host: 192.168.221.129
    4. port: 5673
    5. username: test
    6. password: test
    7. virtual-host: /
    8. // 配置ack方式,我这配置成手动,方便容错重试
    9. listener:
    10. simple:
    11. acknowledge-mode: manual

    3)配置主题交换机设置主题

     4)配置多个查询并绑定实现但生产多消费

     5)代码

    1. @SneakyThrows
    2. @Override
    3. public void run(ApplicationArguments args) {
    4. rabbitProducerTopic("topic测试");
    5. }
    6. /**
    7. * 生产消息
    8. * @param msg 消息
    9. */
    10. private void rabbitProducerTopic(String msg){
    11. rabbitTemplate.convertAndSend("test.topic","test",msg);
    12. }
    13. /**
    14. * 消费消息1
    15. * @param msg
    16. * @param channel
    17. * @param message
    18. */
    19. @RabbitHandler
    20. @RabbitListener(queuesToDeclare = @Queue("topic.que"))
    21. private void rabbitTopic(String msg, Channel channel, Message message){
    22. System.out.println("test路由消费者topic:"+msg);
    23. ackMethod(channel,message);
    24. }
    25. /**
    26. * 消费消息2
    27. * @param msg
    28. * @param channel
    29. * @param message
    30. */
    31. @RabbitHandler
    32. @RabbitListener(queuesToDeclare = @Queue("topic.queall"))
    33. private void rabbitTopicNoRout(String msg, Channel channel, Message message){
    34. System.out.println("无路由消费者topic:"+msg);
    35. ackMethod(channel,message);
    36. }
    37. /**
    38. * 确认消息
    39. * @param channel
    40. * @param message
    41. */
    42. @SneakyThrows
    43. private void ackMethod(Channel channel, Message message){
    44. channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    45. }
    46. /**
    47. * 异常消息重试
    48. * @param channel
    49. * @param message
    50. */
    51. @SneakyThrows
    52. private void nackMethod(Channel channel, Message message){
    53. channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,// false 继续消费,true消息丢失
    54. true);
    55. // 异常阻塞十秒
    56. Thread.sleep(10000);
    57. }

    5)其他

    AMQP四种不同的Exchange

    标准概念
    Direct(直连交换机)直连交换机的特点是消息队列通过routingKey与交换机进行绑定,相同的routingKey会获得相同的消息。一个队列可以通过多个不同的routingKey与交换机进行绑定。不同的队列也可以通过相同的routingKey绑定交换机。
    Topic(主题交换机)

    应用范围最广的交换机类型,消息队列通过消息主题与交换机绑定。一个队列可以通过多个主题与交换机绑定,多个消息队列也可以通过相同消息主题和交换机绑定。并且可以通过通配符(*或者#)进行多个消息主题的适配。

    消息主题的一般格式为xxx.xxx.xxx(x为英文字母,每个单词用英文句号隔开)。*通配符可以适配一个单词,#可以适配零个或者多个单词。

    通配符适配如下:*.xxx.#。此主题可以适配xxx前面只有一个单词后面有零个或者多个单词的所有消息主题。

    Headers(头交换机)

    与routingKey无关,匹配机制是匹配消息头中的属性信息。在绑定消息队列与交换机之前声明一个map键值对,通过这个map对象实现消息队列和交换机的绑定。当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。

    匹配规则x-match有下列两种类型:

    x-match = all :表示所有的键值对都匹配才能接受到消息

    x-match = any :表示只要有键值对匹配就能接受到消息

    Fanout(扇出交换机)扇出交换机的特点是类似于广播,只要队列与该类型的交换机绑定,所有发送到该交换机的信息都会被转发到所有与之绑定的队列,与routingKey无关。

  • 相关阅读:
    Linux Shell :正则表达式
    字符串算法--$\mathcal{KMP,Trie}$树
    JavaEE进阶 - SpringBoot 统⼀功能处理 - 细节狂魔
    Leetcode 349.两个数组的交集
    Sping5新功能(单元测试支持junit5)
    KF、EKF、IEKF、UKF卡尔曼滤波器
    这些13 种锁的实现方式你知道吗
    QProcess类
    MySQL系列----什么是MySQL?
    【C++笔试强训】第二十天
  • 原文地址:https://blog.csdn.net/weixin_42280959/article/details/126488921