• RDD行动算子和血缘关系


    wordCount分布式运行

    1. 将wordCount进行打包上传,path使用args参数传参
    2. 启动hdfs和yarn
    3. 提交任务
    [atguigu@hadoop102 spark-yarn]$ bin/spark-submit \
    --class com.atguigu.spark.WordCount \
    --master yarn \
    ./WordCount.jar \
    /input \
    集群模式:client/cluster
    /output
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    注意: 如果集群模式选择为cluster,代码中的local[*]必须改为yarn,否则会报错。

    foreach和collect

    foreach底层是多线程打印的,出现的结果是分区间有序,但是整体无序,如果需要有序,只能将分区数设置为1,可以节省一些流量。
    collect是将各个Executor计算的结果聚合在一起,输出结果是整体有序的。

    行动算子

    算子名称作用
    collect按顺序从各个executor中收取数据
    foreach直接在各个executor中操作数据
    count统计元素个数
    first返回0号分区的第一个值
    take(int n)根据参数个数提取数据
    countByKeyrdd需要是kv对集合,根据key统计个数,返回map结果
    save以特定格式,一般为textFile, 保存数据

    Spark之Kryo序列化

    适合简单数据的序列化,比hadoop的序列化更加轻量。使用方法如下:

    1. 替换默认的序列化机.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)
    2. 注册序列化对象,参数为数组类型new 类名[]{类1,类2}
    3. 后面使用conf对象创建sc即可
    // 1.创建配置对象
            SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore")
                    // 替换默认的序列化机制
                    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                    // 注册需要使用kryo序列化的自定义类
                    .registerKryoClasses(new Class[]{Class.forName("com.atguigu.bean.User")});
    
            // 2. 创建sparkContext
            JavaSparkContext sc = new JavaSparkContext(conf);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    血缘关系

    RDD.toDebugString().sout可以查看所有RDD的血缘关系。
    打印的关系如下所示,其中(2)表示分区数量,±表示进行了shuffle,分区数量有所改变。如果进入了shuffle,计算时就会划分为多个阶段,即阶段数 = shuflle数量 +1。

    常见的会走shffle的算子有reduceByKey和sortBy.

    (2) ShuffledRDD[4] at reduceByKey at WordCount2.java:56 []
     +-(2) MapPartitionsRDD[3] at mapToPair at WordCount2.java:47 []
        |  MapPartitionsRDD[2] at flatMap at WordCount2.java:36 []
        |  input/1.txt MapPartitionsRDD[1] at textFile at WordCount2.java:33 []
        |  input/1.txt HadoopRDD[0] at textFile at WordCount2.java:33 []
    
    • 1
    • 2
    • 3
    • 4
    • 5

    宽依赖:分区数量改变了,即走了shuffle
    窄依赖:分区数量和分区规则不变,不走shuffle.

    注: join操作如果是分桶join时,不需要走shuffle,其他普通join时则需要打散分区进行shuffle操作。

    Stage任务划分

    1. 提交任务后,画出DAG有向无环图
    2. 划分阶段,阶段划分的标志是是否有Shuffle.
    3. 划分Task,任务运行的最小基本单位,按照分区进行划分, 每一份分区一个Task.
    4. 任务交给Worker即executor去执行,executor中的线程数等于CPU个数,每个CPU可以去执行一个Task。
    5. job划分,每执行一个行动算子就是一个job。如果算子计算过程中有sortBy算子,会划分为两个job。因为sortBy底层调用了collect, save行动算子,进行数据的落盘。

    注意: 多个stage之间是串行执行的,stage间的Task是并行执行的。故分区越多,并发度越高。并不是调用了带有分区器的算子就一定会走shuffle,需要分区数量和分区规则变化时才会。

  • 相关阅读:
    思腾云计算
    算法-堆/多路归并-查找和最小的 K 对数字
    劳务派遣怎么交社保
    关于vector存放对象和对象指针的探索
    快速上手几个Linux命令
    drone ci 是什么
    Spring Boot 实现万能文件在线预览-开源学习一
    java实现快速排序的方法
    stm32定时器之简单封装
    Linux:ll命令详解
  • 原文地址:https://blog.csdn.net/qq_44273739/article/details/133898707