• Kafka消费者api编写教程


    1.基本属性配置

    输入new Properties().var 回车

    1. //创建属性
    2.         Properties properties = new Properties();
    3.        //连接集群
    4.         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");
    5.         //反序列化
    6.         properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    7.         properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
    8.         //指定消费者组id
    9.         properties.put(ConsumerConfig.GROUP_ID_CONFIG,"KK");

    2.创建消费者

    输入new KafkaConsumer(properties).var 回车选择消费者名称

    1. //创建消费者
    2. KafkaConsumer kafkaConsumer = new KafkaConsumer<>(properties);

    3.订阅主题/分区

    3.1订阅主题

       输入new ArrayList().var 回车修改变量名为topics

    1. //创建一个数组列表变量接收topics值
    2. ArrayList topics = new ArrayList<>();
    3. //指定要订阅的主题
    4. topics.add("customers");
    5. //订阅主题
    6. kafkaConsumer.subscribe(topics);

    3.2订阅分区

        输入new ArrayList<TopicPartition>().var 回车选择变量名为topicsPartitions

    4.消费数据

    1. //消费数据
    2. while (true){
    3. //if (flag == true) flag 标志位置
    4. //break;
    5. //}生产中退出循环的位置;
    6. ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
    7. //将消费的信息输出到控制台,输入consumerRecords.for回车,进行对consumerRecords循环遍历
    8. for (ConsumerRecord consumerRecord : consumerRecords){
    9. System.out.println(consumerRecord);
    10. }
    11. }

    5.运行MyConsumer,通过生产者api发送消息

    输出台上可以看到输出的都是订阅的主题/分区的信息

    6.完整代码

    1. package com.ljr.kafka.replay;
    2. import org.apache.kafka.clients.consumer.ConsumerConfig;
    3. import org.apache.kafka.clients.consumer.ConsumerRecord;
    4. import org.apache.kafka.clients.consumer.ConsumerRecords;
    5. import org.apache.kafka.clients.consumer.KafkaConsumer;
    6. import org.apache.kafka.common.TopicPartition;
    7. import org.apache.kafka.common.serialization.StringDeserializer;
    8. import java.time.Duration;
    9. import java.util.ArrayList;
    10. import java.util.Properties;
    11. public class MyConsumer {
    12. public static void main(String[] args) {
    13. //创建属性
    14. Properties properties = new Properties();
    15. //连接集群
    16. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");
    17. //反序列化
    18. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    19. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
    20. //指定消费者组id
    21. properties.put(ConsumerConfig.GROUP_ID_CONFIG,"KK");
    22. //创建消费者
    23. KafkaConsumer kafkaConsumer = new KafkaConsumer<>(properties);
    24. /*//订阅主题
    25. //创建一个数组列表变量接收topics值
    26. ArrayList topics = new ArrayList<>();
    27. //指定要订阅的主题
    28. topics.add("customers");
    29. //订阅主题
    30. kafkaConsumer.subscribe(topics);*/
    31. //订阅分区
    32. //创建一个数组列表变量接收主题分区值
    33. ArrayList topicPartitions = new ArrayList<>();
    34. //指定要订阅的分区
    35. topicPartitions.add(new TopicPartition("customers",2));
    36. //订阅分区
    37. kafkaConsumer.assign(topicPartitions);
    38. //消费数据
    39. while (true){
    40. //if (flag == true) flag 标志位置
    41. //break;
    42. //}生产中退出循环的位置;
    43. ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
    44. //将消费的信息输出到控制台,输入consumerRecords.for 回车 对consumerRecords循环遍历
    45. for (ConsumerRecord consumerRecord : consumerRecords){
    46. System.out.println(consumerRecord);
    47. }
    48. }
    49. }
    50. }

  • 相关阅读:
    Python毕业设计选题推荐
    【开源】加油站管理系统 JAVA+Vue.js+SpringBoot+MySQL
    pytorch 入门 (三)案例一:mnist手写数字识别
    响应式布局
    Node.js 中的进程和线程
    Flink的API分层、架构与组件原理、并行度、任务执行计划
    etcd集群部署实战
    Unity中AssetBundle的变体(Varient)使用教程
    Mysql时间操作
    iptables防火墙
  • 原文地址:https://blog.csdn.net/v15220/article/details/139452462