大数据实验教学系统
DataFrame的操作-使用SQL
在SparkSQL对SQL语句的处理和关系型数据库采用了类似的方法,SparkSQL会先将SQL语句进行解析Parse形成一个Tree,然后使用Rule对Tree进行绑定、优化等处理过程,通过模式匹配对不同类型的节点采用不同的操作。而SparkSQL的查询优化器是Catalyst,它负责处理查询语句的解析、绑定、优化和生成物理计划等过程,Catalyst是SparkSQL最核心的部分,其性能优劣将决定整体的性能。
掌握Spark SQL临时视图的创建。
掌握Spark SQL查询。
使用标准SQL对DataFrame进行操作。具体包含如下内容:
1、临时表创建
- createGlobalTempView(name)
- createOrReplaceGlobalTempView(name)
- createOrReplaceTempView(name)
- createTempView(name)
- registerTempTable(name)
- dropTempView(name)
2、Spark SQL查询
- 加载数据集及处理
- 创建临时表并查询
- 创建永久表并查询
硬件:x86_64 ubuntu 16.04服务器
软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3,zeppelin-0.8.1
1、在终端窗口下,输入以下命令,分别启动HDFS集群、Spark集群和Zeppelin服务器:
1. $ start-dfs.sh
2. $ cd /opt/spark
3. $ ./sbin/start-all.sh
4. $ zeppelin-daemon.sh start
2. 将本实验用到的数据集上传到HDFS上。在终端窗口中,执行以下命令:
1. $ hdfs dfs -mkdir -p /data/dataset/batch
2. $ hdfs dfs -put /data/dataset/batch/customers.csv /data/dataset/batch/
3. $ hdfs dfs -put /data/dataset/batch/wc.txt /data/dataset/batch/
3、启动浏览器,打开zeppelin notebook首页,点击【Create new note】链接,创建一个新的笔记本,名字为【rdd_demo】,解释器默认使用【spark】,如下图所示:

