• Flink之Watermark策略代码模板


    方式作用
    WatermarkStrategy.noWatermarks()不生成watermark
    WatermarkStrategy.forMonotonousTimestamps()紧跟最大事件时间watermark生成策略
    WatermarkStrategy.forBoundedOutOfOrderness()允许乱序watermark生成策略
    WatermarkStrategy.forGenerator()自定义watermark生成策略
    • noWatermarks
      public class FlinkWaterMark throws Exception {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 获取数据源
            DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);
            // 构造watermark生成策略,选择不生成watermark
            WatermarkStrategy<UserEvent2> watermark = WatermarkStrategy.noWatermarks();
            // 将构造完成的watermark分配给数据流
            SingleOutputStreamOperator<UserEvent2> source = socketSource.assignTimestampsAndWatermarks(watermark);
            // ...
            env.execute();
        }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      关于noWaterMarks()的使用没有太多内容.
    • forMonotonousTimestamps
      public class FlinkWaterMark throws Exception {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 获取数据源
            DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);
            // 构造watermark, 使用紧跟最大事件时间策略
            WatermarkStrategy<String> watermark = WatermarkStrategy.<String>forMonotonousTimestamps()
                    // 抽取时间时间, 根据数据中实际情况选择
                    .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                        @Override
                        public long extractTimestamp(String element, long recordTimestamp) {
                            /**
                             * 这里是样例代码,实际情况根据具体业务具体数据特性抽取对应的时间
                             **/
                            String time = element.split(",")[0];
                            long timestamp = Long.parseLong(time);
                            return timestamp;
                        }
                    });
            // 将构造完成的watermark分配给数据流
            SingleOutputStreamOperator<UserEvent2> source = socketSource.assignTimestampsAndWatermarks(watermark);
            // ...
            env.execute();
        }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      对于forMonotonousTimestamps()可说内容并不多,如果选择了forMonotonousTimestamps这种方式就必须保证事件时间严格有序,如果出现乱序的情况可能存在大量数据丢失的问题.
      通过源码内容可以看到forMonotonousTimestamps底层也是使用的forBoundedOutOfOrderness方式,只不过将容错时间设置为了0,源码如下:
      // 首先看这里,继承的BoundedOutOfOrdernessWatermarks
      public class AscendingTimestampsWatermarks<T> extends BoundedOutOfOrdernessWatermarks<T> {
      
        /** Creates a new watermark generator with for ascending timestamps. */
        public AscendingTimestampsWatermarks() {
            super(Duration.ofMillis(0)); // 这里将容错时间设置为了0
        }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
    • forBoundedOutOfOrderness
      public class FlinkWaterMark throws Exception {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 获取数据源
            DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);
            // 构造watermark, 使用允许水位线乱序策略,并设置最大容错时间为2s
            WatermarkStrategy<String> watermark = WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(2000))
                    // 抽取时间时间, 根据数据中实际情况选择
                    .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                        @Override
                        public long extractTimestamp(String element, long recordTimestamp) {
                            /**
                             * 这里是样例代码,实际情况根据具体业务具体数据特性抽取对应的时间
                             **/
                            String time = element.split(",")[0];
                            long timestamp = Long.parseLong(time);
                            return timestamp;
                        }
                    });
            // 将构造完成的watermark分配给数据流
            SingleOutputStreamOperator<UserEvent2> source = socketSource.assignTimestampsAndWatermarks(watermark);
            // ...
            env.execute();
        }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      对于允许乱序策略前面文章有介绍过其原理,比如代码中设置容错时间为2S,那么前后的数据差最大只能是2S,如果差值大于2S,后来的这条数据就会被抛弃.
  • 相关阅读:
    VS Code关闭受限模式,关闭信任工作区
    Node.js 入门教程 22 将所有 Node.js 依赖包更新到最新版本
    leetcode困难之1127. 用户购买平台
    【Linux】Shell及Linux权限
    sqlite基本操作
    java spring cloud 企业工程管理系统源码+二次开发+定制化服务
    【从零学习python 】85.Python进程池的并行计算技术应用
    【11.3】【VP】Codeforces Round #724 (Div. 2)
    二、【react-redux】react-redux优化
    【Linux内核代码分析1】Linux时间子系统及HRTIMER实现
  • 原文地址:https://blog.csdn.net/AnameJL/article/details/133645484