• 如何删除kafka主题数据


    本文我们探讨几种关于如何删除kafka主题数据的策略。

    场景分析

    在进入主题之前,先讨论下需要删除kafka主题数据的应用场景。

    场景介绍

    kafka消息在过了保留周期之后会被自动清除。但总有一些情况,需要立刻删除消息。

    假设这样场景:已经开始给kafka主题生产消息的应用发现了缺陷,接着bug修复程序需要更新,这是kafka主题中已经了一些错误的消息。这样场景通常在开发环境,我们需要的就是快速批量删除这些消息。

    模拟环境

    为了模拟环境,首先在kafka目录中创建 purge-scenario主题:

    $ bin/kafka-topics.sh \
      --create --topic purge-scenario --if-not-exists \
      --partitions 2 --replication-factor 1 \
      --zookeeper localhost:2181
    

    接着使用shuf命令生成随机数,然后通过kafka-console-producer.sh发送kafka主题:

    $ /usr/bin/shuf -i 1-100000 -n 50000000 \
      | tee -a /tmp/kafka-random-data \
      | bin/kafka-console-producer.sh \
      --bootstrap-server=0.0.0.0:9092 \
      --topic purge-scenario
    

    shuf -i 1-100000 -n 50000000 :表示生成n个1-100000范围内随机数。

    tee -a 前面命令结果写入文件 -a 表示追加;这是使用tee保存模拟数据是为了以后使用;

    最后验证消费主题消息:

    $ bin/kafka-console-consumer.sh \
      --bootstrap-server=0.0.0.0:9092 \
      --from-beginning --topic purge-scenario \
      --max-messages 3
    76696
    49425
    1744
    Processed a total of 3 messages
    

    消息过期

    在purge-scenario主题中的消息有缺省7天保留时间。为了删除消息,我们可以临时设置主题的 retention.ms 属性为10秒,然后等待其自动过期:

    $ bin/kafka-configs.sh --alter \
      --add-config retention.ms=10000 \
      --bootstrap-server=0.0.0.0:9092 \
      --topic purge-scenario \
      && sleep 10
    

    现在验证消费是否过期:

    $ bin/kafka-console-consumer.sh  \
      --bootstrap-server=0.0.0.0:9092 \
      --from-beginning --topic purge-scenario \
      --max-messages 1 --timeout-ms 1000
    [2021-02-28 11:20:15,951] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
    org.apache.kafka.common.errors.TimeoutException
    Processed a total of 0 messages
    

    最后,我们要恢复主题的保留周期:

    $ bin/kafka-configs.sh --alter \
      --add-config retention.ms=604800000 \
      --bootstrap-server=0.0.0.0:9092 \
      --topic purge-scenario
    

    通过这个方法,kafka会删除主题所有分区的数据。

    选择性删除消息

    有时可能需要有选择性删除一个或多个主题的分区数据,可以使用kafka-delete-records.sh脚本实现。首先需要在delete-config.json 配置文件中指定分区级偏移量,我们打算分区的删除所有数据,partition指定分区,offset=-1:

    {
      "partitions": [
        {
          "topic": "purge-scenario",
          "partition": 1,
          "offset": -1
        }
      ],
      "version": 1
    }
    

    接着处理删除记录:

    $ bin/kafka-delete-records.sh \
      --bootstrap-server localhost:9092 \
      --offset-json-file delete-config.json
    

    现在验证从分区0获取数据:

    $ bin/kafka-console-consumer.sh \
      --bootstrap-server=0.0.0.0:9092 \
      --from-beginning --topic purge-scenario --partition=0 \
      --max-messages 1 --timeout-ms 1000
      44017
      Processed a total of 1 messages
    

    接着从分区1获取数据:

    $ bin/kafka-console-consumer.sh \
      --bootstrap-server=0.0.0.0:9092 \
      --from-beginning --topic purge-scenario \
      --partition=1 \
      --max-messages 1 --timeout-ms 1000
    [2021-02-28 11:48:03,548] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
    org.apache.kafka.common.errors.TimeoutException
    Processed a total of 0 messages
    

    删除重新创建主题

    另外方法是通过删除主题删除其所有数据,然后重新创建主题。当然只有服务端设置delete.topic.enable属性为true才可能删除主题:

    $ bin/kafka-server-start.sh config/server.properties \
      --override delete.topic.enable=true
    

    可以通过kafka-topics.sh命令删除主题:

    $ bin/kafka-topics.sh \
      --delete --topic purge-scenario \
      --zookeeper localhost:2181
    Topic purge-scenario is marked for deletion.
    Note: This will have no impact if delete.topic.enable is not set to true.
    

    现在验证主题:

    $ bin/kafka-topics.sh --zookeeper localhost:2181 --list
    

    如果确认主题已不存在,然后再重新创建主题。

    总结

    本文介绍了几种方式删除kafka主题数据。包括设置主题过期时间、删除主题所有数据及部分分区数据,到通过删除主题变相删除数据。

  • 相关阅读:
    GBase 8c V3.0.0数据类型——数字操作函数
    JAVA xml格式转为java对象
    力扣373.查找和最小的K对数字
    高通个别驱动创建Buffer耗时高问题的解决
    大数据课程L8——网站流量项目的SparkStreaming整合代码
    三、安全工程练习题(CISSP)
    第九十三周周报
    MySQL数据库——17.正则表达式
    【RPC】RPC的序列化方式
    【学习笔记17】JavaScript作用域
  • 原文地址:https://blog.csdn.net/neweastsun/article/details/127117941