• Flink Yarn Per Job - Submission Flow, Part 1



    AbstractJobClusterExecutor.java

    @Override
    public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration, @Nonnull final ClassLoader userCodeClassloader) throws Exception {
      /*TODO translate the stream graph (StreamGraph) into a job graph (JobGraph)*/
      final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);

      /*TODO cluster descriptor: creates and starts the YarnClient; carries YARN and Flink configuration and environment info*/
      try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
        final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);

        /*TODO cluster-specific resource settings: JobManager memory, TaskManager memory, slots per TaskManager*/
        final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);

        final ClusterClientProvider<ClusterID> clusterClientProvider = clusterDescriptor
            .deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode());
        LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());

        return CompletableFuture.completedFuture(
            new ClusterClientJobClientAdapter<>(clusterClientProvider, jobGraph.getJobID(), userCodeClassloader));
      }
    }
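
    For context, here is a hedged sketch (not from the article) of how this executor is reached: when the configuration's execution.target is "yarn-per-job", env.execute() resolves to the YARN per-job executor and lands in the execute(...) method above. The class and job names below are made up for illustration, and a YARN cluster plus flink-yarn on the classpath are assumed.

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.DeploymentOptions;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class PerJobSubmitSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // "yarn-per-job" selects the YARN per-job executor
        conf.set(DeploymentOptions.TARGET, "yarn-per-job");
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.fromElements("a", "b", "c").print();
        // execute() eventually reaches AbstractJobClusterExecutor.execute(...) shown above
        env.execute("per-job demo");
      }
    }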
    

    Converting the StreamGraph into a JobGraph

    **(1) Finding the createJobGraph method**

    1)PipelineExecutorUtils

      public static JobGraph getJobGraph(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws MalformedURLException {
        checkNotNull(pipeline);
        checkNotNull(configuration);
    
        final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
        // continue below
        final JobGraph jobGraph = FlinkPipelineTranslationUtil
            .getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism());
    
        configuration
            .getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID)
            .ifPresent(strJobID -> jobGraph.setJobID(JobID.fromHexString(strJobID)));
    
        jobGraph.addJars(executionConfigAccessor.getJars());
        jobGraph.setClasspaths(executionConfigAccessor.getClasspaths());
        jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());
    
        return jobGraph;
      }
    

    2)FlinkPipelineTranslationUtil

      public static JobGraph getJobGraph(
          Pipeline pipeline,
          Configuration optimizerConfiguration,
          int defaultParallelism) {
    
        FlinkPipelineTranslator pipelineTranslator = getPipelineTranslator(pipeline);
        // continue below
        return pipelineTranslator.translateToJobGraph(pipeline,
            optimizerConfiguration,
            defaultParallelism);
      }
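
    getPipelineTranslator is not shown in the article; as a simplified sketch (paraphrased from the Flink sources, details vary by version), it just picks the translator that can handle the given pipeline type:

    private static FlinkPipelineTranslator getPipelineTranslator(Pipeline pipeline) {
      PlanTranslator planTranslator = new PlanTranslator();
      if (planTranslator.canTranslate(pipeline)) {
        return planTranslator; // DataSet (batch) Plan
      }
      StreamGraphTranslator streamGraphTranslator = new StreamGraphTranslator();
      if (streamGraphTranslator.canTranslate(pipeline)) {
        return streamGraphTranslator; // StreamGraph from the DataStream API
      }
      throw new RuntimeException(
          "Translator " + streamGraphTranslator + " cannot translate the given pipeline " + pipeline + ".");
    }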
    
    3)StreamGraphTranslator implements FlinkPipelineTranslator
    
    @Override
    public JobGraph translateToJobGraph(
        Pipeline pipeline,
        Configuration optimizerConfiguration,
        int defaultParallelism) {

      StreamGraph streamGraph = (StreamGraph) pipeline;

      // key point
      return streamGraph.getJobGraph(null);
    }

    4)StreamGraph
    
      
    
    

    public JobGraph getJobGraph(@Nullable JobID jobID) {
      return StreamingJobGraphGenerator.createJobGraph(this, jobID);
    }

    
      
    
    5)StreamingJobGraphGenerator
    
      
    
    

    public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) {
      return new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph();
    }

    
      
    
    

    private JobGraph createJobGraph() {
      preValidate();

      // make sure that all vertices start immediately
      /*TODO in streaming mode, the schedule mode starts all vertices at once: Eager*/
      jobGraph.setScheduleMode(streamGraph.getScheduleMode());
      jobGraph.enableApproximateLocalRecovery(streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());

      // Generate deterministic hashes for the nodes in order to identify them across
      // submission iff they didn't change.
      // Traverse the StreamGraph breadth-first and generate a hash id for every StreamNode,
      // so that as long as the submitted topology is unchanged, the generated hashes stay the same
      Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);

      // Generate legacy version hashes for backwards compatibility
      List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
      for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
        legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
      }

      /*TODO the most important call: creates the JobVertex and JobEdge instances and chains as many nodes together as possible*/
      setChaining(hashes, legacyHashes);

      /*TODO serialize each JobVertex's incoming-edge set into its StreamConfig (the outgoing edges were already written during setChaining)*/
      setPhysicalEdges();

      /*TODO assign each JobVertex to its SlotSharingGroup by group name, and set up a CoLocationGroup for the head/tail of iterations*/
      setSlotSharingAndCoLocation();

      setManagedMemoryFraction(
          Collections.unmodifiableMap(jobVertices),
          Collections.unmodifiableMap(vertexConfigs),
          Collections.unmodifiableMap(chainedConfigs),
          id -> streamGraph.getStreamNode(id).getManagedMemoryOperatorScopeUseCaseWeights(),
          id -> streamGraph.getStreamNode(id).getManagedMemorySlotScopeUseCases());

      configureCheckpointing();

      jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());

      JobGraphUtils.addUserArtifactEntries(streamGraph.getUserArtifacts(), jobGraph);

      // set the ExecutionConfig last when it has been finalized
      try {
        /*TODO serialize the StreamGraph's ExecutionConfig into the JobGraph's configuration*/
        jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
      }
      catch (IOException e) {
        throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
            "This indicates that non-serializable types (like custom serializers) were registered");
      }

      return jobGraph;
    }
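
    Because these deterministic hashes identify operators across submissions, savepoint state can only be mapped back to an operator whose hash (or explicit uid) still matches. A hedged usage example (not from the article; names are made up):

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class UidSketch {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("a", "b")
            .map(new MapFunction<String, String>() {
              @Override
              public String map(String value) {
                return value.toUpperCase();
              }
            })
            .uid("upper-map") // pins this node's id so savepoint state survives topology changes
            .print();
        env.execute("uid demo");
      }
    }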

    
      
    
    **(1) Creating the JobVertex and JobEdge instances and chaining as many nodes together as possible**

    1) StreamingJobGraphGenerator

    "operators start at position 1 because 0 is for chained source inputs"

    A chain's operator index starts at 1, because index 0 is reserved for chained source inputs.
    
    

    /**
     * Sets up task chains from the source {@link StreamNode} instances.
     * This will recursively create all {@link JobVertex} instances.
     */
    private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {
      // we separate out the sources that run as inputs to another operator (chained inputs)
      // from the sources that need to run as the main (head) operator.
      final Map<Integer, OperatorChainInfo> chainEntryPoints = buildChainedInputsAndGetHeadInputs(hashes, legacyHashes);
      final Collection<OperatorChainInfo> initialEntryPoints = new ArrayList<>(chainEntryPoints.values());

      // iterate over a copy of the values, because this map gets concurrently modified
      // build the node chains starting from the sources
      for (OperatorChainInfo info : initialEntryPoints) {
        // build a node chain and return the current node's physical out-edges;
        // when startNodeId != currentNodeId, currentNode is a non-head member of the chain
        createChain(
            info.getStartNodeId(),
            1, // operators start at position 1 because 0 is for chained source inputs
            info,
            chainEntryPoints);
      }
    }

    
      
    
    

    private List<StreamEdge> createChain(
        final Integer currentNodeId,
        final int chainIndex,
        final OperatorChainInfo chainInfo,
        final Map<Integer, OperatorChainInfo> chainEntryPoints) {

      Integer startNodeId = chainInfo.getStartNodeId();
      if (!builtVertices.contains(startNodeId)) {
        /*TODO transitional out-edge set used to build the final JobEdges; note it does not include edges inside the chain*/
        List<StreamEdge> transitiveOutEdges = new ArrayList<>();

        List<StreamEdge> chainableOutputs = new ArrayList<>();
        List<StreamEdge> nonChainableOutputs = new ArrayList<>();

        StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);

        /*TODO split the current node's out-edges into chainable and non-chainable*/
        for (StreamEdge outEdge : currentNode.getOutEdges()) {
          if (isChainable(outEdge, streamGraph)) {
            chainableOutputs.add(outEdge);
          } else {
            nonChainableOutputs.add(outEdge);
          }
        }

        for (StreamEdge chainable : chainableOutputs) {
          transitiveOutEdges.addAll(
              createChain(chainable.getTargetId(), chainIndex + 1, chainInfo, chainEntryPoints));
        }

        /*TODO recursively call createChain*/
        for (StreamEdge nonChainable : nonChainableOutputs) {
          transitiveOutEdges.add(nonChainable);
          createChain(
              nonChainable.getTargetId(),
              1, // operators start at position 1 because 0 is for chained source inputs
              chainEntryPoints.computeIfAbsent(
                nonChainable.getTargetId(),
                (k) -> chainInfo.newChain(nonChainable.getTargetId())),
              chainEntryPoints);
        }

        /*TODO build the current node's display name, e.g. "Keyed Aggregation -> Sink: Unnamed"*/
        chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs, Optional.ofNullable(chainEntryPoints.get(currentNodeId))));
        chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
        chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));

        OperatorID currentOperatorId = chainInfo.addNodeToChain(currentNodeId, chainedNames.get(currentNodeId));

        if (currentNode.getInputFormat() != null) {
          getOrCreateFormatContainer(startNodeId).addInputFormat(currentOperatorId, currentNode.getInputFormat());
        }

        if (currentNode.getOutputFormat() != null) {
          getOrCreateFormatContainer(startNodeId).addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
        }

        /*TODO if the current node is the chain's start node, create the JobVertex and return its StreamConfig; otherwise create an empty StreamConfig first*/
        StreamConfig config = currentNodeId.equals(startNodeId)
            ? createJobVertex(startNodeId, chainInfo)
            : new StreamConfig(new Configuration());

        /*TODO fill in the JobVertex's StreamConfig; essentially serializes the StreamNode's settings into the StreamConfig*/
        setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs, chainInfo.getChainedSources());

        if (currentNodeId.equals(startNodeId)) {
          /*TODO if this is the chain's start node, mark it as chain start (a node that belongs to no chain is also marked as chain start)*/
          config.setChainStart();
          config.setChainIndex(chainIndex);
          config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());

          /*TODO connect the current node (head of chain) to all of its transitive out-edges*/
          for (StreamEdge edge : transitiveOutEdges) {
            /*TODO build a JobEdge from the StreamEdge and create the IntermediateDataSet that links the JobVertex and the JobEdge*/
            connect(startNodeId, edge);
          }

          /*TODO write the physical out-edges into the config; used at deployment time*/
          config.setOutEdgesInOrder(transitiveOutEdges);
          /*TODO write the StreamConfigs of all chain members into the head-of-chain node's CHAINED_TASK_CONFIG entry*/
          config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));

        } else {
          /*TODO this node is a non-head member of a chain*/
          chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<>());

          config.setChainIndex(chainIndex);
          StreamNode node = streamGraph.getStreamNode(currentNodeId);
          config.setOperatorName(node.getOperatorName());
          /*TODO add the current node's StreamConfig to this chain's config map*/
          chainedConfigs.get(startNodeId).put(currentNodeId, config);
        }

        config.setOperatorID(currentOperatorId);

        if (chainableOutputs.isEmpty()) {
          config.setChainEnd();
        }
        /*TODO return the set of out-edges that leave the chain*/
        return transitiveOutEdges;

      } else {
        return new ArrayList<>();
      }
    }
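
    The isChainable check used above decides whether two operators may share a task. As a hedged, simplified summary (paraphrased from the Flink sources; the exact decomposition into helper methods differs across versions), an out-edge is chainable roughly when all of the following hold:

    // simplified sketch, not the literal Flink method
    private static boolean isChainableSketch(StreamEdge edge, StreamGraph streamGraph) {
      StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
      StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
      return downStreamVertex.getInEdges().size() == 1                         // downstream has exactly one input
          && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)           // same slot sharing group
          && areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph) // chaining strategies compatible
          && (edge.getPartitioner() instanceof ForwardPartitioner)             // forward partitioning only
          && upStreamVertex.getParallelism() == downStreamVertex.getParallelism() // equal parallelism
          && streamGraph.isChainingEnabled();                                  // chaining not globally disabled
    }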

    
      
    
      
    
      
    
    Creating and starting the YarnClient
    
      
    
      
    
      
    
    

    1)StandaloneClientFactory implements ClusterClientFactory

    
      
    
    Creates and starts the YarnClient; carries YARN and Flink configuration and environment info
    
      
    
    

    public StandaloneClusterDescriptor createClusterDescriptor(Configuration configuration) {
      checkNotNull(configuration);
      return new StandaloneClusterDescriptor(configuration);
    }

    
      
    
    2)YarnClusterClientFactory
    
      
    
    

    private YarnClusterDescriptor getClusterDescriptor(Configuration configuration) {
      /*TODO create the YarnClient*/
      final YarnClient yarnClient = YarnClient.createYarnClient();
      final YarnConfiguration yarnConfiguration = new YarnConfiguration();

      /*TODO initialize and start the YarnClient*/
      yarnClient.init(yarnConfiguration);
      yarnClient.start();

      return new YarnClusterDescriptor(
          configuration,
          yarnConfiguration,
          yarnClient,
          YarnClientYarnClusterInformationRetriever.create(yarnClient),
          false);
    }

    
      
    
      
    
      
    
    Cluster resource configuration
    
      
    
      
    
      
    
    **(1) Memory configuration**
    
      
    
    JobManager memory = jobmanager.memory.process.size

    TaskManager memory = taskmanager.memory.process.size

    Slots per TaskManager = taskmanager.numberOfTaskSlots
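
    As a hedged example (not from the article), the same three settings can be supplied programmatically instead of through flink-conf.yaml; the values below are arbitrary:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.JobManagerOptions;
    import org.apache.flink.configuration.MemorySize;
    import org.apache.flink.configuration.TaskManagerOptions;

    public class ClusterSpecConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1600m"));  // jobmanager.memory.process.size
        conf.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1728m")); // taskmanager.memory.process.size
        conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);                        // taskmanager.numberOfTaskSlots
        System.out.println(conf);
      }
    }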
    
      
    
    

    public ClusterSpecification getClusterSpecification(Configuration configuration) {
      checkNotNull(configuration);
      // JobManager memory: jobmanager.memory.process.size
      final int jobManagerMemoryMB = JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
              configuration,
              JobManagerOptions.TOTAL_PROCESS_MEMORY)
          .getTotalProcessMemorySize()
          .getMebiBytes();
      // TaskManager memory: taskmanager.memory.process.size
      final int taskManagerMemoryMB = TaskExecutorProcessUtils
          .processSpecFromConfig(TaskExecutorProcessUtils.getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption(
              configuration, TaskManagerOptions.TOTAL_PROCESS_MEMORY))
          .getTotalProcessMemorySize()
          .getMebiBytes();
      // number of slots: taskmanager.numberOfTaskSlots
      int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);

      return new ClusterSpecification.ClusterSpecificationBuilder()
          .setMasterMemoryMB(jobManagerMemoryMB)
          .setTaskManagerMemoryMB(taskManagerMemoryMB)
          .setSlotsPerTaskManager(slotsPerTaskManager)
          .createClusterSpecification();
    }

    
      
    
      
    
      
    
    Cluster deployment
    
      
    
      
    
      
    
    YarnClusterDescriptor
    
      
    
    

    public ClusterClientProvider<ApplicationId> deployJobCluster(
        ClusterSpecification clusterSpecification,
        JobGraph jobGraph,
        boolean detached) throws ClusterDeploymentException {
      try {
        // 1)
        return deployInternal(
            clusterSpecification,
            "Flink per-job cluster",
            // 2)
            getYarnJobClusterEntrypoint(),
            jobGraph,
            detached);
      } catch (Exception e) {
        throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e);
      }
    }
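
    getYarnJobClusterEntrypoint at 2) supplies the main class that YARN will launch as the ApplicationMaster. A simplified sketch (paraphrased from the Flink sources):

    protected String getYarnJobClusterEntrypoint() {
      // the AM main class for per-job mode; session mode uses YarnSessionClusterEntrypoint instead
      return YarnJobClusterEntrypoint.class.getName();
    }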

    
      
    
    **(1) The deployInternal method**
    
      
    
    

    /**
     * This method will block until the ApplicationMaster/JobManager have been deployed on YARN.
     *
     * @param clusterSpecification Initial cluster specification for the Flink cluster to be deployed
     * @param applicationName name of the Yarn application to start
     * @param yarnClusterEntrypoint Class name of the Yarn cluster entry point.
     * @param jobGraph A job graph which is deployed with the Flink cluster, {@code null} if none
     * @param detached True if the cluster should be started in detached mode
     */
    private ClusterClientProvider<ApplicationId> deployInternal(
        ClusterSpecification clusterSpecification,
        String applicationName,
        String yarnClusterEntrypoint,
        @Nullable JobGraph jobGraph,
        boolean detached) throws Exception {

      final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
      if (HadoopUtils.isKerberosSecurityEnabled(currentUser)) {
        boolean useTicketCache = flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);

        if (!HadoopUtils.areKerberosCredentialsValid(currentUser, useTicketCache)) {
          throw new RuntimeException("Hadoop security with Kerberos is enabled but the login user " +
              "does not have Kerberos credentials or delegation tokens!");
        }
      }

      /*TODO pre-deployment checks: jar path, conf path, YARN max vcores...*/
      isReadyForDeployment(clusterSpecification);

      // ------------------ Check if the specified queue exists --------------------

      /*TODO check that the configured YARN queue exists*/
      checkYarnQueues(yarnClient);

      // ------------------ Check if the YARN ClusterClient has the requested resources --------------
      /*TODO check that YARN has enough resources*/

      // Create application via yarnClient
      final YarnClientApplication yarnApplication = yarnClient.createApplication();
      final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();

      Resource maxRes = appResponse.getMaximumResourceCapability();

      final ClusterResourceDescription freeClusterMem;
      try {
        freeClusterMem = getCurrentFreeClusterResources(yarnClient);
      } catch (YarnException | IOException e) {
        failSessionDuringDeployment(yarnClient, yarnApplication);
        throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e);
      }

      final int yarnMinAllocationMB = yarnConfiguration.getInt(
          YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
          YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
      if (yarnMinAllocationMB <= 0) {
        throw new YarnDeploymentException("The minimum allocation memory "
            + "(" + yarnMinAllocationMB + " MB) configured via '" + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB
            + "' should be greater than 0.");
      }

      final ClusterSpecification validClusterSpecification;
      try {
        validClusterSpecification = validateClusterResources(
            clusterSpecification,
            yarnMinAllocationMB,
            maxRes,
            freeClusterMem);
      } catch (YarnDeploymentException yde) {
        failSessionDuringDeployment(yarnClient, yarnApplication);
        throw yde;
      }

      LOG.info("Cluster specification: {}", validClusterSpecification);

      final ClusterEntrypoint.ExecutionMode executionMode = detached ?
          ClusterEntrypoint.ExecutionMode.DETACHED
          : ClusterEntrypoint.ExecutionMode.NORMAL;

      flinkConfiguration.setString(ClusterEntrypoint.EXECUTION_MODE, executionMode.toString());

      /*TODO start the ApplicationMaster*/
      ApplicationReport report = startAppMaster(
          flinkConfiguration,
          applicationName,
          yarnClusterEntrypoint,
          jobGraph,
          yarnClient,
          yarnApplication,
          validClusterSpecification);

      // print the application id for user to cancel themselves.
      if (detached) {
        final ApplicationId yarnApplicationId = report.getApplicationId();
        logDetachedClusterInformation(yarnApplicationId, LOG);
      }

      setClusterEntrypointInfoToConfig(report);

      return () -> {
        try {
          return new RestClusterClient<>(flinkConfiguration, report.getApplicationId());
        } catch (Exception e) {
          throw new RuntimeException("Error while creating RestClusterClient.", e);
        }
      };
    }
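
    The lambda at the end is the ClusterClientProvider handed back to the executor. A hedged usage sketch (not from the article; the helper below is made up):

    import org.apache.flink.client.program.ClusterClient;
    import org.apache.flink.client.program.ClusterClientProvider;
    import org.apache.hadoop.yarn.api.records.ApplicationId;

    class ClientUsageSketch {
      static void printWebUi(ClusterClientProvider<ApplicationId> provider) throws Exception {
        try (ClusterClient<ApplicationId> client = provider.getClusterClient()) {
          // REST endpoint of the JobManager that was just started on YARN
          System.out.println("Web UI: " + client.getWebInterfaceURL());
        }
      }
    }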

    
      
    
    1) Pre-deployment checks: jar path, conf path, YARN max vcores
    
      
    
    

    private void isReadyForDeployment(ClusterSpecification clusterSpecification) throws Exception {

      if (this.flinkJarPath == null) {
        throw new YarnDeploymentException("The Flink jar path is null");
      }
      if (this.flinkConfiguration == null) {
        throw new YarnDeploymentException("Flink configuration object has not been set");
      }

      // Check if we don't exceed YARN's maximum virtual cores.
      final int numYarnMaxVcores = yarnClusterInformationRetriever.getMaxVcores();

      int configuredAmVcores = flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES);
      if (configuredAmVcores > numYarnMaxVcores) {
        throw new IllegalConfigurationException(
            String.format("The number of requested virtual cores for application master %d" +
                " exceeds the maximum number of virtual cores %d available in the Yarn Cluster.",
                configuredAmVcores, numYarnMaxVcores));
      }

      int configuredVcores = flinkConfiguration.getInteger(YarnConfigOptions.VCORES, clusterSpecification.getSlotsPerTaskManager());
      // don't configure more than the maximum configured number of vcores
      if (configuredVcores > numYarnMaxVcores) {
        throw new IllegalConfigurationException(
            String.format("The number of requested virtual cores per node %d" +
                " exceeds the maximum number of virtual cores %d available in the Yarn Cluster." +
                " Please note that the number of virtual cores is set to the number of task slots by default" +
                " unless configured in the Flink config with '%s.'",
                configuredVcores, numYarnMaxVcores, YarnConfigOptions.VCORES.key()));
      }

      // check if required Hadoop environment variables are set. If not, warn user
      if (System.getenv("HADOOP_CONF_DIR") == null &&
          System.getenv("YARN_CONF_DIR") == null) {
        LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. " +
            "The Flink YARN Client needs one of these to be set to properly load the Hadoop " +
            "configuration for accessing YARN.");
      }
    }

    
      
    
    2) Checking whether YARN has enough resources
    
      
    
    

    private ClusterResourceDescription getCurrentFreeClusterResources(YarnClient yarnClient) throws YarnException, IOException {
      List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);

      int totalFreeMemory = 0;
      int containerLimit = 0;
      int[] nodeManagersFree = new int[nodes.size()];

      for (int i = 0; i < nodes.size(); i++) {
        NodeReport rep = nodes.get(i);
        int free = rep.getCapability().getMemory() - (rep.getUsed() != null ? rep.getUsed().getMemory() : 0);
        nodeManagersFree[i] = free;
        totalFreeMemory += free;
        if (free > containerLimit) {
          containerLimit = free;
        }
      }
      return new ClusterResourceDescription(totalFreeMemory, containerLimit, nodeManagersFree);
    }

    
      
    
    **Finding the minimum resource configuration**
    
      
    
    RM_SCHEDULER_MINIMUM_ALLOCATION_MB = yarn.scheduler.minimum-allocation-mb
    
    

    DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB=1024
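
    As a hedged sketch of the arithmetic (not from the article): the YARN scheduler rounds each container request up to a multiple of yarn.scheduler.minimum-allocation-mb, which is why validateClusterResources can warn about the memory lost to rounding.

    public class YarnRoundingSketch {
      // round a request up to the next multiple of the scheduler minimum
      static int normalize(int requestedMB, int yarnMinAllocationMB) {
        return ((requestedMB + yarnMinAllocationMB - 1) / yarnMinAllocationMB) * yarnMinAllocationMB;
      }

      public static void main(String[] args) {
        // with the default minimum of 1024 MB, a 1600 MB request becomes a 2048 MB container
        System.out.println(normalize(1600, 1024)); // prints 2048
      }
    }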

    
      
    
    ![figure](https://img-blog.csdnimg.cn/img_convert/66c9f8a8c48db96275b7f71cf99efc38.png)
    
  • Original article: https://blog.csdn.net/hyunbar/article/details/126108581