• Hive UDF array_struct_sort 对Array<Struct>进行排序


    一、UDF说明

    array_struct_sort(array(struct1,struct2,...), string sortField): Returns a copy of the passed array of structs, sorted in ascending order by the given field.
    对所给的 Array 按 sortField 字段进行升序排序，并返回排序后的新数组（原数组不会被修改）。

    二、代码

    1. package com.scb.dss.udf;
    2. import org.apache.hadoop.hive.ql.exec.Description;
    3. import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    4. import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
    5. import org.apache.hadoop.hive.ql.metadata.HiveException;
    6. import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
    7. import org.apache.hadoop.hive.serde.Constants;
    8. import org.apache.hadoop.hive.serde2.objectinspector.*;
    9. import org.apache.hadoop.io.Text;
    10. import java.util.*;
    11. import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category.LIST;
    12. @Description(name = "array_struct_sort",
    13. value = "_FUNC_(array(struct1,struct2,...), string sortField) - "
    14. + "Returns the passed array struct, ordered by the given field",
    15. extended = "Example:\n"
    16. + " > SELECT class, array_struct_sort(collect_list(struct_t), 'age') as struct_array\n" +
    17. " FROM (\n" +
    18. " SELECT '1' as class, named_struct('name', 'N003', 'age', '20') as struct_t\n" +
    19. " union all \n" +
    20. " SELECT '2' as class, named_struct('name', 'N001', 'age', '18') as struct_t\n" +
    21. " union all \n" +
    22. " SELECT '1' as class, named_struct('name', 'N002', 'age', '19') as struct_t\n" +
    23. " union all\n" +
    24. " SELECT '2' as class, named_struct('name', 'N000', 'age', '17') as struct_t\n" +
    25. " ) as test_data\n" +
    26. " group by class;\n")
    27. public class UDFArrayStructSort extends GenericUDF {
    28. protected ObjectInspector[] argumentOIs;
    29. ListObjectInspector loi;
    30. StructObjectInspector elOi;
    31. // cache comparators for performance
    32. Map comparatorCache = new HashMap();
    33. @Override
    34. public ObjectInspector initialize(ObjectInspector[] ois) throws UDFArgumentException {
    35. // all common initialization
    36. argumentOIs = ois;
    37. // clear comparator cache from previous invokations
    38. comparatorCache.clear();
    39. return checkAndReadObjectInspectors(ois);
    40. }
    41. /**
    42. * Utility method to check that an object inspector is of the correct type,
    43. * and returns its element object inspector
    44. *
    45. * @param ois
    46. * @return
    47. * @throws UDFArgumentTypeException
    48. */
    49. protected ListObjectInspector checkAndReadObjectInspectors(ObjectInspector[] ois)
    50. throws UDFArgumentTypeException, UDFArgumentException {
    51. // check number of arguments. We only accept two,
    52. // the list of struct to sort and the name of the struct field
    53. // to sort by
    54. if (ois.length != 2) {
    55. throw new UDFArgumentException("2 arguments needed, found " + ois.length);
    56. }
    57. // first argument must be a list/array
    58. if (!ois[0].getCategory().equals(LIST)) {
    59. throw new UDFArgumentTypeException(0, "Argument 1"
    60. + " of function " + this.getClass().getCanonicalName() + " must be " + Constants.LIST_TYPE_NAME
    61. + ", but " + ois[0].getTypeName()
    62. + " was found.");
    63. }
    64. // a list/array is read by a LIST object inspector
    65. loi = (ListObjectInspector) ois[0];
    66. // a list has an element type associated to it
    67. // elements must be structs for this UDF
    68. if (loi.getListElementObjectInspector().getCategory() != ObjectInspector.Category.STRUCT) {
    69. throw new UDFArgumentTypeException(0, "Argument 1"
    70. + " of function " + this.getClass().getCanonicalName() + " must be an array of structs " +
    71. " but is an array of " + loi.getListElementObjectInspector().getCategory().name());
    72. }
    73. // store the object inspector for the elements
    74. elOi = (StructObjectInspector) loi.getListElementObjectInspector();
    75. // returns the same object inspector
    76. return loi;
    77. }
    78. // factory method for cached comparators
    79. Comparator getComparator(Text field) {
    80. if (!comparatorCache.containsKey(field.toString())) {
    81. comparatorCache.put(field.toString(), new StructFieldComparator(field.toString()));
    82. }
    83. return comparatorCache.get(field.toString());
    84. }
    85. @Override
    86. public Object evaluate(DeferredObject[] dos) throws HiveException {
    87. // get list
    88. if (dos == null || dos.length != 2) {
    89. throw new HiveException("received " + (dos == null ? "null" :
    90. Integer.toString(dos.length) + " elements instead of 2"));
    91. }
    92. // each object is supposed to be a struct
    93. // we make a shallow copy of the list. We don't want to sort
    94. // the list in place since the object could be used elsewhere in the
    95. // hive query
    96. ArrayList al = new ArrayList(loi.getList(dos[0].get()));
    97. // sort with our comparator, then return
    98. // note that we could get a different field to sort by for every
    99. // invocation
    100. Collections.sort(al, getComparator((Text) dos[1].get()));
    101. return al;
    102. }
    103. @Override
    104. public String getDisplayString(String[] children) {
    105. return (children == null ? null : this.getClass().getCanonicalName() + "(" + children[0] + "," + children[1] + ")");
    106. }
    107. // to sort a list , we must supply our comparator
    108. public class StructFieldComparator implements Comparator {
    109. StructField field;
    110. public StructFieldComparator(String fieldName) {
    111. field = elOi.getStructFieldRef(fieldName);
    112. }
    113. public int compare(Object o1, Object o2) {
    114. // ok..so both not null
    115. Object f1 = elOi.getStructFieldData(o1, field);
    116. Object f2 = elOi.getStructFieldData(o2, field);
    117. // compare using hive's utility functions
    118. return ObjectInspectorUtils.compare(f1, field.getFieldObjectInspector(),
    119. f2, field.getFieldObjectInspector());
    120. }
    121. }
    122. }

    三、测试

    测试数据如下:

    class | struct_t
    1     | {"name":"N003","age":"20"}
    2     | {"name":"N001","age":"18"}
    1     | {"name":"N002","age":"19"}
    2     | {"name":"N000","age":"17"}

    测试代码:

    -- Collect each class's structs into an array, then sort the array by the 'age' field (ascending).
    -- Note: 'age' is a STRING here, so values are compared lexicographically.
    SELECT class, array_struct_sort(collect_list(struct_t), 'age') as struct_array
    FROM (
      SELECT '1' as class, named_struct('name', 'N003', 'age', '20') as struct_t
      union all
      SELECT '2' as class, named_struct('name', 'N001', 'age', '18') as struct_t
      union all
      SELECT '1' as class, named_struct('name', 'N002', 'age', '19') as struct_t
      union all
      SELECT '2' as class, named_struct('name', 'N000', 'age', '17') as struct_t
    ) as test_data
    group by class;

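    测试结果如下（每个 class 内的 struct 按 age 升序排列）:

    1    [{"name":"N002","age":"19"},{"name":"N003","age":"20"}]
    2    [{"name":"N000","age":"17"},{"name":"N001","age":"18"}]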

    再结合上节的 Hive UDAF collect_map，我们就可以对 MAP<STRING, ARRAY<STRUCT<name:STRING, age:STRING>>> 进行聚合排序操作了。

    -- Build MAP<class, sorted ARRAY<STRUCT>> per class.
    -- collect_map is the custom UDAF from the previous post and must be registered first.
    SELECT class, collect_map(class, struct_array) as res
    FROM (
      SELECT class, array_struct_sort(collect_list(struct_t), 'age') as struct_array
      FROM (
        SELECT '1' as class, named_struct('name', 'N003', 'age', '20') as struct_t
        union all
        SELECT '2' as class, named_struct('name', 'N001', 'age', '18') as struct_t
        union all
        SELECT '1' as class, named_struct('name', 'N002', 'age', '19') as struct_t
        union all
        SELECT '2' as class, named_struct('name', 'N000', 'age', '17') as struct_t
      ) as test_data
      group by class
    ) as tmp
    group by class
    ;

     

    跟上节的结果相比，这次输出的 struct 数组已按 age 升序排列。注意这里的 age 是字符串，按字典序比较（例如 '9' 会排在 '10' 之后）；如需按数值排序，应在 named_struct 中把 age 构造为 INT。

    res 字段类型为 MAP<STRING, ARRAY<STRUCT<name:STRING, age:STRING>>>。如果要取 1 班年纪最小的学生名字（升序排序后的第 0 个元素），代码如下:

    -- res['1'][0] is the first (smallest-age) struct of class '1'.
    -- The outer query yields one row per class; for class '2' the map has no key '1', so the value is NULL.
    select res['1'][0].name
    from (
      SELECT class, collect_map(class, struct_array) as res
      FROM (
        SELECT class, array_struct_sort(collect_list(struct_t), 'age') as struct_array
        FROM (
          SELECT '1' as class, named_struct('name', 'N003', 'age', '20') as struct_t
          union all
          SELECT '2' as class, named_struct('name', 'N001', 'age', '18') as struct_t
          union all
          SELECT '1' as class, named_struct('name', 'N002', 'age', '19') as struct_t
          union all
          SELECT '2' as class, named_struct('name', 'N000', 'age', '17') as struct_t
        ) as test_data
        group by class
      ) as tmp
      group by class
    ) as t
    ;

     

    结果中 class 为 '2' 的那一行取不到 res['1']，因此为 NULL；可以再加个 WHERE 条件过滤掉 NULL。

    四、参考文档

    » Structured data in Hive: a generic UDF to sort arrays of structs Roberto Congiu's blog

    五、待改进

    目前该 UDF 只支持升序排序。后续可以再加第三个参数来控制升序或降序，只需要再实现一个按降序比较的 StructFieldComparator（或对现有比较结果取反）即可。

  • 相关阅读:
    C++20之Module(浅析)
    SQL 语法速成手册
    IB和A-level物理怎么选?
    海康威视Java实习面试
    js实现拖动效果
    酷克数据发布HD-SQL-LLaMA模型,开启数据分析“人人可及”新时代
    维格云小程序如何快速上手开发?
    【C++入门篇】保姆级教程篇【上】
    npm 全局配置
    【外设】拓展坞接入外设一直弹窗报错问题
  • 原文地址:https://blog.csdn.net/qq_37771475/article/details/126371687