• SparkStreaming在实时处理的两个场景示例


    简介

    Spark Streaming是Apache Spark生态系统中的一个组件,用于实时流式数据处理。它提供了类似于Spark的API,使开发者可以使用相似的编程模型来处理实时数据流。

    Spark Streaming的工作原理是将连续的数据流划分成小的批次,并将每个批次作为RDD(弹性分布式数据集)来处理。这样,开发者可以使用Spark的各种高级功能,如map、reduce、join等,来进行实时数据处理。Spark Streaming还提供了内置的窗口操作、状态管理、容错处理等功能,使得开发者能够轻松处理实时数据的复杂逻辑。

    Spark Streaming支持多种数据源,包括Kafka、Flume、HDFS、S3等,因此可以轻松地集成到各种数据管道中。它还能够与Spark的批处理和SQL引擎进行无缝集成,从而实现流式处理与批处理的混合使用。
    在这里插入图片描述

    本文以 TCP、kafka场景讲解spark streaming的使用

    消息队列下的信息铺抓

    类似消息队列的有redis、kafka等核心组件。
    本文以kafka为例,向kafka中实时抓取数据,

    pom.xml中添加以下依赖

    
        
        
            org.apache.spark
            spark-core_2.12
            3.2.0
        
    
        
        
            org.apache.spark
            spark-streaming_2.12
            3.2.0
        
    
        
        
            org.apache.spark
            spark-sql_2.12
            3.2.0
        
    
        
        
            org.apache.kafka
            kafka-clients
            2.8.0
        
    
        
        
            org.apache.spark
            spark-streaming-kafka-0-10_2.12
            3.2.0
        
    
        
        
            org.postgresql
            postgresql
            42.2.24
        
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    创建项目编写以下代码实现功能

    package org.example;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.sql.*;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka010.ConsumerStrategies;
    import org.apache.spark.streaming.kafka010.KafkaUtils;
    import org.apache.spark.streaming.kafka010.LocationStrategies;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.util.*;
    
    public class SparkStreamingKafka {
        public static void main(String[] args) throws InterruptedException {
            // 创建 Spark 配置
            SparkConf sparkConf = new SparkConf()
                    .setAppName("spark_kafka")
                    .setMaster("local[*]")
                    .setExecutorEnv("setLogLevel", "ERROR");//设置日志等级为ERROR,避免日志增长导致的磁盘膨胀
    
            // 创建 Spark Streaming 上下文
            JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, new Duration(2000)); // 间隔两秒扑捉一次
    
            // 创建 Spark SQL 会话
            SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
    
    
            // 设置 Kafka 相关参数
            Map kafkaParams = new HashMap<>();
            kafkaParams.put("bootstrap.servers", "10.0.0.105:9092,10.0.0.106:9092,10.0.0.107:9092");
            kafkaParams.put("key.deserializer", StringDeserializer.class);
            kafkaParams.put("value.deserializer", StringDeserializer.class);
            kafkaParams.put("auto.offset.reset", "earliest");
            // auto.offset.reset可指定参数有
            // latest:从分区的最新偏移量开始读取消息。
            // earliest:从分区的最早偏移量开始读取消息。
            // none:如果没有有效的偏移量,则抛出异常。
            kafkaParams.put("enable.auto.commit", true);  //采用自动提交offset 的模式
            kafkaParams.put("auto.commit.interval.ms",2000);//每隔离两秒提交一次commited-offset
            kafkaParams.put("group.id", "spark_kafka"); //消费组名称
    
    
            // 创建 Kafka stream
            Collection topics = Collections.singletonList("spark_kafka"); // Kafka 主题名称
            JavaDStream> kafkaStream = KafkaUtils.createDirectStream(
                    streamingContext,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.Subscribe(topics, kafkaParams)  //订阅kafka
            );
    
            //定义数据结构
            StructType schema = new StructType()
                    .add("key", DataTypes.LongType)
                    .add("value", DataTypes.StringType);
    
            kafkaStream.foreachRDD((VoidFunction>>) rdd -> {
                // 转换为 DataFrame
                Dataset df = sparkSession.createDataFrame(rdd.map(record -> {
                    return RowFactory.create(record.offset(), record.value());  //将偏移量和value聚合
                }), schema);
    
                // 写入到 PostgreSQL
                df.write()
                        //选择写入数据库的模式
                        .mode(SaveMode.Append)//采用追加的写入模式
                        //协议
                        .format("jdbc")
                        //option 参数
                        .option("url", "jdbc:postgresql://localhost:5432/postgres") // PostgreSQL 连接 URL
                        //确定表名
                        .option("dbtable", "public.spark_kafka")//指定表名
                        .option("user", "postgres") // PostgreSQL 用户名
                        .option("password", "postgres") // PostgreSQL 密码
                        .save();
            });
            // 启动 Spark Streaming
            streamingContext.start();
            // 等待 Spark Streaming 应用程序终止
            streamingContext.awaitTermination();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89

    在执行代码前,向创建名为spark_kafka的topic

    kafka-topics.sh --create --topic spark_kafka --bootstrap-server 10.0.0.105:9092,10.0.0.106:9092,10.0.0.107:9092
    
    
    • 1
    • 2

    向spark_kafka 主题进行随机推数

    kafka-producer-perf-test.sh --topic spark_kafka --thrghput 10 --num-records 10000 --record-size 100000 --producer-props bootstrap.servers=10.0.0.105:9092,10.0.0.106:9092,10.0.0.107:9092
    
    
    • 1
    • 2

    运行过程中消费的offset会一直被提交到每一个分区
    在这里插入图片描述

    此时在数据库中查看,数据已经实时落地到库中
    在这里插入图片描述

    TCP

    TCP环境下,实时监控日志的输出,可用于监控设备状态、环境变化等。当监测到异常情况时,可以实时发出警报。

    package org.example;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.sql.*;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka010.ConsumerStrategies;
    import org.apache.spark.streaming.kafka010.KafkaUtils;
    import org.apache.spark.streaming.kafka010.LocationStrategies;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.util.*;
    
    public class SparkStreamingKafka {
        public static void main(String[] args) throws InterruptedException {
            // 创建 Spark 配置
            SparkConf sparkConf = new SparkConf()
                    .setAppName("spark_kafka") // 设置应用程序名称
                    .setMaster("local[*]") // 设置 Spark master 为本地模式,[*]表示使用所有可用核心
    
                    // 设置日志等级为ERROR,避免日志增长导致的磁盘膨胀
                    .setExecutorEnv("setLogLevel", "ERROR");
    
            // 创建 Spark Streaming 上下文
            JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, new Duration(2000)); // 间隔两秒扑捉一次
    
            // 创建 Spark SQL 会话
            SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
    
    
            // 设置 Kafka 相关参数
            Map kafkaParams = new HashMap<>();
            kafkaParams.put("bootstrap.servers", "10.0.0.105:9092,10.0.0.106:9092,10.0.0.107:9092"); // Kafka 服务器地址
            kafkaParams.put("key.deserializer", StringDeserializer.class); // key 反序列化器类
            kafkaParams.put("value.deserializer", StringDeserializer.class); // value 反序列化器类
            kafkaParams.put("auto.offset.reset", "earliest"); // 从最早的偏移量开始消费消息
            kafkaParams.put("enable.auto.commit", true);  // 采用自动提交 offset 的模式
            kafkaParams.put("auto.commit.interval.ms", 2000); // 每隔两秒提交一次 committed-offset
            kafkaParams.put("group.id", "spark_kafka"); // 消费组名称
    
    
            // 创建 Kafka stream
            Collection topics = Collections.singletonList("spark_kafka"); // Kafka 主题名称
            JavaDStream> kafkaStream = KafkaUtils.createDirectStream(
                    streamingContext,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.Subscribe(topics, kafkaParams)  // 订阅 Kafka
            );
    
            // 定义数据结构
            StructType schema = new StructType()
                    .add("key", DataTypes.LongType)
                    .add("value", DataTypes.StringType);
    
            kafkaStream.foreachRDD((VoidFunction>>) rdd -> {
                // 转换为 DataFrame
                Dataset df = sparkSession.createDataFrame(rdd.map(record -> {
                    return RowFactory.create(record.offset(), record.value());  // 将偏移量和 value 聚合
                }), schema);
    
                // 写入到 PostgreSQL
                df.write()
                        // 选择写入数据库的模式
                        .mode(SaveMode.Append) // 采用追加的写入模式
                        // 协议
                        .format("jdbc")
                        // option 参数
                        .option("url", "jdbc:postgresql://localhost:5432/postgres") // PostgreSQL 连接 URL
                        // 确定表名
                        .option("dbtable", "public.spark_kafka") // 指定表名
                        .option("user", "postgres") // PostgreSQL 用户名
                        .option("password", "postgres") // PostgreSQL 密码
                        .save();
            });
            // 启动 Spark Streaming
            streamingContext.start();
            // 等待 Spark Streaming 应用程序终止
            streamingContext.awaitTermination();
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88

    在10.0.0.108 打开9999端口键入数值 ,使其被spark接收到并进行运算

    nc -lk 9999
    
    
    • 1
    • 2

    开启端口可以键入数值 此时会在IDEA的控制台显示其计算值
    在这里插入图片描述

  • 相关阅读:
    计算机网络之无线网络与移动网络
    NNI自动调参工具
    msys2 + MSVC(VS2019)编译ffmpeg6.0源码
    基于java+ssm的在线投票管理系统-计算机毕业设计
    Kotlin笔记(七):协程
    PID学习
    ​调用Lua脚本tostring(xxx)报attempt to call a nil value (global ‘tostring‘
    洛谷P2520 裴蜀定理,多限制的不定方程解的构造
    小红书信息流广告投放怎么收费?投信息流广告效果怎么样
    Java深拷贝与浅拷贝
  • 原文地址:https://blog.csdn.net/weixin_73350116/article/details/136416715