《Flink 详解》系列(已完结),共包含以下 10 10 10 篇文章:
😊 如果您觉得这篇文章有用 ✔️ 的话,请给博主一个一键三连 🚀🚀🚀 吧 (点赞 🧡、关注 💛、收藏 💚)!!!您的支持 💖💖💖 将激励 🔥 博主输出更多优质内容!!!
由于现在 Flink 实行流批一体代码,Batch API 基本废弃,就不再过多介绍。在 Flink DataStream API 中,Graph 内部转换图如下:

以 WordCount 为例,流图、作业图、执行图、物理执行图之间的 Task 调度如下:

对于 Flink 流计算应用,运行用户代码时,首先调用 DataStream API ,将用户代码转换为 Transformation,然后经过:StreamGraph → JobGraph → ExecutionGraph 三层转换(这些都是 Flink 内置的数据结构),最后经过 Flink 调度执行,在 Flink 集群中启动计算任务,形成一个物理执行图。

流图 StreamGraph 核心对象包括两个:StreamNode 和 StreamEdge。
StreamNode 从 Transformation 转换而来,可以简单理解为 StreamNode 表示一个算子,存在实体和虚拟,可以有多个输入和输出,实体 StreamNode 最终变成物理算子,虚拟的附着在 StreamEdge 边上。
StreamEdge 是 StreamGraph 的边,用来连接两个 StreamNode 点,一个 StreamEdge 可以有多个出边、入边等信息。
JobGraph 是由 StreamGraph 优化而来,是通过 OperationChain 机制将算子合并起来,在执行时,调度在同一个 Task 线程上,避免数据的跨线程,跨网络传递。

作业图 JobGraph 核心对象包括三个:
JobVertex 点:经过算子融合优化后符合条件的多个 StreamNode 可能会融合在一起生成一个 JobVertex,即一个 JobVertex 包含一个或多个算子, JobVertex 的输入是 JobEdge,输出是 IntermediateDataSet。
JobEdge 边:JobEdge 表示 JobGraph 中的一 个数据流转通道, 其上游数据源是 IntermediateDataSet ,下游消费者是 JobVertex 。JobEdge 中的数据分发模式会直接影响执行时 Task 之间的数据连接关系是点对点连接还是全连接。
IntermediateDataSet 中间数据集:中间数据集 IntermediateDataSet 是一种逻辑结构,用来表示 JobVertex 的输出,即该 JobVertex 中包含的算子会产生的数据集。不同的执行模式下,其对应的结果分区类型不同,决定了在执行时刻数据交换的模式。
ExecutionGraph 是调度 Flink 作业执行的核心数据结构,包含了作业中所有并行执行的 Task 信息、Task 之间的关联关系、数据流转关系。
StreamGraph 和 JobGraph 都在 Flink Client 生成,然后交给 Flink 集群。JobGraph 到 ExecutionGraph 在 JobMaster 中完成,转换过程中重要变化如下:

