• Kafka生产消费实战-JAVA


    Kafka生产消费实战-JAVA

    生产者代码

    public static void main(String[] args) {
    
            Properties prop = new Properties();
            // 指定broker地址
            prop.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
            // 消息序列化
            prop.put("key.serializer", StringSerializer.class.getName());
            prop.put("value.serializer", StringSerializer.class.getName());
            // 创建生产者
            KafkaProducer producer = new KafkaProducer<String, String>(prop);
            // f发送数据
            String topic = "hello";
            producer.send(new ProducerRecord<String, String>(topic, "hello kafka producer"));
    
            // close
            producer.close();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    消费者代码

     public static void main(String[] args) {
    
            Properties prop = new Properties();
            prop.put("bootstrap.servers", "192.168.52.100:9092,192.168.52.101:9092,192.168.52.102:9092");
            // 反序列化
            prop.put("key.deserializer", StringDeserializer.class.getName());
            prop.put("value.deserializer", StringDeserializer.class.getName());
            // 指定消费者组
            prop.put("group.id", "con-1");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
    
            Collection<String> topics = new ArrayList<>();
            topics.add("hello");
            // 订阅指定的topic
            consumer.subscribe(topics);
    
            while(true) {
                // 消费数据
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
    
                for (ConsumerRecord message: consumerRecords
                     ) {
                    System.out.println(message);
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    image-20240312144608270

    消费者代码扩展

    // 开启自动提交功能,默认是开启
            prop.put("enable.auto.commit", "true");
            // 自动提交时间间隔
            prop.put("auto.commit.interval.ms", "5000");
            // 先根据group.id指定的消费者组查询保存的offset信息
            // 如果找到了,说明之前消费过该消费组的消息,则根据之前保存的offset继续消费
            // 如果没有找到,说明是第一次消费,或者说是之前的offset对应的数据已经不存在了,此时就会根据auto.offset.reset 的值执行不同的消费逻辑
            // earliest:从最早的数据开始消费,从头开始
            // latest : 最新的数据开始消费-默认的策略
            // none : 抛出异常
            // 在实时计算的场景下,建议设置为latest
            // 这个参数只会在消费者第一次消费或者对应的offset没有数据的时候才会生效
            prop.put("auto.offset.reset", "latest");
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    Consumer消费offset查询

    • kafka0.9之前,消费的offset信息是保存在zookeeper中,0.9之后使用了新的消费API,消费者的信息会保存在kafka里面的_consumer_offsets这个topic中

    image-20240313091756451

    • 如何查询保存在kafka中的consumer的offset信息?
    # 查询消费者信息
    [root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-consumer-groups.sh --list --bootstrap-server hadoop01:9092 
    con-1
    
    # 消费组描述
    [root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-consumer-groups.sh --describe --bootstrap-server hadoop01:9092 --group con-1
    
    GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
    con-1           hello           2          1               1               0               consumer-con-1-1-572e3210-a06e-499c-ab2e-3d3340dd0129 /192.168.52.1   consumer-con-1-1
    con-1           hello           3          1               1               0               consumer-con-1-1-572e3210-a06e-499c-ab2e-3d3340dd0129 /192.168.52.1   consumer-con-1-1
    con-1           hello           1          0               0               0               consumer-con-1-1-572e3210-a06e-499c-ab2e-3d3340dd0129 /192.168.52.1   consumer-con-1-1
    con-1           hello           0          1               1               0               consumer-con-1-1-572e3210-a06e-499c-ab2e-3d3340dd0129 /192.168.52.1   consumer-con-1-1
    con-1           hello           4          2               2               0               consumer-con-1-1-572e3210-a06e-499c-ab2e-3d3340dd0129 /192.168.52.1   consumer-con-1-1
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    Consumer消费顺序

    • 当一个消费者消费一个partition的时候,消费的数据顺序和此partition数据的生产顺序是一致的

    • 当一个消费者消费多个partition的时候,消费者按照partition的顺序,首先消费一个partition,当消费完一个partition最新的数据后再消费其它partition的数据

    总之,如果一个消费者消费多个partition,只能保证消费者的数据顺序在一个partition内有序

    Kafka的三种语义

    • 至少一次:at-least-once,有可能对数据重复处理
    // 将自动提交设置为false
    prop.put("enable.auto.commit", "false");
    // 手动提交
    consumer.commitAsync();
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 至多一次:at-most-once,默认实现

    • 仅此一次:exactly-once

  • 相关阅读:
    Django 内置的过滤器与标签、自定义过滤器与标签
    大厂必备的6款React UI框架
    HTML介绍——HTML筑基
    WRFDA资料同化实践技术应用
    数据湖(十四):Spark与Iceberg整合查询操作
    深入理解Spring Boot钩子函数
    Femas:云原生多运行时微服务框架
    猿创征文 第二季| #「笔耕不辍」--生命不息,写作不止#
    机器学习——入门
    文件IO和标准IO的区别
  • 原文地址:https://blog.csdn.net/Grady00/article/details/136671676