• RabbitMQ基础


    一、消息队列

    1.1.什么是消息队列?

    消息队列(Message Queue)是一种应用间的通信方式。
    消息的生产者可以只关心把消息发到消息队列,由消息系统来确保消息的可靠传递,也不用管关心消息会被谁消费掉。消息的消费者只需要关心从MQ中拿消息进行消费,不用管消息是谁发的。也就是生产者和消费者都无需知道对方的存在。

    1.2.为何使用消息队列?

    以游戏为例,当某个活动结束的时候产生了第一名玩家,这时候可能的业务逻辑包括:活动信息记录、奖励发放、通知推送…; 在业务初期可能可以放在同一个后端服务中执行,但是随着业务量的增长,可能就需要将各种操作拆分出来。主要的后端程序只需要将这个结果通知出去,后续的一系列操作由其他服务负责处理。这样就实现了业务的解耦,除此之外,MQ常见的场景还包括流量的削峰填谷、分布式事务的最终一致性等。

    1.3.常见的消息队列

    当前主流常见的MQ包括:

    • RabbitMQ:是使用一种比较小众的编程语言:Erlang 语言编写的,它最早是为电信行业系统之间的可靠通信设计的,也是少数几个支持 AMQP 协议的消息队列之一;
    • RocketMQ:RocketMQ 是阿里巴巴在 2012 年开源的消息队列产品,使用JAVA编写,后来捐赠给 Apache 软件基金会,2017 正式毕业,成为 Apache 的顶级项目;
    • Kafka:Kafka 最早是由 LinkedIn 开发( 使用 Scala 和 Java 语言),目前也是 Apache 的顶级项目。Kafka 最初的设计目的是用于处理海量的日志。
      其他MQ:
    • ActiveMQ:最老牌的消息队列;
    • ZeroMQ:严格来说 ZeroMQ 并不能称为一个消息队列,而是一个基于消息队列的多线程网络库,如果需求是将消息队列的功能集成到你的系统进程中,可以考虑使用;
    • Pulsar:Pulsar 是一个新兴的开源消息队列产品, 采用存储和计算分离的设计,最早是由 Yahoo 开发,目前处于成长期,流行度和成熟度相对没有那么高。

    二、RabbitMQ

    2.1.RabbitMQ基础概念

    前文提到,RabbitMQ支持 AMQP协议,因此,其内部实现大部分也是基于该协议相关的概念:
    AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。

    • Publisher:消息的生产者;
    • Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列(Queue)。
    • Binding:绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
    • Queue:队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
    • Connection:网络连接,比如一个TCP连接。
    • Channel:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
    • Consumer:消息的消费者。
    • Virtual Host:虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
    • Broker:表示消息队列服务器实体。一个MQ就可以称为一个Broker。

    RabbitMQ 生产和过程:

    1. 生产者连接到消息队列服务器,打开一个 channel。
    2. 生产者声明一个 exchange,并设置相关属性。
    3. 生产者声明一个 queue,并设置相关属性。
    4. 生产者使用 routing key,在 exchange 和 queue 之间建立好绑定关系。
    5. 生产者投递消息到 exchange,消息进入queue。
    6. 消费者从指定的 queue 中消费信息。

    2.2.RabbitMQ部署

    docker hub rabbitmq

    端口:

    • 5672:客户端连接MQ使用
    • 15672:Admin UI

    docker启动:

    docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.7.2-management
    
    • 1

    管理界面http://127.0.0.1:15672/#/,默认账号密码都是guest,可以通过如下配置更换

    default_user = xxxx
    default_pass = xxxx
    
    • 1
    • 2


    如果要在k8s部署高可用RabbitMQ集群,用Statefulset部署。

    2.3.生产者和消费者测试

    生产者

    import pika
    import sys
    
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    channel.queue_declare(queue='task_queue', durable=True)
    
    message = ' '.join(sys.argv[1:]) or "Hello World!"
    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
        ))
    print(" [x] Sent %r" % message)
    connection.close()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    消费者

    import pika
    import time
    
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    channel.queue_declare(queue='task_queue', durable=True)
    print(' [*] Waiting for messages. To exit press CTRL+C')
    
    
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body.decode())
        time.sleep(body.count(b'.'))
        print(" [x] Done")
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(queue='task_queue', on_message_callback=callback)
    
    channel.start_consuming()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    python3 sender.py
    [x] Sent 'Hello World!'
    
    python3 sender.py
    [x] Sent 'Hello World!'
    
    
    python3 receiver.py                                     
     [*] Waiting for messages. To exit press CTRL+C
     [x] Received 'Hello World!'
     [x] Done
     [x] Received 'Hello World!'
     [x] Done
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    2.4.查看消息日志

    由于记录消息的发送情况会降低性能,默认关闭日志追踪,如果要看日志的话,首先需要开启trace插件

    rabbitmq-plugins enable rabbitmq_tracing
    
    # 开启trace
    rabbitmqctl trace_on
    
    # 关闭trace
    rabbitmqctl trace_off
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    然后需要到Admin界面,新建一个trace进行追踪,Pettern选择#表示匹配所有日志

    在这里插入图片描述

  • 相关阅读:
    华为云Stack的学习(五)
    2.5python 循环_python量化实用版教程(初级)
    H5组件Canvas画电子印章
    IDEA2021配置Maven
    [React] Context上下文的使用
    肖sir__mysql之三表__008
    Windows Server 2019 - 辅助DNS
    JAVA主要API
    5800交点正反算坐标(可计算不对称缓和曲线)
    ICMP协议(二)
  • 原文地址:https://blog.csdn.net/qq_41832237/article/details/126769019