• kafka消息的生产和消费(java)


    新建maven项目,引入依赖:

    <dependency>
      <groupId>org.apache.kafkagroupId>
      <artifactId>kafka-clientsartifactId>
      <version>3.2.1version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    java kafka生产者

    public class JavaProducer {
        
        // kafka地址
        public static final String bootstrapServer = "localhost:9092";
        // topic主题
        public static final String topic = "test";
        
        public static void main(String[] args) {
            
            Properties properties = new Properties();
            // 指定key和消息体value的编码方式
            properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
            properties.put("bootstrap.servers",bootstrapServer);
            
            // 创建并配置生产者
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
            // 创建消息,并指定分区
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic,"test message 033");
            // 发送消息
            kafkaProducer.send(producerRecord);
            // 关闭生产者客户端
            kafkaProducer.close();
            
            
        }
        
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    一个消息的生产主要包括四个步骤:

    1.配置和创建生产者实例

    2.配置和创建消息

    3.发送消息

    4.关闭生产者客户端实例

    Kafka生产者配置项

    kafka生产者有三个必填配置

    • bootstrap.servers:指定broker地址清单
    • key.serializer:key的序列化方式,消费者对应的需要配置反序列化方式
    • value.serializer:value的序列化方式,消费者对应的需要配置反序列化方式

    还有一些非必填配置,参照org.apache.kafka.clients.producer.ProducerConfig类。

    消息的创建

    消息主要包括以下属性,其中topic和value是必填项,其余是选填项

    public class ProducerRecord<K, V> {
    
        private final String topic;
        private final Integer partition;
        private final Headers headers;
        private final K key;
        private final V value;
        private final Long timestamp;
        ………
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    对应的,ProducerRecord也提供了多个构造方法

    img

    消息的发送

    生产者实例和消息实例都构建完成之后,就可以发送了

    发送消息主要由三种模式:发后即忘(fire-and-forget)、同步(sync)、异步(async)

    上面kafkaProducer.send(producerRecord);就是发后即忘,他只管发送消息,至于有没有发送成功不关心,这就有消息丢失的可能。

    事实上send方法并非是void类型的,而是Future类型,并且提供了两个重载方法

    img

    所以同步发送模式就可以利用返回的Future对象实现:

    Future<RecordMetadata> send = kafkaProducer.send(producerRecord);
    RecordMetadata recordMetadata = send.get();
    
    • 1
    • 2

    异步发送方式则可以利用send的重载方法,指定一个callback回调函数

    kafkaProducer.send(producerRecord, (metadata, exception) -> {
        if (exception != null) {
            // 异常处理
        } else {
            System.out.println(metadata);
        }
    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    RecordMetadata

    RecordMetadata对象包含了消息的一些元数据信息:

    public final class RecordMetadata {
    
        /**
         * Partition value for record without partition assigned
         */
        public static final int UNKNOWN_PARTITION = -1;
    
        private final long offset;
        // The timestamp of the message.
        // If LogAppendTime is used for the topic, the timestamp will be the timestamp returned by the broker.
        // If CreateTime is used for the topic, the timestamp is the timestamp in the corresponding ProducerRecord if the
        // user provided one. Otherwise, it will be the producer local time when the producer record was handed to the
        // producer.
        private final long timestamp;
        private final int serializedKeySize;
        private final int serializedValueSize;
        private final TopicPartition topicPartition;
        
        //…………
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    关闭生产者客户端示例

    通常情况下,一个KafkaProducer不会只发送单条消息。在发送完所有的消息后,调用KafkaProducer#close()方法关闭KafkaProducer实例来回收资源。close()方法会阻塞等待所有发送请求完成后再关闭KafkaProducer

    KafkaProducer还提供了一个带超时时间的重载方法,如果使用这个重载方法,则只会等待指定的超时时间,如果超过了这个时间,即使还有消息未发送完成,也会强行退出。我们一般使用无参的close()方法。

    java kafka消费者

    public class JavaConsumer {
    
        public static final String bootstrapServer = "localhost:9092";
        public static final String topic = "test";
        public static final String group_id = "test-group2";
    
        public static final AtomicBoolean isRunning = new AtomicBoolean(true);
    
        public static void main(String[] args) {
    
            Properties properties = new Properties();
            properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("bootstrap.servers",bootstrapServer);
            properties.put("group.id",group_id);
            // 创建消费者客户端
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
            // 订阅主题
            kafkaConsumer.subscribe(Collections.singletonList(topic));
            // 循环消费消息
            try{
                while (isRunning.get()){
                    ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
                    for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                        // 处理consumerRecord
                    }
                }
            }catch (Exception e){
                // 处理异常
            }finally {
                kafkaConsumer.close();
            }
            
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    消费者的消费逻辑主要包括以下几个步骤:

    1.配置消费者客户端参数并创建消费者实例

    2.订阅主题

    3.拉取消息并消费

    4.提交消费位移

    5.关闭消费者实例

    配置消费者客户端

    必填参数:

    • bootstrap.servers:指定broker地址清单
    • key.deserialize:key反序列化方式,与生产者序列化方式对应
    • value.deserializer:value反序列化方式,与生产者序列化方式对应
    • group.id:消费者组名

    更多配置参org.apache.kafka.clients.consumer.ConsumerConfig

    订阅主题与分区

    调用KafkaConsumer#subscribe()方法来订阅主题。

    一个消费者可以订阅一个或多个主题。

    KafkaConsumer#subscribe()有四个重载方法,可以以集合的方式或者正则表达式的方式来订阅主题。

    img

    kafkaConsumer.subscribe(Arrays.asList("topic1"));
    kafkaConsumer.subscribe(Pattern.compile("topic*"));
    
    • 1
    • 2

    消费者还可以调用KafkaConsumer#assign(Collection partitions)方法来直接订阅某些主题的特定分区

    取消订阅则使用KafkaConsumer#unsubscribe()方法

    消息的消费

    kafka消费消息是一个不断轮询的过程,在上面的代码中可以看出,消费者消费消息就是重复的调用poll()方法,poll()方法返回的则是所订阅的主题(分区)上的一组消息。

        @Override
        public ConsumerRecords<K, V> poll(final Duration timeout) {
            return poll(time.timer(timeout), true);
        }
    poll()`方法接收一个超时时间参数`timeout`,在消费者的缓冲区里没有可用数据时会发生阻塞,阻塞时间为`timeout
    
    • 1
    • 2
    • 3
    • 4
    • 5
    ConsumerRecord

    消费者消费到的消息类型为ConsumerRecord,相对于ProducerRecord,ConsumerRecord的内容更加丰富一些

    public class ConsumerRecord<K, V> {
        public static final long NO_TIMESTAMP = RecordBatch.NO_TIMESTAMP;
        public static final int NULL_SIZE = -1;
    
        /**
         * @deprecated checksums are no longer exposed by this class, this constant will be removed in Apache Kafka 4.0
         *             (deprecated since 3.0).
         */
        @Deprecated
        public static final int NULL_CHECKSUM = -1;
    
        private final String topic;
        private final int partition;
        private final long offset;
        private final long timestamp;
        private final TimestampType timestampType;
        private final int serializedKeySize;
        private final int serializedValueSize;
        private final Headers headers;
        private final K key;
        private final V value;
        private final Optional<Integer> leaderEpoch;
        
        //………………
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    位移提交

    控制或关闭消费

    KafkaConsumer#pause():暂停某些分区在拉取操作时返回数据给客户端

    KafkaConsumer#resume():恢复某些分区向客户端返回数据

    KafkaConsumer#paused():返回被暂停的分区集合

    KafkaConsumer#close():关闭消费者

    img

  • 相关阅读:
    免费注册US.KG域名支持接入CF
    Java深入讲解static操作符
    文件夹改名:批量随机重命名文件夹,让整理更轻松
    [项目管理-13]:项目经理的困惑:为什么有些项目亏钱,项目还要做?
    网络空间测绘——MQTT服务篇
    8 ICMP与ping
    常见的作物模型有哪些?DSSAT模型、APSIM模型、WOFOST模型与PCSE模型等应用
    图像分割项目中损失函数的选择
    (附源码)php二手服装网站 毕业设计 201711
    安装内核切换内核
  • 原文地址:https://blog.csdn.net/qq_43460095/article/details/126231516