• SpringBoot用kafka.listener监听接收Kafka消息


    1.创建kafka监听配置并进行注册

    
    
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.ContainerProperties;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Kafka listener configuration: registers the consumer factory and the
     * listener container factory used by {@code @KafkaListener} methods.
     *
     * <p>Offsets are committed manually: listener methods receive an
     * {@code Acknowledgment} and must call {@code acknowledge()} themselves.
     *
     * @author 35
     * @date 2024-04-24 13:25
     */
    @Configuration
    @EnableKafka
    public class KafkaConfig {

        /** Kafka broker address list, read from {@code spring.kafka.bootstrap-servers}. */
        @Value("${spring.kafka.bootstrap-servers}")
        private String bootstrapServers;

        /** Consumer group id for the Kafka AI service, taken from {@code Constants.KAFKA_AI_SERVER_GROUP}. */
        private final String groupId = Constants.KAFKA_AI_SERVER_GROUP;

        /**
         * Builds the consumer factory with String key/value deserializers.
         *
         * @return a consumer factory bound to the configured brokers and group id
         */
        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            // The container below uses manual acknowledgment, so the Kafka client must not
            // auto-commit offsets on its own. Set explicitly instead of relying on the
            // framework default, which differs between Spring Kafka versions.
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return new DefaultKafkaConsumerFactory<>(props);
        }

        /**
         * Builds the listener container factory picked up by {@code @KafkaListener}.
         *
         * @return a container factory using {@link #consumerFactory()} and manual ack mode
         */
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            // Manual ack mode: offsets are committed only after the listener calls
            // Acknowledgment.acknowledge().
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
            return factory;
        }
    }
    
    

    2.使用示例

      /**
       * Consumes one message from {@code Constants.KAFKA_USER_TOPIC} and hands it to
       * {@code bb(message)} for processing.
       *
       * <p>The offset is always committed, exactly once, after processing: a failure is
       * logged and the message is skipped rather than left unacknowledged.
       *
       * @param message the raw message payload
       * @param ack     manual acknowledgment handle used to commit the offset
       */
      @KafkaListener(topics = Constants.KAFKA_USER_TOPIC, groupId = Constants.KAFKA_SERVER_GROUP)
      public void syncUserByKafKa(String message, Acknowledgment ack) {
          try {
              // Delegate to the actual processing method.
              bb(message);
          } catch (Exception e) {
              // Pass the exception itself so the stack trace is kept in the log.
              log.error("失败:" + e.getMessage() + "消息:" + message, e);
          } finally {
              // Commit the consumed offset once, on both the success and the failure path.
              // (The original also acknowledged inside the try body, which acknowledged
              // every successful message twice.)
              ack.acknowledge();
          }
      }
    
  • 相关阅读:
    CRYPTOHACK BLOCK CIPHERS
    Spring Bean自动装配
    JVM_常见【面试题】
    【MySQL】Java的JDBC编程
    Ansible playbook的block
    [附源码]java毕业设计校园一卡通管理信息系统台
    2310D导入c部分可用
    跨境电商做什么产品好?2022速卖通热销品类榜单来袭!
    【已解决】springboot整合swagger2文档
    Hadoop FS 文件系统命令
  • 原文地址:https://blog.csdn.net/qq_27860623/article/details/142181316