消息队列(Message Queue)是一种应用间的通信方式。
消息的生产者可以只关心把消息发到消息队列,由消息系统来确保消息的可靠传递,也不用管关心消息会被谁消费掉。消息的消费者只需要关心从MQ中拿消息进行消费,不用管消息是谁发的。也就是生产者和消费者都无需知道对方的存在。
以游戏为例,当某个活动结束的时候产生了第一名玩家,这时候可能的业务逻辑包括:活动信息记录、奖励发放、通知推送…; 在业务初期可能可以放在同一个后端服务中执行,但是随着业务量的增长,可能就需要将各种操作拆分出来。主要的后端程序只需要将这个结果通知出去,后续的一系列操作由其他服务负责处理。这样就实现了业务的解耦,除此之外,MQ常见的场景还包括流量的削峰填谷、分布式事务的最终一致性等。
当前主流常见的MQ包括:
前文提到,RabbitMQ支持 AMQP协议,因此,其内部实现大部分也是基于该协议相关的概念:
AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。

RabbitMQ 生产和过程:
端口:
docker启动:
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.7.2-management
管理界面http://127.0.0.1:15672/#/,默认账号密码都是guest,可以通过如下配置更换
default_user = xxxx
default_pass = xxxx

如果要在k8s部署高可用RabbitMQ集群,用Statefulset部署。
生产者
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
))
print(" [x] Sent %r" % message)
connection.close()
消费者
import pika
import time
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body.decode())
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()
python3 sender.py
[x] Sent 'Hello World!'
python3 sender.py
[x] Sent 'Hello World!'
python3 receiver.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Hello World!'
[x] Done
[x] Received 'Hello World!'
[x] Done
由于记录消息的发送情况会降低性能,默认关闭日志追踪,如果要看日志的话,首先需要开启trace插件
rabbitmq-plugins enable rabbitmq_tracing
# 开启trace
rabbitmqctl trace_on
# 关闭trace
rabbitmqctl trace_off
然后需要到Admin界面,新建一个trace进行追踪,Pettern选择#表示匹配所有日志

