• Flink水位线-详细说明


    💎💎💎💎💎

    更多资源链接,欢迎访问作者gitee仓库:https://gitee.com/fanggaolei/learning-notes-warehouse/tree/master

    时间语义

    在理解水位线概念之前我们应该先了解时间语义的内容

    Flink 中的时间语义?

    image-20221117163355711

    1.处理时间(Processing Time)

    处理时间的概念非常简单,就是指执行处理操作的机器的系统时间。

    2.事件时间(Event Time)

    事件时间,是指每个事件在对应的设备上发生的时间,也就是数据生成的时间。

    哪种时间语义更重要?

      实际应用中,数据产生的时间处理的时间可能是完全不同的。很长时间收集起来的数据,处理或许只要一瞬间;也有可能数据量过大、处理能力不足,短时间堆了大量数据处理不完,产生“背压”(back pressure)。

       通常来说,处理时间是我们计算效率的衡量标准,而事件时间会更符合我们的业务计算逻辑。所以更多时候我们使用事件时间;不过处理时间也不是一无是处。对于处理时间而言,由于没有任何附加考虑,数据一来就直接处理,因此这种方式可以让我们的流处理延迟降到最低,效率达到最高。

    1. 水位线(Watermark

    image-20221117165859376

    1.1 什么是水位线?

       在Flink中,水位线是一种衡量Event Time进展的机制,用来处理实时数据中的乱序问题的,通常是水位线和窗口结合使用来实现。 从设备生成实时流事件,到Flink的source,再到多个oparator处理数据,过程中会受到网络延迟、背压等多种因素影响造成数据乱序。

    ​    具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。 而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。

    image-20221119172614899

    1.有序流中的水位线

    image-20221118170851128

    2.乱序流中的水位线

      这里所说的“乱序”(out-of-order),是指数据的先后顺序不一致,主要就是基于数据的产生时间而言的。

    image-20221118171008781

      我们插入新的水位线时,要先判断一下时间戳是否比之前的大,否则就不再生成新的水位线

    image-20221118171221753

      如果考虑到大量数据同时到来的处理效率,我们同样可以周期性地生成水位线。这时只需要保存一下之前所有数据中的最大时间戳,需要插入水位线时,就直接以它作为时间戳生成新的水位线

    image-20221118171234685

      为了让窗口能够正确收集到迟到的数据,我们也可以等上 2 秒;也就是用当前已有数据的最大时间戳减去 2 秒,就是要插入的水位线的时间戳

    image-20221118171246223

    3.水位线的特性

    ⚫ 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据

    ⚫ 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展

    ⚫ 水位线是基于数据的时间戳生成的

    ⚫ 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进

    ⚫ 水位线可以通过设置延迟,来保证正确处理乱序数据

    ⚫ 一个水位线 Watermark(t),表示在当前流中事件时间已经达到了时间戳 t, 这代表 t 之前的所有数据都到齐了,之后流中不会出现时间戳 t’ ≤ t 的数据

    1.2 如何生成水位线?

    1.生成水位线的总体原则

      我们知道,完美的水位线是“绝对正确”的,也就是一个水位线一旦出现,就表示这个时间之前的数据已经全部到齐、之后再也不会出现了。而完美的东西总是可望不可即,==我们只能尽量去保证水位线的正确。如果对结果正确性要求很高、想要让窗口收集到所有数据,==我们该怎么做呢?

      一个字,等。由于网络传输的延迟不确定,为了获取所有迟到数据,我们只能等待更长的时间。作为筹划全局的程序员,我们当然不会傻傻地一直等下去。那到底等多久呢?这就需要对相关领域有一定的了解了。比如,如果我们知道当前业务中事件的迟到时间不会超过 5 秒,那就可以将水位线的时间戳设为当前已有数据的最大时间戳减去 5 秒,相当于设置了 5 秒的延迟等待。

    2.水位线生成策略(Watermark Strategies)

    import com.fang.chapter05.Event;
    import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.time.Duration;
    
    public class WatermarkTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.getConfig().setAutoWatermarkInterval(100);
    
            //从元素中读取数据
             SingleOutputStreamOperator<Event> stream = env.fromElements(
                    new Event("Marry", "./home", 1000L),
                    new Event("Bob", "./home", 1100L),
                    new Event("Marry", "./home", 1000L),
                    new Event("Bob", "./prod?id=1", 1000L),
                    new Event("Bob", "./home", 3500L),
                    new Event("Bob", "./prod?id=2", 3200L)
                    //有序流的watermark生成
    //        ).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()
    //        .withTimestampAssigner(new SerializableTimestampAssigner() {
    //            @Override
    //            public long extractTimestamp(Event element, long recordTimestamp) {
    //                return element.timestamp;
    //            }
    //        })  //提取时间戳,生成水位线
            ).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                    .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                        @Override
                        public long extractTimestamp(Event element, long recordTimestamp) {
                            return element.timestamp;
                        }
                    })
            );
            env.execute();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间

    .assignTimestampsAndWatermarks()
    
    • 1

      .assignTimestampsAndWatermarks()方法需要传入一个 WatermarkStrategy 作为参数,这就
    是 所 谓 的 “ 水 位 线 生 成 策 略 ”

      WatermarkStrategy 中 包 含 了 一 个 “ 时 间 戳 分 配器”TimestampAssigner 和一个“水位线生成器”WatermarkGenerator

    3.Flink 内置水位线生成器

    (1)有序流

    stream.assignTimestampsAndWatermarks(
     WatermarkStrategy.<Event>forMonotonousTimestamps()
     .withTimestampAssigner(new SerializableTimestampAssigner<Event>() 
    {
     		@Override
    		public long extractTimestamp(Event element, long recordTimestamp) 
             {
     			return element.timestamp;
     		   }
     		 })
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    (2)乱序流

      由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间(Fixed Amount of Lateness)

    .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                    .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                        @Override
                        public long extractTimestamp(Event element, long recordTimestamp) {
                            return element.timestamp;
                        }
                    })
            );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    1.3 水位线的传递

      水位线定义的本质了:它表示的是“当前时间之前的数据,都已经到齐了”。这是一种保证,告诉下游任务“只要你接到这个水位线,就代表之后我不会再给你发更早的数据了,你可以放心做统计计算而不会遗漏数据”。所以如果一个任务收到了来自上游并行任务的不同的水位线,说明上游各个分区处理得有快有慢,进度各不相同比如上游有两个并行子任务都发来了水位线,一个是 5 秒,一个是 7 秒;这代表第一个并行任务已经处理完 5 秒之前的所有数据,而第二个并行任务处理到了 7 秒。那这时自己的时钟怎么确定呢?

      当然也要以“这之前的数据全部到齐”为标准。如果我们以较大的水位线 7 秒作为当前时间,那就表示“7 秒前的数据都已经处理完”,这显然不是事实——第一个上游分区才处理到 5 秒,5~7 秒的数据还会不停地发来;而如果以最小的水位线 5 秒作为当前时钟就不会有这个问题了,因为确实所有上游分区都已经处理完,不会再发 5 秒前的数据了。这让我们想到“木桶原理”:所有的上游并行任务就像围成木桶的一块块木板,它们中最短的那一块,决定了我们桶中的水位。

    image-20221118174258154

    1.4 水位线的计算

      水位线的默认计算公式:水位线 = 观察到的最大事件时间 – 最大延迟时间 – 1 毫秒

  • 相关阅读:
    Docker--查看容器的启动参数(命令)--方法/实例
    C#-反射
    23. 从零用Rust编写正反向代理,流控小姐姐的温柔一刀!
    6、Linux-服务管理、权限管理和授权(sudo权限)
    JSP内置对象out对象的功能简介说明
    Windows历史版本下载
    零数科技受邀加入中国信通院隐私计算联盟
    c语言上机作业:迭代法求平方根
    计算机毕业设计springboot+vue+elementUI校园疫情防控系统
    【Azure Batch】在批处理的Task中如何让它执行多个CMD指令呢
  • 原文地址:https://blog.csdn.net/m0_58022371/article/details/127939514