WindowAssigner作为Window数据元素的分配器,可以根据指定的窗口类型,将指定的数据元素分配到指定的窗口中。WindowAssigner是抽象父类,它的实现子类有很多,如:基于EventTime的SlidingEventTimeWindows 、TumblingEventTimeWindows等。
抽象父类WindowAssigner提供了以下抽象方法,子类可以按需选择、按需实现:
@PublicEvolving
public abstract class WindowAssigner<T, W extends Window> implements Serializable {
private static final long serialVersionUID = 1L;
// 定义了将StreamRecord分配到对应Window的逻辑
public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);
// 获取默认的窗口触发器
public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);
// 获取TypeSerializer:用来序列化“被WindowAssigner分配”的窗口
public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);
// 是否是基于EventTime实现的Window
public abstract boolean isEventTime();
// 用来获取当前的ProcessingTime
public abstract static class WindowAssignerContext {
public abstract long getCurrentProcessingTime();
}
}
WindowAssigner的抽象方法中,最重要的就是assignWindows()方法,也就是按照WindowAssigner子类不同的分配规则,将StreamRecord分配给所属的Window。
以滑动窗口SlidingEventTimeWindows 为例,窗口大小为10s,滑动步长为5s
input
.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
窗口大小、滑动步长、offset(如果有的话)都会被转换成毫秒值
public static SlidingEventTimeWindows of(Time size, Time slide) {
// 窗口大小、滑动步长,最终都得转换成毫秒
return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0);
}
当StreamRecord开始分配后,会根据StreamRecord中的timestamp来划分StreamRecord的所属Window。假设现在有一条StreamRecord,它的timestamp为14s,那么经过TimeWindow的统一计算,这个StreamRecord的“最近”的Window的StartTime就是10,经过for循环判断(for循环就是为了筛选当前StreamRecord属于哪几个Window),这条timestamp为14s的StreamRecord属于10-20和5-15这两个Window,于是这2个新建的TimeWindow就会被add到List集合中。
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
// 判断StreamRecord中的timestamp是否有效
if (timestamp > Long.MIN_VALUE) {
// 所属窗口的数量 = 窗口长度 / 滑动时间
// 先固定List的长度
List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
// 确定timestamp对应Window的StartTime
long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
// 从lastStart开始遍历,每次移动一个slide的“距离”
// 目的就是将当前StreamRecord所属的Window,全都搞出来
for (long start = lastStart;
start > timestamp - size;
start -= slide) {
// 向List集合中添加新建的TimeWindow,这个Window的开始时间为start,结束时间为start+size
windows.add(new TimeWindow(start, start + size));
}
return windows;
} else {
throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
"'DataStream.assignTimestampsAndWatermarks(...)'?");
}
}
在WindowOperator中,每来一条StreamRecord都会调用一次WindowAssigner#assignWindows()方法,得到StreamRecord所属的Window的List集合。然后遍历这个List集合,将StreamRecord所属的每一个Window保存到InternalAppendingState中(InternalAppendingState是专门用来保存Window数据的)。
最后根据触发器Trigger的判断,决定将Window中的数据是交给自定义函数xxxFunction的process回调函数,还是clear抛弃