• DataFrame的操作-使用SQL



    申明: 未经许可,禁止以任何形式转载,若要引用,请标注链接地址
    全文共计5703字,阅读大概需要3分钟

    1. 实验室名称:

    大数据实验教学系统

    2. 实验项目名称:

    DataFrame的操作-使用SQL

    3. 实验学时:

    4. 实验原理:

    在SparkSQL对SQL语句的处理和关系型数据库采用了类似的方法,SparkSQL会先将SQL语句进行解析Parse形成一个Tree,然后使用Rule对Tree进行绑定、优化等处理过程,通过模式匹配对不同类型的节点采用不同的操作。而SparkSQL的查询优化器是Catalyst,它负责处理查询语句的解析、绑定、优化和生成物理计划等过程,Catalyst是SparkSQL最核心的部分,其性能优劣将决定整体的性能。

    5. 实验目的:

    掌握Spark SQL临时视图的创建。
      掌握Spark SQL查询。

    6. 实验内容:

    使用标准SQL对DataFrame进行操作。具体包含如下内容:
      1、临时表创建
        - createGlobalTempView(name)
        - createOrReplaceGlobalTempView(name)
        - createOrReplaceTempView(name)
        - createTempView(name)
        - registerTempTable(name)
        - dropTempView(name)
      2、Spark SQL查询
        - 加载数据集及处理
        - 创建临时表并查询
        - 创建永久表并查询

    7. 实验器材(设备、虚拟机名称):

    硬件:x86_64 ubuntu 16.04服务器
      软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3,zeppelin-0.8.1

    8. 实验步骤:

    8.1 环境准备

    1、在终端窗口下,输入以下命令,分别启动HDFS集群、Spark集群和Zeppelin服务器:

    1.	$ start-dfs.sh
    2.	$ cd /opt/spark
    3.	$ ./sbin/start-all.sh
    4.	$ zeppelin-daemon.sh start
    
    • 1
    • 2
    • 3
    • 4

    2. 将本实验用到的数据集上传到HDFS上。在终端窗口中,执行以下命令:

    1.	$ hdfs dfs -mkdir -p /data/dataset/batch
    2.	$ hdfs dfs -put /data/dataset/batch/customers.csv /data/dataset/batch/
    3.	$ hdfs dfs -put /data/dataset/batch/wc.txt /data/dataset/batch/
    
    • 1
    • 2
    • 3

    3、启动浏览器,打开zeppelin notebook首页,点击【Create new note】链接,创建一个新的笔记本,名字为【rdd_demo】,解释器默认使用【spark】,如下图所示:
    在这里插入图片描述

    8.2 临时视图的创建

    1、使用createGlobalTempView(name)方法为DataFrame创建一个全局的临时表,其生命周期和启动的app的周期一致,既启动的spark应用存在则这个临时的表就一直能访问,直道sparkcontext的stop方法的调用退出应用为止。创建的临时表保存在global_temp这个库中。
      在zeppelin中执行如下代码:

    1.	var df = spark.read.option("header","true").csv("/data/dataset/batch/customers.csv")
    2.	df.createGlobalTempView("xx")
    3.	     
    4.	// 查询
    5.	spark.sql("select * from global_temp.xx").show(5)
    
    • 1
    • 2
    • 3
    • 4
    • 5

    【shift+enter】对程序进行输出。输出内容如下所示:

    1.	+---+------+---+-------------+
    2.	| ID| Gener|Age|Annual Income|
    3.	+---+------+---+-------------+
    4.	|  1|  Male| 34|         2000|
    5.	|  2|Female| 23|         3500|
    6.	|  3|Female| 26|         2500|
    7.	|  4|Female| 27|         4500|
    8.	|  5|  Male| 24|         5500|
    9.	+---+------+---+-------------+
    10.	only showing top 5 rows
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    2、使用createOrReplaceGlobalTempView(name)方法创建或替换视图。上面的方法当遇到已经创建了的临时表名的话会报错。而这个方法遇到已经存在的临时表会进行替换,没有则创建。在zeppelin中执行如下代码:

    1.	// 创建临时表,存在则覆盖
    2.	df.createOrReplaceGlobalTempView("XX")
    3.	     
    4.	// 查询
    5.	spark.sql("select * from global_temp.XX").show(5)
    
    • 1
    • 2
    • 3
    • 4
    • 5

    【shift+enter】对程序进行输出。输出内容如下所示:

    1.	+---+------+---+-------------+
    2.	| ID| Gener|Age|Annual Income|
    3.	+---+------+---+-------------+
    4.	|  1|  Male| 34|         2000|
    5.	|  2|Female| 23|         3500|
    6.	|  3|Female| 26|         2500|
    7.	|  4|Female| 27|         4500|
    8.	|  5|  Male| 24|         5500|
    9.	+---+------+---+-------------+
    10.	only showing top 5 rows
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    3、使用createOrReplaceTempView(name)方法为DataFrame创建本地的临时视图,其生命周期只限于当前的SparkSession,当调用了SparkSession的stop方法停止SparkSession后,其生命周期就到此为止了。
      在zeppelin中执行如下代码:

    1.	
    2.	// 创建本地的临时视图
    3.	df.createOrReplaceTempView("TT")
    4.	     
    5.	// 查询
    6.	spark.sql("select * from TT").show(5)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    【shift+enter】对程序进行输出。输出内容如下所示:

    1.	+---+------+---+-------------+
    2.	| ID| Gener|Age|Annual Income|
    3.	+---+------+---+-------------+
    4.	|  1|  Male| 34|         2000|
    5.	|  2|Female| 23|         3500|
    6.	|  3|Female| 26|         2500|
    7.	|  4|Female| 27|         4500|
    8.	|  5|  Male| 24|         5500|
    9.	+---+------+---+-------------+
    10.	only showing top 5 rows
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    4、使用createTempView(name)方法创建临时视图。这个方法在创建临时视图时若遇到已经创建过的视图的名字,会报错。因此需要指定另外的名字。在zeppelin中执行如下代码:

    1.	
    2.	// 创建视图
    3.	df.createTempView("T2")
    4.	     
    5.	// 查询
    6.	spark.sql("select * from T2").show(5)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    【shift+enter】对程序进行输出。输出内容如下所示:

    1.	+---+------+---+-------------+
    2.	| ID| Gener|Age|Annual Income|
    3.	+---+------+---+-------------+
    4.	|  1|  Male| 34|         2000|
    5.	|  2|Female| 23|         3500|
    6.	|  3|Female| 26|         2500|
    7.	|  4|Female| 27|         4500|
    8.	|  5|  Male| 24|         5500|
    9.	+---+------+---+-------------+
    10.	only showing top 5 rows
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    8.3 执行Spark SQL查询

    1、创建数据,从HDFS上进行数据读取,数据转换为DataFrame类型。在zeppelin中执行如下代码:

    1.	import org.apache.spark.sql.types._
    2.	import org.apache.spark.sql._
    3.	     
    4.	// 数据路径
    5.	var filePath = "/data/dataset/batch/wc.txt"
    6.	     
    7.	// 创建RDD
    8.	var rdd1 = sc.textFile(filePath)
    9.	     
    10.	// RDD进行flatMap操作后进行map操作
    11.	var rdd2 = rdd1.flatMap( x=>x.split(" ")).map( word=>Row(word,1))
    12.	     
    13.	// 指定schema
    14.	var schema = StructType(List(StructField("word",StringType,true),StructField("count",IntegerType,true)))
    15.	     
    16.	// 创建DataFrame
    17.	var wordDF = spark.createDataFrame(rdd2,schema)
    18.	// 查看df的数据结构信息
    19.	wordDF.printSchema
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    【shift+enter】对程序进行输出。输出内容如下所示:

    1.	root
    2.	 |-- word: string (nullable = true)
    3.	 |-- count: integer (nullable = true)
    
    • 1
    • 2
    • 3

    2、创建临时表,数据查询。在zeppelin中执行如下代码:

    1.	// 注册临时表
    2.	wordDF.createOrReplaceTempView("wc")
    3.	     
    4.	// 从临时表wc中执行sql查询
    5.	var resultDF = spark.sql("select * from wc")
    6.	// 展示DF
    7.	resultDF.show
    8.	     
    9.	// 从临时表中执行sql查询
    10.	var resultDF2 = spark.sql("select word,count(*) as total from wc group by word")
    11.	// 展示DF
    12.	resultDF2.show
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    【shift+enter】对程序进行输出。输出内容如下所示:

    1.	+-----+-----+
    2.	| word|count|
    3.	+-----+-----+
    4.	| good|    1|
    5.	| good|    1|
    6.	|study|    1|
    7.	|  day|    1|
    8.	|  day|    1|
    9.	|   up|    1|
    10.	+-----+-----+
    11.	     
    12.	+-----+-----+
    13.	| word|total|
    14.	+-----+-----+
    15.	|  day|    2|
    16.	|study|    1|
    17.	|   up|    1|
    18.	| good|    2|
    19.	+-----+-----+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    3、创建永久表,数据查询。在zeppelin中执行如下代码:

    1.	// 注册为永久表
    2.	wordDF.write.saveAsTable("wccc")
    3.	     
    4.	// 从永久表中执行查询
    5.	var resultDF3 = spark.sql("select * from wccc")
    6.	// 数据展示
    7.	resultDF3.show()
    8.	     
    9.	// 从永久表中执行wccc查询
    10.	var resultDF4 = spark.sql("select word,count(*) as total from wccc group by word")
    11.	// 数据展示
    12.	resultDF4.show()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    【shift+enter】对程序进行输出。输出内容如下所示:

    1.	+-----+-----+
    2.	| word|count|
    3.	+-----+-----+
    4.	| good|    1|
    5.	| good|    1|
    6.	|study|    1|
    7.	|  day|    1|
    8.	|  day|    1|
    9.	|   up|    1|
    10.	+-----+-----+
    11.	     
    12.	+-----+-----+
    13.	| word|total|
    14.	+-----+-----+
    15.	|  day|    2|
    16.	|study|    1|
    17.	|   up|    1|
    18.	| good|    2|
    19.	+-----+-----+
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    9. 实验结果及分析:

    实验结果运行准确,无误

    10. 实验结论:

    经过本节实验的学习,通过DataFrame的操作-使用SQL,进一步巩固了我们的Spark基础。

    11. 总结及心得体会:

    SparkSQL对SQL语句的处理和关系型数据库采用了类似的方法,SparkSQL会先将SQL语句进行解析Parse形成一个Tree,然后使用Rule对Tree进行绑定、优化等处理过程,通过模式匹配对不同类型的节点采用不同的操作。而SparkSQL的查询优化器是Catalyst,它负责处理查询语句的解析、绑定、优化和生成物理计划等过程,Catalyst是SparkSQL最核心的部分,其性能优劣将决定整体的性能。

    12、 实验知识测试

    1、创建本地临时视图表成功的方法是( C ){单选}
      A、createGlobalTempView(name)
      B、createOrReplaceGlobalTempView(name)
      C、createOrReplaceTempView(name)
      D、createTempView(name)

    13、实验拓展

    给定本地一个文本数据,并上传至HDFS,读取后创建为DataFrame,并创建临时表对数据进行查询。

  • 相关阅读:
    进程间通信(IPC):共享内存
    【英语语法】 either or / neither nor
    Jackson 解析 JSON 详细教程
    Linux超详细笔记
    数据可视化:数据可视化的意义
    【Flutter】包管理(5)Flutter 中 Hive 的详细使用说明
    RabbitMQ 简介
    uview使用u-action-sheet添加滚动条
    基于滑模预测控制的海底采矿车轨迹跟踪算法
    【Docker】基于Dockerfile构建镜像介绍
  • 原文地址:https://blog.csdn.net/qq_44807756/article/details/125570979