本文的主要目的是学会flink window窗口编程,本文章不会对:滑动窗口,滚动窗口,sessoin窗口作深入的讲解,那不是本文的目的。本文在假设你对窗口有一定了解的基础的前提下,带你看看flink窗口编程。 在读本篇文章的基础上,你需要先阅读:flink 水位线彻底站起来
回忆下批处理吧, 我们以日志数据来举例。一般来说我们的日志都是按照日期进行分类的, 比如:/log/2022/03/02/, 如果我想分析2022年三月二号的日志的话,我只需要加载上面的目录就行了, 而实际上这不就是一个窗口嘛? 这个窗口就是我们定义好的文件夹中的数据。 所以窗口不是流处理独有的概念,他并没有那么高深。 我们来给窗口下个定义: 窗口就是对数据精确划分的一个集合。
至于你怎么划分那是你的事了, 比如常见的根据时间划分, 也有的是根据数量划分, 甚至根据某些特性划分,划分的方式多了去了。 所有这些划分最终的目的,就是把聚集的数据 按照业务层需要的方式 去分割成一个个的小集合, 而这每个小集合都是一个窗口。
我为什么会将理解窗口作为一个小节,那是因为我也是小白过来的,明白小白的心态。理解了窗口的本质,会对后面的学习事半功倍。
flink的窗口对象有两个实现类,见下图:

官网这么说的:A global windows assigner assigns all elements with the same key to the same single global window. This windowing scheme is only useful if you also specify a custom trigger. Otherwise, no computation will be performed, as the global window does not have a natural end at which we could process the aggregated elements.
flink有一个全局窗口分配器(后面会讲分配器),该分配器将相同key(指的是keyedStream)流数据全部分配到一个全局窗口GlabalWindow中,GlabalWindow窗口必须指定触发器trigger,因为全局窗口分配器没有默认的触发器,所以必须指定触发器trigger。
看个例子,keyBy数据流有个方法。countWindow()大家知道countWindow不属于时间窗口,那么其只可能是GlobalWindow结合触发器trigger实现的,看源码:
public WindowedStream countWindow(long size) {
return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
}
//发现其底层就是调用了window方法,
GlobalWindows.create()创建了全局窗口分配器
然后指定trigger
这个代码看不懂没关系,看了后面的分配器就懂了,此时只需要知道GlobalWindow没有触发器即可。要用GlobalWindow的时候开发者必须指定一个触发器,否则的话GlobalWindow永远不会被触发。
时间窗口,可以说这是做常用的了吧,不管是滑动窗口,滚动窗口又或是sessoin窗口内部都是TimeWindow对象。
TimeWindow 对象有三个属性
在看下面的介绍之前,其实flink的窗口可能和大部分的人理解的不一样,他并不是先构建一个携带时间范围的窗口对象,然后流数据过来的时候根据流数据的时间去判断其属于哪个窗口。 你是不是这么理解的,此处请对号入座。 网上很多人也是这么说的,为什么大家都这样说,因为这样说好理解,但是实际上flink内部不是这样实现的。 请往下看…
当流数据到来的时候flink通过窗口分配器WindowAssigner为每个数据建立一个TimeWindow对象,同时每条数据再注册一个触发器trigger。
flink会根据数据中的event time 和用户子自定义的窗口宽度size 生成TimeWindow对象的窗口范围[startTime, endTime] .
假如现在时间是15:01, 窗口宽度size=5, 然后连续过来三个流数据:
e1(event time=15:02),e2(event time=15:02),e3(event time=15:02)。然后这三个流数据会生成三个一一对应的TimeWindow对象。
注意了,flink根据e1,e2,e3的event time和 窗口宽度size=5 计算出这三个TimeWindow对象的窗口范围都为[15:00, 15:05], 这种计算方法是flink计算的,这个计算才是核心,有兴趣的建议看看源码。
这三个窗口对象的窗口时间范围都是[15:00, 15:05], 在此基础上flink会为这三个流数据都注册一个trigger定时触发器。 因为这三个元素的窗口时间范围一样, 因此这三个trigger触发器的时间也是一样的,然后等时间一到, 这三个对象就会同时参与计算, 这样的话这三个对象逻辑上就属于同一个窗口。下图是滑动窗口在event time下的trigger定时触发器源码:
这块比较拗口,我们再来理一理:
1.流中的每个元素到来, 都会分配一个TimeWindow对象和一个触发器Trigger
2.TimeWindow对象根据当前元素的event time以及窗口的宽度 来确定 窗口范围[starttime,endtime]
3.随着流元素的逐个到来, 紧挨着到来的流元素生成的TimeWindow对象的[starttime,endtime]可能一样
4.[starttime,endtime]一样的TimeWindow对象逻辑上属于一个窗口,这个逻辑上的窗口才是
大众理解的窗口
5.属于同一个逻辑窗口的流数据,对应的trigger也是一样的。为什么呢?因为这些元素的TimeWindow对象的窗口范围[starttime,endtime]是一样的, 而trigger对象需要的时间就是从maxtimestamp 其实(maxtimestamp=endtime-1)

