• 2023_Spark_实验十九:SparkStreaming入门案例


    SparkStreaming入门案例

    一、准备工作


    • 实验环境:netcat
    • 安装nc:yum install -y nc

    二、任务分析


    将nc作为服务器端,用户产生数据;启动sparkstreaming案例中的客户端程序,监听服务器端发送过来的数据,并对其数据进行词频统计,即为流式的wordcount入门程序

    三、官网案例


    • 启动nc作为服务器端,执行:nc -l 1234,并输入测试数据,如图所示:
    • 启动客户端,执行:
      bin/run-example streaming.NetworkWordCount localhost 1234
    注意)如果要执行本例,必须确保机器 cpu 核数大于 2

    四、开发NetWordCount

    1. 创建maven工程
    2. 添加maven依赖,即在pom.xml中添加streamming的依赖,如下(如果之前实验已经添加,就不用再添加,如果之前未添加,则需要添加该依赖)
    1. <dependency>
    2. <groupId>org.apache.sparkgroupId>
    3. <artifactId>spark-streaming_2.12artifactId>
    4. <version>2.1.1version>
    5. dependency>

    3.开发NetWordCount程序
    1. import org.apache.spark.SparkConf
    2. import org.apache.spark.storage.StorageLevel
    3. import org.apache.spark.streaming.dstream.{DStreamReceiverInputDStream}
    4. import org.apache.spark.streaming.{SecondsStreamingContext}
    5. object StreamingTest {
    6. def main(args: Array[String]): Unit = {
    7. val sparkConf = newSparkConf().setMaster("local[2]").setAppName("StreamingTest")
    8. val streamingContext = new StreamingContext(sparkConf, Seconds(5))
    9. // 创建DStream对象,并链接到nc服务器端
    10. val ris: ReceiverInputDStream[String] = streamingContext.socketTextStream("192.168.245.110"1234,StorageLevel.MEMORY_AND_DISK)
    11. // 采集数据,并处理数据
    12. val ds: DStream[String] = ris.flatMap(_.split(" "))
    13. println(ris)
    14. // 统计单词
    15. val resultDS: DStream[(String, Int)] = ds.map(x => (x, 1)).reduceByKey(_ + _)
    16. // 打印结果
    17. resultDS.print()
    18. // 启动实时计算
    19. streamingContext.start()
    20. // 等待计算结束
    21. streamingContext.awaitTermination()
    22. }
    23. }

    4.先在虚拟机上启动nc服务器:nc -l 1234,并输入测试数据,如图
    5.然后运行程序
    6.运行结果如下
    参考:

  • 相关阅读:
    1.6HTML的表格和列表
    Python之哈希表-哈希表原理
    Java教程:如何利用UDP实现群聊聊天室?
    Java内部类 (详细讲述java内部类)
    计算机毕业设计node+vue基于微信小程序的乐团团购系统的设计与实现
    nginx 配置静态网页
    从租完ecs云服务器 使用docker建立用户 全过程
    android开发工作笔记
    【Linux网络编程】基础API
    Redis高可用部署架构
  • 原文地址:https://blog.csdn.net/pblh123/article/details/133856058