• SparkSQL的Join的实现方式


    SparkSQL的Join的实现方式

    1. Hash Join:传统数据库的单机join算法。
      概念:
      Build Table:一般是小表
      Probe Table:一般是大表
      Hash Table:将Build Table按照Join的Key生成hash值,存到对应的bucket中,生成一张Hash Table,缓存在内存中,或者落盘。

      步骤:1. 确定Build表和Probe表。2. 生成Hash 表。3. 扫描Probe表,根据Key的hash值在Hash表中匹配

      性能:hash join基本只扫描两个表一次,O(a+b),极端情况会产生笛卡尔积O(ab);选择叫小的表构建Hash表,就可以将数据全部加载到内存中,提高效率。

    2. Broadcast Hash Join:适合一张很小的表和一张大表进行Join。将一小表通过广播的方式,由Driver端发送到各个Executor端,大表正常分成多个区,每个区的数据和本地的广播小表进行join。
      概念:
      事实表:指固定的、变动较少的表,如联系人、物品种类
      维度表:一般用来记录流水,如销售清单

      条件:

      1. 被广播的表需要小于spark.sql.autoBroadcastJoinThreshold所配置的大小,默认为10M;
      2. 基表不能被广播,比如left outer join时,只能广播右表

      缺点:该方案只能广播较小的表,否则数据的冗余传输会远大于shuffle的开销;此外,广播时需要被广播的表collect到driver端,当频繁的广播出现时,对driver端的内存也是一个考验。

      步骤:1. broadcast阶段:将小表广播到所有的executor上。2. hash join阶段:在每个executor上执行hash join,小表构建为hash表,大表匹配数据。

    3. Shuffle Hash Join:适合一张小表(比上一个大一点)和一张大表进行Join。对两张表分布进行shuffle,将相同key 的数据分到一个分区中,然后分区之间进行join。

      条件:

      1. 分区的平均大小需要小于spark.sql.autoBroadcastJoinThreshold所配置的大小,默认为10M;
      2. 基表不能被广播。
      3. 一侧的表要明显小于另外一侧的表(一般为3倍小),小的一侧将被广播。

      步骤:1. 对两张表分别按照join的key进行shuffle重分区,将相同的key分到同一个分区中。2. 对分区中的数据进行join。

    4. Sort Merge Join:适合两张大表进行Join。这种方式不用将一侧数据全部加载后再进行hash join,但是需要在join之前将数据排序。

      步骤:1. shuffle阶段:将两张大表根据join的key进行重分区。2. sort阶段:对单个分区节点的两张表的数据,分别进行排序。3. merge阶段:对排好序的两张分区表数据进行join操作。分布变量两张有序表,碰到相同的key就合并输出,否则取更小的一边遍历。

    5. Cartesian Join:如果两种表没有指定Key,则会产生笛卡尔积。

      条件:

      1. 仅支持内连接
      2. 支持等值连接和不等值连接
      3. 开启参数spark.sql.crossJoin.enabled=true
    6. Broadcast Nested Loop Join:在没有合适的Join机制可供选择时,最终会选择该策略。优先级:Broadcast Hash Join > Sort Merge Join > Shuffle Hash Join > cartesian Join > Broadcast Nested Loop Join。

    Spark选择Join的策略

    等值连接时:

    有Join提示(hints)的情况下,按照如下顺序:
    1. Broadcast hint:如果join类型支持,则选择broadcast hash join
    2. Sort merge hint:如果join key是排序的,则选择 sort merge join
    3. Shuffle hash hint:如果join类型支持, 选择 shuffle hash join
    4. Shuffle replicate NL hint:如果是内连接,选择笛卡尔积方式

    没有Join提示(hints)的情况下,按照如下顺序:
    1. Broadcast hash join:如果join类型支持,并且其中一张表可以被广播
    2. Shuffle hash join:如果参数spark.sql.join.preferSortMergeJoin设定为false,且一张表足够小
    3. Sort merge join:如果key是排序的
    4. Cartesian join:如果是内连接
    5. Broadcast nested loop join:如果可能会发生OOM(内存耗尽),或者没有可以选择的策略

    非等值连接:

    有Join提示(hints)的情况下,按照如下顺序:
    1. Broadcast hint:选择broadcast nested loop join
    2. Shuffle replicate NL hint:如果是内连接,选择cartesian product join

    没有Join提示(hints)的情况下,按照如下顺序:
    1. Broadcast nested loop join:如果一张表可以被广播
    2. Cartesian product join:如果是内连接
    3. Broadcast nested loop join:如果可能会发生OOM(内存耗尽),或者没有可以选择的策略

  • 相关阅读:
    苹果回应系统偷跑流量:建议恢复出厂设置;华为成立第三批军团;TensorFlow 2.9发布|极客头条
    Apache Airflow (十三) :Airflow分布式集群搭建及使用-原因及
    数列分块入门
    凉鞋的 Godot 笔记 102. 场景与节点的增删改查
    redis中value/SortedSet
    C语言——结构体详解
    HTML基本骨架与编辑器选择
    神经网络和深度神经网络,深度神经网络发展历程
    uniapp中unicloud接入支付宝订阅消息完整教程
    安卓手机磁盘空间不足怎样导出数据?
  • 原文地址:https://blog.csdn.net/junkmachine/article/details/126898499