触发逻辑
如果你2.3小节没看懂建议不要往下看了。到这里我假设2.3小节你已经明白。 每个流数据都有一个
TimeWindow对象和一个Trigger触发器,
当不同流元素TimeWindow的窗口时间范围[start,end]一样的时候,其触发器Trigger也是一样的,
既然Trigger一样那么时间一到就会同时开始处理。
此时他们属于同一个逻辑窗口, 不严谨的说法就是他们是属于一个窗口的。
flink时间窗口根据时间的特性分为:event time和process time,不管哪种窗口上面的触发逻辑都是一摸一样。不同的是 event time类型的Trigger触发器多了一个判断水位线的逻辑,process time的触发其实就是我上面说的触发逻辑,接下来聊聊event time的触发.

event time 模式下:Trigger对象解析数据的时候,除了注册了触发器之外,还加了了watermar>=TimeWindow.maxtimestamp 的判断。 步骤1:就是Trigger对象根据TimeWindow的 maxtimestamp注册的定时器,即便2不发生,窗口最终也会被触发。此步骤就是本小节开头说的触发逻辑。
步骤2:仔细看红色圈圈的代码,我们可以看到和水位线条进行比较的是窗口的maxtimestamp时间,而触发器的用的时间也是maxtimestamp, 如果watermar>=TimeWindow.maxtimestamp 不成立的话,注册的Trigger对象即使到了时间也不会被触发,也就是说对event time而言水位线越过窗口才是唯一的触发条件。 当前watermar>=TimeWindow.maxtimestamp 不满足Trigger不触发的逻辑我在源码中暂时还未找到。 希望找到的小伙伴留言给我。
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(WindowAssigner) <- 必须:窗口分配器,决定了流数据被分配到哪个窗口中
.trigger(Trigger) <- 可选: "触发器" (若不设置会选择默认值)
.evictor(Evictor) <- 可选: "清除器" (else no evictor)
.allowedLateness(Time) <- 可选: 允许元素迟到多久,默认为0意思是不允许迟到
.sideOutputLateData(OutputTag)<- 可选: 迟到的数据输出到OutputTag管道中
.reduce/aggregate/apply() <- 必须: 落到窗口中的数据怎么处理
.getSideOutput(OutputTag) <- 可选: 获取迟到的数据
stream
.windowAll(WindowAssigner) ->必须
.trigger(Trigger) ->可选
.evictor(Evictor) ->可选
.allowedLateness(Time) ->可选
.sideOutputLateData(OutputTag) ->可选
.reduce/aggregate/apply() ->必须
.getSideOutput(OutputTag) ->可选
说明:可以看到键控流和非键控流的最大的区别就是是否对流数据 调用了keyBy算子。 经过keyBy处理的数据流,会根据key分成多条task并行任务, 这些task任务可以并行执行。 而不经过keyBy算子的话,那所有的数据都将被一个窗口处理,也就是只能调用windowAll, 这样的话所有的数据都会被一个task处理, 也就是说所有的数据都将发送到下游算子的单个task中–>并行度为1.

WindowAssigner是最顶层接口,其余的都是它的实现类,其实现类都称为窗口分配器,可以拿来直接用。 其实现类我不会都说一遍,下文会说一些常用的,至于其他的感兴趣可以自己去看。
public abstract class WindowAssigner implements Serializable {
private static final long serialVersionUID = 1L;
/**
*每个流数据 element都会经过这个方法,其作用就是根据timestamp
生成TimeWindow或者GlobalWindow,实际开发中大部分都是TimeWindow
TimeWindow包含starttime,endtime,maxtimestamp
*/
public abstract Collection assignWindows(
T element, long timestamp, WindowAssignerContext context);
/** 该方法为到来的流元素element定义了一个Trigger对象,该Trigger对象决定了窗口以何种方式触发
*/
public abstract Trigger getDefaultTrigger(StreamExecutionEnvironment env);
...
}
对应的WindowAssigner实现类有两个:
- TumblingEventTimeWindows
- TumblingProcessingTimeWindows
常用场景:每隔五分钟统计一次
下图是一个每隔五秒划分一个窗口的图解。滑动窗口是不会重叠的。

