• RabbitMq大纲


    一、AMQP协议模型

    消息通过发布者,发到了Exchange上,Exchange在通过routeKey找到对应的Queue队列
    1、注意里面包含【虚拟机 Virtual host】、【Exchange】、【routeKey】、【queue】
    2、一个Exchange可以绑定多个routeKey,一个routeKey对应一个queue
    3、Exchange必须是在同一个虚拟机【Virtual host】里面

    二、保障消息百分百投递成功

    1、首先在发消息前,将数据保存至业务表,然后再生成MSG,保存至消息表。状态为0,表示待发送。
    2、然后开始发送,在消息成年发送到mq服务端,也就是MqBroker的时候,就会收到mqBroker的【消息确认回调】表示成功接收到消息了,也就是step3
    3、收到确认消息,同事我们在更新MSG表中的发送状态,更新为已发送,也就是1
    4、假设step3失败,也就是消息发送到服务端,但是回调消息在回来的时候断了,可能是因为网络断了什么的。
    5、那么可以通过定时任务去跑MSG的表状态等于0的。
    6、如果确认回调消息一直失败,也就没法更新MSG表的状态,那么就会一直发,消息到MqBroker,那么MqBroker就一直给消费端发消息,消息端一定要做好幂等,也就是相同消息过来处理结果都是一样的。
    7、因为考虑到消息重复发送给MqBroker,必然会影响到其他消息进入到MqBroker,所以要在

    三、代码

    1、消息消费

    //durable表示数据是否持久化
    @RabbitListener(bindings = @QueueBinding(
    value = @Queue(value = "order-queue",durable = "true"),
    exchange = @Exchange(name = "order-exchange", durable = "true",type = "topic"),
    key="order.*"))
    
    @RabbitHandler
    public void onOrderMessage(@Payload Order order, @Headers Map<String,Object> headers, Channel channel) throws Exception {
        System.out.println("----收到消息,开始消费-----");
        System.out.println("d订单id:"+order.getId());
        
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
    
        /**
         *  取值为 false 时,表示通知 RabbitMQ 当前消息被确认
         *  如果为 true,则额外将比第一个参数指定的 delivery tag 小的消息一并确认
         */
        channel.basicAck(deliveryTag,false);
        System.out.println("--------消费完成--------");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    疑问:如果没有确认,那么下一条消息是什么时候过来。

    spring:
      rabbitmq: #基本配置
        addresses: www.niezhiliang.com:5672  //mq服务端地址
        username: springcloud			
        password: 123456				
        virtual-host: /    #虚拟主机地址,不写表示所有,写了,就表示发送和接受只能在当前虚拟主机下的人可以收到。
        connection-timeout: 15000			链接超时时间。客户端链接服务点的超时时间。
    
        listener:  	#消费端配置
          simple:	#消费端
            concurrency: 5      默认五个线程,也就是相当于五个channel,就相当于默认有5个consume
            max-concurrency: 10		#最大消费端数
            acknowledge-mode: manual			#自动签收-auto  手动-manual,注意这是针对所有消费端,也就是说所有的消费端,都要手动确认,下面会介绍针对单个消费端。
            prefetch: 1				#限流(海量数据,同时只能过来一条)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    spring:
      rabbitmq:
        publisher-confirms: true   开启投递确认模式,投递表示消息发送方,在发给MqServer成功收到消息后,MQserver会给发送方一个confirm回执,表示发送方的消息投递成功。
        publisher-returns: true
        template:
          mandatory: true
        overtime: 1 #重新投递时间(分钟)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    2、消息发送代码、以及消息回调callBack

    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    //定义confirmCallBack函数
    
    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
    
        /**
         *
         * @param correlationData 唯一标识,有了这个唯一标识,我们就知道可以确认(失败)哪一条消息了
         * @param ack  是否投递成功
         * @param cause 失败原因
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            String messageId = correlationData.getId();
    
           
            //返回成功,表示消息被正常投递
            if (ack) {
                //更新MSG表的消息发送状态。
               
            } else {
            	//注意失败会有好几个原因,需要更具失败原因分析,比如集群的情况下,3台机器只有1台收到,那么ack=false,
            	//但是如果是队列满了,也是返回false,那么我们后面在投递也是失败。
                logger.error("消费信息失败,messageId:{} 原因:{}",brokerMessageLog.getMessage_id(),cause);
            }
        }
    };
    /**
     * 信息投递的方法
     * @param order
     * @throws Exception
     */
    public void send(Order order) throws Exception{
        //设置投递回调
        rabbitTemplate.setConfirmCallback(confirmCallback);
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(order.getMessage_id());
    
        //开始发送
        rabbitTemplate.convertAndSend("order-exchange","order.abcd", order, correlationData);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    3、定时任务

    /**
     * 系统启动后5秒开启定时任务 10秒执行一次
     */
    @Scheduled(initialDelay = 5000, fixedDelay = 10000)
    public void rabbitmqReSend() {
    
    	//查询出下一次执行时间小于当前时间的日志记录并且状态为投递中,遍历结果集,判断重试次数是或大于3次,如果大于,将日志设置为投递失败,如果小于 则尝试重新投递,并改变数据库中日志的尝试次数
        list = brokerMessageLogMapper.selectByExample(brokerMessageLogExample);
                
        list.forEach(brokerMessageLog -> {
            if (brokerMessageLog.getTry_count() >= 3) {
                大于最大重试次数,更新发送状态为失败
            } else {
                更新发送次数,开发发送
                orderSender.send(JSONObject.parseObject(brokerMessageLog.getMessage(),Order.class));
            }
    
        });
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
  • 相关阅读:
    tf.metrics
    【C语言】为什么建议使用枚举而不是#define?
    性能测试 —— Jmeter分布式测试的注意事项和常见问题
    对graalvm、springboot3.0一些新特性的探究
    android Intent(意图)
    MQTT vs. XMPP,哪一个才是IoT通讯协议的正解
    gcc编译器
    【Node.js】-闲聊:前端框架发展史
    <数据结构>停车场管理系统,利用栈和队列实现,包含纯c语言版和C++版的全注释源码
    把握时机加入 Road to GDE 指导计划
  • 原文地址:https://blog.csdn.net/weixin_37862824/article/details/126237890