• Spark【Spark Streaming】


    1、基本数据源

    1.1、文件流

    spark Shell 下运行:

    1. [lyh@hadoop102 spark-yarn-3.2.4]$ spark-shell
    2. Setting default log level to "WARN".
    3. To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    4. 2022-09-08 08:56:21,875 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    5. Spark context Web UI available at http://hadoop102:4040
    6. Spark context available as 'sc' (master = local[*], app id = local-1662598583370).
    7. Spark session available as 'spark'.
    8. Welcome to
    9. ____ __
    10. / __/__ ___ _____/ /__
    11. _\ \/ _ \/ _ `/ __/ '_/
    12. /___/ .__/\_,_/_/ /_/\_\ version 3.2.4
    13. /_/
    14. Using Scala version 2.12.15 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_241)
    15. Type in expressions to have them evaluated.
    16. Type :help for more information.
    17. scala> import org.apache.spark.streaming._
    18. import org.apache.spark.streaming._
    19. scala> val ssc = new StreamingContext(sc,Seconds(20))
    20. ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@379899f4
    21. scala> val lines = ssc.textFileStream("file:///home/lyh/streaming/logfile")
    22. lines: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@531245fe
    23. scala> val kv = lines.map((_,1)).reduceByKey(_+_)
    24. kv: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@c207c10
    25. scala> kv.print()
    26. scala> ssc.start()
    27. ------------------------------------------
    28. Time: 1662598860000 ms
    29. -------------------------------------------
    30. -------------------------------------------
    31. Time: 1662598880000 ms
    32. -------------------------------------------
    33. -------------------------------------------
    34. Time: 1662598900000 ms
    35. -------------------------------------------
    36. (c#,1)
    37. (hh,1)
    38. (h,1)
    39. (javafx,1)
    40. (spark,1)
    41. (hadoop,1)
    42. (js,1)
    43. (java,1)
    44. (s,1)
    45. (c,1)

    执行后立即新建终端在  /home/lyh/streaming/logfile 目录下创建文件并写入数据

    1.2、Socket 套接字流

    1. // todo 创建环境对象
    2. val conf = new SparkConf()
    3. conf.setAppName("word count").setMaster("local[*]")
    4. val ssc = new StreamingContext(conf,Seconds(3))
    5. // todo 逻辑处理
    6. // 获取端口数据(Socket)
    7. val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    8. val words: DStream[String] = lines.flatMap(_.split(" "))
    9. val word: DStream[(String,Int)] = words.map((_, 1))
    10. val wordCount: DStream[(String,Int)] = word.reduceByKey(_ + _)
    11. wordCount.print()
    12. // todo 关闭环境
    13. // 由于SparkStreaming的采集器是长期运行的,所以不能直接关闭
    14. // 而且main方法的关闭也会使SparkStreaming的采集器关闭
    15. ssc.start()
    16. // 等待采集器关闭
    17. ssc.awaitTermination()

    启动 NetCat

    1. > nc -lp 9999
    2. > hello world
    3. > hello spark
    4. > ...

    运行结果: 

    1.3、自定义 Socket 数据源

    通过自定义 Socket 实现数据源不断产生数据

    1. import java.io.PrintWriter
    2. import java.net.{ServerSocket, Socket}
    3. import scala.io.Source
    4. /**
    5. * 通过自定义的Socket来不断给客户端发送数据
    6. */
    7. object MySocketReceiver {
    8. def index(length: Int): Int = {
    9. val rdm = new java.util.Random()
    10. rdm.nextInt(length)
    11. }
    12. def main(args: Array[String]): Unit = {
    13. val fileName = "input/1.txt"
    14. val lines: List[String] = Source.fromFile(fileName).getLines().toList
    15. val listener: ServerSocket = new ServerSocket(9999)
    16. while(true){
    17. val socket: Socket = listener.accept()
    18. new Thread(){
    19. override def run(){
    20. val out: PrintWriter = new PrintWriter(socket.getOutputStream,true)
    21. while (true){
    22. Thread.sleep(1000)
    23. val content = lines(index(lines.length)) // 源源不断,每次打印list的第(1~length)随机行
    24. println(content)
    25. out.write(content + '\n')
    26. out.flush()
    27. }
    28. socket.close()
    29. }
    30. }.start()
    31. }
    32. }
    33. }

    定义一个处理器接收自定义数据源端口发送过来的数据。

    1. def main(args: Array[String]): Unit = {
    2. // todo 创建环境对象
    3. val conf = new SparkConf()
    4. conf.setAppName("word count").setMaster("local[*]")
    5. val ssc = new StreamingContext(conf,Seconds(3))
    6. // todo 逻辑处理
    7. // 获取端口数据(Socket)
    8. val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    9. val words: DStream[String] = lines.flatMap(_.split(" "))
    10. val word: DStream[(String,Int)] = words.map((_, 1))
    11. val wordCount: DStream[(String,Int)] = word.reduceByKey(_ + _)
    12. wordCount.print()
    13. // todo 关闭环境
    14. // 由于SparkStreaming的采集器是长期运行的,所以不能直接关闭
    15. ssc.start()
    16. // 等待采集器关闭
    17. ssc.awaitTermination()
    18. }

    先运行我们的数据源,再运行处理器:

    处理器:

    1.4、RDD 队列流

    1. import org.apache.spark.SparkConf
    2. import org.apache.spark.rdd.RDD
    3. import org.apache.spark.streaming.dstream.{DStream, InputDStream}
    4. import org.apache.spark.streaming.{Seconds, StreamingContext}
    5. import scala.collection.mutable
    6. object SparkStreaming02_RDDStream {
    7. def main(args: Array[String]): Unit = {
    8. // 1. 初始化配置信息
    9. val conf = new SparkConf()
    10. conf.setAppName("rdd Stream").setMaster("local[*]")
    11. // 2.初始化SparkStreamingContext
    12. val ssc = new StreamingContext(conf,Seconds(4))
    13. // 3.创建RDD队列
    14. val rddQueue: mutable.Queue[RDD[Int]] = new mutable.Queue[RDD[Int]]()
    15. // 4.创建QueueInputStream
    16. // oneAtATime = true 默认,一次读取队列里面的一个数据
    17. // oneAtATime = false, 按照设定的时间,读取队列里面数据
    18. val inputStream: InputDStream[Int] = ssc.queueStream(rddQueue,oneAtATime = false)
    19. // 5. 处理队列中的RDD数据
    20. val sumStream: DStream[Int] = inputStream.reduce(_ + _)
    21. // 6. 打印结果
    22. sumStream.print()
    23. // 7.启动任务
    24. ssc.start()
    25. // 8.向队列中放入RDD
    26. for(i <- 1 to 5){
    27. rddQueue += ssc.sparkContext.makeRDD(1 to 5)
    28. Thread.sleep(2000)
    29. }
    30. // 9. 等待数据源进程停止后关闭
    31. ssc.awaitTermination()
    32. }
    33. }

    2、高级数据源

    2.1、Kafka 数据源

    2.1.1、消费者程序处理流数据

    1. import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
    2. import org.apache.kafka.common.serialization.StringDeserializer
    3. import org.apache.spark.SparkConf
    4. import org.apache.spark.streaming.dstream.{DStream, InputDStream}
    5. import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
    6. import org.apache.spark.streaming.{Seconds, StreamingContext}
    7. object SparkStreaming03_Kafka {
    8. def main(args: Array[String]): Unit = {
    9. val conf = new SparkConf().setMaster("local[*]").setAppName("kafka source")
    10. val ssc = new StreamingContext(conf,Seconds(3))
    11. // 定义Kafka参数: kafka集群地址、消费者组名称、key序列化、value序列化
    12. val kafkaPara: Map[String,Object] = Map[String,Object](
    13. ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
    14. ConsumerConfig.GROUP_ID_CONFIG ->"lyh",
    15. ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
    16. ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    17. )
    18. // 读取Kafka数据创建DStream
    19. val kafkaDStream: InputDStream[ConsumerRecord[String,String]] = KafkaUtils.createDirectStream[String,String](
    20. ssc,
    21. LocationStrategies.PreferConsistent, //优先位置
    22. ConsumerStrategies.Subscribe[String,String](Set("testTopic"),kafkaPara) // 消费策略:(订阅多个主题,配置参数)
    23. )
    24. // 将每条消息的KV取出
    25. val valueDStream: DStream[String] = kafkaDStream.map(_.value())
    26. // 计算WordCount
    27. valueDStream.flatMap(_.split(" "))
    28. .map((_,1))
    29. .reduceByKey(_+_)
    30. .print()
    31. // 开启任务
    32. ssc.start()
    33. ssc.awaitTermination()
    34. }
    35. }

    2.1.2、生产者生产数据

    (1)kafka 端生产数据

    启动 Kafka 集群

    创建 Topic(指定一个分区三个副本): 

    kafka-topics.sh --bootstrap-server hadoop102:9092 --topic  --create --partitions 1 --replication-factor 3 

     查看是否生成 Topic:

    kafka-topics.sh --bootstrap-server hadoop102:9092 --list

    生产者生产数据:

    1. > kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic
    2. > hello world
    3. > hello spark
    4. > ...
    (2)编写生产者程序
    1. package com.lyh
    2. import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
    3. import org.apache.kafka.common.serialization.StringDeserializer
    4. import org.apache.spark.SparkConf
    5. import org.apache.spark.streaming.dstream.{DStream, InputDStream}
    6. import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
    7. import org.apache.spark.streaming.{Seconds, StreamingContext}
    8. object SparkStreaming03_Kafka {
    9. def main(args: Array[String]): Unit = {
    10. val conf = new SparkConf().setMaster("local[*]").setAppName("kafka source")
    11. val ssc = new StreamingContext(conf,Seconds(3))
    12. // 定义Kafka参数: kafka集群地址、消费者组名称、key序列化、value序列化
    13. val kafkaPara: Map[String,Object] = Map[String,Object](
    14. ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
    15. ConsumerConfig.GROUP_ID_CONFIG ->"lyh",
    16. ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
    17. ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    18. )
    19. // 读取Kafka数据创建DStream
    20. val kafkaDStream: InputDStream[ConsumerRecord[String,String]] = KafkaUtils.createDirectStream[String,String](
    21. ssc,
    22. LocationStrategies.PreferConsistent, //优先位置
    23. ConsumerStrategies.Subscribe[String,String](Set("testTopic"),kafkaPara) // 消费策略:(订阅多个主题,配置参数)
    24. )
    25. // 将每条消息的KV取出
    26. val valueDStream: DStream[String] = kafkaDStream.map(_.value())
    27. // 计算WordCount
    28. valueDStream.flatMap(_.split(" "))
    29. .map((_,1))
    30. .reduceByKey(_+_)
    31. .print()
    32. // 开启任务
    33. ssc.start()
    34. ssc.awaitTermination()
    35. }
    36. }

    3、转换操作

    3.1、无状态转换操作

    3.2、有状态转换操作

    3.1.1、滑动窗口转换操作

    1. import org.apache.spark.SparkConf
    2. import org.apache.spark.streaming.dstream.DStream
    3. import org.apache.spark.streaming.{Seconds, StreamingContext}
    4. object SparkStreaming05_Window {
    5. def main(args: Array[String]): Unit = {
    6. val conf = new SparkConf().setMaster("local[*]").setAppName("sparkStreaming window")
    7. val ssc = new StreamingContext(conf,Seconds(3))
    8. val lines:DStream[String] = ssc.socketTextStream("localhost", 9999)
    9. val word_kv = lines.map((_, 1))
    10. /**
    11. * 收集器收集RDD合成DStream: 3s 窗口范围: 12s 窗口滑动间隔: 6s/次
    12. * 1. windowLength:表示滑动窗口的长度,即窗口内包含的数据的时间跨度。它是一个Duration对象,用于指定窗口的时间长度。
    13. * 2. slideInterval:表示滑动窗口的滑动间隔,即每隔多长时间将窗口向右滑动一次。同样是一个Duration对象。
    14. * 返回一个新的 DStream
    15. **/
    16. val wordToOneByWindow:DStream[(String,Int)] = word_kv.window(Seconds(12), Seconds(6))
    17. // 窗口每滑动一次(6s),对窗口内的数据进行一次聚合操作.
    18. val res: DStream[(String,Int)] = wordToOneByWindow.reduceByKey(_ + _)
    19. res.print()
    20. ssc.start()
    21. ssc.awaitTermination()
    22. }
    23. }

    3.1.2、updateStateByKey

    1. import org.apache.spark.SparkConf
    2. import org.apache.spark.streaming.dstream.DStream
    3. import org.apache.spark.streaming.{Seconds, StreamingContext}
    4. /**
    5. * DStream 有状态转换操作之 updateStateByKey(func) 转换操作
    6. */
    7. object SparkStreaming04_State {
    8. def main(args: Array[String]): Unit = {
    9. val conf = new SparkConf().setMaster("local[*]").setAppName("kafka state")
    10. val ssc = new StreamingContext(conf,Seconds(3))
    11. /**
    12. * 设置检查点目录的作用是为了确保Spark Streaming应用程序的容错性和可恢复性。
    13. * 在Spark Streaming应用程序运行过程中,它会将接收到的数据分成一批批进行处理。
    14. * 通过设置检查点目录,Spark Streaming会定期将当前的处理状态、接收到的数据偏移量等信息保存到可靠的存储系统中,
    15. * 比如分布式文件系统(如HDFS)或云存储服务(如Amazon S3)。
    16. * 一旦应用程序出现故障或崩溃,它可以从最近的检查点中恢复状态,并从上次处理的位置继续处理数据,从而确保数据的完整性和一致性。
    17. */
    18. //检查点的路径如果是本地路径要+ file:// 否则认为是 hdfs路径 / 开头
    19. ssc.checkpoint("file:///D://IdeaProject/SparkStudy/data/") //设置检查点,检查点具有容错机制
    20. val lines: DStream[String] = ssc.socketTextStream("localhost",9999)
    21. val word_kv = lines.map((_, 1))
    22. val stateDStream: DStream[(String, Int)] = word_kv.updateStateByKey(
    23. /** 参数是一个函数
    24. 1. Seq[Int]: 当前key对应的所有value值的集合,因为我们的value是Int,所以这里也是Int
    25. 2. Option[Int]: 当前key的历史状态,对于wordCount,历史值就是上一个DStream中这个key的value计算结果(求和结果)
    26. Option 是 Scala 中用来表示可能存在或可能不存在的值的容器,是一种避免空引用(null reference)问题的模式。
    27. Option[Int] 有两个可能的实例:
    28. (1) Some(value: Int):表示一个包含 Int 类型值的 Option。
    29. (2) None:表示一个空的 Option,不包含任何值。
    30. **/
    31. (values: Seq[Int], state: Option[Int]) => {
    32. val currentCount = values.foldLeft(0)(_ + _)
    33. val previousCount = state.getOrElse(0)
    34. Option(currentCount + previousCount)
    35. }
    36. )
    37. stateDStream.print()
    38. stateDStream.saveAsTextFiles("./out") //输出结果保存到 文本文件中
    39. ssc.start()
    40. ssc.awaitTermination()
    41. }
    42. }

    4、输出操作

    4.1、输出到文本文件

    上面 3.1.2 中就保存DStream输出到了本地:

    stateDStream.saveAstextFiles("./out")

    4.2、输出到MySQL数据库

    1. import org.apache.spark.SparkConf
    2. import org.apache.spark.rdd.RDD
    3. import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    4. import org.apache.spark.streaming.{Seconds, StreamingContext}
    5. import java.sql.{Connection, PreparedStatement}
    6. object NetWorkWordCountStateMySQL {
    7. def main(args: Array[String]): Unit = {
    8. val updateFunc = (values: Seq[Int],state: Option[Int]) => {
    9. val currentCount = values.foldLeft(0)(_+_)
    10. val previousCount = state.getOrElse(0)
    11. Some(currentCount + previousCount)
    12. }
    13. val conf = new SparkConf().setMaster("local[*]").setAppName("state mysql")
    14. val ssc = new StreamingContext(conf,Seconds(5))
    15. // file:\\ 代表本地文件系统 如果用的是 /user/... 这种形式是 HDFS 文件系统 需要启动Hadoop集群
    16. ssc.checkpoint("file:\\D:\\IdeaProjects\\SparkStudy\\data\\state")
    17. val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    18. val word_kv: DStream[(String, Int)] = lines.flatMap(_.split(" ").map((_, 1))).reduceByKey(_ + _)
    19. val stateDStream: DStream[(String, Int)] = word_kv.updateStateByKey[Int](updateFunc)
    20. stateDStream.print()
    21. stateDStream.foreachRDD( rdd=> {
    22. def func(records: Iterator[(String,Int)]): Unit ={
    23. var conn: Connection = null
    24. var stmt: PreparedStatement = null
    25. try{
    26. conn = DBUtils.getConnection("jdbc:mysql://127.0.0.1:3306/spark","root","Yan1029.")
    27. records.foreach(p=>{
    28. val sql = "insert into wordcount values (?,?)"
    29. stmt = conn.prepareStatement(sql)
    30. stmt.setString(1,p._1.trim)
    31. stmt.setInt(2,p._2)
    32. stmt.executeUpdate() //不executeUpdate就不会写入数据库
    33. })
    34. }catch {
    35. case e: Exception => e.printStackTrace()
    36. }finally {
    37. // if (stmt!=null) stmt.close()
    38. // DBUtils.close()
    39. }
    40. }
    41. val repartitionedRDD: RDD[(String,Int)] = rdd.repartition(3) //扩大分区用 repartition
    42. repartitionedRDD.foreachPartition(func)
    43. })
    44. ssc.start()
    45. ssc.awaitTermination()
    46. }
    47. }

    运行结果:

    5、优雅的关闭和恢复数据

    5.1、关闭SparkStreaming

            流式任务通常都需要7*24小时执行,但是有时涉及到升级代码需要主动停止程序,但是分布式程序,没办法做到一个个进程去杀死,所以配置优雅的关闭就显得至关重要了。
           关闭方式:我们通常使用外部文件系统来控制内部程序关闭。

    1. package com.lyh
    2. import org.apache.hadoop.conf.Configuration
    3. import org.apache.hadoop.fs.{FileSystem, Path}
    4. import org.apache.spark.SparkConf
    5. import org.apache.spark.streaming.dstream.DStream
    6. import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}
    7. import java.net.URI
    8. object SparkStreaming06_Close {
    9. def main(args: Array[String]): Unit = {
    10. val conf = new SparkConf().setMaster("local[*]").setAppName("sparkStreaming close")
    11. val ssc = new StreamingContext(conf,Seconds(3))
    12. val lines:DStream[String] = ssc.socketTextStream("localhost", 9999)
    13. val word_kv = lines.map((_, 1))
    14. word_kv.print()
    15. ssc.start()
    16. // 再创建一个线程去关闭
    17. new Thread(new MonitorStop(ssc)).start()
    18. ssc.awaitTermination() //阻塞当前main线程
    19. }
    20. }
    21. class MonitorStop(ssc: StreamingContext) extends Runnable{
    22. override def run(): Unit = {
    23. while (true){ // 一直轮询判断
    24. Thread.sleep(5000) //每5s检查一遍
    25. val fs: FileSystem = FileSystem.get(new URI("hdfs://hadoop102:9000"),new Configuration(),"lyh")
    26. val exists: Boolean = fs.exists(new Path("hdfs://hadoop102:9000/stopSpark"))
    27. if (exists) { //如果比如(MySQL出现了一行数据、Zookeeper的某个节点出现变化、hdfs是否存在某个目录...)就关闭
    28. val state: StreamingContextState = ssc.getState()
    29. if (state == StreamingContextState.ACTIVE){
    30. // 优雅地关闭-处理完当前的数据再关闭
    31. // 计算节点不再接受新的数据,而是把现有的数据处理完毕,然后关闭
    32. ssc.stop(true,true)
    33. System.exit(0)
    34. }
    35. }
    36. }
    37. }
    38. }

    5.2、恢复检查点的数据

    使用 getActiveOrCreate 的方法来对上一个失败的 Spark 任务进行数据恢复(通过检查点来进行恢复)

    方法说明:

            若Application为首次重启,将创建一个新的StreamingContext实例;如果Application从失败中重启,从checkpoint目录导入checkpoint数据来重新创建StreamingContext实例。

    1. import org.apache.spark.SparkConf
    2. import org.apache.spark.streaming.dstream.DStream
    3. import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}
    4. import java.net.URI
    5. object SparkStreaming07_Resume {
    6. def main(args: Array[String]): Unit = {
    7. //好处:若Application为首次重启,将创建一个新的StreamingContext实例;如果Application从失败中重启,从checkpoint目录导入checkpoint数据来重新创建StreamingContext实例。
    8. val ssc: StreamingContext = StreamingContext.getActiveOrCreate("file:\\D:\\IdeaProjects\\SparkStudy\\data\\state", () => {
    9. val conf = new SparkConf().setMaster("local[*]").setAppName("sparkStreaming resume")
    10. val ssc = new StreamingContext(conf, Seconds(3))
    11. val lines: DStream[String] = ssc.socketTextStream("localhost", 9999)
    12. val word_kv = lines.map((_, 1))
    13. word_kv.print()
    14. ssc
    15. })
    16. // 依然设置检查点 防止application失败后丢失数据
    17. ssc.checkpoint("file:\\D:\\IdeaProjects\\SparkStudy\\data\\state")
    18. ssc.start()
    19. ssc.awaitTermination() //阻塞当前main线程
    20. }
    21. }

  • 相关阅读:
    你写过的最蠢的代码是?——前端篇
    牛客网:迷宫问题
    Vue3引入腾讯地图,点击坐标后实时获取经纬度
    Angular 学习 之 Hello World !
    Docker实战技巧(一):Kubernetes基础操作实战
    自然语言处理 | 语言模型(LM) 浅析
    CSS 笔记(十三):常用单位 & 适配方案(移动端)
    DOM对非表单元素与表单元素的属性操作
    React@16.x(25)useReducer
    推荐系统-指标:ctr、cvr
  • 原文地址:https://blog.csdn.net/m0_64261982/article/details/133964098