FlinkSQL ChangeLog


    01 Changelog-related optimization rules

    0101 Running an upsert-kafka job

    Log in to sql-client and create an upsert-kafka SQL job. Note that the messages sent to Kafka must carry a key; plain value-only messages cannot be parsed. The key here is the value of the primary key.

    CREATE TABLE pageviews_per_region (
      user_region STRING,
      pv STRING,
      PRIMARY KEY (user_region) NOT ENFORCED  -- define the primary key
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = 'pageviews_per_region',
      'properties.bootstrap.servers' = 'xxxxxx:9092',
      'key.format' = 'csv',
      'value.format' = 'csv'
    );
    
    select * from pageviews_per_region;
    

    The commands below produce messages with a key and consume messages while printing the key:

    kafka-console-producer.sh --broker-list xxxxxx:9092 --topic pageviews_per_region --property "parse.key=true" --property "key.separator=:"
    key1:value1,value1
    key2:value2,value2
    
    kafka-console-consumer.sh --bootstrap-server xxxxxx:9092 --topic pageviews_per_region --from-beginning --property print.key=true
    

    The job's DAG is as follows:
    [Figure: DAG of the upsert-kafka job]

    0102 StreamPhysicalChangelogNormalize

    The DAG contains a ChangelogNormalize node. Searching the code, the corresponding class is StreamPhysicalChangelogNormalize, a class that normalizes changelog data. Its Javadoc reads as follows:

    /**
     * Stream physical RelNode which normalizes a changelog stream which maybe an upsert stream or a
     * changelog stream containing duplicate events. This node normalize such stream into a regular
     * changelog stream that contains INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE records without
     * duplication.
     */
    class StreamPhysicalChangelogNormalize(
    

    Its function is simply to translate into the corresponding exec node:

    override def translateToExecNode(): ExecNode[_] = {
      val generateUpdateBefore = ChangelogPlanUtils.generateUpdateBefore(this)
      new StreamExecChangelogNormalize(
        unwrapTableConfig(this),
        uniqueKeys,
        generateUpdateBefore,
        InputProperty.DEFAULT,
        FlinkTypeFactory.toLogicalRowType(getRowType),
        getRelDetailedDescription)
    }
    

    0103 StreamPhysicalTableSourceScanRule

    StreamPhysicalChangelogNormalize is created inside the optimization rule StreamPhysicalTableSourceScanRule; as shown below, this rule applies to a streaming FlinkLogicalTableSourceScan:

    class StreamPhysicalTableSourceScanRule
      extends ConverterRule(
        classOf[FlinkLogicalTableSourceScan],
        FlinkConventions.LOGICAL,
        FlinkConventions.STREAM_PHYSICAL,
        "StreamPhysicalTableSourceScanRule") {
    

    StreamPhysicalChangelogNormalize is created, i.e. the scan is treated as a changelog, under the following condition:

    if (
      isUpsertSource(resolvedSchema, table.tableSource) ||
      isSourceChangeEventsDuplicate(resolvedSchema, table.tableSource, config)
    ) {
    

    isUpsertSource checks whether the source is an upsert stream; its logic is as follows:

    public static boolean isUpsertSource(
            ResolvedSchema resolvedSchema, DynamicTableSource tableSource) {
        if (!(tableSource instanceof ScanTableSource)) {
            return false;
        }
        ChangelogMode mode = ((ScanTableSource) tableSource).getChangelogMode();
        boolean isUpsertMode =
                mode.contains(RowKind.UPDATE_AFTER) && !mode.contains(RowKind.UPDATE_BEFORE);
        boolean hasPrimaryKey = resolvedSchema.getPrimaryKey().isPresent();
        return isUpsertMode && hasPrimaryKey;
    }
    

    The ChangelogMode is defined by each source's implementation of the getChangelogMode method; for example, the JDBC source only supports insert (a contrasting upsert example follows the snippet):

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }
    
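    For contrast, an upsert source such as upsert-kafka advertises INSERT/UPDATE_AFTER/DELETE without UPDATE_BEFORE. A minimal sketch of what such a getChangelogMode implementation could look like (the real upsert-kafka connector may construct the mode differently):
    
    @Override
    public ChangelogMode getChangelogMode() {
        // INSERT + UPDATE_AFTER + DELETE and no UPDATE_BEFORE, so isUpsertSource()
        // returns true as long as the schema also declares a primary key.
        return ChangelogMode.upsert();
    }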

    isSourceChangeEventsDuplicate checks whether the source is a changelog (CDC) stream that is not an upsert stream and whose change events may be duplicated; its logic is as follows:

    public static boolean isSourceChangeEventsDuplicate(
            ResolvedSchema resolvedSchema,
            DynamicTableSource tableSource,
            TableConfig tableConfig) {
        if (!(tableSource instanceof ScanTableSource)) {
            return false;
        }
        ChangelogMode mode = ((ScanTableSource) tableSource).getChangelogMode();
        boolean isCDCSource =
                !mode.containsOnly(RowKind.INSERT) && !isUpsertSource(resolvedSchema, tableSource);
        boolean changeEventsDuplicate =
                tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE);
        boolean hasPrimaryKey = resolvedSchema.getPrimaryKey().isPresent();
        return isCDCSource && changeEventsDuplicate && hasPrimaryKey;
    }
    

    In summary, to go down the StreamPhysicalChangelogNormalize call chain the source must not be insert-only; however, most source connectors currently implemented for Flink, including Iceberg, are insert-only.
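    
    Since the isSourceChangeEventsDuplicate branch is only taken when table.exec.source.cdc-events-duplicate is enabled, a keyed CDC source also needs that option switched on to get a ChangelogNormalize node. A minimal sketch of enabling it (assuming a TableEnvironment created as below; the key corresponds to ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE):
    
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    // Declare that the CDC source may deliver duplicate change events, so the planner
    // adds ChangelogNormalize to deduplicate them by primary key.
    tEnv.getConfig().getConfiguration().setString("table.exec.source.cdc-events-duplicate", "true");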

    0104 Update modes

    Flink's update-mode-related classes are: RowKind, ChangelogMode, and UpdateKind.

    • RowKind

    RowKind defines the type of each record in an update stream. An update is represented by two records: one that retracts the old row and one that inserts the new row.

    /** Insertion operation. */
    INSERT("+I", (byte) 0),
    
    /**
     * Update operation with the previous content of the updated row.
     *
     * <p>This kind SHOULD occur together with {@link #UPDATE_AFTER} for modelling an update that
     * needs to retract the previous row first. It is useful in cases of a non-idempotent update,
     * i.e., an update of a row that is not uniquely identifiable by a key.
     */
    UPDATE_BEFORE("-U", (byte) 1),
    
    /**
     * Update operation with new content of the updated row.
     *
     * <p>This kind CAN occur together with {@link #UPDATE_BEFORE} for modelling an update that
     * needs to retract the previous row first. OR it describes an idempotent update, i.e., an
     * update of a row that is uniquely identifiable by a key.
     */
    UPDATE_AFTER("+U", (byte) 2),
    
    /** Deletion operation. */
    DELETE("-D", (byte) 3);

    • ChangelogMode

    ChangelogMode defines a source's update mode. There are three main built-in modes, each simply a different set of RowKind values:

    private static final ChangelogMode INSERT_ONLY =
            ChangelogMode.newBuilder().addContainedKind(RowKind.INSERT).build();
    
    private static final ChangelogMode UPSERT =
            ChangelogMode.newBuilder()
                    .addContainedKind(RowKind.INSERT)
                    .addContainedKind(RowKind.UPDATE_AFTER)
                    .addContainedKind(RowKind.DELETE)
                    .build();
    
    private static final ChangelogMode ALL =
            ChangelogMode.newBuilder()
                    .addContainedKind(RowKind.INSERT)
                    .addContainedKind(RowKind.UPDATE_BEFORE)
                    .addContainedKind(RowKind.UPDATE_AFTER)
                    .addContainedKind(RowKind.DELETE)
                    .build();
    
    • UpdateKind

    UpdateKind is a finer-grained classification of the update operation:

    /** NONE doesn't represent any kind of update operation. */
    NONE,
    
    /**
     * This kind indicates that operators should emit update changes just as a row of {@code
     * RowKind#UPDATE_AFTER}.
     */
    ONLY_UPDATE_AFTER,
    
    /**
     * This kind indicates that operators should emit update changes in the way that a row of {@code
     * RowKind#UPDATE_BEFORE} and a row of {@code RowKind#UPDATE_AFTER} together.
     */
    BEFORE_AND_AFTER
    

    02 StreamExecChangelogNormalize

    The processing flow of StreamExecChangelogNormalize splits into mini-batch processing and per-record stream processing, depending on whether table.exec.mini-batch.enabled is set (see the configuration sketch after the loop below).

    Mini-batch processing uses ProcTimeMiniBatchDeduplicateKeepLastRowFunction, and per-record streaming uses ProcTimeDeduplicateKeepLastRowFunction; the core difference is that the mini-batch variant buffers the data and processes it in a for loop.

    Besides the StreamPhysicalChangelogNormalize path, these two functions are also reached via StreamExecDeduplicate, which corresponds to StreamPhysicalRankRule, a ranking-related rule. The mini-batch loop looks like this:

    for (Map.Entry<RowData, RowData> entry : buffer.entrySet()) {
        RowData currentKey = entry.getKey();
        RowData currentRow = entry.getValue();
        ctx.setCurrentKey(currentKey);
        if (inputInsertOnly) {
            processLastRowOnProcTime(
                    currentRow,
                    generateUpdateBefore,
                    generateInsert,
                    state,
                    out,
                    isStateTtlEnabled,
                    equaliser);
        } else {
            processLastRowOnChangelog(
                    currentRow, generateUpdateBefore, state, out, isStateTtlEnabled, equaliser);
        }
    }
    
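    Whether the mini-batch variant above is used is driven by configuration. A minimal sketch of enabling it (assuming tEnv is a TableEnvironment; the option keys come from ExecutionConfigOptions, the values are only illustrative):
    
    // Buffer input records and flush each bundle after at most 2 seconds or 5000 rows.
    tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.enabled", "true");
    tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.allow-latency", "2 s");
    tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.size", "5000");
    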
    • processLastRowOnProcTime

    This deduplicates data by processing-time semantics, treating the current record as the latest. The function only handles insert-only data:

    static void checkInsertOnly(RowData currentRow) {
        Preconditions.checkArgument(currentRow.getRowKind() == RowKind.INSERT);
    }
    

    The whole processing logic boils down to rewriting each record's RowKind according to the situation:

    } else {
        if (generateUpdateBefore) {
            preRow.setRowKind(RowKind.UPDATE_BEFORE);
            out.collect(preRow);
        }
        currentRow.setRowKind(RowKind.UPDATE_AFTER);
        out.collect(currentRow);
    }
    
    • processLastRowOnChangelog

    This function deduplicates by key; essentially it also rewrites each record's RowKind.

    A core part of the logic is that, when updating, the previous row is re-emitted as UPDATE_BEFORE:

    } else {
        if (generateUpdateBefore) {
            preRow.setRowKind(RowKind.UPDATE_BEFORE);
            out.collect(preRow);
        }
        currentRow.setRowKind(RowKind.UPDATE_AFTER);
        out.collect(currentRow);
    }
    

    The function relies on Flink's state mechanism to fetch the previous row, so it places demands on the state backend. In addition, for non-delete records, if state TTL is not enabled and the new row equals the previous one, no retraction or update message is emitted (a sketch of enabling state TTL follows the snippet below):

    if (!isStateTtlEnabled && equaliser.equals(preRow, currentRow)) {
        // currentRow is the same as preRow and state cleaning is not enabled.
        // We do not emit retraction and update message.
        // If state cleaning is enabled, we have to emit messages to prevent too early
        // state eviction of downstream operators.
        return;
    }
    
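    The isStateTtlEnabled flag above comes from the idle state retention setting. A minimal sketch of turning it on (assuming tEnv is a TableEnvironment; the duration is only illustrative):
    
    // Keep per-key state for one hour of inactivity (java.time.Duration; option key table.exec.state.ttl).
    // With TTL enabled, equal rows are still emitted so that downstream operators'
    // state is not evicted too early, as the comment in the snippet above explains.
    tEnv.getConfig().setIdleStateRetention(Duration.ofHours(1));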

    03 Origin of the initial RowKind

    In the preceding flow, by the time the changelog conversion happens each record already carries a RowKind value. This chapter traces where that initial RowKind comes from.

    The Kafka source based on the FLIP-27 source design has a KafkaRecordEmitter, and its emitRecord method performs the deserialization:

    deserializationSchema.deserialize(consumerRecord, sourceOutputWrapper);
    

    This eventually reaches DeserializationSchema.deserialize, which does the actual deserialization:

    default void deserialize(byte[] message, Collector<T> out) throws IOException {
        T deserialize = deserialize(message);
        if (deserialize != null) {
            out.collect(deserialize);
        }
    }
    

    Here message is a byte array extracted from the Kafka ConsumerRecord. According to the configuration in the SQL, the value format is csv, so processing goes to CsvRowDataDeserializationSchema:

    final JsonNode root = objectReader.readValue(message);
    return (RowData) runtimeConverter.convert(root);
    

    The root read here is the parsed message content. The convert call is implemented by CsvToRowDataConverters, whose createRowConverter method creates the conversion function; that function turns the data into Flink's GenericRowData type:

    GenericRowData row = new GenericRowData(arity);
    

    In GenericRowData's definition, the constructor initializes RowKind to INSERT:

    public GenericRowData(int arity) {
        this.fields = new Object[arity];
        this.kind = RowKind.INSERT; // INSERT as default
    }
    
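    The csv format therefore always emits INSERT rows, and it is ChangelogNormalize that later rewrites the kind. Formats that understand change events can set the kind directly; a minimal, purely illustrative sketch using GenericRowData.ofKind (this is not the csv code path):
    
    // Build a row that already carries UPDATE_AFTER instead of the default INSERT.
    GenericRowData row = GenericRowData.ofKind(
            RowKind.UPDATE_AFTER,
            StringData.fromString("region-1"),   // user_region (illustrative value)
            StringData.fromString("42"));        // pv (illustrative value)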

    04 Supplementary notes

    0401 delete

    According to the official documentation, producing a record with a null value generates a DELETE:

     Also, null values are interpreted in a special way: a record with a null value represents a "DELETE".
    

    Sending an empty message with the kafka console producer cannot be parsed:

    [ERROR] Could not execute SQL statement. Reason:
    org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input
     at [Source: UNKNOWN; byte offset: #UNKNOWN]
    

    The official explanation is that this is a null-value limitation of the Kafka console tools; version 3.2 or later is required:
    https://issues.apache.org/jira/browse/FLINK-27663?jql=project%20%3D%20FLINK%20AND%20text%20~%20%22upsert-kafka%22
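    
    If upgrading the console tools is not an option, the tombstone can also be sent programmatically. A minimal sketch with the plain Kafka Java producer (broker address and topic are the placeholders used earlier):
    
    Properties props = new Properties();
    props.put("bootstrap.servers", "xxxxxx:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
        // A record with a null value is a Kafka tombstone; upsert-kafka turns it into a DELETE for this key.
        producer.send(new ProducerRecord<>("pageviews_per_region", "key1", null));
        producer.flush();
    }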

    The null-value handling logic is in DynamicKafkaDeserializationSchema.deserialize, which branches on whether the incoming value is null. The non-null branch is exactly the path described in the previous three chapters, i.e. this is the entry point of that logic:

    if (record.value() == null && upsertMode) {
        // collect tombstone messages in upsert mode by hand
        outputCollector.collect(null);
    } else {
        valueDeserialization.deserialize(record.value(), outputCollector);
    }
    

    For a null value, processing reaches OutputProjectionCollector.emitRow, which sets the initial row kind to DELETE:

    if (physicalValueRow == null) {
        if (upsertMode) {
            rowKind = RowKind.DELETE;
        } else {
            throw new DeserializationException(
                    "Invalid null value received in non-upsert mode. Could not to set row kind for output record.");
        }
    } else {
        rowKind = physicalValueRow.getRowKind();
    }
    