• Flink Java 之 window(滚动, 滑动, 计数, 会话)


    先上代码

    package com.daidai.window;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;
    
    public class TimeWindow {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStreamSource<String> source = env.socketTextStream("daidai", 9999);
    
            SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                    String[] strings = s.split(" ");
                    for (String string : strings) {
                        collector.collect(Tuple2.of(string, 1));
                    }
                }
            });
    
            //滚动窗口
            SingleOutputStreamOperator<Tuple2<String, Integer>> result1 = wordAndOne
                    .keyBy(t -> t.f0)
                    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                    .sum(1);
            //滑动窗口
            SingleOutputStreamOperator<Tuple2<String, Integer>> result2 = wordAndOne
                    .keyBy(t -> t.f0)
                    .window(TumblingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(3)))
                    .sum(1);
    
            //计数窗口
            SingleOutputStreamOperator<Tuple2<String, Integer>> result3 = wordAndOne
                    .keyBy(t -> t.f0)
                    .countWindow(5L)
                    .sum(1);
    
            //回话窗口
            SingleOutputStreamOperator<Tuple2<String, Integer>> result4 = wordAndOne
                    .keyBy(t -> t.f0)
                    .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
                    .sum(1);
    
    
            result1.print("滚动");
            result2.print("滑动");
            result3.print("计数");
            result4.print("会话");
    
            env.execute();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61

    解析

    Keyed Windows
    stream
           .keyBy(...)               <-  仅 keyed 窗口需要
           .window(...)              <-  必填项:"assigner"
          [.trigger(...)]            <-  可选项:"trigger" (省略则使用默认 trigger)
          [.evictor(...)]            <-  可选项:"evictor" (省略则不使用 evictor)
          [.allowedLateness(...)]    <-  可选项:"lateness" (省略则为 0)
          [.sideOutputLateData(...)] <-  可选项:"output tag" (省略则不对迟到数据使用 side output)
           .reduce/aggregate/apply()      <-  必填项:"function"
          [.getSideOutput(...)]      <-  可选项:"output tag"
    
    Non-Keyed Windows
    stream
           .windowAll(...)           <-  必填项:"assigner"
          [.trigger(...)]            <-  可选项:"trigger" (else default trigger)
          [.evictor(...)]            <-  可选项:"evictor" (else no evictor)
          [.allowedLateness(...)]    <-  可选项:"lateness" (else zero)
          [.sideOutputLateData(...)] <-  可选项:"output tag" (else no side output for late data)
           .reduce/aggregate/apply()      <-  必填项:"function"
          [.getSideOutput(...)]      <-  可选项:"output tag"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    在定义窗口前确定的是你的 stream 是 keyed 还是 non-keyed。

    keyBy(…) 会将你的无界 stream 分割为逻辑上的 keyed stream。 如果 keyBy(…) 没有被调用,你的 stream 就不是 keyed。对于 keyed stream,其中数据的任何属性都可以作为 key (详见此处)。 使用 keyed stream 允许你的窗口计算由多个 task 并行,因为每个逻辑上的 keyed stream 都可以被单独处理。 属于同一个 key 的元素会被发送到同一个 task。

    对于 non-keyed stream,原始的 stream 不会被分割为多个逻辑上的 stream, 所以所有的窗口计算会被同一个 task 完成,也就是 parallelism 为 1。


    滚动窗口(Tumbling Windows)

    滚动窗口的 assigner 分发元素到指定大小的窗口。滚动窗口的大小是固定的,且各自范围之间不重叠。 比如说,如果你指定了滚动窗口的大小为 5 分钟,那么每 5 分钟就会有一个窗口被计算,且一个新的窗口被创建(如下图所示)。
    在这里插入图片描述

    DataStream<T> input = ...;
    
    // 滚动 event-time 窗口
    input
        .keyBy(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .<windowed transformation>(<window function>);
    
    // 滚动 processing-time 窗口
    input
        .keyBy(<key selector>)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .<windowed transformation>(<window function>);
    
    // 长度为一天的滚动 event-time 窗口, 偏移量为 -8 小时。
    input
        .keyBy(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
        .<windowed transformation>(<window function>);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    滑动窗口(Sliding Windows)

    滑动窗口的 assigner 分发元素到指定大小的窗口,窗口大小通过 window size 参数设置。 滑动窗口需要一个额外的滑动距离(window slide)参数来控制生成新窗口的频率。 因此,如果 slide 小于窗口大小,滑动窗口可以允许窗口重叠。这种情况下,一个元素可能会被分发到多个窗口。
    比如说,你设置了大小为 10 分钟,滑动距离 5 分钟的窗口,你会在每 5 分钟得到一个新的窗口, 里面包含之前 10 分钟到达的数据(如下图所示)。
    在这里插入图片描述

    DataStream<T> input = ...;
    
    // 滑动 event-time 窗口
    input
        .keyBy(<key selector>)
        .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .<windowed transformation>(<window function>);
    
    // 滑动 processing-time 窗口
    input
        .keyBy(<key selector>)
        .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .<windowed transformation>(<window function>);
    
    // 滑动 processing-time 窗口,偏移量为 -8 小时
    input
        .keyBy(<key selector>)
        .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
        .<windowed transformation>(<window function>);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    会话窗口(Session Windows)

    会话窗口不会相互重叠,且没有固定的开始或结束时间。 会话窗口在一段时间没有收到数据之后会关闭,即在一段不活跃的间隔之后。 会话窗口的 assigner 可以设置固定的会话间隔(session gap)或 用 session gap extractor 函数来动态地定义多长时间算作不活跃。 当超出了不活跃的时间段,当前的会话就会关闭,并且将接下来的数据分发到新的会话窗口。
    在这里插入图片描述

    DataStream<T> input = ...;
    
    // 设置了固定间隔的 event-time 会话窗口
    input
        .keyBy(<key selector>)
        .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
        .<windowed transformation>(<window function>);
        
    // 设置了动态间隔的 event-time 会话窗口
    input
        .keyBy(<key selector>)
        .window(EventTimeSessionWindows.withDynamicGap((element) -> {
            // 决定并返回会话间隔
        }))
        .<windowed transformation>(<window function>);
    
    // 设置了固定间隔的 processing-time session 窗口
    input
        .keyBy(<key selector>)
        .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
        .<windowed transformation>(<window function>);
        
    // 设置了动态间隔的 processing-time 会话窗口
    input
        .keyBy(<key selector>)
        .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
            // 决定并返回会话间隔
        }))
        .<windowed transformation>(<window function>);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
  • 相关阅读:
    玩转Docker | Docker基础入门与常用命令指南
    lattice crosslink开发板mipi核心板csi测试dsi屏lif md6000 fpga
    [附源码]java毕业设计小区宠物管理系统
    OpenMesh 网格平滑
    4.00001Postgresql的内存管理-从哪里开始了解内存管理之架构理解
    设计模式-组合模式
    企业视频直播画面时移看点的用法
    LeetCode739. Daily Temperatures——单调栈
    随笔 | 写在七月末的这一天
    求臻医学:实体肿瘤FDA/NMPA新获批抗癌药物/适应症盘点
  • 原文地址:https://blog.csdn.net/weixin_46376562/article/details/125602162