• 【kafka】十四、kafka生产者API


    kafka Producer API

    1.消息发送流程

    kafka的producer发送消息采用的是异步发送的方式。在消息的发送过程中,涉及到了两个线程–main线程和sender线程,以及一个线程共享变量–RecordAccumulator。main线程将消息发送给RecordAccumulator,sender线程不断从RecordAccumulator中拉取消息发送到kafka broker。

    image-20220221220852427

    相关参数:

    batch.size:只有数据积累到batch.size之后,sender才会发送数据

    linger.ms:如果数据迟迟未达到batch.size,sender等待linger.ms之后就会发送数据

    2.异步发送API

    依赖

    <dependency>
        <groupId>org.apache.kafkagroupId>
        <artifactId>kafka-clientsartifactId>
        <version>0.11.0.0version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    服务类

    KafkaProducer:创建一个生产者对象,用来发送数据

    ProducerConfig:获取kafka需要的一系列配置参数

    ProducerRecord:每条数据都要封装成一个ProducerRecord对象

    2.1 不调用回到函数
    public class MyProducer {
    
        public static void main(String[] args) throws InterruptedException {
            //1.创建kafka生产者的配置信息
            Properties properties = new Properties();
            //2.指定连接的kafka集群
            properties.put("bootstrap.servers", "hll1:9092");
            //3.ack机制
            properties.put("acks", "all");
            //4.重试次数
            properties.put("retries", 3);
            //5.批次大小,16384=16k
            properties.put("batch.size", 16384);
            //6.等待时间,时间到了之后会发送数据
            properties.put("linger.ms", 1);
            //7.RecordAccumulator缓冲区大小,33554432=32M
            properties.put("buffer.memory", 33554432);
            //8.key value的序列化类
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            //9.创建生产者对象
            KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
            //10.发送数据
            //topic信息写在ProducerRecord
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<String, String>("bigdata", "hll---" + i));
            }
            //11.关闭连接,如果不关闭连接,消费者不会接收到消息
            producer.close();
            //或者可以暂停线程,达到"linger.ms"的配置要求也可以完成消费
            //Thread.sleep(1000);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    启动一个消费者后再运行代码,可以看到成功消费通过代码生成的消息

    image-20220226225832308

    2.2 调用回调函数
    public class CallbackProducer {
    
        public static void main(String[] args) {
            //创建kafka配置信息
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hll1:9092");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    
    
            //创建kafka生产者对象
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
    
            //发送数据
            for (int i = 0; i < 10; i++) {
                kafkaProducer.send(new ProducerRecord<>("bigdata", "hll::" + i),
                        (metadata, exception) -> {
                            if (exception == null) {
                                System.out.println(metadata.partition() + "::" + metadata.offset());
                            }
                        });
            }
    
            //关闭资源
            kafkaProducer.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    image-20220228232238995

    从打印的结果来看,消息被平均的分配到了两个分区(当前测试的主题只有两个分区)

    3.自定义分区
    public class MyPartitioner implements Partitioner {
    
        /**
         * 分区选取
         */
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            //默认分区获取方法
            //new DefaultPartitioner().partition()
    
            //可以实现自己分区策略,返回的需要是可用的分区的
            return 0;
        }
    
        @Override
        public void close() {
    
        }
    
        @Override
        public void configure(Map<String, ?> configs) {
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    public class PartitionerProducer {
    
        public static void main(String[] args) {
            //kafka配置文件
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hll1:9092");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    
            //自定义分区加载器
            properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.hll.partitioner.MyPartitioner");
    
            //创建kafka生产者
            KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
    
            //发送数据
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("bigdata", "hll::" + i),
                        (metadata, exception) -> {
                            //如果exception为null,说明消息发送成功没有异常
                            if (exception == null) {
                                System.out.println(metadata.partition() + "==" + metadata.offset());
                            } else {
                                exception.printStackTrace();
                            }
                        });
            }
    
            //关闭连接
            producer.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    image-20220228233550702

    4.同步发送API

    同步发送的意思是,一条消息发送之后,会阻塞当前线程,直到返回ack.

    由于send方法返回的是一个Future对象,根据Future对象的特点,我们也可以实现同步发送的效果,只需要调用Future对象的get()方法即可。

    //调用get,阻塞线程,同步发送
    producer.send(new ProducerRecord<String, String>("bigdata", "hll---" + i)).get();
    
    • 1
    • 2
  • 相关阅读:
    有 AI,无障碍,AIoT 设备为视障人群提供便利
    Jenkins发布失败记录
    经典算法之索引查找
    Python自动化测试——在线生成接口文档
    快递物流类API推荐
    【Redis】10.哨兵模式与分片集群
    CMake学习(二):使用CMake构建包含其他库的C++项目
    完整记录一下Web前端直传阿里OSS大文件+采用后端临时授权传stsToken的方式
    三、LED闪烁
    python自动化之Python webservice协议
  • 原文地址:https://blog.csdn.net/sinat_33151213/article/details/128064499