• spark sql之巧用group by


    为啥想到巧用group by 还是因为优秀的人 想法就是666 

    看到一篇文章

    AI & 大数据 - 专区 - OSCHINA - 中文开源技术交流社区Artificial Intelligence 人工智能是研究、开发用于模拟、延伸和扩展人的智能的理论、方法、技术及应用系统的一门新的技术科学。大数据(big data),是指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高增长率和多样化的信息资产。AI 研究通常需要大量数据支撑。https://www.oschina.net/group/ai-bigdata?circle=big-data大表与大表关联

     hint在我上一篇文章已经分析过了。

     这里 大佬用了一个特别巧妙的方法,group by key ,ceil(rand()*100) 乍一看好像就明白了,但要你说好像又说不出个所以然。

    瞬间想到distributed by ceil(rand()*10 将任务输出为10个文件

    测试代码。

    1. public static void main(String[] args) throws KuduException, InterruptedException {
    2. SparkConf sparkConf = new SparkConf().setMaster("yarn").setAppName("ClusterSparkTestGroupBy");
    3. if (System.getProperty("os.name").toLowerCase().contains("windows")) {
    4. sparkConf = new SparkConf().setMaster("local[*]").setAppName("dp");
    5. sparkConf.set("spark.driver.memory", "1g").set("spark.testing.memory", "1073740000");
    6. kerberos_auth();
    7. }
    8. SparkSession session = SparkSession.builder().config(sparkConf)
    9. .enableHiveSupport().getOrCreate();
    10. Dataset<Row> dataset = session.sql("select \n" +
    11. "concat(a.src_supplier_id1,'----',b.src_supplier_id2) \n" +
    12. "from (\n" +
    13. "select biz_id,concat_ws(',',collect_list(src_supplier_id)) src_supplier_id1 from dwiadata.ia_fdw_hr_company_contact_info_relation group by biz_id\n" +
    14. ")a \n" +
    15. "join (\n" +
    16. "select biz_id,concat_ws(',',collect_list(src_supplier_id)) src_supplier_id2 from dwiadata.ia_fdw_hr_company_contact_info_relation group by biz_id\n" +
    17. ")b \n" +
    18. "on a.biz_id =b.biz_id ");
    19. System.out.println("dataset.partition="+dataset.javaRDD().getNumPartitions());
    20. dataset.write().mode(SaveMode.Overwrite).text("file:///D:\\install\\code\\tencent\\dw_ia_portraitsearch\\output\\common");
    21. Dataset<Row> dataset2 = session.sql("select \n" +
    22. "concat(a.src_supplier_id1,'----',b.src_supplier_id2)\n" +
    23. "from (\n" +
    24. "select biz_id,concat_ws(',',collect_list(src_supplier_id)) src_supplier_id1 from dwiadata.ia_fdw_hr_company_contact_info_relation group by biz_id ,ceil(rand()*6)\n" +
    25. ")a \n" +
    26. "join (\n" +
    27. "select biz_id,concat_ws(',',collect_list(src_supplier_id)) src_supplier_id2 from dwiadata.ia_fdw_hr_company_contact_info_relation group by biz_id,ceil(rand()*6)\n" +
    28. ")b \n" +
    29. "on a.biz_id =b.biz_id ");
    30. dataset2.explain(true);
    31. System.out.println("dataset2.partition="+dataset.javaRDD().getNumPartitions());
    32. dataset2.write().mode(SaveMode.Overwrite).text("file:///D:\\install\\code\\tencent\\dw_ia_portraitsearch\\output\\hint");
    33. Thread.sleep(Integer.MAX_VALUE);
    34. session.close();
    35. }

    看输出的文件 

    common还是普通的那个5个key

    只会形成5个文件 其中part-00000是默认的一个空白文件

    group呢?

    我们猜想肯定是30个文件+一个00000就是31个文件

     结果实际不是只有26+1个文件。

    就很奇怪 

    第一时间想到是不是因为 rand() 随机的原因。一个key希望被分成若6份,但是实际没有

    比如key biz_id=1003的数量是61个,最后 只被分成了 1 2 3 4 5 , rand()一直没有随机到6

     但是 实际情况不是这样的。 因为就拿1003来说 61个随机1-6 不会恰好都是12345 没有一个6

    比如 

    1有20个

    2有10个

    3有10个

    4有10个

    5有10个

    6有1个也是有可能的

    可能分布不均 但是不可能有一个rand 一个都没有 

    我们观察

     可知确实被分成了6份 确实不均 符合我们的猜想,那么按照group by biz_id ,ceil(rand()*6)

    确实将key 分成了30份,那么最终为什么形成的文件少于30个呢?

    文件特殊的有两种

     

     其中76号task 属于同样的key分到了一个文件

    98号task属于不同的key 分到了一个文件

    看spark ui 也符合自己的猜想 都是有12条记录 2个key

    这个图也说明了a表和b表都是过滤出了30条记录

    然后 5*6*6=180条记录

    第一个5是5个key  两个6是hash连接 1001分成6份 join就有6*6条记录。

    但是始终没想明白为啥 有的key会聚合到一个文件里。。。。

    前面这么多都是为了让自己更好的了解sparksql 你在group by 的时候内部做的一些操作。

    比如

     这里默认spark.sql.shuffle.partitions=200个task ,网上一般让我们改成executors的2-3倍。

    假设我们现在

    executor有10台  每个excutor有16线程我们设置core 15 

    那么 我们的executors=150个  spark.sql.shuffle.partitions设置 300-450好像比较合适。

    但是问题来了,按照我们上述实验这样设置有没有问题? 我觉得是有问题的

    首先我们明确知道key只有5个 我们采用group by 只分成6份,那么就是预计任务数只有30个

    我们还设置200个 不影响计算结果,但是 占用了这么多资源不是浪费么。

    有的又说了 在spark-submit的时候 我只设置executor=2 core=2 总共也就只会起4个cores

    不管你是200个task 还是36个task 我都是可以的。 但是按照官网的2-3倍好像又有问题。

    所以我们提前预知任务的个数,然后采用合适的资源,再根据资源的选择去确定任务个数,这样才是最正确的做法。

     此时我们再分析该大佬做的优化。

    原始表a大概有10个key 其中每个基本都是10w条数据

    原始表b大概有10-20个key,每个key大概在5w左右

    其中大概有 7 2 5 这三个key关联到了。

    大概就是3*5w*10w条记录也就是 150亿条数据

    一般来说。。一条记录按照1kb来说。这里就是

    1.5*1000*1000*1000kb=1.5*1000*1000mb=1.5*1000G=1.5T 当然我这个计算是不准确的。。。

    但是100G左右是差不多的。

    此时

    --executor-memory 2g \

    --num-executors 50 \

    --executor-cores 2 \

    那么此时我们就有100个cores了。按道理2-3倍

    所以此时task数量差不多是200-300个。

    根据key的个数据 3个 我们需要将其打散为100个 所以采用ceil(rand()*100)居然和我一样。。我这里都是自己想的。。。

    然后对出现的结果采用repartition(1000)

    是因为我们此时 join后的记录大概有150亿,repartition后每个分区就有150w数据方便处理。

    话不多说。。直接实战。。。。。

    大家随便找个数据多点的表 然后 随便过滤下数据 差不多每个key 1000-5w左右。

    本意目的就是 自己和自己关联。 5090*5090  2230*2230 6655*6655

    最后差不多是4-5亿多条数据。

    最后结果如下 我们来分析下 谁有谁劣。。。

    第一种数据直接join  是stage0-1

    第二种就是 group by biz_id, ceil(rand()*100) 是stage2-6

    首先

    ia_fdw_hr_company_contact_info该表在hdfs 上有7个文件

     然后spark 读取的时候差不多是按照128M去分片的。所以分成14个片 第14个只有一点点

    我们来看stage0 的

     可以看到左上角读取了

    • Input Size / Records: 163.9 MB / 21773127
    • Shuffle Write: 454.1 KB / 70

     21773127 是表总数据量 这个163.9 MB 好像是hdfs的总大小 应该是1639 MB.....

    Shuffle Write 是指读取这个文件后我要输出的内容。

    最后只输出了70条 大小是454kb...

    70条是怎么来的? spark将hdfs的文件分片为14片。每个片里去读取文件。因为我们读取的时候

    是根据key ,collect_list(other_column) 所以每个片只读出了5条数据

    1001,[a,b,c,d......]

    1002,[a,b,c,d......]

    1003,[a,b,c,d......]

    1004,[a,b,c,d......]

    1005,[a,b,c,d......]

    所以总共就是14*5=70条记录。同时stage0显示14个task也就是这14个片

    stage=1 显示250个task 是因为我们设置了

    sparkConf.set("spark.sql.shuffle.partitions","250");

     左上角

    • Output: 650.7 KB / 5
    • Shuffle Read: 908.2 KB / 140

    output就是我们最终数据的结果 因为我们直接join 所以只有5个输出 分别是1001 1002 1003 1004 1005

    Read 是指这个stage1读取上个stage的write数, 我们刚刚说是70 怎么变成140了? 因为我们是自己和自己关联,所以spark 直接复用 70*2 看大小 908.2kb

    刚好是上一阶段 Shuffle Write: 454.1 KB / 70 的两倍

    此时注意下面task节目 我按照shuffle read 排序了,只显示了5个task,因为其余task没有数据,上文说过。

     这个28其实也是14的2倍。也就是说 是把每个切片的一条数据拿出来了

    切片1  1001 ,1002,1003,1004,1005

    切片2  1001,1002,1003,1004,1005

    .。。。。

    这个28条 就是把切片1-14的 1001都拿出来了 并且搞了两份,然后自己和自己关联。

    ——————————————————————————————————————————

    再看看我们采用了group的 join

    • Input Size / Records: 1654.8 MB / 21773127
    • Shuffle Write: 937.2 KB / 6300

    input size和之前一样。 就是小数点不一样了。。

    shuffle write 937kb 6300条

    思考下 读取的时候还是按照14个片分片。但是读取后要group by 也就是5*100=500呀?为啥

    这个就是我最开始提出的疑问 有的key是合并了。。。怀疑是本来是500个 然后根据hash分区就分了480个。

    例如 本来是

    1001-1 1001-2.。。。。1001-100

    1002-1 1002-1.。。。。1002-100.... 500个

    但是!!1001-1和1002-1的hash指都一样就分到一个分区了。

    但实际看其实都不高于500和我们想法一样。

     但是注意这个stage2的花费时间很长。。花了1.5min stage0只要了18s

     这里可以优化的。下面两个task可以和上面同一时间执行。

  • 相关阅读:
    Jenkins自动化部署前后端分离项目 (svn + Springboot + Vue + maven)有图详解
    第五十九章 学习常用技能 - 将数据从一个数据库移动到另一个数据库
    LeetCode | 20. 有效的括号
    论文详读《基于改进 LeNet-5 模型的手写体中文识别》,未完待补充
    中科柏诚获“专精特新”称号,把一件事做到极致就是价值
    市场研究:3D打印材料行业规模政策及发展趋势调研分析
    小米笔试真题一
    ubuntu18.04运行ORB-SLAM2并用自己的数据集测试
    Java下Properties类的使用(写出和读入)
    基础算法篇——快速排序
  • 原文地址:https://blog.csdn.net/cclovezbf/article/details/122696620