• Kafka - 3.x Producer 生产者最佳实践



    在这里插入图片描述


    生产经验_生产者提高吞吐量

    核心参数

    在这里插入图片描述

    Code

    package com.artisan.pc;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    /**
     * @author 小工匠
     * @version 1.0
     * @mark: show me the code , change the world
     */
    public class CustomProducerParameters {
    
        public static void main(String[] args) throws InterruptedException {
    
            // 1. 创建kafka生产者的配置对象
            Properties properties = new Properties();
    
            // 2. 给kafka配置对象添加配置信息:bootstrap.servers
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.171:9092");
    
            // key,value序列化(必须):key.serializer,value.serializer
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
            // batch.size:批次大小,默认16K
            properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    
            // linger.ms:等待时间,默认0
            properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    
            // RecordAccumulator:缓冲区大小,默认32M:buffer.memory
            properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    
            // compression.type:压缩,默认none,可配置值gzip、snappy、lz4和zstd
            properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
    
            // 3. 创建kafka生产者对象
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
    
            // 4. 调用send方法,发送消息
            for (int i = 0; i < 10; i++) {
                kafkaProducer.send(new ProducerRecord<>("artisan", "art-msg-" + i));
            }
    
            // 5. 关闭资源
            kafkaProducer.close();
        }
    }
        
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    在这里插入图片描述


    生产经验_数据可靠性

    消息的发送流程

    回顾下消息的发送流程如下:

    在这里插入图

    ACK应答机制

    在这里插入图片描述

    背景Kafka提供的解决方案
    Leader收到数据,所有Follower开始同步数据,但有一个Follower因故障无法同步,导致Leader一直等待直到同步完成才发送ACK。- Leader维护了一个动态的In-Sync Replica Set (ISR)和Leader保持同步的Follower集合。
    - 当ISR中的Follower完成数据同步后,Leader向Producer发送ACK。
    - 如果某个Follower长时间(replica.lag.time.max.ms)未向Leader同步数据,则该Follower将被移出ISR。
    - 在Leader发生故障时,将从ISR中选举新的Leader。

    ack应答级别

    在这里插入图片描述

    对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等ISR中的follower全部接收成功。
    所以Kafka为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置

    acks描述
    0提供最低延迟,Leader副本接收消息后返回ack,尚未写入磁盘。可能导致数据丢失,特别是在Leader发生故障时。
    1Leader副本将消息写入磁盘后返回ack,但如果Leader在Follower副本同步数据之前发生故障,可能会丢失数据。
    -1或者 (all) ,Leader和所有Follower副本都将消息写入磁盘后才返回ack。如果在Follower副本同步完成后,Leader副本在发送ack之前发生故障,可能会导致数据重复。

    应答机制 小结

    在这里插入图片描述


    Code

    package com.artisan.pc;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    /**
     * @author 小工匠
     * @version 1.0
     * @mark: show me the code , change the world
     */
    public class CustomProducerAck {
    
        public static void main(String[] args) throws InterruptedException {
    
            // 1. 创建kafka生产者的配置对象
            Properties properties = new Properties();
    
            // 2. 给kafka配置对象添加配置信息:bootstrap.servers
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.171:9092");
    
            // key,value序列化(必须):key.serializer,value.serializer
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
            // 设置acks
            properties.put(ProducerConfig.ACKS_CONFIG, "all");
    
            // 重试次数retries,默认是int最大值,2147483647
            properties.put(ProducerConfig.RETRIES_CONFIG, 3);
    
            // 3. 创建kafka生产者对象
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
    
            // 4. 调用send方法,发送消息
            for (int i = 0; i < 10; i++) {
                kafkaProducer.send(new ProducerRecord<>("artisan", "art-msg-ack" + i));
            }
    
            // 5. 关闭资源
            kafkaProducer.close();
        }
    }
        
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    在这里插入图片描述

    生产经验_数据去重

    数据传递语义

    在这里插入图片描述


    幂等性

    幂等性原理

    在这里插入图片描述

    开启幂等性配置(默认开启)

    在prudocer的配置对象中,添加参数enable.idempotence,参数值默认为true,设置为false就关闭了。


    生产者事务

    kafka事务原理

    在这里插入图片描述

    事务代码流程

    // 1初始化事务
    void initTransactions();
    // 2开启事务
    void beginTransaction() throws ProducerFencedException;
    // 3在事务内提交已经消费的偏移量(主要用于消费者)
    void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                                  String consumerGroupId) throws ProducerFencedException;
    // 4提交事务
    void commitTransaction() throws ProducerFencedException;
    // 5放弃事务(类似于回滚事务的操作)
    void abortTransaction() throws ProducerFencedException;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    生产经验_数据有序

    在这里插入图片描述


    生产经验_数据乱序

    在这里插入图片描述


  • 相关阅读:
    上周热点回顾(10.10-10.16)
    Win32 COLORREF、RGB、获取颜色分量
    Avalonia环境搭建
    AndroidStudio集成GitHub操作入门
    【愚公系列】2022年07月 Go教学课程 021-Go容器之切片操作
    文本的设置
    优秀智慧园区案例 - 中建科技产业园(中建·光谷之星),万字长文解析先进智慧园区建设方案经验
    linux命令学习
    关于python中正则表达式的一些笔记
    Linux下NANDFLASH probe函数分析
  • 原文地址:https://blog.csdn.net/yangshangwei/article/details/134069418