Spark Executor decommission was originally introduced for AWS Spot and Spot Block nodes. Spot nodes cost only a fraction of the price of normal nodes, but they can be reclaimed at any time, with only a 120-second grace period when that happens.
Spot Block nodes are 30%~45% cheaper than the normal price and more stable than Spot nodes; when they are reclaimed, the grace period can be up to 6 hours.
References:
Using AWS Spot Instances and Spot Blocks in QDS Clusters
New – EC2 Spot Blocks for Defined-Duration Workloads
In other scenarios there are no Spot or Spot Block nodes, and Spark works together with the cluster manager (YARN or Mesos). For example, the YARN documentation Graceful Decommission of YARN Nodes describes how to gracefully decommission a node. Running
yarn rmadmin -refreshNodes -g 100 --client
(-g means graceful, 100 is the wait timeout in seconds) starts the decommission process on the NodeManager nodes, and the Spark driver then logs:
24/03/05 16:55:55 INFO YarnClusterSchedulerBackend: Decommission executors: 2
24/03/05 16:55:55 INFO YarnClusterSchedulerBackend: Notify executor 2 to decommission.
24/03/05 16:55:55 INFO BlockManagerMasterEndpoint: Mark BlockManagers (BlockManagerId(2, core-8ce527b-1, 24639, None)) as being decommissioning.
24/03/05 16:55:55 INFO ShuffleStatus: Updating map output for 1843 to BlockManagerId(3, core-8ce527b-2, 21569, None)
24/03/05 16:55:55 INFO ShuffleStatus: Updating map output for 1825 to BlockManagerId(1, core-8ce527b-2, 29453, None)
Another way to trigger decommission is through Dynamic Allocation: with Shuffle Tracking enabled, executors that have no running tasks are identified and the decommission process is started for them. The relevant configuration:
--conf spark.storage.decommission.enabled=true \
--conf spark.storage.decommission.shuffleBlocks.enabled=true \
--conf spark.storage.decommission.rddBlocks.enabled=true \
--conf spark.dynamicAllocation.cachedExecutorIdleTimeout=10s \
--conf spark.storage.decommission.fallbackStorage.path=bos://bmr-rd-wh/houzhizhen/decommission-fallbackStorage-path/ \
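For reference, a minimal sketch of applying the same settings programmatically when building the session. The fallback path is a placeholder, and the dynamic-allocation / shuffle-tracking / spark.decommission.enabled settings are assumptions about the surrounding setup, not part of the flag list above:
import org.apache.spark.sql.SparkSession

// Minimal sketch: decommission-related settings applied via the session builder.
// The fallback path is a placeholder; pick any storage that survives executor loss.
object DecommissionConfSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("decommission-demo")
      .config("spark.decommission.enabled", "true")                       // assumed to be needed in recent Spark versions
      .config("spark.dynamicAllocation.enabled", "true")                  // assumption: dynamic allocation is on
      .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")  // assumption: shuffle tracking is on
      .config("spark.dynamicAllocation.cachedExecutorIdleTimeout", "10s")
      .config("spark.storage.decommission.enabled", "true")
      .config("spark.storage.decommission.shuffleBlocks.enabled", "true")
      .config("spark.storage.decommission.rddBlocks.enabled", "true")
      .config("spark.storage.decommission.fallbackStorage.path",
        "hdfs:///tmp/spark-fallback/") // placeholder path
      .getOrCreate()

    spark.range(0, 1000).count() // trivial job just to exercise the session
    spark.stop()
  }
}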
If spark.storage.decommission.fallbackStorage.path is set, the following address is registered:
val FALLBACK_BLOCK_MANAGER_ID: BlockManagerId = BlockManagerId("fallback", "remote", 7337)
It indicates that blocks can be uploaded to the configured fallback location.
getPeers returns the list of all executors plus the fallback address, excluding the executors that are currently decommissioning.
When a block is later read, a normal executor address means the block is fetched from that executor, while the fallback address means FallbackStorage.read is called to read the block.
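A simplified sketch of this peer-selection logic (illustrative only, not the actual BlockManagerMasterEndpoint code; the parameter names are made up):
// Simplified sketch of the peer selection described above.
case class BlockManagerId(executorId: String, host: String, port: Int)

object GetPeersSketch {
  // the same constant quoted in the text: a pseudo block manager representing fallback storage
  val FALLBACK_BLOCK_MANAGER_ID: BlockManagerId = BlockManagerId("fallback", "remote", 7337)

  def getPeers(
      all: Seq[BlockManagerId],
      decommissioning: Set[BlockManagerId],
      self: BlockManagerId,
      fallbackRegistered: Boolean): Seq[BlockManagerId] = {
    // drop ourselves and executors that are already decommissioning
    val peers = all.filterNot(id => id == self || decommissioning.contains(id))
    // append the fallback pseudo block manager when a fallback path is configured
    if (fallbackRegistered) peers :+ FALLBACK_BLOCK_MANAGER_ID else peers
  }

  def main(args: Array[String]): Unit = {
    val e1 = BlockManagerId("1", "core-1", 24639)
    val e2 = BlockManagerId("2", "core-2", 21569)
    println(getPeers(Seq(e1, e2), decommissioning = Set(e2), self = e1, fallbackRegistered = true))
    // -> List(BlockManagerId(fallback,remote,7337))
  }
}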
In SparkContext, the fallback address is registered with the block manager master:
val FALLBACK_BLOCK_MANAGER_ID: BlockManagerId = BlockManagerId("fallback", "remote", 7337)
Inside decommissionExecutors, the following steps are executed:
// migration is only attempted when storage decommission is enabled
// for RDD blocks and/or shuffle blocks
val migrationEnabled = env.conf.get(STORAGE_DECOMMISSION_ENABLED) &&
  (env.conf.get(STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED) ||
    env.conf.get(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))
// ask the local BlockManager to start migrating its blocks
env.blockManager.decommissionBlockManager()
// mark the executor itself as decommissioned
executor.decommission()
A thread is then started that keeps checking the decommission status; once decommission has finished, exitExecutor(0, ExecutorLossMessage.decommissionFinished, notifyDriver = true) is called to exit the executor.
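A self-contained sketch of that watcher loop (the helpers below are stand-ins for the executor backend's internals, not real Spark API):
object DecommissionWatcherSketch {
  // stand-ins for the executor backend's internals (not real Spark API)
  @volatile private var blocksLeft = 3
  private def allBlocksMigrated(): Boolean = blocksLeft <= 0
  private def exitExecutor(code: Int, reason: String): Unit =
    println(s"exitExecutor($code): $reason") // would notify the driver and stop the JVM

  def main(args: Array[String]): Unit = {
    val watcher = new Thread("decommission-watcher") {
      override def run(): Unit = {
        // poll migration progress until everything has been moved off this executor
        while (!allBlocksMigrated()) {
          Thread.sleep(1000)
          blocksLeft -= 1 // simulated progress, just so the sketch terminates
        }
        exitExecutor(0, "decommission finished")
      }
    }
    watcher.start()
    watcher.join()
  }
}
On the BlockManager side, decommissionBlockManager creates and starts a BlockManagerDecommissioner: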
decommissioner = Some(new BlockManagerDecommissioner(conf, this))
decommissioner.foreach(_.start())
start launches two threads, one for RDD blocks and the other for shuffle blocks:
def start(): Unit = {
  logInfo("Starting block migration")
  rddBlockMigrationExecutor.foreach(_.submit(rddBlockMigrationRunnable))
  shuffleBlockMigrationRefreshExecutor.foreach(_.submit(shuffleBlockMigrationRefreshRunnable))
}
If bm.getPeers returns no live executors, the migration stops. Otherwise:
val replicateBlocksInfo = bm.getMigratableRDDBlocks()
For each block, migrateBlock(blockToReplicate: ReplicateBlock): Boolean is called to replicate it to a peer.
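Roughly, the RDD-block migration loop then looks like this sketch (illustrative; ReplicateBlock and the function parameters are stand-ins for BlockManagerDecommissioner internals):
// Rough sketch of the RDD-block migration loop described above.
case class ReplicateBlock(blockId: String, sizeBytes: Long)

object RddBlockMigrationSketch {
  def migrateRddBlocks(
      getPeers: () => Seq[String],
      getMigratableRDDBlocks: () => Seq[ReplicateBlock],
      migrateBlock: ReplicateBlock => Boolean): Unit = {
    if (getPeers().isEmpty) {
      // no live executor (and no fallback) left to receive blocks: give up
      println("no peers available, stopping RDD block migration")
      return
    }
    val replicateBlocksInfo = getMigratableRDDBlocks()
    // try to replicate every cached RDD block to one of the peers
    val failed = replicateBlocksInfo.filterNot(migrateBlock)
    println(s"migrated ${replicateBlocksInfo.size - failed.size} blocks, ${failed.size} failed")
  }

  def main(args: Array[String]): Unit = {
    migrateRddBlocks(
      getPeers = () => Seq("executor-3"),
      getMigratableRDDBlocks = () => Seq(ReplicateBlock("rdd_0_0", 1024L)),
      migrateBlock = _ => true)
  }
}
On the shuffle side, getStoredShuffles enumerates the shuffle map outputs stored on local disk: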
override def getStoredShuffles(): Seq[ShuffleBlockInfo] = {
  val allBlocks = blockManager.diskBlockManager.getAllBlocks()
  allBlocks.flatMap {
    // every shuffle map output has an index block; use it to identify the shuffle
    case ShuffleIndexBlockId(shuffleId, mapId, _) =>
      Some(ShuffleBlockInfo(shuffleId, mapId))
    case _ =>
      None
  }
}
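Each ShuffleBlockInfo found this way is then migrated by uploading the corresponding index and data files to a peer or to fallback storage. A rough sketch (illustrative file naming and helpers, not the real shuffle migration runnable):
import java.nio.file.{Files, Path}

// Rough sketch of pushing one shuffle map output to a peer or to fallback storage.
case class ShuffleBlockInfo(shuffleId: Int, mapId: Long)

object ShuffleMigrationSketch {
  // stand-in for pushing one file to a peer executor or to the fallback path
  private def upload(file: Path, target: String): Boolean = {
    println(s"uploading ${file.getFileName} to $target")
    true
  }

  def migrateShuffle(localDir: Path, info: ShuffleBlockInfo, target: String): Boolean = {
    // a shuffle map output consists of an index file and a data file
    // (file naming here is illustrative, not the exact on-disk layout)
    val index = localDir.resolve(s"shuffle_${info.shuffleId}_${info.mapId}_0.index")
    val data  = localDir.resolve(s"shuffle_${info.shuffleId}_${info.mapId}_0.data")
    Seq(index, data).filter(Files.exists(_)).forall(upload(_, target))
  }

  def main(args: Array[String]): Unit = {
    val dir = Files.createTempDirectory("shuffle-sketch")
    Files.createFile(dir.resolve("shuffle_0_0_0.index"))
    Files.createFile(dir.resolve("shuffle_0_0_0.data"))
    println(migrateShuffle(dir, ShuffleBlockInfo(0, 0L), "fallback"))
  }
}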
If shuffle data is sent directly to remote reliable storage instead, problem 1 and problem 2 do not exist: https://github.com/apache/spark/pull/45228/files