• Spark Executor decommission 原理分析


    背景

    Spark Executor decommission 最初是 AWS 为了 Spot 节点和 Spot Block 节点配置的, Spot 节点仅有正常节点几分之一的价格,但是可能在任何时间收回,收回的时候仅给 120 秒的缓冲期。
    Spot Block 比正常价格便宜 30%~45%,但是稳定性比 Spot 节点高,回收时缓冲期为最高可以达6小时).

    参考文档:
    Using AWS Spot Instances and Spot Blocks in QDS Clusters
    New – EC2 Spot Blocks for Defined-Duration Workloads

    其他场景没有 Spot 和 Spot Block 节点,Spark 和 cluster manager(YARN 和 Mesos) 结合。如 YARN 框架 Graceful Decommission of YARN Nodes 说明配置了 node 的优雅退出过程。包含以下3步:

    1. 先添加要 decommission 的 node 到 /etc/hadoop/conf/yarn.exclude
    2. 然后执行 yarn rmadmin -refreshNodes -g 100 --client,-g 是 gracefull,100 是等待时间100秒,NodeManager 节点开始 decommission 过程。
    3. Application Master 收到 decommission 的 executors,给 executor 发送 Decommission 的请求。Executor 进行 Decommission 操作。
      Application Master 的日志:
    24/03/05 16:55:55 INFO YarnClusterSchedulerBackend: Decommission executors: 2
    24/03/05 16:55:55 INFO YarnClusterSchedulerBackend: Notify executor 2 to decommission.
    24/03/05 16:55:55 INFO BlockManagerMasterEndpoint: Mark BlockManagers (BlockManagerId(2, core-8ce527b-1, 24639, None)) as being decommissioning.
    24/03/05 16:55:55 INFO ShuffleStatus: Updating map output for 1843 to BlockManagerId(3, core-8ce527b-2, 21569, None)
    24/03/05 16:55:55 INFO ShuffleStatus: Updating map output for 1825 to BlockManagerId(1, core-8ce527b-2, 29453, None)
    
    • 1
    • 2
    • 3
    • 4
    • 5

    另一个触发 Decommission 的流程开启 Dynamic Allocation 后,Shuffle Tracking 查找没有任务的 executors,开始触发 decommission 的过程。

    Spark Decommission 的功能

    1. Executor 停止运行新的 Task
    2. 把 cached RDD blocks 和 shuffle blocks 传输到其他的 Executor 和 Remote Storage。更新 driver 端的 shuffle status。
    3. Reducer 获取新的 shuffle status,读取 shuffle 数据。

    增加的参数

     --conf spark.storage.decommission.enabled=true \
     --conf spark.storage.decommission.shuffleBlocks.enabled=true \
     --conf spark.storage.decommission.rddBlocks.enabled=true \
     --conf spark.dynamicAllocation.cachedExecutorIdleTimeout=10s \
     --conf spark.storage.decommission.fallbackStorage.path=bos://bmr-rd-wh/houzhizhen/decommission-fallbackStorage-path/ \
    
    • 1
    • 2
    • 3
    • 4
    • 5

    执行流程

    1. SparkContext 中,向 block manager masger 注册 fallback 的地址。

    如果设置了 spark.storage.decommission.fallbackStorage.path, 则注册以下地址。

    val FALLBACK_BLOCK_MANAGER_ID: BlockManagerId = BlockManagerId("fallback", "remote", 7337)
    
    • 1

    代表 block 可以上传到指定的地址。

    2. 当 Executor 接收到 Decommission 请求时,向 Driver 调用 getPeers 获取 Decommission 的目标地址列表

    getPeers 返回所有的执行器列表,加上 fallback 地址,去掉正在 decommission 的 Executors。

    3. Executor 把 shuffle block 上传到 getPeers 返回的地址

    • 3.1 先拿到 block manager 所有的 block,放到 shufflesToMigrate 队列。
    • 3.2 采用多线程的架构,每个 peer 一个线程,每个线程做以下工作
      1): 从 shufflesToMigrate 取一个 block
      2): 上传到 peer,如果 peer 是 fallback,调用 upload,如果 peer 是 executor, 则调用 uploadBlockSync。
      3): 向 driver updateBlockInfo, 如果 peer 是 fallback 则在 upload 里,上传完就调用 updateBlockInfo。如果 peer 是 executor, 则由 executor 成功接收到 block 后调用。

    4. reducer 从 driver 获取最新的地址

    如果正常的 executor 地址,则从 executor 读取,如果是 fall back 地址,则调用 FallBackStorage.read 读取数据块。

    代码分析

    1. Driver 注册 fall back 的地址

    SparkContext 中,向 block manager masger 注册 fallback 的地址。

    val FALLBACK_BLOCK_MANAGER_ID: BlockManagerId = BlockManagerId("fallback", "remote", 7337)
    
    • 1

    2. Driver 处理 decommission executors

    在 decommissionExecutors 里,执行以下步骤:

    1. 通知 scheduler 这些 executor 正在 decommission, 当调度器收到 fetch failed 时做一些特除处理。
    2. decommissioningBlockManagerSet 添加这些 executor 的 id. 其他 Executor 再 decommission 时,getPeers 时去掉正在 decommission 的 Executor。
    3. 给每个 executor 发送 DecommissionExecutor 事件。
    4. 如果设置了 spark.executor.decommission.forceKillTimeout, 则启动一个定时任务, 如果 executor 还处于 pendingDecommission, 则发送 kill 事件。

    3. Executor 收到 DecommissionExecutor 的处理

    1. 判断 migrationEnabled, 判断逻辑如下:
    val migrationEnabled = env.conf.get(STORAGE_DECOMMISSION_ENABLED) &&
            (env.conf.get(STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED) ||
              env.conf.get(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))
    
    • 1
    • 2
    • 3
    1. env.blockManager.decommissionBlockManager()

    2. executor.decommission()

    3. 开启一个线程,不断检查 decomission 状态,如果 decommission 成功,则 exitExecutor(0, ExecutorLossMessage.decommissionFinished, notifyDriver = true)退出 executor。

    env.blockManager.decommissionBlockManager() 的处理
     decommissioner = Some(new BlockManagerDecommissioner(conf, this))
     decommissioner.foreach(_.start())
    
    • 1
    • 2
    BlockManagerDecommissioner#start

    start 启动两个线程,一个为了 rdd block, 另一个为了 shuffle block.

    def start(): Unit = {
        logInfo("Starting block migration")
        rddBlockMigrationExecutor.foreach(_.submit(rddBlockMigrationRunnable))
        shuffleBlockMigrationRefreshExecutor.foreach(_.submit(shuffleBlockMigrationRefreshRunnable))
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    rddBlockMigrationRunnable
    1. bm.getPeers 如果没有活着的 executor,则停止。

    2. val replicateBlocksInfo = bm.getMigratableRDDBlocks()

    3. 对于每个 block,调用 migrateBlock(blockToReplicate: ReplicateBlock): Boolean

    BlockManagerDecommissioner#refreshMigratableShuffleBlocks
    1. 调用 getStoredShuffles,放到 shufflesToMigrate 队列
     override def getStoredShuffles(): Seq[ShuffleBlockInfo] = {
        val allBlocks = blockManager.diskBlockManager.getAllBlocks()
        allBlocks.flatMap {
          case ShuffleIndexBlockId(shuffleId, mapId, _) =>
            Some(ShuffleBlockInfo(shuffleId, mapId))
          case _ =>
            None
        }
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    1. 对于每个 peer,启动一个线程 ShuffleMigrationRunnable, 执行以下操作
      1): 从 shufflesToMigrate 获取一个 shuffleBlockInfo
      2): 如果定义fallbackStorage 并且当前peer 是 FALLBACK_BLOCK_MANAGER_ID, 执行 _.copy(shuffleBlockInfo, bm); 否则执行步骤 3
      3): 执行 bm.blockTransferService.uploadBlockSync 把 block 上传到其他 Executor.

    问题

    1. 数据循环拷贝。如有10个执行器1–10。现在只有 executor 10 有任务运行。刚开始 decommission executor 1,会把 shuffle 数据 decommission 到 executor 2 ~ executor 10 和 fallback. 然后 executor 2 decommission, 从 executor 1 拷贝到 executor 2的数据再拷贝到其他节点。
    2. executor 1 在 decommission 过程中, executor-2 执行 decommission,executor-1 还是会发送数据到 executor-2。

    如果直接发往 remote reliable storage, 则问题1和问题2 不存在。https://github.com/apache/spark/pull/45228/files

  • 相关阅读:
    初学者教程:如何学习渗透测试?
    图 拓扑排序 leecode 207 Course Schedule
    Python算法图解——递归(三):打印九九乘法表
    多服务器云探针源码(服务器云监控)/多服务器多节点_云监控程序python源码
    拼多多API接口大全
    Elasticsearch通过Http请求实现聚合分组及聚合计算查询
    slim.max_pool2d()
    [CTF]2022美团CTF WEB WP
    Python环境和PyCharm搭建教程
    C++基本点
  • 原文地址:https://blog.csdn.net/houzhizhen/article/details/136501291