仔细看上图,Tumbling在英文中是滑动的意思,因此上图中:
TumblingEventTimeWindows和TumblingProcessingTimeWindows都是滑动窗口。
TumblingEventTimeWindows.of(Time.minutes(5),Time.seconds(10))
第一个参数指的是窗口的大小,第二个参数指的是窗口的起始偏移量。 窗口是按照自然时间划分的,比如Time.minutes(5)
划分的窗口是:[00:00:00,00:05:00],[00:05:00,00:10:00]。TumblingEventTimeWindows.of(Time.minutes(5),Time.seconds(10))意思是在自然划分的窗口的基础上起始偏移量为十秒,此时划分的窗口就是:
[00:00:10, 00:05:10], [00:05:10,00:10:10]
TumblingProcessingTimeWindows 参数也是一样的就不多说了
stream1
.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.minutes(5),Time.seconds(10)))
.
windowFunction指的是具体的窗口执行函数比如:reduce/aggregate/apply()
对应的WindowAssigner实现类有两个:
- SlidingEventTimeWindows
- SlidingProcessingTimeWindows
常用场景:每隔一分钟统计过去五分钟的数据量
滚动窗口有两个必要属性:1.步长slide 2.窗口宽度size
意思是每隔slide 滑动一次, 一般slide比size小, 你仔细想想,当slide=size的时候滚动窗口就变成了滑动窗口。 滚动窗口会有重叠,意思是一个流元素可能会属于多个滑动窗口中, slide越小窗口分配的就越多。
SlidingEventTimeWindows.of(Time.minutes(10),Time.seconds(5),Time.se conds(1))第一个参数是窗口大小size,第二个参数是步长,第三个参数是偏移量。窗口划分也是基于自然时间划分的。
stream1
.keyBy(0)
.window(SlidingEventTimeWindows.of(Time.minutes(10),Time.seconds(5),Time.seconds(1))).
windowFunction指的是具体的窗口执行函数比如:reduce/aggregate/apply()
对应的WindowAssigner实现类有两个:
- EventTimeSessionWindows //静态事件时间
- DynamicEventTimeSessionWindows //动态事件时间
- ProcessingTimeSessionWindows //静态处理时间
- DynamicProcessingTimeSessionWindows //动态处理时间
常用于:十分钟内没有新的数据到来则开启新的窗口。
会话窗口其实逻辑也很简单, 它有有一个时间间隔在这里为了下文方便我们把这个时间间隔记作sessionTimeout。在sessionTimeout内如果没有新的数据流到窗口则该窗口关闭, 然后开启新的窗口。 那如果一直有数据过来则该窗口永远不会关闭。 这也意味着会话窗口的endTime是不确定的,话句话说窗口的大小是不确定的。
sessoin窗口按照类型分为静态和动态窗口
上面关于动态和静态的sessoin很难理解,因为sessoin窗口和普通的窗口不一样,sessoin窗口涉及到窗口的合并,如果不了解窗口分配很难理解,下面我将尽量讲清楚窗口的分配逻辑。
窗口对象:TimeWindow(timestamp, timestamp + sessionTimeout)
timestamp是流数据的时间,可能是event time 也可能是process time,和你用的窗口分配器类型有关(如果你用EventTimeSessionWindows则timestamp表示event time )
sessoin窗口没办法很简单的举例子来说明窗口的分配,我们必须结合源码来看,我将尽量讲解清除。



上面你可能知道了sessoin窗口根据sessionTimeout是自动生成还是动态提取被分为:静态sessoin窗口和动态sessoin窗口。
sessoin窗口和滑动/滚动窗口最大的不同就在于sessoin窗口有一个合并操作。不管是动态还是静态,通过源码可以看到sessoin每到来一个元素都会创建一个时间窗口对象:TimeWindow(timestamp, timestamp + sessionTimeout)
基于时间timestamp 和sessoinTimeout确定一个窗口范围,我们说过sessoin窗口范围是在不断变化的,如果在[timestamp, timestamp + sessionTimeout]范围内到达了新的流数据,该流数据对应的窗口为[timestamp2, timestamp2 + sessionTimeout], 则合并后二者实际上属于一个窗口[timestamp,timestamp2 + sessionTimeout],看下图。

stream1
.keyBy(0)
.window(EventTimeSessionWindows.withGap(Time.minutes(5)))
.
windowFunction指的是具体的窗口执行函数比如:reduce/aggregate/apply()
stream1.keyBy(0).window( DynamicEventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor>(){
@Override
public long extract(Tuple2 element) {
return element.f1;
}
}));
SessionWindowTimeGapExtractor用于返回当前流数据
构建窗口的sessoinTimeout参数。
每个流数据都会调用一次,因为
每个流数据都会构建一个TimeWindow(timestamp, timestamp + sessionTimeout)
关于sessoin只能尽量说成这样了。 另外值得注意的就是,上面我讲解的WindowAssigner都是flink已经实现好的,其内部包含了Trigger所以不需要再次指定,内置的WindowAssigner都有一个默认的Trigger. 你可以点开源码看下。
滚动窗口和滑动窗口用法基本一样,我们挑滑动窗口来做两个例子,例子分为event time 和process time两种模式。 记住process time不需要水位线。
public class Order {
private String name;//订单名称
private int price;//订单价格
private long timestamp; //订单产生时间,用于event time
public Order() {
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getPrice() {
return price;
}
public void setPrice(int price) {
this.price = price;
}
public long getTimestamp() {
return timestamp;
}
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
@Override
public String toString() {
return "Order{" +
"name='" + name + '\'' +
", price=" + price +
", timestamp=" + timestamp +
'}';
}
}
import org.apache.flink.streaming.api.functions.source.SourceFunction;
//模拟订单产生的时间
public class SourceOrder implements SourceFunction {
private volatile boolean flag=true;
@Override
public void run(SourceContext ctx) throws Exception {
long timestamp = System.currentTimeMillis();
while (flag) {
Order order_0 = new Order();
Order order_1 = new Order();
order_0.setName("order_0");
order_0.setPrice(1);
order_0.setTimestamp(timestamp);
order_1.setName("order_1");
order_1.setPrice(2);
order_1.setTimestamp(timestamp);
ctx.collect(order_0);
ctx.collect(order_1);
Thread.sleep(1000);
timestamp += 1000;//订单产生间隔一秒
}
}
@Override
public void cancel() {
flag = false;
}
}
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.*;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;
import java.text.SimpleDateFormat;
public class WindowsEventTimeMain {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//定义两条流
DataStreamSource ds = env.addSource(new SourceOrder());
// ds.print("源数据:");
// ds.print();
env.getConfig().setAutoWatermarkInterval(100);//水位线生成周期
KeyedStream keyedStream = ds.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner(new TimestampAssignerSupplier(){
@Override
public TimestampAssigner createTimestampAssigner(Context context) {
return (e,timestamp) -> e.getTimestamp();
}
})).keyBy((KeySelector) value -> value.getName());
OutputTag outputTag = new OutputTag("sideout"){};
SingleOutputStreamOperator single= keyedStream
.window(TumblingEventTimeWindows.of(Time.seconds(2)))
.sideOutputLateData(outputTag)
.reduce(new ReduceFunction() {
@Override
public Order reduce(Order value1, Order value2) throws Exception {
SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String str_time01 = sdf.format(value1.getTimestamp());
String str_time02 = sdf.format(value2.getTimestamp());
Order order = new Order();
order.setName(value1.getName()+">>"+str_time01+":"+str_time02);
order.setPrice(value1.getPrice()+value2.getPrice());
order.setTimestamp(9999999);
return order;
}
});
single.getSideOutput(outputTag).print("迟到数据:");
single.print("真实窗口输出:");
env.execute("event time demo.");
}
}

