• RabbitMQ消息的链路跟踪


    背景

    TraceId能标记一次请求的调用链路,在我们排查问题的时候十分重要。系统引入MQ后,MQ消息默认不带TraceId,所以消息发送和处理的链路就断了。下面分享如何对业务逻辑无感的方式,将TraceId带到消费端。

    难点

    RabbitMQ的Message对象可以在属性上设置头信息,所以携带TraceId的位置有了,问题是怎么无感的方式设置和获取TraceId?

    Spring RabbitMQ拦截器

    在Spring里使用RabbitMQ本身没有拦截器,但是有一个消息处理器,可以在发送和接收消息之前对消息进行处理。里面有3个重载的方法,对原始消息进行转换。我们可以借助这个处理器,在Message对象里加上TraceId。

    public interface MessagePostProcessor {
    
    	/**
    	 * Change (or replace) the message.
    	 * @param message the message.
    	 * @return the message.
    	 * @throws AmqpException an exception.
    	 */
    	Message postProcessMessage(Message message) throws AmqpException;
    
    	/**
    	 * Change (or replace) the message and/or change its correlation data. Only applies to
    	 * outbound messages.
    	 * @param message the message.
    	 * @param correlation the correlation data.
    	 * @return the message.
    	 * @since 1.6.7
    	 */
    	default Message postProcessMessage(Message message, Correlation correlation) {
    		return postProcessMessage(message);
    	}
    
    	/**
    	 * Change (or replace) the message and/or change its correlation data. Only applies to
    	 * outbound messages.
    	 * @param message the message.
    	 * @param correlation the correlation data.
    	 * @param exchange the exchange to which the message is to be sent.
    	 * @param routingKey the routing key.
    	 * @return the message.
    	 * @since 2.3.4
    	 */
    	default Message postProcessMessage(Message message, Correlation correlation, String exchange, String routingKey) {
    		return postProcessMessage(message, correlation);
    	}
    
    }

    发送消息时,携带TraceId

    Spring默认使用RabbitTemplate来发送消息,RabbitTemplate的send方法在发送消息之前,会调用beforePublishPostProcessors来处理Message对象。beforePublishPostProcessors集合里存的就是MessagePostProcessor对象,它会在发消息之前执行:

    public void doSend(Channel channel, String exchangeArg, String routingKeyArg, Message message,
    			boolean mandatory, @Nullable CorrelationData correlationData) throws IOException {
    	// ...
    	// 核心代码
    	if (this.beforePublishPostProcessors != null) {
    		for (MessagePostProcessor processor : this.beforePublishPostProcessors) {
    			messageToUse = processor.postProcessMessage(messageToUse, correlationData, exch, rKey);
    		}
    	}
    	// ...
    
    	sendToRabbit(channel, exch, rKey, mandatory, messageToUse);
    
    	// ...
    }

    我们要做的是在注册RabbitTemplate的时候加上处理TraceId的逻辑

    @Bean
    public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate();
        configurer.configure(template, connectionFactory);
    
        template.setBeforePublishPostProcessors(new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
    			// 这个方法没调用,调用的是下面那个方法
                return message;
            }
    
            @Override
            public Message postProcessMessage(Message message, Correlation correlation, String exchange, String routingKey) {
                String requestId = MDC.get(MdcConstants.REQUESTID);
                if (StringUtils.isEmpty(requestId)) {
                    requestId = StringUtils.random();
                }
                message.getMessageProperties().setHeader(RabbitMQConstants.HEADER_REQUESTID, requestId);
                return message;
            }
        });
        return template;
    }

    消费消息时,获取TraceId

    当我们通过@RabbitListener注册consumer时,Spring会通过org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#mainLoop方法,不断从consumer队列里拿到消息。在把消息交给@RabbitListener标注的对象前,也会对Message对象进行处理。这里的处理器是存在org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#afterReceivePostProcessors属性上。

    private void doExecuteListener(Channel channel, Object data) {
    	if (data instanceof Message) {
    		Message message = (Message) data;
    		if (this.afterReceivePostProcessors != null) {
    			for (MessagePostProcessor processor : this.afterReceivePostProcessors) {
    				message = processor.postProcessMessage(message);
    				if (message == null) {
    					throw new ImmediateAcknowledgeAmqpException(
    							"Message Post Processor returned 'null', discarding message");
    				}
    			}
    		}
    		// ...
    
    		invokeListener(channel, message);
    	}
    	else {
    		invokeListener(channel, data);
    	}
    }

    怎么设置SimpleMessageListenerContainer#afterReceivePostProcessors的值?

    SimpleMessageListenerContainer对象是由SimpleRabbitListenerContainerFactory工厂对象创建,在创建SimpleMessageListenerContainer对象时,会把工厂里的属性拷贝过来,afterReceivePostProcessors就是通过工厂拷过来。所以我们直接设置SimpleRabbitListenerContainerFactory的afterReceivePostProcessors值就可以。

    @Bean(name = {"rabbitListenerContainerFactory"})
    @ConditionalOnProperty(
            prefix = "spring.rabbitmq.listener",
            name = {"type"},
            havingValue = "simple",
            matchIfMissing = true
    )
    public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory, ObjectProvider> simpleContainerCustomizer) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        simpleContainerCustomizer.ifUnique(factory::setContainerCustomizer);
    
        factory.setAfterReceivePostProcessors(message -> {
            Object requestId = message.getMessageProperties().getHeader(RabbitMQConstants.HEADER_REQUESTID);
            if (StringUtils.isEmpty(requestId)) {
                requestId = StringUtils.random();
            }
            MDC.put(MdcConstants.REQUESTID, String.valueOf(requestId));
            return message;
        });
        return factory;
    }

    这样,我们就能在@RabbitListener的消费逻辑里拿到TraceId。

  • 相关阅读:
    台式机卡死救机心情点滴记录
    A Unified MRC Framework for Named Entity Recognition
    机器学习中 TP FP TN FN的概念
    面经-虚拟机-JVM垃圾回收算法
    适合运动的耳机推荐,2022年值得买的六款运动蓝牙耳机
    计算机毕业设计Java知识管理系统(源码+系统+mysql数据库+Lw文档)
    ubuntu127.0.1.1
    C专家编程 第10章 再论指针 10.8 轻松一下---程序检验的限制
    常用百宝箱——日志处理
    搭建前端框架
  • 原文地址:https://blog.csdn.net/xsgnzb/article/details/133385943