• 【Flink】处理迟到元素(续)、自定义水位线和多流的合并与合流


    一 处理迟到元素

    1 处理策略

    (3)使用迟到元素更新窗口计算结果

    由于存在迟到的元素,所以已经计算出的窗口结果是不准确和不完全的。我们可以使用迟到元素更新已经计算完的窗口结果。

    如果我们要求一个 operator 支持重新计算和更新已经发出的结果,就需要在第一次发出结果以后也要保存之前所有的状态。但显然我们不能一直保存所有的状态,肯定会在某一个时间点将状态清空,而一旦状态被清空,结果就再也不能重新计算或者更新了。而迟到的元素只能被抛弃或者发送到旁路输出流。

    window operator API 提供了方法来明确声明我们要等待迟到元素。当使用 event-timewindow,我们可以指定一个时间段叫做 allowed lateness(允许等待时间)。window operator 如果设置了allowed lateness,这个 window operator 在水位线没过窗口结束时间时也将不会删除窗口和窗口中的状态。窗口会在一段时间内 (allowed lateness 设置的) 保留所有的元素。

    当迟到元素在 allowed lateness 时间内到达时,这个迟到元素会被实时处理并发送到触发器 (trigger)。当水位线超过了窗口结束时间 + allowed lateness 时间时,窗口会被删除,并且所有后来的迟到的元素都会被丢弃。

    使用迟到数据更新窗口的计算结果:

    a 代码编写
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
    
        SingleOutputStreamOperator<String> result = env
                .socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String value) throws Exception {
                        String[] arr = value.split(" ");
                        return Tuple2.of(arr[0], Long.parseLong(arr[1]) * 1000L);
                    }
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner(
                                        new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                            @Override
                                            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                                                return element.f1;
                                            }
                                        }
                                )
                )
                .keyBy(r -> r.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // 允许等待时间为5S,5S后窗口被销毁
                .allowedLateness(Time.seconds(5))
                // 等待5S后到来的数据被输出到侧输出流中
                .sideOutputLateData(new OutputTag<Tuple2<String, Long>>("late") {
                })
                .process(new ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
                    // 在窗口闭合的时候调用
                    @Override
                    public void process(String s, Context context, Iterable<Tuple2<String, Long>> elements, Collector<String> out) throws Exception {
                        // 初始化一个窗口状态变量,注意,窗口状态变量的可见范围是当前窗口
                        // 是否为第一次触发计算
                        ValueState<Boolean> firstCalcaulate =
                                context.windowState().getState(new ValueStateDescriptor<Boolean>("first", Types.BOOLEAN));
                        // 闭合窗口,产生第一次计算
                        if (firstCalcaulate.value() == null) {
                            out.collect("窗口第一次触发了计算!水位线是【" + context.currentWatermark()
                                    + "】窗口中共有【" + elements.spliterator().getExactSizeIfKnown() + "】条元素");
                            // 第一次触发process计算,将状态更新为true
                            firstCalcaulate.update(true);
                        } else {
                            out.collect("迟到数据到了,更新以后窗口中的元素数量是【" + elements.spliterator().getExactSizeIfKnown() + "】");
                        }
                    }
                });
    
        result.print("正常数据:");
    
        result.getSideOutput(new OutputTag<Tuple2<String,Long>>("late"){}).print("迟到数据:");
    
        env.execute();
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    b 测试

    输入数据输出结果如下:

    a 1
    a 2
    a 10	输出:正常数据:> 窗口第一次触发了计算!水位线是【4999】窗口中共有【2】条元素
    	    注意:此时【0-5】窗口不会被销毁,窗口销毁时间 = 5 + 5 - 1ms = 9999ms
    a 1		输出:正常数据:> 迟到数据到了,更新以后窗口中的元素数量是【3】
    a 1		输出:正常数据:> 迟到数据到了,更新以后窗口中的元素数量是【4】
    a 7		注意:属于【5-10】窗口,不会触发【5-10】窗口的第一次计算
    a 8		
    	    注意:输入a 15,关闭【0-5】窗口,且触发【5-10】窗口的计算
    a 15	输出:正常数据:> 窗口第一次触发了计算!水位线是【9999】窗口中共有【2】条元素
    	    注意:再输入【0-5】窗口中的数据,被输出到侧输出流中
    a 1		输出:迟到数据:> (a,1000)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    所以从本质上看,此种处理迟到数据的策略是【上文(2)】的扩展,那么为什么不直接将延迟时间设置为10S呢?因为

    • 有些窗口不存在迟到数据
    • 能够更快的看到计算结果

    所以对于迟到数据的处理方法,要根据实际情况进行权衡。

    二 自定义水位线

    1 产生水位线的接口

    @Public
    public interface WatermarkGenerator<T> {
        /**
        * 每来一个事件都会调用, 允许水位线产生器记忆和检查事件的时间戳。
        * 允许水位线产生器基于事件本身发射水位线。
        */
        void onEvent(T event, long eventTimestamp, WatermarkOutput output);
        /**
        * 周期性的调用(默认 200ms 调用一次), 可能会产生新的水位线,也可能不会。
        *
        * 调用周期通过 ExecutionConfig#getAutoWatermarkInterval() 方法来配置。
        */
        void onPeriodicEmit(WatermarkOutput output);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    2 自定义水位线的产生逻辑

    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
    
        env
                .socketTextStream("localhost",9999)
                .map(new MapFunction<String, Tuple2<String,Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String value) throws Exception {
                        String[] arr = value.split(" ");
                        return Tuple2.of(arr[0],Long.parseLong(arr[1]) * 1000L);
                    }
                })
                .assignTimestampsAndWatermarks(new CustomerWatermarkGenerator())
                .print();
    
        env.execute();
    }
    public static class CustomerWatermarkGenerator implements WatermarkStrategy<Tuple2<String,Long>> {
    
        @Override
        public TimestampAssigner<Tuple2<String, Long>> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                @Override
                public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                    return element.f1;
                }
            };
        }
    
        @Override
        public WatermarkGenerator<Tuple2<String, Long>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<Tuple2<String, Long>>() {
                // 延迟时间为5s
                private long bound = 5000L;
                // 防止溢出,最后计算水位线时需要- bound - 1L
                private long maxTs = -Long.MAX_VALUE + bound + 1L;
                // 来一个事件,执行一次
                @Override
                public void onEvent(Tuple2<String, Long> event, long eventTimestamp, WatermarkOutput output) {
                    // 更新观察到的最大事件事件
                    maxTs = Math.max(maxTs,event.f1);
                }
                
                // 周期型计算水位线
                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    // 发送水位线,注意水位线的计算公式
                    output.emitWatermark(new Watermark(maxTs - bound - 1L));
                }
            };
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    三 多流的合流与分流

    1 union算子

    union算子用于多条流的合并,所有流中的元素类型必须相同。

    多流按照先进先出(FIFO)的原则进行合并。

    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
    
        //DataStreamSource extends SingleOutputStreamOperator
        //SingleOutputStreamOperator extends DataStream
        DataStreamSource<Integer> stream1 = env.fromElements(1, 2);
        DataStreamSource<Integer> stream2 = env.fromElements(3, 4);
        DataStreamSource<Integer> stream3 = env.fromElements(5, 6);
    
    
        DataStream<Integer> result = stream2.union(stream1, stream3);
    
        result.print();
    
        env.execute();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    水位线在单条流进行分流(找最小的时钟)和多条流进行合流(复制,向下广播)的时候,可以从union中看出来,如下:

    当三条流进行合流时,会更新事件时钟算子,选择最小的(详情见2(2)合流的测试);向下传播的时候,会进行广播(详情见2(1)分流的测试),此为水位线传播规则。

    在这里插入图片描述

    2 水位线传递规则

    (1) 分流

    a 代码编写
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
    
        env
                .socketTextStream("localhost",9999)
                .map(new MapFunction<String, Tuple2<String,Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String value) throws Exception {
                        String[] arr = value.split(" ");
                        return Tuple2.of(arr[0],Long.parseLong(arr[1]) * 1000L);
                    }
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                                return element.f1;
                            }
                        })
                )
                .keyBy(r -> r.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .process(new ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
                    @Override
                    public void process(String s, Context context, Iterable<Tuple2<String, Long>> elements, Collector<String> out) throws Exception {
                        out.collect("key:" + s + " 的窗口触发了");
                    }
                })
                .print();
    
        env.execute();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    b 测试

    输入输出结果如下:

    由于分流开窗,每个key的每个窗口都会有一个ProcessWindowFunction,水位线也会进入process算子里面

    初始	负无穷大的水位线
    a 1   后面跟了一个999ms的水位线
    b 5   后面跟了一个4999ms的水位线,当b的4999进入process,就会关闭a和b的窗口
    	  输出:key:a 的窗口触发了
    a 6
    b 6
    a 10  输出:key:b 的窗口触发了
    	       key:a 的窗口触发了
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    分流会向不同的分区广播水位线。

    (2) 合流

    a 代码编写
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
    
        SingleOutputStreamOperator<Tuple2<String, Long>> stream1 = env
                .socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String value) throws Exception {
                        String[] arr = value.split(" ");
                        return Tuple2.of(arr[0], Long.parseLong(arr[1]) * 1000L);
                    }
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                    @Override
                                    public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                                        return element.f1;
                                    }
                                })
                );
    
        SingleOutputStreamOperator<Tuple2<String, Long>> stream2 = env
                .socketTextStream("localhost", 9998)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String value) throws Exception {
                        String[] arr = value.split(" ");
                        return Tuple2.of(arr[0], Long.parseLong(arr[1]) * 1000L);
                    }
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                    @Override
                                    public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                                        return element.f1;
                                    }
                                })
                );
    
        stream1
                .union(stream2)
                .process(new ProcessFunction<Tuple2<String, Long>, String>() {
                    @Override
                    public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out) throws Exception {
                        out.collect("当前水位线是:" + ctx.timerService().currentWatermark());
                    }
                })
                .print();
    
        env.execute();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    b 测试

    在流一种输入a 1,流二中输入a 2、a 3,流一中输入a 4、a 5。

    监控9999和9998端口
    9999:
    负无穷大水位线
    a 1 到达后200ms,后面会跟随着一个999ms的水位线,首先输出当前水位线
        输出:当前水位线是:-9223372036854775808
        并覆盖掉自己的负无穷大时钟,更新算子时钟min(-MAX,999),仍然为负无穷大
        
    a 4 到达后200ms,后面会跟随着一个3999的水位线,首先选择两条流中较小的水位线输出,min(999,3999)
        输出:当前水位线是:999
        并覆盖掉自己的999时钟,更新算子时钟min(2999,3999)为2999
    a 5 到达后200ms,后面会跟随着一个4999的水位线,首先选择两条流中较小的水位线输出,min(2999,3999)
    	输出:当前水位线是:2999
    	并覆盖掉自己的3999时钟,更新算子时钟min(2999,4999)为2999
    
    9998:
    负无穷大水位线
    a 2 到达后200ms,后面会跟随着一个1999ms的水位线,首先输出当前水位线
        输出:当前水位线是:-9223372036854775808
        并覆盖掉自己的负无穷大时钟,更新算子时钟min(1999,999),为999
        
    a 3 到达后200ms,后面会跟随着一个2999ms的水位线,首先选择两条流中较小的水位线,min(999,1999)输出
        输出:当前水位线是:999
        并覆盖自己的1999ms时钟,更新算子时钟min(999,2999)为999
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    元素到来首先调用processElement,水位线到来,更新时钟。

    分析过程如下图:

    在这里插入图片描述

    3 connect算子

    union算子用于两条流的合并,两条流中的元素类型可以不同。

    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
    
        DataStreamSource<Example1.Event> clickStream = env.addSource(new Example1.ClickSource());
    
        DataStreamSource<String> queryStream = env.socketTextStream("localhost", 9999).setParallelism(1);
    
        clickStream
                .keyBy(r -> r.user)
                // 和查询流的广播流进行连接,即对点击流的每一个分区都广播一份
                // 实现在查询流中输入./home可以将用户访问./home的数据过滤出来
                // 这也是为什么要连接查询流的广播流
                .connect(queryStream.broadcast())
                // 依次为第一条流、第二条流、输出的泛型
                // CoFlatMapFunction是和FlatMapFunction对应的一种接口,不是复函数
                // 功能相对受限,没有办法注册定时器、状态变量
                .flatMap(new CoFlatMapFunction<Example1.Event, String, Example1.Event>() {
                    private String query = "";
                    // 不同元素类型的两条流合并到了一起
                    // 第一条流中的元素调用flatMap1方法,反之调用flatMap2方法
                    @Override
                    public void flatMap1(Example1.Event value, Collector<Example1.Event> out) throws Exception {
                        if(value.url.equals(query)) out.collect(value);
                    }
    
                    @Override
                    public void flatMap2(String value, Collector<Example1.Event> out) throws Exception {
                        query = value;
                    }
                })
                .print();
    
        env.execute();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    可以随时在查询流中改变想要查找的内容,查询流和点击流合并,不满足条件的数据会被放弃掉,无法查找以前的数据。

    connect一般用于两条流都进行keyBy,即将两条数据相同分区的数据进行合流处理;要么一条流进行keyBy,另一条流进行广播,类似于spark中的广播变量(使用不同key将数据shuffle到了不同的分区,针对每一个分区广播一个变量),只不过这里针对的对象是流,流中的每一条元素进行复制,广播到所有分区。

  • 相关阅读:
    jmeter接口测试及详细步骤以及项目实战教程
    【21天学习挑战赛—经典算法】折半查找
    Spring Boot异步请求处理框架
    C语言之break continue详解
    下载和补全ComfyUI节点方法之一
    【斯坦福计网CS144项目】Lab3: TCPSender
    手写数字识别-基于卷积神经网络
    Day01 嵌入式 -----流水灯
    数据结构day7栈-链式栈原理及实现
    《软件性能测试、分析与调优实践之路》(第2版)--第7章节选--常见性能问题分析总结
  • 原文地址:https://blog.csdn.net/weixin_43923463/article/details/128043738