• Spark RDD 行动算子


    1.reduce

    函数签名 def reduce(f: (T, T) => T): T

    代码:

    1. /**
    2. * reduce()聚合
    3. */
    4. object ActionDemo {
    5. def main(args: Array[String]): Unit = {
    6. val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    7. val sc = new SparkContext(sparkConf)
    8. val rdd = sc.makeRDD(Array(12,13,15))
    9. println(rdd.reduce(_+_))
    10. }
    11. }

     2.collect

    函数签名 def collect(): Array[T]

    代码: 

    1. /**
    2. *count()返回RDD中元素个数
    3. */
    4. object ActionDemo2{
    5. def main(args: Array[String]): Unit = {
    6. val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    7. val sc = new SparkContext(sparkConf)
    8. val rdd = sc.makeRDD(Array(12,13,15))
    9. rdd.collect().foreach(println)
    10. }
    11. }

     3.count

    函数签名 def count(): Long

    代码: 

    1. /**
    2. *count()返回RDD中元素个数
    3. */
    4. object ActionDemo2{
    5. def main(args: Array[String]): Unit = {
    6. val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    7. val sc = new SparkContext(sparkConf)
    8. val rdd = sc.makeRDD(Array(12,13,15))
    9. val rdd2 =rdd.count()
    10. println(rdd2)
    11. }
    12. }

     4.first

    函数签名 def first(): T

    返回 RDD 中的第一个元素

    代码: 

    1. /**
    2. *first()返回RDD中的第一个元素
    3. */
    4. object ActionDemo3{
    5. def main(args: Array[String]): Unit = {
    6. val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    7. val sc = new SparkContext(sparkConf)
    8. val rdd = sc.makeRDD(Array(12,13,15))
    9. val rdd2 =rdd.first()
    10. println(rdd2)
    11. }
    12. }

    5.take

    函数签名 def take(num: Int): Array[T]

    代码:

    1. /**
    2. *take()返回由RDD前n个元素组成的数组
    3. */
    4. object ActionDemo4{
    5. def main(args: Array[String]): Unit = {
    6. val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    7. val sc = new SparkContext(sparkConf)
    8. val rdd = sc.makeRDD(Array(12,13,15))
    9. val rdd2 =rdd.take(2)
    10. rdd2.foreach(println)
    11. }
    12. }

     6.takeOrdered

    函数签名 def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

    代码:

    1. /**
    2. *takeOrdered()返回该RDD排序后前n个元素组成的数组
    3. */
    4. object ActionDemo5{
    5. def main(args: Array[String]): Unit = {
    6. val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    7. val sc = new SparkContext(sparkConf)
    8. val rdd = sc.makeRDD(Array(12,13,15,-1,9,100))
    9. val rdd2 =rdd.takeOrdered(2)
    10. rdd2.foreach(println)
    11. }
    12. }

     7.aggregate

    函数签名

    def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

    代码:

    1. /**
    2. * aggregate()案例
    3. */
    4. object ActionDemo6{
    5. def main(args: Array[String]): Unit = {
    6. val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    7. val sc = new SparkContext(sparkConf)
    8. val rdd = sc.makeRDD(Array(12,13,15,-1,9,100))
    9. //22 23 25 9 19 110
    10. //空分区也有默认值
    11. val rdd2 =rdd.aggregate(10)(_+_,_+_)
    12. println(rdd2)//318
    13. }
    14. }

     8.fold

    函数签名 def fold(zeroValue: T)(op: (T, T) => T): T

    代码:

    1. /**
    2. * fold()案例
    3. */
    4. object ActionDemo7{
    5. def main(args: Array[String]): Unit = {
    6. val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    7. val sc = new SparkContext(sparkConf)
    8. val rdd = sc.makeRDD(Array(12,13,15,-1,9,100))
    9. //22 23 25 9 19 110
    10. //空分区也有默认值
    11. val rdd2 =rdd.fold(10)(_+_)
    12. println(rdd2)//318
    13. }
    14. }

     9.countByKey

    函数签名 def countByKey(): Map[K, Long]

    代码: 

    1. /**
    2. * countByKey()统计每种key的个数
    3. */
    4. object ActionDemo8 {
    5. def main(args: Array[String]): Unit = {
    6. val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    7. val sc = new SparkContext(sparkConf)
    8. val rdd = sc.makeRDD(Array((1, "a"), (1, "a"), (1, "a"), (2, "b"), (3, "c"), (3, "c")))
    9. val rdd2 = rdd.countByKey()
    10. println(rdd2)
    11. }
    12. }

     10.save 相关算子

    函数签名

    def saveAsTextFile(path: String): Unit

    def saveAsObjectFile(path: String): Unit

    def saveAsSequenceFile( path: String,

    codec: Option[Class[_ <: CompressionCodec]] = None): Unit

    将数据保存到不同格式的文件中

    1. // 保存成 Text 文件
    2. rdd.saveAsTextFile("output")
    3. // 序列化成对象保存到文件
    4. rdd.saveAsObjectFile("output1")
    5. // 保存成 Sequencefile 文件
    6. rdd.map((_,1)).saveAsSequenceFile("output2")

     11.foreach

    函数签名

    def foreach(f: T => Unit): Unit = withScope {

    val cleanF = sc.clean(f)

    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF)) }

    分布式遍历 RDD 中的每一个元素,调用指定函数

  • 相关阅读:
    C++:使用cin.getline()输入超过规定字符数
    QT事件说明
    MYSQL的系统数据表空间,用户数据表空间,系统临时表空间,用户临时表空间详解
    ArcGIS api for JavaScript 制作统计专题图
    MyBatisPlus(十二)排序查询:orderBy
    二进制安装minio 并实现主从同步
    RabbitMQ的学习之路(二-4)模式详解之路由routing路由模式
    神经网络模型训练简记(一)
    java .jks证书在php中的使用
    [附源码]JAVA毕业设计基于Ssm学生信息管理系统(系统+LW)
  • 原文地址:https://blog.csdn.net/m0_55834564/article/details/125458993