• flink篇——Time和watermark机制


    常见数据乱序的处理方式

    1、watermark: 可以设置小一点hold住大部分情况,提供近似正确的结果
    2、.allowedLateness(Time.minutes(1)) //允许处理迟到数据1分钟
    3、.sideOutputLateData(new OutputTag(String, Double, Long)) //侧输出流,先输出到一个旁路,打上标签,保证数据不会丢
    4、数据不重要,那就不要了

    Time种类

    Event Time:事件产生的时间,它通常由事件中的时间戳描述

    Ingestion time:事件进入Flink程序的时间

    Processing Time:事件被处理时当前系统的时间

    设置 Time: environment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

    watermark种类和设定

    概念

    WaterMark(水位线)主要用来处理乱序事件,而正确地处理乱序事件,通常用WaterMark机制结合窗口来实现。

    原理

    当任何 Event 进入到 Flink 系统时,会根据当前最大事件时间产生 Watermarks 时间戳。
    WaterMark 的值 = 进入 Flink 的最大的事件产生时间(maxEventTime)— 指定的乱序时间(t)

    触发条件

    1. watermark >= window的结束时间
    2. 该窗口必须有数据 注意:[window_start_time,window_end_time) 中有数据存在,前闭后开区间

    三种情况

    顺序数据流中的watermark

    mapStream.assignAscendingTimestamps,最大延迟时间为0,watermark=maxtime

    乱序数据流中的watermark

    周期性

    周期性地生成 Watermark 的生成,默认是 200ms。每隔 N 毫秒自动向流里注入一个 Watermark,时间间隔由 streamEnv.getConfig.setAutoWatermarkInterval()决定。
    使用AssignerWithPeriodicWatermarks

    间断性

    间断性的生成 Watermark ,一般是基于某些事件触发 Watermark 的生成和发送。
    比如说只给用户id为000001的添加watermark,其他的用户就不添加。
    使用AssignerWithPunctuatedWatermarks

    并行数据流中的 Watermark

    当有多个watermark同时到达下游算子的时候,flink会选择较小的watermark进行更新。

    demo

    Event Time的使用一定要指定数据源中的时间戳
    调用assignTimestampAndWatermarks方法,传入一个BoundedOutOfOrdernessTimestampExtractor,就可以指定watermark

    
    //先转换成样例类类型
        val dataStream = inputStream
          .map(data => {
            val arr = data.split(",") //按照,分割数据,获取结果
            SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble) //生成一个传感器类的数据,参数中传toLong和toDouble是因为默认分割后是字符串类别
          })
    //      .assignAscendingTimestamps(_.timestamp ) //这种是当时间肯定是按照时间排序的,没有乱序的情况,升序提取时间戳(如果数据中timestamp为秒,可以*1000L转为毫秒)
          .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReadingTest5](Time.seconds(3)) {  // 指定乱序最大等3
            override def extractTimestamp(t: SensorReadingTest5): Long = t.timestamp * 1000L //指定watermark的字段
          })
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    java开发案例:

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    
            env.getConfig().setAutoWatermarkInterval(100);
    
    
     env
                    .addSource(cdcSource)
                    .map(new MapFunction<JSO
    ………………………………
    ………………………………
         .assignTimestampsAndWatermarks(WatermarkStrategy.<CapacitorBoxUnbalanceRule>forMonotonousTimestamps()
                            .withTimestampAssigner(new SerializableTimestampAssigner<CapacitorBoxUnbalanceRule>() {
                                @Override
                                public long extractTimestamp(CapacitorBoxUnbalanceRule element, long recordTimestamp) {
                                    return Long.MAX_VALUE;
                                }
                            }));
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    allowedLateness

    设置了allowedLateness( 延迟time )后,此时该窗口可能会触发多次计算
    第一次触发条件:watermark >= window_end_time 并且该窗口需要有数据
    其他多次触发条件:watermark < window_end_time + 延迟time 并且该窗口需要有新数据进入

    sideOutputLateData

    对于超过允许迟到时间的数据,通过单独的数据流全部收集起来,后续再处理

    .keyBy(0)
                    .timeWindow(Time.seconds(3))
                    .allowedLateness(Time.seconds(2)) //允许事件迟到2秒
                    .sideOutputLateData(outputTag)    //收集迟到太多的数据
                    .process(new SumProcessWindowFunction())
                    .print().setParallelism(1);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    问题

    (1)每次进行Checkpoint前,都需要暂停处理新流入数据,然后开始执行快照,假如状态比较大,一次快照可能长达几秒甚至几分钟。

    优化方案:Flink提供了异步快照(Asynchronous Snapshot)的机制。当实际执行快照时,Flink可以立即向下广播Checkpoint Barrier,表示自己已经执行完自己部分的快照。一旦数据同步完成,再给Checkpoint Coordinator发送确认信息。

    (2)Checkpoint Barrier对齐时,必须等待所有上游通道都处理完,假如某个上游通道处理很慢,这可能造成整个数据流堵塞。
    优化方案:Flink允许跳过对齐这一步,或者说一个算子子任务不需要等待所有上游通道的Checkpoint Barrier,直接将Checkpoint Barrier广播,执行快照并继续处理后续流入的数据。为了保证数据一致性,Flink必须将那些较慢的数据流中的元素也一起快照,一旦重启,这些元素会被重新处理一遍。

    相关文章

    https://zhuanlan.zhihu.com/p/158951593

  • 相关阅读:
    45、Collections工具类
    CentOS安装指定版本的Docker(包括卸载)
    MySQL知识【可视化软件navicat安装&使用】第五章
    从0搭建Vue3组件库(三): 组件库的环境配置
    Mybatis-Plus(核心功能篇 ==>主键策略
    同创永益CNBR平台——云原生时代下的系统稳定器
    SpringBoot使用Pio-tl动态填写合同(文档)
    Golang interface 接口详细原理和使用技巧
    读取PDF中指定数据写入EXCEL文件
    Spring Boot 之Thymeleaf的爆红用注解【<!--/*@thymesVar id=“data“ type=“ch“*/-->】解决
  • 原文地址:https://blog.csdn.net/MyNameIsWangYi/article/details/126460605