1、使用createGlobalTempView(name)方法为DataFrame创建一个全局的临时表,其生命周期和启动的app的周期一致,既启动的spark应用存在则这个临时的表就一直能访问,直道sparkcontext的stop方法的调用退出应用为止。创建的临时表保存在global_temp这个库中。
在zeppelin中执行如下代码:
1. var df = spark.read.option("header","true").csv("/data/dataset/batch/customers.csv")
2. df.createGlobalTempView("xx")
3.
4. // 查询
5. spark.sql("select * from global_temp.xx").show(5)
【shift+enter】对程序进行输出。输出内容如下所示:
1. +---+------+---+-------------+
2. | ID| Gener|Age|Annual Income|
3. +---+------+---+-------------+
4. | 1| Male| 34| 2000|
5. | 2|Female| 23| 3500|
6. | 3|Female| 26| 2500|
7. | 4|Female| 27| 4500|
8. | 5| Male| 24| 5500|
9. +---+------+---+-------------+
10. only showing top 5 rows
2、使用createOrReplaceGlobalTempView(name)方法创建或替换视图。上面的方法当遇到已经创建了的临时表名的话会报错。而这个方法遇到已经存在的临时表会进行替换,没有则创建。在zeppelin中执行如下代码:
1. // 创建临时表,存在则覆盖
2. df.createOrReplaceGlobalTempView("XX")
3.
4. // 查询
5. spark.sql("select * from global_temp.XX").show(5)
【shift+enter】对程序进行输出。输出内容如下所示:
1. +---+------+---+-------------+
2. | ID| Gener|Age|Annual Income|
3. +---+------+---+-------------+
4. | 1| Male| 34| 2000|
5. | 2|Female| 23| 3500|
6. | 3|Female| 26| 2500|
7. | 4|Female| 27| 4500|
8. | 5| Male| 24| 5500|
9. +---+------+---+-------------+
10. only showing top 5 rows
3、使用createOrReplaceTempView(name)方法为DataFrame创建本地的临时视图,其生命周期只限于当前的SparkSession,当调用了SparkSession的stop方法停止SparkSession后,其生命周期就到此为止了。
在zeppelin中执行如下代码:
1.
2. // 创建本地的临时视图
3. df.createOrReplaceTempView("TT")
4.
5. // 查询
6. spark.sql("select * from TT").show(5)
【shift+enter】对程序进行输出。输出内容如下所示:
1. +---+------+---+-------------+
2. | ID| Gener|Age|Annual Income|
3. +---+------+---+-------------+
4. | 1| Male| 34| 2000|
5. | 2|Female| 23| 3500|
6. | 3|Female| 26| 2500|
7. | 4|Female| 27| 4500|
8. | 5| Male| 24| 5500|
9. +---+------+---+-------------+
10. only showing top 5 rows
4、使用createTempView(name)方法创建临时视图。这个方法在创建临时视图时若遇到已经创建过的视图的名字,会报错。因此需要指定另外的名字。在zeppelin中执行如下代码:
1.
2. // 创建视图
3. df.createTempView("T2")
4.
5. // 查询
6. spark.sql("select * from T2").show(5)
【shift+enter】对程序进行输出。输出内容如下所示:
1. +---+------+---+-------------+
2. | ID| Gener|Age|Annual Income|
3. +---+------+---+-------------+
4. | 1| Male| 34| 2000|
5. | 2|Female| 23| 3500|
6. | 3|Female| 26| 2500|
7. | 4|Female| 27| 4500|
8. | 5| Male| 24| 5500|
9. +---+------+---+-------------+
10. only showing top 5 rows
1、创建数据,从HDFS上进行数据读取,数据转换为DataFrame类型。在zeppelin中执行如下代码:
1. import org.apache.spark.sql.types._
2. import org.apache.spark.sql._
3.
4. // 数据路径
5. var filePath = "/data/dataset/batch/wc.txt"
6.
7. // 创建RDD
8. var rdd1 = sc.textFile(filePath)
9.
10. // RDD进行flatMap操作后进行map操作
11. var rdd2 = rdd1.flatMap( x=>x.split(" ")).map( word=>Row(word,1))
12.
13. // 指定schema
14. var schema = StructType(List(StructField("word",StringType,true),StructField("count",IntegerType,true)))
15.
16. // 创建DataFrame
17. var wordDF = spark.createDataFrame(rdd2,schema)
18. // 查看df的数据结构信息
19. wordDF.printSchema
【shift+enter】对程序进行输出。输出内容如下所示:
1. root
2. |-- word: string (nullable = true)
3. |-- count: integer (nullable = true)
2、创建临时表,数据查询。在zeppelin中执行如下代码:
1. // 注册临时表
2. wordDF.createOrReplaceTempView("wc")
3.
4. // 从临时表wc中执行sql查询
5. var resultDF = spark.sql("select * from wc")
6. // 展示DF
7. resultDF.show
8.
9. // 从临时表中执行sql查询
10. var resultDF2 = spark.sql("select word,count(*) as total from wc group by word")
11. // 展示DF
12. resultDF2.show
【shift+enter】对程序进行输出。输出内容如下所示:
1. +-----+-----+
2. | word|count|
3. +-----+-----+
4. | good| 1|
5. | good| 1|
6. |study| 1|
7. | day| 1|
8. | day| 1|
9. | up| 1|
10. +-----+-----+
11.
12. +-----+-----+
13. | word|total|
14. +-----+-----+
15. | day| 2|
16. |study| 1|
17. | up| 1|
18. | good| 2|
19. +-----+-----+
3、创建永久表,数据查询。在zeppelin中执行如下代码:
1. // 注册为永久表
2. wordDF.write.saveAsTable("wccc")
3.
4. // 从永久表中执行查询
5. var resultDF3 = spark.sql("select * from wccc")
6. // 数据展示
7. resultDF3.show()
8.
9. // 从永久表中执行wccc查询
10. var resultDF4 = spark.sql("select word,count(*) as total from wccc group by word")
11. // 数据展示
12. resultDF4.show()
【shift+enter】对程序进行输出。输出内容如下所示:
1. +-----+-----+
2. | word|count|
3. +-----+-----+
4. | good| 1|
5. | good| 1|
6. |study| 1|
7. | day| 1|
8. | day| 1|
9. | up| 1|
10. +-----+-----+
11.
12. +-----+-----+
13. | word|total|
14. +-----+-----+
15. | day| 2|
16. |study| 1|
17. | up| 1|
18. | good| 2|
19. +-----+-----+
实验结果运行准确,无误
经过本节实验的学习,通过DataFrame的操作-使用SQL,进一步巩固了我们的Spark基础。
SparkSQL对SQL语句的处理和关系型数据库采用了类似的方法,SparkSQL会先将SQL语句进行解析Parse形成一个Tree,然后使用Rule对Tree进行绑定、优化等处理过程,通过模式匹配对不同类型的节点采用不同的操作。而SparkSQL的查询优化器是Catalyst,它负责处理查询语句的解析、绑定、优化和生成物理计划等过程,Catalyst是SparkSQL最核心的部分,其性能优劣将决定整体的性能。
1、创建本地临时视图表成功的方法是( C ){单选}
A、createGlobalTempView(name)
B、createOrReplaceGlobalTempView(name)
C、createOrReplaceTempView(name)
D、createTempView(name)
给定本地一个文本数据,并上传至HDFS,读取后创建为DataFrame,并创建临时表对数据进行查询。