Related chapter: Flink 1.13 Source Code Analysis — JobManager Startup Flow: Starting the WebMonitorEndpoint
Flink 1.13 Source Code Analysis — Flink Job Submission Flow (Part 2)
In the previous chapters we analyzed, at the source level, how the master and worker nodes of a Flink cluster start up. We often say that Flink turns user code into a high-level abstraction, but what is that abstraction? I would describe it as a programming-model abstraction that can express arbitrary computation logic over arbitrary data, independent of job complexity and data volume.
Over the next few chapters we will analyze the Flink job submission flow, the interaction between the Flink JobMaster and the JobManager, and the construction and translation of the StreamGraph, JobGraph, and ExecutionGraph. This chapter covers the job submission flow.
When we submit a Flink job we run the `flink run` command, which executes the bin/flink shell script. The following line in that script tells us the entry class of the client:
exec "${JAVA_RUN}" $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"
Next, let's look at the main method of org.apache.flink.client.cli.CliFrontend:
- public static void main(final String[] args) {
- EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
-
- // 1. find the configuration directory
- // TODO 获取配置目录
- final String configurationDirectory = getConfigurationDirectoryFromEnv();
-
- // 2. load the global configuration
- // TODO 解析配置文件 flink-conf
- final Configuration configuration =
- GlobalConfiguration.loadConfiguration(configurationDirectory);
-
- // 3. load the custom command lines
- // TODO build the objects that parse the command-line args (three CustomCommandLine flavors)
- final List<CustomCommandLine> customCommandLines =
-         loadCustomCommandLines(configuration, configurationDirectory);
-
- int retCode = 31;
- try {
- final CliFrontend cli = new CliFrontend(configuration, customCommandLines);
-
- // TODO
- SecurityUtils.install(new SecurityConfiguration(cli.configuration));
- retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
- } catch (Throwable t) {
- final Throwable strippedThrowable =
- ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
- LOG.error("Fatal error while running command line interface.", strippedThrowable);
- strippedThrowable.printStackTrace();
- } finally {
- System.exit(retCode);
- }
- }
As you can see, this method does the following:
1. Resolve the configuration directory.
2. Parse the configuration file and build the Configuration object.
3. Build the list of CustomCommandLine objects used to parse the command-line arguments.
4. Parse the command-line arguments and run the requested action.
Let's look at the most important step, parsing the arguments and running the action, by stepping into cli.parseAndRun(args):
- public int parseAndRun(String[] args) {
-
- // check for action
- // TODO 检查命令参数正确性
- if (args.length < 1) {
- CliFrontendParser.printHelp(customCommandLines);
- System.out.println("Please specify an action.");
- return 1;
- }
-
- // get action
- // TODO 从命令行flink 后面的参数解析要执行的动作,例如flink run,动作就是run
- String action = args[0];
-
- // remove action from parameters
- final String[] params = Arrays.copyOfRange(args, 1, args.length);
-
- try {
- // do action
- switch (action) {
- case ACTION_RUN:
- // TODO 如果是run
- run(params);
- return 0;
- case ACTION_RUN_APPLICATION:
- runApplication(params);
- return 0;
- case ...
- case "-h":
- case "--help":
- CliFrontendParser.printHelp(customCommandLines);
- return 0;
- case "-v":
- case "--version":
- String version = EnvironmentInformation.getVersion();
- String commitID = EnvironmentInformation.getRevisionInformation().commitId;
- System.out.print("Version: " + version);
- System.out.println(
- commitID.equals(EnvironmentInformation.UNKNOWN)
- ? ""
- : ", Commit ID: " + commitID);
- return 0;
- default:
- System.out.printf("\"%s\" is not a valid action.\n", action);
- ... ...
- return 1;
- }
- } catch (....) {
- ...
- }
- }
As the code above shows, the first command-line argument is parsed here. Taking the run command as an example: when the argument following flink is run, run(params) is called. Let's step into it:
- protected void run(String[] args) throws Exception {
- LOG.info("Running 'run' command.");
-
- final Options commandOptions = CliFrontendParser.getRunCommandOptions();
- // TODO 真正开始解析命令行参数
- final CommandLine commandLine = getCommandLine(commandOptions, args, true);
-
- // evaluate help flag
- // TODO 如果是 flink -h,则打印flink帮助文档
- if (commandLine.hasOption(HELP_OPTION.getOpt())) {
- CliFrontendParser.printHelpForRun(customCommandLines);
- return;
- }
-
- final CustomCommandLine activeCommandLine =
- validateAndGetActiveCommandLine(checkNotNull(commandLine));
-
- // 创建程序参数对象
- final ProgramOptions programOptions = ProgramOptions.create(commandLine);
-
- // TODO get the job jar and its dependency jars
- final List<URL> jobJars = getJobJarAndDependencies(programOptions);
-
- // TODO 将解析出来的参数封装为配置对象
- final Configuration effectiveConfiguration =
- getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);
-
- LOG.debug("Effective executor configuration: {}", effectiveConfiguration);
-
- // TODO 获取打包的程序
- try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
- // TODO 执行程序
- executeProgram(effectiveConfiguration, program);
- }
- }
Here all command-line options are parsed. If -h is present, the help text is printed; otherwise the program options object is created from the arguments, the job jar and its dependency jars are collected, the parsed options are merged into an effective Configuration, and a PackagedProgram is built from it and executed. Let's see how executeProgram runs this program:
- protected void executeProgram(final Configuration configuration, final PackagedProgram program)
- throws ProgramInvocationException {
- // TODO
- ClientUtils.executeProgram(
- new DefaultExecutorServiceLoader(), configuration, program, false, false);
- }
Stepping into ClientUtils.executeProgram:
- public static void executeProgram(
- PipelineExecutorServiceLoader executorServiceLoader,
- Configuration configuration,
- PackagedProgram program,
- boolean enforceSingleJobExecution,
- boolean suppressSysout)
- throws ProgramInvocationException {
- checkNotNull(executorServiceLoader);
- final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
- final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
- try {
- Thread.currentThread().setContextClassLoader(userCodeClassLoader);
-
- LOG.info(
- "Starting program (detached: {})",
- !configuration.getBoolean(DeploymentOptions.ATTACHED));
-
- // TODO 配置执行环境
- ContextEnvironment.setAsContext(
- executorServiceLoader,
- configuration,
- userCodeClassLoader,
- enforceSingleJobExecution,
- suppressSysout);
-
- StreamContextEnvironment.setAsContext(
- executorServiceLoader,
- configuration,
- userCodeClassLoader,
- enforceSingleJobExecution,
- suppressSysout);
-
- try {
- // TODO 真正提交执行
- program.invokeInteractiveModeForExecution();
- } finally {
- ContextEnvironment.unsetAsContext();
- StreamContextEnvironment.unsetAsContext();
- }
- } finally {
- Thread.currentThread().setContextClassLoader(contextClassLoader);
- }
- }
In this method, the execution environment is first configured based on the Configuration that was passed in, and then program.invokeInteractiveModeForExecution() actually starts the job. Let's step into that method:
- public void invokeInteractiveModeForExecution() throws ProgramInvocationException {
- FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
- try {
- // TODO 调用自己编写的应用程序的main方法
- callMainMethod(mainClass, args);
- } finally {
- FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
- }
- }
Then into callMainMethod(mainClass, args):
- private static void callMainMethod(Class<?> entryClass, String[] args)
- throws ProgramInvocationException {
- Method mainMethod;
- if (!Modifier.isPublic(entryClass.getModifiers())) {
- throw new ProgramInvocationException(
- "The class " + entryClass.getName() + " must be public.");
- }
-
- // TODO 反射拿到main的实例
- try {
- mainMethod = entryClass.getMethod("main", String[].class);
- } catch (NoSuchMethodException e) {
- throw new ProgramInvocationException(
- "The class " + entryClass.getName() + " has no main(String[]) method.");
- } catch (Throwable t) {
- throw new ProgramInvocationException(
- "Could not look up the main(String[]) method from the class "
- + entryClass.getName()
- + ": "
- + t.getMessage(),
- t);
- }
-
- if (!Modifier.isStatic(mainMethod.getModifiers())) {
- throw new ProgramInvocationException(
- "The class " + entryClass.getName() + " declares a non-static main method.");
- }
- if (!Modifier.isPublic(mainMethod.getModifiers())) {
- throw new ProgramInvocationException(
- "The class " + entryClass.getName() + " declares a non-public main method.");
- }
-
- try {
- // TODO 调用main 方法
- mainMethod.invoke(null, (Object) args);
- } catch (IllegalArgumentException e) {
- throw new ProgramInvocationException(
- "Could not invoke the main method, arguments are not matching.", e);
- } catch (IllegalAccessException e) {
- throw new ProgramInvocationException(
- "Access to the main method was denied: " + e.getMessage(), e);
- } catch (InvocationTargetException e) {
- Throwable exceptionInMethod = e.getTargetException();
- if (exceptionInMethod instanceof Error) {
- throw (Error) exceptionInMethod;
- } else if (exceptionInMethod instanceof ProgramParametrizationException) {
- throw (ProgramParametrizationException) exceptionInMethod;
- } else if (exceptionInMethod instanceof ProgramInvocationException) {
- throw (ProgramInvocationException) exceptionInMethod;
- } else {
- throw new ProgramInvocationException(
- "The main method caused an error: " + exceptionInMethod.getMessage(),
- exceptionInMethod);
- }
- } catch (Throwable t) {
- throw new ProgramInvocationException(
- "An error occurred while invoking the program's main method: " + t.getMessage(),
- t);
- }
- }
As you can see, the main method of the user's jar is obtained via reflection and then invoked through mainMethod.invoke. At this point the client-side preparation is complete; next we look at how the logic we wrote in the main program is submitted for execution.
We will use the example shipped in flink-examples-streaming, which follows the same programming model we use when writing Flink jobs. First, let's look at how the Flink entry class is built:
- /* TODO
-    1. initialize the StateBackend
-    2. parse all checkpoint-related configuration
- */
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
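For orientation, the user-side programming model that the rest of this chapter walks through looks roughly like the following sketch, modeled on the flink-examples-streaming WordCount; the input path and the job name are assumptions:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountSketch {

    public static void main(String[] args) throws Exception {
        // 1. Build the execution environment (the part analyzed in this section)
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Source operator: readTextFile is walked through later in this chapter
        DataStream<String> text = env.readTextFile("/tmp/words.txt"); // hypothetical input path

        // 3. Each operator call wraps the user Function into a StreamOperator / Transformation
        DataStream<Tuple2<String, Integer>> counts =
                text.flatMap(
                                (String line, Collector<Tuple2<String, Integer>> out) -> {
                                    for (String word : line.toLowerCase().split("\\W+")) {
                                        out.collect(Tuple2.of(word, 1));
                                    }
                                })
                        .returns(Types.TUPLE(Types.STRING, Types.INT))
                        .keyBy(value -> value.f0)
                        .sum(1);

        counts.print();

        // 4. execute() turns the collected Transformations into a StreamGraph and submits it
        env.execute("WordCount sketch");
    }
}
```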
Let's step into getExecutionEnvironment(), and then into its overload getExecutionEnvironment(Configuration):
- public static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration) {
- return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
- // TODO 构建StreamExecutionEnvironment
- .map(factory -> factory.createExecutionEnvironment(configuration))
- .orElseGet(() -> StreamExecutionEnvironment.createLocalEnvironment(configuration));
- }
We then follow factory.createExecutionEnvironment and choose the StreamContextEnvironment implementation; the factory was registered here:
- public static void setAsContext(
- final PipelineExecutorServiceLoader executorServiceLoader,
- final Configuration configuration,
- final ClassLoader userCodeClassLoader,
- final boolean enforceSingleJobExecution,
- final boolean suppressSysout) {
- StreamExecutionEnvironmentFactory factory =
- conf -> {
- Configuration mergedConfiguration = new Configuration();
- mergedConfiguration.addAll(configuration);
- mergedConfiguration.addAll(conf);
-
- // TODO 初始化StreamContextEnvironment
- return new StreamContextEnvironment(
- executorServiceLoader,
- mergedConfiguration,
- userCodeClassLoader,
- enforceSingleJobExecution,
- suppressSysout);
- };
- initializeContextEnvironment(factory);
- }
As you can see, the StreamContextEnvironment is initialized here. Stepping into its constructor and then into the parent-class constructor:
- @PublicEvolving
- public StreamExecutionEnvironment(
- final PipelineExecutorServiceLoader executorServiceLoader,
- final Configuration configuration,
- final ClassLoader userClassloader) {
- this.executorServiceLoader = checkNotNull(executorServiceLoader);
- this.configuration = new Configuration(checkNotNull(configuration));
- this.userClassloader =
- userClassloader == null ? getClass().getClassLoader() : userClassloader;
-
- /*
-    TODO configure the various components
-    1. initialize the StateBackend
-    2. initialize the checkpoint-related parameters
- */
-
- this.configure(this.configuration, this.userClassloader);
- }
Now let's step into this.configure:
- @PublicEvolving
- public void configure(ReadableConfig configuration, ClassLoader classLoader) {
- configuration
- .getOptional(StreamPipelineOptions.TIME_CHARACTERISTIC)
- .ifPresent(this::setStreamTimeCharacteristic);
- // TODO 加载得到StateBackEnd
- Optional.ofNullable(loadStateBackend(configuration, classLoader))
- .ifPresent(this::setStateBackend);
- configuration
- .getOptional(PipelineOptions.OPERATOR_CHAINING)
- .ifPresent(c -> this.isChainingEnabled = c);
- configuration
- .getOptional(ExecutionOptions.BUFFER_TIMEOUT)
- .ifPresent(t -> this.setBufferTimeout(t.toMillis()));
- configuration
- .getOptional(DeploymentOptions.JOB_LISTENERS)
- .ifPresent(listeners -> registerCustomListeners(classLoader, listeners));
- configuration
- .getOptional(PipelineOptions.CACHED_FILES)
- .ifPresent(
- f -> {
- this.cacheFile.clear();
- this.cacheFile.addAll(DistributedCache.parseCachedFilesFromString(f));
- });
- configuration
- .getOptional(ExecutionOptions.RUNTIME_MODE)
- .ifPresent(
- runtimeMode ->
- this.configuration.set(ExecutionOptions.RUNTIME_MODE, runtimeMode));
- configuration
- .getOptional(ExecutionOptions.SORT_INPUTS)
- .ifPresent(
- sortInputs ->
- this.getConfiguration()
- .set(ExecutionOptions.SORT_INPUTS, sortInputs));
- configuration
- .getOptional(ExecutionOptions.USE_BATCH_STATE_BACKEND)
- .ifPresent(
- sortInputs ->
- this.getConfiguration()
- .set(ExecutionOptions.USE_BATCH_STATE_BACKEND, sortInputs));
- configuration
- .getOptional(PipelineOptions.NAME)
- .ifPresent(jobName -> this.getConfiguration().set(PipelineOptions.NAME, jobName));
- config.configure(configuration, classLoader);
-
- // TODO parse and apply the checkpoint-related parameters
- /*
-    TODO 1. the checkpoint-related options are parsed from the configuration into the CheckpointConfig object
-    2. when the operators are later parsed and the StreamGraph is built, this checkpointConfig is passed to the StreamGraph
-    3. when the StreamGraph is turned into the JobGraph, it is passed along again
- */
- checkpointCfg.configure(configuration);
- }
As you can see, two main pieces of work happen here: loading the StateBackend and parsing the checkpoint-related configuration into the CheckpointConfig.
Let's look at the state backend first, by stepping into loadStateBackend(configuration, classLoader):
- private StateBackend loadStateBackend(ReadableConfig configuration, ClassLoader classLoader) {
- try {
- // TODO 获取配置中有关StateBackend的相关配置,构建StateBackend
- return StateBackendLoader.loadStateBackendFromConfig(configuration, classLoader, null);
- } catch (DynamicCodeLoadingException | IOException e) {
- throw new WrappingRuntimeException(e);
- }
- }
Then into StateBackendLoader.loadStateBackendFromConfig:
- public static StateBackend loadStateBackendFromConfig(
- ReadableConfig config, ClassLoader classLoader, @Nullable Logger logger)
- throws IllegalConfigurationException, DynamicCodeLoadingException, IOException {
-
- checkNotNull(config, "config");
- checkNotNull(classLoader, "classLoader");
-
- // TODO 获取StateBackend的相关配置
- final StateBackend backend =
- loadUnwrappedStateBackendFromConfig(config, classLoader, logger);
-
- checkArgument(
- !(backend instanceof DelegatingStateBackend),
- "expecting non-delegating state backend");
-
- if (config.get(CheckpointingOptions.ENABLE_STATE_CHANGE_LOG) && (backend != null)) {
- return loadChangelogStateBackend(backend, classLoader);
- } else {
- return backend;
- }
- }
Here the StateBackend-related configuration is read and the resulting StateBackend object is returned. Stepping into loadUnwrappedStateBackendFromConfig:
- private static StateBackend loadUnwrappedStateBackendFromConfig(
- ReadableConfig config, ClassLoader classLoader, @Nullable Logger logger)
- throws IllegalConfigurationException, DynamicCodeLoadingException, IOException {
-
- checkNotNull(config, "config");
- checkNotNull(classLoader, "classLoader");
-
- final String backendName = config.get(StateBackendOptions.STATE_BACKEND);
- if (backendName == null) {
- return null;
- }
-
- // by default the factory class is the backend name
- String factoryClassName = backendName;
-
- switch (backendName.toLowerCase()) {
-
- case MEMORY_STATE_BACKEND_NAME:
- MemoryStateBackend backend =
- new MemoryStateBackendFactory().createFromConfig(config, classLoader);
-
- if (logger != null) {
- logger.warn(
- "MemoryStateBackend has been deprecated. Please use 'hashmap' state "
- + "backend instead with JobManagerCheckpointStorage for equivalent "
- + "functionality");
-
- logger.info("State backend is set to job manager {}", backend);
- }
-
- return backend;
-
- case FS_STATE_BACKEND_NAME:
- if (logger != null) {
- logger.warn(
- "{} state backend has been deprecated. Please use 'hashmap' state "
- + "backend instead.",
- backendName.toLowerCase());
- }
- // fall through and use the HashMapStateBackend instead which
- // utilizes the same HeapKeyedStateBackend runtime implementation.
- case HASHMAP_STATE_BACKEND_NAME:
- HashMapStateBackend hashMapStateBackend =
- new HashMapStateBackendFactory().createFromConfig(config, classLoader);
- if (logger != null) {
- logger.info("State backend is set to heap memory {}", hashMapStateBackend);
- }
- return hashMapStateBackend;
-
- // TODO 如果是rocksdb 则 RocksDB
- case ROCKSDB_STATE_BACKEND_NAME:
- factoryClassName = ROCKSDB_STATE_BACKEND_FACTORY;
-
- // fall through to the 'default' case that uses reflection to load the backend
- // that way we can keep RocksDB in a separate module
-
- default:
- ... ...
-
- return factory.createFromConfig(config, classLoader);
- }
- }
Here the state backend name from the configuration is matched. Before Flink 1.13, three state backend settings were supported:
1. jobmanager (memory)
2. filesystem
3. rocksdb
Starting with Flink 1.13 there are only two state backend implementations: HashMap and RocksDB.
1. The HashMap backend is what we earlier described as keeping state in memory. Concretely, it stores state as plain objects on the TaskManager's JVM heap: regular state, as well as the records and triggers collected in windows, are kept as key-value pairs, so the underlying structure is a hash map, which is where the name comes from.
2. HashMapStateBackend computes in memory, so reads and writes are very fast; the downside is that the state size is bounded by the cluster's available memory, and a state that keeps growing over time will eventually exhaust it. RocksDB stores state on disk, so it scales with the available disk space and is the only backend that supports incremental checkpoints, which makes it well suited to very large state. The cost is that every state access requires serialization/deserialization and may hit disk, so average read/write performance is roughly an order of magnitude slower than HashMapStateBackend.
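As a concrete illustration (not taken from the example job analyzed here), a minimal sketch of picking a state backend and checkpoint storage in Flink 1.13 might look like this; the checkpoint interval and the file:// paths are assumptions, and the RocksDB variant additionally needs the flink-statebackend-rocksdb dependency:

```java
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendConfigSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Heap-based state: fastest, but bounded by TaskManager memory
        env.setStateBackend(new HashMapStateBackend());
        // Or, disk-based state with incremental checkpoint support:
        // env.setStateBackend(new org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend(true));

        // Checkpoint every 60s (interval is an assumption), checkpoints written to a file system
        env.enableCheckpointing(60_000);
        env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints");

        // Equivalent flink-conf.yaml entries, which is what loadStateBackendFromConfig reads:
        //   state.backend: hashmap            (or: rocksdb)
        //   state.checkpoints.dir: file:///tmp/flink-checkpoints
    }
}
```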
That completes the construction of the StreamExecutionEnvironment; next comes the construction of the operators.
Before reading the code, let's go over a few concepts, namely how an operator is translated.
The computation logic we write inside an operator is a Function. The operator's job is to wrap that Function into a StreamOperator, which is in turn wrapped into a Transformation and appended to the environment's list of Transformations. In short: Function => StreamOperator => Transformation. A DataStream is also involved: it can be seen as the carrier of the Function and the bridge that connects one operator to the next.
Now to the code. Once the StreamExecutionEnvironment is built, the source is constructed, as in:
env.readTextFile(input)
readTextFile is itself an operator; we use it as the example to see what happens inside an operator. Stepping into readTextFile brings us here:
- public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
-     Preconditions.checkArgument(
-             !StringUtils.isNullOrWhitespaceOnly(filePath),
-             "The file path must not be null or blank.");
-
-     TextInputFormat format = new TextInputFormat(new Path(filePath));
-     format.setFilesFilter(FilePathFilter.createDefaultFilter());
-     TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
-     format.setCharsetName(charsetName);
-
- // TODO
- return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo);
- }
We continue into readFile:
- @PublicEvolving
- public <OUT> DataStreamSource<OUT> readFile(
-         FileInputFormat<OUT> inputFormat,
-         String filePath,
-         FileProcessingMode watchType,
-         long interval,
-         TypeInformation<OUT> typeInformation) {
-
- Preconditions.checkNotNull(inputFormat, "InputFormat must not be null.");
- Preconditions.checkArgument(
- !StringUtils.isNullOrWhitespaceOnly(filePath),
- "The file path must not be null or blank.");
-
- inputFormat.setFilePath(filePath);
- // TODO
- return createFileInput(
- inputFormat, typeInformation, "Custom File Source", watchType, interval);
- }
Then into createFileInput:
- private <OUT> DataStreamSource<OUT> createFileInput(
-         FileInputFormat<OUT> inputFormat,
-         TypeInformation<OUT> typeInfo,
-         String sourceName,
- FileProcessingMode monitoringMode,
- long interval) {
-
- Preconditions.checkNotNull(inputFormat, "Unspecified file input format.");
- Preconditions.checkNotNull(typeInfo, "Unspecified output type information.");
- Preconditions.checkNotNull(sourceName, "Unspecified name for the source.");
- Preconditions.checkNotNull(monitoringMode, "Unspecified monitoring mode.");
-
- Preconditions.checkArgument(
- monitoringMode.equals(FileProcessingMode.PROCESS_ONCE)
- || interval >= ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL,
- "The path monitoring interval cannot be less than "
- + ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL
- + " ms.");
-
- // TODO create a Function
- ContinuousFileMonitoringFunction<OUT> monitoringFunction =
-         new ContinuousFileMonitoringFunction<>(
-                 inputFormat, monitoringMode, getParallelism(), interval);
-
- ContinuousFileReaderOperatorFactory<OUT, TimestampedFileInputSplit> factory =
-         new ContinuousFileReaderOperatorFactory<>(inputFormat);
-
- final Boundedness boundedness =
- monitoringMode == FileProcessingMode.PROCESS_ONCE
- ? Boundedness.BOUNDED
- : Boundedness.CONTINUOUS_UNBOUNDED;
-
- // TODO create the DataStreamSource
- // TODO Function => StreamOperator => Transformation
- SingleOutputStreamOperator<OUT> source =
-         // TODO wrap the Function into a DataStream
-         addSource(monitoringFunction, sourceName, null, boundedness)
-                 // TODO transform() then wraps it into a Transformation and adds it to the transformations list
-                 .transform("Split Reader: " + sourceName, typeInfo, factory);
-
- return new DataStreamSource<>(source);
- }
At this point the operator translation steps start to become visible.
Let's first look at how the Function is wrapped into a StreamOperator and handed to a DataStream, by stepping into addSource(...):
- private <OUT> DataStreamSource<OUT> addSource(
-         final SourceFunction<OUT> function,
-         final String sourceName,
-         @Nullable final TypeInformation<OUT> typeInfo,
-         final Boundedness boundedness) {
- checkNotNull(function);
- checkNotNull(sourceName);
- checkNotNull(boundedness);
-
-     TypeInformation<OUT> resolvedTypeInfo =
-             getTypeInfo(function, sourceName, SourceFunction.class, typeInfo);
-
- boolean isParallel = function instanceof ParallelSourceFunction;
-
- clean(function);
-
- /*
-    TODO a few points to note:
-    1. StreamSource is itself a StreamOperator
-    2. StreamSource wraps the Function
-    3. the StreamSource is then held as a field of a Transformation
-    so the relationship is: Function => StreamOperator => Transformation
- */
- final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
- return new DataStreamSource<>(
- this, resolvedTypeInfo, sourceOperator, isParallel, sourceName, boundedness);
- }
As you can see, the Function is wrapped into a StreamSource, and the inheritance hierarchy shows that StreamSource is itself a StreamOperator. The StreamSource is then handed to a DataStreamSource, whose top-level parent class is DataStream.
With the DataStream built, let's look at the transform method:
- @PublicEvolving
- public <R> SingleOutputStreamOperator<R> transform(
-         String operatorName,
-         TypeInformation<R> outTypeInfo,
-         OneInputStreamOperatorFactory<T, R> operatorFactory) {
-
- // TODO
- return doTransform(operatorName, outTypeInfo, operatorFactory);
- }
Continuing into doTransform:
- protected <R> SingleOutputStreamOperator<R> doTransform(
-         String operatorName,
-         TypeInformation<R> outTypeInfo,
-         StreamOperatorFactory<R> operatorFactory) {
-
- // read the output type of the input Transform to coax out errors about MissingTypeInfo
- transformation.getOutputType();
-
-     // TODO build the Transformation
-     OneInputTransformation<T, R> resultTransform =
-             new OneInputTransformation<>(
- this.transformation,
- operatorName,
- operatorFactory,
- outTypeInfo,
- environment.getParallelism());
-
- @SuppressWarnings({"unchecked", "rawtypes"})
-     SingleOutputStreamOperator<R> returnStream =
-             new SingleOutputStreamOperator(environment, resultTransform);
-
-     // TODO add the Transformation (which wraps the StreamOperator) to the transformations list
-     getExecutionEnvironment().addOperator(resultTransform);
-
- return returnStream;
- }
In this method, a Transformation object is built, added to the environment's transformations list at the end of the method, and then wrapped into a new DataStream that is passed on downstream.
That completes the construction of the readTextFile operator; let's move on to the next one:
- DataStream<Tuple2<String, Integer>> counts =
-         // split up the lines in pairs (2-tuples) containing: (word,1)
-         // TODO
-         text.flatMap(new Tokenizer())
- // group by the tuple field "0" and sum up tuple field "1"
- .keyBy(value -> value.f0)
- .sum(1);
Let's first step into the flatMap operator:
- // TODO note that the parameter is a Function; every StreamOperator wraps a Function
- public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
-
-     TypeInformation<R> outType =
-             TypeExtractor.getFlatMapReturnTypes(
- clean(flatMapper), getType(), Utils.getCallLocationName(), true);
-
- // TODO
- return flatMap(flatMapper, outType);
- }
Here a Function object is passed in as the parameter. Continuing into the flatMap overload:
- public <R> SingleOutputStreamOperator<R> flatMap(
-         FlatMapFunction<T, R> flatMapper, TypeInformation<R> outputType) {
-     // TODO
- return transform("Flat Map", outputType, new StreamFlatMap<>(clean(flatMapper)));
- }
Step into transform:
- @PublicEvolving
- public <R> SingleOutputStreamOperator<R> transform(
-         String operatorName,
-         TypeInformation<R> outTypeInfo,
-         OneInputStreamOperator<T, R> operator) {
-
- // TODO
- return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
- }
And into doTransform again:
- protected <R> SingleOutputStreamOperator<R> doTransform(
-         String operatorName,
-         TypeInformation<R> outTypeInfo,
-         StreamOperatorFactory<R> operatorFactory) {
-
- // read the output type of the input Transform to coax out errors about MissingTypeInfo
- transformation.getOutputType();
-
-     // TODO build the Transformation
-     OneInputTransformation<T, R> resultTransform =
-             new OneInputTransformation<>(
- this.transformation,
- operatorName,
- operatorFactory,
- outTypeInfo,
- environment.getParallelism());
-
- @SuppressWarnings({"unchecked", "rawtypes"})
-     SingleOutputStreamOperator<R> returnStream =
-             new SingleOutputStreamOperator(environment, resultTransform);
-
-     // TODO add the Transformation (which wraps the StreamOperator) to the transformations list
-     getExecutionEnvironment().addOperator(resultTransform);
-
- return returnStream;
- }
As you can see, we are back in the same place. Essentially every operator follows this pattern: the Function is wrapped into a StreamOperator, the StreamOperator into a Transformation, the Transformation is added to the transformations list, and the result is wrapped into a DataStream that is handed to the next operator. The sketch below retraces these steps by hand.
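To see the Function => StreamOperator => Transformation chain outside the Flink sources, here is a minimal sketch that does by hand roughly what DataStream.map and doTransform do. It uses the internal classes SimpleOperatorFactory and OneInputTransformation shown above, plus StreamMap (the map counterpart of the StreamFlatMap used by flatMap); the class name OperatorWrappingSketch and the element values are made up:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;

public class OperatorWrappingSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> source = env.fromElements("a", "b");

        // 1. Function: the user's computation logic
        MapFunction<String, String> fn = String::toUpperCase;
        // 2. StreamOperator: StreamMap wraps the Function
        StreamMap<String, String> operator = new StreamMap<>(fn);
        // 3. Transformation: OneInputTransformation wraps the operator (via a factory)
        OneInputTransformation<String, String> transformation =
                new OneInputTransformation<>(
                        source.getTransformation(),
                        "Map",
                        SimpleOperatorFactory.of(operator),
                        Types.STRING,
                        env.getParallelism());

        // The environment collects every Transformation; getStreamGraph() later walks this list
        env.addOperator(transformation);
    }
}
```

The user normally never sees the OneInputTransformation: map()/flatMap() build it internally, and the StreamGraph is later derived from the list that addOperator fills.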
After the whole chain of operators has been defined, every operator sits in the environment's transformations list in the form of a Transformation. Next, let's look at the implementation of env.execute; stepping into it:
- public JobExecutionResult execute(String jobName) throws Exception {
- Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
-
- // TODO 获取到StreamGraph,并执行StreamGraph
- return execute(getStreamGraph(jobName));
- }
As you can see, a StreamGraph is built here and then executed. The construction of the StreamGraph will be analyzed in detail in the next chapter; here we stay with the job submission flow and look at this execute method:
- @Internal
- public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
- // 异步执行StreamGraph
- final JobClient jobClient = executeAsync(streamGraph);
-
- try {
- final JobExecutionResult jobExecutionResult;
-
- if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
- // TODO 通过get方法阻塞等待StreamGraph的提交结果
- jobExecutionResult = jobClient.getJobExecutionResult().get();
- } else {
- jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
- }
-
- jobListeners.forEach(
- jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));
-
- return jobExecutionResult;
- } catch (Throwable t) {
- // get() on the JobExecutionResult Future will throw an ExecutionException. This
- // behaviour was largely not there in Flink versions before the PipelineExecutor
- // refactoring so we should strip that exception.
- Throwable strippedException = ExceptionUtils.stripExecutionException(t);
-
- jobListeners.forEach(
- jobListener -> {
- jobListener.onJobExecuted(null, strippedException);
- });
- ExceptionUtils.rethrowException(strippedException);
-
- // never reached, only make javac happy
- return null;
- }
- }
In this method the StreamGraph is executed asynchronously, and further down the get() call blocks until the submission result is available. Let's look at the asynchronous execution by stepping into executeAsync:
- @Internal
- public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
- checkNotNull(streamGraph, "StreamGraph cannot be null.");
- checkNotNull(
- configuration.get(DeploymentOptions.TARGET),
- "No execution.target specified in your configuration file.");
-
- final PipelineExecutorFactory executorFactory =
- executorServiceLoader.getExecutorFactory(configuration);
-
- checkNotNull(
- executorFactory,
- "Cannot find compatible factory for specified execution.target (=%s)",
- configuration.get(DeploymentOptions.TARGET));
-
- /*
-    TODO submit asynchronously and obtain a future
- */
- CompletableFuture<JobClient> jobClientFuture =
-         executorFactory
- .getExecutor(configuration)
- .execute(streamGraph, configuration, userClassloader);
-
- try {
- // TODO 阻塞获取StreamGraph的执行结果
- JobClient jobClient = jobClientFuture.get();
- jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
- return jobClient;
- } catch (ExecutionException executionException) {
- final Throwable strippedException =
- ExceptionUtils.stripExecutionException(executionException);
- jobListeners.forEach(
- jobListener -> jobListener.onJobSubmitted(null, strippedException));
-
- throw new FlinkException(
- String.format("Failed to execute job '%s'.", streamGraph.getJobName()),
- strippedException);
- }
- }
As you can see, the StreamGraph is submitted using asynchronous programming. Let's continue into the execute method and choose the AbstractSessionClusterExecutor implementation:
- // TODO the pipeline parameter here is the StreamGraph
- @Override
- public CompletableFuture<JobClient> execute(
-         @Nonnull final Pipeline pipeline,
- @Nonnull final Configuration configuration,
- @Nonnull final ClassLoader userCodeClassloader)
- throws Exception {
-     // TODO build the JobGraph from the StreamGraph
-     final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
-
-     /*
-        TODO at this point the JobGraph is complete; next comes its submission
-     */
-
-     // TODO
-     try (final ClusterDescriptor<ClusterID> clusterDescriptor =
-             clusterClientFactory.createClusterDescriptor(configuration)) {
- final ClusterID clusterID = clusterClientFactory.getClusterId(configuration);
- checkState(clusterID != null);
-
-         /*
-            TODO ClusterClientProvider: the provider that creates the RestClusterClient
-            1. internally it instantiates a RestClusterClient
-            2. the RestClusterClient in turn initializes its member RestClient
-            3. the RestClient initializes an internal netty client
-            TODO client side of the submission: the netty client inside the RestClient inside the RestClusterClient
-            TODO server side of the submission: the netty server inside the WebMonitorEndpoint started in the JobManager
-         */
-         final ClusterClientProvider<ClusterID> clusterClientProvider =
-                 clusterDescriptor.retrieve(clusterID);
-         ClusterClient<ClusterID> clusterClient = clusterClientProvider.getClusterClient();
-
-         /*
-            TODO submit for execution
-            1. MiniClusterClient: local execution
-            2. RestClusterClient: submit to the Flink REST server for processing
-         */
-         return clusterClient
-                 // TODO submit via the netty client inside the RestClient
- .submitJob(jobGraph)
- .thenApplyAsync(
- FunctionUtils.uncheckedFunction(
- jobId -> {
- ClientUtils.waitUntilJobInitializationFinished(
- () -> clusterClient.getJobStatus(jobId).get(),
- () -> clusterClient.requestJobResult(jobId).get(),
- userCodeClassloader);
- return jobId;
- }))
- .thenApplyAsync(
- jobID ->
- (JobClient)
- new ClusterClientJobClientAdapter<>(
- clusterClientProvider,
- jobID,
- userCodeClassloader))
- .whenCompleteAsync((ignored1, ignored2) -> clusterClient.close());
- }
- }
The following happens here:
1. PipelineExecutorUtils.getJobGraph turns the StreamGraph into a JobGraph.
2. A ClusterDescriptor is created and used to obtain a ClusterClientProvider, which in turn produces the ClusterClient that actually performs the submission.
Let's first look at how the ClusterClientProvider is built, by stepping into clusterDescriptor.retrieve and choosing the StandaloneClusterDescriptor implementation:
- @Override
- public ClusterClientProvider<StandaloneClusterId> retrieve(
-         StandaloneClusterId standaloneClusterId) throws ClusterRetrieveException {
- return () -> {
- try {
- // TODO
- return new RestClusterClient<>(config, standaloneClusterId);
- } catch (Exception e) {
- throw new RuntimeException("Couldn't retrieve standalone cluster", e);
- }
- };
- }
This method creates and returns a RestClusterClient. Let's look at its constructor:
- private RestClusterClient(
- Configuration configuration,
- @Nullable RestClient restClient,
- T clusterId,
- WaitStrategy waitStrategy,
- ClientHighAvailabilityServices clientHAServices)
- throws Exception {
- this.configuration = checkNotNull(configuration);
-
- // TODO 解析配置
- this.restClusterClientConfiguration =
- RestClusterClientConfiguration.fromConfiguration(configuration);
-
- if (restClient != null) {
- this.restClient = restClient;
- } else {
- // TODO 构建一个RestClient
- // TODO 内部其实就是构建了一个Netty客户端
- this.restClient =
- new RestClient(
- restClusterClientConfiguration.getRestClientConfiguration(),
- executorService);
- }
-
- this.waitStrategy = checkNotNull(waitStrategy);
- this.clusterId = checkNotNull(clusterId);
-
- this.clientHAServices = checkNotNull(clientHAServices);
-
- this.webMonitorRetrievalService = clientHAServices.getClusterRestEndpointLeaderRetriever();
- this.retryExecutorService =
- Executors.newSingleThreadScheduledExecutor(
- new ExecutorThreadFactory("Flink-RestClusterClient-Retry"));
- // TODO 监听WebMonitorEndpoint的地址改变
- startLeaderRetrievers();
- }
Three main things happen here:
1. Parsing the configuration.
2. Building the netty client.
3. Watching for changes of the WebMonitorEndpoint address.
With the netty client in place, let's continue with the submission of the JobGraph and return to this code:
- /*
-    TODO submit for execution
-    1. MiniClusterClient: local execution
-    2. RestClusterClient: submit to the Flink REST server for processing
- */
- return clusterClient
-         // TODO submit via the netty client inside the RestClient
- .submitJob(jobGraph)
- .thenApplyAsync(
- FunctionUtils.uncheckedFunction(
- jobId -> {
- ClientUtils.waitUntilJobInitializationFinished(
- () -> clusterClient.getJobStatus(jobId).get(),
- () -> clusterClient.requestJobResult(jobId).get(),
- userCodeClassloader);
- return jobId;
- }))
- .thenApplyAsync(
- jobID ->
- (JobClient)
- new ClusterClientJobClientAdapter<>(
- clusterClientProvider,
- jobID,
- userCodeClassloader))
- .whenCompleteAsync((ignored1, ignored2) -> clusterClient.close());
Let's step into clusterClient.submitJob and choose the RestClusterClient implementation. The method is long, so we go through it in pieces, starting with this block:
- CompletableFuture<java.nio.file.Path> jobGraphFileFuture =
-         CompletableFuture.supplyAsync(
- () -> {
- try {
- final java.nio.file.Path jobGraphFile =
- Files.createTempFile("flink-jobgraph", ".bin");
- try (ObjectOutputStream objectOut =
- new ObjectOutputStream(
- Files.newOutputStream(jobGraphFile))) {
- objectOut.writeObject(jobGraph);
- }
- return jobGraphFile;
- } catch (IOException e) {
- throw new CompletionException(
- new FlinkException("Failed to serialize JobGraph.", e));
- }
- },
- executorService);
This block persists the JobGraph into a JobGraphFile, a temporary file whose name starts with flink-jobgraph and ends with .bin. When we submit the JobGraph to a Flink cluster, this file is what actually gets uploaded; on the cluster side the WebMonitor (JobSubmitHandler) receives the request, and the first thing the JobSubmitHandler does is deserialize the received file back into a JobGraph object.
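That persistence step is plain Java serialization to a temporary file. As a minimal sketch of the same round trip, with an ordinary serializable class standing in for the real JobGraph and only the flink-jobgraph / .bin names taken from the code above:

```java
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;

public class JobGraphFileSketch {

    // Stand-in for the JobGraph, which is itself java.io.Serializable
    static class FakeJobGraph implements Serializable {
        final String jobName;
        FakeJobGraph(String jobName) { this.jobName = jobName; }
    }

    public static void main(String[] args) throws Exception {
        // Client side: serialize the graph into a temp file, like RestClusterClient#submitJob
        Path jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin");
        try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) {
            out.writeObject(new FakeJobGraph("WordCount"));
        }

        // Server side: the JobSubmitHandler's first step is the reverse, deserializing the uploaded file
        try (ObjectInputStream in = new ObjectInputStream(Files.newInputStream(jobGraphFile))) {
            FakeJobGraph restored = (FakeJobGraph) in.readObject();
            System.out.println("Restored job: " + restored.jobName);
        }
    }
}
```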
Let's continue with the next block:
- /*
-    TODO once persistence is done, add the JobGraphFile to the list of files to upload
- */
- CompletableFuture<Tuple2<JobSubmitRequestBody, Collection<FileUpload>>> requestFuture =
-         jobGraphFileFuture.thenApply(
-                 jobGraphFile -> {
-                     List<String> jarFileNames = new ArrayList<>(8);
-                     List<JobSubmitRequestBody.DistributedCacheFile> artifactFileNames =
-                             new ArrayList<>(8);
-                     Collection<FileUpload> filesToUpload = new ArrayList<>(8);
-                     // TODO add the JobGraphFile to the list of files to upload
- filesToUpload.add(
- new FileUpload(
- jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));
- // TODO 上传Job所需的jar
- for (Path jar : jobGraph.getUserJars()) {
- jarFileNames.add(jar.getName());
- filesToUpload.add(
- new FileUpload(
- Paths.get(jar.toUri()),
- RestConstants.CONTENT_TYPE_JAR));
- }
- ... ...
- // TODO 构建提交任务的请求体,包含对应的一些资源,主要是JobGraph的持久化文件和对应的依赖jar
- final JobSubmitRequestBody requestBody =
- new JobSubmitRequestBody(
- jobGraphFile.getFileName().toString(),
- jarFileNames,
- artifactFileNames);
- // TODO 返回一个Tuple2,包含两个内容: requestBody和filesToUpload
- return Tuple2.of(
- requestBody, Collections.unmodifiableCollection(filesToUpload));
- });
In this block, the JobGraphFile is added to the list of files to upload, together with the jars the job needs; finally the JobSubmitRequestBody for the submission is built, referencing those resources, mainly the persisted JobGraph file and the dependency jars.
Let's look at the next block:
- // TODO send the request
- final CompletableFuture<JobSubmitResponseBody> submissionFuture =
-         requestFuture.thenCompose(
- requestAndFileUploads ->
- // TODO 提交
- sendRetriableRequest(
- JobSubmitHeaders.getInstance(),
- EmptyMessageParameters.getInstance(),
- requestAndFileUploads.f0,
- requestAndFileUploads.f1,
- isConnectionProblemOrServiceUnavailable()));
This block performs the actual submission of the JobGraph. Stepping into sendRetriableRequest:
- private <
-                 M extends MessageHeaders<R, P, U>,
-                 U extends MessageParameters,
-                 R extends RequestBody,
-                 P extends ResponseBody>
-         CompletableFuture<P> sendRetriableRequest(
-                 M messageHeaders,
-                 U messageParameters,
-                 R request,
-                 Collection<FileUpload> filesToUpload,
-                 Predicate<Throwable> retryPredicate) {
-     // TODO retry mechanism
- return retry(
- () ->
-             // TODO get the address of the WebMonitorEndpoint running in the JobManager
-             // TODO submitting the JobGraph from the client really means submitting it to the WebMonitorEndpoint
- getWebMonitorBaseUrl()
- .thenCompose(
- webMonitorBaseUrl -> {
- try {
-                             /*
-                                TODO submit the request to the WebMonitorEndpoint; it is ultimately handled by the JobSubmitHandler
-                                the submission goes over HTTP/REST
-                             */
- return restClient.sendRequest(
- webMonitorBaseUrl.getHost(),
- webMonitorBaseUrl.getPort(),
- messageHeaders,
- messageParameters,
- request,
- filesToUpload);
- } catch (IOException e) {
- throw new CompletionException(e);
- }
- }),
- retryPredicate);
- }
In this method, the WebMonitorEndpoint address is obtained first and the job is then submitted over HTTP REST. Let's follow the submission into restClient.sendRequest:
- @Override
- public <
-                 M extends MessageHeaders<R, P, U>,
-                 U extends MessageParameters,
-                 R extends RequestBody,
-                 P extends ResponseBody>
-         CompletableFuture<P> sendRequest(
-                 final String targetAddress,
-                 final int targetPort,
-                 final M messageHeaders,
-                 final U messageParameters,
-                 final R request,
-                 final Collection<FileUpload> files)
-                 throws IOException {
- if (failHttpRequest.test(messageHeaders, messageParameters, request)) {
- return FutureUtils.completedExceptionally(new IOException("expected"));
- } else {
- // TODO 继续提交
- return super.sendRequest(
- targetAddress,
- targetPort,
- messageHeaders,
- messageParameters,
- request,
- files);
- }
- }
Then into super.sendRequest:
- public <
-                 M extends MessageHeaders<R, P, U>,
-                 U extends MessageParameters,
-                 R extends RequestBody,
-                 P extends ResponseBody>
-         CompletableFuture<P> sendRequest(
-                 String targetAddress,
-                 int targetPort,
-                 M messageHeaders,
-                 U messageParameters,
-                 R request,
-                 Collection<FileUpload> fileUploads,
-                 RestAPIVersion apiVersion)
- throws IOException {
- Preconditions.checkNotNull(targetAddress);
- Preconditions.checkArgument(
- NetUtils.isValidHostPort(targetPort),
- "The target port " + targetPort + " is not in the range [0, 65535].");
- Preconditions.checkNotNull(messageHeaders);
- ... ...
- ... ...
-
-     /*
-        TODO build the target URL, which determines which handler in the WebMonitorEndpoint will process the request
-     */
- String versionedHandlerURL =
- "/" + apiVersion.getURLVersionPrefix() + messageHeaders.getTargetRestEndpointURL();
- String targetUrl = MessageParameters.resolveUrl(versionedHandlerURL, messageParameters);
-
- LOG.debug(
- "Sending request of class {} to {}:{}{}",
- request.getClass(),
- targetAddress,
- targetPort,
- targetUrl);
- // serialize payload
- StringWriter sw = new StringWriter();
- objectMapper.writeValue(sw, request);
- ByteBuf payload =
- Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
-
-     // TODO build an HTTP Request object
- Request httpRequest =
- createRequest(
- targetAddress + ':' + targetPort,
- targetUrl,
- messageHeaders.getHttpMethod().getNettyHttpMethod(),
- payload,
- fileUploads);
-
- final JavaType responseType;
-
-     final Collection<Class<?>> typeParameters = messageHeaders.getResponseTypeParameters();
-
- if (typeParameters.isEmpty()) {
- responseType = objectMapper.constructType(messageHeaders.getResponseClass());
- } else {
- responseType =
- objectMapper
- .getTypeFactory()
- .constructParametricType(
- messageHeaders.getResponseClass(),
-                         typeParameters.toArray(new Class<?>[typeParameters.size()]));
- }
-
-     // TODO submit the request
- return submitRequest(targetAddress, targetPort, httpRequest, responseType);
- }
This method builds the versioned target URL (which decides which handler inside the WebMonitorEndpoint will process the request), serializes the request body to JSON, builds an HTTP Request object together with the files to upload, determines the expected response type, and finally calls submitRequest.
Let's follow the submission into submitRequest:
- private <P extends ResponseBody> CompletableFuture<P> submitRequest(
- String targetAddress, int targetPort, Request httpRequest, JavaType responseType) {
-     /*
-        TODO send the request to the netty server through the netty client
-     */
- final ChannelFuture connectFuture = bootstrap.connect(targetAddress, targetPort);
-
-     final CompletableFuture<Channel> channelFuture = new CompletableFuture<>();
-
- connectFuture.addListener(
- (ChannelFuture future) -> {
- if (future.isSuccess()) {
- channelFuture.complete(future.channel());
- } else {
- channelFuture.completeExceptionally(future.cause());
- }
- });
-
- return channelFuture
- .thenComposeAsync(
- channel -> {
- ClientHandler handler = channel.pipeline().get(ClientHandler.class);
-
-                     CompletableFuture<JsonResponse> future;
-                     boolean success = false;
-
- try {
- if (handler == null) {
- throw new IOException(
- "Netty pipeline was not properly initialized.");
- } else {
-                             // TODO write the request packet to the server
- httpRequest.writeTo(channel);
- future = handler.getJsonFuture();
- success = true;
- }
- } catch (IOException e) {
- future =
- FutureUtils.completedExceptionally(
- new ConnectionException(
- "Could not write request.", e));
- } finally {
- if (!success) {
- channel.close();
- }
- }
-
- return future;
- },
- executor)
- .thenComposeAsync(
- (JsonResponse rawResponse) -> parseResponse(rawResponse, responseType),
- executor);
- }
As you can see, bootstrap.connect is used to connect to the netty server, and once the connection succeeds, httpRequest.writeTo(channel) sends the data.
The bootstrap here is the netty client's bootstrap. When the master node starts, it starts the WebMonitorEndpoint component, which brings up a netty server; when the client submits a job, it does so through the RestClient, and the netty client was created when the RestClient was initialized. Calling submitRequest(...) therefore performs the actual submission: the netty client connects to the netty server and sends the request, i.e. it writes the Request object's data to the server. A stripped-down version of this connect-and-write pattern is sketched below.
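To illustrate only that connect-and-write pattern (this is not Flink's actual RestClient, which uses a shaded netty, multipart uploads and an optional SSL pipeline), a minimal netty HTTP client could look like the following sketch; the host, the default REST port 8081 and the /jobs/overview path are assumptions:

```java
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpVersion;

public class NettyRestClientSketch {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            // encode outgoing HTTP requests / decode responses
                            ch.pipeline().addLast(new HttpClientCodec());
                            ch.pipeline().addLast(new HttpObjectAggregator(1024 * 1024));
                        }
                    });

            // connect asynchronously, then write the request once the channel is ready
            Channel channel = bootstrap.connect("localhost", 8081).sync().channel();
            FullHttpRequest request = new DefaultFullHttpRequest(
                    HttpVersion.HTTP_1_1, HttpMethod.GET, "/jobs/overview");
            request.headers().set(HttpHeaderNames.HOST, "localhost");
            request.headers().set(HttpHeaderNames.CONTENT_LENGTH, 0);
            channel.writeAndFlush(request).sync();

            channel.close().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}
```

Flink's real client wraps essentially this connect-then-writeAndFlush sequence inside the CompletableFuture chain shown above, so the submission stays fully asynchronous.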
At this point the job has been handed over to the WebMonitorEndpoint on the master node. This chapter did not go into the construction of the StreamGraph and the JobGraph in detail; I plan to analyze each of them in later chapters.
During the initialization of the StreamExecutionEnvironment, two main things happen: configuring the StateBackend and configuring checkpointing.
In a Flink application, every operation is in fact a StreamOperator, divided into SourceOperator, StreamOperator and SinkOperator, and operators that can be optimized together are chained into an OperatorChain.
The translation chain for an operator is: Function => StreamOperator => Transformation => OperatorChain (which, after parallelization, is executed by StreamTasks).
In env.execute, the StreamGraph is built from the collected Transformations and converted into a JobGraph; the JobGraph is persisted, and the JobGraphFile, the dependency jars, and some additional configuration are assembled into a RequestBody that the netty client inside the RestClient sends to the netty server inside the JobManager's WebMonitorEndpoint, where the URL is resolved and the request is dispatched to the matching handler.