基于process time就很简单了,不需要水位线条,代码很简单那。
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.text.SimpleDateFormat;
public class WindowsProcessTimeMain {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//定义两条流
DataStreamSource ds = env.addSource(new SourceOrder());
// ds.print();
SingleOutputStreamOperator outStream = ds
.keyBy("name")
.window(TumblingProcessingTimeWindows.of(Time.seconds(2)))
.reduce(new ReduceFunction() {
@Override
public Order reduce(Order value1, Order value2) throws Exception {
SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String str_time01 = sdf.format(value1.getTimestamp());
String str_time02 = sdf.format(value2.getTimestamp());
Order order = new Order();
order.setName(value1.getName()+">>"+str_time01+":"+str_time02);
order.setPrice(value1.getPrice()+value2.getPrice());
return order;
}
});
outStream.print();
env.execute("prcocess time demo");
}
}
结果:

对了,你可以把时间TumblingProcessingTimeWindows.of(Time.seconds(2))改成:
TumblingProcessingTimeWindows.of(Time.seconds(1)) 结果如下:

你会发现,这不是reduce的结果,reduce根本没有被调用。 因为设置一秒的话,由于数据源是一秒生成一条数据, 那么也就意味着每个窗口大概率只有一条数据, 那么reduce在只有一条数据的时候不会生效。 咳咳,感觉有点坑。 希望对此有研究的小伙伴给我留言。
reduce:对窗口中的每条数据做逐个处理,最后整个窗口输出一条数据。
@Public
@FunctionalInterface
public interface ReduceFunction extends Function, Serializable {
/**
* @param value1 The first value to combine.
* @param value2 The second value to combine.
* @return The combined value of both input values.
* @throws Exception This method may throw exceptions. Throwing an exception will cause the
* operation to fail and may trigger recovery.
*/
T reduce(T value1, T value2) throws Exception;
}
通过源码可以看到:reduce最后的返回值类型和流数据的类型必须保持一致,
而这也是开发中需要主义的地方。
上一节的实战部分展示了reduce函数,我们想一下,reduce函数发出的结果有一个小的问题,就是这个结果是属于哪个窗口的我们并不知道。 如果我可以获取到结果所属的窗口的时间那就很有用了,这样的话结果更加清晰。 你想每条reduce的结果我都知道其计算的是哪个窗口,把窗口时间附加到结果中很明确,方便后续我们做一些基于时间的过滤。 但是有些人可能会说了,我不需要知道窗口的时间啊, 我只需要用System.currentTimeMillis()获取当前系统时间,然后把这个时间赋予到结果中不也行嘛? 问题已经出现,请大家好好思考一下,对event time来说其窗口时间 范围i是根据event time来计算的,如果我今天去kafka拿到昨天产生的基于event time的数据,明显窗口时间是昨天的,而你用System.currentTimeMillis()获取的时间就是今天的啊,这样肯定有问题。 有人就说了既然如此我随便取一个 流元素的event time不也行嘛? 答案是可以的,但是不严谨。 flink的reduce 有一个重载函数方法reduce(ReduceFunction, WindowFunction)
windowFunction用于处理reduce发出的结果。
下面代码实现了,对reduce结果添加窗口时间范围的功能。
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.text.SimpleDateFormat;
import java.util.function.Consumer;
public class WindowsEventTimeReducetionMain {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//定义两条流
DataStreamSource ds = env.addSource(new SourceOrder());
// ds.print("源数据:");
// ds.print();
env.getConfig().setAutoWatermarkInterval(100);//水位线生成周期
KeyedStream keyedStream = ds.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner(new TimestampAssignerSupplier(){
@Override
public TimestampAssigner createTimestampAssigner(Context context) {
return (e,timestamp) -> e.getTimestamp();
}
})).keyBy((KeySelector) value -> value.getName());
OutputTag outputTag = new OutputTag("sideout"){};
SingleOutputStreamOperator single= keyedStream
.window(TumblingEventTimeWindows.of(Time.seconds(2)))
.sideOutputLateData(outputTag)
.reduce(new ReduceFunction() {
@Override
public Order reduce(Order value1, Order value2) throws Exception {
Order order = new Order();
order.setName(value1.getName());
order.setPrice(value1.getPrice() + value2.getPrice());//订单价格求和
return order;
}
}, new WindowFunction() {
//由于窗口的数据已经被reduceFunction处理并且做了累加,数据到WindowFunction
//的时候就是那个reduceFunction处理后的数据,处理后的数据都在Iterable
//中,直接遍历出来加上 窗口时间即可
@Override
public void apply(String s, TimeWindow window, Iterable input, Collector out) throws Exception {
input.forEach(new Consumer() {
@Override
public void accept(Order order) {
SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String window_start_time =sdf.format( window.getStart());//当前窗口的起始时间
String window_end_time =sdf.format( window.getEnd());//endtime是下一个窗口的初始时间
order.setTimestamp(window.maxTimestamp());//maxTimestamp=endtime-1
order.setName(order.getName()+">>>>window_time_range[ "+window_start_time+" , "+window_end_time+" ]");
out.collect(order);
}
});
}
});
single.getSideOutput(outputTag).print("迟到数据:");
single.print("真实输出:");
env.execute("sdaaf");
}
}
结果:
真实输出::8> Order{name=‘order_0>>>>window_time_range[ 2022-08-24 15:35:32 , 2022-08-24 15:35:34 ]’, price=2, timestamp=1661326533999}
真实输出::7> Order{name=‘order_1>>>>window_time_range[ 2022-08-24 15:35:32 , 2022-08-24 15:35:34 ]’, price=4, timestamp=1661326533999}
真实输出::7> Order{name=‘order_1>>>>window_time_range[ 2022-08-24 15:35:34 , 2022-08-24 15:35:36 ]’, price=4, timestamp=1661326535999}
真实输出::8> Order{name=‘order_0>>>>window_time_range[ 2022-08-24 15:35:34 , 2022-08-24 15:35:36 ]’, price=2, timestamp=1661326535999}
真实输出::7> Order{name=‘order_1>>>>window_time_range[ 2022-08-24 15:35:36 , 2022-08-24 15:35:38 ]’, price=4, timestamp=1661326537999}
真实输出::8> Order{name=‘order_0>>>>window_time_range[ 2022-08-24 15:35:36 , 2022-08-24 15:35:38 ]’, price=2, timestamp=1661326537999}
真实输出::8> Order{name=‘order_0>>>>window_time_range[ 2022-08-24 15:35:38 , 2022-08-24 15:35:40 ]’, price=2, timestamp=1661326539999}
真实输出::7> Order{name=‘order_1>>>>window_time_range[ 2022-08-24 15:35:38 , 2022-08-24 15:35:40 ]’, price=4, timestamp=1661326539999}
reduce是一个的局限性有点大,输入类型和输出类型必须保持一致,对于window窗口内的聚合行为有一个比较高级的聚合方法aggregate,aggregate方法接收一个聚合器接口参数,以及洽谈的一些参数,所有的重载方法都在下图。

