• Hive SQL ——窗口函数源码阅读


    前言

       使用Starrocks引擎中的窗口函数 row_number() over( )对10亿的数据集进行去重操作,BE内存溢出问题频发(忘记当时指定的BE内存上限是多少了.....),此时才意识到,开窗操作,如果使用 不当,反而更容易引发性能问题。 下文是对Hive中的窗口函数底层源码进行初步学习,若有问题,请指正~

    一、窗口函数的执行步骤

    (1)将数据分割成多个分区;

    (2)在各个分区上调用窗口函数;

       由于窗口函数的返回结果不是一个聚合值,而是另一张表的格式(table-in, table-out),因此Hive社区引入分区表函数  Partitioned Table Function(PTF)。

      简略的代码流转图:

      hive会把QueryBlock,翻译成执行操作数OperatorTree,其中每个operator都会有三个重要的方法:

    • initializeOp() :初始化算子
    • process() :执行每一行数据
    • forward() :把处理好的每一行数据发送到下个Operator

       当遇到窗口函数时,会生成PTFOperator,PTFOperator依赖PTFInvocation 读取已经排好序的数据,创建相应的输入分区:PTFPartition inputPart;

       WindowTableFunction 负责管理窗口帧、调用窗口函数(UDAF)、并将结果写入输出分区: PTFPartition outputPart。

    二、源码分析

    2.1 PTFOperator 类

       是PartitionedTableFunction的运算符,继承Operator抽象类(Hive运算符基类)

    重写process(Object row, int tag) 方法,该方法来处理一行数据Row

    1. @Override
    2. public void process(Object row, int tag) throws HiveException {
    3. if (!isMapOperator) {
    4. /*
    5. * check if current row belongs to the current accumulated Partition:
    6. * - If not:
    7. * - process the current Partition
    8. * - reset input Partition
    9. * - set currentKey to the newKey if it is null or has changed.
    10. */
    11. newKeys.getNewKey(row, inputObjInspectors[0]);
    12. //会判断当前row所属的Key(newKeys)是否等于当前正在累积数据的partition所属的key(currentKeys)
    13. boolean keysAreEqual = (currentKeys != null && newKeys != null) ?
    14. newKeys.equals(currentKeys) : false;
    15. // 如果不相等,就结束当前partition分区的数据累积,触发窗口计算
    16. if (currentKeys != null && !keysAreEqual) {
    17. // 关闭正在积累的分区
    18. ptfInvocation.finishPartition();
    19. }
    20. // 如果currentKeys为空或者被改变,就将newKeys赋值给currentKeys
    21. if (currentKeys == null || !keysAreEqual) {
    22. // 开启一个新的分区partition
    23. ptfInvocation.startPartition();
    24. if (currentKeys == null) {
    25. currentKeys = newKeys.copyKey();
    26. } else {
    27. currentKeys.copyKey(newKeys);
    28. }
    29. }
    30. } else if (firstMapRow) { // 说明当前row是进入的第一行
    31. ptfInvocation.startPartition();
    32. firstMapRow = false;
    33. }
    34. // 将数据row添加到分区中,积累数据
    35. ptfInvocation.processRow(row);
    36. }

       上面的代码可以看出,所有数据应该是按照分区排好了序,排队进入process方法,当遇到进入的row和当前分区不是同一个key时,当前分区就可以关闭了,然后在打开下一个分区。

    2.2 PTFInvocation类

      PTFInvocationPTFOperator类 的内部类

     在PTFOperator的初始化方法中创建了实例。

    1. @Override
    2. protected void initializeOp(Configuration jobConf) throws HiveException {
    3. ...
    4. ptfInvocation = setupChain();
    5. ptfInvocation.initializeStreaming(jobConf, isMapOperator);
    6. ...
    7. }

       它的主要作用是负责PTF 数据链中行( row)的流动,通过 ptfInvocation.processRow(row) 方法调用传递链中的每一行,并且通过ptfInvocation.startPartition()、ptfInvocation.finishPartition()方法来通知分区何时开始何时结束。

     该类中包含TableFunction,用来处理分区数据。

    1. PTFPartition inputPart; // inputPart理解为:分区对象,一直是在复用一个inputPart
    2. TableFunctionEvaluator tabFn; // tabFn理解为:窗口函数的实例
    3. //向分区中添加一行数据
    4. void processRow(Object row) throws HiveException {
    5. if (isStreaming()) {
    6. // tabFn是窗口函数的实例
    7. handleOutputRows(tabFn.processRow(row));
    8. } else {
    9. // inputPart就是当前正在累积数据的分区
    10. inputPart.append(row);
    11. }
    12. }
    13. // 开启一个分区
    14. void startPartition() throws HiveException {
    15. if (isStreaming()) {
    16. tabFn.startPartition();
    17. } else {
    18. if (prev == null || prev.isOutputIterator()) {
    19. if (inputPart == null) {
    20. // 创建新分区对象:PTFPartition对象
    21. createInputPartition();
    22. } else {
    23. // 重置分区
    24. inputPart.reset();
    25. }
    26. }
    27. }
    28. if (next != null) {
    29. next.startPartition();
    30. }
    31. }
    32. // 关闭一个分区
    33. void finishPartition() throws HiveException {
    34. if (isStreaming()) {
    35. handleOutputRows(tabFn.finishPartition());
    36. } else {
    37. if (tabFn.canIterateOutput()) {
    38. outputPartRowsItr = inputPart == null ? null :
    39. tabFn.iterator(inputPart.iterator());
    40. } else {
    41. // tabFn是窗口函数的实例,execute方法:执行窗口函数逻辑的计算,返回outputPart依旧是一个分区对象
    42. outputPart = inputPart == null ? null : tabFn.execute(inputPart);
    43. outputPartRowsItr = outputPart == null ? null : outputPart.iterator();
    44. }
    45. if (next != null) {
    46. if (!next.isStreaming() && !isOutputIterator()) {
    47. next.inputPart = outputPart;
    48. } else {
    49. if (outputPartRowsItr != null) {
    50. while (outputPartRowsItr.hasNext()) {
    51. next.processRow(outputPartRowsItr.next());
    52. }
    53. }
    54. }
    55. }
    56. if (next != null) {
    57. next.finishPartition();
    58. } else {
    59. if (!isStreaming()) {
    60. if (outputPartRowsItr != null) {
    61. while (outputPartRowsItr.hasNext()) {
    62. // 将窗口函数计算结果逐条输出到下一个Operator中
    63. forward(outputPartRowsItr.next(), outputObjInspector);
    64. }
    65. }
    66. }
    67. }
    68. }

    2.3 PTFPartition类

       该类表示由TableFunctionWindowFunction来处理的行集合,使用PTFRowContainer来保存数据。

    1. private final PTFRowContainer> elems; // 存放数据的容器
    2. public void append(Object o) throws HiveException {
    3. //在往PTFPartition中添加数据时,如果当前累计条数超过了Int最大值(21亿),会抛异常。
    4. if (elems.rowCount() == Integer.MAX_VALUE) {
    5. throw new HiveException(String.format("Cannot add more than %d elements to a PTFPartition",
    6. Integer.MAX_VALUE));
    7. }
    8. @SuppressWarnings("unchecked")
    9. List l = (List)
    10. ObjectInspectorUtils.copyToStandardObject(o, inputOI, ObjectInspectorCopyOption.WRITABLE);
    11. elems.addRow(l);
    12. }
    13. 2.4 TableFunctionEvaluator类

         该类负责对分区内的数据做实际的窗口计算

      1. public abstract class TableFunctionEvaluator {
      2. transient protected PTFPartition outputPartition; // transient瞬态变量,该属性可以不参与序列化
      3. // iPart理解为:分区对象
      4. public PTFPartition execute(PTFPartition iPart)
      5. throws HiveException {
      6. if (ptfDesc.isMapSide()) {
      7. return transformRawInput(iPart);
      8. }
      9. PTFPartitionIterator pItr = iPart.iterator();
      10. PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc.getLlInfo(), pItr);
      11. if (outputPartition == null) {
      12. outputPartition = PTFPartition.create(ptfDesc.getCfg(),
      13. tableDef.getOutputShape().getSerde(),
      14. OI, tableDef.getOutputShape().getOI());
      15. } else {
      16. outputPartition.reset();
      17. }
      18. // 入参1:输入PTFPartition转换的迭代器;入参2:输出PTFPartition
      19. execute(pItr, outputPartition);
      20. return outputPartition;
      21. }
      22. protected abstract void execute(PTFPartitionIterator pItr, PTFPartition oPart) throws HiveException;
      23. }
      24.  抽象方法 execute(PTFPartitionIterator pItr, PTFPartition oPart) 方法的具体实现在子类WindowingTableFunction

        1. public class WindowingTableFunction extends TableFunctionEvaluator {
        2. @Override
        3. public void execute(PTFPartitionIterator pItr, PTFPartition outP) throws HiveException {
        4. ArrayList> oColumns = new ArrayList>();
        5. PTFPartition iPart = pItr.getPartition();
        6. StructObjectInspector inputOI = iPart.getOutputOI();
        7. WindowTableFunctionDef wTFnDef = (WindowTableFunctionDef) getTableDef();
        8. for (WindowFunctionDef wFn : wTFnDef.getWindowFunctions()) {
        9. // 这里是判断逻辑:如果该窗口定义是一个从第一行到最后一行的全局无限窗口就返回false,反之true
        10. boolean processWindow = processWindow(wFn.getWindowFrame());
        11. pItr.reset();
        12. if (!processWindow) {
        13. Object out = evaluateFunctionOnPartition(wFn, iPart);
        14. if (!wFn.isPivotResult()) {
        15. out = new SameList(iPart.size(), out);
        16. }
        17. oColumns.add((List) out);
        18. } else {
        19. oColumns.add(executeFnwithWindow(wFn, iPart));
        20. }
        21. }
        22. /*
        23. * Output Columns in the following order
        24. * - the columns representing the output from Window Fns
        25. * - the input Rows columns
        26. */
        27. for (int i = 0; i < iPart.size(); i++) {
        28. ArrayList oRow = new ArrayList();
        29. Object iRow = iPart.getAt(i);
        30. for (int j = 0; j < oColumns.size(); j++) {
        31. oRow.add(oColumns.get(j).get(i));
        32. }
        33. for (StructField f : inputOI.getAllStructFieldRefs()) {
        34. oRow.add(inputOI.getStructFieldData(iRow, f));
        35. }
        36. //最终将处理好的数据逐条添加到输出PTFPartition中
        37. outP.append(oRow);
        38. }
        39. }
        40. // Evaluate the function result for each row in the partition
        41. ArrayList executeFnwithWindow(
        42. WindowFunctionDef wFnDef,
        43. PTFPartition iPart)
        44. throws HiveException {
        45. ArrayList vals = new ArrayList();
        46. for (int i = 0; i < iPart.size(); i++) {
        47. // 入参:1.窗口函数、2.当前行的行号、3.输入PTFPartition对象
        48. Object out = evaluateWindowFunction(wFnDef, i, iPart);
        49. vals.add(out);
        50. }
        51. return vals;
        52. }
        53. // Evaluate the result given a partition and the row number to process
        54. private Object evaluateWindowFunction(WindowFunctionDef wFn, int rowToProcess, PTFPartition partition)
        55. throws HiveException {
        56. BasePartitionEvaluator partitionEval = wFn.getWFnEval()
        57. .getPartitionWindowingEvaluator(wFn.getWindowFrame(), partition, wFn.getArgs(), wFn.getOI(), nullsLast);
        58. // 给定当前行,获取窗口的聚合
        59. return partitionEval.iterate(rowToProcess, ptfDesc.getLlInfo());
        60. }
        61. }
        62. 注:WindowingTableFunction类中的execute方法 ,没怎么理解清楚,待补充~

          三、Hive SQL窗口函数实现原理

              window Funtion的使用语法:

          1. select
          2. col1,
          3. col2,
          4. row_number() over (partition by col1 order by col2 窗口子句) as rn
          5. from tableA

          上面的语句主要分两部分

          • window函数部分(window_func)

          • 窗口定义部分

          3.1 window函数部分

             windows函数部分即是:在窗口上执行的函数。主要有count 、sum、avg聚合类窗口函数、还有常用的row_number、rank这样的排序函数。

          3.2  窗口定义部分

          即为: over里面的三部分内容(均可省略不写)

          • partition by 分区

          • order by 排序

          • (rows | range )between ... and .....  窗口子句

          ps :Hive 窗口函数的详细介绍:

          (07)Hive——窗口函数详解_hive 窗口函数-CSDN博客

          3.3  window Function实现原理

             窗口函数的实现,主要借助 Partitioned Table Function (即PTF);

          (1)PTF的输入可以是:表、子查询或另一个PTF函数输出;

          (2)PTF输出是一张表。

          写一个相对复杂的sql,来看一下执行窗口函数时,数据的流转情况:

          1. select
          2. id,
          3. sq,
          4. cell_type,
          5. rank,
          6. row_number() over(partition by id order by rank ) as rn ,
          7. rank() over(partition by id order by rank) as r,
          8. dense_rank() over(partition by cell_type order by id) as dr
          9. from window_test_table
          10. group by
          11. id,
          12. sq,
          13. cell_type,
          14. rank;

          数据流转如下图:

          以上代码实现主要有三个阶段:

          • 计算除窗口函数以外所有的其他运算,如:group by,join ,having等。上面的代码的第一阶段即为:

          1. select
          2. id,
          3. sq,
          4. cell_type,
          5. rank
          6. from window_test_table
          7. group by
          8. id,
          9. sq,
          10. cell_type,
          11. rank;
          • 将第一步的输出作为第一个 PTF 的输入,计算对应的窗口函数值。上面代码的第二阶段即为:

          1. select
          2. id,
          3. sq,
          4. cell_type,
          5. rank,
          6. rn,
          7. r
          8. from
          9. window(
          10. <w>,--将第一阶段输出记为w
          11. partition by id, --分区
          12. order by rank, --窗口函数的order
          13. [rn:row_number(),r:rank()] --窗口函数调用
          14. )

             由于row_number(),rank() 两个函数对应的窗口是相同的(partition by id  order by rank),因此,这两个函数可以在一次shuffle中完成。

          • 将第二步的输出结果作为 第二个PTF 的输入,计算对应的窗口函数值。上面代码的第三阶段即为:

          1. select
          2. id,
          3. sq,
          4. cell_type,
          5. rank,
          6. rn,
          7. r,
          8. dr
          9. from
          10. window(
          11. <w1>,--将第二阶段输出记为w1
          12. partition by cell_type, --分区
          13. order by id, --窗口函数的order
          14. [dr:dense_rank()] --窗口函数调用
          15. )

              由于dense_rank()的窗口与前两个函数不同,因此需要再partition一次,得到最终的输出结果。

               总结:上述代码显示需要shuffle三次才能得到最终的结果(第一阶段的group by ,第二阶段,第三阶段的开窗操作)。对应到MapReduce程序,即需要经历三次 map->reduce组合;对应到spark sql上,需要Exchange三次,再加上中间排序操作,在数据量很大的情况下,效率上确实会有较大的影响。

          四、窗口函数的性能问题

             在使用Hive进行数据处理时,借助窗口函数可以对数据进行分组、排序等操作,但是在使用row_number这类窗口函数时,会遇到性能较慢的问题,j即比普通的聚合函数( sum,min,max等)运行成本更高,为啥?

          4.1 性能问题产生原因

          4.1.1 第一个版本

          小破站一个up主给出的答案:

           原因:

          (1)开窗函数不能做预聚合 ,数据量很多,shuffle慢,计算慢,并且会有

          数据倾斜的风险;

          (2)开窗多一步order by ,更耗时间;

          4.1.2 第二个版本

          原因:

          (1)普通的聚合函数语句,可以根据函数不同,采用partial + merge 的方式运行,也即是map端预聚合;但那是window 窗口语句只能在reduce 端一次性聚合,即只有complete 执行模式。

          (2)普通聚合函数的物理执行计划分为SortBased和HashBased的;而window是SortBased。

          (3)window语句作用于 对行,并为每行返回一个聚合结果,这决定了window在执行过程中需要更大的buffer 进行汇总。

          4.2 性能问题的优化方法

          4.2.1 用聚合函数替代 排序开窗函数

               例如:假设需要求出历史至今用户粒度末次交易的sku名称或者交易金额等,这种情况下,可以将 交易时间和sku名称拼接起来,取max ,之后再将sku名称拆解开,即能达到预期效果。

              在Hive 中,row_number是一个常用的窗口函数,用于为结果集中的每一行分配一个唯一的数字。通常会搭配over子句来指定窗口的范围和排序方式。例如:

          1. select
          2. col1,
          3. col2,
          4. row_number() over (partition by col1 order by col2 窗口子句) as rn
          5. from tableA

             上述示例row_number 函数将根据col1进行分组,并按照col2的值进行排序,为每一组数据分配一个唯一的行号。然而,在处理大规模数据时,使用row_number可能会导致性能下降,这是因为row_number 需要对数据进行排序和标记,而这些操作在大数据量下会消耗较多的计算资源。

             注: 以下都是row_number() over () 开窗函数性能优化的几种方式:

          4.2.2 减少数据量

             一种最直接的优化方法是减少需要进行row_number计算的数据量。可以通过在where子句中添加条件、对数据进行分区等方式来减小数据规模,从而提升计算性能。

             ps: 这种方式在生产环境中用过。

          4.2.3 避免多次排序

             在使用row_number时,尽量避免多次排序操作。可以将row_number 函数应用在子查询中,然后再进行排序操作,避免重复的排序过程。

          1. select
          2. col1,
          3. col2,
          4. rn
          5. from
          6. ( select
          7. col1,
          8. col2,
          9. row_number() over (partition by col1 order by col2) as rn
          10. from tableA) tmp1
          11. order by col1,col2;

          参考文章:

          常用的SQL优化方式, 用聚合函数替代排序开窗求最值, sparksql, hivesql_哔哩哔哩_bilibili

          https://blog.51cto.com/u_16213435/9877979

          Hive学习(一)窗口函数源码阅读_hive 源码阅读-CSDN博客

          https://mp.weixin.qq.com/s/WBryrbpHGO9jmzMp0e7jhw

        63. 相关阅读:
          Java最强大的技术之一:反射
          香港服务器怎么看是CN2 GT线路还是CN2 GIA线路?
          React源码分析1-jsx转换及React.createElement
          信息化发展48
          hadoop的yarn部署
          小谈设计模式(6)—依赖倒转原则
          Linux 下安装配置部署MySql8.0
          【C++设计模式】(五)创建型模式 — 建造者模式
          *Django中的Ajax jq的书写样式1
          幸福心理与抗逆力培养的工控系统安全实验课程研究
        64. 原文地址:https://blog.csdn.net/SHWAITME/article/details/140905722