• A problem caused by Spark broadcast joins


    1. Background

    A feature that went live in 2022 Q2 runs a large job with spark-sql. In Q3 it was extended on both the business and the technical side:

    • The business side added features in Q3, which made the SQL more complex
    • The technical side also made some optimizations in Q3, which made the SQL more complex still

    2. Analysis steps

    2.1 YARN log of the job

    Application application_1664181807056_6528008 failed 1 times (global limit =8; local limit is =1) due to AM Container for appattempt_1664181807056_6528008_000001 exited with exitCode: 134

    From the log this looks like an OOM: exit code 134 is 128 + signal 6 (SIGABRT), which typically means the JVM aborted, for example after running out of memory.

    2.2 Execution plan of the job

    Looking at the stage execution plan, the job scans table_xx3, starts to broadcast it, and then dies.


    2.3 Spark SQL

    SELECT t3.*
    FROM (
        SELECT *
        FROM (
           SELECT *
           FROM (
               SELECT *, ROW_NUMBER() OVER (PARTITION BY region, version_id, shop_id, item_id, model_id ORDER BY enqueue_time DESC) AS rk
               FROM table_xx1
               WHERE version_id IN (1489, 1485, 1486)
                  AND region = 'ID'
                  AND rule = 0
                  AND enqueue_time >= 1666832400
                  AND enqueue_time < 1666834200
                  AND dt IN (current_date, date_sub(current_date, 1))
           ) event
           WHERE event.rk = 1
        ) t1
           INNER JOIN (
               SELECT *
               FROM table_xx2
               WHERE time_range = '01-07'
                  AND dt = '2022-10-26'
                  AND region IN ('ID')
                  AND view_count >= 5
           ) t2
           ON t1.shop_id = t2.shop_id
               AND t1.item_id = t2.item_id
               AND t1.region = t2.region
    ) t3
    LEFT JOIN (
        SELECT *
        FROM table_xx3
        WHERE expiration_time > unix_timestamp(current_timestamp)
           AND expiration_date >= current_date
           AND region IN ('ID')
    ) t4 ON t3.user_id = t4.user_id
        AND t3.region = t4.region
        AND t3.journey_id = t4.journey_id
    WHERE t4.user_id IS NULL

    2.4 Verifying with spark-sql

    Log on to the Spark driver machine, start spark-sql, and re-run the failing SQL; it fails with exit code 143:

    22/10/27 10:23:52 ERROR YarnScheduler: Lost executor 182 on ip-10-130-81-171.idata-server.shopee.io: Container from a bad node: container_e257_1664260775537_6335017_01_000294 on host: ip-10-130-81-171.idata-server.shopee.io. Exit status: 143. Diagnostics: [2022-10-27 10:23:48.385]Container killed on request. Exit code is 143
    [2022-10-27 10:23:48.463]Container exited with a non-zero exit code 143.
    [2022-10-27 10:23:48.491]Killed by external signal
    .
    Interrupting... Be patient, this might take some time.
    Press Ctrl+C again to kill JVM

    This again points to an OOM: exit code 143 is 128 + signal 15 (SIGTERM), i.e. the container was killed by an external signal, which YARN typically does when a container exceeds its memory limit.

    3. Root cause analysis

    3.1 table_xx1 has too much data

    The dynamic data in table_xx1 is joined against the static data in table_xx2 (generated daily as t-1 data; 1 to 30 days of data is roughly 8 billion rows). Because table_xx2 is fixed and very large, any fluctuation in the table_xx1 volume directly changes the cost of the join.

    Counting the table_xx1 data for the most recent half hour gives close to 4 million rows, which is far too much! The deduplicated subquery in question is:

    SELECT *
        FROM (
            SELECT *
            FROM (
                SELECT *, ROW_NUMBER() OVER (PARTITION BY region, version_id, shop_id, item_id, model_id ORDER BY enqueue_time DESC) AS rk
                FROM table_xx1
                WHERE version_id IN (1489, 1485, 1486)
                    AND region = 'ID'
                    AND rule = 0
                    AND enqueue_time >= 1666832400
                    AND enqueue_time < 1666834200
                    AND dt IN (current_date, date_sub(current_date, 1))
            ) event
            WHERE event.rk = 1
        ) t1
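
    A quick way to confirm the volume is to wrap this subquery in a count (a minimal sketch; for this half-hour window it comes back at roughly 4 million rows):

    -- count the deduplicated table_xx1 rows that will enter the join
    SELECT COUNT(*)
    FROM (
        SELECT *, ROW_NUMBER() OVER (PARTITION BY region, version_id, shop_id, item_id, model_id ORDER BY enqueue_time DESC) AS rk
        FROM table_xx1
        WHERE version_id IN (1489, 1485, 1486)
            AND region = 'ID'
            AND rule = 0
            AND enqueue_time >= 1666832400
            AND enqueue_time < 1666834200
            AND dt IN (current_date, date_sub(current_date, 1))
    ) event
    WHERE event.rk = 1;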

    3.2 Auto-broadcast is enabled

    From the execution plan and the log, the last part of the query (the table_xx3 side) is processed first and then broadcast:


    22/10/27 10:19:15 WARN InsertAdaptiveSparkPlan: spark.sql.adaptive.enabled is enabled but is not supported for query: Project [user_id#20L, view_most#21, view_count#22, region#25, item_id#6L, model_id#8L, shop_id#7L, event_type#11, related_database_type#12, event_time#13L, version_id#9L, cur_element_id#1L, journey_task_id#2L, journey_id#3L, flash_sale_id#4L]
    +- Filter isnull(user_id#26L)
       +- BroadcastHashJoin [user_id#20L, region#25, journey_id#3L], [user_id#26L, region#34, journey_id#30L], LeftOuter, BuildRight, false
          :- Project [user_id#20L, view_most#21, view_count#22, region#25, item_id#6L, model_id#8L, shop_id#7L, event_type#11, related_database_type#12, event_time#13L, version_id#9L, cast(get_json_object(get_json_object(extra#15, $.get_matched_version_biz_info), $.cur_element_id) as bigint) AS cur_element_id#1L, cast(get_json_object(get_json_object(extra#15, $.get_matched_version_biz_info), $.journey_task_id) as bigint) AS journey_task_id#2L, cast(get_json_object(get_json_object(extra#15, $.get_matched_version_biz_info), $.journey_id) as bigint) AS journey_id#3L, cast(get_json_object(extra#15, $.flash_sale_id) as bigint) AS flash_sale_id#4L]
          :  +- SortMergeJoin [shop_id#7L, item_id#6L, region#10], [shop_id#19L, item_id#18L, region#25], Inner
          :     :- Project [item_id#6L, model_id#8L, shop_id#7L, version_id#9L, region#10, event_type#11, related_database_type#12, event_time#13L, extra#15]
          :     :  +- Filter (isnotnull(rk#0) AND (rk#0 = 1))
          :     :     +- Window [row_number() windowspecdefinition(region#10, version_id#9L, shop_id#7L, item_id#6L, model_id#8L, enqueue_time#14L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rk#0], [region#10, version_id#9L, shop_id#7L, item_id#6L, model_id#8L], [enqueue_time#14L DESC NULLS LAST]
          :     :        +- Project [item_id#6L, model_id#8L, shop_id#7L, version_id#9L, region#10, event_type#11, related_database_type#12, event_time#13L, extra#15, enqueue_time#14L]
          :     :           +- Filter (((((((isnotnull(region#10) AND isnotnull(enqueue_time#14L)) AND version_id#9L IN (1489,1485,1486)) AND (region#10 = ID)) AND (enqueue_time#14L >= 1666832400)) AND (enqueue_time#14L < 1666834200)) AND isnotnull(shop_id#7L)) AND isnotnull(item_id#6L))
          :     :              +- FileScan parquet opa_crm_datasend.dwd_event_trigger_rt_live[item_id#6L,shop_id#7L,model_id#8L,version_id#9L,region#10,event_type#11,related_database_type#12,event_time#13L,enqueue_time#14L,extra#15,dt#16,rule#17] Batched: true, DataFilters: [isnotnull(region#10), isnotnull(enqueue_time#14L), version_id#9L IN (1489,1485,1486), (region#10..., Format: Parquet, Location: InMemoryFileIndex[hdfs://R2/projects/opa_crm_datasend/hive/opa_crm_datasend/dwd_event_trigger_rt_..., PartitionFilters: [isnotnull(rule#17), (rule#17 = 0), dt#16 IN (2022-10-27,2022-10-26)], PushedFilters: [IsNotNull(region), IsNotNull(enqueue_time), In(version_id, [1489,1485,1486]), EqualTo(region,ID)..., ReadSchema: struct       :     +- Project [item_id#18L, shop_id#19L, user_id#20L, view_most#21, view_count#22, region#25]
          :        +- Filter (((isnotnull(view_count#22) AND (view_count#22 >= 5)) AND isnotnull(shop_id#19L)) AND isnotnull(item_id#18L))
          :           +- FileScan parquet opa_crm_datasend.ads_view_item_agg_live[item_id#18L,shop_id#19L,user_id#20L,view_most#21,view_count#22,dt#23,time_range#24,region#25] Batched: true, DataFilters: [isnotnull(view_count#22), (view_count#22 >= 5), isnotnull(shop_id#19L), isnotnull(item_id#18L)], Format: Parquet, Location: InMemoryFileIndex[hdfs://R2/projects/opa_crm_datasend/hive/opa_crm_datasend/ads_view_item_agg_liv..., PartitionFilters: [isnotnull(time_range#24), isnotnull(dt#23), isnotnull(region#25), (time_range#24 = 01-07), (dt#2..., PushedFilters: [IsNotNull(view_count), GreaterThanOrEqual(view_count,5), IsNotNull(shop_id), IsNotNull(item_id)], ReadSchema: struct
          :                 +- Project [item_id#6L, model_id#8L, shop_id#7L, version_id#9L, region#10, event_type#11, related_database_type#12, event_time#13L, extra#15]
          :                    +- Filter (isnotnull(rk#0) AND (rk#0 = 1))
          :                       +- Window [row_number() windowspecdefinition(region#10, version_id#9L, shop_id#7L, item_id#6L, model_id#8L, enqueue_time#14L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rk#0], [region#10, version_id#9L, shop_id#7L, item_id#6L, model_id#8L], [enqueue_time#14L DESC NULLS LAST]
          :                          +- Project [item_id#6L, model_id#8L, shop_id#7L, version_id#9L, region#10, event_type#11, related_database_type#12, event_time#13L, extra#15, enqueue_time#14L]
          :                             +- Filter ((((((((((isnotnull(region#10) AND isnotnull(rule#17)) AND isnotnull(enqueue_time#14L)) AND version_id#9L IN (1489,1485,1486)) AND (region#10 = ID)) AND (rule#17 = 0)) AND (enqueue_time#14L >= 1666832400)) AND (enqueue_time#14L < 1666834200)) AND dt#16 IN (2022-10-27,2022-10-26)) AND isnotnull(shop_id#7L)) AND isnotnull(item_id#6L))
          :                                +- Relation[item_id#6L,shop_id#7L,model_id#8L,version_id#9L,region#10,event_type#11,related_database_type#12,event_time#13L,enqueue_time#14L,extra#15,dt#16,rule#17] parquet
          +- Project [user_id#26L, region#34, journey_id#30L]
             +- Filter (((isnotnull(expiration_time#27L) AND (expiration_time#27L > 1666837153)) AND isnotnull(user_id#26L)) AND isnotnull(journey_id#30L))
                +- Scan hive opa_crm_datasend.dwd_crm_user_reentry_rt_live [expiration_time#27L, journey_id#30L, region#34, user_id#26L], HiveTableRelation [`opa_crm_datasend`.`dwd_crm_user_reentry_rt_live`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [user_id#26L, expiration_time#27L, re_entry_type#28, re_entry_key#29L, journey_id#30L, version_id..., Partition Cols: [expiration_date#33, region#34], Pruned Partitions: [(expiration_date=2022-10-27, region=ID), (expiration_date=2022-10-28, region=ID), (expiration_da...], [isnotnull(expiration_date#33), isnotnull(region#34), (cast(expiration_date#33 as date) >= 19292), (region#34 = ID)]
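
    Spark broadcasts a join side when its estimated size is below spark.sql.autoBroadcastJoinThreshold (10 MB by default); if the estimate for the table_xx3 side is too optimistic, the actual broadcast can blow past driver and executor memory. A way to inspect both the threshold and the optimizer's size estimates before running the job (a sketch, assuming Spark 3.x where EXPLAIN COST is available):

    -- current broadcast threshold (defaults to 10485760 bytes, i.e. 10 MB)
    SET spark.sql.autoBroadcastJoinThreshold;

    -- optimized logical plan annotated with size / row-count statistics,
    -- which is what the planner compares against the threshold
    EXPLAIN COST
    SELECT user_id, region, journey_id
    FROM table_xx3
    WHERE expiration_time > unix_timestamp(current_timestamp)
        AND expiration_date >= current_date
        AND region IN ('ID');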

    4. Solutions

    4.1 Control the table_xx1 data volume
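
    The post does not expand on this option. One possible interpretation (a sketch only; the 10-minute boundary below is chosen purely for illustration) is to shrink the enqueue_time window, or split it into several smaller slices, so that fewer table_xx1 rows enter the join:

    -- hypothetical: process 10-minute slices instead of a 30-minute window
    SELECT *, ROW_NUMBER() OVER (PARTITION BY region, version_id, shop_id, item_id, model_id ORDER BY enqueue_time DESC) AS rk
    FROM table_xx1
    WHERE version_id IN (1489, 1485, 1486)
        AND region = 'ID'
        AND rule = 0
        AND enqueue_time >= 1666832400
        AND enqueue_time < 1666833000   -- 1666832400 + 600 seconds
        AND dt IN (current_date, date_sub(current_date, 1))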

    4.2 Disable broadcasting

    Option 1: set a parameter

    set spark.sql.autoBroadcastJoinThreshold = -1;

    Re-running the SQL, the BroadcastHashJoin has now been replaced by a SortMergeJoin:

    22/10/27 10:30:04 WARN InsertAdaptiveSparkPlan: spark.sql.adaptive.enabled is enabled but is not supported for query: Project [user_id#106L, view_most#107, view_count#108, region#111, item_id#92L, model_id#94L, shop_id#93L, event_type#97, related_database_type#98, event_time#99L, version_id#95L, cur_element_id#87L, journey_task_id#88L, journey_id#89L, flash_sale_id#90L]
    +- Filter isnull(user_id#112L)
       +- SortMergeJoin [user_id#106L, region#111, journey_id#89L], [user_id#112L, region#120, journey_id#116L], LeftOuter
          :- Project [user_id#106L, view_most#107, view_count#108, region#111, item_id#92L, model_id#94L, shop_id#93L, event_type#97, related_database_type#98, event_time#99L, version_id#95L, cast(get_json_object(get_json_object(extra#101, $.get_matched_version_biz_info), $.cur_element_id) as bigint) AS cur_element_id#87L, cast(get_json_object(get_json_object(extra#101, $.get_matched_version_biz_info), $.journey_task_id) as bigint) AS journey_task_id#88L, cast(get_json_object(get_json_object(extra#101, $.get_matched_version_biz_info), $.journey_id) as bigint) AS journey_id#89L, cast(get_json_object(extra#101, $.flash_sale_id) as bigint) AS flash_sale_id#90L]
          :  +- SortMergeJoin [shop_id#93L, item_id#92L, region#96], [shop_id#105L, item_id#104L, region#111], Inner
          :     :- Project [item_id#92L, model_id#94L, shop_id#93L, version_id#95L, region#96, event_type#97, related_database_type#98, event_time#99L, extra#101]
          :     :  +- Filter (isnotnull(rk#86) AND (rk#86 = 1))
          :     :     +- Window [row_number() windowspecdefinition(region#96, version_id#95L, shop_id#93L, item_id#92L, model_id#94L, enqueue_time#100L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rk#86], [region#96, version_id#95L, shop_id#93L, item_id#92L, model_id#94L], [enqueue_time#100L DESC NULLS LAST]
          :     :        +- Project [item_id#92L, model_id#94L, shop_id#93L, version_id#95L, region#96, event_type#97, related_database_type#98, event_time#99L, extra#101, enqueue_time#100L]
          :     :           +- Filter (((((((isnotnull(region#96) AND isnotnull(enqueue_time#100L)) AND version_id#95L IN (1489,1485,1486)) AND (region#96 = ID)) AND (enqueue_time#100L >= 1666832400)) AND (enqueue_time#100L < 1666834200)) AND isnotnull(shop_id#93L)) AND isnotnull(item_id#92L))
          :     :              +- FileScan parquet opa_crm_datasend.dwd_event_trigger_rt_live[item_id#92L,shop_id#93L,model_id#94L,version_id#95L,region#96,event_type#97,related_database_type#98,event_time#99L,enqueue_time#100L,extra#101,dt#102,rule#103] Batched: true, DataFilters: [isnotnull(region#96), isnotnull(enqueue_time#100L), version_id#95L IN (1489,1485,1486), (region#..., Format: Parquet, Location: InMemoryFileIndex[hdfs://R2/projects/opa_crm_datasend/hive/opa_crm_datasend/dwd_event_trigger_rt_..., PartitionFilters: [isnotnull(rule#103), (rule#103 = 0), dt#102 IN (2022-10-27,2022-10-26)], PushedFilters: [IsNotNull(region), IsNotNull(enqueue_time), In(version_id, [1489,1485,1486]), EqualTo(region,ID)..., ReadSchema: struct       :     +- Project [item_id#104L, shop_id#105L, user_id#106L, view_most#107, view_count#108, region#111]
          :        +- Filter (((isnotnull(view_count#108) AND (view_count#108 >= 5)) AND isnotnull(shop_id#105L)) AND isnotnull(item_id#104L))
          :           +- FileScan parquet opa_crm_datasend.ads_view_item_agg_live[item_id#104L,shop_id#105L,user_id#106L,view_most#107,view_count#108,dt#109,time_range#110,region#111] Batched: true, DataFilters: [isnotnull(view_count#108), (view_count#108 >= 5), isnotnull(shop_id#105L), isnotnull(item_id#104L)], Format: Parquet, Location: InMemoryFileIndex[hdfs://R2/projects/opa_crm_datasend/hive/opa_crm_datasend/ads_view_item_agg_liv..., PartitionFilters: [isnotnull(time_range#110), isnotnull(dt#109), isnotnull(region#111), (time_range#110 = 01-07), (..., PushedFilters: [IsNotNull(view_count), GreaterThanOrEqual(view_count,5), IsNotNull(shop_id), IsNotNull(item_id)], ReadSchema: struct
          :                 +- Project [item_id#92L, model_id#94L, shop_id#93L, version_id#95L, region#96, event_type#97, related_database_type#98, event_time#99L, extra#101]
          :                    +- Filter (isnotnull(rk#86) AND (rk#86 = 1))
          :                       +- Window [row_number() windowspecdefinition(region#96, version_id#95L, shop_id#93L, item_id#92L, model_id#94L, enqueue_time#100L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rk#86], [region#96, version_id#95L, shop_id#93L, item_id#92L, model_id#94L], [enqueue_time#100L DESC NULLS LAST]
          :                          +- Project [item_id#92L, model_id#94L, shop_id#93L, version_id#95L, region#96, event_type#97, related_database_type#98, event_time#99L, extra#101, enqueue_time#100L]
          :                             +- Filter ((((((((((isnotnull(region#96) AND isnotnull(rule#103)) AND isnotnull(enqueue_time#100L)) AND version_id#95L IN (1489,1485,1486)) AND (region#96 = ID)) AND (rule#103 = 0)) AND (enqueue_time#100L >= 1666832400)) AND (enqueue_time#100L < 1666834200)) AND dt#102 IN (2022-10-27,2022-10-26)) AND isnotnull(shop_id#93L)) AND isnotnull(item_id#92L))
          :                                +- Relation[item_id#92L,shop_id#93L,model_id#94L,version_id#95L,region#96,event_type#97,related_database_type#98,event_time#99L,enqueue_time#100L,extra#101,dt#102,rule#103] parquet
          +- Project [user_id#112L, region#120, journey_id#116L]
             +- Filter (((isnotnull(expiration_time#113L) AND (expiration_time#113L > 1666837803)) AND isnotnull(user_id#112L)) AND isnotnull(journey_id#116L))
                +- Scan hive opa_crm_datasend.dwd_crm_user_reentry_rt_live [expiration_time#113L, journey_id#116L, region#120, user_id#112L], HiveTableRelation [`opa_crm_datasend`.`dwd_crm_user_reentry_rt_live`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [user_id#112L, expiration_time#113L, re_entry_type#114, re_entry_key#115L, journey_id#116L, versi..., Partition Cols: [expiration_date#119, region#120], Pruned Partitions: [(expiration_date=2022-10-27, region=ID), (expiration_date=2022-10-28, region=ID), (expiration_da...], [isnotnull(expiration_date#119), isnotnull(region#120), (cast(expiration_date#119 as date) >= 19292), (region#120 = ID)]

    This is the fix currently used in production.
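
    Setting the threshold to -1 disables broadcast joins for the whole session. A more targeted variant (a sketch, assuming Spark 3.0+ join hints; not what is deployed) is to force a sort-merge join only on the problematic table_xx3 join with a MERGE hint, leaving other joins free to broadcast:

    SELECT /*+ MERGE(t4) */ t3.*
    FROM (
        ...                           -- same t3 subquery as in section 2.3, abbreviated here
    ) t3
    LEFT JOIN (
        SELECT *
        FROM table_xx3
        WHERE expiration_time > unix_timestamp(current_timestamp)
            AND expiration_date >= current_date
            AND region IN ('ID')
    ) t4 ON t3.user_id = t4.user_id
        AND t3.region = t4.region
        AND t3.journey_id = t4.journey_id
    WHERE t4.user_id IS NULL;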

    Option 2: restructure the job

    Materialize two intermediate tables first, then join them:

    create table view_item_test AS
    -- keep t1.* plus only the non-duplicated columns from t2: a CTAS target
    -- cannot contain duplicate column names (item_id/shop_id/region exist on both sides)
    SELECT t1.*, t2.user_id, t2.view_most, t2.view_count
    FROM (
        SELECT *
        FROM (
            SELECT item_id, model_id, shop_id, version_id, region
                , event_type, related_database_type, event_time, extra, ROW_NUMBER() OVER (PARTITION BY region, version_id, shop_id, item_id, model_id ORDER BY enqueue_time DESC) AS rk
            FROM table_xx1
            WHERE version_id IN (1489, 1485, 1486)
                AND region = 'ID'
                AND rule = 0
                AND enqueue_time >= 1666832400
                AND enqueue_time < 1666834200
                AND dt IN (current_date, date_sub(current_date, 1))
        ) event
        WHERE event.rk = 1
    ) t1
    INNER JOIN (
        SELECT item_id, shop_id, user_id, view_most, view_count, region
        FROM table_xx2
        WHERE time_range = '01-07'
            AND dt = '2022-10-26'
            AND region IN ('ID')
            AND view_count >= 5
    ) t2
    ON t1.shop_id = t2.shop_id
        AND t1.item_id = t2.item_id
        AND t1.region = t2.region
    ;

    create table user_reentry_test AS
    SELECT user_id, region, journey_id
    FROM table_xx3
    WHERE expiration_time > unix_timestamp(current_timestamp)
        AND expiration_date >= current_date
        AND region IN ('ID')
    ;

    create table view_item_result AS
    SELECT aa.*
    FROM view_item_test aa
    LEFT JOIN user_reentry_test bb
        ON aa.user_id = bb.user_id
            AND aa.region = bb.region
            AND aa.journey_id = bb.journey_id
    WHERE bb.user_id IS NULL;
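
    As a possible refinement (a hedged suggestion, not part of the original fix), statistics can be computed on the intermediate tables after they are written, so whatever join strategy the optimizer picks for the final step is based on their real sizes rather than estimates:

    -- refresh table-level statistics before the final join
    ANALYZE TABLE view_item_test COMPUTE STATISTICS;
    ANALYZE TABLE user_reentry_test COMPUTE STATISTICS;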

  • Original post: https://blog.csdn.net/L13763338360/article/details/127654678