除了红色箭头是AggregationFunction ,其他的都是 AggregateFunction
先看红色箭头AggregationFunction的接口源码:
private SingleOutputStreamOperator aggregate(AggregationFunction aggregator) {
return reduce(aggregator);
}
public abstract class AggregationFunction implements ReduceFunction {
private static final long serialVersionUID = 1L;
/** Aggregation types that can be used on a windowed stream or keyed stream. */
public enum AggregationType {
SUM,
MIN,
MAX,
MINBY,
MAXBY,
}
}
请仔细看上面的代码,你会发现
aggregate(AggregationFunction aggregator) { return reduce(aggregator);
这个聚合器竟然调用了reduce,当你点开AggregationFunction的时候发现了这家伙继承了Reducetion,且里面有一个枚举对象,枚举对象中的值为: SUM, MIN,MAX, MINBY, MAXBY。
。AggregationFunction有两个实现类:

具体我就不展开了,这个聚合器你基本不会重新的,flink基本上已经实现好了:
可以直接用,用法就是: window后面直接调用".sum" “.max” “.min” “.minBy” “maxBy”
这也是这一小节的重点,它和reduce最大的不同就是输入类型和输出类型可以不一致
public interface AggregateFunction extends Function, Serializable {
/**
* 创建一个累加器,可以是任意flink支持的可序列化的类型
* 用于缓存数据
* 最终的结果是从这个缓存中拿出来的。
*/
ACC createAccumulator();
/**
* 每来一条数据都要和缓存中的数据做同样的逻辑计算
* 然后返回一个新的缓存对象 (缓存指的是累加器)
*/
ACC add(IN value, ACC accumulator);
/**
* 当窗口出发的时候会调用这个方法,从累加器中获取结果
* 即发出窗口的计算结果。
*/
OUT getResult(ACC accumulator);
/**
* 我之前说过,flink的窗口只有sessoin窗口的大小是不固定的
* 这个不固定是通过窗口合并实现的, 所以
* 当你的窗口是sessoin窗口的时候,可能会有窗口合并
* 而累加器是属于窗口的属性,当窗口合并的时候必须在这里
* 手动合并两个窗口的累加器缓存。
* 换言之如果我们不用sessoin窗口,此方法不需要实现就行。
* 无论是滑动窗口还是滚动窗口他们的窗口大小时不变的,也不需要合并。
*/
ACC merge(ACC a, ACC b);
}
下面我们还是原来的reduce代码稍加改动实现和reduce一样的功能,我自定义了HashMap累加器存储订单号和 价格,方会该订单的所有价格之和,最终结果定义成String类型。
package com.pg.flink.dataStreame.window;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;
import java.util.HashMap;
public class WindowsEventTimeAggregateMain {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//定义两条流
DataStreamSource ds = env.addSource(new SourceOrder());
// ds.print("源数据:");
// ds.print();
env.getConfig().setAutoWatermarkInterval(100);//水位线生成周期
KeyedStream keyedStream = ds.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner(new TimestampAssignerSupplier(){
@Override
public TimestampAssigner createTimestampAssigner(Context context) {
return (e,timestamp) -> e.getTimestamp();
}
})).keyBy((KeySelector) value -> value.getName());
OutputTag outputTag = new OutputTag("sideout"){};
SingleOutputStreamOperator single= keyedStream
.window(TumblingEventTimeWindows.of(Time.seconds(2)))
.sideOutputLateData(outputTag)
.aggregate(new AggregateFunction, String>() {
@Override
//初始化累加器(其实就是一个缓存)
public HashMap createAccumulator() {
return new HashMap<>();//key 存储order的name,value 存储其价格
}
@Override
public HashMap add(Order value, HashMap accumulator) {
String name = value.getName();
int price = value.getPrice();
HashMap new_accumulator = new HashMap<>();
//当name不存在的时候表示是第一个数据此时累加器没有数据,我们给个默认值0即可
new_accumulator.put(name, price + accumulator.getOrDefault(name,0));
return new_accumulator;
}
@Override
public String getResult(HashMap accumulator) {
return accumulator.toString();
}
@Override
public HashMap merge(HashMap a, HashMap b) {
//因为不是sessoin窗口,返回一个空的map即可
return new HashMap<>();
}
});
single.getSideOutput(outputTag).print("迟到数据:");
single.print("真实输出:");
env.execute("sdaaf");
}
}
结果如下:
真实输出::7> {order_1=4}
真实输出::8> {order_0=2}
真实输出::7> {order_1=4}
真实输出::8> {order_0=2}
真实输出::7> {order_1=4}
真实输出::8> {order_0=2}
多说一句:我自定义的sourec每隔一秒中产生一个order_0,和一个order_1
窗口长度两秒, 所以每一次的结果都是两条数据相加,所以结果是准确的哦。
aggregate(AggregateFunction
window中的数据结果就是AggregateFunction处理后的结果, 然后此结果会再次流入WindowFunction再次处理,最后返回最终的处理结果,WindowFunction最主要的功能就是提供的访问窗口的时间的方法,能让我们知道 当前结果的计算时间信息。
import com.pg.flink.dataStreame.window.Order;
import com.pg.flink.dataStreame.window.SourceOrder;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.function.Consumer;
public class WindowsEventTimeAggregateWindowFunctionMain {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//定义两条流
DataStreamSource ds = env.addSource(new SourceOrder());
// ds.print("源数据:");
// ds.print();
env.getConfig().setAutoWatermarkInterval(100);//水位线生成周期
KeyedStream keyedStream = ds.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner(new TimestampAssignerSupplier(){
@Override
public TimestampAssigner createTimestampAssigner(Context context) {
return (e,timestamp) -> e.getTimestamp();
}
})).keyBy((KeySelector) value -> value.getName());
OutputTag outputTag = new OutputTag("sideout"){};
SingleOutputStreamOperator single= keyedStream
.window(TumblingEventTimeWindows.of(Time.seconds(2)))
.sideOutputLateData(outputTag)
.aggregate(new AggregateFunction, String>() {
@Override
//初始化累加器(其实就是一个缓存)
public HashMap createAccumulator() {
return new HashMap<>();//key 存储order的name,value 存储其价格
}
@Override
public HashMap add(Order value, HashMap accumulator) {
String name = value.getName();
int price = value.getPrice();
HashMap new_accumulator = new HashMap<>();
//当name不存在的时候表示是第一个数据此时累加器没有数据,我们给个默认值0即可
new_accumulator.put(name, price + accumulator.getOrDefault(name, 0));
return new_accumulator;
}
@Override
public String getResult(HashMap accumulator) {
return accumulator.toString();
}
@Override
public HashMap merge(HashMap a, HashMap b) {
//因为不是sessoin窗口,返回一个空的map即可
return new HashMap<>();
}
}, new WindowFunction() {
@Override
//第一个参数是key的类型
// 第二个参数是当前数据是被TimeWindow对象包装的
//第三个获取的窗口中的所有数据,因为窗口数据已经被AggregateTion处理过来,所以迭代器此时只有一条数据,此数据就是AggregateTion中getResult返回的结果
//第四个参数用来发出最终结果
public void apply(String s, TimeWindow window, Iterable input, Collector out) throws Exception {
//
input.forEach(new Consumer() {
@Override
public void accept(String order) {
SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String window_start_time =sdf.format( window.getStart());//当前窗口的起始时间
String window_end_time =sdf.format( window.getEnd());//endtime是下一个窗口的初始时间
out.collect(order+">>window_time_range["+window_start_time+" , "+window_end_time+" ]");
}
});
}
});
single.getSideOutput(outputTag).print("迟到数据:");
single.print("真实输出:");
env.execute("sdaaf");
}
}
结果:
真实输出::8> {order_0=2}>>window_time_range[2022-08-25 19:10:36 , 2022-08-25 19:10:38 ]
真实输出::7> {order_1=4}>>window_time_range[2022-08-25 19:10:36 , 2022-08-25 19:10:38 ]
真实输出::8> {order_0=2}>>window_time_range[2022-08-25 19:10:38 , 2022-08-25 19:10:40 ]
真实输出::7> {order_1=4}>>window_time_range[2022-08-25 19:10:38 , 2022-08-25 19:10:40 ]
真实输出::7> {order_1=4}>>window_time_range[2022-08-25 19:10:40 , 2022-08-25 19:10:42 ]
aggregate(AggregateFunction
,ProcessWindowFunction
和6.23比较类似,也可以获取到窗口的时间,但是ProcessWindowFunction有自己的特点:
- 可以访问算子的processTime
- 可以访问算子的水位线时钟
- 可以使用定时器(尤其注意只有keyedStream可以使用此功能)
- 可以访问更新以及初始化状态,此状态是你自定义的状态,我们可以基于这些状态去发出数据,这样甚至可以直接更改算子的行为。
更多有关processFunction的介绍参考:flink processFunction
代码实例暂时先不展示状态相关的操作,后续会讲解,就展示一下简单的获取窗口时间吧。
import com.pg.flink.dataStreame.window.Order;
import com.pg.flink.dataStreame.window.SourceOrder;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.function.Consumer;
public class WindowsEventTimeAggregateProcessFunctionMain {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//定义两条流
DataStreamSource ds = env.addSource(new SourceOrder());
// ds.print("源数据:");
// ds.print();
env.getConfig().setAutoWatermarkInterval(100);//水位线生成周期
KeyedStream keyedStream = ds.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner(new TimestampAssignerSupplier(){
@Override
public TimestampAssigner createTimestampAssigner(Context context) {
return (e,timestamp) -> e.getTimestamp();
}
})).keyBy((KeySelector) value -> value.getName());
OutputTag outputTag = new OutputTag("sideout"){};
SingleOutputStreamOperator single= keyedStream
.window(TumblingEventTimeWindows.of(Time.seconds(2)))
.sideOutputLateData(outputTag)
.aggregate(new AggregateFunction, String>() {
@Override
//初始化累加器(其实就是一个缓存)
public HashMap createAccumulator() {
return new HashMap<>();//key 存储order的name,value 存储其价格
}
@Override
public HashMap add(Order value, HashMap accumulator) {
String name = value.getName();
int price = value.getPrice();
HashMap new_accumulator = new HashMap<>();
//当name不存在的时候表示是第一个数据此时累加器没有数据,我们给个默认值0即可
new_accumulator.put(name, price + accumulator.getOrDefault(name, 0));
return new_accumulator;
}
@Override
public String getResult(HashMap accumulator) {
return accumulator.toString();
}
@Override
public HashMap merge(HashMap a, HashMap b) {
//因为不是sessoin窗口,返回一个空的map即可
return new HashMap<>();
}
}, new ProcessWindowFunction() {
@Override
public void process(String s, Context context, Iterable elements, Collector out) throws Exception {
elements.forEach(new Consumer() {
@Override
public void accept(String s) {
SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String window_start_time =sdf.format( context.window().getStart());//当前窗口的起始时间
String window_end_time =sdf.format( context.window().getEnd());//endtime是下一个窗口的初始时间
out.collect(s+">>window_time_range["+window_start_time+" , "+window_end_time+" ]");
}
});
}
});
single.getSideOutput(outputTag).print("迟到数据:");
single.print("真实输出:");
env.execute("sdaaf");
}
}
真实输出::7> {order_1=4}>>window_time_range[2022-08-26 00:35:08 , 2022-08-26 00:35:10 ]
真实输出::8> {order_0=2}>>window_time_range[2022-08-26 00:35:08 , 2022-08-26 00:35:10 ]
真实输出::7> {order_1=4}>>window_time_range[2022-08-26 00:35:10 , 2022-08-26 00:35:12 ]
process(ProcessWindowFunction
此函数接收一个ProcessWindowFunction函数,process函数是最强大也是最难用的函数,可以这么说无论是reduce还是aggregate都定义了窗口的个性化操作, reduce是前后数据的迭代处理,aggregate定义通过累加器自由可以实现很多的功能,但是不管是reduce韩式aggregate中处理数据的时候数据元素都是一个个过来的, 是逐个过来的。 最容易看到的就是aggregate中的累加器,每次数据到来都会更新累加器,这明显就是逐个的行为。 但是process就不一样的,process是直接拿到当前窗口的所有数据,其内部有一个迭代器Iterable, 包含了当前窗口的所有数据。
除此之外,ProcessWindowFunction还有个内部类, 可以访问水位线,processTime, window对象,和状态。 可以说, reduce和aggregate能实现的功能, process都能实现。 往深了说reduce和aggregate不过是process的特殊形式, 它们不过是把常用的功能抽取出来了而已。 而比较复杂的功能reduce和aggregate是无法实现的, reduce限制最严格,要求输入类型和输出类型一致,且reduce无法访问窗口的结束和开始时间, aggregate好一点,其内部有一个累加器相对而言也更灵活。 重要的来了,无论是reduce还是aggregate其参数都是继承了Function,而process函数的参数继承的是RichFunction,这也就意味着 processFunction是可以自定义以及访问状态的, 这也是proces最大的特性。 关于状态我之后会单独出一篇文章,状态是flink的核心,也是很难理解的地方。
我们已经说了,process可以实现reduce和aggregate的所有功能,以及可以实现一些更高级的功能,为了方便理解,我们下面用process实现reduce功能,并为每条输出添加所属的窗口的时间。
import com.pg.flink.dataStreame.window.Order;
import com.pg.flink.dataStreame.window.SourceOrder;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.text.SimpleDateFormat;
public class WindowsEventTimeProcessFunctionMain {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//定义两条流
DataStreamSource ds = env.addSource(new SourceOrder());
// ds.print("源数据:");
// ds.print();
env.getConfig().setAutoWatermarkInterval(100);//水位线生成周期
KeyedStream keyedStream = ds.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner(new TimestampAssignerSupplier() {
@Override
public TimestampAssigner createTimestampAssigner(Context context) {
return (e, timestamp) -> e.getTimestamp();
}
})).keyBy((KeySelector) value -> value.getName());
OutputTag outputTag = new OutputTag("sideout") {
};
SingleOutputStreamOperator single = keyedStream
.window(TumblingEventTimeWindows.of(Time.seconds(2)))
.sideOutputLateData(outputTag)
.process(new ProcessWindowFunction() {
@Override
public void process(String s, Context context, Iterable elements, Collector out) throws Exception {
int sum = 0;
for (Order element : elements) {
sum += element.getPrice();
}
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String window_start_time = sdf.format(context.window().getStart());//当前窗口的起始时间
String window_end_time = sdf.format(context.window().getEnd());//endtime是下一个窗口的初始时间
out.collect("key=" + s + ", sum=" + sum + ">>>" + "window_time_range[" + window_start_time + "," + window_end_time + "]");
}
});
single.getSideOutput(outputTag).print("迟到数据:");
single.print("真实输出:");
env.execute("sdaaf");
}
}
结果:
真实输出::7> key=order_1, sum=4>>>window_time_range[2022-08-26 01:34:12,2022-08-26 01:34:14]
真实输出::8> key=order_0, sum=2>>>window_time_range[2022-08-26 01:34:12,2022-08-26 01:34:14]
真实输出::7> key=order_1, sum=4>>>window_time_range[2022-08-26 01:34:14,2022-08-26 01:34:16]
真实输出::8> key=order_0, sum=2>>>window_time_range[2022-08-26 01:34:14,2022-08-26 01:34:16]
总结:process是万能的,窗口的一切你想要的操作在这里都能实现。
我们再来看看窗口的代码结构:
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(WindowAssigner) <- 必须:窗口分配器,决定了流数据被分配到哪个窗口中
.trigger(Trigger) <- 可选: "触发器" (若不设置会选择默认值)
.evictor(Evictor) <- 可选: "清除器" (else no evictor)
.allowedLateness(Time) <- 可选: 允许元素迟到多久,默认为0意思是不允许迟到
.sideOutputLateData(OutputTag)<- 可选: 迟到的数据输出到OutputTag管道中
.reduce/aggregate/apply() <- 必须: 落到窗口中的数据怎么处理
.getSideOutput(OutputTag) <- 可选: 获取迟到的数据
读到这里请回头看看我们的代码中用到的部分:
...省略。。。
.window(TumblingEventTimeWindows.of(Time.seconds(2)))
.sideOutputLateData(outputTag)
.process(new ProcessWindowFunction<Order, String, String, TimeWindow>() {...省略...};
请仔细阅读上面两处的代码,你会发现:
.trigger(Trigger) <- 可选: "触发器" (若不设置会选择默认值)
.evictor(Evictor) <- 可选: "清除器" (else no evictor)
这两个地方我们根本没用到,我们用的是TumblingEventTimeWindows,这是flink内置的窗口分配器,其内部源码自己点开你会发现里面有个trigger,也就是说flink内置的窗口分配器其内部有一个默认的触发器trigger. 还拿TumblingEventTimeWindows举例子,其内部的触发器是:EventTimeTrigger,这个触发器有时候并不能满足我们的需要。 具体请看下一节。
短窗口的计算由于其窗口期较短,那么很快就能获取到结果,但是对于长窗口来说窗口时间比较长,如果等窗口期结束才能看到结果,那么这份数据就不具备实时性,大多数情况我们希望能够看到一个长窗口的结果不断变动的情况。比如TumblingEventTimeWindows.of(Time.days(1)), 意思是按天划分窗口,也就是说必须是以天的维度触发一次计算,这种统计历史数据没问题,但是不适合实时看板统计。 窗口虽然是按照天划分,但是我想每分钟都看一下当前天的实时结果,那么此时TumblingEventTimeWindows内置的默认的trigger就不适合了,此时我们需要一个新的trigger取代其默认的trigger,
对此Flink提供了ContinuousEventTimeTrigger连续事件时间触发器与ContinuousProcessingTimeTrigger连续处理时间触发器,指定一个固定时间间隔interval,不需要等到窗口结束才能获取结果,能够在固定的interval获取到窗口的中间结果。
场景:求每个区域的每小时的商品销售额, 要求每隔1min能能够看到销售额变动情况。核心代码实现如下:
.keyBy(...)
.timeWindow(Time.hours(1))
.trigger(ContinuousEventTimeTrigger.of(Time.minutes(1)))