执行图 ExecutionGraph 核心对象包括
6
6
6 个:
ExecutionJobVertex:该对象和 JobGraph 中的 JobVertex 一一对应。该对象还包含一组 ExecutionVertex, 数量与该 JobVertex 中所包含的 StreamNode 的并行度一致,假设 StreamNode 的并行度为 5 5 5,那么 ExecutionJobVertex 中也会包含 5 5 5 个 ExecutionVertex。ExecutionJobVertex 用来将一个 JobVertex 封装成 ExecutionJobVertex,并依次创建 ExecutionVertex、Execution、IntermediateResult 和 IntermediateResultPartition,用于丰富 ExecutionGraph。
ExecutionVertex:ExecutionJobVertex 会对作业进行并行化处理,构造可以并行执行的实例,每一个并行执行的实例就是 ExecutionVertex。
IntermediateResult:IntermediateResult 又叫作中间结果集,该对象是个逻辑概念表示 ExecutionJobVertex 输出,和 JobGrap 中的 IntermediateDalaSet 一一对应,同样一个 ExecutionJobVertex 可以有多个中间结果,取决于当前 JobVertex 有几个出边(JobEdge)。
IntermediateResultPartition:IntermediateResultPartition 又叫作中间结果分区。表示 1 1 1 个 ExecutionVertex 输出结果,与 ExecutionEdge 相关联。
ExecutionEdge:表示 ExecutionVertex 的输入,连接到上游产生的 IntermediateResultPartition。 1 1 1 个 Execution 对应唯一的 1 1 1 个 IntermediateResultPartition 和 1 1 1 个 ExecutionVertex。 1 1 1 个 ExecutionVertex 可以有多个 ExecutionEdge。
Execution:ExecutionVertex 相当于每个 Task 的模板,在真正执行的时候,会将 ExecutionVertex 中的信息包装为 1 1 1 个 Execution,执行一个 ExecutionVertex 的一次尝试。
JobManager 和 TaskManager 之间关于 Task 的部署和 Task 执行状态的更新都是通过 ExecutionAttemptID 来识别标识的。
调度器 是 Flink 作业执行的核心组件,管理作业执行的所有相关过程,包括 JobGraph 到 ExecutionGraph 的转换、作业生命周期管理(作业的发布、取消、停止)、作业的 Task 生命周期管理(Task 的发布、取消、停止)、资源申请与释放、作业和 Task 的 Faillover 等。
DefaultScheduler:Flink 目前默认的调度器,是 Flink 新的调度设计,使用 SchedulerStrategy 来实现调度。
LegacySchedular:过去的调度器,实现了原来的 Execution 调度逻辑。
SchedulingStrategy 接口定义了调度行为,其中包含 4 4 4 种行为:

startScheduling:调度入口,触发调度器的调度行为。restartTasks:重启执行失败的 Task,一般是 Task 执行异常导致的。onExecutionStateChange:当 Execution 状态发生改变时。onPartitionConsumable:当 IntermediateResultPartition 中的数据可以消费时。调度模式包含
3
3
3 种:Eager 模式、分阶段模式(Lazy_From_Source)、分阶段 Slot 重用模式(Lazy_From_Sources_With_Batch_Slot_Request)。
Eager 调度:适用于流计算。一次性申请需要的所有资源,如果资源不足,则作业启动失败。
分阶段调度:LAZY_FROM_SOURCES 适用于批处理。从 SourceTask 开始分阶段调度,申请资源的时候,一次性申请本阶段所需要的所有资源。上游 Task 执行完毕后开始调度执行下游的 Task,读取上游的数据,执行本阶段的计算任务,执行完毕之后,调度后一个阶段的 Task,依次进行调度,直到作业完成。
分阶段 Slot 重用调度:LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST 适用于批处理。与分阶段调度基本一样,区别在于该模式下使用批处理资源申请模式,可以在资源不足的情况下执行作业,但是需要确保在本阶段的作业执行中没有 Shuffle 行为。
目前视线中的 Eager 模式和 LAZY_FROM_SOURCES 模式的资源申请逻辑一样,LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST 是单独的资源申请逻辑。


调度策略全部实现于调度器 SchedulingStrategy,有三种实现:
EagerSchedulingStrategy:适用于流计算,同时调度所有的 task。
LazyFromSourcesSchedulingStrategy:适用于批处理,当输入数据准备好时(上游处理完)进行 vertices 调度。
PipelinedRegionSchedulingStrategy:以流水线的局部为粒度进行调度。
PipelinedRegionSchedulingStrategy 是
1.11
1.11
1.11 加入的,从
1.12
1.12
1.12 开始,将以 pipelined region 为单位进行调度。
pipelined region 是一组流水线连接的任务。这意味着,对于包含多个 region 的流作业,在开始部署任务之前,它不再等待所有任务获取 slot。取而代之的是,一旦任何 region 获得了足够的任务 slot 就可以部署它。对于批处理作业,将不会为任务分配 slot,也不会单独部署任务。取而代之的是,一旦某个 region 获得了足够的 slot,则该任务将与所有其他任务一起部署在同一区域中。
在 Flink集群中,JobMaster 负责作业的生命周期管理,具体的管理行为在调度器和 ExecutionGraph 中实现。
作业的完整生命周期状态变换如下图所示:

