• Flink之窗口触发机制及自定义Trigger的使用


    1 窗口触发机制

    窗口计算的触发机制都是由Trigger类决定的,Flink中为各类内置的WindowsAssigner都设计了对应的默认Trigger. 层次结构如下:
    • Trigger
    • ProcessingTimeoutTrigger
    • EventTimeTrigger
    • CountTrigger
    • DeltaTrigger
    • NeverTrigger in GlobalWindows
    • ContinuousEventTimeTrigger
    • PurgingTrigger
    • ContinuousProcessingTimeTrigger
    • ProcessingTimeTrigger
    通常情况下是不需要自己重写Trigger的,使用Flink内置的就可以,除非特殊业务特殊需求.
    1.1 源码解析

    EventTimeTrigger源码说明如何触发窗口计算,在EventTimeTrigger源码中只需要关注onElementonEventTime两个方法即可,源码内容如下:

    @PublicEvolving
    public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
        private static final long serialVersionUID = 1L;
    
        private EventTimeTrigger() {}
    
        // 基于数据驱动的方法
        @Override
        public TriggerResult onElement(
                Object element, long timestamp, TimeWindow window, TriggerContext ctx)
                throws Exception {
            // 判断当前watermark是否大于等于窗口的最大时间
            if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
                // if the watermark is already past the window fire immediately
                // 如果大于等于窗口的最大时间触发计算
                return TriggerResult.FIRE;
            } else {
                // 小于窗口的最大时间首先注册定时器
                ctx.registerEventTimeTimer(window.maxTimestamp());
                // 然后等待数据继续输入,不触发计算
                return TriggerResult.CONTINUE;
            }
        }
    
        // 基于事件时间定时器驱动的方法
        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
            // 根据不断发送来的watermark判断是否触发计算
            return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        }
        // ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    源码中将不需要的关注的代码都已省略

    • onElement

      注释中写明这个方法是基于数据进行驱动的,也就是说只有数据到达时才会执行这个方法,每一个窗口都有自己的startTimeendTime,也就是窗口的范围,判断条件中window.maxTimestamp()获取的就是当前窗口的endTime,如果当前watermark超出当前窗口的endTime就会触发这个窗口计算,TriggerResult.FIRE表示的就是窗口开始计算,如果当前watermark小于endTime就不会触发窗口计算这个窗口会继续等待数据输入,也就是TriggerResult.CONTINUE方法.

    • onEventTime

      onElement是由数据驱动的,但是Flink的实际数据处理过程是存在没有数据发送到当前窗口,但是会有watermark源源不断的发送到当前窗口的情况,在多并行度的执行条件下就会发生这种情况.在onEventTime方法中如果上游发送过来的watermark等于当前窗口的endTime就会执行TriggerResult.FIRE否则还是执行TriggerResult.CONTINUE.

    Trigger的触发机制就是这样,其他的CountTrigger等大致逻辑基本是一样的,了解清楚源码中这两个方法的作用很容易理解.

    1.2 代码实现

    通常Flink内置的Trigger都可以满足数据处理需求,往往在实际开发中可能会存在特殊的业务需求,这时用户可以自定义Trigger,以达到控制窗口触发计算的规则. 可以仿照EventTimeTrigger来构建一个自定义Trigger,只需要将其中的部分代码简单进行修改,并在onElement方法中添加自定的触发逻辑即可.
    • 自定义Trigger

      /**
       * 这里首先需要继承Trigger类,并将中的Object修改成自己需要的数据类型,这段代码中需要根据UserEvent2中的数据
       * 来控制触发窗口计算的条件,所以将Object修改成UserEvent2
       **/ 
      public class CustomTrigger extends Trigger<UserEvent2, TimeWindow> {
          public CustomTrigger() {}
      
          // 通过修改onElement方法中窗口计算的触发逻辑实现自定义方式
          @Override
          public TriggerResult onElement(
                  // 这里也要将原有的Object类型修改成上面的UserEvent2
                  UserEvent2 element, long timestamp, TimeWindow window, TriggerContext ctx)
                  throws Exception {
              // 原有的判断逻辑不动,这个是为了便捷,判断逻辑可以根据实际需求进行修改,或者如同下面中添加一个新的触发逻辑
              if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
                  return TriggerResult.FIRE;
              // 这里增加一个判断逻辑,当用户行为时间为2700的时候也触发计算
              } else if (element.getTime().equals("2700")) {
                  return TriggerResult.FIRE;
              // 原有的判断逻辑不动
              } else {
                  ctx.registerEventTimeTimer(window.maxTimestamp());
                  return TriggerResult.CONTINUE;
              }
          }
      
          @Override
          public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
              return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
          }
      
          @Override
          public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx)
                  throws Exception {
              return TriggerResult.CONTINUE;
          }
      
          @Override
          public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
              ctx.deleteEventTimeTimer(window.maxTimestamp());
          }
      
          @Override
          public boolean canMerge() {
              return true;
          }
      
          @Override
          public void onMerge(TimeWindow window, OnMergeContext ctx) {
              long windowMaxTimestamp = window.maxTimestamp();
              if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
                  ctx.registerEventTimeTimer(windowMaxTimestamp);
              }
          }
      
          // 将toString中俄返回值根据用户的需要进行修改
          @Override
          public String toString() {
              return "CustomTrigger()";
          }
      
          // 将返回值更改成创建的自定义Trigger类
          public static CustomTrigger create() {
              return new CustomTrigger();
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
    • 业务代码

      // ...
      SingleOutputStreamOperator<UserEvent2> windowedStream = keyedStream
              .window(TumblingEventTimeWindows.of(Time.seconds(10))) // 设置滚动窗口大小为10s
              .trigger(new CustomTrigger()) // 传入自定义的Trigger类
              .allowedLateness(Time.seconds(2)) // 允许迟到数据迟到时间2s,同watermark中的forBoundedOutOfOrderness功能类似
              .sideOutputLateData(lateData) // 将迟到数据进行测流输出
              .max("time");// 获取用户行为发生事件最大的这条数据
      // ...
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8

      上面这段业务代码中设置的滚动窗口的大小为10s,正常来说只有满足end - start = 10000的时候才会触发窗口计算,但是在自定义Trigger中指定了当数据中时间为2700的时候也触发窗口计算,在时间为2700的数据没到达时候还会按照原有的逻辑触发窗口计算,但是只要2700的数据到达,不管时候达到TumblingEventTimeWindows.of(Time.seconds(10))这个条件,都会触发窗口计算.

  • 相关阅读:
    小文一篇,说说:where、:has和:is的特殊性吧
    Yii2 创建定时任务
    什么是浏览器的同源策略(same-origin policy)?它对AJAX有什么影响?
    【操作系统】自旋锁实现&&自旋锁原理(亲测可用)
    Kotlin基本语法
    代码配置仓库GitLab安装部署
    ubuntu16 虚拟机单盘扩容
    云原生实战课大纲
    MySQL并发事务访问相同记录
    【Gitee】生成与配置SSH公钥
  • 原文地址:https://blog.csdn.net/AnameJL/article/details/133857635