• Kafka使用Java管理主题和分区



    配置信息

    配置信息,更多详情配置以及说明查看AdminClientConfig

    public static Properties getKafkaConfig(){
            Properties properties = new Properties();
            //Kafka服务器地址
            properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);
            //当前客户端ID
            properties.put(AdminClientConfig.CLIENT_ID_CONFIG,"admin topic");
            return properties;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    创建主题

    创建主题示例代码
    需要指定主题名称、分区数量、副本数量

     //创建主题
    public static void createTopic(){
        AdminClient client = KafkaAdminClient.create(getKafkaConfig());
        //主题名称
        String  topName = "test-topic";
        //分区数量
        int numPartitions = 1;
        //副本数量
        short replicationFactor = 1;
        NewTopic topic = new NewTopic(topName,numPartitions,replicationFactor);
        //创建主题
        CreateTopicsResult clientTopics = client.createTopics(Collections.singleton(topic));
        Iterator<Map.Entry<String, KafkaFuture<Void>>> createIterator = clientTopics.values().entrySet().iterator();
        while (createIterator.hasNext()){
            Map.Entry<String,KafkaFuture<Void>>  entry =createIterator.next();
            log.info(entry.getKey());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    删除主题

    代码如下(示例):

    public static void deleteTopic(){
        AdminClient client = KafkaAdminClient.create(getKafkaConfig());
        //主题名称
        String  topName = "test-topic";
        //删除主题
        DeleteTopicsResult deleteTopicsResult = client.deleteTopics(Collections.singleton(topName));
    
        Iterator<Map.Entry<String, KafkaFuture<Void>>> deleteIterator = deleteTopicsResult.values().entrySet().iterator();
        while (deleteIterator.hasNext()){
            Map.Entry<String,KafkaFuture<Void>>  entry =deleteIterator.next();
            log.info(entry.getKey());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    查看所有主题名称

    查询所有主题名称

     public static void listTopics() throws Exception{
        AdminClient client = KafkaAdminClient.create(getKafkaConfig());
        //查看所有主题
        ListTopicsResult listTopicsResult = client.listTopics();
        KafkaFuture<Set<String>> listFuture = listTopicsResult.names();
        Iterator<String> iterator = listFuture.get().iterator();
        while (iterator.hasNext()){
            log.info("主题名称:{}",iterator.next());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    查看某个主题详细信息

    通过主题名称,去查看某个主题的详细信息,包含分区,副本,存活副本等信息
    主题名称可以通过查看所有主题名称获得,或是自己指定主题名称

    public static void describeTopics() throws Exception{
        AdminClient client = KafkaAdminClient.create(getKafkaConfig());
        //主题名称
        String  topName = "test-topic";
        //查看主题详细信息
        DescribeTopicsResult describeTopicsResult = client.describeTopics(Collections.singleton(topName));
    
        Iterator<Map.Entry<String,KafkaFuture<TopicDescription>>> describeIterator = describeTopicsResult.values().entrySet().iterator();
    
        while (describeIterator.hasNext()){
            Map.Entry<String,KafkaFuture<TopicDescription>> entry = describeIterator.next();
            log.info(entry.getKey());
            TopicDescription description = entry.getValue().get();
            log.info("主题名称: {}",description.name());
            List<TopicPartitionInfo> partitionInfos = description.partitions();
            for(TopicPartitionInfo info : partitionInfos){
                log.info("分区位置:{}",info.partition());
                log.info("分区主节点信息:{}",info.leader());
                log.info("分区副本节点信息:{}",info.replicas());
                log.info("分区存活节点信息:{}",info.isr());
             }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    删除消息记录

    删除指定分区,偏移量之前的数据,并返回低水平位值
    低水平位:能消费的最大offset值

    public static void deleteRecords() throws Exception{    
        AdminClient client = KafkaAdminClient.create(getKafkaConfig());
     
        Map<TopicPartition, RecordsToDelete> deleteMap = new HashMap<>();
        //指定分区
        int partition = 2;
        //主题
        String  topName = "test-topic";
        TopicPartition topicPartition = new TopicPartition(topName,partition);
        //消息偏移量
        int offset = 5;
        //删除偏移量之前的数据
        RecordsToDelete recordsToDelete = RecordsToDelete.beforeOffset(offset);
    
        deleteMap.put(topicPartition,recordsToDelete);
        DeleteRecordsResult result =  client.deleteRecords(deleteMap);
        //低水平位,消息的offset
        Iterator<Map.Entry<TopicPartition,KafkaFuture<DeletedRecords>>> iterator = result.lowWatermarks().entrySet().iterator();
            
        while (iterator.hasNext()){
            DeletedRecords records = iterator.next().getValue().get();
            log.info("低水平位:lowWatermark = {}"+records.lowWatermark());
        }
            
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
  • 相关阅读:
    【算法专题】双指针
    2022-08-05 粗糙集Rough set
    Java毕业设计 基于springboot vue大学新生报到系统
    k线图快速入门必读
    adb shell dumpsys 使用命令和来源
    Java 设计模式实战系列—策略模式
    【python】懒人福利,通过Python的JIRA库操作JIRA,自动批量提交关闭bug,提高效率
    Wireshark把DDoS照原形
    Taurus.MVC-Java 版本打包上传到Maven中央仓库(详细过程):5、Maven版本发布与后续版本更新(大结局)
    nodejs采集淘宝、天猫网商品详情数据以及解决_m_h5_tk令牌及sign签名验证(2023-09-09)
  • 原文地址:https://blog.csdn.net/swg321321/article/details/126473931