作业首先处于创建状态(created),然后切换到运行状态(running),并且在完成所有工作后,它将切换到完成状态(finished)。
在失败的情况下,作业首先切换到失败状态(failing),取消所有正在运行任务。如果所有节点都已达到最终状态,并且作业不可重新启动,则状态将转换为失败(failed)。
如果作业可以重新启动,那么它将进入重新启动状态(restarting)。一旦完成重新启动,它将变成创建状态(created)。
在用户取消作业的情况下,将进入取消状态(cancelling),会取消所有当前正在运行的任务。一旦所有运行的任务已经达到最终状态,该作业将转换到已取消状态(canceled)。
完成状态(finished),取消状态(canceled)和 失败状态(failed)表示一个全局的终结状态,并且触发清理工作,而 暂停状态(suspended)仅处于本地终止状态。意味着作业的执行在相应的 JobManager 上终止,但集群的另一个 JobManager 可以从持久的 HA 存储中恢复这个作业并重新启动。因此,处于暂停状态的作业将不会被完全清理。
TaskManager 负责 Task 的生命周期管理,并将状态的变化通知到 JobMaster,在 ExecutionGraph 中跟踪 Execution 的状态变化,一个 Execution 对于一个 Task。
Task 的生命周期如下:共 8 8 8 种状态。

在执行 ExecutionGraph 期间,每个并行任务经过多个阶段,从创建(created)到完成(finished)或失败(failed) ,上图说明了它们之间的状态和可能的转换。任务可以执行多次(例如故障恢复)。每个 Execution 跟踪一个 ExecutionVertex 的执行,每个 ExecutionVertex 都有一个当前 Execution(current execution)和一个前驱 Execution(prior execution)。
任务调度流程图如下:

(1)当 Flink 执行 executor 会自动根据程序代码生成 DAG 数据流图,即 Jobgraph。
(2)ActorSystem 创建 Actor 将数据流图发送给 JobManager 中的 Actor。
(3)JobManager 会不断接收 TaskManager 的心跳消息,从而可以获取到有效的 TaskManager。
(4)JobManager 通过调度器在 TaskManager 中调度执行 Task(在 Flink 中,最小的调度单元就是 Task,对应就是一个线程)。
(5)在程序运行过程中,Task 与 Task 之间是可以进行数据传输的。
Actor System、Scheduler、CheckPoint 三个重要的组件。
每个 TaskManager 是一个 JVM 的进程,可以在不同的线程中执行一个或多个子任务。为了控制一个 worker 能接收多少个 task。worker 通过 task slot 来进行控制(一个 worker 至少有一个 task slot)。每个 task slot 表示 TaskManager 拥有资源的一个固定大小的子集。
一般来说,我们分配槽的个数都是和 CPU 的核数相等,比如
8
8
8 核,那么就分配
8
8
8 个槽。Flink 将进程的内存划分到多个 slot 中。图中有
2
2
2 个 TaskManager,每个 TaskManager 有
3
3
3 个 slot,每个 slot 占有
1
/
3
1/3
1/3 的内存。
内存被划分到不同的 slot 之后可以获得如下好处:
slot 的数量。 任务槽的作用就是分离任务的托管内存,不会发生 CPU 隔离。slot 有独占的内存空间,这样在一个 TaskManager 中可以运行多个不同的作业,作业之间不受影响。总结:task slot 的个数代表 TaskManager 可以并行执行的 task 数。
默认情况下,Flink 允许子任务共享插槽,即使它们是不同任务的子任务,只要它们来自同一个作业。结果是一个槽可以保存作业的整个管道。允许插槽共享有主要好处:
只需计算 Job 中最高并行度(parallelism)的 task slot。只要这个满足,其他的 Job 也都能满足。
资源分配更加公平。如果有比较空闲的 slot 可以将更多的任务分配给它。图中若没有任务槽共享,负载不高的 Source / Map 等 subtask 将会占据许多资源,而负载较高的窗口 subtask 则会缺乏资源。
有了任务槽共享,可以将基本并行度(base parallelism)从
2
2
2 提升到
6
6
6。提高了分槽资源的利用率。同时它还可以保障 TaskManager 给 subtask 的分配的 slot 方案更加公平。
