• Flink之数据擦除及自定义Evictor


    1 窗口数据移除机制

    Flink中窗口数据移除机制是通过Evictor来控制的, Flink内置的Evictor如下:
    • DeltaEvictor
    • TimeEvictor
    • CountEvictor
    Evictor的作用就是在窗口触发前或窗口触发中将其中的某些数据进行移除.
    1.1 源码解析

    关于Evictor的源码只需要关注三个方法就可以了evictBefore,evictAfter,evict.

    源码内容如下:

    public class TimeEvictor<W extends Window> implements Evictor<Object, W> {
        // ...
    
        // doEvictAfter默认为false
        // 窗口计算前擦除数据
        @Override
        public void evictBefore(
                Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
            if (!doEvictAfter) {
                // 执行数据擦除方法
                evict(elements, size, ctx);
            }
        }
    
        // 窗口计算后擦除数据
        @Override
        public void evictAfter(
                Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
            if (doEvictAfter) {
                // 执行数据擦除方法
                evict(elements, size, ctx);
            }
        }
    
        // 擦除数据逻辑
        private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
            if (!hasTimestamp(elements)) {
                return;
            }
    
            long currentTime = getMaxTimestamp(elements);
            long evictCutoff = currentTime - windowSize;
    
            // 遍历迭代器中的数据
            for (Iterator<TimestampedValue<Object>> iterator = elements.iterator();
                    iterator.hasNext(); ) {
                TimestampedValue<Object> record = iterator.next();
                // 判断本条数据的事件时间是否 <= 移除截止点
                if (record.getTimestamp() <= evictCutoff) {
                    iterator.remove(); // 移除数据
                }
            }
        }
    
        // ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    注释中对于evictBefore,evictAfter,evict这三个方法都进行了解释,这里就不细说了.

    1.2 代码实现
    • 自定义Evictor

      public class CustomEvictor extends TimeEvictor<TimeWindow> {
          private long size;
          private boolean isAfter;
      
          public CustomEvictor(long windowSize) {
              super(windowSize);
              this.size = windowSize;
          }
      
          public CustomEvictor(long windowSize, boolean doEvictAfter) {
              super(windowSize, doEvictAfter);
              this.size = windowSize;
              this.isAfter = doEvictAfter;
          }
      
          @Override
          public void evictBefore(
                  Iterable<TimestampedValue<Object>> elements, int size, TimeWindow window, EvictorContext ctx) {
              if (!isAfter) {
                  evict(elements, size, ctx);
              }
          }
      
          @Override
          public void evictAfter(
                  Iterable<TimestampedValue<Object>> elements, int size, TimeWindow window, EvictorContext ctx) {
              if (isAfter) {
                  evict(elements, size, ctx);
              }
          }
      
          /**
           * @Param elements
           * @Param size
           * @Param ctx
           * @return void
           * @Description TODO 真正的处理逻辑
          **/
          private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
              if (!hasTimestamp(elements)) {
                  return;
              }
      
              // 获取当前最大事件时间
              long currentTime = getMaxTimestamp(elements);
              // 移除截止点
              long evictCutoff = currentTime - this.size;
      
              for (Iterator<TimestampedValue<Object>> iterator = elements.iterator();
                   iterator.hasNext(); ) {
                  TimestampedValue<Object> record = iterator.next();
      
                  // 将数据强转,便于获取ID进行判断
                  UserEvent2 userEvent2 = (UserEvent2) record.getValue();
      
                  // 判断本条数据的事件时间是否 <= 移除截止点
                  // 这里增加一个判断逻辑,当数据中的ID为1001时,将数据移除
                  if (record.getTimestamp() <= evictCutoff || userEvent2.getUId().equals("1001")) {
                      iterator.remove();
                  }
              }
          }
      
      
          /**
           * @Param elements
           * @return boolean
           * @Description TODO 这个也是TimeEvictor中的方法,直接复制过来即可
          **/
          private boolean hasTimestamp(Iterable<TimestampedValue<Object>> elements) {
              Iterator<TimestampedValue<Object>> it = elements.iterator();
              if (it.hasNext()) {
                  return it.next().hasTimestamp();
              }
              return false;
          }
      
      
          /**
           * @Param elements
           * @return long
           * @Description TODO TODO 这个也是TimeEvictor中的方法,直接复制过来即可
          **/
          private long getMaxTimestamp(Iterable<TimestampedValue<Object>> elements) {
              long currentTime = Long.MIN_VALUE;
              for (Iterator<TimestampedValue<Object>> iterator = elements.iterator();
                   iterator.hasNext(); ) {
                  TimestampedValue<Object> record = iterator.next();
                  currentTime = Math.max(currentTime, record.getTimestamp());
              }
              return currentTime;
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69
      • 70
      • 71
      • 72
      • 73
      • 74
      • 75
      • 76
      • 77
      • 78
      • 79
      • 80
      • 81
      • 82
      • 83
      • 84
      • 85
      • 86
      • 87
      • 88
      • 89
      • 90
      • 91
      • 92
      • 93
    • 业务代码

      public class FlinkWindowDataDelay {
          public static void main(String[] args) throws Exception {
              // ...
              SingleOutputStreamOperator<UserEvent2> windowedStream = keyedStream
                      .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) // 滑动窗口,窗口大小10s, 滑动步长5s
                      .evictor(new CustomEvictor(10 * 1000, false)) // 添加自定义Evictor
                      .max("time");// 获取用户行为发生事件最大的这条数据
      
              // ...
              env.execute();
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12

      具体数据擦除的逻辑根据实际业务规则而定.

  • 相关阅读:
    前端开发面经1
    Jackson中处理双向关系的最佳方法
    docker-compose + elasticsearch7.6(配置密码及证书) + kibana7.6 + elasticsearch-head搭建集群
    Docker中MySql容器的数据挂载
    60行自己动手写LockSupport是什么体验?
    【算法刷题 | 动态规划14】6.28(最大子数组和、判断子序列、不同的子序列)
    【mycat】常用分片规则
    Selenium自动化测试框架工作原理你明白了吗?
    Reactor And Gev 详解 通俗易懂
    hadoop3.x学习(一)--安装与环境配置
  • 原文地址:https://blog.csdn.net/AnameJL/article/details/134416913