• Flink Yarn Per Job - Starting the AM (ApplicationMaster)



    Starting the AM

    YarnClusterDescriptor

    private ClusterClientProvider<ApplicationId> deployInternal(
        ClusterSpecification clusterSpecification,
        String applicationName,
        String yarnClusterEntrypoint,
        @Nullable JobGraph jobGraph,
        boolean detached) throws Exception {
      ...
      // TODO: start the ApplicationMaster
      ApplicationReport report = startAppMaster(
          flinkConfiguration,
          applicationName,
          yarnClusterEntrypoint,
          jobGraph,
          yarnClient,
          yarnApplication,
          validClusterSpecification);

       ...
    }
    

    private ApplicationReport startAppMaster(

        Configuration configuration,

        String applicationName,

        String yarnClusterEntrypoint,

        JobGraph jobGraph,

        YarnClient yarnClient,

        YarnClientApplication yarnApplication,

        ClusterSpecification clusterSpecification) throws Exception {


      // ------------------ Initialize the file systems -------------------------

      // 1. Initialize and create the Hadoop FileSystem

      org.apache.flink.core.fs.FileSystem.initialize(

          configuration,

          PluginUtils.createPluginManagerFromRootFolder(configuration));


      final FileSystem fs = FileSystem.get(yarnConfiguration);

    ... ...


      ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();


      final List<Path> providedLibDirs = Utils.getQualifiedRemoteSharedPaths(configuration, yarnConfiguration);


      // 2. The YARN application file uploader: the FileSystem and the corresponding HDFS staging path,
      //    used to upload the user jar, Flink's dependencies and Flink's configuration files;
      //    it is closed later via fileUploader.close()

      

      final YarnApplicationFileUploader fileUploader = YarnApplicationFileUploader.from(

        fs,

        getStagingDir(fs),

        providedLibDirs,

        appContext.getApplicationId(),

        getFileReplication());


    // The files need to be shipped and added to the classpath:
    // the log config file, and the jars under lib/ except the flink-dist jar
    Set<File> systemShipFiles = new HashSet<>(shipFiles.size());
    for (File file : shipFiles) {
      systemShipFiles.add(file.getAbsoluteFile());
    }

    final String logConfigFilePath = configuration.getString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE);
    if (logConfigFilePath != null) {
      systemShipFiles.add(new File(logConfigFilePath));
    }

    ... ...


      // Set-up ApplicationSubmissionContext for the application

      final ApplicationId appId = appContext.getApplicationId();


    ... ...


      configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace);


      // 3. High-availability configuration: number of application attempts, 2 by default
      //    (YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS = 2)

      if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {

        // activate re-execution of failed applications

        appContext.setMaxAppAttempts(

            configuration.getInteger(

                YarnConfigOptions.APPLICATION_ATTEMPTS.key(),

                YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));


        activateHighAvailabilitySupport(appContext);

      } else {

        // set number of application retries to 1 in the default case

        // 3.1 without HA the number of application attempts is 1

        appContext.setMaxAppAttempts(

            configuration.getInteger(

                YarnConfigOptions.APPLICATION_ATTEMPTS.key(),

                1));

      }


      // 4. Add the user jar files
      final Set<Path> userJarFiles = new HashSet<>();

      if (jobGraph != null) {

        userJarFiles.addAll(jobGraph.getUserJars().stream().map(f -> f.toUri()).map(Path::new).collect(Collectors.toSet()));

      }


      final List<URI> jarUrls = ConfigUtils.decodeListFromConfig(configuration, PipelineOptions.JARS, URI::create);

      if (jarUrls != null && YarnApplicationClusterEntryPoint.class.getName().equals(yarnClusterEntrypoint)) {

        userJarFiles.addAll(jarUrls.stream().map(Path::new).collect(Collectors.toSet()));

      }


    ... ...


      // upload and register ship-only files
      // Plugin files only need to be shipped and should not be added to the classpath,
      // i.e. the files under the plugins directory
      if (providedLibDirs == null || providedLibDirs.isEmpty()) {
        Set<File> shipOnlyFiles = new HashSet<>();

        addPluginsFoldersToShipFiles(shipOnlyFiles);

        fileUploader.registerMultipleLocalResources(

            shipOnlyFiles.stream().map(e -> new Path(e.toURI())).collect(Collectors.toSet()),

            Path.CUR_DIR,

            LocalResourceType.FILE);

      }


      if (!shipArchives.isEmpty()) {

        fileUploader.registerMultipleLocalResources(

          shipArchives.stream().map(e -> new Path(e.toURI())).collect(Collectors.toSet()),

          Path.CUR_DIR,

          LocalResourceType.ARCHIVE);

      }


      // 5. Upload and register the user jars
      final List<String> userClassPaths = fileUploader.registerMultipleLocalResources(

        userJarFiles,

        userJarInclusion == YarnConfigOptions.UserJarInclusion.DISABLED

            ? ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR

            : Path.CUR_DIR,

        LocalResourceType.FILE);


      if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {

        systemClassPaths.addAll(userClassPaths);

      }


      // normalize classpath by sorting

      Collections.sort(systemClassPaths);

      Collections.sort(userClassPaths);


      // classpath assembler

      StringBuilder classPathBuilder = new StringBuilder();

      if (userJarInclusion == YarnConfigOptions.UserJarInclusion.FIRST) {

        for (String userClassPath : userClassPaths) {

          classPathBuilder.append(userClassPath).append(File.pathSeparator);

        }

      }

      for (String classPath : systemClassPaths) {

        classPathBuilder.append(classPath).append(File.pathSeparator);

      }


      // Setup jar for ApplicationMaster

      final YarnLocalResourceDescriptor localResourceDescFlinkJar = fileUploader.uploadFlinkDist(flinkJarPath);

      classPathBuilder.append(localResourceDescFlinkJar.getResourceKey()).append(File.pathSeparator);


      // write the job graph to a temporary file and add it to the local resources
      // TODO: server use user main method to generate job graph
      // (i.e. the JobGraph is generated from the user class's main method)

      if (jobGraph != null) {

        File tmpJobGraphFile = null;

        try {

          tmpJobGraphFile = File.createTempFile(appId.toString(), null);

          try (FileOutputStream output = new FileOutputStream(tmpJobGraphFile);

            ObjectOutputStream obOutput = new ObjectOutputStream(output)) {

            obOutput.writeObject(jobGraph);

          }


          final String jobGraphFilename = "job.graph";

          configuration.setString(JOB_GRAPH_FILE_PATH, jobGraphFilename);


          fileUploader.registerSingleLocalResource(

            jobGraphFilename,

            new Path(tmpJobGraphFile.toURI()),

            "",

            LocalResourceType.FILE,

            true,

            false);

          classPathBuilder.append(jobGraphFilename).append(File.pathSeparator);

        } catch (Exception e) {

          LOG.warn("Add job graph to local resource fail.");

          throw e;

        } finally {

          if (tmpJobGraphFile != null && !tmpJobGraphFile.delete()) {

            LOG.warn("Fail to delete temporary file {}.", tmpJobGraphFile.toPath());

          }

        }

      }


      // 6. Upload the Flink configuration: write out and ship flink-conf.yaml

      File tmpConfigurationFile = null;

      try {

        tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null);

        BootstrapTools.writeConfiguration(configuration, tmpConfigurationFile);


        String flinkConfigKey = "flink-conf.yaml";

        fileUploader.registerSingleLocalResource(

          flinkConfigKey,

          new Path(tmpConfigurationFile.getAbsolutePath()),

          "",

          LocalResourceType.FILE,

          true,

          true);

        classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);

      } finally {

        if (tmpConfigurationFile != null && !tmpConfigurationFile.delete()) {

          LOG.warn("Fail to delete temporary file {}.", tmpConfigurationFile.toPath());

        }

      }


      if (userJarInclusion == YarnConfigOptions.UserJarInclusion.LAST) {

        for (String userClassPath : userClassPaths) {

          classPathBuilder.append(userClassPath).append(File.pathSeparator);

        }

      }


    ... ... 



      // 7. JobManager memory configuration
      //    (TOTAL_PROCESS_MEMORY = jobmanager.memory.process.size)

      final JobManagerProcessSpec processSpec = JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(

        flinkConfiguration,

        JobManagerOptions.TOTAL_PROCESS_MEMORY);

      final ContainerLaunchContext amContainer = setupApplicationMasterContainer(

          yarnClusterEntrypoint,

          hasKrb5,

          processSpec);


     

      amContainer.setLocalResources(fileUploader.getRegisteredLocalResources());


      // 8. Close the file uploader
      fileUploader.close();


      // Setup CLASSPATH and environment variables for ApplicationMaster

      // 9. Create a Map to hold the AM's environment variables and classpath
      final Map<String, String> appMasterEnv = new HashMap<>();

      // 9.1 set user specified app master environment variables

      appMasterEnv.putAll(

        ConfigurationUtils.getPrefixedKeyValuePairs(ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX, configuration));

      // 9.2 set the Flink app classpath

      appMasterEnv.put(YarnConfigKeys.ENV_FLINK_CLASSPATH, classPathBuilder.toString());


      // 9.3 set Flink on YARN internal configuration values

      appMasterEnv.put(YarnConfigKeys.FLINK_DIST_JAR, localResourceDescFlinkJar.toString());

      appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString());

      appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, fileUploader.getHomeDir().toString());

      appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, encodeYarnLocalResourceDescriptorListToString(fileUploader.getEnvShipResourceList()));

      appMasterEnv.put(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE, getZookeeperNamespace());

      appMasterEnv.put(YarnConfigKeys.FLINK_YARN_FILES, fileUploader.getApplicationDir().toUri().toString());


      // https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#identity-on-an-insecure-cluster-hadoop_user_name

      appMasterEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName());


      if (localizedKeytabPath != null) {

        appMasterEnv.put(YarnConfigKeys.LOCAL_KEYTAB_PATH, localizedKeytabPath);

        String principal = configuration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);

        appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal);

        if (remotePathKeytab != null) {

          appMasterEnv.put(YarnConfigKeys.REMOTE_KEYTAB_PATH, remotePathKeytab.toString());

        }

      }


      ... ...



      // 10. Set the previously assembled Map (the AM's environment variables and classpath) on the container

      amContainer.setEnvironment(appMasterEnv);


      // Set up resource type requirements for ApplicationMaster

      Resource capability = Records.newRecord(Resource.class);

      capability.setMemory(clusterSpecification.getMasterMemoryMB());

      capability.setVirtualCores(flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES));


      final String customApplicationName = customName != null ? customName : applicationName;


      appContext.setApplicationName(customApplicationName);

      appContext.setApplicationType(applicationType != null ? applicationType : "Apache Flink");

      appContext.setAMContainerSpec(amContainer);

      appContext.setResource(capability);


      // Set priority for application

      int priorityNum = flinkConfiguration.getInteger(YarnConfigOptions.APPLICATION_PRIORITY);

      if (priorityNum >= 0) {

        Priority priority = Priority.newInstance(priorityNum);

        appContext.setPriority(priority);

      }


      if (yarnQueue != null) {

        appContext.setQueue(yarnQueue);

      }


      setApplicationNodeLabel(appContext);


      setApplicationTags(appContext);


      // add a hook to clean up in case deployment fails

      Thread deploymentFailureHook = new DeploymentFailureHook(yarnApplication, fileUploader.getApplicationDir());

      Runtime.getRuntime().addShutdownHook(deploymentFailureHook);

      LOG.info("Submitting application master " + appId);


      // 11. Submit the application


      yarnClient.submitApplication(appContext);


      LOG.info("Waiting for the cluster to be allocated");

      final long startTime = System.currentTimeMillis();

      ApplicationReport report;

      YarnApplicationState lastAppState = YarnApplicationState.NEW;

     ... ...


      }


      // since deployment was successful, remove the hook

      ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(), LOG);

      return report;

    }
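    The JobGraph handed to startAppMaster is shipped to the AM as a plain Java-serialized file, registered above under the name job.graph. A minimal sketch of that write/read round-trip using only standard Java serialization; the payload and file handling here are illustrative stand-ins, not Flink's actual retriever code:

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    public class JobGraphShippingSketch {

      // Client side: serialize the (Serializable) job graph into a temp file,
      // mirroring the ObjectOutputStream.writeObject call in startAppMaster.
      static File write(Serializable jobGraph) throws Exception {
        File tmp = File.createTempFile("job", ".graph");
        try (ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(tmp))) {
          out.writeObject(jobGraph);
        }
        return tmp;
      }

      // AM side: the entrypoint reads the localized "job.graph" file back
      // with an ObjectInputStream and recovers the same object.
      static Object read(File file) throws Exception {
        try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(file))) {
          return in.readObject();
        }
      }

      public static void main(String[] args) throws Exception {
        File f = write("pretend this String is a JobGraph"); // placeholder payload
        System.out.println(read(f));
        f.delete();
      }
    }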

    startAppMaster's main responsibilities:

    1. Initialize and create the Hadoop FileSystem.

    2. Create the YARN application file uploader (the FileSystem plus the corresponding HDFS staging path), used to upload the user jar, Flink's dependencies and Flink's configuration files.

    3. High-availability configuration: the number of application attempts, 2 by default (DEFAULT_RM_AM_MAX_ATTEMPTS = 2); without HA the number of attempts is 1.

    4. Add the user jars and the files under the plugins directory.

    5. Upload and register the user jars (a sketch of what such a registration involves follows this list).

    6. Write the JobGraph to a temporary file and add it to the local resources; the JobGraph itself is generated from the user class's main method.

    7. Upload the Flink configuration file, flink-conf.yaml.

    8. Configure the JobManager memory: TOTAL_PROCESS_MEMORY = jobmanager.memory.process.size.

    9. Create a Map to hold the AM's environment variables and classpath.

    10. Set that Map (the AM's environment and classpath) on the container.

    11. Submit the application.
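    Steps 2 and 5 boil down to copying files into the application's HDFS staging directory and describing each one as a YARN LocalResource, so that the NodeManager can localize it into the AM container. A minimal sketch of what such a registration involves, assuming the standard Hadoop/YARN client APIs; it is a simplified stand-in for YarnApplicationFileUploader, not its actual implementation:

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.yarn.api.records.LocalResource;
    import org.apache.hadoop.yarn.api.records.LocalResourceType;
    import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
    import org.apache.hadoop.yarn.util.ConverterUtils;

    public class LocalResourceSketch {

      // Copies a local file into the application's staging directory and returns a
      // LocalResource descriptor for it; the collected descriptors end up in
      // amContainer.setLocalResources(...).
      static LocalResource uploadAndDescribe(FileSystem fs, Path localFile, Path stagingDir) throws Exception {
        Path remote = new Path(stagingDir, localFile.getName());
        fs.copyFromLocalFile(false, true, localFile, remote);     // upload to the shared file system

        FileStatus status = fs.getFileStatus(remote);             // size and timestamp are required by YARN
        return LocalResource.newInstance(
            ConverterUtils.getYarnUrlFromPath(remote),             // where the NodeManager fetches the file from
            LocalResourceType.FILE,                                // FILE vs. ARCHIVE (archives are unpacked)
            LocalResourceVisibility.APPLICATION,                   // visible to this application only
            status.getLen(),
            status.getModificationTime());
      }
    }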

    JobManager (JM) memory configuration

    YarnClusterDescriptor

    final JobManagerProcessSpec processSpec = JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
      flinkConfiguration,
      JobManagerOptions.TOTAL_PROCESS_MEMORY);
    

    JobManagerProcessUtils

    public static JobManagerProcessSpec processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
        Configuration config,
        ConfigOption<MemorySize> newOptionToInterpretLegacyHeap) {
      try {
        return processSpecFromConfig(
          getConfigurationWithLegacyHeapSizeMappedToNewConfigOption(
            config,
            newOptionToInterpretLegacyHeap));
      } catch (IllegalConfigurationException e) {
        throw new IllegalConfigurationException("JobManager memory configuration failed: " + e.getMessage(), e);
      }
    }
    
    static JobManagerProcessSpec processSpecFromConfig(Configuration config) {
      return createMemoryProcessSpec(PROCESS_MEMORY_UTILS.memoryProcessSpecFromConfig(config));
    }
    
    private static JobManagerProcessSpec createMemoryProcessSpec(
        CommonProcessMemorySpec<JobManagerFlinkMemory> processMemory) {
      return new JobManagerProcessSpec(processMemory.getFlinkMemory(), processMemory.getJvmMetaspaceAndOverhead());
    }
    
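    A small usage sketch of this derivation, assuming flink-runtime is on the classpath and the package locations of the 1.12-era sources quoted here; the chosen memory size is arbitrary:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.JobManagerOptions;
    import org.apache.flink.configuration.MemorySize;
    import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec;
    import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils;

    public class JobManagerMemorySketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // jobmanager.memory.process.size, i.e. JobManagerOptions.TOTAL_PROCESS_MEMORY
        conf.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1600m"));

        // derives heap, off-heap, metaspace and overhead from the total process size
        JobManagerProcessSpec spec = JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
            conf, JobManagerOptions.TOTAL_PROCESS_MEMORY);

        System.out.println(spec); // prints the four derived memory components
      }
    }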
    JobManagerProcessSpec extends CommonProcessMemorySpec
    
    /**
     * Describe the specifics of different resource dimensions of the JobManager process.
     *
     * <p>A JobManager's memory consists of the following components:
     * <ul>
     *     <li>JVM Heap Memory</li>
     *     <li>Off-heap Memory</li>
     *     <li>JVM Metaspace</li>
     *     <li>JVM Overhead</li>
     * </ul>
     *
     * <p>We use Total Process Memory to refer to all the memory components, while Total Flink Memory
     * refers to all the components except JVM Metaspace and JVM Overhead.
     *
     * <p>The relationship of the JobManager memory components (shown as an ASCII diagram in the
     * original Javadoc): Total Process Memory consists of Total Flink Memory (JVM Heap Memory plus
     * Off-heap Memory) plus JVM Metaspace and JVM Overhead; only the JVM Heap Memory is on-heap,
     * the other three components are off-heap.
     */
    public class JobManagerProcessSpec extends CommonProcessMemorySpec<JobManagerFlinkMemory> {
      private static final long serialVersionUID = 1L;

      JobManagerProcessSpec(JobManagerFlinkMemory flinkMemory, JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead) {
        super(flinkMemory, jvmMetaspaceAndOverhead);
      }

      @VisibleForTesting
      public JobManagerProcessSpec(
          MemorySize jvmHeapSize,
          MemorySize offHeapSize,
          MemorySize jvmMetaspaceSize,
          MemorySize jvmOverheadSize) {
        this(new JobManagerFlinkMemory(jvmHeapSize, offHeapSize), new JvmMetaspaceAndOverhead(jvmMetaspaceSize, jvmOverheadSize));
      }

      @Override
      public String toString() {
        return "JobManagerProcessSpec {" +
          "jvmHeapSize=" + getJvmHeapMemorySize().toHumanReadableString() + ", " +
          "offHeapSize=" + getJvmDirectMemorySize().toHumanReadableString() + ", " +
          "jvmMetaspaceSize=" + getJvmMetaspaceSize().toHumanReadableString() + ", " +
          "jvmOverheadSize=" + getJvmOverheadSize().toHumanReadableString() + '}';
      }
    }

    CommonProcessMemorySpec

    The parent class constructor assigns flinkMemory and jvmMetaspaceAndOverhead.

    public class CommonProcessMemorySpec<FM extends FlinkMemory> implements ProcessMemorySpec {
      private static final long serialVersionUID = 1L;

      private final FM flinkMemory;
      private final JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead;

      protected CommonProcessMemorySpec(FM flinkMemory, JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead) {
        this.flinkMemory = flinkMemory;
        this.jvmMetaspaceAndOverhead = jvmMetaspaceAndOverhead;
      }
    }
    
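    To make the two fields stored by the parent constructor concrete, the @VisibleForTesting constructor shown above can build a spec directly from the four memory dimensions; a minimal sketch with arbitrary sizes (package locations assumed as in the sources quoted here):

    import org.apache.flink.configuration.MemorySize;
    import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec;

    public class ProcessSpecConstructionSketch {
      public static void main(String[] args) {
        // JVM heap + off-heap form the JobManagerFlinkMemory; metaspace + overhead
        // form the JvmMetaspaceAndOverhead (see the constructor shown above).
        JobManagerProcessSpec spec = new JobManagerProcessSpec(
            MemorySize.parse("1g"),    // jvmHeapSize
            MemorySize.parse("128m"),  // offHeapSize
            MemorySize.parse("256m"),  // jvmMetaspaceSize
            MemorySize.parse("192m")); // jvmOverheadSize

        System.out.println(spec); // prints JobManagerProcessSpec {jvmHeapSize=..., offHeapSize=..., ...}
      }
    }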

    amContainer configuration

    YarnClusterDescriptor

    final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
          yarnClusterEntrypoint,
          hasKrb5,
          processSpec);
    

    YarnClusterDescriptor

    ContainerLaunchContext setupApplicationMasterContainer(
        String yarnClusterEntrypoint,
        boolean hasKrb5,
        JobManagerProcessSpec processSpec) {
      // ------------------ Prepare Application Master Container ------------------------------
      // respect custom JVM options in the YAML file
      String javaOpts = flinkConfiguration.getString(CoreOptions.FLINK_JVM_OPTIONS);
      if (flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS).length() > 0) {
        javaOpts += " " + flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS);
      }

      // the krb5.conf file will be available as a local resource in the JM/TM container
      if (hasKrb5) {
        javaOpts += " -Djava.security.krb5.conf=krb5.conf";
      }

      // Set up the container launch context for the application master
      ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);

      final Map<String, String> startCommandValues = new HashMap<>();
      startCommandValues.put("java", "$JAVA_HOME/bin/java");

      String jvmHeapMem = JobManagerProcessUtils.generateJvmParametersStr(processSpec, flinkConfiguration);
      startCommandValues.put("jvmmem", jvmHeapMem);

      startCommandValues.put("jvmopts", javaOpts);
      startCommandValues.put("logging", YarnLogConfigUtil.getLoggingYarnCommand(flinkConfiguration));

      startCommandValues.put("class", yarnClusterEntrypoint);
      startCommandValues.put("redirects",
        "1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.out " +
        "2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.err");
      String dynamicParameterListStr = JobManagerProcessUtils.generateDynamicConfigsStr(processSpec);
      startCommandValues.put("args", dynamicParameterListStr);

      final String commandTemplate = flinkConfiguration
        .getString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
          ConfigConstants.DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE);
      final String amCommand =
        BootstrapTools.getStartCommand(commandTemplate, startCommandValues);

      amContainer.setCommands(Collections.singletonList(amCommand));

      LOG.debug("Application Master start command: " + amCommand);

      return amContainer;
    }
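    The final AM command is produced by substituting the startCommandValues into a template of %key% placeholders. A simplified stand-in for that substitution; the template string and the example values below are assumptions for illustration, not taken verbatim from BootstrapTools:

    import java.util.HashMap;
    import java.util.Map;

    public class StartCommandSketch {

      // Replaces every %key% token in the template with the corresponding map value.
      static String getStartCommand(String template, Map<String, String> values) {
        String command = template;
        for (Map.Entry<String, String> entry : values.entrySet()) {
          command = command.replace("%" + entry.getKey() + "%", entry.getValue());
        }
        return command;
      }

      public static void main(String[] args) {
        // assumed layout of the default YARN container start command template
        String template = "%java% %jvmmem% %jvmopts% %logging% %class% %args% %redirects%";

        Map<String, String> values = new HashMap<>();
        values.put("java", "$JAVA_HOME/bin/java");
        values.put("jvmmem", "-Xmx1073741824 -Xms1073741824 -XX:MaxMetaspaceSize=268435456"); // from generateJvmParametersStr
        values.put("jvmopts", "");
        values.put("logging", "-Dlog.file=<LOG_DIR>/jobmanager.log");                         // illustrative
        values.put("class", "org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint");
        values.put("args", "-D jobmanager.memory.off-heap.size=134217728b");                  // illustrative dynamic config
        values.put("redirects", "1> <LOG_DIR>/jobmanager.out 2> <LOG_DIR>/jobmanager.err");

        System.out.println(getStartCommand(template, values));
      }
    }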


  • Original article: https://blog.csdn.net/hyunbar/article/details/126128829