• SparkSQL系列-3、你知道SparkSQL的DataFrame?


    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

    传送门:大数据系列文章目录

    官方网址http://spark.apache.org/http://spark.apache.org/sql/
    在这里插入图片描述

    DataFrame的介绍

    DataFrame它不是Spark SQL提出来的,而是早期在R、 Pandas语言就已经有了的。就易用性
    而言,对比传统的MapReduce API,说Spark的RDD API有了数量级的飞跃并不为过。然而, 对于
    没有MapReduce和函数式编程经验的新手来说, RDD API仍然存在着一定的门槛。另一方面,数
    据科学家们所熟悉的R、 Pandas等传统数据框架虽然提供了直观的API,却局限于单机处理,无法
    胜任大数据场景。为了解决这一矛盾, Spark SQL 1.3.0在原有SchemaRDD的基础上提供了与R和
    Pandas风格类似的DataFrame API。 新的DataFrame AP不仅可以大幅度降低普通开发者的学习门
    槛,同时还支持Scala、 Java与Python三种语言。更重要的是,由于脱胎自SchemaRDD, DataFrame天然适用于分布式大数据场景。

    那到底啥是DataFrame?

    在Spark中, DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表
    格。 DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型

    在这里插入图片描述

    使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行针对性的优化,最终达到大幅提升运行时效率。 反观RDD,由于无从得
    知所存数据元素的具体内部结构, Spark Core只能在stage层面进行简单、通用的流水线优化。
    在这里插入图片描述

    上图中左侧的RDD[Person]虽然以Person为类型参数,但Spark框架本身不了解Person类的内
    部结构。而右侧的DataFrame却提供了详细的结构信息,使得Spark SQL可以清楚地知道该数据集
    中包含哪些列,每列的名称和类型各是什么。 了解了这些信息之后, Spark SQL的查询优化器就可
    以进行针对性的优化。后者由于在编译期有详尽的类型信息,编译期就可以编译出更加有针对性、
    更加优化的可执行代码。官方定义:

    Dataset: A DataSet is a distributed collection of data. (分布式的数据集)

    DataFrame: A DataFrame is a DataSet organized into named columns.(以列(列名,列类型,列值)的形式构成的分布式的数据集,按照列赋予不同的名称)

    在这里插入图片描述

    DataFrame有如下特性

    1. 分布式的数据集,并且以列的方式组合的,相当于具有schema的RDD;
    2. 相当于关系型数据库中的表,但是底层有优化;
    3. 提供了一些抽象的操作,如select、 filter、 aggregation、 plot;
    4. 它是由于R语言或者Pandas语言处理小数据集的经验应用到处理分布式大数据集上;
    5. 在1.3版本之前,叫SchemaRDD;

    范例演示:加载json格式数据

    第一步、上传官方测试数据$SPARK_HOME/examples/src/main/resources至HDFS目录/datas

    在这里插入图片描述

    查看HDFS上数据文件,其中雇员信息数据【employees.json】

    在这里插入图片描述

    第二步、启动spark-shell命令行,采用本地模式localmode运行

    在这里插入图片描述

    第三步、读取雇员信息数据

    在这里插入图片描述

    那啥是Schema?

    查看DataFrame中Schema是什么,执行如下命令:
    在这里插入图片描述
    可以看出Schema信息封装在StructType中,包含很多StructField对象,查看源码。

    其一、 StructType 定义,是一个样例类,属性为StructField的数组
    在这里插入图片描述
    其二、 StructField 定义,同样是一个样例类,有四个属性,其中字段名称和类型为必填
    在这里插入图片描述
    自定义Schema结构,官方提供实例代码:
    在这里插入图片描述

    Row

    DataFrame中每条数据封装在Row中, Row表示每行数据,具体哪些字段位置,获取DataFrame中第一条数据。
    在这里插入图片描述
    如何构建Row对象:要么是传递value,要么传递Seq,官方实例代码:

    import org.apache.spark.sql._
    
    // Create a Row from values.
    Row(value1, value2, value3, ...)
    
    // Create a Row from a Seq of values.
    Row.fromSeq(Seq(value1, value2, ...))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    如何获取Row中每个字段的值呢

    方式一:下标获取,从0开始,类似数组下标获取

    在这里插入图片描述

    方式二:指定下标,知道类型

    在这里插入图片描述

    方式三:通过As转换类型, 此种方式开发中使用最多

    在这里插入图片描述

    RDD转换DataFrame

    实际项目开发中,往往需要将RDD数据集转换为DataFrame,本质上就是给RDD加上Schema
    信息,官方提供两种方式: 类型推断和自定义Schema

    官方文档: http://spark.apache.org/docs/2.4.5/sql-getting-started.html#interoperating-with-rdds

    在这里插入图片描述

    范例演示说明

    使用经典数据集【电影评分数据u.data】,先读取为RDD,再转换为DataFrame。

    userId	itemId	rating	timestamp
    196	242	3	881250949
    186	302	3	891717742
    22	377	1	878887116
    244	51	2	880606923
    166	346	1	886397596
    298	474	4	884182806
    115	265	2	881171488
    253	465	5	891628467
    305	451	3	886324817
    6	86	3	883603013
    62	257	2	879372434
    286	1014	5	879781125
    200	222	5	876042340
    210	40	3	891035994
    224	29	3	888104457
    303	785	3	879485318
    122	387	5	879270459
    194	274	2	879539794
    291	1042	4	874834944
    234	1184	2	892079237
    119	392	4	886176814
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    字段信息: user id 、 item id、 rating 、 timestamp。

    反射类型推断

    当RDD中数据类型CaseClass样例类时,通过反射Reflecttion获取属性名称和类型,构建Schema,应用到RDD数据集,将其转换为DataFrame。

    第一步、定义CaseClass样例类,封装电影评分数据

    /**
    * 封装电影评分数据
    * *
    @param userId 用户ID
    * @param itemId 电影ID
    * @param rating 用户对电影评分
    * @param timestamp 评分时间戳
    */
    case class MovieRating(
    	userId: String,
    	itemId: String,
    	rating: Double,
    	timestamp: Long
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    第二步、 SparkContext读取电影评分数据封装到RDD中,转换数据类型

    import model.{MovieRatingStr}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, SparkSession}
    /**
     * 采用反射的方式将RDD转换为DataFrame和Dataset
     */
    object SparkRDDInferring {
      def main(args: Array[String]): Unit = {
        // 构建SparkSession实例对象
        val spark: SparkSession = SparkSession
          .builder() // 使用建造者模式构建对象
          .appName(this.getClass.getSimpleName.stripSuffix("$"))
          .master("local[3]")
          .getOrCreate()
        import spark.implicits._
        // 读取电影评分数据u.data, 每行数据有四个字段,使用制表符分割
        // user id | item id | rating | timestamp.
        val rawRatingsRDD: RDD[String] = spark.sparkContext
          .textFile("datas/ml-100k/u.data", minPartitions = 2)
        // 转换数据
        val ratingsRDD: RDD[MovieRatingStr] = rawRatingsRDD
          .filter(line => null != line && line.trim.split("\t").length == 4)
          .mapPartitions{iter =>
            iter.map{line =>
              // 拆箱操作, Python中常用
              val Array(userId, itemId, rating, timestamp) = line.trim.split("\t")
              // 返回MovieRating实例对象
              MovieRatingStr(userId, itemId, rating.toDouble, timestamp.toLong)
            }
          }
        // 将RDD转换为DataFrame和Dataset
        val ratingsDF: DataFrame = ratingsRDD.toDF()
        /*
        root
        |-- userId: string (nullable = true)
        |-- itemId: string (nullable = true)
        |-- rating: double (nullable = false)
        |-- timestamp: long (nullable = false)
        */
        ratingsDF.printSchema()
        ratingsDF.show(10)
        // 应用结束,关闭资源
        spark.stop()
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    执行结果

    root
     |-- userId: string (nullable = true)
     |-- itemId: string (nullable = true)
     |-- rating: double (nullable = false)
     |-- timestamp: long (nullable = false)
     
    +------+------+------+---------+
    |userId|itemId|rating|timestamp|
    +------+------+------+---------+
    |   196|   242|   3.0|881250949|
    |   186|   302|   3.0|891717742|
    |    22|   377|   1.0|878887116|
    |   244|    51|   2.0|880606923|
    |   196|   242|   3.0|881250949|
    |   186|   302|   3.0|891717742|
    |    22|   377|   1.0|878887116|
    |   244|    51|   2.0|880606923|
    |   166|   346|   1.0|886397596|
    |   298|   474|   4.0|884182806|
    +------+------+------+---------+
    only showing top 10 rows
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    此种方式要求RDD数据类型必须为CaseClass,转换的DataFrame中字段名称就是CaseClass中属性名称

    自定义Schema

    依据RDD中数据自定义Schema,类型为StructType,每个字段的约束使用StructField定义,具
    体步骤如下

    1. 第一步、RDD中数据类型为Row: RDD[Row];
    2. 第二步、针对Row中数据定义Schema: StructType;
    3. 第三步、使用SparkSession中方法将定义的Schema应用到RDD[Row]上;

    范例演示代码:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
    
    /**
     * 自定义Schema方式转换RDD为DataFrame
     */
    object SparkRDDSchema {
      def main(args: Array[String]): Unit = {
        // 构建SparkSession实例对象
        val spark: SparkSession = SparkSession
          .builder() // 使用建造者模式构建对象
          .appName(this.getClass.getSimpleName.stripSuffix("$"))
          .master("local[3]")
          .getOrCreate()
        import spark.implicits._
        // 读取电影评分数据u.data, 每行数据有四个字段,使用制表符分割
        // user id | item id | rating | timestamp.
        val ratingsRDD: RDD[String] = spark
          .sparkContext.textFile("datas/ml-100k/u.data", minPartitions = 2)
        // a. RDD[Row]
        val rowsRDD: RDD[Row] = ratingsRDD.mapPartitions { iter =>
          iter.map { line =>
            // 拆箱操作, Python中常用
            val Array(userId, itemId, rating, timestamp) = line.trim.split("\t")
            // 返回Row实例对象
            Row(userId, itemId, rating.toDouble, timestamp.toLong)
          }
        }
        // b. schema
        val rowSchema: StructType = StructType(
          Array(
            StructField("userId", StringType, nullable = true),
            StructField("itemId", StringType, nullable = true),
            StructField("rating", DoubleType, nullable = true),
            StructField("timestamp", LongType, nullable = true)
          )
        )
        // c. 应用函数createDataFrame
        val ratingDF: DataFrame = spark.createDataFrame(rowsRDD, rowSchema)
        ratingDF.printSchema()
        ratingDF.show(10, truncate = false)
        // 应用结束,关闭资源
        spark.stop()
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    toDF函数

    除了上述两种方式将RDD转换为DataFrame以外, SparkSQL中提供一个函数: toDF,通过指
    定列名称,将数据类型为元组的RDD或Seq转换为DataFrame,实际开发中也常常使用。

    在这里插入图片描述
    范例演示: 将数据类型为元组的RDD或Seq直接转换为DataFrame。

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, SparkSession}
    /**
     * 隐式调用toDF函数,将数据类型为元组的Seq和RDD集合转换为DataFrame
     */
    object SparkSQLToDF {
      def main(args: Array[String]): Unit = {
        // 构建SparkSession实例对象,通过建造者模式创建
        val spark: SparkSession = SparkSession
          .builder()
          .appName(this.getClass.getSimpleName.stripSuffix("$"))
          .master("local[3]")
          .getOrCreate()
        import spark.implicits._
        // TODO: 1、构建RDD,数据类型为三元组形式
        val usersRDD: RDD[(Int, String, Int)] = spark.sparkContext.parallelize(
          Seq(
            (10001, "zhangsan", 23),
            (10002, "lisi", 22),
            (10003, "wangwu", 23),
            (10004, "zhaoliu", 24)
          )
        )
        // 将RDD转换为DataFrame
        val usersDF: DataFrame = usersRDD.toDF("id", "name", "age")
        usersDF.printSchema()
        usersDF.show(10, truncate = false)
        println("========================================================")
        val df: DataFrame = Seq(
          (10001, "zhangsan", 23),
          (10002, "lisi", 22),
          (10003, "wangwu", 23),
          (10004, "zhaoliu", 24)
        ).toDF("id", "name", "age")
        df.printSchema()
        df.show(10, truncate = false)
        // TODO: 应用结束,关闭资源
        spark.stop()
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    执行结果:

    root
     |-- id: integer (nullable = false)
     |-- name: string (nullable = true)
     |-- age: integer (nullable = false)
    
    +-----+--------+---+
    |id   |name    |age|
    +-----+--------+---+
    |10001|zhangsan|23 |
    |10002|lisi    |22 |
    |10003|wangwu  |23 |
    |10004|zhaoliu |24 |
    +-----+--------+---+
    
    ========================================================
    
    root
     |-- id: integer (nullable = false)
     |-- name: string (nullable = true)
     |-- age: integer (nullable = false)
    
    
    +-----+--------+---+
    |id   |name    |age|
    +-----+--------+---+
    |10001|zhangsan|23 |
    |10002|lisi    |22 |
    |10003|wangwu  |23 |
    |10004|zhaoliu |24 |
    +-----+--------+---+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
  • 相关阅读:
    IDEA使用database
    LabVIEW基础
    Oracle IO是否为Direct IO
    基于Matlab实现多个图像压缩案例(附上源码+数据集)
    【NOI模拟赛】摆(线性代数,杜教筛)
    用 Canvas 画简易手电筒
    解密数字经济时代,元宇宙企业如何重塑商业价值?
    【LeetCode每日一题合集】2023.9.11-2023.9.17(⭐反悔贪心&拓扑排序&Floyd)
    Redis(三)
    网络时代下的声音之路:如何在中央新闻媒体发布网评稿
  • 原文地址:https://blog.csdn.net/l848168/article/details/126280370