• 【kafka异常】使用Spring-kafka遇到的坑


    查看一下压缩策略

    bin/kafka-topics.sh --describe --zookeeper xxxx:2181 --topic SHI_TOPIC1

    Topic:SHI_TOPIC1 PartitionCount:1 ReplicationFactor:1 Configs:cleanup.policy=compact

    Topic: SHI_TOPIC1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0

    Configs:cleanup.policy=compact :

    然后再检查一下自己发送消息的时候是不是没有传 key

    [参考链接](()

    问题堆栈信息

    org.springframework.kafka.listener.ListenerExecutionFailedException: invokeHandler Failed;

    nested exception is java.lang.IllegalStateException: No Acknowledgment available as an argument,

    the listener container must have a MANUAL AckMode to populate the Acknowledgment.;

    nested exception is java.lang.IllegalStateException:

    No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.

    问题原因

    解决方案

    问题堆栈信息

    Failed to start bean ‘org.springframework.kafka.config.internalKafkaListenerEndpointRegistry’; nested exception is java.lang.IllegalStateException: Consumer cannot be configured for auto commit for ackMode MANUAL_IMMEDIATE

    问题原因

    不能再配置中既配置kafka.consumer.enable-auto-commit=true 自动提交; 然后又在监听器中使用手动提交

    例如:

    kafka.consumer.enable-auto-commit=true

    @Autowired

    private ConsumerFactory consumerFactory;

    @Bean

    public KafkaListenerContainerFactory> kafkaManualAckListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory factory =

    new ConcurrentKafkaListenerContainerFactory<>();

    factory.setConsumerFactory(consumerFactory);

    //设置提交偏移量的方式 当Acknowledgment.acknowledge()侦听器调用该方法时,立即提交偏移量

    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

    return factory;

    }

    /**

    • 手动ack 提交记录

    • @param data

    • @param ack

    • @throws InterruptedException

    */

    @KafkaListener(id = “consumer-id2”,topics = “SHI_TOPIC1”,concurrency = “1”,

    clientIdPrefix = “myClientId2”,containerFactory = “kafkaManualAckListenerContainerFactory”)

    public void consumer2(String data, Acknowledgment ack) {

    log.info(“consumer-id2-手动ack,提交记录,data:{}”,data);

    ack.acknowledge();

    }

    解决方法:

    将自动提交关掉,或者去掉手动提交;

    如果你想他们都同时存在,某些情况自动提交;某些情况手动提交; 那你创建 一个新的

    consumerFactory 将它的是否自动提交设置为false;比如

    @Configuration

    @EnableKafka

    public class KafkaConfig {

    @Autowired

    private KafkaProperties properties;

    /**

    • 创建一个新的消费者工厂

    • 创建多个工厂的时候 SpringBoot就不会自动帮忙创建工厂了;所以默认的还是自己创建一下

    • @return

    */

    @Bean

    public ConsumerFactory kafkaConsumerFactory() {

    Map map = properties.buildConsumerProperties();

    DefaultKafkaConsumerFactory factory = new DefaultKafkaConsumerFactory<>( map);

    return factory;

    }

    /**

    • 创建一个新的消费者工厂

    • 但是修改为不自动提交

    • @return

    */

    @Bean

    public ConsumerFactory kafkaManualConsumerFactory() {

    Map map = properties.buildConsumerProperties();

    map.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

    DefaultKafkaConsumerFactory factory = new DefaultKafkaConsumerFactory<>( map);

    return factory;

    }

    /**

    • 手动提交的监听器工厂 (使用的消费组工厂必须 kafka.consumer.enable-auto-commit = false)

    • @return

    */

    @Bean

    public KafkaListenerContainerFactory> kafkaManualAckListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory factory =

    new ConcurrentKafkaListenerContainerFactory<>();

    factory.setConsumerFactory(kafkaManualConsumerFactory());

    //设置 《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》无偿开源 威信搜索公众号【编程进阶路】 提交偏移量的方式 当Acknowledgment.acknowledge()侦听器调用该方法时,立即提交偏移量

    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

    return factory;

    }

    先自我介绍一下,小编13年上师交大毕业,曾经在小公司待过,去过华为OPPO等大厂,18年进入阿里,直到现在。深知大多数初中级java工程师,想要升技能,往往是需要自己摸索成长或是报班学习,但对于培训机构动则近万元的学费,着实压力不小。自己不成体系的自学效率很低又漫长,而且容易碰到天花板技术停止不前。因此我收集了一份《java开发全套学习资料》送给大家,初衷也很简单,就是希望帮助到想自学又不知道该从何学起的朋友,同时减轻大家的负担。添加下方名片,即可获取全套学习资料哦

  • 相关阅读:
    Applied Time Series Analysis with R
    esp8266 通过Arduino与电脑串口通信
    物理内存 虚拟内存 页映射模式
    Hbase 过滤器详解
    Apache DolphinScheduler - 快速扩展 TaskPlugin 从入门到放弃
    MATLAB科技绘图与数据分析
    月入10.3K,苦逼土木狗转行5G网路优化工程师:对象没了之后,我选择转行!
    工厂设计模式
    Solon 1.6.33 发布,更现代感的应用开发框架
    微服务项目实战-黑马头条(三):APP端文章详情
  • 原文地址:https://blog.csdn.net/m0_67401920/article/details/126080599