Spark 3.0 SQL: Source Analysis and Parameter Tuning for Reading Hive ORC Tables with HiveTableScanExec



    1 Environment setup

    1.1 Sample code

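    import org.apache.spark.sql.SparkSession
    
    object SparkSqlHive {
      def main(args: Array[String]): Unit = {
    
        val ss = SparkSession.builder().master("local[2]").appName("the test of SparkSession")
          // Defaults to true, which reads the table with FileSourceScanExec; false makes Spark use HiveTableScanExec
          .config("spark.sql.hive.convertMetastoreOrc", "false")
          // Upper bound for one ORC split: 67108864 bytes = 64 MB
          .config("spark.hadoop.mapreduce.input.fileinputformat.split.maxsize", "67108864")
          .enableHiveSupport()
          .getOrCreate()
        // Drop the temp table if it already exists
        ss.sql("DROP TABLE IF EXISTS temp.temp_ods_start_log")
        // Read the ORC table ods_start_log and store the result in the temp table.
        // CTAS is a command, so ss.sql() executes it eagerly (the log shows the job started from "sql at SparkSqlHive.scala")
        val df = ss.sql("CREATE TABLE IF NOT EXISTS temp.temp_ods_start_log as select substr(str,1,10) as str10 from biods.ods_start_log where dt='20210721'")
        // Action on the command's result; the scan of the ORC table already ran inside ss.sql()
        df.count()
        // Keep the application alive so the job can be inspected in the Spark UI
        Thread.sleep(1000000)
        ss.stop()
      }
    }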
    

    1.2 Preparing the Hive ORC table

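    LOAD DATA only moves files into the table directory without converting them, so text data cannot be loaded directly into an ORC table. The data is therefore loaded into a textfile table first and then written into the ORC table with INSERT OVERWRITE ... SELECT.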

    -- Create the database
    create database biods;
    
    -- Create the external ORC table
    create external table biods.ods_start_log
    (
    `str` string
    )
    comment 'user app-start log'
    partitioned by (`dt` string)
    stored as orc
    location '/bi/ods/ods_start_log';
    
    -- Create the external textfile table
    create external table biods.txt_ods_start_log
    (
    `str` string
    )
    comment 'user app-start log'
    partitioned by (`dt` string)
    stored as textfile
    location '/bi/ods/txt_ods_start_log';
    
    -- Add the partitions
    alter table biods.ods_start_log add partition(dt='20210721');
    alter table biods.txt_ods_start_log add partition(dt='20210721');
    
    -- Load the raw text data
    load data local inpath '/home/cwc/data/start0721.log' overwrite into table biods.txt_ods_start_log partition(dt='20210721');
    
    -- Write into the ORC table
    insert overwrite table biods.ods_start_log
    partition(dt='20210721')
    select str 
    from biods.txt_ods_start_log
    where dt='20210721';
    
    -- Merge the partition's files on HDFS into fewer files (optional)
    alter table biods.ods_start_log partition(dt='20210721') concatenate;
    

    The resulting partition directory:

    [root@hadoop3 ~]# hdfs dfs -ls -R -h hdfs://hadoop1:9000/bi/ods/ods_start_log/dt=20210721
    drwxr-xr-x   - root supergroup          0 2022-10-22 12:29 hdfs://hadoop1:9000/bi/ods/ods_start_log/dt=20210721/.hive-staging_hive_2022-10-22_12-29-16_934_837446341335257460-1
    drwxr-xr-x   - root supergroup          0 2022-10-22 12:29 hdfs://hadoop1:9000/bi/ods/ods_start_log/dt=20210721/.hive-staging_hive_2022-10-22_12-29-16_934_837446341335257460-1/-ext-10001
    drwxr-xr-x   - root supergroup          0 2022-10-22 12:29 hdfs://hadoop1:9000/bi/ods/ods_start_log/dt=20210721/.hive-staging_hive_2022-10-22_12-29-16_934_837446341335257460-1/_tmp.-ext-10002
    -rwxr-xr-x   3 root supergroup    246.0 M 2022-10-25 12:18 hdfs://hadoop1:9000/bi/ods/ods_start_log/dt=20210721/000000_1
    -rwxr-xr-x   3 root supergroup     94.1 M 2022-10-25 12:18 hdfs://hadoop1:9000/bi/ods/ods_start_log/dt=20210721/000001_0
    

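    Both files have an HDFS block size of 256 MB (268435456 bytes; %o prints the block size, %r the replication factor):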

    [root@hadoop3 ~]# hadoop fs -stat "%o %r" hdfs://hadoop1:9000/bi/ods/ods_start_log/dt=20210721/000001_0
    268435456 3
    [root@hadoop3 ~]# hadoop fs -stat "%o %r" hdfs://hadoop1:9000/bi/ods/ods_start_log/dt=20210721/000000_1
    268435456 3
    

    Stripe counts: the 94 MB file has 2 stripes and the 246 MB file has 11 stripes. The dump of the 94 MB file:

    [root@hadoop3 ~]# hive --orcfiledump hdfs://hadoop1:9000/bi/ods/ods_start_log/dt=20210721/000001_0 | less
    
    Processing data file hdfs://hadoop1:9000/bi/ods/ods_start_log/dt=20210721/000001_0 [length: 98673168]
    Structure for hdfs://hadoop1:9000/bi/ods/ods_start_log/dt=20210721/000001_0
    File Version: 0.12 with ORC_135
    Rows: 3043150
    Compression: ZLIB
    Compression size: 262144
    Type: struct<str:string>
    
    Stripe Statistics:
      Stripe 1:
        Column 0: count: 2020000 hasNull: false
        Column 1: count: 2020000 hasNull: false min: 2021-09-16 16:26:46.194 [main] INFO  com.lagou.ecommerce.AppStart - {"app_active":{"name":"app_active","json":{"entry":"1","action":"0","error_code":"0"},"time":1595347200000},"attr":{"area":"三亚","uid":"2F10092A1723314","app_v":"1.1.2","event_type":"common","device_id":"1FB872-9A1001723314","os_type":"0.8.8","channel":"WS","language":"chinese","brand":"iphone-1"}} max: 2021-09-16 16:28:48.696 [main] INFO  com.lagou.ecommerce.AppStart - {"app_active":{"name":"app_active","json":{"entry":"2","action":"1","error_code":"0"},"time":1596124800000},"attr":{"area":"铜川","uid":"2F10092A10999969","app_v":"1.1.7","event_type":"common","device_id":"1FB872-9A10010999969","os_type":"2.0","channel":"BL","language":"chinese","brand":"iphone-5"}} sum: 748218740
      Stripe 2:
        Column 0: count: 1023150 hasNull: false
        Column 1: count: 1023150 hasNull: false min: 2021-09-16 16:26:32.958 [main] INFO  com.lagou.ecommerce.AppStart - {"app_active":{"name":"app_active","json":{"entry":"1","action":"0","error_code":"0"},"time":1595260800000},"attr":{"area":"乳山","uid":"2F10092A727865","app_v":"1.1.19","event_type":"common","device_id":"1FB872-9A100727865","os_type":"9.7.7","channel":"HA","language":"chinese","brand":"Huawei-3"}} max: 2021-09-16 16:28:35.297 [main] INFO  com.lagou.ecommerce.AppStart - {"app_active":{"name":"app_active","json":{"entry":"2","action":"1","error_code":"0"},"time":1596038400000},"attr":{"area":"深圳","uid":"2F10092A9999986","app_v":"1.1.2","event_type":"common","device_id":"1FB872-9A1009999986","os_type":"0.4.6","channel":"LD","language":"chinese","brand":"Huawei-6"}} sum: 378150821
    
    File Statistics:
      Column 0: count: 3043150 hasNull: false
      Column 1: count: 3043150 hasNull: false min: 2021-09-16 16:26:32.958 [main] INFO  com.lagou.ecommerce.AppStart - {"app_active":{"name":"app_active","json":{"entry":"1","action":"0","error_code":"0"},"time":1595260800000},"attr":{"area":"乳山","uid":"2F10092A727865","app_v":"1.1.19","event_type":"common","device_id":"1FB872-9A100727865","os_type":"9.7.7","channel":"HA","language":"chinese","brand":"Huawei-3"}} max: 2021-09-16 16:28:48.696 [main] INFO  com.lagou.ecommerce.AppStart - {"app_active":{"name":"app_active","json":{"entry":"2","action":"1","error_code":"0"},"time":1596124800000},"attr":{"area":"铜川","uid":"2F10092A10999969","app_v":"1.1.7","event_type":"common","device_id":"1FB872-9A10010999969","os_type":"2.0","channel":"BL","language":"chinese","brand":"iphone-5"}} sum: 1126369561
    
    Stripes:
      Stripe: offset: 3 data: 65480370 rows: 2020000 tail: 46 index: 21746
        Stream: column 0 section ROW_INDEX start: 3 length 34
        Stream: column 1 section ROW_INDEX start: 37 length 21712
        Stream: column 1 section DATA start: 21749 length 64216148
        Stream: column 1 section LENGTH start: 64237897 length 1264222
        Encoding column 0: DIRECT
        Encoding column 1: DIRECT_V2
      Stripe: offset: 65502165 data: 33158620 rows: 1023150 tail: 46 index: 11309
        Stream: column 0 section ROW_INDEX start: 65502165 length 29
        Stream: column 1 section ROW_INDEX start: 65502194 length 11280
        Stream: column 1 section DATA start: 65513474 length 32518042
        Stream: column 1 section LENGTH start: 98031516 length 640578
        Encoding column 0: DIRECT
        Encoding column 1: DIRECT_V2
    
    File length: 98673168 bytes
    Padding length: 0 bytes
    Padding ratio: 0%
    
    
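The numbers in this dump are consistent with a stripe's length being index + data + tail (stripe footer) bytes: each stripe starts exactly where the previous one ends, and the first starts at offset 3, right after the 3-byte `ORC` magic at the head of every ORC file. The minimal Java sketch below recomputes the layout of 000001_0 from the dump's figures only; `StripeLayout` and its methods are illustrative names, not part of the ORC API.

```java
// Recomputes the stripe layout of 000001_0 from the `hive --orcfiledump` output.
// Assumption: stripe length = index + data + tail (stripe footer) bytes.
class StripeLayout {
    // {offset, index, data, tail} per stripe, copied from the dump
    static final long[][] STRIPES = {
        {3L, 21746L, 65480370L, 46L},
        {65502165L, 11309L, 33158620L, 46L},
    };
    static final long FILE_LENGTH = 98673168L;

    static long stripeLength(long[] s) {
        return s[1] + s[2] + s[3];
    }

    // Byte offset where the last stripe ends; what follows is the ORC file tail
    static long endOfStripes() {
        long[] last = STRIPES[STRIPES.length - 1];
        return last[0] + stripeLength(last);
    }

    // True if every stripe starts exactly where the previous one ends
    static boolean contiguous() {
        for (int i = 1; i < STRIPES.length; i++) {
            long[] prev = STRIPES[i - 1];
            if (prev[0] + stripeLength(prev) != STRIPES[i][0]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        for (long[] s : STRIPES) {
            System.out.println("offset=" + s[0] + " length=" + stripeLength(s));
        }
        System.out.println("contiguous=" + contiguous());
        System.out.println("file tail bytes=" + (FILE_LENGTH - endOfStripes()));
    }
}
```

These (offset, length) pairs are exactly the units that split generation later groups together.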


    2 How HiveTableScanExec reads the ORC table

    Judging from the driver-side log, the splits are first generated in OrcInputFormat; the tasks scheduled afterwards read the ORC table through HadoopRDD (HadoopRDD.scala).

    22/10/27 23:46:59 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on LAPTOP-NDJFRBI4:57758 (size: 24.3 KiB, free: 4.0 GiB)
    22/10/27 23:46:59 INFO SparkContext: Created broadcast 0 from 
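    --------- From here on, OrcInputFormat computes the splits. Each split becomes one task, so split computation runs up front on the driver (or master), in line with the big-data principle of moving computation rather than data.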
    22/10/27 23:46:59 INFO OrcInputFormat: ORC pushdown predicate: null
    22/10/27 23:46:59 INFO deprecation: mapred.input.dir is deprecated. Instead, use mapreduce.input.fileinputformat.inputdir
    22/10/27 23:46:59 INFO OrcInputFormat: Using column configuration variables columns [str] / columns.types [string] (isAcidRead false)
    22/10/27 23:47:00 INFO OrcCodecPool: Got brand-new codec ZLIB
    22/10/27 23:47:00 INFO OrcCodecPool: Got brand-new codec ZLIB
    22/10/27 23:47:00 INFO OrcInputFormat: FooterCacheHitRatio: 0/2
    22/10/27 23:47:00 INFO SparkContext: Starting job: sql at SparkSqlHive.scala:17
    22/10/27 23:47:00 INFO DAGScheduler: Got job 0 (sql at SparkSqlHive.scala:17) with 3 output partitions
    22/10/27 23:47:00 INFO DAGScheduler: Final stage: ResultStage 0 (sql at SparkSqlHive.scala:17)
    22/10/27 23:47:00 INFO DAGScheduler: Parents of final stage: List()
    22/10/27 23:47:00 INFO DAGScheduler: Missing parents: List()
    22/10/27 23:47:00 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[5] at sql at SparkSqlHive.scala:17), which has no missing parents
    22/10/27 23:47:00 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 228.3 KiB, free 4.0 GiB)
    22/10/27 23:47:00 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 81.8 KiB, free 4.0 GiB)
    22/10/27 23:47:00 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on LAPTOP-NDJFRBI4:57758 (size: 81.8 KiB, free: 4.0 GiB)
    22/10/27 23:47:00 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1223
    22/10/27 23:47:00 INFO DAGScheduler: Submitting 3 missing tasks from ResultStage 0 (MapPartitionsRDD[5] at sql at SparkSqlHive.scala:17) (first 15 tasks are for partitions Vector(0, 1, 2))
    22/10/27 23:47:00 INFO TaskSchedulerImpl: Adding task set 0.0 with 3 tasks
    22/10/27 23:47:00 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, LAPTOP-NDJFRBI4, executor driver, partition 0, ANY, 7546 bytes)
    22/10/27 23:47:00 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, LAPTOP-NDJFRBI4, executor driver, partition 1, ANY, 7546 bytes)
    22/10/27 23:47:00 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
    22/10/27 23:47:00 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
    ---------------------- Task scheduling and execution start here
    22/10/27 23:47:00 INFO HadoopRDD: Input split: OrcSplit [hdfs://hadoop1:9000/bi/ods/ods_start_log/dt=20210721/000000_1, start=130975043, length=126979143, isOriginal=true, fileLength=257955207, hasFooter=false, hasBase=true, deltas=[]]
    22/10/27 23:47:00 INFO HadoopRDD: Input split: OrcSplit [hdfs://hadoop1:9000/bi/ods/ods_start_log/dt=20210721/000000_1, start=3, length=130975040, isOriginal=true, fileLength=257955207, hasFooter=false, hasBase=true, deltas=[]]
    22/10/27 23:47:00 INFO OrcInputFormat: Using column configuration variables columns [str] / columns.types [string] (isAcidRead false)
    
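The two `Input split` lines for 000000_1 show that an OrcSplit is a plain byte range: the ranges are contiguous, the first starts at offset 3 (the first stripe), and the last stops short of fileLength, presumably by the size of the file tail (metadata, footer, postscript). A small check using only the numbers printed in the log; `SplitCheck` is a hypothetical helper.

```java
// Checks the two OrcSplit ranges that HadoopRDD logged for 000000_1.
class SplitCheck {
    // {start, length} pairs copied from the "Input split: OrcSplit [...]" lines
    static final long[][] SPLITS = {
        {3L, 130975040L},
        {130975043L, 126979143L},
    };
    static final long FILE_LENGTH = 257955207L;

    // True if each split starts where the previous one ended
    static boolean contiguous() {
        for (int i = 1; i < SPLITS.length; i++) {
            if (SPLITS[i - 1][0] + SPLITS[i - 1][1] != SPLITS[i][0]) {
                return false;
            }
        }
        return true;
    }

    // Bytes after the last split, not covered by any split
    static long uncoveredTail() {
        long[] last = SPLITS[SPLITS.length - 1];
        return FILE_LENGTH - (last[0] + last[1]);
    }

    public static void main(String[] args) {
        System.out.println("contiguous=" + contiguous());
        System.out.println("uncovered tail bytes=" + uncoveredTail());
        for (long[] s : SPLITS) {
            System.out.printf("split start=%d size=%.1f MiB%n", s[0], s[1] / 1048576.0);
        }
    }
}
```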

    2.1 Entry point: OrcInputFormat#generateSplitsInfo

    org.apache.hadoop.hive.ql.io.orc.OrcInputFormat#generateSplitsInfo

    static List<OrcSplit> generateSplitsInfo(Configuration conf, Context context)
          throws IOException {
        // Emits the "ORC pushdown predicate" line seen in the driver log
        if (LOG.isInfoEnabled()) {
          LOG.info("ORC pushdown predicate: " + context.sarg);
        }
        ....
            for (SplitStrategy splitStrategy : splitStrategies) {
              if (isDebugEnabled) {
                LOG.debug("Split strategy: {}", splitStrategy);
              }
    
              // Hack note - different split strategies return differently typed lists, yay Java.
              // This works purely by magic, because we know which strategy produces which type.
              if (splitStrategy instanceof ETLSplitStrategy) {
                // ETL strategy: schedule split futures that generate the splits asynchronously
                scheduleSplits((ETLSplitStrategy)splitStrategy,
                    context, splitFutures, strategyFutures, splits);
              } else {
                @SuppressWarnings("unchecked")
                List<OrcSplit> readySplits = (List<OrcSplit>)splitStrategy.getSplits();
                splits.addAll(readySplits);
              }
            }
          }
    
          // Run the last combined strategy, if any.
          if (combinedCtx != null && combinedCtx.combined != null) {
            scheduleSplits(combinedCtx.combined, context, splitFutures, strategyFutures, splits);
            combinedCtx.combined = null;
          }
    
          // complete split futures
          for (Future<Void> ssFuture : strategyFutures) {
             ssFuture.get(); // Make sure we get exceptions strategies might have thrown.
          }
          // All the split strategies are done, so it must be safe to access splitFutures.
          for (Future<List<OrcSplit>> splitFuture : splitFutures) {
            // Wait for each split future and collect its splits (see SplitGenerator#call below)
            splits.addAll(splitFuture.get());
          }
        } catch (Exception e) {
          cancelFutures(pathFutures);
          cancelFutures(strategyFutures);
          cancelFutures(splitFutures);
          throw new RuntimeException("ORC split generation failed with exception: " + e.getMessage(), e);
        }
    
        if (context.cacheStripeDetails) {
          LOG.info("FooterCacheHitRatio: " + context.cacheHitCounter.get() + "/"
              + context.numFilesCounter.get());
        }
    
        if (isDebugEnabled) {
          for (OrcSplit split : splits) {
            LOG.debug(split + " projected_columns_uncompressed_size: "
                + split.getColumnarProjectionSize());
          }
        }
        return splits;
      }
    

    2.2 OrcInputFormat#scheduleSplits: how the split futures are created

    org.apache.hadoop.hive.ql.io.orc.OrcInputFormat#scheduleSplits

      private static void scheduleSplits(ETLSplitStrategy splitStrategy, Context context,
          List<Future<List<OrcSplit>>> splitFutures, List<Future<Void>> strategyFutures,
          List<OrcSplit> splits) throws IOException {
      // Key call: delegate split generation to the ETL strategy
        Future<Void> ssFuture = splitStrategy.generateSplitWork(context, splitFutures, splits);
        if (ssFuture == null) return;
        strategyFutures.add(ssFuture);
      }
    

    org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.ETLSplitStrategy#generateSplitWork

    2.3 OrcInputFormat.ETLSplitStrategy#runGetSplitsSync

    org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.ETLSplitStrategy#runGetSplitsSync

    private void runGetSplitsSync(List<Future<List<OrcSplit>>> splitFutures,
            List<OrcSplit> splits, UserGroupInformation ugi) throws IOException {
          UserGroupInformation tpUgi = ugi == null ? UserGroupInformation.getCurrentUser() : ugi;
          List<SplitInfo> splitInfos = getSplits();
          List<Future<List<OrcSplit>>> localListF = null;
          List<OrcSplit> localListS = null;
          for (SplitInfo splitInfo : splitInfos) {
            // SplitGenerator is a Callable<List<OrcSplit>>
            SplitGenerator sg = new SplitGenerator(splitInfo, tpUgi, allowSyntheticFileIds);
            if (!sg.isBlocking()) {
              if (localListS == null) {
                localListS = new ArrayList<>(splits.size());
              }
              // Already called in doAs, so no need to doAs here.
              localListS.addAll(sg.call());
            } else {
              if (localListF == null) {
                localListF = new ArrayList<>(splits.size());
              }
              // Submit to the shared thread pool; the Future is joined later in generateSplitsInfo
              localListF.add(Context.threadPool.submit(sg));
            }
          }
          if (localListS != null) {
            synchronized (splits) {
              splits.addAll(localListS);
            }
          }
          if (localListF != null) {
            synchronized (splitFutures) {
              splitFutures.addAll(localListF);
            }
          }
        }
    
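The dispatch in runGetSplitsSync (cheap generators run inline, blocking ones go to a shared pool, and generateSplitsInfo joins the futures afterwards, cancelling them on failure) is a common Java pattern. A minimal sketch, assuming hypothetical `Generator` and `SplitScheduling` classes, with strings standing in for OrcSplit and a `blocking` flag standing in for SplitGenerator#isBlocking():

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for SplitGenerator: produces the splits of one file.
class Generator implements Callable<List<String>> {
    final String file;
    final boolean blocking; // plays the role of SplitGenerator#isBlocking()

    Generator(String file, boolean blocking) {
        this.file = file;
        this.blocking = blocking;
    }

    @Override
    public List<String> call() {
        List<String> out = new ArrayList<>();
        out.add(file + "#split0"); // placeholder for a real OrcSplit
        return out;
    }
}

class SplitScheduling {
    static List<String> collect(List<Generator> generators, ExecutorService pool) {
        List<String> splits = new ArrayList<>();
        List<Future<List<String>>> futures = new ArrayList<>();
        for (Generator g : generators) {
            if (!g.blocking) {
                splits.addAll(g.call());     // cheap: run inline on the calling thread
            } else {
                futures.add(pool.submit(g)); // blocking work: hand it to the pool
            }
        }
        try {
            for (Future<List<String>> f : futures) {
                splits.addAll(f.get());      // join; rethrows a generator's failure
            }
        } catch (Exception e) {
            for (Future<List<String>> f : futures) {
                f.cancel(true);              // like cancelFutures(...) in generateSplitsInfo
            }
            throw new RuntimeException("split generation failed: " + e.getMessage(), e);
        }
        return splits;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Generator> gens = Arrays.asList(
                    new Generator("000000_1", true),
                    new Generator("000001_0", false));
            System.out.println(collect(gens, pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

Because inline results are added immediately and pooled results only when their futures are joined, the collected list is not necessarily in file order; in this sketch the inline file's split comes first.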

    org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.SplitGenerator
    org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.SplitGenerator#call
    org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.SplitGenerator#callInternal

    2.4 (Key step) OrcInputFormat.SplitGenerator#generateSplitsFromStripes

    org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.SplitGenerator#generateSplitsFromStripes
    Splits are built from the ORC file's stripes: the stripe is the basic unit, consecutive stripes are grouped into a split, and a stripe is never divided further.

        private List<OrcSplit> generateSplitsFromStripes(boolean[] includeStripe) throws IOException {
          List<OrcSplit> splits = new ArrayList<>(stripes.size());
    
          // after major compaction, base files may become empty base files. Following sequence is an example
          // 1) insert some rows
          // 2) delete all rows
          // 3) major compaction
          // 4) insert some rows
          // In such cases, consider entire base delta file as an orc split (similar to what BI strategy does)
          if (stripes == null || stripes.isEmpty()) {
            splits.add(createSplit(0, file.getLen(), orcTail));
          } else {
            // if we didn't have predicate pushdown, read everything
            if (includeStripe == null) {
              includeStripe = new boolean[stripes.size()];
              Arrays.fill(includeStripe, true);
            }
    
            OffsetAndLength current = new OffsetAndLength();
            int idx = -1;
            for (StripeInformation stripe : stripes) {
              idx++;
    
              if (!includeStripe[idx]) {
                // create split for the previous unfinished stripe
                if (current.offset != -1) {
                  splits.add(createSplit(current.offset, current.length, orcTail));
                  current.offset = -1;
                }
                continue;
              }
    
              current = generateOrUpdateSplit(
                splits, current, stripe.getOffset(), stripe.getLength(), orcTail);
            }
            generateLastSplit(splits, current, orcTail);
          }
          // Add uncovered ACID delta splits.
          splits.addAll(deltaSplits);
          return splits;
        }
    

    2.5 (Key step) OrcInputFormat.SplitGenerator#generateOrUpdateSplit

    org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.SplitGenerator#generateOrUpdateSplit

    private OffsetAndLength generateOrUpdateSplit(
            List<OrcSplit> splits, OffsetAndLength current, long offset,
            long length, OrcTail orcTail) throws IOException {
          // if we are working on a stripe, over the min stripe size, and
          // crossed a block boundary, cut the input split here.
          // current.offset: start offset of the split being assembled from stripes
          // offset: start offset of the stripe passed in
          // blockSize: HDFS block size of the file (a property of the file itself)
          // context.minSize, context.maxSize: the split-size parameters passed in
          // When maxSize is very large, current.offset stays fixed while current.length and offset grow;
          // once the incoming stripe starts in another block (offset / blockSize changes), the first
          // condition holds and the split is closed before that stripe, so splits follow block boundaries
          if (current.offset != -1 && current.length > context.minSize &&
              (current.offset / blockSize != offset / blockSize)) {
            splits.add(createSplit(current.offset, current.length, orcTail));
            current.offset = -1;
          }
          // if we aren't building a split, start a new one.
          if (current.offset == -1) {
            current.offset = offset;
            current.length = length;
          } else {
            current.length = (offset + length) - current.offset;
          }
          // Once the accumulated stripes reach mapreduce.input.fileinputformat.split.maxsize, emit one split.
          // The check runs after a whole stripe is added, so a split can exceed maxSize by up to one stripe.
          if (current.length >= context.maxSize) {
            splits.add(createSplit(current.offset, current.length, orcTail));
            current.offset = -1;
          }
          return current;
        }
    
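To see what generateSplitsFromStripes and generateOrUpdateSplit produce for a given file, the two methods can be re-implemented over plain (offset, length) pairs. This is a simplified sketch, not Hive code: ACID deltas, the empty-file case and OrcTail are left out, and `OrcSplitPlanner.plan` is a hypothetical name. The inputs in `main` are the stripes of 000001_0 (index + data + tail from the orcfiledump output) and the 256 MB block size from section 1.2.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of generateSplitsFromStripes + generateOrUpdateSplit (illustration only).
class OrcSplitPlanner {
    // stripes[i] = {offset, length}; include == null means no predicate pushdown (read every stripe).
    // Returns one {offset, length} pair per split.
    static List<long[]> plan(long[][] stripes, boolean[] include,
                             long blockSize, long minSize, long maxSize) {
        List<long[]> splits = new ArrayList<>();
        long curOffset = -1, curLength = 0;
        for (int i = 0; i < stripes.length; i++) {
            long offset = stripes[i][0], length = stripes[i][1];
            if (include != null && !include[i]) {
                // a skipped stripe closes the split being built
                if (curOffset != -1) {
                    splits.add(new long[]{curOffset, curLength});
                    curOffset = -1;
                }
                continue;
            }
            // over minSize and this stripe starts in another HDFS block: cut before it
            if (curOffset != -1 && curLength > minSize
                    && curOffset / blockSize != offset / blockSize) {
                splits.add(new long[]{curOffset, curLength});
                curOffset = -1;
            }
            if (curOffset == -1) {        // start a new split at this stripe
                curOffset = offset;
                curLength = length;
            } else {                      // extend the current split over this stripe
                curLength = (offset + length) - curOffset;
            }
            if (curLength >= maxSize) {   // checked only after a whole stripe is added
                splits.add(new long[]{curOffset, curLength});
                curOffset = -1;
            }
        }
        if (curOffset != -1) {            // equivalent of generateLastSplit
            splits.add(new long[]{curOffset, curLength});
        }
        return splits;
    }

    public static void main(String[] args) {
        long[][] stripes = {{3L, 65502162L}, {65502165L, 33169975L}}; // 000001_0
        long blockSize = 268435456L;                                  // 256 MB
        for (long maxSize : new long[]{67108864L, 268435456L, 33554432L}) {
            StringBuilder sb = new StringBuilder("maxSize=" + maxSize + " ->");
            for (long[] s : plan(stripes, null, blockSize, 1L, maxSize)) {
                sb.append(" [").append(s[0]).append(", ").append(s[1]).append("]");
            }
            System.out.println(sb);
        }
    }
}
```

Under this model the 94 MB file remains a single split of 98672137 bytes with maxsize = 67108864: the first stripe (65502162 bytes) stays below the limit, and the cut only happens after the second stripe has been added. A maxsize at or below the first stripe's length, such as 32 MiB (33554432), yields one split per stripe.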

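    2.6 Summary: the call chain is

    OrcInputFormat#generateSplitsInfo
      -> OrcInputFormat#scheduleSplits
      -> OrcInputFormat.ETLSplitStrategy#generateSplitWork
      -> OrcInputFormat.ETLSplitStrategy#runGetSplitsSync
      -> OrcInputFormat.SplitGenerator#call -> SplitGenerator#callInternal
      -> OrcInputFormat.SplitGenerator#generateSplitsFromStripes
      -> OrcInputFormat.SplitGenerator#generateOrUpdateSplit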

    context.minSize and context.maxSize are initialized in the Context constructor:

    org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.Context#Context(org.apache.hadoop.conf.Configuration, int, org.apache.hadoop.hive.ql.io.orc.ExternalCache.ExternalFooterCachesByConf)

          minSize = HiveConf.getLongVar(conf, ConfVars.MAPREDMINSPLITSIZE, DEFAULT_MIN_SPLIT_SIZE);
          maxSize = HiveConf.getLongVar(conf, ConfVars.MAPREDMAXSPLITSIZE, DEFAULT_MAX_SPLIT_SIZE);
    

    By default:

    mapreduce.input.fileinputformat.split.minsize=1
    mapreduce.input.fileinputformat.split.maxsize= 256M

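    Therefore, by default a split roughly corresponds to one HDFS block: the usual block size of 128 MB is smaller than the default mapreduce.input.fileinputformat.split.maxsize, so the block-boundary check in generateOrUpdateSplit fires before the maxSize check. Split boundaries still fall on stripe boundaries.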

    Combining each ORC file's stripes with these parameters yields the final splits.

    Conclusion: "spark.hadoop.mapreduce.input.fileinputformat.split.maxsize" therefore controls how many splits are generated, and with it the number of map tasks that read the table.

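    A small question: why does setting spark.hadoop.mapreduce.input.fileinputformat.split.maxsize end up taking effect as mapreduce.input.fileinputformat.split.maxsize?
    The answer is in org.apache.spark.deploy.SparkHadoopUtil#appendSparkHadoopConfigs: when the SparkContext is created, every setting whose key starts with spark.hadoop. has that prefix stripped, and the remainder is set as a Hadoop configuration property.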
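The prefix handling can be sketched in plain Java, with maps standing in for SparkConf and Hadoop's Configuration. This only illustrates the behaviour described above and is not Spark's code (which is Scala); `SparkHadoopPrefix`, `toHadoopConf`, `maxSplitSize` and the 268435456 fallback passed in `main` are assumptions for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: how spark.hadoop.* entries become plain Hadoop settings.
class SparkHadoopPrefix {
    static final String PREFIX = "spark.hadoop.";

    // Copy every spark.hadoop.* entry into the Hadoop settings with the prefix removed
    static Map<String, String> toHadoopConf(Map<String, String> sparkConf) {
        Map<String, String> hadoopConf = new HashMap<>();
        for (Map.Entry<String, String> e : sparkConf.entrySet()) {
            if (e.getKey().startsWith(PREFIX)) {
                hadoopConf.put(e.getKey().substring(PREFIX.length()), e.getValue());
            }
        }
        return hadoopConf;
    }

    // Hypothetical lookup with a fallback, in the spirit of HiveConf.getLongVar(conf, var, default)
    static long maxSplitSize(Map<String, String> hadoopConf, long defaultValue) {
        String v = hadoopConf.get("mapreduce.input.fileinputformat.split.maxsize");
        return v == null ? defaultValue : Long.parseLong(v.trim());
    }

    public static void main(String[] args) {
        Map<String, String> sparkConf = new HashMap<>();
        sparkConf.put("spark.hadoop.mapreduce.input.fileinputformat.split.maxsize", "67108864");
        sparkConf.put("spark.sql.hive.convertMetastoreOrc", "false"); // no spark.hadoop. prefix: not copied
        Map<String, String> hadoopConf = toHadoopConf(sparkConf);
        System.out.println(hadoopConf);
        System.out.println(maxSplitSize(hadoopConf, 268435456L));
    }
}
```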

    [Figure: how Spark SQL reads the ORC files when "spark.hadoop.mapreduce.input.fileinputformat.split.maxsize" is set to "67108864"]

    3 Summary

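    (1) Splits are built from the ORC file's stripes: a stripe is the basic unit, consecutive stripes are grouped into a split, and a stripe is never divided further.
    (2) By default, the split size roughly equals the file's block size, because the usual 128 MB block size is smaller than the default mapreduce.input.fileinputformat.split.maxsize of 256M.
    (3) Setting "spark.hadoop.mapreduce.input.fileinputformat.split.maxsize" below the file's block size increases the number of splits, and therefore the number of map tasks that read the table. Because maxsize is only checked after a whole stripe has been added, a split can exceed maxsize by up to one stripe, and no split is smaller than one stripe.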

    4 Open question

    How are the stripes in an ORC file produced? Further reading:
    https://www.jianshu.com/p/0ba4f5c3f113
    https://cloud.tencent.com/developer/article/1580677
    https://blog.csdn.net/dante_003/article/details/79245240

  • Original article: https://blog.csdn.net/u014034497/article/details/127543274