• spark shuffle过程源码解析


    前言

    为了更好的理解spark的shuffle过程,通过走读源码,彻底理解shuffle过程中的执行过程以及与排序相关的内容。

    本文所使用的spark版本为:2.4.4

    1、shuffle之BypassMergeSortShuffleWriter

    基本原理:

    1、下游reduce有多少个分区partition,上游map就建立多少个fileWriter[reduceNumer],每一个下游分区的数据写入到一个独立的文件中。当所有的分区文件写完之后,将多个分区的数据合并到一个文件中,代码如下:

    1. while (records.hasNext()) {
    2. final Product2 record = records.next();
    3. final K key = record._1();
    4. //作者注:将数据写到对应分区的文件中去。
    5. partitionWriters[partitioner.getPartition(key)].write(key, record._2());
    6. }
    7. for (int i = 0; i < numPartitions; i++) {
    8. final DiskBlockObjectWriter writer = partitionWriters[i];
    9. partitionWriterSegments[i] = writer.commitAndGet();
    10. writer.close();
    11. }
    12. File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
    13. File tmp = Utils.tempFileWith(output);
    14. try {
    15. //作者注:合并所有分区的小文件为一个大文件,保证同一个分区的数据连续存在
    16. partitionLengths = writePartitionedFile(tmp);
    17. //作者注:构建索引文件
    18. shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
    19. } finally {
    20. if (tmp.exists() && !tmp.delete()) {
    21. logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
    22. }
    23. }

    2、由于每一个独立的分区文件的数据都是属于同一个reduce的,在进行文件合并的时候,不需要进行排序,只需要按照文件顺序合并到一个文件中即可,并建立对应的分区数据索引文件。

    3、使用BypassMergeSortShuffleWriter的条件是:

            (1)、下游分区的个数不能超过参数spark.shuffle.sort.bypassMergeThreshold的值(默认是200)

            (2)、非map端预聚合算子(reduceByKey)

            具体判断代码如下:

    1. def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
    2. // We cannot bypass sorting if we need to do map-side aggregation.
    3. if (dep.mapSideCombine) {
    4. false
    5. } else {
    6. val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
    7. dep.partitioner.numPartitions <= bypassMergeThreshold
    8. }
    9. }

    2、shuffle之SortShuffleWriter

    执行该writer的条件:

    (1)、下游分区partitions的个数超过spark.shuffle.sort.bypassMergeThreshold参数设置的值(默认200)

    (2)、跳过一个叫做UnsafeShuffleWriter(详情见3)的writer

    执行过程描述:

    1、如果map端是预聚合算子(例如reduceByKey)

    (1)、使用一个map:PartitionedAppendOnlyMap对象进行数据的存储和预聚合,代码如下:

            备注:可以看到在map.changeValue时,更新的key并不是数据的key,而是在数据key的基础上,加上了该key的分区id((getPartition(kv._1), kv._1)),这样做的目的是为了下面将数据溢出到磁盘时,按分区id进行排序,以保证同一个分区的数据能连续存放到一起。

    1. //作者注:判断是否是一个预聚合算子
    2. if (shouldCombine) {
    3. // Combine values in-memory first using our AppendOnlyMap
    4. //作者注:获取预聚合算子的执行函数
    5. val mergeValue = aggregator.get.mergeValue
    6. val createCombiner = aggregator.get.createCombiner
    7. var kv: Product2[K, V] = null
    8. val update = (hadValue: Boolean, oldValue: C) => {
    9. if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    10. }
    11. while (records.hasNext) {
    12. addElementsRead()
    13. kv = records.next()
    14. //作者注:使用一个map:PartitionedAppendOnlyMap类型进行数据的存储和预聚合更新
    15. map.changeValue((getPartition(kv._1), kv._1), update)
    16. //作者注:执行溢出到磁盘操作
    17. maybeSpillCollection(usingMap = true)
    18. }
    19. }

    (2)、执行数据溢出到磁盘操作:maybeSpillCollection,代码如下:

    1. private def maybeSpillCollection(usingMap: Boolean): Unit = {
    2. var estimatedSize = 0L
    3. //作者注:判断是否是预聚合算子
    4. if (usingMap) {
    5. //作者注:预聚合算子,则把map对象里面的数据写入到磁盘
    6. estimatedSize = map.estimateSize()
    7. if (maybeSpill(map, estimatedSize)) {
    8. map = new PartitionedAppendOnlyMap[K, C]
    9. }
    10. } else {
    11. //作者注:不是预聚合算子,则把buffer对象里面的数据写入到磁盘
    12. estimatedSize = buffer.estimateSize()
    13. if (maybeSpill(buffer, estimatedSize)) {
    14. buffer = new PartitionedPairBuffer[K, C]
    15. }
    16. }

           然后执行maybeSpill函数,根据溢出条件判断是否溢出到磁盘,代码如下:

    1. protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
    2. var shouldSpill = false
    3. if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
    4. // Claim up to double our current memory from the shuffle memory pool
    5. val amountToRequest = 2 * currentMemory - myMemoryThreshold
    6. val granted = acquireMemory(amountToRequest)
    7. myMemoryThreshold += granted
    8. // If we were granted too little memory to grow further (either tryToAcquire returned 0,
    9. // or we already had more memory than myMemoryThreshold), spill the current collection
    10. shouldSpill = currentMemory >= myMemoryThreshold
    11. }
    12. //作者注:是否溢出磁盘,有两个判断条件
    13. //1、shouldSplill:判断内存空间的是否充足
    14. //2、_elementsRead > numElementsForceSpillThreshold:判断当前的写的数据条数是否超过阈值numElementsForceSpillThreshold(默认Integer.MAX_VALUE)
    15. shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
    16. // Actually spill
    17. if (shouldSpill) {
    18. _spillCount += 1
    19. logSpillage(currentMemory)
    20. spill(collection)
    21. _elementsRead = 0
    22. _memoryBytesSpilled += currentMemory
    23. releaseMemory()
    24. }
    25. shouldSpill
    26. }

            如果满足条件,这执行spill(collection)进行数据溢出,代码如下:

    1. override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
    2. val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
    3. val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
    4. spills += spillFile
    5. }

            看一下这行代码:

    val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)

            这一行代码的作用是对数据进行排序,如何排序呢?经过作者的debug过程发现,这个排序过程其实并不是对数据的key进行排序,而是对分区id进行排序,这样保证同一个分区的数据能连续在一起,为后续的溢出文件合并的归并排序提供基础。

            至此,数据的磁盘溢出操作完成。下一步就是如何将溢出的数据进行合并。

    (3)、溢出磁盘数据文件合并成一个大文件,并建立一个分区的索引文件,具体的代码执行过程如下(SortShuffleWriter):里面具体的执行过程再次不再赘述。

    1. try {
    2. val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
    3. //作者注:将溢出的磁盘文件和当前缓存的文件进行归并合并,保证同一分区的数据连续存在
    4. val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
    5. //作者注:构建索引文件
    6. shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
    7. mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
    8. } finally {
    9. if (tmp.exists() && !tmp.delete()) {
    10. logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
    11. }
    12. }

    2、如果map端不是预聚合算子(例如groupByKey)

    上面介绍了预聚合算子的shufflerwriter的执行过程,而非预聚合算子的shufflewriter的执行过程基本和预聚合算子是一样的,唯一不同的一点就是,存储数据的结构不是map:PartitionedAppendOnlyMap,而是buffer:PartitionedPairBuffer,代码如下:

    1. if (shouldCombine) {
    2. // Combine values in-memory first using our AppendOnlyMap
    3. val mergeValue = aggregator.get.mergeValue
    4. val createCombiner = aggregator.get.createCombiner
    5. var kv: Product2[K, V] = null
    6. val update = (hadValue: Boolean, oldValue: C) => {
    7. if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    8. }
    9. while (records.hasNext) {
    10. addElementsRead()
    11. kv = records.next()
    12. map.changeValue((getPartition(kv._1), kv._1), update)
    13. maybeSpillCollection(usingMap = true)
    14. }
    15. } else {
    16. // Stick values into our buffer
    17. //作者注:非预聚合算子的数据存储到buffer中
    18. while (records.hasNext) {
    19. addElementsRead()
    20. val kv = records.next()
    21. buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
    22. maybeSpillCollection(usingMap = false)
    23. }
    24. }

    至于后续的数据溢出、数据排序、溢出数据文件的合并等过程,和预聚合算子的执行过程一模一样,调用的一样的执行过程,在此就不再赘述。

    3、shuffle之UnsafeShuffleWriter

    这个UnsafeShuffleWriter的具体执行过程作者没有深入追究,因为从名字上也能看出,Unsafe使用的是堆外内存进行数据的存储以及相关的操作,基本的原理是将数据对象进行序列化后存储到堆外内存,然后使用二进制的方式进行数据的排序工作,这样能提升运算性能。

    在实际的执行过程中,是优先使用这种方式进行shuffle过程的write的,具体的执行条件如下:

    1. def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
    2. val shufId = dependency.shuffleId
    3. val numPartitions = dependency.partitioner.numPartitions
    4. //作者注:序列化器支持relocation.
    5. //作者注:目前spark提供的有两个序列化器:JavaSerializer和KryoSerializer
    6. //其中KryoSerializer支持relocation,而JavaSerializer不支持relocation
    7. if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
    8. log.debug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " +
    9. s"${dependency.serializer.getClass.getName}, does not support object relocation")
    10. false
    11. } else if (dependency.mapSideCombine) { //作者注:非map端预聚合算子
    12. log.debug(s"Can't use serialized shuffle for shuffle $shufId because we need to do " +
    13. s"map-side aggregation")
    14. false
    15. } else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
    16. //作者注:下游分区个数小于MAXIMUM_PARTITION_ID = (1 << 24) - 1
    17. log.debug(s"Can't use serialized shuffle for shuffle $shufId because it has more than " +
    18. s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions")
    19. false
    20. } else {
    21. log.debug(s"Can use serialized shuffle for shuffle $shufId")
    22. true
    23. }
    24. }

    具体的执行逻辑作者没有深入追踪,因为追踪到后面可能全是二进制的数据,无法直观查看数据信息,读者如果有兴趣可以自行debug追踪。

  • 相关阅读:
    Linux驱动开发:内核模块和字符设备驱动
    spring初识
    19Linux基本使用和web程序部署
    Spring 事务失效的场景
    SAP UI5 视图里的 OverflowToolbar 控件
    仅手机大小!极空间T2随身数据魔盒发布:既是NAS 又是U盘
    可调用对象、std..function、std..bind
    C++关键词探索:理解变量、函数参数、函数返回值以及类成员函数的修饰符
    uniapp项目实践总结(二十)URLScheme 协议知识总结
    UE AIModule 源码解读之写法借鉴(一)
  • 原文地址:https://blog.csdn.net/chenzhiang1/article/details/126834574