• Hudi(三)集成Flink


     1、环境准备

            将编译好的jar包放到Flink的lib目录下。

    cp hudi-flink1.13-bundle-0.12.0.jar /opt/module/flink-1.13.2/lib

    2、sql-client方式

    2.1、修改flink-conf.yaml配置

    1. vim /opt/module/flink-1.13.2/conf/flink-conf.yaml
    2. state.backend: rocksdb
    3. execution.checkpointing.interval: 30000
    4. state.checkpoints.dir: hdfs://hadoop1:9000/ckps
    5. state.backend.incremental: true

    2.2、yarn-session模式启动

    1、启动

    1. 1、先启动hadoop集群,然后通过yarn-session启动flink:
    2. /opt/module/flink-1.13.2/bin/yarn-session.sh -d
    3. 2、再启动sql-client
    4. /opt/module/flink-1.13.2/bin/sql-client.sh embedded -s yarn-session

    2、写入数据

    1. 表格模式(table mode)在内存中实体化结果,并将结果用规则的分页表格可视化展示出来。执行如下命令启用:
    2. SET 'sql-client.execution.result-mode' = 'table'; --默认
    3. 变更日志模式(changelog mode)不会实体化和可视化结果,而是由插入(+)和撤销(-)组成的持续查询产生结果流。
    4. SET 'sql-client.execution.result-mode' = 'changelog';
    5. Tableau模式(tableau mode)更接近传统的数据库,会将执行的结果以制表的形式直接打在屏幕之上。具体显示的内容会取决于作业执行模式的不同(execution.type):
    6. SET 'sql-client.execution.result-mode' = 'tableau';
    7. -- 创建hudi表
    8. CREATE TABLE t1(
    9. uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
    10. name VARCHAR(10),
    11. age INT,
    12. ts TIMESTAMP(3),
    13. `partition` VARCHAR(20)
    14. )
    15. PARTITIONED BY (`partition`)
    16. WITH (
    17. 'connector' = 'hudi',
    18. 'path' = 'hdfs://hadoop1:9000/tmp/hudi_flink/t1', --hudi表的基本路径
    19. 'table.type' = 'MERGE_ON_READ' --默认是COW
    20. );
    21. -- 插入数据
    22. INSERT INTO t1 VALUES
    23. ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
    24. ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
    25. ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
    26. ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
    27. ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
    28. ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
    29. ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
    30. ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

    3、IDEA编码方式

    3.1、环境准备

    1、手动install依赖

    mvn install:install-file -DgroupId=org.apache.hudi -DartifactId=hudi-flink_2.11 -Dversion=0.12.0 -Dpackaging=jar -Dfile=./hudi-flink1.13-bundle-0.12.0.jar

    2、编写代码

    1. import org.apache.flink.contrib.streaming.state.{EmbeddedRocksDBStateBackend, PredefinedOptions}
    2. import org.apache.flink.streaming.api.CheckpointingMode
    3. import org.apache.flink.streaming.api.environment.{CheckpointConfig, StreamExecutionEnvironment}
    4. import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
    5. import java.util.concurrent.TimeUnit
    6. object HudiExample {
    7. def main(args: Array[String]): Unit = {
    8. // val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration())
    9. val env = StreamExecutionEnvironment.getExecutionEnvironment
    10. // 设置状态后端RocksDB
    11. val embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend(true)
    12. // embeddedRocksDBStateBackend.setDbStoragePath("file:///E:/rocksdb")
    13. embeddedRocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM)
    14. env.setStateBackend(embeddedRocksDBStateBackend)
    15. // checkpoint配置
    16. env.enableCheckpointing(TimeUnit.SECONDS.toMillis(10), CheckpointingMode.EXACTLY_ONCE)
    17. val checkpointConfig = env.getCheckpointConfig
    18. checkpointConfig.setCheckpointStorage("hdfs://hadoop1:9000/ckps")
    19. checkpointConfig.setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(10))
    20. checkpointConfig.setTolerableCheckpointFailureNumber(5)
    21. checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(1))
    22. checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    23. val sTableEnv = StreamTableEnvironment.create(env)
    24. sTableEnv.executeSql("CREATE TABLE sourceT (\n" +
    25. " uuid varchar(20),\n" +
    26. " name varchar(10),\n" +
    27. " age int,\n" + " ts timestamp(3),\n" +
    28. " `partition` varchar(20)\n" +
    29. ") WITH (\n" +
    30. " 'connector' = 'datagen',\n" +
    31. " 'rows-per-second' = '1'\n" +
    32. ")")
    33. sTableEnv.executeSql("create table t2(\n" +
    34. " uuid varchar(20),\n" +
    35. " name varchar(10),\n" +
    36. " age int,\n" +
    37. " ts timestamp(3),\n" +
    38. " `partition` varchar(20)\n" +
    39. ")\n" +
    40. "with (\n" +
    41. " 'connector' = 'hudi',\n" +
    42. " 'path' = 'hdfs://hadoop1:9000/tmp/hudi_flink/t2',\n" +
    43. " 'table.type' = 'MERGE_ON_READ'\n" +
    44. ")")
    45. sTableEnv.executeSql("insert into t2 select * from sourceT")
    46. }
    47. }

    3、提交运行

    bin/flink run -t yarn-per-job -c com.my.example.HudiExample ./myjars/HudiExample-1.0-SNAPSHOT-jar-with-dependencies.jar

    4、核心参数设置

            Flink可配参数:https://hudi.apache.org/docs/configurations#FLINK_SQL

    4.1、去重参数

            通过如下语法设置主键:

    1. -- 设置单个主键
    2. create table hoodie_table (
    3. f0 int primary key not enforced,
    4. f1 varchar(20),
    5. ...
    6. ) with (
    7. 'connector' = 'hudi',
    8. ...
    9. )
    10. -- 设置联合主键
    11. create table hoodie_table (
    12. f0 int,
    13. f1 varchar(20),
    14. ...
    15. primary key(f0, f1) not enforced
    16. ) with (
    17. 'connector' = 'hudi',
    18. ...
    19. )

    名称

    说明

    默认值

    备注

    hoodie.datasource.write.recordkey.field

    主键字段

    --

    支持主键语法 PRIMARY KEY 设置,支持逗号分隔的多个字段

    precombine.field

    (0.13.0 之前版本为

     write.precombine.field)

    去重时间字段

    --

    record 合并的时候会按照该字段排序,选值较大的 record 为合并结果;不指定则为处理序:选择后到的 record

    4.2、并发参数

    名称

    说明

    默认值

    备注

    write.tasks

    writer 的并发,每个 writer 顺序写 1~N buckets

    4

    增加并发对小文件个数没影响

    write.bucket_assign.tasks

    bucket assigner 的并发

    Flink的并行度

    增加并发同时增加了并发写的 bucekt 数,也就变相增加了小文件( bucket)

    write.index_bootstrap.tasks

    Index bootstrap 算子的并发,增加并发可以加快 bootstrap 阶段的效率,bootstrap 阶段会阻塞 checkpoint,因此需要设置多一些的 checkpoint 失败容忍次数

    Flink的并行度

    只在 index.bootstrap.enabled true 时生效

    read.tasks

    读算子的并发(batch stream

    4

    compaction.tasks

    online compaction 算子的并发

    writer 的并发

    online compaction 比较耗费资源,建议走 offline compaction

    案例演示

    可以flink建表时在with中指定,或Hints临时指定参数的方式:在需要调整的表名后面加上 /*+ OPTIONS() */

    1. CREATE TABLE sourceT (
    2. uuid varchar(20),
    3. name varchar(10),
    4. age int,
    5. ts timestamp(3),
    6. `partition` varchar(20)
    7. ) WITH (
    8. 'connector' = 'datagen',
    9. 'rows-per-second' = '1'
    10. );
    11. create table t2(
    12. uuid varchar(20),
    13. name varchar(10),
    14. age int,
    15. ts timestamp(3),
    16. `partition` varchar(20)
    17. )
    18. with (
    19. 'connector' = 'hudi',
    20. 'path' = '/tmp/hudi_flink/t2',
    21. 'table.type' = 'MERGE_ON_READ'
    22. );
    23. insert into t2 /*+ OPTIONS('write.tasks'='2','write.bucket_assign.tasks'='3','compaction.tasks'='4') */
    24. select * from sourceT;

    执行如下图所示:

    4.3、压缩参数

    1、参数说明

            在线压缩的参数,通过设置 compaction.async.enabled =false关闭在线压缩执行,但是调度compaction.schedule.enabled 仍然建议开启,之后通过离线压缩直接执行 在线压缩任务 阶段性调度的压缩 plan。

    名称

    说明

    默认值

    备注

    compaction.schedule.enabled

    是否阶段性生成压缩 plan

    true

    建议开启,即使compaction.async.enabled 关闭的情况下

    compaction.async.enabled

    是否开启异步压缩

    true

    通过关闭此参数关闭在线压缩

    compaction.tasks

    压缩 task 并发

    4

    compaction.trigger.strategy

    压缩策略

    num_commits

    支持四种策略:num_commitstime_elapsednum_and_time

    num_or_time

    compaction.delta_commits

    默认策略,5 commits 压缩一次

    5

    compaction.delta_seconds

    3600

    compaction.max_memory

    压缩去重的 hash map 可用内存

    100MB

    资源够用的话建议调整到 1GB

    compaction.target_io

    每个压缩 plan IO 上限,默认 5GB

    500GB

    2、案例演示

    1. CREATE TABLE t3(
    2. uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
    3. name VARCHAR(10),
    4. age INT,
    5. ts TIMESTAMP(3),
    6. `partition` VARCHAR(20)
    7. )
    8. WITH (
    9. 'connector' = 'hudi',
    10. 'path' = '/tmp/hudi_flink/t3',
    11. 'compaction.async.enabled' = 'true',
    12. 'compaction.tasks' = '1',
    13. 'compaction.schedule.enabled' = 'true',
    14. 'compaction.trigger.strategy' = 'num_commits',
    15. 'compaction.delta_commits' = '2',
    16. 'table.type' = 'MERGE_ON_READ'
    17. );
    18. set table.dynamic-table-options.enabled=true;
    19. insert into t3
    20. select * from sourceT/*+ OPTIONS('rows-per-second' = '5')*/;

    4.4、文件大小

    1、参数说明

            Hudi会自管理文件大小,避免向查询引擎暴露小文件,其中自动处理文件大小起很大作用。在进行insert/upsert操作时,Hudi可以将文件大小维护在一个指定文件大小。

            目前只有log文件的写入大小可以做到精确控制,parquet 文件大小按照估算值。

    名称

    说明

    默认值

    备注

    hoodie.parquet.max.file.size

    最大可写入的 parquet 文件大小

    120 * 1024 * 1024

    默认 120MB

    (单位 byte)

    超过该大小切新的 file group

    hoodie.logfile.to.parquet.compression.ratio

    log文件大小转 parquet 的比率

    0.35

    hoodie统一依据 parquet 大小来评估小文件策略

    hoodie.parquet.small.file.limit

    在写入时,hudi 会尝试先追加写已存小文件,该参数设置了小文件的大小阈值,小于该参数的文件被认为是小文件

    104857600

    默认 100MB

    (单位 byte)

    大于 100MB,小于 120MB 的文件会被忽略,避免写过度放大

    hoodie.copyonwrite.record.size.estimate

    预估的record大小,hoodie 会依据历史的commits动态估算record的大小,但是前提是之前有单次写入超过

    hoodie.parquet.small.file.limit大小,在未达到这个大小时会使用这个参数

    1024

    默认 1KB

    (单位 byte)

    如果作业流量比较小,可以设置下这个参数

    hoodie.logfile.max.size

    LogFile最大大小。这是在将Log滚转到下一个版本之前允许的最大大小。

    1073741824

    默认1GB

    (单位 byte)

    5、内存优化 

    5.1、内存参数

    名称

    说明

    默认值

    备注

    write.task.max.size

    一个 write task 的最大可用内存

    1024

    当前预留给 write buffer 的内存为

    write.task.max.size -compaction.max_memory

    write task 的内存 buffer达到阈值后会将内存里最大的 buffer flush 出去

    write.batch.size

    Flink 的写 task 为了提高写数据效率,会按照写 bucket 提前 buffer 数据,每个 bucket 的数据在内存达到阈值之前会一直 cache 在内存中,当阈值达到会把数据 buffer 传递给 hoodie writer 执行写操作

    256

    一般不用设置,保持默认值就好

    write.log_block.size

    hoodie log writer 在收到 write task 的数据后不会马上 flush 数据,writer 是以 LogBlock 为单位往磁盘刷数据的,在 LogBlock 攒够之前 records 会以序列化字节的形式 buffer writer 内部

    128

    一般不用设置,保持默认值就好

    write.merge.max_memory

    hoodie COW 写操作的时候,会有增量数据和 base file 数据 merge 的过程,增量的数据会缓存在内存的 map 结构里,这个 map 是可 spill 的,这个参数控制了 map 可以使用的堆内存大小

    100

    一般不用设置,保持默认值就好

    compaction.max_memory

    write.merge.max_memory: 100MB 类似,只是发生在压缩时。

    100

    如果是 online compaction,资源充足时可以开大些,比如 1GB

    5.2、MOR

    (1)state backend 换成 rocksdb (默认的 in-memory state-backend 非常吃内存)

    (2)内存够的话,compaction.max_memory 调大些 (默认是 100MB 可以调到 1GB)

    (3)关注 TM 分配给每个 write task 的内存,保证每个 write task 能够分配到 write.task.max.size 所配置的大小,比如 TM 的内存是 4GB 跑了 2 个 StreamWriteFunction 那每个 write function 能分到 2GB,尽量预留一些 buffer,因为网络 buffer,TM 上其他类型 task (比如 BucketAssignFunction 也会吃些内存)

    (4)需要关注 compaction 的内存变化,compaction.max_memory 控制了每个 compaction task 读 log 时可以利用的内存大小,compaction.tasks 控制了 compaction task 的并发

            注意: write.task.max.size - compaction.max_memory 是预留给每个 write task 的内存 buffer

    5.3、COW

    (1)state backend 换成 rocksdb(默认的 in-memory state-backend 非常吃内存)。

    (2)write.task.max.size 和 write.merge.max_memory 同时调大(默认是 1GB 和 100MB 可以调到 2GB 和 1GB)。

    (3)关注 TM 分配给每个 write task 的内存,保证每个 write task 能够分配到 write.task.max.size 所配置的大小,比如 TM 的内存是 4GB 跑了 2 个 StreamWriteFunction 那每个 write function 能分到 2GB,尽量预留一些 buffer,因为网络 buffer,TM 上其他类型 task(比如 BucketAssignFunction 也会吃些内存)。

            注意:write.task.max.size - write.merge.max_memory 是预留给每个 write task 的内存 buffer。

  • 相关阅读:
    【genius_platform软件平台开发】第七十九讲:Linux系统中可执行程序后台运行的几种方式
    c++ SQLite 特别好用的库使用实例-查询(2)
    k8s部署Eureka集群
    DBSCAN 算法【python,机器学习,算法】
    HIve数仓新零售项目DWS层的构建(Grouping sets)模型
    Excel - VBA实例: 遍历若干cell的值
    RestClient操作Elasticsearch(Java)
    卷积神经网络 - LeNet
    vue项目打包成H5apk中使用语音播放
    数据库操作QTableView保存小数点的位数
  • 原文地址:https://blog.csdn.net/Yuan_CSDF/article/details/130815909