• 【Spark的五种Join策略解析】


    join基本流程

    Spark将参与Join的两张表抽象为流式遍历表(streamIter)和查找表(buildIter),通常streamIter为大表,buildIter为小表,我们不用担心哪个表为streamIter,哪个表为buildIter,这个spark会根据join语句自动帮我们完成。

    img

    对于每条来自streamIter的记录,都要去buildIter中查找匹配的记录,所以buildIter一定要是查找性能较优的数据结构。spark提供了三种join实现:sort merge join、broadcast join以及hash join。

    五种join策略

    • Shuffle Hash Join
    • Broadcast Hash Join
    • Sort Merge Join
    • Cartesian Join
    • Broadcast Nested Loop Join

    大表join小表

    Shuffle Hash Join

    image-20220929103146591

    Join 步骤:把大表和小表按照相同的分区算法和分区数进行分区(Join 的 keys 进行分区),保证了 hash 值一样(相同key)的数据都分发到同一个分区中(分区内不排序),然后在同一个 Executor 中两张表 hash 值一样的分区就可以在本地进行 hash Join 。在进行 Join 之前,还会对小表的分区构建 Hash 桶(这就要求每个分区都不能太大),便于查找。

    注意,和broadcast hash join的区别,这里并没有广播小表,在双方shuffle后的分区内,小表转成Hash桶与大表进行hash join。

    苛刻的条件:

    • buildIter总体估计大小超过spark.sql.autoBroadcastJoinThreshold设定的值,即不满足broadcast join条件
    • 开启尝试使用hash join的开关,spark.sql.join.preferSortMergeJoin=false
    • 每个分区的平均大小不超过spark.sql.autoBroadcastJoinThreshold设定的值,即shuffle read阶段每个分区来自buildIter的记录要能放到内存中
    • streamIter的大小是buildIter三倍以上

    特点:

    • 仅支持等值连接,join key不需要排序
    • 支持除了全外连接(full outer joins)之外的所有join类型
    • 需要对小表构建Hash map,属于内存密集型的操作,如果构建Hash表的一侧数据比较大,可能会造成OOM,不适合严重倾斜的join
    • 对于FullOuter Join,需要建立双向hash表,代价太大。因此FullOuterJoin默认都是基于SortJoin来实现

    Broadcast Hash Join

    image-20220929103203808

    将小表的数据广播到 Spark 所有的 Executor 端,只能用于等值连接。避免了 Shuffle 操作。一般而言,Broadcast Hash Join 会比其他 Join 策略执行的要快。因为他直接在一个map中完成了,也称之为map join

    Join 步骤:

    • 利用 collect 算子将小表的数据从 Executor 端拉到 Driver 端
    • 在 Driver 端调用 sparkContext.broadcast 广播到所有 Executor 端
    • 在 Executor 端使用广播的数据与大表进行 Join 操作

    使用条件:

    • 必须为等值连接,不要求 Join 的 keys 可排序
    • 小表大小小于 spark.sql.autoBroadcastJoinThreshold(default 10M)设定的值

    Broadcast Nested Loop Join

    该方式是在没有合适的JOIN机制可供选择时,最终会选择该种join策略。优先级为:Broadcast Hash Join > Sort Merge Join > Shuffle Hash Join > cartesian Join > Broadcast Nested Loop Join.

    最小的数据集被广播到另一个数据集的每个分区上,执行一个嵌套循环来执行join, 也就是说数据集1的每条记录都尝试join数据集2的每条记录(最笨的方法),效率比较低。既可以做等值join也可以做非等值join,而且是非等值join的默认策略。

    没有排序,就是广播小表到每个分区上,尝试join每条记录,效率低!

    大表之间join

    Sort Merge Join

    image-20220929103221072

    先hash到同一个分区且排好序,然后再在分区内顺序查找比对

    对表的大小没有条件,不管分区多大,SortMergeJoin 都不用把一侧的数据全部加载到内存中,而是即用即丢;两个序列都有序。从头遍历,碰到 key 相同的就输出,如果不同,左边小就继续取左边,反之取右边,由于排序的特性,每次处理完一条记录后只需要从上一次结束的位置开始查找,SortMergeJoinExec执行时就能够避免大量无用的操作,提高了大数据量下sql join 的稳定性。

    Join 步骤:

    • shuffle: 将两张表按照 join key 进行shuffle,保证join key值相同的记录会被分在相应的分区
    • sort: 对每个分区内的数据进行排序
    • merge: 排序后再对相应的分区内的记录进行连接

    使用条件:

    • 等值连接
    • 参与 join 的 key 可排序

    Cartesian Join

    笛卡尔积

    如果左表有n个分区,右表有m个分区,那么笛卡尔积后的分区数是K=n * m个;并且这K个分区中,第K(i)个分区获取的左表分区为 kn=i / m,获取的右表分区为 km=i % m,然后kn和km这两个分区做笛卡尔积;由于是以分区为单位,所以不会触发shuffle;

    join策略选择

    等值连接的情况

    有join提示(hints)的情况,按照下面的顺序
    • Broadcast Hint:如果join类型支持,则选择broadcast hash join
    • Sort merge hint:如果join key是排序的,则选择 sort-merge join
    • shuffle hash hint:如果join类型支持, 选择 shuffle hash join
    • shuffle replicate NL hint: 如果是内连接,选择笛卡尔积方式
    没有join提示(hints)的情况,则逐个对照下面的规则
    image-20220929194248135
    • 如果join类型支持,并且其中一张表能够被广播(spark.sql.autoBroadcastJoinThreshold值,默认是10MB),则选择 broadcast hash join
    • 如果参数spark.sql.join.preferSortMergeJoin设定为false,且一张表足够小(可以构建一个hash map) ,则选择shuffle hash join
    • 如果join keys 是排序的,则选择sort-merge join
    • 如果是内连接,选择 cartesian join
    • 没有可以选择的执行策略,则最终选择broadcast nested loop join,即使可能会发生OOM

    非等值连接情况

    有join提示(hints),按照下面的顺序
    • broadcast hint:选择broadcast nested loop join.
    • shuffle replicate NL hint: 如果是内连接,则选择cartesian product join
    没有join提示(hints),则逐个对照下面的规则
    • 如果一张表足够小(可以被广播),则选择 broadcast nested loop join
    • 如果是内连接,则选择cartesian product join
    • 如果可能会发生OOM或者没有可以选择的执行策略,则最终选择broadcast nested loop join

    实验

    非等值连接默认是BroadcastNestedLoopJoin

    scala> spark.conf.get("spark.sql.join.preferSortMergeJoin")
    res1: String = true
    
    scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
    res2: String = 10485760
    
    scala> val data1 = Seq(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50)
    data1: Seq[Int] = List(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50)
    
    scala> val df1 = data1.toDF("id1")
    df1: org.apache.spark.sql.DataFrame = [id1: int]
    
    scala> val data2 = Seq(30, 20, 40, 50)
    data2: Seq[Int] = List(30, 20, 40, 50)
    
    scala> val df2 = data2.toDF("id2")
    df2: org.apache.spark.sql.DataFrame = [id2: int]
    
    scala> val dfJoined = df1.join(df2, $"id1" >= $"id2")   //非等值连接
    dfJoined: org.apache.spark.sql.DataFrame = [id1: int, id2: int]
    // 注意查看执行计划是BroadcastNestedLoopJoin
    scala> dfJoined.queryExecution.executedPlan
    res3: org.apache.spark.sql.execution.SparkPlan =
    BroadcastNestedLoopJoin BuildRight, Inner, (id1#3 >= id2#8)
    :- LocalTableScan [id1#3]
    +- BroadcastExchange IdentityBroadcastMode
       +- LocalTableScan [id2#8]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    image-20220929190714574

    shuffle hash join

    scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 2)
    
    scala> spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
    
    scala> val dfhashJoined = df1.join(df2, $"id1" === $"id2")    //等值连接
    dfhashJoined: org.apache.spark.sql.DataFrame = [id1: int, id2: int]
    
    scala> dfhashJoined.queryExecution.executedPlan
    res7: org.apache.spark.sql.execution.SparkPlan =
    ShuffledHashJoin [id1#3], [id2#8], Inner, BuildRight
    :- Exchange hashpartitioning(id1#3, 200)
    :  +- LocalTableScan [id1#3]
    +- Exchange hashpartitioning(id2#8, 200)
       +- LocalTableScan [id2#8]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    image-20220929191340105

    sort MergeJoin

    scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    
    scala> spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
    
    scala> val sortJoined = df1.join(df2, $"id1" === $"id2")
    sortJoined: org.apache.spark.sql.DataFrame = [id1: int, id2: int]
    
    scala> sortJoined.queryExecution.executedPlan
    res11: org.apache.spark.sql.execution.SparkPlan =
    *(3) SortMergeJoin [id1#3], [id2#8], Inner
    :- *(1) Sort [id1#3 ASC NULLS FIRST], false, 0
    :  +- Exchange hashpartitioning(id1#3, 200)
    :     +- LocalTableScan [id1#3]
    +- *(2) Sort [id2#8 ASC NULLS FIRST], false, 0
       +- Exchange hashpartitioning(id2#8, 200)
          +- LocalTableScan [id2#8]
    
    scala> sortJoined.show
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    image-20220929193014333

    spark3 join策略提示

    1. Broadcast HashJoin
    有三种方式
    SELECT /*+ BROADCAST(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
    
    SELECT /*+ BROADCASTJOIN (t1) */ * FROM t1 left JOIN t2 ON t1.key = t2.key;
    
    SELECT /*+ MAPJOIN(t2) */ * FROM t1 right JOIN t2 ON t1.key = t2.key;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    1. shuffle sort merge Join
    SELECT /*+ SHUFFLE_MERGE(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
    
    SELECT /*+ MERGEJOIN(t2) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
    
    SELECT /*+ MERGE(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    1. shuffle Hash Join
    SELECT /*+ SHUFFLE_HASH(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
    
    • 1
  • 相关阅读:
    C++打怪升级(一)- 命名空间、缺省形参、重载
    开放领域问答机器人1
    ES6 --》函数扩展以及箭头函数讲解
    matlab常微分方程在传染病建模中的应用
    Hugging Face 实战系列 总目录
    Javaweb之Vue的概述
    让工程师拥有一台“超级”计算机——字节跳动客户端编译加速方案
    AUTOSAR CAN主动与被动唤醒分析
    嵌入式分享合集89
    C++实现可变参数的日志打印vprintf
  • 原文地址:https://blog.csdn.net/u012432611/article/details/132824637