1.运维反馈服务器cpu高,且高达80%
2.经过排查发现kafka出现消息积压情况
3.使用的是springboot kafka框架
- dependency>
- <groupId>org.springframework.kafkagroupId>
- <artifactId>spring-kafkaartifactId>
- dependency>
4.kafka消费者需要发起http调用第三方api
5.消费者出现报错日志
6.报错日志每间隔5分钟抛出一次
- Consumer exception]
- org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
- at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:72)
- at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124)
- at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1372)
- at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1070)
- at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
- at java.util.concurrent.FutureTask.run(FutureTask.java:266)
- at java.lang.Thread.run(Thread.java:750)
- Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
- at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1116)
- at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:983)
- at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1510)
- at sun.reflect.GeneratedMethodAccessor239.invoke(Unknown Source)
- at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
- at java.lang.reflect.Method.invoke(Method.java:498)
- at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
- at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:205)
- at com.sun.proxy.$Proxy206.commitSync(Unknown Source)
- at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2324)
- at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2319)
- at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2305)
- at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2119)
- at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1104)
- at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1038)
- ... 3 common frames omitted
1.根据报错消息分析
踢出消费组通常是由于消费者被消费者组移除或重平衡(rebalance)导致的。
2.kafka消费者被踢出有如下几种情况
心跳超时:消费者没有在规定的时间内发送心跳到 Kafka 集群,导致被移出消费者组。
处理时间过长:消费者处理消息的时间超过了 max.poll.interval.ms 配置,导致 Kafka 认为消费者已经失效并将其移出消费者组。
网络问题:网络延迟或不稳定可能导致消费者与 Kafka 集群之间的连接问题。
资源限制:消费者运行环境的资源(CPU、内存等)不足,导致消费者无法及时处理消息和发送心跳。
3.业务逻辑
业务上存在调用第三方http接口请求
第三方接口请求会存在报错情况,通常情况下网络请求会有默认超时时间,例如ConnectTimeout、ReadTimeout等。
出现消费慢时怀疑是第三方接口导致的,但是服务器日志没有超时报错信息,被误导不是超时导致的。
重新新增日志打印定位请求参数,发现一直没有执行最后的ack操作,故怀疑是第三方请求线程一直在等待导致线程挂起
查看代码设置发现原来ReadTimeout超时时间不配置情况下,默认都是-1,没有超时时间,被第三方依赖包坑了
1.优化消费者处理逻辑
确保消费者处理消息的逻辑高效,尽量减少单个消息的处理时间。如果处理逻辑复杂,可以考虑使用多线程或异步处理来提高效率。
2 涉及第三方接口调用需要做好降级熔断操作
本次存在请求第三方http接口,经过排查是第三方接口一直没响应,同时http请求的sockekTimeOut(aka socketReadTimeOut)没设置超时时间,刚好默认为空,既不超时,
导致接口一直卡住,超过kafka的max.poll.interval.ms的时间
3.如果一个批次的消息在下次poll的时间内处理不完,可以降低poll获取消息的数量
- spring:
- kafka:
- consumer:
- max-poll-records: 5 (从50降到5个)
4.如果消费者处理消息的时间较长,可以增加 max.poll.interval.ms 的值,以允许消费者有更长的时间处理消息。
(springboot yaml配置如下)
- spring:
- kafka:
- consumer:
- properties:
- max.poll.interval.ms: 600000 (默认是5分钟(300000毫秒))
5.增加心跳间隔和超时时间
调整 heartbeat.interval.ms 和 session.timeout.ms 的值,以允许更长的心跳间隔和更长的会话超时时间
(一般情况下使用默认值即可)
1. 在kafka消费者业务代码中使用了同步锁,例如redis,众所周知,锁竞争不到时会有等待时间,这样加长了消息的处理时长,导致超过max.poll.interval.ms会被踢出消费组
2. 忙中出错
2.1 查看技术文章说要设置调大max.poll.interval.ms的时间
- @Bean
- public KafkaListenerContainerFactory> batchFactory(ConsumerFactory consumerFactory){
- ConcurrentKafkaListenerContainerFactory
factory = - new ConcurrentKafkaListenerContainerFactory<>();
- factory.setConsumerFactory(consumerFactory);
- factory.setConcurrency(1);
- factory.getContainerProperties().setPollTimeout(6000);
- //设置为批量消费,每个批次数量在Kafka配置参数中设置
- factory.setBatchListener(true);
- //设置手动提交ackMode
- factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
- return factory;
- }
误以为在就是设置factory.getContainerProperties().setPollTimeout(6000)这个方法。结果设置后头痛医脚
2.2 Spring Kafka 中的 setPollTimeout 和 max.poll.interval.ms区别
在 Spring Kafka 中,setPollTimeout 方法设置的是消费者从 Kafka 服务器拉取消息时的超时时间。这是在每次调用消费者的 poll 方法时使用的超时时间。
例如:
container.setPollTimeout(Duration.ofMillis(3000)); // 设置轮询超时时间为3秒
原生 Kafka 客户端中的 poll 方法
在原生 Kafka 客户端中,poll 方法的超时时间是通过传递给 poll 方法的参数来设置的:
ConsumerRecords
如何理解 poll 超时时间
poll 方法的超时时间指的是消费者从 Kafka 服务器拉取消息的时间间隔。如果在指定的时间内没有消息可供消费,poll 方法会返回空的 ConsumerRecords 集合。
这个超时时间不应该与 max.poll.interval.ms 混淆,后者是指消费者处理消息的最长时间,如果超过这个时间,消费者会被认为失效,从而触发重平衡。
3. 为了提高消费速度,不恰当的增加消费者多线程
还是springboot kafka代码
把factory.setConcurrency(1);改为 factory.setConcurrency(3);
其实kafka的topic分区数决定了最多分配给多少个消费者(线程),
例如topic A设置了3个分区,部署三个应用分别每个应用一个消费者线程,那么每个应用刚好分配一个分区,如果只有2个应用,那么每个应用消费者各获得一个分区,剩下一个分区是空的,
此时可以让一个应用增加并发线程数,即setConcurrency(2),此时提高并发数的应用是获取2个分区,另一个是获得一个分区
补充分区、消费者、消费者多线程的关系
关于分区和消费者关系,官网原文为:
- if, say, 6 TopicPartitions are provided and the concurrency is 3; each container will get 2 partitions. For 5 TopicPartitions, 2 containers will get 2 partitions and the third will get 1.
- If the concurrency is greater than the number of TopicPartitions, the concurrency will be adjusted down such that each container will get one partition.
假设提供了6个TopicPartitions,并发性为3;每个容器将得到2个分区。对于5个TopicPartitions, 2个容器将获得2个分区,第三个容器将获得1个分区。如果并发性大于TopicPartitions的数量,那么并发性将被调低,这样每个容器将获得一个分区。
4. 不理解为什么触发重平衡会导致cpu高,导致排查方向不明确
Kafka 重平衡(rebalance)是指消费者组(Consumer Group)的成员变化(如加入或离开)时,Kafka 会重新分配分区给消费者。这一过程会导致 CPU 使用率升高,具体细节如下:
【重平衡导致 CPU 高的具体原因】
1.消费者停止消息处理(Stop-the-World)
*当重平衡开始时,所有消费者必须停止当前的消息处理。这是一个Stop-the-World操作,
会影响消费者的正常工作流程。
*消费者需要提交当前处理到一半的消息的偏移量,这需要与 Kafka Broker 进行通信。
2.协调器与消费者之间的通信
*Kafka 使用协调器(Coordinator)来管理消费者组。重平衡时,
协调器需要与所有消费者通信,获取当前消费者的状态,并重新分配分区。
*这种通信涉及多次网络交互和数据处理,消耗 CPU 资源。
3.分区分配和元数据更新
*协调器计算新的分区分配方案,并将结果通知所有消费者。消费者需要更新本地的元数据,以反映新的分区分配。
*这些操作包括反序列化元数据、更新本地状态、处理可能的分区迁移等,都会消耗 CPU。
4.连接和断开连接
*重平衡期间,消费者可能会频繁地连接和断开连接。这包括重新建立网络连接、初始化通信通道等操作。
*网络 I/O 操作和相关的上下文切换会增加 CPU 负载。
5.日志记录和监控
*重平衡过程中,Kafka 和消费者通常会记录大量的日志信息。这些日志记录操作也会增加 CPU 的使用。
*如果启用了详细的监控和指标收集,这些操作会进一步加剧 CPU 负载。
【重平衡的流程细节】
1.触发重平衡
消费者加入或离开消费者组,或者消费者组协调器发生变化,都会触发重平衡。
2.停止消息处理
当前消费者停止消息处理,提交当前偏移量。
3.协调器获取状态
消费者向协调器发送 JoinGroupRequest,协调器收集所有消费者的状态。
4.分区重新分配
协调器根据分区分配策略(如 Range、RoundRobin、Sticky 等)重新分配分区。
5分区分配通知
协调器将新的分区分配方案发送给所有消费者,消费者更新本地的分区信息。
6重新开始消息处理
消费者根据新的分区分配,重新开始消息处理。
【分区、消费者、消费者多线程的关系】
关于分区和消费者关系,官网原文为:
- if, say, 6 TopicPartitions are provided and the concurrency is 3; each container will get 2 partitions. For 5 TopicPartitions, 2 containers will get 2 partitions and the third will get 1.
- If the concurrency is greater than the number of TopicPartitions, the concurrency will be adjusted down such that each container will get one partition.
翻译如下:
假设提供了6个TopicPartitions,并发性为3;每个容器将得到2个分区。对于5个TopicPartitions, 2个容器将获得2个分区,第三个容器将获得1个分区。如果并发性大于TopicPartitions的数量,那么并发性将被调低,这样每个容器将获得一个分区。