CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
(
{ <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
[ <watermark_definition> ]
[ <table_constraint> ][ , ...n]
)
[COMMENT table_comment]
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
WITH (key1=val1, key2=val2, ...)
[ LIKE source_table [( <like_options> )] ]
-- 定义表字段
<physical_column_definition>:
column_name column_type [ <column_constraint> ] [COMMENT column_comment]
<column_constraint>:
[CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED
<table_constraint>:
[CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
<metadata_column_definition>:
column_name column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]
-- 定义计算列
<computed_column_definition>:
column_name AS computed_column_expression [COMMENT column_comment]
-- 定义水位线
<watermark_definition>:
WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
<source_table>:
[catalog_name.][db_name.]table_name
<like_options>:
{
{ INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS }
| { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS }
}[, ...]
物理字段
元数据字段
-- 定义计算列
<computed_column_definition>:
column_name AS computed_column_expression [COMMENT column_comment]
计算列是一个通过column_name AS computed_column_expression生成的虚拟列,产生的计算列不是物理存储在数据源表中。一个计算列可以通过原有数据源表中的某个字段、运算符及内置函数生成。比如,定义一个消费金额的计算列(cost),可以使用表的价格(price)*数量(quantity)计算得到。
计算列常常被用在定义时间属性,可以通过 PROCTIME() 函数定义处理时间属性,语法为proc AS PROCTIME()。除此之外,计算列可以被用作提取事件时间列,因为原始的事件时间可能不是 TIMESTAMP(3) 类型或者是存在JSON串中。
注意:
WATERMARK 定义了表的事件时间属性,其形式为
WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
rowtime_column_name 把一个现有的列定义为一个为表标记事件时间的属性。该列的类型必须为 TIMESTAMP(3),且是 schema 中的顶层列,它也可以是一个计算列。
watermark_strategy_expression 定义了 watermark 的生成策略。它允许使用包括计算列在内的任意非查询表达式来计算 watermark ;表达式的返回类型必须是 TIMESTAMP(3),表示了从 Epoch 以来的经过的时间。 返回的 watermark 只有当其不为空且其值大于之前发出的本地 watermark 时才会被发出(以保证 watermark 递增)。每条记录的 watermark 生成表达式计算都会由框架完成。 框架会定期发出所生成的最大的 watermark ,如果当前 watermark 仍然与前一个 watermark 相同、为空、或返回的 watermark 的值小于最后一个发出的 watermark ,则新的 watermark 不会被发出。 Watermark 根据 pipeline.auto-watermark-interval 中所配置的间隔发出。 若 watermark 的间隔是 0ms ,那么每条记录都会产生一个 watermark,且 watermark 会在不为空并大于上一个发出的 watermark 时发出。
使用事件时间语义时,表必须包含事件时间属性和 watermark 策略。
Flink 提供了几种常用的 watermark 策略。
严格递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column。
递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘0.001’ SECOND。
有界乱序时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘string’ timeUnit。
1、事件事件的类型 必须是 timestamp(3), 格式为 yyyy-MM-dd HH:mm:ss
定义一个表
CREATE TABLE user_log (
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
`ts` TIMESTAMP(3), -- 事件时间
proctime as PROCTIME(), -- 处理时间列
WATERMARK FOR ts as ts - INTERVAL '5' SECOND -- 在ts上定义watermark,ts成为事件时间列
) WITH (
'connector' = 'kafka', -- 使用 kafka connector
'topic' = 'user_behavior', -- kafka topic
'scan.startup.mode' = 'latest-offset', -- 从起始 offset 开始读取
'properties.bootstrap.servers' = 'chb1:9092',
'properties.group.id' = 'testGroup',
'format' = 'csv'
);
源数据的 事件时间 为 yyyy-MM-dd HH:mm:ss
4,1003,10003,buy,2022-10-27 10:47:06
3,1001,10001,pv,2022-10-27 10:47:06
4,1002,10004,cart,2022-10-27 10:47:07
2、如果修改源数据的事件事件 为 long time,
源数据的 事件事件 为 13位时间戳
3,1002,10004,cart,1666839133265
6,1001,10003,buy,1666839133574
1,1002,10002,buy,1666839133887
报错
java.time.format.DateTimeParseException: Text '1666838767684' could not be parsed at index 0
解决:重新定义 事件时间字段
CREATE TABLE user_log2 (
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
`event_time` bigint, -- 13位时间戳
ts as to_timestamp(from_unixtime(`event_time`/1000, 'yyyy-MM-dd HH:mm:ss')), -- 定义成事件时间,必须是timestamp(3)格式
`proctime` as PROCTIME(), -- 处理时间列
WATERMARK FOR ts as ts - INTERVAL '5' SECOND -- 在ts上定义watermark,ts成为事件时间列
) WITH (
'connector' = 'kafka', -- 使用 kafka connector
'topic' = 'user_behavior', -- kafka topic
'scan.startup.mode' = 'latest-offset', -- 从起始 offset 开始读取
'properties.bootstrap.servers' = 'chb1:9092',
'properties.group.id' = 'testGroup',
'format' = 'csv'
);