• DataFrame持久存储



    申明: 未经许可,禁止以任何形式转载,若要引用,请标注链接地址
    全文共计7165字,阅读大概需要3分钟

    1. 实验室名称:

    大数据实验教学系统

    2. 实验项目名称:

    DataFrame持久存储

    3. 实验学时:

    4. 实验原理:

    DataFrame数据经过计算以后,可以持久到外部存储中,如关系型数据库和HDFS中。Spark对此提供了支持。

    5. 实验目的:

    掌握DataFrame存储操作。

    6. 实验内容:

    将DataFrame持久存储。具体包含如下内容:
      - 写入MySQL
      - 写入HDFS

    7. 实验器材(设备、虚拟机名称):

    硬件:x86_64 ubuntu 16.04服务器
      软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3,zeppelin-0.8.1

    8. 实验步骤:

    8.1 环境准备

    1、右击Ubuntu操作系统桌面,从弹出菜单中选择【Open in Terminal】命令打开终端。
      在终端窗口下,输入以下命令,分别启动HDFS集群、Spark集群和Zeppelin服务器:

    1.	$ start-dfs.sh
    2.	$ cd /opt/spark
    3.	$ ./sbin/start-all.sh
    4.	$ zeppelin-daemon.sh start
    
    • 1
    • 2
    • 3
    • 4

    2、将本地数据上传至HDFS上。在终端窗口中,分别执行以下命令上传数据:

    1.	$ hdfs dfs -mkdir -p /data/dataset/batch
    2.	$ hdfs dfs -put /data/dataset/batch/Online-Retail.txt /data/dataset/batch/
    
    • 1
    • 2

    3、因为后面的实验中需要访问MySQL数据库,所以先要将MySQL的jdbc驱动程序拷贝到Spark的jars目录下。在终端窗口,执行如下的命令:

    1.	$ cp /data/software/mysql-connector-java-5.1.45-bin.jar /opt/spark/jars/
    
    • 1

    4、启动浏览器,打开zeppelin notebook首页,点击【Create new note】链接,创建一个新的笔记本,名字为【rdd_demo】,解释器默认使用【spark】,如下图所示:
    在这里插入图片描述

    8.2 数据存储

    1、读取数据,生成RDD,创建DataFrame。在zeppelin中执行如下代码:

    1.	import org.apache.spark.sql.types._
    2.	import org.apache.spark.sql._
    3.	import org.apache.spark.sql.functions._
    4.	     
    5.	// 数据路径
    6.	var filePath = "/data/dataset/batch/Online-Retail.txt"
    7.	     
    8.	// 加载RDD
    9.	var inFileRDD= sc.textFile(filePath)
    10.	// 以制表符进行分割
    11.	var allRowsRDD=inFileRDD.map(x=> x.split("\t"))
    12.	     
    13.	// 获取RDD的第一条数据头标签
    14.	var header = allRowsRDD.first()
    15.	     
    16.	// 去除标题行
    17.	var data = allRowsRDD.filter(x => x(0) != header(0))
    18.	     
    19.	// 创建Schema
    20.	var fields= List(StructField("invoiceNo", StringType, true),
    21.	                 StructField("stockCode", StringType, true),
    22.	                 StructField("description", StringType, true),
    23.	                 StructField("quantity", IntegerType, true),
    24.	                 StructField("invoiceDate", StringType, true),
    25.	                 StructField("unitPrice", DoubleType, true),
    26.	                 StructField("customerID", StringType, true),
    27.	                 StructField("country", StringType, true)
    28.	      )
    29.	val schema = StructType(fields)
    30.	     
    31.	
    32.	// 将RDD中的每行数据转换为Row对象
    33.	var rowRDD = data.map( x => Row(x(0),x(1),x(2),x(3).toInt,x(4),x(5).toDouble,x(6),x(7)))
    34.	     
    35.	// 创建DataFrame
    36.	var r1DF = spark.createDataFrame(rowRDD,schema)
    37.	// 显示DataFrame数据
    38.	 r1DF.show(5)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    同时按下【shift+enter】对程序进行输出。输出内容如下所示:

    1.	+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
    2.	|invoiceNo|stockCode|         description|quantity|   invoiceDate|unitPrice|customerID|       country|
    3.	+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
    4.	|   536365|   85123A|WHITE HANGING HEA...|       6|2010/12/1 8:26|     2.55|     17850|United Kingdom|
    5.	|   536365|    71053| WHITE METAL LANTERN|       6|2010/12/1 8:26|     3.39|     17850|United Kingdom|
    6.	|   536365|   84406B|CREAM CUPID HEART...|       8|2010/12/1 8:26|     2.75|     17850|United Kingdom|
    7.	|   536365|   84029G|KNITTED UNION FLA...|       6|2010/12/1 8:26|     3.39|     17850|United Kingdom|
    8.	|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010/12/1 8:26|     3.39|     17850|United Kingdom|
    9.	+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
    10.	only showing top 5 rows
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    2、数据类型转换,创建本地视图,调用sql语句进行查询。在zeppelin中执行如下代码:

    1.	// 将invoiceDate列强制转换为时间类型
    2.	var ts = unix_timestamp($"invoiceDate","yyyy/MM/dd HH:mm").cast("timestamp")
    3.	     
    4.	// 为DataFrame添加一列
    5.	var r2DF = r1DF.withColumn("ts",ts)
    6.	// 显示添加后的数据
    7.	r2DF.show(5)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    同时按下【shift+enter】对程序进行输出。输出结果如下所示:

    1.	+---------+---------+--------------------+--------+--------------+---------+----------+--------------+-------------------+
    2.	|invoiceNo|stockCode|         description|quantity|   invoiceDate|unitPrice|customerID|       country|                 ts|
    3.	+---------+---------+--------------------+--------+--------------+---------+----------+--------------+-------------------+
    4.	|   536365|   85123A|WHITE HANGING HEA...|       6|2010/12/1 8:26|     2.55|     17850|United Kingdom|2010-12-01 08:26:00|
    5.	|   536365|    71053| WHITE METAL LANTERN|       6|2010/12/1 8:26|     3.39|     17850|United Kingdom|2010-12-01 08:26:00|
    6.	|   536365|   84406B|CREAM CUPID HEART...|       8|2010/12/1 8:26|     2.75|     17850|United Kingdom|2010-12-01 08:26:00|
    7.	|   536365|   84029G|KNITTED UNION FLA...|       6|2010/12/1 8:26|     3.39|     17850|United Kingdom|2010-12-01 08:26:00|
    8.	|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010/12/1 8:26|     3.39|     17850|United Kingdom|2010-12-01 08:26:00|
    9.	+---------+---------+--------------------+--------+--------------+---------+----------+--------------+-------------------+
    10.	only showing top 5 rows
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    3、对数据进行查询,选择满足条件的数据。在zeppelin中执行如下代码:

    1.	import java.util.Properties
    2.	     
    3.	// 创建本地临时视图
    4.	r2DF.createOrReplaceTempView("retailTable")
    5.	// 查找时间小于2011-12-01的数据
    6.	var r3DF = spark.sql("select * from retailTable where ts<\"2011-12-01\"")
    7.	     
    8.	// 查找时间大于等于2011-12-01的数据
    9.	var r4DF = spark.sql("select * from retailTable where ts>=\"2011-12-01\"")
    10.	     
    11.	// 选取数据
    12.	var selectData =  r4DF.select("invoiceNo","stockCode","description","quantity","unitPrice","customerID","country","ts")
    13.	     
    14.	// 修改列的名字
    15.	var writeMySQL = selectData.withColumnRenamed("ts","invoiceDate")
    16.	// 显示修改后的DataFrame
    17.	writeMySQL.show(5)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    同时按下【shift+enter】对程序进行输出。输出结果如下所示:

    1.	+---------+---------+--------------------+--------+---------+----------+--------------+-------------------+
    2.	|invoiceNo|stockCode|         description|quantity|unitPrice|customerID|       country|        invoiceDate|
    3.	+---------+---------+--------------------+--------+---------+----------+--------------+-------------------+
    4.	|  C579889|    23245|SET OF 3 REGENCY ...|      -8|     4.15|     13853|United Kingdom|2011-12-01 08:12:00|
    5.	|  C579890|    84947|ANTIQUE SILVER TE...|      -1|     1.25|     15197|United Kingdom|2011-12-01 08:14:00|
    6.	|  C579890|    23374|RED SPOT PAPER GI...|      -1|     0.82|     15197|United Kingdom|2011-12-01 08:14:00|
    7.	|  C579890|    84945|MULTI COLOUR SILV...|      -2|     0.85|     15197|United Kingdom|2011-12-01 08:14:00|
    8.	|  C579891|    23485|BOTANICAL GARDENS...|      -1|     25.0|     13644|United Kingdom|2011-12-01 08:18:00|
    9.	+---------+---------+--------------------+--------+---------+----------+--------------+-------------------+
    10.	only showing top 5 rows
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    4、将查询到的数据存储到MySQL中。
      (1)首先启动MySQL服务器。在终端窗口中,执行以下命令:

    1.	$ service mysql start
    
    • 1

    (2)登录MySQL服务器。在终端窗口中,执行以下命令:

    1.	$ mysql -u root -p
    
    • 1

    然后根据提示,输入登录密码:root。
      (3)执行以下SQL语句,创建测试表:

    1.	mysql> create database retailDB;
    2.	mysql> exit;
    
    • 1
    • 2

    (4)在zeppelin中执行如下代码:

    1.	// 将DataFrame数据存储到数据库中
    2.	val prop = new Properties()
    3.	prop.setProperty("user", "root")
    4.	prop.setProperty("password", "root")
    5.	     
    6.	writeMySQL.write.mode("append").jdbc("jdbc:mysql://localhost:3306/retailDB?characterEncoding=UTF-8", "transactions", prop)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    验证保存成功。进入MySQL,通过【select count(*) from transactions;】来查看写入MySQL的数据条数。

    5、将DataFrame存储到HDFS中。选择满足条件的数据,将数据写入到HDFS中。在zeppelin中执行如下代码:

    1.	var selectData = r3DF.select("invoiceNo","stockCode","description","quantity","unitPrice","customerID","country","ts")
    2.	var writeHDFS = selectData.withColumnRenamed("ts","invoiceDate")
    3.	writeHDFS.select("*").write.format("json").save("/Users/r3DF")
    
    • 1
    • 2
    • 3

    同时按下【shift+enter】对程序进行输出。
      验证保存到HDFS中成功。在终端窗口下,执行以下命令,查看写入HDFS的json数据:

    1.	# hdfs dfs -ls /Users/r3DF
    
    • 1

    可以看到已经写入成功,如下图所示:

    在这里插入图片描述

    9. 实验结果及分析:

    实验结果运行准确,无误

    10. 实验结论:

    经过本节实验的学习,通过学习DataFrame持久存储,进一步巩固了我们的Spark基础。

    11. 总结及心得体会:

    SparkSQL对SQL语句的处理和关系型数据库采用了类似的方法,SparkSQL会先将SQL语句进行解析Parse形成一个Tree,然后使用Rule对Tree进行绑定、优化等处理过程,通过模式匹配对不同类型的节点采用不同的操作。而SparkSQL的查询优化器是Catalyst,它负责处理查询语句的解析、绑定、优化和生成物理计划等过程,Catalyst是SparkSQL最核心的部分,其性能优劣将决定整体的性能。

    12、 实验测试

    1、数据写入MySQL中mode=’append’的意思是什么( A ){单选}
      A、追加
      B、覆盖
      C、修改
      D、删除

    13、实验拓展

    1、给定给一个文本数据,将数据转换为DataFrame类型,并将数据写入到MySQL 中。

    在这里插入图片描述

  • 相关阅读:
    【Linux】第十二站:进程
    Yii2安装遇到Loading composer repositories with package information
    Openssl生成证书-nginx使用ssl
    Spring内置事件监听器
    linux安装firefox
    Python学习之——测试环境路径的工程示例
    Java实现添加文字水印、图片水印功能实战
    怎么学习前端开发?求推荐学习路线?
    dataframe保存excel格式比csv格式小很多很多
    解决小程序-wx.canvasGetImageData()-RGB取色盘苹果手机获取颜色慢问题
  • 原文地址:https://blog.csdn.net/qq_44807756/article/details/125571064