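An Interval Join is typically used with event time: in a join between two streams, each record of one stream is matched with the records of the other stream that fall within a specified time interval. Usage: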
SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.order_id
AND o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_time
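For the query above to run as an interval join, both `o.order_time` and `s.ship_time` must be time attributes. A minimal sketch of matching DDL follows. The column types and the 5-second watermark delay are assumptions, and connector options are omitted as elsewhere in this document. The query writes the same 4-hour window with comparison operators: an order placed at 10:00 matches shipments whose `ship_time` lies between 10:00 and 14:00.

```sql
-- Hypothetical DDL for the two streams; column types and the 5-second
-- watermark delay are assumptions, connector options are omitted.
CREATE TABLE Orders (
    id STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (/* ... */);

CREATE TABLE Shipments (
    order_id STRING,
    ship_time TIMESTAMP(3),
    WATERMARK FOR ship_time AS ship_time - INTERVAL '5' SECOND
) WITH (/* ... */);

-- Equivalent to the BETWEEN form (BETWEEN is inclusive on both ends):
-- the order must be placed at most 4 hours before, or at, the shipment time.
SELECT o.id, o.order_time, s.ship_time
FROM Orders o, Shipments s
WHERE o.id = s.order_id
  AND o.order_time >= s.ship_time - INTERVAL '4' HOUR
  AND o.order_time <= s.ship_time;
```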
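The core of Flink SQL's unified stream and batch processing is the duality of streams and tables. Several concepts revolve around it: dynamic tables / temporal tables, versions, versioned tables, regular tables, continuous queries, materialized views / virtual views, CDC (Change Data Capture), and changelog streams.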
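To make the stream/table duality concrete: a continuous query over an append-only dynamic table produces another dynamic table. In this sketch the `clicks` table and its columns are hypothetical. Every new click changes one user's count, so the result is an updating table that Flink emits as a changelog stream rather than as append-only rows.

```sql
-- Hypothetical append-only table of page clicks; connector options omitted.
CREATE TABLE clicks (
    user_name STRING,
    url STRING,
    click_time TIMESTAMP(3)
) WITH (/* ... */);

-- Continuous query: the result is itself a dynamic table.
-- A user's first click inserts a row (+I); later clicks retract the old
-- count (-U) and emit the new one (+U).
SELECT user_name, COUNT(url) AS cnt
FROM clicks
GROUP BY user_name;
```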
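An Event Time Temporal Join uses an event-time attribute (i.e., a rowtime attribute) to retrieve the value that a key had at some point in the past. This allows two tables to be joined at a common point in time. The versioned table stores all versions of each key (identified by time) since the last watermark.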
-- Create a table of orders. This is a standard
-- append-only dynamic table.
CREATE TABLE orders (
    order_id STRING,
    price DECIMAL(32,2),
    currency STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time
) WITH (/* ... */);
-- Define a versioned table of currency rates.
-- This could come from a change-data-capture source
-- such as Debezium, a compacted Kafka topic, or any other
-- way of defining a versioned table.
CREATE TABLE currency_rates (
    currency STRING,
    conversion_rate DECIMAL(32, 2),
    update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
    WATERMARK FOR update_time AS update_time,
    PRIMARY KEY(currency) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'value.format' = 'debezium-json',
    /* ... */
);
SELECT
    orders.order_id,
    orders.price,
    orders.currency,
    currency_rates.conversion_rate,
    orders.order_time
FROM orders
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
ON orders.currency = currency_rates.currency;
order_id  price  currency  conversion_rate  order_time
========  =====  ========  ===============  ==========
o_001     11.11  EUR       1.14             12:00:00
o_002     12.51  EUR       1.10             12:06:00
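The DDL comment notes that a versioned table can also be defined in other ways. One option is a versioned view: deduplicate an append-only history so that only the latest row per key remains, ordered by the event-time attribute. The sketch below assumes a hypothetical append-only `rates_history` table. Flink should infer `currency` as the view's primary key, and that key together with the event-time attribute is what lets the view serve as the versioned side of an event-time temporal join.

```sql
-- Hypothetical append-only history of rate updates; connector options omitted.
CREATE TABLE rates_history (
    currency STRING,
    conversion_rate DECIMAL(32, 2),
    update_time TIMESTAMP(3),
    WATERMARK FOR update_time AS update_time
) WITH (/* ... */);

-- Keep the latest row per currency, ordered by the event-time attribute.
CREATE VIEW versioned_rates AS
SELECT currency, conversion_rate, update_time
FROM (
    SELECT *,
        ROW_NUMBER() OVER (PARTITION BY currency ORDER BY update_time DESC) AS row_num
    FROM rates_history
)
WHERE row_num = 1;

-- Same event-time temporal join as above, against the view instead.
SELECT o.order_id, o.price, o.currency, r.conversion_rate, o.order_time
FROM orders AS o
LEFT JOIN versioned_rates FOR SYSTEM_TIME AS OF o.order_time AS r
ON o.currency = r.currency;
```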
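A Processing Time Temporal Join uses a processing-time attribute to correlate each row with the latest version of its key in an external versioned table. Processing-time temporal joins are most commonly used to enrich a stream with an external table (i.e., a dimension table).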
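A sketch of the currency example rewritten as a processing-time join. Assumptions: the hypothetical `orders_proc` table carries a `proc_time` attribute, and the hypothetical `latest_rates` table is served by the JDBC connector with a placeholder URL and table name. As far as we know, Flink SQL only accepts `FOR SYSTEM_TIME AS OF` on a processing-time attribute when the right-hand table is backed by a lookup-capable connector. Unlike the event-time version, the result depends on when each row is processed, so replaying the same orders can yield different rates.

```sql
-- Hypothetical orders table with a processing-time attribute; connector options omitted.
CREATE TABLE orders_proc (
    order_id STRING,
    price DECIMAL(32, 2),
    currency STRING,
    proc_time AS PROCTIME()
) WITH (/* ... */);

-- Hypothetical lookup-capable rates table; URL and table name are placeholders.
CREATE TEMPORARY TABLE latest_rates (
    currency STRING,
    conversion_rate DECIMAL(32, 2)
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://mysqlhost:3306/ratesdb',
    'table-name' = 'latest_rates'
);

-- Each order picks up whatever rate the lookup returns when it is processed.
SELECT o.order_id, o.price, o.currency, r.conversion_rate
FROM orders_proc AS o
JOIN latest_rates FOR SYSTEM_TIME AS OF o.proc_time AS r
ON o.currency = r.currency;
```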
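A Lookup Join is typically used to enrich a table with data queried from an external system. It requires one table to have a processing-time attribute and the other to be backed by a lookup source connector. It serves a similar purpose to the Processing Time Temporal Join and uses the same syntax: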
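CREATE TABLE orders (
    order_id STRING,
    -- customer_id is the key used to look up Customers.id
    customer_id INT,
    price DECIMAL(32,2),
    total DECIMAL(32,2),
    currency STRING,
    order_time TIMESTAMP(3),
    -- processing-time attribute required by the lookup join
    proc_time AS PROCTIME()
) WITH (/* ... */);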
-- Customers is backed by the JDBC connector and can be used for lookup joins
CREATE TEMPORARY TABLE Customers (
    id INT,
    name STRING,
    country STRING,
    zip STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://mysqlhost:3306/customerdb',
    'table-name' = 'customers'
);
-- enrich each order with customer information
SELECT o.order_id, o.total, c.country, c.zip
FROM orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
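The query above is an inner join, so an order whose `customer_id` finds no row in `Customers` is dropped from the result. A lookup join can also be written as a LEFT JOIN, which keeps such orders and fills the customer columns with NULL. This sketch assumes, as the query above does, that `orders` has a `customer_id` column comparable with `Customers.id`.

```sql
-- Keep every order; c.country and c.zip are NULL when no customer matches.
SELECT o.order_id, o.total, c.country, c.zip
FROM orders AS o
LEFT JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
```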