• WindowAssigner设计


    WindowAssigner作为Window数据元素的分配器,可以根据指定的窗口类型,将指定的数据元素分配到指定的窗口中。WindowAssigner是抽象父类,它的实现子类有很多,如:基于EventTime的SlidingEventTimeWindows 、TumblingEventTimeWindows等。

    抽象父类WindowAssigner提供了以下抽象方法,子类可以按需选择、按需实现:

    @PublicEvolving
    public abstract class WindowAssigner<T, W extends Window> implements Serializable {
        private static final long serialVersionUID = 1L;
    
        // 定义了将StreamRecord分配到对应Window的逻辑
        public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);
    
        // 获取默认的窗口触发器
        public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);
    
        // 获取TypeSerializer:用来序列化“被WindowAssigner分配”的窗口
        public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);
    
        // 是否是基于EventTime实现的Window
        public abstract boolean isEventTime();
    
        // 用来获取当前的ProcessingTime
        public abstract static class WindowAssignerContext {
    
            public abstract long getCurrentProcessingTime();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    WindowAssigner的抽象方法中,最重要的就是assignWindows()方法,也就是按照WindowAssigner子类不同的分配规则,将StreamRecord分配给所属的Window。

    以滑动窗口SlidingEventTimeWindows 为例,窗口大小为10s,滑动步长为5s

    input
        .keyBy(<key selector>)
        .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    
    • 1
    • 2
    • 3

    窗口大小、滑动步长、offset(如果有的话)都会被转换成毫秒值

    public static SlidingEventTimeWindows of(Time size, Time slide) {
        // 窗口大小、滑动步长,最终都得转换成毫秒
        return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0);
    }
    
    • 1
    • 2
    • 3
    • 4

    当StreamRecord开始分配后,会根据StreamRecord中的timestamp来划分StreamRecord的所属Window。假设现在有一条StreamRecord,它的timestamp为14s,那么经过TimeWindow的统一计算,这个StreamRecord的“最近”的Window的StartTime就是10,经过for循环判断(for循环就是为了筛选当前StreamRecord属于哪几个Window),这条timestamp为14s的StreamRecord属于10-20和5-15这两个Window,于是这2个新建的TimeWindow就会被add到List集合中。

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        // 判断StreamRecord中的timestamp是否有效
        if (timestamp > Long.MIN_VALUE) {
            // 所属窗口的数量 = 窗口长度 / 滑动时间
            // 先固定List的长度
            List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
            // 确定timestamp对应Window的StartTime
            long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
            // 从lastStart开始遍历,每次移动一个slide的“距离”
            // 目的就是将当前StreamRecord所属的Window,全都搞出来
            for (long start = lastStart;
                 start > timestamp - size;
                 start -= slide) {
                // 向List集合中添加新建的TimeWindow,这个Window的开始时间为start,结束时间为start+size
                windows.add(new TimeWindow(start, start + size));
            }
            return windows;
        } else {
            throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                                       "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                                       "'DataStream.assignTimestampsAndWatermarks(...)'?");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    在WindowOperator中,每来一条StreamRecord都会调用一次WindowAssigner#assignWindows()方法,得到StreamRecord所属的Window的List集合。然后遍历这个List集合,将StreamRecord所属的每一个Window保存到InternalAppendingState中(InternalAppendingState是专门用来保存Window数据的)。

    最后根据触发器Trigger的判断,决定将Window中的数据是交给自定义函数xxxFunction的process回调函数,还是clear抛弃

  • 相关阅读:
    PRML 概率分布
    axios 源码简析
    深入理解函数式编程(下)
    百度地图发布2022国庆出行预测
    Springboot整合Prometheus-自定义指标
    SpringBoot日志文件
    【UV打印机】波形开发-喷头工作原理(一)
    5.代理设计模式
    buu web部分wp
    设计模式(2) - 创建型模式
  • 原文地址:https://blog.csdn.net/qq_36299025/article/details/127610325