• .Net RabbitMQ(消息队列)



    一.RabbitMQ 介绍以及工作模式

    1.RabbitMQ的介绍:

    RabbitMQ 是一个开源的消息代理软件,它实现了高级消息队列协议(AMQP)的标准,同时也支持其他消息协议,如 MQTT。它最初由 Rabbit Technologies 公司开发,后来成为了 Pivotal Software 的一部分,现在则是由 VMware 管理。

    以下是 RabbitMQ 的一些关键特点和功能:

    消息队列: RabbitMQ 允许应用程序之间通过发送和接收消息进行通信。它作为一个中间件,负责接收、存储和路由消息,以确保消息能够可靠地传递到目标应用程序。

    灵活的消息路由: RabbitMQ 支持不同类型的交换机(exchange),如 direct、topic、fanout 和 headers,每种类型的交换机都有不同的消息路由方式,使得消息的分发更加灵活高效。

    持久化: RabbitMQ 支持将消息持久化到磁盘,以确保即使在代理重启后,消息也不会丢失。它还允许将队列和交换机设置为持久化,以提高系统的可靠性。

    集群和高可用性: RabbitMQ 支持构建具有高可用性和可伸缩性的集群。通过将多个 RabbitMQ 节点组成集群,可以实现消息的负载均衡和故障转移,从而提高系统的可靠性和性能。

    消息确认机制: RabbitMQ 支持消息确认机制,即生产者可以选择在消息发送到队列后等待消费者对消息的确认,以确保消息被成功处理。这种机制可以保证消息不会丢失,并且可以防止消息重复处理。

    插件系统: RabbitMQ 提供了丰富的插件系统,可以通过插件扩展其功能,如集成其他消息协议、实现认证和授权、监控和管理等。

    多语言客户端: RabbitMQ 提供了多种编程语言的客户端库,如 Java、Python、Ruby、C# 等,使得开发者可以方便地与 RabbitMQ 进行交互。

    总的来说,RabbitMQ 是一个功能强大、可靠性高的消息代理软件,广泛应用于各种分布式系统和应用场景,如微服务架构、实时数据处理、任务调度等。

    2.RabbitMQ的工作模式:

    RabbitMQ 的工作模式主要涉及以下几个关键组件:生产者、交换机(Exchange)、队列(Queue)和消费者。通过这些组件,RabbitMQ 实现了消息的传输和路由。

    生产者(Producer): 生产者是消息的发送者,它负责将消息发送到 RabbitMQ 中。生产者通常将消息发送到一个特定的交换机,然后由交换机将消息路由到一个或多个队列中。

    交换机(Exchange): 交换机是消息的路由中心,负责接收从生产者发送的消息,并根据指定的路由规则将消息路由到一个或多个队列中。RabbitMQ 提供了不同类型的交换机,如 direct、topic、fanout 和 headers,每种类型的交换机都有不同的路由规则。

    队列(Queue): 队列是消息的存储容器,它负责存储从交换机接收到的消息,以便消费者可以按顺序处理消息。每个队列都有一个唯一的名称,并且可以有多个消费者订阅同一个队列。

    消费者(Consumer): 消费者是消息的接收者,它负责从队列中获取消息并进行处理。消费者通常会订阅一个或多个队列,并在收到消息后执行相应的业务逻辑。

    RabbitMQ 的工作流程可以简单描述为:

    生产者将消息发送到一个特定的交换机。
    交换机根据指定的路由规则将消息路由到一个或多个队列中。
    消费者从队列中获取消息并进行处理。
    在这个过程中,RabbitMQ 提供了丰富的路由和消息确认机制,以确保消息能够可靠地传递和处理。

    二.RabbitMQ安装

    bilibili三分钟RabbitMQ安装教程,亲测有效

    1.安装Erlang语言环境

    a.RabbitMQ是Erlang语言开发的,所以我们首先去Erlang官网下载Erlang环境

    在这里插入图片描述

    b.下载完成后,我们直接根据默认的情况安装到电脑上,然后进入windows的“高级系统设置”,设置Erlang环境变量

    给他一个变量名称,然后浏览刚才安装的Erlang目录位置,获取变量值

    在这里插入图片描述

    c.在环境变量的path中,配置刚才配置好的Erlang环境

    在这里插入图片描述

    2.安装RabbitMQ

    a.接着我们进入RabbitMQ官网下载RabbitMQ

    在这里插入图片描述
    b.RabbitMQ下载安装完毕后,我们进入RabbitMQ的sbin目录中,用cmd终端行打开命令行,分别执行以下命令安装RabbitMQ Cli

    a.在cmd中,进入RabbitMQ的sbin目录下,输入 
    rabbitmq-plugins enable rabbitmq_management 
    命令安装RabbitMQ Ctl
    
    b.输入 
    rabbitmqctl status 
    命令检验是否安装成功
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    c.进入服务中,先停止RabbitMQ的服务,然后在RabbitMQ的sbin目录中,启动RabbitMQ.Server.bat文件,最后再重启RabbitMQ服务,就可以进入浏览器访问http://localhost:15672/进入RabbitMQ服务端了

    1)停止rabbitMQ服务

    在这里插入图片描述

    2)启动rabbitmq.server.bat文件

    在这里插入图片描述

    3)重启RabbitMQ服务后,使用浏览器进入http://localhost:15672/
    斜体样式
    默认的账户名密码都是:guest
    在这里插入图片描述

    三.在.Net中使用RabbitMQ

    bilibili某博主详细解说:RabbitMQ从零到高可用集群

    1.HelloWorld模式

    HelloWorld模式通常被视为RabbitMQ的点对点模式,生产者将消息放入队列中,消费者直接从队列中读取

    a.生产者代码:

    var factory = new ConnectionFactory();
    factory.HostName = "localhost";
    factory.Port = 5672;
    factory.UserName = "guest";
    factory.Password = "guest";
    
    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            //创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
            //第一个参数:队列名称ID
            //第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
            //第三个参数,是否队列私有化,flase达标所有的消费则会都可以访问,true代表只有第一次拥有它的消费者才能一直使用
            //第四个参数:是否自动删除,false代表链接停掉后不自动删除这个队列
    
            channel.QueueDeclare("hello",true,false,false,null);
            string message = $"Hellow RabbitMQ-{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}";
            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish("","hello",null,body);
            Console.WriteLine($"RabbitMQ生产者已发送{message}");
            Console.ReadKey();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    b.消费者代码:

     var factory = new ConnectionFactory();
     factory.HostName = "localhost";
     factory.Port = 5672;
     factory.UserName = "guest";
     factory.Password = "guest";
    
     using (var connection = factory.CreateConnection())
     {
         using (var channel = connection.CreateModel())
         {
             //创建一个名为hello的消息队列
             channel.QueueDeclare("hello",true,false,false,null);
             var consumer = new EventingBasicConsumer(channel);
             channel.BasicConsume("hello",false,consumer);
    
             consumer.Received += (model, e) =>
             {
                 var body = e.Body.ToArray();
                 var message = Encoding.UTF8.GetString(body);
    
                 Console.WriteLine("RabbitMQ消费者接收到消息:" + message);
             };
    
             channel.BasicConsume("hello",false,consumer);
             Console.WriteLine("Press [Enter] to exit");
             Console.ReadKey();
    
         }
     }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    c.运行结果:

    在这里插入图片描述

    2.工作队列模式

    工作队列模式,也称任务队列模式,生产者将任务传递到队列中,多个消费者竞争消费队列中的消息,一条消息只对应一个消费者

    a.生产者代码:

     var factory = new ConnectionFactory();
     factory.HostName = "localhost";
     factory.Port = 5672;
     factory.UserName = "guest";
     factory.Password = "guest";
    
     using (var connection = factory.CreateConnection() )
     {
         using (var channel = connection.CreateModel())
         {
             channel.QueueDeclare("WorkQueue",true,false,false,null);
             for (int i = 0; i < 10; i++)
             {
                 var msg = $"消息队列ID10000{i}发送成功,{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff")}";
                 var body = Encoding.UTF8.GetBytes(msg);
                 channel.BasicPublish("","WorkQueue",null,body);
                 Console.WriteLine($"已发送:{msg}");
             }
             Console.ReadKey();
         }
     }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    b.消费者1代码:

     var factory = new ConnectionFactory();
     factory.HostName = "localhost";
     factory.Port = 5672;
     factory.UserName = "guest";
     factory.Password = "guest";
    
     using (var connection = factory.CreateConnection())
     {
         using (var channel = connection.CreateModel())
         {
             //创建一个名为hello的消息队列
             channel.QueueDeclare("WorkQueue", true, false, false, null);
             //basicqos,MQ不再对消费者一次发送多条请求,而是一条处理完毕(确然后)再从队列中获取新的
             channel.BasicQos(0, 1, false);
             var consumer = new EventingBasicConsumer(channel);
    
             channel.BasicConsume("WorkQueue", false, consumer);
    
             consumer.Received += (model, e) =>
             {
                 var body = e.Body.ToArray();
                 var message = Encoding.UTF8.GetString(body);
                 Thread.Sleep(100);
                 Console.WriteLine("RabbitMQ消费者接收到消息:" + message);
                 //消息确认:一旦消费者成功处理了任务,它会发送确认消息给 RabbitMQ,告知 RabbitMQ 该消息已经被处理并可以从队列中删除。如果消费者在处理任务时发生错误,消息将会被重新放回队列中,以便其他消费者重新尝试处理。
                 channel.BasicAck(e.DeliveryTag,false);
             };
    
             channel.BasicConsume("WorkQueue", false, consumer);
             Console.WriteLine("Press [Enter] to exit");
             Console.ReadKey();
    
         }
     }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    c.消费者2代码:

    与消费者1代码一致
    
    • 1

    d.运行结果:

    在这里插入图片描述

    3.发布订阅模式

    发布/订阅(Publish/Subscribe)模式是一种消息传递模式,用于将消息广播给多个消费者。在这种模式下,生产者将消息发送到一个交换机(Exchange)中,而不是直接发送到队列中。交换机则将消息广播给与之绑定的多个队列,每个队列都有一个或多个消费者来接收消息。

    a.生产者代码:

    var factory = new ConnectionFactory();
    factory.HostName = "localhost";
    factory.Port = 5672;
    factory.UserName = "guest";
    factory.Password = "guest";
    
    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            var msg = "20摄氏度";
            var body = Encoding.UTF8.GetBytes(msg);
            channel.BasicPublish("aweather","",null,body);
            Console.WriteLine("天气发送成功:"+msg);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    b.消费者1代码:

    var factory = new ConnectionFactory();
    factory.HostName = "localhost";
    factory.Port = 5672;
    factory.UserName = "guest";
    factory.Password = "guest";
    
    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare("aweather", ExchangeType.Fanout);
            //声明消息队列
            channel.QueueDeclare("HuBei", true, false, false, null);
            channel.QueueBind("HuBei", "aweather", "");
            var consumer = new EventingBasicConsumer(channel);
    
            consumer.Received += (model, e) =>
            {
                var msg = Encoding.UTF8.GetString(e.Body.ToArray());
                Console.WriteLine("湖北收到天气信息:" + msg);
                channel.BasicAck(e.DeliveryTag, false);
            };
    
            channel.BasicConsume("HuBei", false, consumer);
            Console.WriteLine("Press [Enter] to exit");
            Console.ReadKey();
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    c.消费者2代码:

    var factory = new ConnectionFactory();
    factory.HostName = "localhost";
    factory.Port = 5672;
    factory.UserName = "guest";
    factory.Password = "guest";
    
    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare("aweather",ExchangeType.Fanout);
            //生命消息队列
            channel.QueueDeclare("HuNan",true,false,false,null);
            channel.QueueBind("HuNan","aweather","");
            var consumer = new EventingBasicConsumer(channel);
    
            consumer.Received += (model, e) =>
            {
                var msg = Encoding.UTF8.GetString(e.Body.ToArray());
                Console.WriteLine("湖南收到天气信息:"+msg);
                channel.BasicAck(e.DeliveryTag,false);
            };
    
            channel.BasicConsume("HuNan",false,consumer);
            Console.WriteLine("Press [Enter] to exit");
            Console.ReadKey();
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    d.运行结果:

    在这里插入图片描述

    4.Routing路由模式和Topics通配符模式

    Routing路由模式,是发布订阅模式的延伸,在发布订阅模式的基础上,分配
    路由键(Routing Key),消费者根据路由键匹配队列中对用的消息

    Topics通配符模式,是Routing路由模式的延伸,*用于匹配一个单词,#用于匹配零个或多个单词,用于替换匹配路由键

    注意:一个队列只能匹配一个通配符,但是可以匹配多个路由规则,路由模式和通配符模式可以嵌套使用

    a.生产者代码:

    var dic = new Dictionary();
    dic.Add("china.hunan.changsha.20240418","中国湖南长沙2024年04月18日天气数据");
    dic.Add("china.hubei.wuhan.20240418", "中国湖北武汉2024年04月18日天气数据");
    dic.Add("china.beijing.20240418", "中国北京2024年04月18日天气数据");
    dic.Add("us.flld.20240418", "美国弗罗里达2024年04月18日天气数据");
    
    var factory = new ConnectionFactory();
    factory.HostName = "localhost";
    factory.Port = 5672;
    factory.UserName = "guest";
    factory.Password = "guest";
    
    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            foreach (var item in dic)
            {
                channel.BasicPublish("weather",item.Key,null,Encoding.UTF8.GetBytes(item.Value));
            }
            Console.WriteLine("气象信息发送成功");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    b.消费者1代码:

    var factory = new ConnectionFactory();
    factory.HostName = "localhost";
    factory.Port = 5672;
    factory.UserName = "guest";
    factory.Password = "guest";
    
    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare("weather",ExchangeType.Direct);
            channel.QueueDeclare("us",true,false,false,null);
            //通配符模式
            channel.QueueBind("us","weather", "us.*.20240418");
            //路由模式
            channel.QueueBind("us","weather", "china.beijing.20240418");
            channel.BasicQos(0, 1, false);
            var customer = new EventingBasicConsumer(channel);
            
            customer.Received += (model, e) =>
            {
                var msg = Encoding.UTF8.GetString(e.Body.ToArray());
                Console.WriteLine("美国收到天气信息:"+msg);
                channel.BasicAck(e.DeliveryTag,false);
            };
    
            channel.BasicConsume("us",false,customer);
            Console.WriteLine("Press [Enter] to exit");
            Console.Read();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    c.消费者2代码:

    var factory = new ConnectionFactory();
    factory.HostName = "localhost";
    factory.Port = 5672;
    factory.UserName = "guest";
    factory.Password = "guest";
    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare("weather",ExchangeType.Direct);
            channel.QueueDeclare("china",true,false,false,null);
            //路由模式
            channel.QueueBind("china","weather", "china.hunan.changsha.20240418");
            channel.QueueBind("china", "weather", "china.hubei.wuhan.20240418");
            channel.QueueBind("china", "weather", "china.beijing.20240418");
            //通配符模式
            //channel.QueueBind("china", "weather", "china.#");
            channel.BasicQos(0,1,false);
    
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, e) =>
            {
                var msg = Encoding.UTF8.GetString(e.Body.ToArray());
                Console.WriteLine("中国收到天气消息:"+msg);
                channel.BasicAck(e.DeliveryTag,false);
            };
            channel.BasicConsume("china",false,consumer);
            Console.WriteLine("Press [Enter] to exit");
            Console.Read();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    d.运行结果:

    在这里插入图片描述

  • 相关阅读:
    信看课堂-厘米GNSS定位
    国科大课程自动评价脚本JS
    AUTOSAR知识点 之 Dem (二):SPEC规范解读
    Camtasia2022最新版本下载及功能升级优化介绍
    【IoT毕设.上】STM32+机智云AIoT+实验室安全监控系统
    关于闭包的递进理解
    继续来,同我一起撸Kotlin Channel 深水区
    Linux学习第22天:Linux中断驱动开发(一): 突如其来
    Eotalk Vol.04 : 如何安全开放 API 数据?
    还在手动包裹产品吗?—机器已逐步取代人工,导电滑环厂家解析
  • 原文地址:https://blog.csdn.net/wuyanEdwardElrid/article/details/137916548