• 5.2.SpringBoot整合Kafka


    1、准备工作

    pom配置:

    	<dependency>
    		<groupId>org.springframework.kafkagroupId>
    		<artifactId>spring-kafkaartifactId>
    		<version>2.8.8version>
    	dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    application-dev.yml配置 (配置在spring下):

     kafka:
        bootstrap-servers:
          - localhost:9092
        template:
          default-topic: demo.topic
        producer:
          retries: 3 # 重试次数,设置大于0的值,则客户端会将发送失败的记录重新发送
          batch-size: 16384 #批量处理大小,16K
          buffer-memory: 33554432 #缓冲存储大,32M
          acks: 1
          # 指定消息key和消息体的编解码方式
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
          # 是否自动提交(手动提交要关闭,不然会报错)
          group-id: spring_customer
          enable-auto-commit: false
          # 消费偏移配置
          # none:如果没有为消费者找到先前的offset的值,即没有自动维护偏移量,也没有手动维护偏移量,则抛出异常
          # earliest:在各分区下有提交的offset时:从offset处开始消费;在各分区下无提交的offset时:从头开始消费
          # latest:在各分区下有提交的offset时:从offset处开始消费;在各分区下无提交的offset时:从最新的数据开始消费
          auto-offset-reset: earliest
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        # 监听
        listener:
          # record:当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
          # batch:当每一批poll()的数据被ListenerConsumer处理之后提交
          # time:当每一批poll()的数据被ListenerConsumer处理之后,距离上次提交时间大于TIME时提交
          # count:当每一批poll()的数据被ListenerConsumer处理之后,被处理record数量大于等于COUNT时提交
          # count_time:TIME或COUNT中有一个条件满足时提交
          # manual:当每一批poll()的数据被ListenerConsumer处理之后, 手动调用Acknowledgment.acknowledge()后提交
          # manual_immediate:手动调用Acknowledgment.acknowledge()后立即提交,一般推荐使用这种
          ack-mode: manual_immediate
    	  # 在侦听器容器中运行的线程数。
          concurrency: 5
     # 其它属性配置
     properties:
     # 设置发送消息的大小
        max.request.size: 10240000
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    2、创建生产者和消费者

    生产者:

    /**
     * cf
     * 生产者
     */
    @RestController
    public class KafkaProducer {
    
        @Autowired
        private KafkaTemplate<String, Object> kafkaTemplate;
    
        /**
         * 发送消息
         * @param message
         */
        @PostMapping("/kafka")
        public void sendMessage1(@RequestBody String message) {
            kafkaTemplate.send("topic_test01", "cfKey",message);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    消费者:

    /**
     * cf
     * 消费者
     */
    @Component
    @Slf4j
    public class KafkaConsumer {
        /**
         * kafka的监听器 消费消息
         * @param record
         * @param item
         */
        @KafkaListener(topics = "topic_test01", groupId = "spring_customer")
        public void topicListener(ConsumerRecord<String, String> record, Acknowledgment item) {
            log.info("我开始消费:{}==,{}==,{};",record.topic(),record.partition(),record.value());
            //手动提交
            item.acknowledge();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    在 Kafka 中消息通过服务器推送给各个消费者,而 Kafka 的消费者在消费消息时,需要提供一个监听器(Listener)对某个 Topic 实现监听,从而获取消息,这也是 Kafka 消费消息的唯一方式。

    @KafkaListener 注解实现监听器,就是我们用到的。

    源码:

    @Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
    @Retention(RetentionPolicy.RUNTIME)
    @MessageMapping
    @Documented
    @Repeatable(KafkaListeners.class)
    public @interface KafkaListener {
        String id() default "";
        String containerFactory() default "";
        //消息 Topic
        String[] topics() default {};
        //Topic 的模式匹配表达式
        String topicPattern() default "";
    	//Topic 分区
        TopicPartition[] topicPartitions() default {};
        String containerGroup() default "";
        String errorHandler() default "";
    	//消息分组 Id
        String groupId() default "";
        boolean idIsGroup() default true;
        String clientIdPrefix() default "";
        String beanRef() default "__listener";
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    在使用 @KafkaListener 时,最核心的操作是设置 Topic,而 Kafka 还提供了一个模式匹配表达式可以对目标 Topic 实现灵活设置。
    在这里,强调下 groupId 这个属性,这就涉及 Kafka 中另一个核心概念:消费者分组(Consumer Group)。

    设计消费者组的目的是应对集群环境下的多服务实例问题。

    显然,如果采用发布-订阅模式会导致一个服务的不同实例可能会消费到同一条消息。

    为了解决这个问题,Kafka 中提供了消费者组的概念。一旦我们使用了消费组,一条消息只能被同一个组中的某一个服务实例所消费

    3.测试一下

    在这里插入图片描述

    成功:
    在这里插入图片描述

    4. 复杂操作1 (ConcurrentKafkaListenerContainerFactory)应用

    kafka过滤器

    1.我们在使用kafka的时候,会遇到某一些消息过滤的情况,所以我们需要配置过滤器,来清洗kafka中的数据。
    2.有时候我们需要配置并发消费,所以要Specify the container concurrency.指定容器并发性

    配置如下:

    
    /**
     * cf
     * 消费者消息过滤器
     */
    @Configuration
    public class CustomKafkaFilter {
        @Autowired
        private ConsumerFactory consumerFactory;
        @Bean("concurrentKafkaListenerContainerFactory")
        public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(){
            ConcurrentKafkaListenerContainerFactory factory =
                    new ConcurrentKafkaListenerContainerFactory();
            factory.setConsumerFactory(consumerFactory);
    		//指定容器并发性(配置文件配置也可)
            factory.setConcurrency(3);
    		//开启批量消费 (配置文件配置也可)
            factory.setBatchListener(true);
            // 被过滤的消息将被丢弃
            factory.setAckDiscarded(true);
            // 设置记录筛选策略
            factory.setRecordFilterStrategy(new RecordFilterStrategy() {
                @Override
                public boolean filter(ConsumerRecord consumerRecord) {
                    String msg = consumerRecord.value().toString();
    				 //这里写筛选规则
                    return true;// 返回true消息将会被丢弃
                }
            });
            return factory;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    注意: @Bean(“concurrentKafkaListenerContainerFactory”)一定要指定名称,否则spring会认为没有这个bean自动创建一个,会出现重复执行问题。

    5. 复杂操作2 封装 kafka生产者

    工具类KafkaAnalyzeProducer:

    
    import lombok.SneakyThrows;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;
    import org.springframework.util.concurrent.FailureCallback;
    import org.springframework.util.concurrent.ListenableFuture;
    import org.springframework.util.concurrent.ListenableFutureCallback;
    import org.springframework.util.concurrent.SuccessCallback;
    
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    
    /**
     * kafka生产者 cf
     */
    @Slf4j
    @Component
    public class KafkaAnalyzeProducer {
    
        @Autowired
        private KafkaTemplate kafkaTemplate;
    
        /**
         * 发送数据到kafka
         *
         * @param topic        topic名称
         * @param message      发送信息字符串
         * @param sendCallBack 发送回调
         */
        public void send(String topic, String message, SendCallBack sendCallBack) {
    
            ListenableFuture listenableFuture = kafkaTemplate.send(topic, message);
            //发送成功后回调
            SuccessCallback<String> successCallback = new SuccessCallback() {
                @SneakyThrows
                @Override
                public void onSuccess(Object result) {
                    sendCallBack.sendSuccessCallBack(topic, message);
                }
            };
            //发送失败回调
            FailureCallback failureCallback = new FailureCallback() {
                @SneakyThrows
                @Override
                public void onFailure(Throwable ex) {
                    sendCallBack.sendFailCallBack(topic, message, ex);
                }
            };
    
            listenableFuture.addCallback(successCallback, failureCallback);
        }
    
    
        /**
         * producer 同步方式发送数据
         *
         * @param topic   topic名称
         * @param message producer发送的数据
         */
        public void sendAsynchronize(String topic, String message) throws InterruptedException, ExecutionException, TimeoutException {
            kafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS);
        }
    
        /**
         * producer 异步方式发送数据
         *
         * @param topic   topic名称
         * @param message producer发送的数据
         */
        public void sendSynchronize(String topic, String message) {
            kafkaTemplate.send(topic, message).addCallback(new ListenableFutureCallback() {
                @Override
                public void onFailure(Throwable throwable) {
                    log.error("----事件kafka记录解析完成放入topic:{},发送失败{}", topic, message, throwable);
                }
    
                @Override
                public void onSuccess(Object o) {
                    log.info("----事件kafka记录解析完成放入topic:{},发送成功:{}", topic, message);
                }
            });
        }
    
        public void sendMessage(String topic, String message) {
            this.kafkaTemplate.send(topic, message);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91

    SendCallBack:

    import java.text.ParseException;
    
    public interface SendCallBack {
    
        /**
         * 生产成功回调
         * @param topic topic
         * @param msg 信息字符串
         */
        void sendSuccessCallBack(String topic,String msg) throws ParseException;
    
        /**
         * 生产失败回调
         * @param topic topic
         * @param msg 信息字符串
         * @param ex 异常
         */
        void sendFailCallBack(String topic,String msg,Throwable ex) throws ParseException;
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    使用方式:

        @Autowired
        private KafkaAnalyzeProducer kafkaAnalyzeProducer;
       
    // 下面的放方法里面就好了   
    kafkaAnalyzeProducer.send(topic,JSON.toJSONString(itemMap,SerializerFeature.WriteMapNullValue),new SendCallBack() {
            @Override
            public void sendSuccessCallBack (String topic, String msg){
                JSONObject jsonObject = JSONObject.parseObject(msg);
    
                log.info("----sendSuccessCallBack事件kafka记录解析完成放入topic:{},发送成功:{}", topic, jsonObject.getOrDefault("id", "").toString());
            }
    
            @Override
            public void sendFailCallBack (String topic, String msg, Throwable ex){
                JSONObject jsonObject = JSONObject.parseObject(msg);
    
                log.error("----sendFailCallBack事件kafka记录解析完成放入topic:{},发送失败{}", topic, jsonObject.getOrDefault("id", "").toString(), ex);
            }
        });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    ok基本上就分享这些就够用了。

  • 相关阅读:
    18.Raising and Lower Indexs
    MySQL性能分析常见方式
    Windows 利用Anaconda创建pytorch等环境
    JVM(2)
    【微信小程序】接口生成自定义首页二维码
    玩机搞机-----安卓全机型刷机卡fastboot模式解决方法与故障解析
    mysql为什么会选错索引?
    nix包管理器
    mac电脑做为开发机的一些初始化操作
    传奇开服怎么开服?不会技术自己能开服吗?传奇开服需要准备什么?前期需要投入多少?
  • 原文地址:https://blog.csdn.net/daohangtaiqian/article/details/126117768