Flink on YARN in Practice and Source Code Analysis


    Version: 1.13.6

    Table of Contents

    Using the three Flink on YARN deployment modes

    YARN session mode source code analysis

    YARN per-job mode source code analysis

    Application mode source code analysis


    Using the three Flink on YARN deployment modes

    Application Mode

    ./bin/flink run-application -t yarn-application ./examples/streaming/TopSpeedWindowing.jar

    Per-Job Mode

    ./bin/flink run -t yarn-per-job --detached ./examples/streaming/TopSpeedWindowing.jar

    Session Mode

    ./bin/flink run -t yarn-session ./examples/streaming/TopSpeedWindowing.jar

    Note: application mode uses the run-application action, whereas per-job and session mode both use run -t. Why the two are not unified is unclear.

    YARN session mode source code analysis

    Taking the sample TopSpeedWindowing.jar as an example, the command-line submission is:

    ./bin/flink run -t yarn-session ./examples/streaming/TopSpeedWindowing.jar

    1. The bin/flink submission script shows that the startup class, i.e. the program entry point, is org.apache.flink.client.cli.CliFrontend.

    2. Its main method, summarized briefly, does the following:

    1. Locate the Flink conf directory.

    2. Load the global configuration from that directory.

    3. Load the custom command-line interfaces, in order: Generic, Yarn, Default.

    /** Submits the job based on the arguments. */
    public static void main(final String[] args) {
        EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

        // 1. find the configuration directory
        final String configurationDirectory = getConfigurationDirectoryFromEnv();

        // 2. load the global configuration
        final Configuration configuration =
                GlobalConfiguration.loadConfiguration(configurationDirectory);

        // 3. load the custom command lines
        final List<CustomCommandLine> customCommandLines =
                loadCustomCommandLines(configuration, configurationDirectory);

        int retCode = 31;
        try {
            final CliFrontend cli = new CliFrontend(configuration, customCommandLines);

            SecurityUtils.install(new SecurityConfiguration(cli.configuration));
            retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
        } catch (Throwable t) {
            final Throwable strippedThrowable =
                    ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
            LOG.error("Fatal error while running command line interface.", strippedThrowable);
            strippedThrowable.printStackTrace();
        } finally {
            System.exit(retCode);
        }
    }

    In parseAndRun, the first command-line argument is matched against the supported actions. For this mode the argument is run, so execution enters case ACTION_RUN:

    /**
     * Parses the command line arguments and starts the requested action.
     *
     * @param args command line arguments of the client.
     * @return The return code of the program
     */
    public int parseAndRun(String[] args) {

        // check for action
        if (args.length < 1) {
            CliFrontendParser.printHelp(customCommandLines);
            System.out.println("Please specify an action.");
            return 1;
        }

        // get action
        String action = args[0];

        // remove action from parameters
        final String[] params = Arrays.copyOfRange(args, 1, args.length);

        try {
            // do action
            switch (action) {
                case ACTION_RUN:
                    run(params);
                    return 0;
                case ACTION_RUN_APPLICATION:
                    runApplication(params);
                    return 0;
                case ACTION_LIST:
                    list(params);
                    return 0;
                case ACTION_INFO:
                    info(params);
                    return 0;
                case ACTION_CANCEL:
                    cancel(params);
                    return 0;
                case ACTION_STOP:
                    stop(params);
                    return 0;
                case ACTION_SAVEPOINT:
                    savepoint(params);
                    return 0;
                case "-h":
                case "--help":
                    CliFrontendParser.printHelp(customCommandLines);
                    return 0;
                case "-v":
                case "--version":
                    String version = EnvironmentInformation.getVersion();
                    String commitID = EnvironmentInformation.getRevisionInformation().commitId;
                    System.out.print("Version: " + version);
                    System.out.println(
                            commitID.equals(EnvironmentInformation.UNKNOWN)
                                    ? ""
                                    : ", Commit ID: " + commitID);
                    return 0;
                default:
                    System.out.printf("\"%s\" is not a valid action.\n", action);
                    System.out.println();
                    System.out.println(
                            "Valid actions are \"run\", \"run-application\", \"list\", \"info\", \"savepoint\", \"stop\", or \"cancel\".");
                    System.out.println();
                    System.out.println(
                            "Specify the version option (-v or --version) to print Flink version.");
                    System.out.println();
                    System.out.println(
                            "Specify the help option (-h or --help) to get help on the command.");
                    return 1;
            }
        } catch (CliArgsException ce) {
            return handleArgException(ce);
        } catch (ProgramParametrizationException ppe) {
            return handleParametrizationException(ppe);
        } catch (ProgramMissingJobException pmje) {
            return handleMissingJobException();
        } catch (Exception e) {
            return handleError(e);
        }
    }
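    The argument splitting at the top of parseAndRun can be tried in isolation with plain Java. ActionSplitDemo and its method names are illustrative only, not part of Flink:

```java
import java.util.Arrays;

public class ActionSplitDemo {
    // mirrors parseAndRun: args[0] is the action...
    static String action(String[] args) {
        return args[0];
    }

    // ...and Arrays.copyOfRange strips it, leaving the action's parameters
    static String[] params(String[] args) {
        return Arrays.copyOfRange(args, 1, args.length);
    }

    public static void main(String[] args) {
        String[] cli = {"run", "-t", "yarn-session", "./examples/streaming/TopSpeedWindowing.jar"};
        System.out.println(action(cli));                  // run
        System.out.println(Arrays.toString(params(cli))); // [-t, yarn-session, ./examples/streaming/TopSpeedWindowing.jar]
    }
}
```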

    /**
     * Executes the run action.
     *
     * @param args Command line arguments for the run action.
     */
    protected void run(String[] args) throws Exception {
        LOG.info("Running 'run' command.");

        // get the default options for the run action
        final Options commandOptions = CliFrontendParser.getRunCommandOptions();

        // parse the user-specified options, e.g. -t, -p, -c
        final CommandLine commandLine = getCommandLine(commandOptions, args, true);

        // evaluate help flag: if the arguments contain help, print help and return
        if (commandLine.hasOption(HELP_OPTION.getOpt())) {
            CliFrontendParser.printHelpForRun(customCommandLines);
            return;
        }

        final CustomCommandLine activeCommandLine =
                validateAndGetActiveCommandLine(checkNotNull(commandLine));

        final ProgramOptions programOptions = ProgramOptions.create(commandLine);

        // get the user jar and its other dependencies
        final List<URL> jobJars = getJobJarAndDependencies(programOptions);

        // build the effective configuration: HA cluster id, execution target
        // (session / per-job), JobManager memory, TaskManager memory, slots per TaskManager
        final Configuration effectiveConfiguration =
                getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);

        LOG.debug("Effective executor configuration: {}", effectiveConfiguration);

        // the PackagedProgram class is the key abstraction here
        try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
            // execute the program
            executeProgram(effectiveConfiguration, program);
        }
    }
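    The idea behind the effective configuration is that later sources override earlier ones, so CLI options win over flink-conf.yaml defaults. A much-simplified, Flink-free sketch of that layering (EffectiveConfigDemo and the plain Map representation are illustrative, not the Flink implementation):

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class EffectiveConfigDemo {
    // later sources override earlier ones, mirroring how command-line
    // options take precedence over values from flink-conf.yaml
    @SafeVarargs
    static Map<String, String> layer(Map<String, String>... sources) {
        Map<String, String> effective = new LinkedHashMap<>();
        for (Map<String, String> source : sources) {
            effective.putAll(source);
        }
        return effective;
    }

    public static void main(String[] args) {
        Map<String, String> flinkConf = new HashMap<>();
        flinkConf.put("taskmanager.numberOfTaskSlots", "1");
        flinkConf.put("jobmanager.memory.process.size", "1600m");

        Map<String, String> cliOptions = new HashMap<>();
        cliOptions.put("taskmanager.numberOfTaskSlots", "2"); // CLI overrides the file
        cliOptions.put("execution.target", "yarn-session");

        System.out.println(layer(flinkConf, cliOptions));
    }
}
```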

    YARN per-job mode source code analysis

    Taking the sample TopSpeedWindowing.jar as an example, the command-line submission is:

    ./bin/flink run -t yarn-per-job --detached ./examples/streaming/TopSpeedWindowing.jar

    Application mode source code analysis

    Application mode was introduced in Flink 1.11.

    Application mode is the only mode in which the main method does not run on the client. In the other two modes, the client manages dependencies locally, runs the main method to generate the JobGraph, and submits it to the cluster, which both adds network-transfer pressure and consumes substantial client CPU. To address this, application mode first uploads the user jar and other resources to the resource-management platform, then creates the Flink cluster and runs the application's main method on the cluster side. Inside main, jobs can be submitted via execute or executeAsync and their job ids recorded; once the jobs finish, the cluster shuts down automatically. This mode therefore provides the same single-job resource isolation as per-job mode while also removing the client-side compute bottleneck.

    In addition, application mode supports submitting multiple jobs, whose ordering depends on how they are started. With execute(), each call blocks, so the jobs run strictly in order; with executeAsync(), the jobs may complete out of order.
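    This ordering difference can be illustrated without Flink, using plain Java futures as a stand-in for job submission. SubmitOrderDemo, submitJob, and the job names are illustrative only, not Flink API:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

public class SubmitOrderDemo {

    // Stand-in for a submitted job: completes after the given delay,
    // recording the job name into `finished` when it does.
    static CompletableFuture<Void> submitJob(String name, long millis, List<String> finished) {
        return CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            finished.add(name);
        });
    }

    // execute()-style: join() each job before submitting the next,
    // so completion order always matches submission order.
    static List<String> blockingOrder() {
        List<String> finished = new CopyOnWriteArrayList<>();
        submitJob("a", 50, finished).join();
        submitJob("b", 10, finished).join();
        return finished;
    }

    // executeAsync()-style: submit both jobs first, then wait for all;
    // completion order depends on each job's runtime, not submission order.
    static List<String> asyncOrder() {
        List<String> finished = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> a = submitJob("a", 50, finished);
        CompletableFuture<Void> b = submitJob("b", 10, finished);
        CompletableFuture.allOf(a, b).join();
        return finished;
    }

    public static void main(String[] args) {
        System.out.println("blocking: " + blockingOrder()); // always [a, b]
        System.out.println("async:    " + asyncOrder());    // usually [b, a], since b is faster
    }
}
```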

    The contents of the flink script in the bin directory show that the entry class is org.apache.flink.client.cli.CliFrontend:

    #!/usr/bin/env bash
    ################################################################################
    # Licensed to the Apache Software Foundation (ASF) under one
    # or more contributor license agreements. See the NOTICE file
    # distributed with this work for additional information
    # regarding copyright ownership. The ASF licenses this file
    # to you under the Apache License, Version 2.0 (the
    # "License"); you may not use this file except in compliance
    # with the License. You may obtain a copy of the License at
    #
    #     http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    ################################################################################

    target="$0"
    # For the case, the executable has been directly symlinked, figure out
    # the correct bin path by following its symlink up to an upper bound.
    # Note: we can't use the readlink utility here if we want to be POSIX
    # compatible.
    iteration=0
    while [ -L "$target" ]; do
        if [ "$iteration" -gt 100 ]; then
            echo "Cannot resolve path: You have a cyclic symlink in $target."
            break
        fi
        ls=`ls -ld -- "$target"`
        target=`expr "$ls" : '.* -> \(.*\)$'`
        iteration=$((iteration + 1))
    done

    # Convert relative path to absolute path
    bin=`dirname "$target"`

    # get flink config
    . "$bin"/config.sh

    if [ "$FLINK_IDENT_STRING" = "" ]; then
        FLINK_IDENT_STRING="$USER"
    fi

    CC_CLASSPATH=`constructFlinkClassPath`

    log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-client-$HOSTNAME.log
    log_setting=(-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlog4j.configurationFile=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml)

    # Add Client-specific JVM options
    FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_CLI}"

    # Add HADOOP_CLASSPATH to allow the usage of Hadoop file systems
    exec "${JAVA_RUN}" $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"

    Submitting the sample job in application mode

    (jps output on hadoop1, hadoop2, and hadoop3, showing the JobManager and TaskManager processes; the screenshots are not reproduced here.)

    With -p 3 and one slot per TaskManager, three TaskManager containers need to be started.
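    The container count follows from simple arithmetic: the number of TaskManagers is the parallelism divided by the slots per TaskManager, rounded up. A small sketch (ContainerCount is an illustrative name; in Flink the slot count is configured via taskmanager.numberOfTaskSlots):

```java
public class ContainerCount {
    // TaskManager containers needed: ceil(parallelism / slotsPerTaskManager)
    static int taskManagerContainers(int parallelism, int slotsPerTaskManager) {
        return (parallelism + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }

    public static void main(String[] args) {
        // -p 3 with 1 slot per TaskManager -> 3 containers
        System.out.println(taskManagerContainers(3, 1));
    }
}
```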

    Original article: https://blog.csdn.net/yelangshisan/article/details/133265734