• 【RabbitMQ实战】02 生产者和消费者示例


    在上一节中,我们使用docker部署了RabbitMQ,这一节我们将写一段生产者和消费者的代码。将用到rabbitmq的原生API来进行生产和发送消息。

    一、准备工作

    开始前,我们先在RabbitMQ控制台建相好关的数据
    本机的RabbitMQ部署机器是192.168.56.201
    其中控制台的地址是
    http://192.168.56.201:15672/
    输入控制台的账号后,可以进入
    1、我们先建好一个用户
    用户名:hello,密码:world
    在这里插入图片描述
    2、再建Virtual Host:virtual01
    在这里插入图片描述
    3. 为User设置访问Virtual hosts权限
    在这里插入图片描述
    设置好后,hello用户就有virtual01的权限了
    在这里插入图片描述

    二、代码

    先引入依赖,由于我们后续要用springboot来写生产者消费者代码,这里我们就直接引springboot的包了。如果只想用原生的客户端,可以引原生的包。

        <dependencies>
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-amqpartifactId>
            dependency>
        dependencies>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    生产者和消费者代码如下

    public class RabbitMqSimpleTest {
        private static final String EXCHANGE_NAME = "hello_exchange";
        private static final String QUEUE_NAME = "hello_queue";
    
        private static final String ROUTING_KEY = "hello_routing";
    
        @Test
        public void send() throws IOException, TimeoutException {
    
            ConnectionFactory connectionFactory = new ConnectionFactory();
    
            connectionFactory.setHost("192.168.56.201");
            connectionFactory.setPort(5672);//5672是RabbitMQ的默认端口号
            connectionFactory.setUsername("hello");
            connectionFactory.setPassword("world");
            connectionFactory.setVirtualHost("virtual01");
    
            //获取TCP长连接
            Connection conn = connectionFactory.newConnection();
            //创建通信“通道”,相当于TCP中的虚拟连接
            Channel channel = conn.createChannel();
    
            //创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
            //第一个参数:队列名称ID
            //第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
            //第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问
            //第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列
            //其他额外的参数, null
            //手动创建一个队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
            //exchange 交换机
            //队列名称
            //额外的设置属性
            //最后一个参数是要传递的消息字节数组
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, (System.currentTimeMillis() + ",hello this is my first message!").getBytes());
            channel.close();
            conn.close();
            System.out.println("===发送成功===");
        }
    
        @Test
        public void consumer() throws IOException, TimeoutException {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.56.201");
            connectionFactory.setPort(5672);//5672是RabbitMQ的默认端口号
            connectionFactory.setUsername("hello");
            connectionFactory.setPassword("world");
            connectionFactory.setVirtualHost("virtual01");
    
            //获取TCP长连接
            Connection conn = connectionFactory.newConnection();
            //创建通信“通道”,相当于TCP中的虚拟连接
            Channel channel = conn.createChannel();
    
            //创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            //从MQ服务器中获取数据
            //创建一个消息消费者
            //第一个参数:队列名
            //第二个参数代表是否自动确认收到消息,false代表手动编程来确认消息,这是MQ的推荐做法
            //第三个参数要传入DefaultConsumer的实现类
            channel.basicConsume(QUEUE_NAME, false, new Receiver(channel));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67

    消费者回调实现

    public class Receiver extends DefaultConsumer {
    
        private Channel channel;
        //重写构造函数,Channel通道对象需要从外层传入,在handleDelivery中要用到
        public Receiver(Channel channel) {
            super(channel);
            this.channel = channel;
        }
    
    
        @Override
        public void handleDelivery(String consumerTag,
                                   Envelope envelope,
                                   AMQP.BasicProperties properties,
                                   byte[] body)
                throws IOException
        {
            String message = new String(body);
            System.out.println("消费者接收到的消息:"+message);
    
            System.out.println("消息的TagId:"+envelope.getDeliveryTag());
            //false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    运行一下send发送消息,成功了。
    去控制台后台看一下
    队列成功创建好了
    消息发送成功了,有一条待消费的消息在队列里面
    在这里插入图片描述
    可以在这里查看到刚才发送的消息内容
    在这里插入图片描述
    在这里可以看到queue和exchange的绑定关系
    在这里插入图片描述
    控制台还有很多有意思的功能,大家可以下来尝试一下。
    同时启动消费者,也能成功消费
    在这里插入图片描述

  • 相关阅读:
    leetcode日记(38)字母异位词分组
    pip version 更新
    计算机网络_实验10_子网划分
    【计算机网络】TCP传输控制协议——三次握手
    Tomcat 服务详解
    OpenCV4(C++)—— 图像噪声与图像滤波
    关于技术人员成长的一些建议
    Springboot日常总结-@RestController和@Controller的区别
    营销投入大没效果?痛点难点一站式解决!
    万宾科技智能井盖的效果怎么样?
  • 原文地址:https://blog.csdn.net/suyuaidan/article/details/133171528