码农知识堂 - 1000bd
  •   Python
  •   PHP
  •   JS/TS
  •   JAVA
  •   C/C++
  •   C#
  •   GO
  •   Kotlin
  •   Swift
  • ClickHouse(21)ClickHouse集成Kafka表引擎详细解析


    目录
    • Kafka表集成引擎
      • 配置
        • Kerberos 支持
      • 虚拟列
    • 资料分享
    • 系列文章
      • clickhouse系列文章

    Kafka表集成引擎

    此引擎与Apache Kafka结合使用。

    Kafka 特性:

    • 发布或者订阅数据流。
    • 容错存储机制。
    • 处理流数据。

    老版Kafka集成表引擎参数格式:

    Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format
    [, kafka_row_delimiter, kafka_schema, kafka_num_consumers])

    新版Kafka集成表引擎参数格式:

    Kafka SETTINGS
    kafka_broker_list = 'localhost:9092',
    kafka_topic_list = 'topic1,topic2',
    kafka_group_name = 'group1',
    kafka_format = 'JSONEachRow',
    kafka_row_delimiter = '\n',
    kafka_schema = '',
    kafka_num_consumers = 2

    必要参数:

    • kafka_broker_list – 以逗号分隔的 brokers 列表 (localhost:9092)。
    • kafka_topic_list – topic 列表 (my_topic)。
    • kafka_group_name – Kafka 消费组名称 (group1)。如果不希望消息在集群中重复,请在每个分片中使用相同的组名。
    • kafka_format – 消息体格式。使用与 SQL 部分的 FORMAT 函数相同表示方法,例如 JSONEachRow。

    可选参数:

    • kafka_row_delimiter - 每个消息体(记录)之间的分隔符。
    • kafka_schema – 如果解析格式需要一个 schema 时,此参数必填。
    • kafka_num_consumers – 单个表的消费者数量。默认值是:1,如果一个消费者的吞吐量不足,则指定更多的消费者。消费者的总数不应该超过 topic 中分区的数量,因为每个分区只能分配一个消费者。

    ClickHouse可以接受和返回各种格式的数据。受支持的输入格式可用于提交给INSERT语句、从文件表(File,URL,HDFS或者外部目录)执行SELECT语句,受支持的输出格式可用于格式化SELECT语句的返回结果,或者通过INSERT写入到文件表。

    以下kafka_format是支持的格式,ClickHouse可以接受和返回各种格式的数据。受支持的输入格式可用于提交给INSERT语句、从文件表(File,URL,HDFS或者外部目录)执行SELECT语句,受支持的输出格式可用于格式化SELECT语句的返回结果,或者通过INSERT写入到文件表。

    格式 输入 输出
    [TabSeparated] ✔ ✔
    [TabSeparatedRaw] ✔ ✔
    [TabSeparatedWithNames] ✔ ✔
    [TabSeparatedWithNamesAndTypes] ✔ ✔
    [Template] ✔ ✔
    [TemplateIgnoreSpaces] ✔ ✗
    [CSV] ✔ ✔
    [CSVWithNames] ✔ ✔
    [CustomSeparated] ✔ ✔
    [Values] ✔ ✔
    [Vertical] ✗ ✔
    [JSON] ✗ ✔
    [JSONAsString] ✔ ✗
    [JSONStrings] ✗ ✔
    [JSONCompact] ✗ ✔
    [JSONCompactStrings] ✗ ✔
    [JSONEachRow] ✔ ✔
    [JSONEachRowWithProgress] ✗ ✔
    [JSONStringsEachRow] ✔ ✔
    [JSONStringsEachRowWithProgress] ✗ ✔
    [JSONCompactEachRow] ✔ ✔
    [JSONCompactEachRowWithNamesAndTypes] ✔ ✔
    [JSONCompactStringsEachRow] ✔ ✔
    [JSONCompactStringsEachRowWithNamesAndTypes] ✔ ✔
    [TSKV] ✔ ✔
    [Pretty] ✗ ✔
    [PrettyCompact] ✗ ✔
    [PrettyCompactMonoBlock] ✗ ✔
    [PrettyNoEscapes] ✗ ✔
    [PrettySpace] ✗ ✔
    [Protobuf] ✔ ✔
    [ProtobufSingle] ✔ ✔
    [Avro] ✔ ✔
    [AvroConfluent] ✔ ✗
    [Parquet] ✔ ✔
    [Arrow] ✔ ✔
    [ArrowStream] ✔ ✔
    [ORC] ✔ ✔
    [RowBinary] ✔ ✔
    [RowBinaryWithNamesAndTypes] ✔ ✔
    [Native] ✔ ✔
    [Null] ✗ ✔
    [XML] ✗ ✔
    [CapnProto] ✔ ✗
    [LineAsString] ✔ ✗
    [Regexp] ✔ ✗
    [RawBLOB] ✔ ✔

    示例:

    CREATE TABLE queue (
    timestamp UInt64,
    level String,
    message String
    ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
    SELECT * FROM queue LIMIT 5;
    CREATE TABLE queue2 (
    timestamp UInt64,
    level String,
    message String
    ) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
    kafka_topic_list = 'topic',
    kafka_group_name = 'group1',
    kafka_format = 'JSONEachRow',
    kafka_num_consumers = 4;
    CREATE TABLE queue2 (
    timestamp UInt64,
    level String,
    message String
    ) ENGINE = Kafka('localhost:9092', 'topic', 'group1')
    SETTINGS kafka_format = 'JSONEachRow',
    kafka_num_consumers = 4;

    消费的消息会被自动追踪,因此每个消息在不同的消费组里只会记录一次。如果希望获得两次数据,则使用另一个组名创建副本。

    消费组可以灵活配置并且在集群之间同步。例如,如果群集中有10个主题和5个表副本,则每个副本将获得2个主题。 如果副本数量发生变化,主题将自动在副本中重新分配。

    SELECT 查询对于读取消息并不是很有用(调试除外),因为每条消息只能被读取一次。使用物化视图创建实时线程更实用。您可以这样做:

    1. 使用引擎创建一个 Kafka 消费者并作为一条数据流。
    2. 创建一个结构表。
    3. 创建物化视图,改视图会在后台转换引擎中的数据并将其放入之前创建的表中。

    当 MATERIALIZED VIEW 添加至引擎,它将会在后台收集数据。可以持续不断地从 Kafka 收集数据并通过 SELECT 将数据转换为所需要的格式。

    示例:

    CREATE TABLE queue (
    timestamp UInt64,
    level String,
    message String
    ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
    CREATE TABLE daily (
    day Date,
    level String,
    total UInt64
    ) ENGINE = SummingMergeTree(day, (day, level), 8192);
    CREATE MATERIALIZED VIEW consumer TO daily
    AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
    FROM queue GROUP BY day, level;
    SELECT level, sum(total) FROM daily GROUP BY level;

    为了提高性能,接受的消息被分组为max_insert_block_size大小的块。如果未在stream_flush_interval_ms毫秒内形成块,则不关心块的完整性,都会将数据刷新到表中。

    停止接收主题数据或更改转换逻辑,请 detach 物化视图:

    DETACH TABLE consumer;
    ATTACH TABLE consumer;

    如果使用 ALTER 更改目标表,为了避免目标表与视图中的数据之间存在差异,推荐停止物化视图。

    配置

    与 GraphiteMergeTree 类似,Kafka 引擎支持使用ClickHouse配置文件进行扩展配置。可以使用两个配置键:全局 (kafka) 和 主题级别 (kafka_*)。首先应用全局配置,然后应用主题级配置(如果存在)。

    <kafka>
    <debug>cgrpdebug>
    <auto_offset_reset>smallestauto_offset_reset>
    kafka>
    <kafka_logs>
    <retry_backoff_ms>250retry_backoff_ms>
    <fetch_min_bytes>100000fetch_min_bytes>
    kafka_logs>

    在ClickHouse配置中使用下划线 (_) ,并不是使用点 (.)。例如,check.crcs=true 将是 true。

    Kerberos 支持

    对于使用了kerberos的kafka, 将security_protocol 设置为sasl_plaintext就够了,如果kerberos的ticket是由操作系统获取和缓存的。
    clickhouse也支持自己使用keyfile的方式来维护kerbros的凭证。配置sasl_kerberos_service_name、sasl_kerberos_keytab、sasl_kerberos_principal三个子元素就可以。

    示例:

    <kafka>
    <security_protocol>SASL_PLAINTEXTsecurity_protocol>
    <sasl_kerberos_keytab>/home/kafkauser/kafkauser.keytabsasl_kerberos_keytab>
    <sasl_kerberos_principal>kafkauser/kafkahost@EXAMPLE.COMsasl_kerberos_principal>
    kafka>

    虚拟列

    • _topic – Kafka 主题。
    • _key – 信息的键。
    • _offset – 消息的偏移量。
    • _timestamp – 消息的时间戳。
    • _timestamp_ms – 消息的时间戳(毫秒)。
    • _partition – Kafka 主题的分区。

    资料分享

    ClickHouse经典中文文档分享

    系列文章

    clickhouse系列文章

    • ClickHouse(01)什么是ClickHouse,ClickHouse适用于什么场景
    • ClickHouse(02)ClickHouse架构设计介绍概述与ClickHouse数据分片设计
    • ClickHouse(03)ClickHouse怎么安装和部署
    • ClickHouse(04)如何搭建ClickHouse集群
    • ClickHouse(05)ClickHouse数据类型详解
    • ClickHouse(06)ClickHouse建表语句DDL详细解析
    • ClickHouse(07)ClickHouse数据库引擎解析
    • ClickHouse(08)ClickHouse表引擎概况
    • ClickHouse(09)ClickHouse合并树MergeTree家族表引擎之MergeTree详细解析
    • ClickHouse(10)ClickHouse合并树MergeTree家族表引擎之ReplacingMergeTree详细解析
    • ClickHouse(11)ClickHouse合并树MergeTree家族表引擎之SummingMergeTree详细解析
    • ClickHouse(12)ClickHouse合并树MergeTree家族表引擎之AggregatingMergeTree详细解析
    • ClickHouse(13)ClickHouse合并树MergeTree家族表引擎之CollapsingMergeTree详细解析
    • ClickHouse(14)ClickHouse合并树MergeTree家族表引擎之VersionedCollapsingMergeTree详细解析
    • ClickHouse(15)ClickHouse合并树MergeTree家族表引擎之GraphiteMergeTree详细解析
    • ClickHouse(16)ClickHouse日志表引擎Log详细解析
    • ClickHouse(17)ClickHouse集成JDBC表引擎详细解析
    • ClickHouse(18)ClickHouse集成ODBC表引擎详细解析
    • ClickHouse(19)ClickHouse集成Hive表引擎详细解析
    • ClickHouse(20)ClickHouse集成PostgreSQL表引擎详细解析
    • ClickHouse(21)ClickHouse集成Kafka表引擎详细解析
    • ClickHouse(22)ClickHouse集成HDFS表引擎详细解析
    • ClickHouse(23)ClickHouse集成Mysql表引擎详细解析
  • 相关阅读:
    java 枚举
    git在线学习网站
    【MATLAB编程】递归调用证明函数的极限
    利用python中if函数判断三角形的形状
    紫光同创FPGA实现UDP协议栈精简版,基于YT8511和RTL8211,提供2套PDS工程源码和技术支持
    python数据分析-面试题
    Transit path
    【PD】—review
    【FastChat】用于训练、服务和评估大型语言模型的开放平台
    仿钉钉考勤统计页面的日历组件,通过日历展示每日考勤打卡情况,支持在日历上打两种不同类型的点,大致适配各种分辨率效果图
  • 原文地址:https://www.cnblogs.com/the-pig-of-zf/p/17961601
  • 最新文章
  • 攻防演习之三天拿下官网站群
    数据安全治理学习——前期安全规划和安全管理体系建设
    企业安全 | 企业内一次钓鱼演练准备过程
    内网渗透测试 | Kerberos协议及其部分攻击手法
    0day的产生 | 不懂代码的"代码审计"
    安装scrcpy-client模块av模块异常,环境问题解决方案
    leetcode hot100【LeetCode 279. 完全平方数】java实现
    OpenWrt下安装Mosquitto
    AnatoMask论文汇总
    【AI日记】24.11.01 LangChain、openai api和github copilot
  • 热门文章
  • 十款代码表白小特效 一个比一个浪漫 赶紧收藏起来吧!!!
    奉劝各位学弟学妹们,该打造你的技术影响力了!
    五年了,我在 CSDN 的两个一百万。
    Java俄罗斯方块,老程序员花了一个周末,连接中学年代!
    面试官都震惊,你这网络基础可以啊!
    你真的会用百度吗?我不信 — 那些不为人知的搜索引擎语法
    心情不好的时候,用 Python 画棵樱花树送给自己吧
    通宵一晚做出来的一款类似CS的第一人称射击游戏Demo!原来做游戏也不是很难,连憨憨学妹都学会了!
    13 万字 C 语言从入门到精通保姆级教程2021 年版
    10行代码集2000张美女图,Python爬虫120例,再上征途
Copyright © 2022 侵权请联系2656653265@qq.com    京ICP备2022015340号-1
正则表达式工具 cron表达式工具 密码生成工具

京公网安备 11010502049817号