1.1 什么是Spark回顾:Hadoop主要解决,海量数据的存储和海量数据的分析计算。 Spark是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。 | ||||||||||||||||
1.2 Hadoop与Spark历史
| ||||||||||||||||
1.3 Hadoop与Spark框架对比
| ||||||||||||||||
1.4 Spark内置模块
Spark Core:实现了Spark的基本功能,包含任务调度、内存管理、错误恢复、与存储系统交互等模块。Spark Core中还包含了对弹性分布式数据集(Resilient Distributed DataSet,简称RDD)的API定义。 Spark SQL:是Spark用来操作结构化数据的程序包。通过Spark SQL,我们可以使用 SQL或者Apache Hive版本的HQL来查询数据。Spark SQL支持多种数据源,比如Hive表、Parquet以及JSON等。 Spark Streaming:是Spark提供的对实时数据进行流式计算的组件。提供了用来操作数据流的API,并且与Spark Core中的 RDD API高度对应。 Spark MLlib:提供常见的机器学习功能的程序库。包括分类、回归、聚类、协同过滤等,还提供了模型评估、数据 导入等额外的支持功能。 Spark GraphX:主要用于图形并行计算和图挖掘系统的组件。 集群管理器:Spark设计为可以高效地在一个计算节点到数千个计算节点之间伸缩计算。为了实现这样的要求,同时获得最大灵活性,Spark支持在各种集群管理器(Cluster Manager)上运行,包括Hadoop YARN、Apache Mesos,以及Spark自带的一个简易调度器,叫作独立调度器。 Spark得到了众多大数据公司的支持,这些公司包括Hortonworks、IBM、Intel、Cloudera、MapR、Pivotal、百度、阿里、腾讯、京东、携程、优酷土豆。当前百度的Spark已应用于大搜索、直达号、百度大数据等业务;阿里利用GraphX构建了大规模的图计算和图挖掘系统,实现了很多生产系统的推荐算法;腾讯Spark集群达到8000台的规模,是当前已知的世界上最大的Spark集群。 | ||||||||||||||||
1.5 Spark特点
| ||||||||||||||||
1.6 Master和Worker集群资源管理
Master和Worker是Spark的守护进程、集群资源管理者,即Spark在特定模式(Standalone)下正常运行必须要有的后台常驻进程。 | ||||||||||||||||
1.7 Driver和Executor任务的管理者
Driver和Executor是临时程序,当有具体任务提交到Spark集群才会开启的程序。 | ||||||||||||||||
1.8 几种模式对比
1.9 端口号总结1)Spark查看当前Spark-shell运行任务情况端口号:4040 2)Spark Master内部通信服务端口号:7077 (类比于yarn的8032(RM和NM的内部通信)端口) 3)Spark Standalone模式Master Web端口号:8080(类比于Hadoop YARN任务运行情况查看端口号:8088) 4)Spark历史服务器端口号:18080 (类比于Hadoop历史服务器端口号:19888) |
| 广播变量:分布式共享只读变量。 广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个Spark Task操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,广播变量用起来会很顺手。在多个Task并行操作中使用同一个变量,但是Spark会为每个Task任务分别发送。 1)使用广播变量步骤: (1)调用SparkContext.broadcast(广播变量)创建出一个广播对象,任何可序列化的类型都可以这么实现。 (2)通过广播变量.value,访问该对象的值。 (3)广播变量只会被发到各个节点一次,作为只读值处理(修改这个值不会影响到别的节点)。
|
| 累加器:分布式共享只写变量。(Executor和Executor之间不能读数据) 累加器用来把Executor端变量信息聚合到Driver端。在Driver中定义的一个变量,在Executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行合并计算。 |
| 性能优化参考Spark性能优化指南——基础篇 - 美团技术团队 1. 数据压缩:使用压缩格式(如Snappy或Gzip)来减少数据在磁盘和网络传输中的大小,从而减少IO开销。 2. 数据分区:合理地对数据进行分区,以便在执行操作时能够充分利用并行性。可以使用repartition或coalesce方法来重新分区数据。 3. 广播变量:对于较小的数据集,可以将其广播到所有的工作节点上,以减少数据传输开销。 4. 内存管理:通过调整Spark的内存分配参数,如spark.executor.memory和spark.driver.memory,来优化内存使用。 5. 数据持久化:对于需要多次使用的数据集,可以使用persist或cache方法将其缓存到内存中,以避免重复计算。 6. 使用合适的数据结构和算法:根据具体的业务需求,选择合适的数据结构和算法,以提高计算效率。 7. 并行度设置:根据集群的规模和资源情况,适当调整并行度参数,如spark.default.parallelism和spark.sql.shuffle.partitions。 8. 数据倾斜处理:当某些键的数据量远远大于其他键时,可能会导致任务不均衡。可以使用一些技术,如repartition、join优化或使用自定义分区器来处理数据倾斜问题。 9. 使用DataFrame和Dataset:相比于RDD,DataFrame和Dataset提供了更高层次的抽象和优化,可以更好地利用Spark的优化功能。 10. 使用专门的优化工具:Spark提供了一些专门的优化工具,如Spark UI和Spark调优指南,可以帮助识别和解决性能瓶颈。 |
1.1 什么是Spark SQL
|
1.2 为什么要有Spark SQL
|
1.3 Spark SQL原理
1.3.1 什么是DataFrame1)DataFrame是一种类似RDD的分布式数据集,类似于传统数据库中的二维表格。 2)DataFrame与RDD的主要区别在于,DataFrame带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。
左侧的RDD[Person]虽然以Person为类型参数,但Spark框架本身不了解Person类的内部结构。而右侧的DataFrame却提供了详细的结构信息,使得Spark SQL可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。 3)Spark SQL性能上比RDD要高。因为Spark SQL了解数据内部结构,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在Stage层面进行简单、通用的流水线优化。 1.3.2 什么是DataSetDataSet是分布式数据集合。
1.3.3 RDD、DataFrame和DataSet之间关系1)发展历史 RDD(Spark1.0)=》Dataframe(Spark1.3)=》Dataset(Spark1.6) 如果同样的数据都给到这三个数据结构,他们分别计算之后,都会给出相同的结果。不同的是他们的执行效率和执行方式。在后期的Spark版本中,DataSet有可能会逐步取代RDD和DataFrame成为唯一的API接口。 2)三者的共性 (1)RDD、DataFrame、DataSet全都是Spark平台下的分布式弹性数据集,为处理超大型数据提供便利 (2)三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action行动算子如foreach时,三者才会开始遍历运算 (3)三者有许多共同的函数,如filter,排序等 (4)三者都会根据Spark的内存情况自动缓存运算 (5)三者都有分区的概念 |
1.4 Spark SQL的特点1)易整合 无缝的整合了SQL查询和Spark编程。
2)统一的数据访问方式 使用相同的方式连接不同的数据源。
3)兼容Hive 在已有的仓库上直接运行SQL或者HQL。
4)标准的数据连接 通过JDBC或者ODBC来连接
|
10.Spark数据分析及处理
| 用例1:数据清洗
日志拆分字段: event_time url method status sip user_uip action_prepend action_client (1)创建DataFrame的两种方式
(2)数据清洗
(3)将数据写入MySQL中 1)创建JDBC工具类,配置连接及定义写入数据方式(Append或OverWrite)
2)写入代码
完整代码 (1)JdbcUtils (2)ETLDemo |
| Json使用参考https://www.cnblogs.com/tomato0906/articles/7291178.html |
| 练习1—解析Json字符串及转DataFrame转JSON
|
| 练习2---- 解析如下json {"name":"njzb","roomInfo":{"area":2000,"employee":98,"roomNum":30},"students":[{"classes":"kb01","stuId":"1","stuName":"zhangsan","teacher":"xingxing"},{"classes":"kb02","stuId":"2","stuName":"lisi","teacher":"gree"},{"classes":"kb03","stuId":"3","stuName":"wangwu","teacher":"gree"},{"classes":"kb05","stuId":"4","stuName":"zhaoliu","teacher":"xingxing"}],"teachers":[{"name":"gree","skill":"bigdata&java","yearNum":7},{"name":"xingxing","skill":"bigdata&java","yearNum":3}]}
|
第1章 SparkStreaming概述1.1 Spark Streaming是什么
1.2 Spark Streaming架构原理1.2.1 什么是DStream
| ||||||||||||||||||||||||||||
第2章 DStream入门2.1 WordCount案例实操需求:使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数
监听端口信息及wordcount
Wordcount
2.2 WordCount解析DStream是Spark Streaming的基础抽象,代表持续性的数据流和经过各种Spark算子操作后的结果数据流。 在内部实现上,每一批次的数据封装成一个RDD,一系列连续的RDD组成了DStream。对这些RDD的转换是由Spark引擎来计算。 说明:DStream中批次与批次之间计算相互独立。如果批次设置时间小于计算时间会出现计算任务叠加情况,需要多分配资源。通常情况,批次设置时间要大于计算时间。
| ||||||||||||||||||||||||||||
第3章 DStream创建3.1 RDD队列3.1.1 用法及说明测试方法: (1)使用ssc.queueStream(queueOfRDDs)来创建DStream (2)将每一个推送到这个队列中的RDD,都会作为DStream的一个批次处理。 3.3 Kafka数据源(面试、开发重点)
| ||||||||||||||||||||||||||||
第4章 DStream转换DStream上的操作与RDD的类似,分为转换和输出两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种Window相关的原语。 4.1 无状态转化操作无状态转化操作:就是把RDD转化操作应用到DStream每个批次上,每个批次相互独立,自己算自己的。 4.1.1 常规无状态转化操作DStream的部分无状态转化操作列在了下表中,都是DStream自己的API。 注意,针对键值对的DStream转化操作,要添加import StreamingContext._才能在Scala中使用,比如reduceByKey()。
需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个DStream在内部是由许多RDD批次组成,且无状态转化操作是分别应用到每个RDD(一个批次的数据)上的。 4.1.2 Transform需求:通过Transform可以将DStream每一批次的数据直接转换为RDD的算子操作。
4.2 有状态转化操作 有状态转化操作:计算当前批次RDD时,需要用到历史RDD的数据。 4.2.1 UpdateStateByKeyupdateStateByKey()用于键值对形式的DStream,可以记录历史批次状态。例如可以实现累加WordCount。 updateStateByKey()参数中需要传递一个函数,在函数内部可以根据需求对新数据和历史状态进行整合处理,返回一个新的DStream。 注意:使用updateStateByKey需要对检查点目录进行配置,会使用检查点来保存状态。
4)原理说明
4.2.2 WindowOperationsWindow Operations可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Streaming的允许状态。所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长。
注意:这两者都必须为采集批次大小的整数倍。 如下图所示WordCount案例:窗口大小为批次的2倍,滑动步等于批次大小。
4.2.3 Window1)基本语法:window(windowLength, slideInterval): 基于对源DStream窗口的批次进行计算返回一个新的DStream。 2)需求:统计WordCount:3秒一个批次,窗口12秒,滑步6秒。 4.2.4 reduceByKeyAndWindow1)基本语法: reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]):当在一个(K,V)对的DStream上调用此函数,会返回一个新(K,V)对的DStream,此处通过对滑动窗口中批次数据使用reduce函数来整合每个key的value值。 4.2.5 reduceByKeyAndWindow(反向Reduce)1)基本语法: reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): 这个函数是上述函数的变化版本,每个窗口的reduce值都是通过用前一个窗的reduce值来递增计算。通过reduce进入到滑动窗口数据并“反向reduce”离开窗口的旧数据来实现这个操作。一个例子是随着窗口滑动对keys的“加”“减”计数。通过前边介绍可以想到,这个函数只适用于“可逆的reduce函数”,也就是这些reduce函数有相应的“反reduce”函数(以参数invFunc形式传入)。如前述函数,reduce任务的数量通过可选参数来配置。 4.2.6 Window的其他操作(1)countByWindow(windowLength, slideInterval): 返回一个滑动窗口计数流中的元素个数; (2)reduceByWindow(func, windowLength, slideInterval): 通过使用自定义函数整合滑动区间流元素来创建一个新的离散化数据流; | ||||||||||||||||||||||||||||
第5章 DStream输出DStream通常将数据输出到,外部数据库或屏幕上。 DStream与RDD中的惰性求值类似,如果一个DStream及其派生出的DStream都没有被执行输出操作,那么这些DStream就都不会被求值。如果StreamingContext中没有设定输出操作,整个Context就都不会启动。 1)输出操作API如下:
注意:以上操作都是每一批次写出一次,会产生大量小文件,在生产环境,很少使用。
在企业开发中通常采用foreachRDD(),它用来对DStream中的RDD进行任意计算。这和transform()有些类似,都可以让我们访问任意RDD。在foreachRDD()中,可以重用我们在Spark中实现的所有行动操作(action算子)。比如,常见的用例之一是把数据写到如MySQL的外部数据库中。 3)注意 (1)连接不能写在Driver层面(序列化) (2)如果写在foreach则每个RDD中的每一条数据都创建,得不偿失; (3)增加foreachPartition,在分区创建(获取)。 | ||||||||||||||||||||||||||||
第6章 优雅关闭流式任务需要7*24小时执行,但是有时涉及到升级代码需要主动停止程序,但是分布式程序,没办法做到一个个进程去杀死,所以配置优雅的关闭就显得至关重要了。 关闭方式:使用外部文件系统来控制内部程序关闭。 |