• Spark基础【RDD行动算子、序列化】


    一 RDD行动算子

    Spark的RDD方法分为2大类,一个是转换算子,一个是行动算子

    行动算子在被调用时,会触发Spark作业的执行

    之前一直在使用的collect算子就是行动算子,行动算子执行时,会构建新的作业

    main代表一个应用,也称为Diver程序,一个应用内可以有多个作业

    collect算子源码:

    def collect(): Array[T] = withScope {
      val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
      Array.concat(results: _*)
    }
    
    • 1
    • 2
    • 3
    • 4

    1 reduce

    函数签名
    def reduce(f: (T, T) => T): T
    
    • 1
    • 2

    聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据

    val rdd = sc.makeRDD(List(1,2,3,4),2)
    
    val i: Int = rdd.reduce(_ + _)
    
    • 1
    • 2
    • 3

    2 collect

    函数签名
    def collect(): Array[T]
    
    • 1
    • 2

    在驱动程序(Driver)中,以数组Array的形式返回数据集的所有元素

    将数据从Executor端采集到Driver端

    collect会将数据全部拉取到Driver端的内存中,形成数据集合,可能会导致内存溢出,所以如果在具体场景中可以判断出内存可能不够的情况下,不能使用这个方法,可以将结果保存到文件当中,一般会涉及到写磁盘,比如保存为分区文件

    val ints: Array[Int] = rdd.collect()
    println(ints.mkString(","))
    
    • 1
    • 2

    3 count

    函数签名
    def count(): Long
    
    • 1
    • 2

    返回RDD中元素的个数

    val l: Long = rdd.count()
    println(l)
    
    • 1
    • 2

    4 first

    函数签名
    def first(): T
    
    • 1
    • 2

    返回RDD中的第一个元素

    val i1: Int = rdd.first()
    println(i1)
    
    • 1
    • 2

    5 take

    函数签名
    def take(num: Int): Array[T]
    
    • 1
    • 2

    返回一个由RDD的前n个元素组成的数组

    val ints1: Array[Int] = rdd.take(3)
    println(ints1)
    
    • 1
    • 2

    6 takeOrdered

    函数签名
    def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
    
    • 1
    • 2

    返回该RDD排序后的前n个元素组成的数组

    val rdd1: RDD[Int] = sc.makeRDD(List(1,4,3,2))
    val ints2: Array[Int] = rdd1.takeOrdered(3)
    println(ints2.mkString(","))
    
    • 1
    • 2
    • 3

    7 aggregate

    函数签名
    def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
    
    • 1
    • 2

    分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合

    val i2: Int = rdd.aggregate(0)(_ + _ , _ + _)
    println(i2)
    
    • 1
    • 2

    aggregate和aggregateByKey的区别

    • 数据格式:aggregateByKey对相同类型的K对V做aggregate,aggregate不考虑K,直接对V进行操作
    • aggregateByKey是一个转换算子,执行后会产生新的RDD,aggregate是一个行动算子,执行后会得到结果
    • aggregateByKey在执行计算时,初始值只会参与分区内的计算,aggregate还会参与分区间的计算
    val rdd = sc.makeRDD(List(1,2,3,4),2)
    val i2: Int = rdd.aggregate(5)(_ + _ , _ + _)
    println(i2)
    
    • 1
    • 2
    • 3

    执行过程

    【1 2】【3 4】
    【5 1 2】【5 3 4】
    【8】【12】
    【5 8 12】
    【25】
    
    • 1
    • 2
    • 3
    • 4
    • 5

    8 fold

    函数签名
    def fold(zeroValue: T)(op: (T, T) => T): T
    
    • 1
    • 2

    折叠操作,aggregate的简化版操作,分区内和分区间计算规则一致

    val i3: Int = rdd.fold(5)(_ + _)
    println(i3)
    
    • 1
    • 2

    9 countByKey

    函数签名
    def countByKey(): Map[K, Long]
    
    • 1
    • 2

    统计相同key出现的次数

    val map: collection.Map[String, Long] = rdd.map(("a",_)).countByKey()
    println(map)	//Map(a -> 4)
    
    • 1
    • 2

    可以实现WordCount(7 / 10)

    将数据转换格式:

    ("a",1)("a",2)("a",3)("a",4)
    其中
    ("a",1) => ("a",1)
    ("a",4) => ("a",1)("a",1)("a",1)("a",1)
    再执行WordCount
    
    • 1
    • 2
    • 3
    • 4
    • 5

    10 countByValue

    这里的value不是KV键值对的V,指的是集合中的相同值出现了多少次

    val map1: collection.Map[(String, Int), Long] = rdd.map(("a",_)).countByValue()
    println(map1)	//Map((a,3) -> 1, (a,2) -> 1, (a,1) -> 1, (a,4) -> 1)
    
    • 1
    • 2
    val rdd2: RDD[Int] = sc.makeRDD(List(1,2,2,3,1,3,3))
    val map2: collection.Map[Int, Long] = rdd2.countByValue()
    println(map2)	//Map(1 -> 2, 3 -> 3, 2 -> 2)
    
    • 1
    • 2
    • 3

    可以实现WordCount(8 / 10)

    将数据转换格式

    ("a",1) => "a"
    ("a",4) => "a","a","a","a"
    
    • 1
    • 2

    11 sava相关算子

    函数签名
    def saveAsTextFile(path: String): Unit
    def saveAsObjectFile(path: String): Unit
    def saveAsSequenceFile(
      path: String,
      codec: Option[Class[_ <: CompressionCodec]] = None): Unit
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    将数据保存到不同格式的文件中

    // 保存成Text文件
    rdd.saveAsTextFile("output")
    
    // 序列化成对象保存到文件
    rdd.saveAsObjectFile("output1")
    
    // 保存成Sequencefile文件
    rdd.map((_,1)).saveAsSequenceFile("output2")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    12 foreach

    函数签名
    def foreach(f: T => Unit): Unit = withScope {
        val cleanF = sc.clean(f)
        sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    分布式遍历RDD中的每一个元素,调用指定函数

    rdd.collect().foreach(println)
    
    • 1

    在内存中单点打印,rdd.collect()返回一个数组,是数组的foreach方法

    collect按照分区号码进行采集,如先采集一分区,再采集二分区…

    val rdd = sc.makeRDD(List(1,2,3,4),2)
    rdd.foreach(println)
    
    • 1
    • 2

    在Executor中打印,分区内有序,分区间无序

    二 RDD序列化

    1 闭包检测

    从计算的角度, 算子以外的代码都是在Driver端执行, 算子里面的代码都是在Executor端执行。那么在scala的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给Executor端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作称之为闭包检测。Scala2.12版本后闭包编译方式发生了改变

    想要将foreach和其他类一起使用

    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setMaster("local").setAppName("RDD")
      val sc = new SparkContext(conf)
      val rdd = sc.makeRDD(List(1,2,3,4),2)
      val user = new User()
      rdd.foreach(
        num => {
          println(num + user.age)
        }
      )
      sc.stop()
    }
    class User extends Serializable {
      var age = 30
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    以上代码中User为什么需要序列化:User是在Driver端创建,而foreach是一个算子,在Executor端执行,println在foreach内部,想要用到User的age属性,需要通过网络从Driver端将此属性传送到Executor端,而网络中又不能传送对象,所以需要序列化

    // Task not serializable
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setMaster("local").setAppName("RDD")
      val sc = new SparkContext(conf)
      val rdd = sc.makeRDD(List[Int](),2)
      val user = new User()
      rdd.foreach(
        num => {
          println(num + user.age)
        }
      )
      sc.stop()
    }
    class User {
      var age = 30
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    Spark在执行算子时,如果算子的内部使用了外部的变量(对象),一定会出现闭包

    闭包出现的时机:

    • 匿名函数
    • 内部函数在外部使用
    • 将一个函数作为对象使用

    在这种场景中,需要将Driver端的变量通过网络传递给Executor端执行,根据原理可以判断出来,可以在真正执行之前,对数据进行序列化校验

    spark在执行作业前,需要先进行闭包检测功能

    闭包检测源码:

    rdd.foreach(
      num => {
        println(num + user.age)
      }
    )
    
    def foreach(f: T => Unit): Unit = withScope {
      val cleanF = sc.clean(f)
      sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
    }
    
    //ClosureCleaner闭包清除器,checkSerializable检查序列化
    private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
      ClosureCleaner.clean(f, checkSerializable)
      f
    }
    
    private def clean(
        // indylambda check. Most likely to be the case with 2.12, 2.13
        // so we check first
        // non LMF-closures should be less frequent from now on
        val maybeIndylambdaProxy = IndylambdaScalaClosures.getSerializationProxy(func)
    
        if (!isClosure(func.getClass) && maybeIndylambdaProxy.isEmpty) {
          logDebug(s"Expected a closure; got ${func.getClass.getName}")
          return
        }
        // 检查能够序列化
        if (checkSerializable) {
          ensureSerializable(func)
        }
    )
    
    // 确保是否能序列化,不能直接抛出异常
    private def ensureSerializable(func: AnyRef): Unit = {
        try {
          if (SparkEnv.get != null) {
            SparkEnv.get.closureSerializer.newInstance().serialize(func)
          }
        } catch {
          case ex: Exception => throw new SparkException("Task not serializable", ex)
        }
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    2 序列化方法和属性

    (1)scala语法

    如果构造方法中的参数没有在其他地方使用,不会将其变为类的属性,但是如果在其他地方使用,必须将其变为类的属性

    def main(args: Array[String]): Unit = {
        new Test("zhangsan").test()
      }
    
      class Test( name : String){
        def test() : Unit = {
          println(name)
        }
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    其中class Test( name : String){}是一个构造方法,def test() : Unit = {}是另外的一个方法,不可以在另外的一个方法中使用构造方法中的局部变量,但以上代码可以执行,是怎么做到的呢,反编译代码如下

    public static class Test {
       private final String name;
    
       public void test() {
          .MODULE$.println(this.name);
       }
    
       public Test(final String name) {
          this.name = name;
       }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    为了能够将构造方法的临时变量在另外一个方法中能够使用,编译器将其变成了Test的一个属性,但修改代码

    def main(args: Array[String]): Unit = {
      new Test("zhangsan")
    }
    
    class Test( name : String){}
    
    • 1
    • 2
    • 3
    • 4
    • 5

    又是如何编译的呢

    public static class Test {
       public Test(final String name) {
       }
    }
    
    • 1
    • 2
    • 3
    • 4

    发现没有这个属性

    (2)需求

    实现Search类,再类中定义filterByQuery,实现查询以某个字母开头单词的功能,在Driver中调用这个方法

    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setMaster("local").setAppName("RDD")
      val sc = new SparkContext(conf)
    
      val rdd = sc.makeRDD(List("hello","hive","scala","spark"))
    
      val search = new Search("h")
      search.filterByQuery(rdd).foreach(println)
    
      sc.stop()
    }
    
    class Search(s : String) extends Serializable {
      def filterByQuery(rdd : RDD[String]): RDD[String] ={
        // _.startsWith(s) 在算子内,称之为Executor
        // 算子外,称之为Driver
        rdd.filter(_.startsWith(this.s))
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    根据(1)中分析,以上代码中Search类在其他方法中使用到了构造方法的属性,所以会将s变为类的一个属性

    s是一个属性,与对象相关,而对象是一个类,所以这个类需要序列化

    或者在类的前面添加case关键字也可以成功执行

    case class Search(s : String){}
    
    • 1

    样例类专用于模式匹配声明的类,但是其不仅可以在模式匹配中使用,也可以当做普通类来使用,而且更加强大,因为在生成类的同时,自动添加了很多功能,其中一项就是默认实现了可序列化接口,反编译后代码

    public static class Search implements Product, Serializable {
       private final String s;
    }
    
    • 1
    • 2
    • 3

    或者只要保证在算子内没有使用到外部的属性,也可以成功执行

    新定义一个属性,接收构造方法的属性,ss没有使用到s(this),只是一个普通的字符串

    class Search(s : String){
      def filterByQuery(rdd : RDD[String]): RDD[String] ={
        val ss : String = this.s
        // _.startsWith(s) 在算子内,称之为Executor
        // 算子外,称之为Driver
        rdd.filter(_.startsWith(ss))
      }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    从计算的角度, 算子以外的代码都是在Driver端执行, 算子里面的代码都是在Executor端执行

    3 Kryo序列化框架

    Java的序列化能够序列化任何的类。但是比较重(字节多),序列化后,对象的提交也比较大。Spark出于性能的考虑,Spark2.0开始支持另外一种Kryo序列化机制。Kryo速度是Serializable的10倍。当RDD在Shuffle数据的时候,简单数据类型、数组和字符串类型已经在Spark内部使用Kryo来序列化

    注意:即使使用Kryo序列化,也要继承Serializable接口

    导入依赖

    
        com.esotericsoftware
    	kryo
    	5.0.3
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    框架介绍

    public class KryoTest {
        public static void main(String[] args) {
            User user = new User();
            user.setUserage(20);
            user.setUsername("zhangsan"); //
            //javaSerial(user, "e:/user.dat");
            //kryoSerial(user, "e:/user1.dat");
            User user1 = kryoDeSerial(User.class, "e:/user1.dat");
            System.out.println(user1.getUsername());
            System.out.println(user1.getUserage());
        }
    
        public static void javaSerial(Serializable s, String filepath) {
    
            try {
                ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(filepath)));
                out.writeObject(s);
                out.flush(); 
                out.close();  
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        public static <T> T kryoDeSerial(Class<T> c, String filepath) {
            try {
                Kryo kryo=new Kryo();  
                kryo.register(c,new BeanSerializer(kryo, c));  
                Input input = new Input(new BufferedInputStream(new FileInputStream(filepath)));        
                T t = kryo.readObject(input, c);
                input.close();  
                return t;
            } catch (Exception e) {
                e.printStackTrace();
            }
            return null;
        }
        public static void kryoSerial(Serializable s, String filepath) {
    
            try {
                Kryo kryo=new Kryo();  
                kryo.register(s.getClass(),new BeanSerializer(kryo, s.getClass()));  
                Output output=new Output(new BufferedOutputStream(new FileOutputStream(filepath)));        
                kryo.writeObject(output, s);  
                output.flush(); 
                output.close();  
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    class User implements Serializable {  
     
    	private transient String username;
    	private int userage;
    	public String getUsername() {
    		return username;
    	}
    	public void setUsername(String username) {
    		this.username = username;
    	}
    	public int getUserage() {
    		return userage;
    	}
    	public void setUserage(int userage) {
    		this.userage = userage;
    	}
    	
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    def main(args: Array[String]): Unit = {
    
        val conf: SparkConf = new SparkConf()
                .setAppName("SerDemo")
                .setMaster("local[*]")
                // 替换默认的序列化机制
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                // 注册需要使用 kryo 序列化的自定义类
                .registerKryoClasses(Array(classOf[Searcher]))
    
        val sc = new SparkContext(conf)
    
        val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark", "spark", "word"), 2)
    
        val searcher = new Searcher("hello")
        val result: RDD[String] = searcher.getMatchedRDD1(rdd)
    
        result.collect.foreach(println)
    }
    case class Searcher(val query: String) {
    
        def isMatch(s: String) = {
            s.contains(query)
        }
    
        def getMatchedRDD1(rdd: RDD[String]) = {
            rdd.filter(isMatch) 
        }
    
        def getMatchedRDD2(rdd: RDD[String]) = {
            val q = query
            rdd.filter(_.contains(q))
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
  • 相关阅读:
    Three.js着色器基础【含源码】
    C++--Linux基础使用
    Elastic Search 浅浅认识 快速使用 keyword 和 text 的区别之处 spring boot 集成案例 es 增删改查
    Php“梦寻”淘宝天猫商品详情数据接口,淘宝商品详情数据API接口,淘宝API接口申请指南(含代码示例)
    Java反射知识总结、Java内存分析、通过反射动态创建对象、操作泛型、注解
    matlab 求数字积分
    SpringBoot整合dataworks
    《零基础作曲编曲完全教程》(合辑)崛起的卧室音乐人 编曲教程 作曲教程精进篇
    操作系统 - 进程
    Shell脚本2
  • 原文地址:https://blog.csdn.net/weixin_43923463/article/details/126309767