• ubuntu20安装Spark和pyspark的简单使用


    简单介绍

    1,介绍

    Hadoop存在如下一些缺点:表达能力有限、磁盘IO开销大、延迟高、任务之间的衔接涉及IO开销、在前一个任务执行完成之前,其他任务就无法开始,难以胜任复杂、多阶段的计算任务

    Spark在借鉴Hadoop MapReduce优点的同时,很好地解决了MapReduce所面临的问题。
    相比于Hadoop MapReduce,Spark主要具有如下优点:
    Spark的计算模式也属于MapReduce,但不局限于Map和Reduce操作,还提供了多种数据集操作类型,编程模型比Hadoop MapReduce更灵活
    Spark提供了内存计算,可将中间结果放到内存中,对于迭代运算效率更高
    Spark基于DAG的任务调度执行机制,要优于Hadoop MapReduce的迭代执行机制

    核心模块

    Spark Core:Spark Core中提供了Spark最基础与最核心的功能。Spark其他的功能如:Spark SQL,Spark Streaming,GraphX, MLlib都是在Spark Core的基础上进行扩展的
    Spark SQL:Spark SQL是Spark用来操作结构化数据的组件。通过Spark SQL,用户可以使用SQL或者Apache Hive版本的SQL方言(HQL)来查询数据。
    Spark Streaming:Spark Streaming是Spark平台上针对实时数据进行流式计算的组件,提供了丰富的处理数据流的API。
    Spark Mllib:MLlib是Spark提供的一个机器学习算法库。MLlib不仅提供了模型评估、数据导入等额外的功能,还提供了一些更底层的机器学习原语。
    Spark GraphX:GraphX是Spark面向图计算提供的框架与算法库。


    Spark Core - 基本概念

    RDD:是Resillient Distributed Dataset(弹性分布式数据集)的简称,是分布式内存的一个抽象概念,提供了一种高度受限的共享内存模型
    DAG:是Directed Acyclic Graph(有向无环图)的简称,反映RDD之间的依赖关系
    Executor:是运行在工作节点(WorkerNode)的一个进程,负责运行Task
    应用(Application):用户编写的Spark应用程序
    任务( Task ):运行在Executor上的工作单元
    作业( Job ):一个作业包含多个RDD及作用于相应RDD上的各种操作
    阶段( Stage ):是作业的基本调度单位,一个作业会分为多组任务,每组任务被称为阶段,或者也被称为任务集合,代表了一组关联的、相互之间没有Shuffle依赖关系的任务组成的任务集

    Spark计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。三大数据结构分别是:

    • RDD : 弹性分布式数据集
    • 累加器:分布式共享只写变量
    • 广播变量:分布式共享只读变量

    Spark Core - 运行流程

    (1)首先为应用构建起基本的运行环境,即由Driver创建一个SparkContext,进行资源的申请、任务的分配和监控
    (2)资源管理器为Executor分配资源,并启动Executor进程
    (3)SparkContext根据RDD的依赖关系构建DAG图,DAG图提交给DAGScheduler解析成Stage,然后把一个个TaskSet提交给底层调度器TaskScheduler处理;Executor向SparkContext申请Task,Task Scheduler将Task发放给Executor运行,并提供应用程序代码
    (4)Task在Executor上运行,把执行结果反馈给TaskScheduler,然后反馈给DAGScheduler,运行完毕后写入数据并释放所有资源
    在这里插入图片描述


    Spark Core - RDD

    RDD 设计背景

    许多迭代式算法(比如机器学习、图算法等)和交互式数据挖掘工具,共同之处是,不同计算阶段之间会重用中间结果
    目前的MapReduce框架都是把中间结果写入到稳定存储(比如磁盘)中,带来了大量的数据复制、磁盘IO和序列化开销
    RDD就是为了满足这种需求而出现的,它提供了一个抽象的数据架构,我们不必担心底层数据的分布式特性,只需将具体的应用逻辑表达为一系列转换处理,不同RDD之间的转换操作形成依赖关系,可以实现管道化,避免中间数据存储

    • 一个RDD就是一个分布式对象集合,本质上是一个只读的分区记录集合,每个RDD可分成多个分区,每个分区就是一个数据集片段,并且一个RDD的不同分区可以被保存到集群中不同的节点上,从而可以在集群中的不同节点上进行并行计算

    • RDD提供了一种高度受限的共享内存模型,即RDD是只读的记录分区的集合,不能直接修改,只能基于稳定的物理存储中的数据集创建RDD,或者通过在其他RDD上执行确定的转换操作(如map、join和group by)而创建得到新的RDD

    • RDD提供了一组丰富的操作以支持常见的数据运算,分为“动作”(Action)和“转换”(Transformation)两种类型

    • RDD提供的转换接口都非常简单,都是类似map、filter、groupBy、join等粗粒度的数据转换操作,而不是针对某个数据项的细粒度修改(不适合网页爬虫)

    • 表面上RDD的功能很受限、不够强大,实际上RDD已经被实践证明可以高效地表达许多框架的编程模型(比如MapReduce、SQL、Pregel)

    • Spark提供了RDD的API,程序员可以通过调用API实现对RDD的各种操作

      1、转换操作

    操作含义
    filter(func)筛选出满足函数func的元素,并返回一个新的数据集
    map(func)将每个元素传递到函数func中,并将结果返回为一个新的数据集
    flatMap(func)与map()相似,但每个输入元素都可以映射到0或多个输出结果
    groupByKey()应用于(K,V)键值对的数据集时,返回一个新的(K, Iterable)形式的数据集
    reduceByKey(func)应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中每个值是将每个key传递到函数func中进行聚合后的结果

    2、行动操作

    行动操作是真正触发计算的地方。Spark程序执行到行动操作时,才会执行真正的计算,从文件中加载数据,完成一次又一次转换操作,最终,完成行动操作得到结果。

    操作含义
    count()返回数据集中的元素个数
    collect()以数组的形式返回数据集中的所有元素
    first()返回数据集中的第一个元素
    take(n)以数组的形式返回数据集中的前n个元素
    reduce(func)通过函数func(输入两个参数并返回一个值)聚合数据集中的元素
    foreach(func)将数据集中的每个元素传递到函数func中运行

    3、惰性机制

    所谓的“惰性机制”是指,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会触发“从头到尾”的真正的计算。这里给出一段简单的语句来解释Spark的惰性机制:

    lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
    lineLengths = lines.map(lambda s:len(s))
    totalLength = lineLengths.reduce(lambda a,b:a+b)
    print(totalLength)
    
    • 1
    • 2
    • 3
    • 4

    4、持久化

    在Spark中,RDD采用惰性求值的机制,每次遇到行动操作,都会从头开始执行计算。每次调用行动操作,都会触发一次从头开始的计算。这对于迭代计算而言,代价是很大的,迭代计算经常需要多次重复使用同一组数据

    >>> list = ["Hadoop","Spark","Hive"]
    >>> rdd = sc.parallelize(list)
    >>> print(rdd.count()) //行动操作,触发一次真正从头到尾的计算
    3
    >>> print(','.join(rdd.collect())) //行动操作,触发一次真正从头到尾的计算
    Hadoop,Spark,Hive
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    可以通过持久化(缓存)机制避免这种重复计算的开销
    可以使用persist()方法对一个RDD标记为持久化
    之所以说“标记为持久化”,是因为出现persist()语句的地方,并不会马上计算生成RDD并把它持久化,而是要等到遇到第一个行动操作触发真正计算以后,才会把计算结果进行持久化
    持久化后的RDD将会被保留在计算节点的内存中被后面的行动操作重复使用

    Spark Core - RDD 分区

    RDD分区的一个原则是使得分区的个数尽量等于集群中的CPU核心(core)数目。

    对于不同的Spark部署模式而言(本地模式、Standalone模式、YARN模式、Mesos模式),都可以通过设置spark.default.parallelism这个参数的值,来配置默认的分区数目,一般而言:
    本地模式:默认为本地机器的CPU数目,若设置了local[N],则默认为N
    Apache Mesos:默认的分区数为8
    Standalone或YARN:在“集群中所有CPU核心数目总和”和“2”二者中取较大值作为默认值

    创建RDD时手动指定分区个数
    使用reparititon方法重新设置分区个数

    从文件中加载

    >>> lines = sc.textFile("file:///usr/local/spark/mycode/pairrdd/word.txt")
    >>> pairRDD = lines.flatMap(lambda line:line.split(" ")).map(lambda word:(word,1))
    >>> pairRDD.foreach(print)
    ('I', 1)
    ('love', 1)
    ('Hadoop', 1)
    ……
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    通过并行集合(列表)创建

    >>> list = ["Hadoop","Spark","Hive","Spark"]
    >>> rdd = sc.parallelize(list)
    >>> pairRDD = rdd.map(lambda word:(word,1))
    >>> pairRDD.foreach(print)
    (Hadoop,1)
    (Spark,1)
    (Hive,1)
    (Spark,1)
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    spark-sql

    Spark SQL在Hive兼容层面仅依赖HiveQL解析、Hive元数据,也就是说,从HQL被解析成抽象语法树(AST)起,就全部由Spark SQL接管了。Spark SQL执行计划生成和优化都由Catalyst(函数式关系查询优化框架)负责

    Spark SQL - DataFrame

    可以通过如下语句创建一个SparkSession对象:

    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName("PySpark").getOrCreate()
    
    • 1
    • 2

    实际上,在启动进入pyspark以后,pyspark就默认提供了一个SparkContext对象(名称为sc)和一个SparkSession对象(名称为spark)

    在创建DataFrame时,可以使用spark.read操作,从不同类型的文件中加载数据创建DataFrame,例如:

    spark.read.text("people.txt"):读取文本文件people.txt创建DataFrame
    spark.read.json("people.json"):读取people.json文件创建DataFrame;在读取本地文件或HDFS文件时,要注意给出正确的文件路径
    spark.read.parquet(“people.parquet”):读取people.parquet文件创建DataFrame
    
    • 1
    • 2
    • 3

    或者也可以使用如下格式的语句:

    spark.read.format("text").load("people.txt"):读取文本文件people.json创建DataFrame;
    spark.read.format("json").load("people.json"):读取JSON文件people.json创建DataFrame;
    spark.read.format("parquet").load("people.parquet"):读取Parquet文件people.parquet创建DataFrame。
    
    • 1
    • 2
    • 3

    Spark SQL - DataFrame的保存

    可以使用spark.write操作,把一个DataFrame保存成不同格式的文件,例如,把一个名称为df的DataFrame保存到不同格式文件中,方法如下:

    df.write.text("people.txt")
    df.write.json("people.json“)
    df.write.parquet("people.parquet“)
    
    • 1
    • 2
    • 3

    或者也可以使用如下格式的语句:

    df.write.format("text").save("people.txt")
    df.write.format("json").save("people.json")
    df.write.format ("parquet").save("people.parquet")
    
    • 1
    • 2
    • 3

    Spark ML - 来源

    Spark提供了一个基于海量数据的机器学习库,它提供了常用机器学习算法的分布式实现。
    开发者只需要有 Spark 基础并且了解机器学习算法的原理,以及方法相关参数的含义,就可以轻松的通过调用相应的 API 来实现基于海量数据的机器学习过程

    需要注意的是,Mllib中只包含能够在集群上运行良好的并行算法,这一点很重要。有些经典的机器学习算法没有包含在其中,就是因为它们不能并行执行。相反地,一些较新的研究得出的算法因为适用于集群,也被包含在Mllib中,例如分布式随机森林算法、交替最小二乘算法。这样的选择使得Mllib中的每一个算法都适用于大规模数据集。
    如果是小规模数据集上训练各机器学习模型,最好还是在各个节点上使用单节点的机器学习算法库(比如Weka)。类似地,我们在机器学习流水线中,也常常用同一算法的不同参数对小规模数据集分别训练,来选出最好的一组参数。在Spark中,你可以通过参数列表传给parallelize()来在不同的节点上分别运行不同的参数,而在每个节点上则使用单节点的机器学习库来实现。只有当你需要在一个大规模分布式数据集上训练模型时,Mllib的优势才能突显出来。

    Spark 机器学习库从1.2 版本以后被分为两个包:

    • spark.mllib:包含基于RDD的原始算法API。Spark MLlib 历史比较长,在1.0 以前的版本即已经包含了,提供的算法实现都是基于原始的 RDD,当前处于维护状态。
    • spark.ml:提供了基于DataFrames 高层次的API,可以用来构建机器学习工作流(PipeLine)。ML Pipeline 弥补了原始 MLlib 库的不足,向用户提供了一个基于 DataFrame 的机器学习工作流式 API 套件

    使用 ML Pipeline API可以很方便的把数据处理,特征转换,正则化,以及多个机器学习算法联合起来,构建一个单一完整的机器学习流水线。

    park.ml 目前支持4种常见的机器学习问题: 分类、回归、聚类和协同过滤
    在这里插入图片描述

    Spark ML - 机器学习流水线 - 概念

    • PipeLine

      翻译为流水线或者管道。流水线将多个工作流阶段(转换器和估计器)连接在一起,形成机器学习的工作流,并获得结果输出

    • Transformer

      翻译成转换器,是一种可以将一个DataFrame转换为另一个DataFrame的算法。比如一个模型就是一个 Transformer。它可以把一个不包含预测标签的测试数据集 DataFrame 打上标签,转化成另一个包含预测标签的 DataFrame。技术上,Transformer实现了一个方法transform(),它通过附加一个或多个列将一个DataFrame转换为另一个DataFrame

    • Estimator

      翻译成估计器或评估器,它是学习算法或在训练数据上的训练方法的概念抽象。在 Pipeline 里通常是被用来操作 DataFrame 数据并生成一个 Transformer。从技术上讲,Estimator实现了一个方法fit(),它接受一个DataFrame并产生一个转换器。比如,一个随机森林算法就是一个 Estimator,它可以调用fit(),通过训练特征数据而得到一个随机森林模型。

    • Parameter

      Parameter 被用来设置 Transformer 或者 Estimator 的参数。现在,所有转换器和估计器可共享用于指定参数的公共API。ParamMap是一组(参数,值)对。

    一、安装spark

    1,下载

    官网:https://spark.apache.org/downloads.html

    清华镜像:https://mirrors.tuna.tsinghua.edu.cn/apache/spark/spark-3.2.0/

    最好选择spark-3.2.0-bin-without-hadoop.tgz版本

    进入下载目录

    sudo tar -zxf spark-3.3.0-bin-without-hadoop.tgz -C /usr/local
    
    • 1

    2,安装spark

    重命名以及增加权限

    cd /usr/local/
    sudo mv spark-3.3.0-bin-without-hadoop/ spark
    sudo chown -R hadoop spark
    
    • 1
    • 2
    • 3

    配置环境变量

    vim ~/.bashrc
    
    • 1

    添加以下几行

    export SPARK_HOME=/usr/local/spark
    export PATH=$PATH:$SPARK_HOME/bin
    export HADOOP_HOME=/usr/local/hadoop
    
    • 1
    • 2
    • 3

    配置生效

    source ~/.bashrc
    
    • 1

    3,修改配置文件

    创建 spark-env.sh 文件:

    cd /usr/local/spark/conf/
    cp spark-env.sh.template spark-env.sh
    vim spark-env.sh
    
    • 1
    • 2
    • 3

    写入以下内容

    export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)
    
    • 1

    4,运行spark实例

    cd /usr/local/spark
    bin/run-example SparkPi 2>&1 | grep "Pi is"
    
    • 1
    • 2

    应该有以下内容

    hadoop@qing-Inspiron-3437:/usr/local/spark$ bin/run-example SparkPi 2>&1 | grep "Pi is"
    Pi is roughly 3.1429357146785732
    
    • 1
    • 2

    启动Spark shell

    spark-shell
    
    • 1

    退出

    :quit
    
    • 1

    二、spark的使用

    1、简单的使用

    import findspark
    from pyspark.sql import SparkSession
    from pyspark.sql.types import *
    import numpy as np
    import pandas as pd
    
    findspark.init()
    spark = SparkSession.builder.appName('quick_start_pyspark').getOrCreate()
    
    # 创建DataFrame
    df = spark.read.json('/usr/local/spark/examples/src/main/resources/people.json')
    df.show()
    
    # Dataframe操作
    df.printSchema()
    df.select('name').show()  # 显示某一列
    df.select(df['name'], df['age']+1).show()  # 对某列进行操作
    df.select('name', df['age']+1).show()
    df.filter(df['age'] > 20).show()  # 过滤,有点像where子句
    df.groupBy('age').count().show()  # 分组,分组后一般会带一个统计函数
    
    # 用SQL语句来操作DataFrame
    df.createOrReplaceTempView('people')  # 创建一个临时视图,内存里面的临时表
    sql_df = spark.sql('select * from people').show()
    df.createGlobalTempView('people_1')  # 创建一个全局的临时视图
    spark.sql('SELECT * FROM global_temp.people_1').show()  # 访问全局临时变量,需要带有global_temp前缀
    spark.newSession().sql('SELECT * FROM global_temp.people_1').show()  # spark.newSession()相当于另外一个用户
    # spark.newSession().sql('SELECT * FROM people').show() 访问不到。出错
    
    # 修订DataFrame数据结构
    sc = spark.sparkContext  # 属性,spark的上下文
    lines = sc.textFile('/usr/local/spark/examples/src/main/resources/people.txt')  # 生成RDD
    parts = lines.map(lambda row: row.split(','))  # 用逗号分隔,还原他的结构
    print(parts)
    people = parts.map(lambda p: (p[0], p[1].strip()))  # 第0列强制转换成字符串
    
    schemaString = 'name age'
    fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]  # StructField定义列
    schema = StructType(fields)  # 表结构:字段名、类型
    
    schemaPeople = spark.createDataFrame(people, schema)
    schemaPeople.printSchema()
    
    # 载入和保存功能
    # 支持的格式包括parquet、csv、text等。
    df = spark.read.load('/usr/local/spark/examples/src/main/resources/users.parquet')
    df.select('name', 'favorite_color').write.save('name_fav_colors.parquet')  # 写入指定的parquet
    
    df = spark.read.load('/usr/local/spark/examples/src/main/resources/people.json', format='json')
    df.printSchema()
    df.select('name', 'age').write.save('name_fav_colors.csv', format='csv')  # 按指定格式保存
    df.select('name').write.save('name_fav_colors.txt', format='text')  # 按txt保存,df中只能有一列
    
    df = spark.read.load('/usr/local/spark/examples/src/main/resources/people.csv', format='csv', sep=';', header='true')
    df.select('name', 'age', 'job').show()  # 通过df查询特定字段
    
    # 创建临时表,使用SQL查询
    df.createTempView('_USER')
    spark.sql('select * from _USER').show()
    df.select("*").show()  # 通过df查询全部字段
    
    # 直接在文件上运行SQL
    df = spark.sql("select * from parquet.`name_fav_colors.parquet`").show()
    
    # pandas和spark互转
    spark.conf.set('spark.sql.execution.arrow.enabled', 'true')
    pd_f = pd.DataFrame(np.random.rand(100, 3))
    df = spark.createDataFrame(pd_f)  # pandas转换成spark
    df.select('*').show()
    _pdf = df.select('*').toPandas()  # spark转换成pandas
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70

    2、逻辑回归

    from pyspark.sql import SparkSession  # 程序入口点
    from pyspark.sql.functions import *
    
    from pyspark.ml.feature import StringIndexer  # 字符串=》整数
    from pyspark.ml.feature import OneHotEncoder
    from pyspark.ml.feature import VectorAssembler  # 将多个列合并成向量列
    from pyspark.ml.classification import LogisticRegression  # 逻辑回归
    
    # 入口点
    spark = SparkSession.builder.appName('spark_lr').getOrCreate()
    
    # 获取数据
    df = spark.read.csv('/home/qing/下载/数据集 (1)/╩²╛▌╝»/Log_Reg_dataset.csv', inferSchema=True, header=True)
    print(df.count())
    print(df.columns)
    df.printSchema()
    df.describe().show()
    df.groupBy('Country').count().show()
    df.select('*').show()
    
    # 预处理
    search_engine_indexer = StringIndexer(inputCol='Platform', outputCol='platform_num').fit(df)
    df = search_engine_indexer.transform(df)  # 输入的DataFrame进行转换,返回经过转化的DataFrame
    df.show(10)
    
    # 转换器:字符串、数值=>独热编码
    search_engine_encode = OneHotEncoder(inputCol='platform_num', outputCol='platform_vector').fit(df)
    df = search_engine_encode.transform(df)
    df.show(10)
    
    counter_indexer = StringIndexer(inputCol='Country', outputCol='country_num').fit(df)
    df = counter_indexer.transform(df)
    counter_encoder = OneHotEncoder(inputCol='country_num', outputCol='country_vector').fit(df)
    df = counter_encoder.transform(df)
    df.select('Country', 'country_num', 'country_vector').show()
    df.groupBy('country_vector').count().orderBy('count').show()
    
    # 模型训练
    # 整合特征列
    _inputCols = ['platform_vector', 'country_vector', 'Age', 'Repeat_Visitor', 'Web_pages_viewed']
    df_assembler = VectorAssembler(inputCols=_inputCols, outputCol='features')
    df = df_assembler.transform(df)
    df.printSchema()
    df.select('features', 'Status').show()
    # 生成真正的样本数据
    dataDf = df.select(['features', 'Status'])
    # 拆分训练集和测试集
    trainDf, testDf = dataDf.randomSplit([0.70, 0.30])
    model = LogisticRegression(labelCol='Status').fit(trainDf)
    
    # 评估和测试
    train_results = model.evaluate(testDf).predictions
    train_results.show()
    acc = model.evaluate(testDf).accuracy
    print(acc)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    3、朴素贝叶斯

    from pyspark.sql import SparkSession
    from pyspark.ml.feature import VectorAssembler
    from pyspark.ml.classification import NaiveBayes
    from pyspark.ml.evaluation import BinaryClassificationEvaluator  # 二分类评估
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator  # 多分类
    
    spark = SparkSession.builder.appName('spark_nb').getOrCreate()
    df = spark.read.csv('/home/qing/下载/数据集 (1)/╩²╛▌╝»/cars.csv', inferSchema=True, header=True)
    df.show()
    df.describe().show()
    
    # 生成模型所需要的格式
    _inputCols = ['地区', '驾驶员年龄', '驾龄', '每年保养次数', '汽车类型']
    df_assembler = VectorAssembler(inputCols=_inputCols, outputCol='features')
    df = df_assembler.transform(df)
    df.show(10)
    
    model_df = df.select(['features', '故障'])  # 训练模型的数据格式
    train_df, test_df = model_df.randomSplit([0.7, 0.3])  # 拆分训练集和测试集
    
    # 选择算法(评估器),并训练、拟合、学习
    nb = NaiveBayes(labelCol='故障', smoothing=1.0)
    model = nb.fit(train_df)
    
    # 预测,使用
    predict = model.transform(test_df)
    predict.show()
    
    eva_b = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='故障')
    res = eva_b.evaluate(predict)  # 二分类评估器返回的AUC,ROC曲线下的面积
    print(res)
    
    eva_m = MulticlassClassificationEvaluator(labelCol='故障')
    acc = eva_m.evaluate(predict)  # 多分类评估器返回的是准确率
    print(acc)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
  • 相关阅读:
    【GCN-RS】Learning Explicit User Interest Boundary for Recommendation (WWW‘22)
    1.3.6 交换机划分 Vlan 配置
    Java多线程(6):锁与AQS(下)
    linux下安装qt、qt触摸屏校准tslib
    基于java+SpringBoot+HTML+Mysql酒店预订网站
    Linux内存管理(二十二):页面回收简介和 kswapd(1)
    Concat、Push、Spread syntax性能差异对比
    Java 中那些绕不开的内置接口 -- Iterator 和 Iterable
    StatefulSets In K8s
    如何使用 Junit + Mockito 实践单元测试
  • 原文地址:https://blog.csdn.net/qq_22808061/article/details/126392570