



| Transformation | Meaning |
|---|---|
| map(func) | Applies func to each element of the DStream and returns a new DStream. |
| flatMap(func) | Similar to map, but each input item can be mapped to zero or more output items. |
| filter(func) | Returns a new DStream containing only the elements for which func returns true. |
| repartition(numPartitions) | Increases or decreases the number of partitions in the DStream, changing its level of parallelism. |
| union(otherStream) | Returns a new DStream containing the union of the elements of the source DStream and otherDStream. |
| count() | Counts the elements in each RDD of the DStream and returns a new DStream of single-element RDDs. |
| reduce(func) | Aggregates the elements of each RDD of the source DStream using func and returns a new DStream of single-element RDDs. |
| countByValue() | For a DStream of elements of type K, returns a new DStream of (K, Long) pairs, where the value of each key is the number of times it occurs in each RDD of the source DStream. |
| reduceByKey(func, [numTasks]) | For a DStream of (K, V) pairs, aggregates the values of each key using func and returns a new DStream of (K, V) pairs. |
| join(otherStream, [numTasks]) | For two DStreams of (K, V) and (K, W) pairs, returns a new DStream of (K, (V, W)) pairs. |
| cogroup(otherStream, [numTasks]) | For two DStreams of (K, V) and (K, W) pairs, returns a new DStream of (K, Seq[V], Seq[W]) tuples. |
| transform(func) | Applies an arbitrary RDD-to-RDD function to each RDD of the DStream and returns a new DStream. |
| updateStateByKey(func) | Updates the state of each key based on its previous state and its new values, returning a new "stateful" DStream. |
| reduceByKeyAndWindow | Windowed operation: performs the aggregation over a sliding window of the given length. |
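
Most of these transformations are exercised by the cases later in this section. For a few that are not demonstrated there (union, countByValue, join), here is a minimal sketch, not taken from the original cases, assuming two socket sources on node01 ports 9998 and 9999 (hostname and ports are placeholders):

```scala
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch only: union, countByValue and join on two socket word streams.
// local[3]: two receiver threads plus at least one processing thread.
object TransformationSketch {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf().setAppName("TransformationSketch").setMaster("local[3]")
    val ssc = new StreamingContext(conf, Seconds(2))

    val words1 = ssc.socketTextStream("node01", 9998).flatMap(_.split(" "))
    val words2 = ssc.socketTextStream("node01", 9999).flatMap(_.split(" "))

    // union: merge the two streams into a single DStream
    val merged = words1.union(words2)

    // countByValue: per batch, how often each word occurs in the merged stream
    val frequencies = merged.countByValue() // DStream[(String, Long)]

    // join: pair the per-batch counts of words that appear in both streams
    val joined = words1.map((_, 1)).reduceByKey(_ + _)
      .join(words2.map((_, 1)).reduceByKey(_ + _)) // DStream[(String, (Int, Int))]

    frequencies.print()
    joined.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```
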
| Output Operation | Meaning |
|---|---|
| print() | Prints the first elements of each batch of the DStream to the driver console. |
| saveAsTextFiles(prefix, [suffix]) | Saves the DStream's contents as text files; file names are generated as "prefix-TIME_IN_MS[.suffix]". |
| saveAsObjectFiles(prefix, [suffix]) | Saves the DStream's contents as SequenceFiles of serialized objects; file names are "prefix-TIME_IN_MS[.suffix]". |
| saveAsHadoopFiles(prefix, [suffix]) | Saves the DStream's contents as Hadoop files; file names are "prefix-TIME_IN_MS[.suffix]". |
| foreachRDD(func) | Applies func to each RDD of the DStream. |
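
print() and foreachRDD(func) are both demonstrated at length in the cases below. As a small illustration of the file-based output operations, the following sketch (not from the original cases) writes each batch of word counts with saveAsTextFiles; the prefix and suffix are arbitrary placeholders:

```scala
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch only: every 2-second batch is written out as "wordcount-<TIME_IN_MS>.txt"
// (a directory of part files) relative to the working directory or fs.defaultFS.
object OutputOperationSketch {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf().setAppName("OutputOperationSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))

    val counts = ssc.socketTextStream("node01", 9999)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)

    // File names follow the "prefix-TIME_IN_MS[.suffix]" pattern from the table above
    counts.saveAsTextFiles("wordcount", "txt")

    ssc.start()
    ssc.awaitTermination()
  }
}
```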

sudo yum -y install nc
nc -lk 9999
object Case01_SocketWordCount {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    // 1. Create the SparkConf object. Note: at least two local threads are required;
    //    with only one thread the receiver occupies it and no batch can be processed.
    val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
    // 2. Create the StreamingContext object
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    // 3. Receive socket data
    val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01", 9999)
    // 4. Process the data
    val result: DStream[(String, Int)] = socketTextStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    // 5. Print the result
    result.print()
    // 6. Start the streaming computation
    ssc.start()
    ssc.awaitTermination()
  }
}

object Case02_HdfsWordCount {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    // 1. Create the SparkConf object
    val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
    // 2. Create the StreamingContext object
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    // 3. Monitor an HDFS directory for new files
    val textFileStream: DStream[String] = ssc.textFileStream("hdfs://node01:8020/data")
    // 4. Process the data
    val result: DStream[(String, Int)] = textFileStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    // 5. Print the result
    result.print()
    // 6. Start the streaming computation
    ssc.start()
    ssc.awaitTermination()
  }
}
object Case03_CustomReceiver {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkConf object. Note: at least two local threads are required;
    //    with only one thread the receiver occupies it and no batch can be processed.
    val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
    // 2. Create the StreamingContext object
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    // 3. Call the receiverStream API and pass in the custom Receiver
    val receiverStream = ssc.receiverStream(new CustomReceiver("node01", 9999))
    // 4. Process the data
    val result: DStream[(String, Int)] = receiverStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    // 5. Print the result
    result.print()
    // 6. Start the streaming computation
    ssc.start()
    ssc.awaitTermination()
  }
}

/**
 * Custom receiver (data source)
 */
class CustomReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) with Logging {
  override def onStart(): Unit = {
    // Start a thread that receives the data
    new Thread("custom-receiver") {
      override def run(): Unit = {
        receive()
      }
    }.start()
  }

  private def receive(): Unit = {
    var socket: Socket = null
    try {
      logInfo("Connecting to " + host + ":" + port)
      socket = new Socket(host, port)
      logInfo("Connected to " + host + ":" + port)
      val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      // Read line by line until the stream ends or the receiver is stopped
      var line: String = reader.readLine()
      while (!isStopped && line != null) {
        store(line)
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      logInfo("Stopped receiving")
      restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException =>
        restart("Error connecting to " + host + ":" + port, e)
      case t: Throwable =>
        restart("Error receiving data", t)
    }
  }

  override def onStop(): Unit = {
  }
}
vim spark_flume_poll.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /bigdata/install/flumedatas/spark_flume
a1.sources.r1.fileHeader = true
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 5000
# sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = node02
a1.sinks.k1.port = 8888
a1.sinks.k1.batchSize = 2000
cd /bigdata/install/apache-flume-1.9.0-bin/
bin/flume-ng agent -c conf -f conf/spark_flume_poll.conf -n a1 -Dflume.rootLogger=DEBUG,CONSOLE
cd /bigdata/install/flumedatas/spark_flume && vim wordcount.txt
hadoop spark hive spark
hadoop sqoop spark storm
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.11</artifactId>
    <version>2.3.4</version>
</dependency>
object SparkStreamingPollFlume {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkConf object. Note: at least two local threads are required;
    //    with only one thread the receiver occupies it and no batch can be processed.
    val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
    // 2. Create the SparkContext object
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")
    // 3. Create the StreamingContext object
    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint("./flume")
    // 4. Pull data from Flume via FlumeUtils.createPollingStream
    val pollingStream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(ssc, "node02", 8888)
    // 5. Extract the body of each Flume event
    val data: DStream[String] = pollingStream.map(x => new String(x.event.getBody.array()))
    // 6. Split each line and map every word to 1
    val wordAndOne: DStream[(String, Int)] = data.flatMap(x => x.split(" ")).map((_, 1))
    // 7. Accumulate the counts of identical words across batches
    val result: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunc)
    // 8. Print the result
    result.print()
    // 9. Start the streaming computation
    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * @param currentValues all the 1s for a given word in the current batch, e.g. (hadoop,1), (hadoop,1), ..., (hadoop,1)
   * @param historyValues the total count of the word accumulated over all previous batches, e.g. (hadoop,100)
   */
  def updateFunc(currentValues: Seq[Int], historyValues: Option[Int]): Option[Int] = {
    val newValue: Int = currentValues.sum + historyValues.getOrElse(0)
    Some(newValue)
  }
}
vim spark_flume_push.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /bigdata/install/flumedatas/spark_flume
a1.sources.r1.fileHeader = true
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity=5000
# sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
# Note: this must be the IP of the host where the Spark Streaming application runs
a1.sinks.k1.hostname = 192.168.0.100
a1.sinks.k1.port = 8888
a1.sinks.k1.batchSize = 2000
bin/flume-ng agent -c conf -f conf/spark_flume_push.conf -n a1 -Dflume.rootLogger=DEBUG,CONSOLE
object SparkStreamingPushFlume {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkConf object. Note: at least two local threads are required;
    //    with only one thread the receiver occupies it and no batch can be processed.
    val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
    // 2. Create the SparkContext object
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")
    // 3. Create the StreamingContext object
    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint("./flume")
    // 4. The IP of the host this application runs on; it must match the Flume sink configuration
    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createStream(ssc, "192.168.0.100", 8888, StorageLevel.MEMORY_AND_DISK)
    // 5. Extract the body of each Flume event
    val lineStream: DStream[String] = flumeStream.map(x => new String(x.event.getBody.array()))
    // 6. Word count with state accumulated across batches
    val result: DStream[(String, Int)] = lineStream.flatMap(x => x.split(" ")).map((_, 1))
      .updateStateByKey(updateFunc)
    // 7. Print the result
    result.print()
    // 8. Start the streaming computation
    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * @param currentValues all the 1s for a given word in the current batch, e.g. (hadoop,1), (hadoop,1), ..., (hadoop,1)
   * @param historyValues the total count of the word accumulated over all previous batches, e.g. (hadoop,100)
   */
  def updateFunc(currentValues: Seq[Int], historyValues: Option[Int]): Option[Int] = {
    val newValue: Int = currentValues.sum + historyValues.getOrElse(0)
    Some(newValue)
  }
}
spark-submit \
--master spark://node01:7077 \
--deploy-mode cluster \
--supervise \
--class com.yw.spark.example.streaming.cases.Case01_SocketWordCount \
--executor-memory 1g \
--total-executor-cores 2 \
original-spark-demo-1.0.jar
object Case04_UpdateStateByKeyWordCount {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    // 1. Create the SparkConf object
    val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
    // 2. Create the StreamingContext object
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    // 3. Set the checkpoint directory (required by updateStateByKey)
    ssc.checkpoint("hdfs://node01:8020/checkpoint")
    // 4. Receive socket data
    val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01", 9999)
    // 5. Process the data
    val result: DStream[(String, Int)] = socketTextStream.flatMap(_.split(" ")).map((_, 1))
      .updateStateByKey(updateFunc)
    // 6. Print the result
    result.print()
    // 7. Start the streaming computation
    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * @param currentValues all the 1s for a given word in the current batch, e.g. (hadoop,1), (hadoop,1), ..., (hadoop,1)
   * @param historyValues the total count of the word accumulated over all previous batches, e.g. (hadoop,100)
   */
  def updateFunc(currentValues: Seq[Int], historyValues: Option[Int]): Option[Int] = {
    val newValue: Int = currentValues.sum + historyValues.getOrElse(0)
    Some(newValue)
  }
}
object Case05_MapWithStateWordCount {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    // 1. Create the SparkConf object
    val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
    // 2. Create the StreamingContext object
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    // 3. Set the checkpoint directory
    ssc.checkpoint("hdfs://node01:8020/checkpoint")
    // 4. Receive socket data
    val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01", 9999)
    // Initial state: seed counts for some keys
    val initRDD: RDD[(String, Int)] = ssc.sparkContext.parallelize(List(("hadoop", 10), ("spark", 20)))
    // StateSpec: add the current batch value to the stored state, unless the key is timing out
    val stateSpec = StateSpec.function((time: Time, key: String, currentValue: Option[Int], historyState: State[Int]) => {
      val sum: Int = currentValue.getOrElse(0) + historyState.getOption().getOrElse(0)
      val output = (key, sum)
      if (!historyState.isTimingOut()) {
        historyState.update(sum)
      }
      Some(output)
    }).initialState(initRDD).timeout(Durations.seconds(5))
    // 5. Process the data
    val result: MapWithStateDStream[String, Int, Int, (String, Int)] = socketTextStream.flatMap(_.split(" ")).map((_, 1))
      .mapWithState(stateSpec)
    // 6. Print the result (the full state snapshot, not only the keys updated in this batch)
    result.stateSnapshots().print()
    // 7. Start the streaming computation
    ssc.start()
    ssc.awaitTermination()
  }
}
object Case06_TransformWordCount {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    // 1. Create the SparkConf object
    val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
    // 2. Create the StreamingContext object
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    // 3. Receive socket data
    val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01", 9999)
    // 4. Process the data
    val result: DStream[(String, Int)] = socketTextStream.flatMap(_.split(" ")).map((_, 1))
      .reduceByKey(_ + _)
    // 5. Apply an arbitrary RDD operation to each RDD via transform: sort by count descending and print the top 3
    val sortedDstream: DStream[(String, Int)] = result.transform(rdd => {
      val sortedRDD: RDD[(String, Int)] = rdd.sortBy(_._2, false)
      val top3: Array[(String, Int)] = sortedRDD.take(3)
      top3.foreach(println)
      sortedRDD
    })
    // 6. Print the result
    sortedDstream.print()
    // 7. Start the streaming computation
    ssc.start()
    ssc.awaitTermination()
  }
}

object Case07_WindowWordCount {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    // 1. Create the SparkConf object
    val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
    // 2. Create the StreamingContext object
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    // 3. Receive socket data
    val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01", 9999)
    // 4. Process the data: count words over a 12-second window that slides every 6 seconds
    val result: DStream[(String, Int)] = socketTextStream.flatMap(_.split(" ")).map((_, 1))
      .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(12), Seconds(6))
    // 5. Print the result
    result.print()
    // 6. Start the streaming computation
    ssc.start()
    ssc.awaitTermination()
  }
}
Besides reduceByKeyAndWindow, Spark Streaming provides several other window operations, including window(windowLength, slideInterval), countByWindow, reduceByWindow, countByValueAndWindow, and an overload of reduceByKeyAndWindow that takes an inverse reduce function for incremental window updates.
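
As an illustration of that last variant, here is a minimal sketch (not part of the original cases) of the incremental reduceByKeyAndWindow; it assumes the same node01:9999 socket source and requires a checkpoint directory (the local path used here is a placeholder):

```scala
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch only: word counts over a 12-second window sliding every 6 seconds.
// The inverse function lets Spark update the previous window result by adding
// the batches that entered the window and subtracting those that left it,
// instead of re-reducing the whole window each time.
object WindowWithInverseSketch {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf().setAppName("WindowWithInverseSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.checkpoint("./window-checkpoint") // placeholder path; checkpointing is mandatory here

    val result = ssc.socketTextStream("node01", 9999)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKeyAndWindow(
        (a: Int, b: Int) => a + b, // add counts entering the window
        (a: Int, b: Int) => a - b, // subtract counts leaving the window
        Seconds(12), Seconds(6))

    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```
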
object Case08_WordCountForeachRDD {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    // 1. Create the SparkConf object. Note: at least two local threads are required;
    //    with only one thread the receiver occupies it and no batch can be processed.
    val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
    // 2. Create the StreamingContext object
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    // 3. Receive socket data
    val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01", 9999)
    // 4. Process the data
    val result: DStream[(String, Int)] = socketTextStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    // 5. Save the results to MySQL
    // /*********************** Scheme 1 ***********************/
    // result.foreachRDD(rdd => {
    //   // Problem: the connection and statement are created on the driver, but rdd.foreach
    //   // runs on the executors, so they would have to be serialized and shipped, which fails
    //   val conn = DriverManager.getConnection("jdbc:mysql://node01:3306/test", "root", "123456")
    //   val statement = conn.prepareStatement(s"insert into wordcount(word, count) values(?, ?)")
    //
    //   rdd.foreach { record =>
    //     statement.setString(1, record._1)
    //     statement.setInt(2, record._2)
    //     statement.execute()
    //   }
    //   statement.close()
    //   conn.close()
    // })
    // /*********************** Scheme 2 ***********************/
    // result.foreachRDD(rdd => {
    //   rdd.foreach { record =>
    //     // A new connection per record: correct, but very inefficient
    //     val conn = DriverManager.getConnection("jdbc:mysql://node01:3306/test", "root", "123456")
    //     val statement = conn.prepareStatement(s"insert into wordcount(word, count) values(?, ?)")
    //
    //     statement.setString(1, record._1)
    //     statement.setInt(2, record._2)
    //     statement.execute()
    //
    //     statement.close()
    //     conn.close()
    //   }
    // })
    // /*********************** Scheme 3 ***********************/
    // result.foreachRDD(rdd => {
    //   rdd.foreachPartition(it => {
    //     // One connection per partition on the executor
    //     val conn = DriverManager.getConnection("jdbc:mysql://node01:3306/test", "root", "123456")
    //     val statement = conn.prepareStatement(s"insert into wordcount(word, count) values(?, ?)")
    //
    //     it.foreach(record => {
    //       statement.setString(1, record._1)
    //       statement.setInt(2, record._2)
    //       statement.execute()
    //     })
    //
    //     statement.close()
    //     conn.close()
    //   })
    // })
    /*********************** Scheme 4 ***********************/
    result.foreachRDD(rdd => {
      rdd.foreachPartition(it => {
        // One connection per partition on the executor, combined with batched writes
        val conn = DriverManager.getConnection("jdbc:mysql://node01:3306/test", "root", "123456")
        val statement = conn.prepareStatement(s"insert into wordcount(word, count) values(?, ?)")
        // Disable auto-commit so the whole partition is committed at once
        conn.setAutoCommit(false)
        it.foreach(record => {
          statement.setString(1, record._1)
          statement.setInt(2, record._2)
          // Add the record to the current batch
          statement.addBatch()
        })
        // Execute the batch and commit all rows of this partition
        statement.executeBatch()
        conn.commit()
        statement.close()
        conn.close()
      })
    })
    // 6. Start the streaming computation
    ssc.start()
    ssc.awaitTermination()
  }
}
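
Scheme 4 already amortizes the connection cost over a partition, but the Spark Streaming programming guide suggests going one step further and reusing connections across batches through a lazily created, static connection pool on each executor. The sketch below uses a hypothetical ConnectionPool object (the pool itself, the JDBC URL and the credentials are assumptions; in production you would back it with a real pooling library such as HikariCP or DBCP):

```scala
import java.sql.{Connection, DriverManager}
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical, minimal connection pool: one queue of reusable connections per executor JVM.
// This is a sketch of the idea only; a production job should use a proper pooling library.
object ConnectionPool {
  private val pool = new ConcurrentLinkedQueue[Connection]()

  def getConnection: Connection = {
    val conn = pool.poll()
    if (conn != null && !conn.isClosed) conn
    else DriverManager.getConnection("jdbc:mysql://node01:3306/test", "root", "123456")
  }

  def returnConnection(conn: Connection): Unit = {
    pool.offer(conn)
  }
}
```

With this in place, scheme 4 would call ConnectionPool.getConnection instead of DriverManager.getConnection at the top of foreachPartition, and ConnectionPool.returnConnection(conn) instead of conn.close() at the end, so connections survive across batches of the stream.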