• kafka复习:(26)通过RecordHeaders和RecordHeader来实现TTL功能


    一、定义生产者,在消息中加入RecordHeaders

    package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.header.internals.RecordHeaders;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Date;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    
    /**
     * Producer demo: attaches a "ttl" header (seconds) to each record so that a
     * consumer-side interceptor can drop messages older than their TTL.
     * Sends three records with TTLs of 1s, 30s and 60s; the 1s record is very
     * likely to be expired by the time the consumer polls it.
     */
    public class KafkaTest26 {
        public static void main(String[] args) {
            Properties properties = new Properties();

            properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xx.xx.xx.xx:9092");
            // Use the generic form instead of the raw type so the compiler checks key/value types.
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

            // TTL of 1 second: almost certainly discarded by the consumer interceptor.
            RecordHeaders recordHeaders1 = new RecordHeaders();
            recordHeaders1.add("ttl", BytesUtils.longToBytes(1));

            RecordHeaders recordHeaders2 = new RecordHeaders();
            recordHeaders2.add("ttl", BytesUtils.longToBytes(30));

            RecordHeaders recordHeaders3 = new RecordHeaders();
            recordHeaders3.add("ttl", BytesUtils.longToBytes(60));

            // Explicit timestamp is supplied so the interceptor's (now - timestamp) check is meaningful.
            long now = System.currentTimeMillis();
            ProducerRecord<String, String> producerRecord1 = new ProducerRecord<>("ttl", 0,
                    now, "fff", "hello sister,now is: " + new Date(), recordHeaders1);
            ProducerRecord<String, String> producerRecord2 = new ProducerRecord<>("ttl", 0,
                    now, "fff", "hello sister,now is: " + new Date(), recordHeaders2);
            ProducerRecord<String, String> producerRecord3 = new ProducerRecord<>("ttl", 0,
                    now, "fff", "hello sister,now is: " + new Date(), recordHeaders3);

            Future<RecordMetadata> future = kafkaProducer.send(producerRecord1);
            Future<RecordMetadata> future2 = kafkaProducer.send(producerRecord2);
            Future<RecordMetadata> future3 = kafkaProducer.send(producerRecord3);

            try {
                // Block until all three sends are acknowledged (or fail).
                future.get();
                future2.get();
                future3.get();
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of silently swallowing it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            System.out.println("ok");

            kafkaProducer.close();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    二、定义消费者拦截器

    package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;
    
    import org.apache.kafka.clients.consumer.ConsumerInterceptor;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.header.Header;
    
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    /**
     * Consumer interceptor implementing per-message TTL.
     * Each record may carry a "ttl" header holding the time-to-live in seconds
     * (encoded via BytesUtils.longToBytes). Records whose age exceeds their TTL
     * are dropped before the application sees them; records without a TTL header
     * (ttl stays -1) are always delivered.
     */
    public class TtlConsumerInterceptor implements ConsumerInterceptor<String, String> {
        /**
         * Filters out expired records.
         *
         * @param records the batch returned by poll()
         * @return a new batch containing only non-expired records
         */
        @Override
        public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
            long now = System.currentTimeMillis();
            Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>();
            for (TopicPartition tp : records.partitions()) {
                List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
                List<ConsumerRecord<String, String>> newTpRecords = new ArrayList<>();
                for (ConsumerRecord<String, String> record : tpRecords) {
                    long ttl = -1;
                    for (Header header : record.headers()) {
                        if ("ttl".equals(header.key())) {
                            ttl = BytesUtils.bytesToLong(header.value());
                        }
                    }
                    // Keep the record when no TTL is set (ttl <= 0) or it has not yet expired.
                    // BUG FIX: the original also added the record in the else branch,
                    // so expired records were never actually dropped.
                    if (ttl <= 0 || now - record.timestamp() < ttl * 1000) {
                        newTpRecords.add(record);
                    }
                }
                // Only register the partition if it still has records after filtering
                // (checked once per partition, not once per record as before).
                if (!newTpRecords.isEmpty()) {
                    newRecords.put(tp, newTpRecords);
                }
            }
            return new ConsumerRecords<>(newRecords);
        }


        @Override
        public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
            // No action needed on commit.
        }

        @Override
        public void close() {
            // No resources to release.
        }

        @Override
        public void configure(Map<String, ?> configs) {
            // No configuration needed.
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    三、定义消费者,配置上述拦截器

    package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.time.Duration;
    import java.time.temporal.TemporalUnit;
    import java.util.Arrays;
    import java.util.Properties;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Consumer demo for the TTL feature: registers {@link TtlConsumerInterceptor}
     * so that expired messages (see KafkaTest26's "ttl" headers) are filtered out
     * before reaching this poll loop.
     */
    public class KafkaTest27 {

        /** Builds consumer configuration, wiring in the TTL interceptor. */
        private static Properties getProperties() {
            Properties properties = new Properties();

            properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "xx.xx.xx.xx:9092");
            properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
            // The interceptor runs inside poll(); expired records never reach the caller.
            properties.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TtlConsumerInterceptor.class.getName());
            return properties;
        }

        public static void main(String[] args) {
            // Generic form instead of the raw type so record key/value are typed.
            KafkaConsumer<String, String> myConsumer = new KafkaConsumer<>(getProperties());
            String topic = "ttl";
            myConsumer.subscribe(Arrays.asList(topic));

            // Poll forever, printing whatever survives the TTL filter.
            while (true) {
                ConsumerRecords<String, String> consumerRecords = myConsumer.poll(Duration.ofMillis(5000));
                for (ConsumerRecord<String, String> record : consumerRecords) {
                    System.out.println(record.value());
                    System.out.println("record offset is: " + record.offset());
                }
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
  • 相关阅读:
    1072 Gas Station
    Python、机器学习和量化开发环境
    阿坤老师的独特瓷器(Java详解)
    二维码智慧门牌管理系统:提升社会治理与服务的全新解决方案
    【JAVA学习笔记】48 - 八大常用Wrapper类(包装类)
    使用CAD偏移和阵列命令绘制图形、使用CAD旋转复制命令绘制图形
    MyBatis之关联关系
    项目部署上线
    守护安全|AIRIOT城市天然气综合管理解决方案
    数组06-滑动窗口
  • 原文地址:https://blog.csdn.net/amadeus_liu2/article/details/132666883