码农知识堂 - 1000bd
  •   Python
  •   PHP
  •   JS/TS
  •   JAVA
  •   C/C++
  •   C#
  •   GO
  •   Kotlin
  •   Swift
  • 208.Flink(三):窗口的使用,处理函数的使用


    目录

    一、窗口

    1.窗口的概念

    2.窗口的分类

    (1)按照驱动类型分

    (2)按照窗口分配数据的规则分类

    3.窗口api概览

    (1)按键分区(Keyed)和非按键分区(Non-Keyed)

    *1)按键分区窗口(Keyed Windows)

    *2)非按键分区(Non-Keyed Windows)

    (2)代码中窗口API的调用

    (3)窗口分配器

    (4)窗口函数

    *1)增量聚合函数

    ^1)归约函数(ReduceFunction)

    ^2)聚合函数(AggregateFunction)

    *2)全窗口函数(full window functions)

    *3)增量聚合和全窗口函数的结合使用

    (5)触发器(Trigger)

    (6)移除器(Evictor)

    (7)窗口的简单原理

    *1)一个数据来了,怎么认为他是哪个窗口内的数据?

    *2)窗口特性

    *3)窗口的生命周期

    4.时间语义

    (1)Flink中的时间语义

    (2)Flink以事件时间为默认时间语义

    5.水位线(Watermark)

    (1)水位线的概念

    *1)有序流中的水位线

    *2)乱序流中的水位线

    (2)水位线和窗口的工作原理

    (3) 生成水位线

    *1)总体原则

    *2)有序流中内置水位线设置

    *3)乱序流中内置水位线设置

    *4)自定义水位线生成器(周期式、断点式)

    *5)在数据源中发送水位线

    (6)迟到数据的处理

    *1)设置乱序容忍度

    *2)设置窗口延迟关闭

    *3)侧输出流

    (7)基于时间的合流——双流联结(Join)

    *1)窗口联结(Window Join)

    *2)间隔联结(Interval Join)

    二、处理函数

    1.基本处理函数(ProcessFunction)

    (1)处理函数的功能和使用

    (2)ProcessFunction解析

    (3)处理函数的分类

    2.按键分区处理函数(KeyedProcessFunction)

    (1)定时器(Timer)和定时服务(TimerService)

    (2)KeyedProcessFunction注意点及实现

    3.应用案例:Top N

    (1)方法一:ProcessAllWindowFunction

    (2)方法二:

    4.侧输出流


    一、窗口

    在批处理统计中,我们可以等待一批数据都到齐后,统一处理。但是在实时处理统计中,我们是来一条就得处理一条,那么我们怎么统计最近一段时间内的数据呢?引入“窗口”。

    1.窗口的概念

    Flink是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)。

    Flink中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。

    到达窗口结束时间时,窗口就触发计算并关闭,事实上“触发计算”和“窗口关闭”两个行为也可以分开。

    2.窗口的分类

    (1)按照驱动类型分

    *1)时间窗口

    一定时间作为一个窗口

    *2)计数窗口

    达到多少数量作为一个窗口

    (2)按照窗口分配数据的规则分类

    *1)滚动窗口

    以一个固定时间为窗口,第一个窗口结束的时间就是下一个窗口开始的时间。

    *2)滑动窗口

    窗口大小 + 步长。

    如果步长 = 窗口大小,其实就是滚动窗口的情况。

    步长 > 窗口大小,会有数据被漏掉。

    步长 < 窗口大小,窗口会有重叠

    *3)会话窗口

    基于会话对数据分组

    *4)全局窗口

    全局有效,没有结束时间

    3.窗口api概览

    (1)按键分区(Keyed)和非按键分区(Non-Keyed)

    定义窗口前,需要确认数据流是基于keyBy还是没有keyBy的。

    *1)按键分区窗口(Keyed Windows)

    经过按键分区keyBy操作后,数据流会按照key被分为多条逻辑流,窗口计算会在多个并行子任务上同时执行。相同key的数据会被发送到同一个并行子任务,而窗口操作会基于每个key进行单独的处理。

    stream.keyBy(...).window(...)

    *2)非按键分区(Non-Keyed Windows)

    窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了1。

    对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll本身就是一个非并行的操作。

    stream.windowAll(...)

    (2)代码中窗口API的调用

    窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)。

    stream.keyBy(<key selector>).window(<window assigner>).aggregate(<window function>)

    .window()方法需要传入一个窗口分配器,它指明了窗口的类型。

    .aggregate()方法传入一个窗口函数作为参数,它用来定义窗口具体的处理逻辑。

    (3)窗口分配器

    窗口分配器指定窗口的类型。窗口分配器最通用的定义方式,就是调用.window()方法。

    (4)窗口函数

    窗口函数定义了要对窗口中收集的数据做的计算操作,根据处理的方式可以分为两类:增量聚合函数和全窗口函数。

    1. package com.atguigu.window;
    2. import com.atguigu.bean.WaterSensor;
    3. import com.atguigu.functions.WaterSensorMapFunction;
    4. import org.apache.flink.streaming.api.datastream.KeyedStream;
    5. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    6. import org.apache.flink.streaming.api.datastream.WindowedStream;
    7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    8. import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    9. import org.apache.flink.streaming.api.windowing.time.Time;
    10. import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    11. /**
    12. * TODO
    13. *
    14. * @author cjp
    15. * @version 1.0
    16. */
    17. public class WindowApiDemo {
    18. public static void main(String[] args) throws Exception {
    19. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    20. env.setParallelism(1);
    21. SingleOutputStreamOperator<WaterSensor> sensorDS = env
    22. .socketTextStream("hadoop102", 7777)
    23. .map(new WaterSensorMapFunction());
    24. KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());
    25. // TODO 1. 指定 窗口分配器: 指定 用 哪一种窗口 --- 时间 or 计数? 滚动、滑动、会话?
    26. // 1.1 没有keyby的窗口: 窗口内的 所有数据 进入同一个 子任务,并行度只能为1
    27. // sensorDS.windowAll()
    28. // 1.2 有keyby的窗口: 每个key上都定义了一组窗口,各自独立地进行统计计算
    29. // 基于时间的
    30. // sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10))) // 滚动窗口,窗口长度10s
    31. // sensorKS.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2))) // 滑动窗口,窗口长度10s,滑动步长2s
    32. // sensorKS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))) // 会话窗口,超时间隔5s
    33. // sensorKS.window(GlobalWindows.create()) // 全局窗口,计数窗口的底层就是用的这个,需要自定义的时候才会用
    34. // 基于计数的
    35. // sensorKS.countWindow(5) // 滚动窗口,窗口长度=5个元素
    36. // sensorKS.countWindow(5,2) // 滑动窗口,窗口长度=5个元素,滑动步长=2个元素
    37. // TODO 2. 指定 窗口函数 : 窗口内数据的 计算逻辑
    38. WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
    39. // 增量聚合: 来一条数据,计算一条数据,窗口触发的时候输出计算结果
    40. // sensorWS
    41. // .reduce()
    42. // .aggregate(, )
    43. // 全窗口函数:数据来了不计算,存起来,窗口触发的时候,计算并输出结果
    44. // sensorWS.process()
    45. env.execute();
    46. }
    47. }

    *1)增量聚合函数
    ^1)归约函数(ReduceFunction)
    1. package com.atguigu.window;
    2. import com.atguigu.bean.WaterSensor;
    3. import com.atguigu.functions.WaterSensorMapFunction;
    4. import org.apache.flink.api.common.functions.ReduceFunction;
    5. import org.apache.flink.streaming.api.datastream.KeyedStream;
    6. import org.apache.flink.streaming.api.datastream.Single
  • 相关阅读:
    某新闻app sign加密
    命令历史应用
    MTK平台Metadata的加载(1)——Metadata介绍
    数字IC手撕代码--投票表决器
    无重复字符的最长子串-返回结果字符串
    杂牌行车记录仪特殊AVI结构恢复案例
    【ESP32】串口+wifi 透传,以及回调函数的使用
    【UNI】对接蓝牙智能笔遇到的问题
    Flume笔记
    javaScript:碰撞检测
  • 原文地址:https://blog.csdn.net/qq_40594696/article/details/133163591
  • 最新文章
  • 攻防演习之三天拿下官网站群
    数据安全治理学习——前期安全规划和安全管理体系建设
    企业安全 | 企业内一次钓鱼演练准备过程
    内网渗透测试 | Kerberos协议及其部分攻击手法
    0day的产生 | 不懂代码的"代码审计"
    安装scrcpy-client模块av模块异常,环境问题解决方案
    leetcode hot100【LeetCode 279. 完全平方数】java实现
    OpenWrt下安装Mosquitto
    AnatoMask论文汇总
    【AI日记】24.11.01 LangChain、openai api和github copilot
  • 热门文章
  • 十款代码表白小特效 一个比一个浪漫 赶紧收藏起来吧!!!
    奉劝各位学弟学妹们,该打造你的技术影响力了!
    五年了,我在 CSDN 的两个一百万。
    Java俄罗斯方块,老程序员花了一个周末,连接中学年代!
    面试官都震惊,你这网络基础可以啊!
    你真的会用百度吗?我不信 — 那些不为人知的搜索引擎语法
    心情不好的时候,用 Python 画棵樱花树送给自己吧
    通宵一晚做出来的一款类似CS的第一人称射击游戏Demo!原来做游戏也不是很难,连憨憨学妹都学会了!
    13 万字 C 语言从入门到精通保姆级教程2021 年版
    10行代码集2000张美女图,Python爬虫120例,再上征途
Copyright © 2022 侵权请联系2656653265@qq.com    京ICP备2022015340号-1
正则表达式工具 cron表达式工具 密码生成工具

京公网安备 11010502049817号