• Spark基础【RDD单Value类型转换算子】


    RDD转换算子

    RDD的方法有很多,但一般分为两大类,第一类是逻辑的封装,将旧的逻辑转换为新的逻辑,称之为转换算子;第二类是执行逻辑,将封装好的逻辑进行执行,让整个作业运行起来,称之为行动算子

    算子

    问题(初始)–> operator(算子,操作,方法) –> 问题(解决,完成)

    RDD根据数据处理方式的不同将算子整体上分为单Value类型、双Value类型和Key-Value类型

    将RDD的方法称为算子的原因是与Scala集合的方法进行区分

    如以下代码中的两个foreach方法,第一个为scala集合(单点)中的方法,第二个为RDD(分布式)的方法

    val wordCount: RDD[(String, Int)] = sc.textFile("data/word.txt").map((_,1)).reduceByKey(_ + _)
    val array: Array[(String, Int)] = wordCount.collect()
    array.foreach(println)
    wordCount.foreach(println)
    
    • 1
    • 2
    • 3
    • 4

    单Value类型

    1 map

    将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换

    def map[U: ClassTag](f: T => U): RDD[U]
    
    • 1

    map算子表示将数据源中的每一条数据进行处理

    map算子的参数是函数类型:Int => U,输入Int类型的数据,输出类型不确定

    “转换”概念的体现过程:从rdd算子转换成了newRdd算子

    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setMaster("local").setAppName("WordCount")
      val sc = new SparkContext(conf)
    
      val rdd = sc.makeRDD(List(1,2,3,4))
      
      def mapFunction ( num : Int) : Int = {
        num * 2
      }
      val newRdd: RDD[Int] = rdd.map(mapFunction)
      newRdd.collect().foreach(println)
    
      sc.stop()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    每次写mapFunction有些麻烦,可以使用函数至简原则

    val newRdd: RDD[Int] = rdd.map(_ * 2)
    
    • 1

    转换之后如何分区,数据执行的顺序如何

    查看newRdd分区数量(2个)

    newRdd.saveAsTextFile("output")
    
    • 1

    在RDD进行转换时,新的RDD和旧的RDD的分区数量保持一致,源码如下,返回所有依赖的RDD中的第一个RDD的分区数量

    /** Returns the first parent RDD */
    protected[spark] def firstParent[U: ClassTag]: RDD[U] = {
      dependencies.head.rdd.asInstanceOf[RDD[U]]
    }
    
    • 1
    • 2
    • 3
    • 4

    数据在处理过程中,默认情况下,分区不变,原来数据在哪个分区,转换完成之后还是在哪里

    数据在处理过程中,要遵循执行顺序:分区内有序,分区间无序

    使用如下代码查看是否满足分区内有序,分区间无序的规则

    val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    
    val newRdd: RDD[Int] = rdd.map(
          num => {
            println("num = " + num)
            num * 2
          }
        )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    RDD其实就是封装的逻辑,如果有多个RDD,那么第一条数据应该所有的逻辑执行完毕,再执行下一条数据,RDD没有等待的功能

    val newRdd1: RDD[Int] = rdd.map(
      num => {
        println("############### num = " + num)
        num * 2
      }
    )
    
    val newRdd2: RDD[Int] = newRdd1.map(
      num => {
        println("*************** num = " + num)
        num * 2
      }
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    (1)案例:从服务器日志数据agent.log中获取第四列数据

    部分数据如下

    1516609143867 6 7 64 16
    1516609143869 9 4 75 18
    1516609143869 1 7 87 12
    1516609143869 2 8 92 9
    1516609143869 6 7 84 24
    
    • 1
    • 2
    • 3
    • 4
    • 5
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
      val sc = new SparkContext(conf)
    
      val lineRDD: RDD[String] = sc.textFile("data/agent.log")
      val forthRDD: RDD[String] = lineRDD.map(
        line => {
          val datas = line.split(" ")
          datas(3)
        }
      )
      forthRDD.collect().foreach(println)
    
      sc.stop()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    2 mapPartitions

    将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,Iterator[T] => Iterator[U],数据量可以增加

    def mapPartitions[U: ClassTag](
     f: Iterator[T] => Iterator[U],
     preservesPartitioning: Boolean = false): RDD[U]
    • 1
    • 2
    • 3
    • 4
    val rdd = sc.makeRDD(List(1,2,3,4),2)
    
    val rdd1 = rdd.mapPartitions(
      list => {
        println("*************")
        list.map(_ * 2)
      }
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    将分区内的数据先放在Executor中(缓存),增加效率,以上程序中mapPartitions执行两次,而map执行四次,虽然提升了效率,但也存在缺点

    • 占用内存多
    • 处理完的数据不会释放

    (1)map和mapPartitions的区别

    • 数据处理角度

      Map算子是分区内一个数据一个数据的执行,类似于串行操作。而mapPartitions算子是以分区为单位进行批处理操作。

    • 功能的角度

      Map算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。MapPartitions算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据

    • 性能的角度

      Map算子因为类似于串行操作,所以性能比较低,而是mapPartitions算子类似于批处理,所以性能较高。但是mapPartitions算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用,一般使用map操作

    综上,有时完成比完美更重要

    (2)java克隆浅复制

    何时考虑不使用接口而是使用实现类:当需要使用实现类中的特有方法时,会定义为实现类类型,第一种声明方式,ArrayList中的特有方式无法使用,比如clone方法

    public class Test {
        public static void main(String[] args) {
            User user = new User();
            user.name = "zhangsan";
    
            List<User> userList = new ArrayList<User>();
            //userList.clone();
            ArrayList<User> userList1 = new ArrayList<User>();
            userList1.clone();
        }
    }
    class User{
        public String name;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    克隆之后,两块内存空间不同

    userList1.add(user);
    ArrayList<User> userList2 = (ArrayList<User>)userList1.clone();
    System.out.println(userList1 == userList2);	//false
    
    • 1
    • 2
    • 3

    以下代码,两块内存地址引用了相同的对象

    final User user1 = userList2.get(0);
    user1.name = "lisi";
    System.out.println(userList1 == userList2);	//false
    System.out.println(userList1);	//[com.hike.bigdata.spark.test.User@1698c449]
    System.out.println(userList2);	//[com.hike.bigdata.spark.test.User@1698c449]
    
    • 1
    • 2
    • 3
    • 4
    • 5

    这种现象,称为java克隆浅复制,只复制集合最外层集合的内存,但是如果集合引用了其他内存,不会复制

    java克隆浅复制存在引用问题,mapPartitions处理完的数据不会释放,那么引用也不会被释放,当全部处理完成时,才会释放,意味着数据越多,持续时间越长,占用内存空间越大

    (3)案例:获取每个数据分区的最大值

    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
      val sc = new SparkContext(conf)
    
      val rdd = sc.makeRDD(List(1,2,3,4),2)
    
      val rdd1: RDD[Int] = rdd.mapPartitions(
        list => {
          val max = list.max
          List(max).iterator
        }
      )
      rdd1.collect().foreach(println)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    3 mapPartitionsWithIndex

    现有三个分区,只获取第二个分区的数据

    分区间无序,所以以下代码是错误的

    var count = 0
    val rdd1 = rdd.mapPartitions(
      list => {
        if(count == 1){
          count = count + 1
          list
        }else{
          count = count + 1
          Nil.iterator
        }
      }
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    使用mapPartitionsWithIndex方法

    将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引

    def mapPartitionsWithIndex[U: ClassTag](
    f: (Int, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U]
    
    • 1
    • 2
    • 3
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
      val sc = new SparkContext(conf)
    
      val rdd = sc.makeRDD(List(1,2,3,4,5,6),3)
    
      val rdd1 = rdd.mapPartitionsWithIndex(
        (index,list) => {
          if(index == 1){
            list
          }else{
            Nil.iterator
          }
        }
      )
    
      rdd1.collect().foreach(println)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    4 flatMap

    将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射

    def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
    
    • 1
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
      val sc = new SparkContext(conf)
    
      val rdd: RDD[String] = sc.makeRDD(
        List("hello scala", "hello spark")
      )
      val rdd1: RDD[String] = rdd.flatMap(_.split(" "))
      rdd1.collect().foreach(println)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    flatMap输入是数据集的整体(一个),返回的是拆分后的个体(多个),使用容器将个体包装起来

    val rdd1 = rdd.flatMap(
      str => {
        str.split(" ")
      }
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
      val sc = new SparkContext(conf)
    
      val rdd: RDD[List[Int]] = sc.makeRDD(
        List(
          List(1, 2), List(3, 4)
        )
      )
    
      val rdd1 = rdd.flatMap(
        List =>List
      )
    
      rdd1.collect().foreach(println)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    第一个List是整体,第二个List代表的是容器

    (1)案例:将List(List(1,2),3,List(4,5))进行扁平化操作

    模式匹配

    val rdd1 = rdd.flatMap {
      case list : List[_] => list
      case other => List(other)
    }
    
    • 1
    • 2
    • 3
    • 4

    5 glom

    将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变

    def glom(): RDD[Array[T]]
    
    • 1

    将个体变成整体

    val rdd: RDD[Int] = sc.makeRDD(
      List(1, 2, 3, 4, 5, 6), 2
    )
    val rdd1: RDD[Array[Int]] = rdd.glom()
    rdd1.collect().foreach(a => println(a.mkString(",")))
    // 1,2,3
    // 4,5,6
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    (1)案例:计算所有分区最大值求和(分区内取最大值,分区间最大值求和)

    val rdd: RDD[Int] = sc.makeRDD(
      List(1, 2, 3, 4, 5, 6), 2
    )
    val rdd1: RDD[Array[Int]] = rdd.glom()
    
    val rdd2: RDD[Int] = rdd1.map(_.max)
    
    println(rdd2.collect().sum)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    6 groupBy

    将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,将这样的操作称之为shuffle。极限情况下,数据可能被分在同一个分区中

    一个组的数据在一个分区中,但是并不是说一个分区中只有一个组

    def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
    
    • 1

    此算子根据函数计算结果进行分组,执行结果为KV键值对数据类型,K是分组标识,V为同一个组中的数据集合

    val rdd: RDD[Int] = sc.makeRDD(
      List(1, 2, 3, 4, 5, 6), 2
    )
    
    val rdd1: RDD[(Int, Iterable[Int])] = rdd.groupBy(_ % 2)
    rdd1.collect().foreach(println)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    默认情况下,数据处理后所在的分区不会发生改变

    Spark要求,一个组的数据必须在一个分区中

    一个分区的数据被打乱,和其他分区的数据组合在一起,这个操作称为shuffle,现在想进行两分区数据相加,但第一个RDD中有很多分区,分区内有很多数据,shuffle如何做计算呢,在内存中是否等待所有数据到来之后再进行运算,即使等待,内存不够怎么办?

    所以,shuffle操作不允许在内存中等待,必须落盘,因此shuffle的速度慢

    shuffle会将完整的计算过程一分为二,形成两个阶段,一个阶段用于写数据,一个阶段用于读数据

    写数据的阶段如果没有完成,读数据的阶段不能执行

    conf.set("spark.local.dir","e:/")
    
    • 1

    通过windows下的spark环境,执行groupBy相关代码,可以在监控页面查看到“Shuffle Read”和“Shuffle Write”两个阶段

    shuffle的操作可以更改分区

    val rdd1: RDD[(Int, Iterable[Int])] = rdd.groupBy(_ % 2,2)
    
    • 1

    (1)案例:将List(“Hello”,“hive”, “hbase”, “Hadoop”)根据单词首写字母进行分组。

    val rdd: RDD[String] = sc.makeRDD(
      List("Hello","hive", "hbase", "Hadoop")
    )
    
    val rdd1: RDD[(String, Iterable[String])] = rdd.groupBy(_.substring(0,1))
    rdd1.collect().foreach(println)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    不区分首字母大小写

    val rdd1: RDD[(String, Iterable[String])] = rdd.groupBy(_.substring(0,1).toUpperCase())
    
    • 1

    (2)案例:按照agent.log中的第二列数据分组,相同求和

    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
      val sc = new SparkContext(conf)
    
      val lines: RDD[String] = sc.textFile("data/agent.log")
      val groupRDD: RDD[(String, Iterable[(String, Int)])] = lines.map(
        lines => {
          val datas: Array[String] = lines.split(" ")
          (datas(1), 1)
        }
      ).groupBy(_._1)
      val value: RDD[(String, Int)] = groupRDD.mapValues(_.size)
      value.collect().foreach(println)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    groupBy算子可以实现WordCount(共有十种算子可以实现,1/10)

  • 相关阅读:
    基于SSM校园一卡通管理系统
    GNSS、RTK、基站、移动站
    python 多线程编程(线程同步和守护线程)
    .Net 7 Native AOT 单文件 无依赖 跨平台
    Java学习笔记
    Android WMS——服务端输入事件处理(十八)
    CSS渐变色理论与分类、文字渐变色方案、炸裂渐变色方案以及主流专业渐变色工具网站推荐
    网络安全系列-四十二: Suricata之rulesets的激活、更新及动态加载
    项目管理最重要的工具之一——甘特图
    IDEA中.gitignore配置不生效的解决方案
  • 原文地址:https://blog.csdn.net/weixin_43923463/article/details/126253887