• 对比Python,PySpark 大数据处理其实更香


    对于数据分析师、数据科学家和任何使用数据的人来说,能够熟练而有效地处理大数据是一项非常有优势的技能。

    如果你已经熟悉运用 Python 和 pandas 做常规数据处理,并且想学习处理大数据,那么熟悉 PySpark,并将用其做数据处理,将会是一个不错的开始。

    PySpark是一种适用于 Apache Spark 的 Python API,一种流行的大数据开源数据处理引擎。

    本文的前提是,假设读者在 Python 中熟练使用 pandas 操作数据。

    技术提升

    项目代码、数据、技术交流提升,均可加交流群获取,群友已超过2000人,添加时最好的备注方式为:来源+兴趣方向,方便找到志同道合的朋友

    方式①、添加微信号:dkl88191,备注:来自CSDN
    方式②、微信搜索公众号:Python学习与数据挖掘,后台回复:加群

    数据集

    从导包开始。在 PySpark 中,需要创建一个 Spark 会话 SparkSession。创建 Spark 会话后,可以从以下位置访问 Spark Web 用户界面 (Web UI):http://localhost:4040/。下面定义的应用程序名称appName为“PyDataStudio”,将显示为 Web UI 右上角的应用程序名称。本文将不会使用 Web UI,但是,如果您有兴趣了解更多信息,请查看官方文档[1]。

    import pandas as pd
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName('PyDataStudio').getOrCreate()
    
    • 1
    • 2
    • 3

    图片

    我们将在这篇文章中使用企鹅数据集[2]。使用下面的脚本,我们将penguins.csv数据的修改版本保存在工作目录中。

    from seaborn import load_dataset
    (load_dataset('penguins')
        .drop(columns=['bill_length_mm', 'bill_depth_mm'])
        .rename(columns={'flipper_length_mm': 'flipper',
                         'body_mass_g': 'mass'})
        .to_csv('penguins.csv', index=False))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    图片

    看一下两个库之间的语法比较。为了简洁,我们仅保留显示 PySpark 输出。

    基本使用

    两个库的数据对象都称为 DataFrame:pandas DataFrame vs PySpark DataFrame

    导入数据并检查其形状

    # pandas 
    df = pd.read_csv('penguins.csv') 
    df.shape
    # PySpark 
    df = spark.read.csv('penguins.csv', header=True, inferSchema=True) 
    df.count(), len(df.columns)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    (344, 5)
    
    • 1

    使用 PySpark 导入数据时,指定header=True数据类型用第一行作标题,并设置inferSchema=True。可以尝试不使用这些选项导入并检查 DataFrame 及其数据类型(类似于 pandas 使用df.dtype 检查 PySpark DataFrames 的数据类型)。

    与 pandas DataFrame 不同,PySpark DataFrame 没有像.shape可以直接查看数据的形状。所以要得到数据形状,我们分别求行数和列数。

    检查有关数据的高级信息

    # pandas
    df.info()
    # PySpark 
    df.printSchema()
    
    • 1
    • 2
    • 3
    • 4
    root
     |-- species: string (nullable = true)
     |-- island: string (nullable = true)
     |-- flipper: double (nullable = true)
     |-- mass: double (nullable = true)
     |-- sex: string (nullable = true)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    虽然此方法不会提供与df.info()相同的输出,但它是最接近的内置方法之一。

    查看数据的前几行

    # pandas
    df.head()
    # PySpark 
    df.show(5)
    
    • 1
    • 2
    • 3
    • 4
    +-------+---------+-------+------+------+
    |species|   island|flipper|  mass|   sex|
    +-------+---------+-------+------+------+
    | Adelie|Torgersen|  181.0|3750.0|  Male|
    | Adelie|Torgersen|  186.0|3800.0|Female|
    | Adelie|Torgersen|  195.0|3250.0|Female|
    | Adelie|Torgersen|   null|  null|  null|
    | Adelie|Torgersen|  193.0|3450.0|Female|
    +-------+---------+-------+------+------+
    only showing top 5 rows
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    默认情况下,df.show()默认显示前 20 行。PySpark DataFrame 实际上有一个名为.head()的方法,可以查看前几行的数据,并以row对象形式打印出。运行df.head(5)提供如下输出:

    df.head(5)
    
    • 1

    图片

    .show()方法的输出更简洁,因此在查看数据集的top行时用.show()

    选择列

    # pandas 
    df[['island', 'mass']].head(3)
    # PySpark 
    df[['island', 'mass']].show(3)
    
    • 1
    • 2
    • 3
    • 4
    +---------+------+
    |   island|  mass|
    +---------+------+
    |Torgersen|3750.0|
    |Torgersen|3800.0|
    |Torgersen|3250.0|
    +---------+------+
    only showing top 3 rows
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    虽然可以在这里使用的是类似于 pandas 的语法,而在 PySpark 中默认使用如下代码片段所示的方法选择列:

    df.select('island', 'mass').show(3) 
    df.select(['island', 'mass']).show(3)
    
    • 1
    • 2

    过滤

    根据条件过滤数据

    # pandas 
    df[df['species']=='Gentoo'].head()
    # PySpark 
    df[df['species']=='Gentoo'].show(5)
    
    • 1
    • 2
    • 3
    • 4
    +-------+------+-------+------+------+
    |species|island|flipper|  mass|   sex|
    +-------+------+-------+------+------+
    | Gentoo|Biscoe|  211.0|4500.0|Female|
    | Gentoo|Biscoe|  230.0|5700.0|  Male|
    | Gentoo|Biscoe|  210.0|4450.0|Female|
    | Gentoo|Biscoe|  218.0|5700.0|  Male|
    | Gentoo|Biscoe|  215.0|5400.0|  Male|
    +-------+------+-------+------+------+
    only showing top 5 rows
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    两个库之间的语法几乎相同。要获得相同的输出,还可以使用:

    df.filter(df['species']=='Gentoo').show(5) df.filter("species=='Gentoo'").show(5) 
    
    • 1

    下面显示了一些常见的过滤器比较:

    # pandas 
    df[df['species'].isin(['Chinstrap', 'Gentoo'])].head() 
    df[df['species'].str.match('G.')] .head() 
    df[df['flipper'].between(225,229)].head() 
    df[df['mass'].isnull()].head()1b df.loc[df['species']!='Gentoo'].head() 
    df[~df['species'].isin(['Chinstrap', 'Gentoo'])].head() 
    df[-df['species'].str.match('G.')].head() 
    df[~df['flipper'].between(225,229)].head() 
    df[df['mass'].notnull()].head()6 df[(df['mass']<3400) & (df['sex']=='Male')].head() 
    df[(df['mass']<3400) | (df['sex']=='Male')].head()
    
    # PySpark 
    df[df['species'].isin(['Chinstrap', 'Gentoo'])].show(5) 
    df[df['species'].rlike('G.')].show(5) 
    df[df['flipper'].between(225,229)].show(5) 
    df[df['mass'].isNull()].show(5)1b df[df['species']!='Gentoo'].show(5) 
    df[~df['species'].isin(['Chinstrap', 'Gentoo'])].show(5) 
    df[~df['species'].rlike('G.')].show(5) 
    df[~df['flipper'].between(225,229)].show(5) 
    df[df['mass'].isNotNull()].show(5)
    df[(df['mass']<3400) & (df['sex']=='Male')].show(5) 
    df[(df['mass']<3400) |(df[ 'sex']=='Male')].show(5)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    虽然~-在 pandas 中都可以作为否定,但在 PySpark 中仅有~能作为有效的否定。

    排序

    对数据进行排序并检查mass最小的 5 行:

    # pandas 
    df.nsmallest(5, 'mass')
    # PySpark 
    df[df['mass'].isNotNull()].orderBy('mass').show(5)
    
    • 1
    • 2
    • 3
    • 4
    +---------+------+-------+------+------+
    |  species|island|flipper|  mass|   sex|
    +---------+------+-------+------+------+
    |Chinstrap| Dream|  192.0|2700.0|Female|
    |   Adelie|Biscoe|  184.0|2850.0|Female|
    |   Adelie|Biscoe|  181.0|2850.0|Female|
    |   Adelie|Biscoe|  187.0|2900.0|Female|
    |   Adelie| Dream|  178.0|2900.0|Female|
    +---------+------+-------+------+------+
    only showing top 5 rows
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    Pandas的.nsmallest().nlargest()方法会自动排除缺失值。而 PySpark 没有等效的方法。为了获得相同的输出,首先过滤掉缺失mass的行,然后对数据进行排序并查看前 5 行。如果没有删除数据,可以简写为:

    df.orderBy(‘mass’).show(5).sort()
    
    • 1

    代替的另一种排序方式.orderBy()

    # pandas 
    df.nlargest(5, 'mass')
    # PySpark 
    df.sort('mass', ascending=False).show(5)
    
    • 1
    • 2
    • 3
    • 4
    +-------+------+-------+------+----+
    |species|island|flipper|  mass| sex|
    +-------+------+-------+------+----+
    | Gentoo|Biscoe|  221.0|6300.0|Male|
    | Gentoo|Biscoe|  230.0|6050.0|Male|
    | Gentoo|Biscoe|  220.0|6000.0|Male|
    | Gentoo|Biscoe|  222.0|6000.0|Male|
    | Gentoo|Biscoe|  229.0|5950.0|Male|
    +-------+------+-------+------+----+
    only showing top 5 rows
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    这些语法的变体也是等效的:

    df.sort(df['mass'].desc()).show(5) 
    df.orderBy('mass', ascending=False).show(5) 
    df.orderBy(df['mass'].desc( )).show(5)
    
    
    • 1
    • 2
    • 3
    • 4

    按多列排序,如下所示:

    # pandas 
    df.sort_values(['mass', 'flipper'], ascending=False).head()
    # PySpark 
    df.orderBy(['mass', 'flipper'], ascending=False).show(5)
    
    • 1
    • 2
    • 3
    • 4
    +-------+------+-------+------+----+
    |species|island|flipper|  mass| sex|
    +-------+------+-------+------+----+
    | Gentoo|Biscoe|  221.0|6300.0|Male|
    | Gentoo|Biscoe|  230.0|6050.0|Male|
    | Gentoo|Biscoe|  222.0|6000.0|Male|
    | Gentoo|Biscoe|  220.0|6000.0|Male|
    | Gentoo|Biscoe|  229.0|5950.0|Male|
    +-------+------+-------+------+----+
    only showing top 5 rows
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    在 PySpark 中,可以在将所有列分别传参数,而不需要写成列表的形式

    df.orderBy('mass', 'flipper', ascending=False).show(5)
    
    • 1

    要按多列但按不同方向排序:

    # pandas 
    df.sort_values(['mass', 'flipper'], ascending=[True, False]).head()
    # PySpark 
    df[df['mass'].isNotNull()]\ 
      .sort('mass', 'flipper', ascending=[True, False]).show(5)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    +---------+---------+-------+------+------+
    |  species|   island|flipper|  mass|   sex|
    +---------+---------+-------+------+------+
    |Chinstrap|    Dream|  192.0|2700.0|Female|
    |   Adelie|   Biscoe|  184.0|2850.0|Female|
    |   Adelie|   Biscoe|  181.0|2850.0|Female|
    |   Adelie|Torgersen|  188.0|2900.0|Female|
    |   Adelie|   Biscoe|  187.0|2900.0|Female|
    +---------+---------+-------+------+------+
    only showing top 5 rows
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    pyspark的另一种写法

    df[df['mass'].isNotNull()]\
      .orderBy(df['mass'].asc(), df['flipper'].desc()).show(5)
    
    • 1
    • 2

    聚合

    现在,看几个聚合数据的示例。

    简单的聚合

    二者方法类似:

    # pandas 
    df.agg({'flipper': 'mean'})
    # PySpark 
    df.agg({'flipper': 'mean'}).show()
    
    • 1
    • 2
    • 3
    • 4
    +------------------+
    |      avg(flipper)|
    +------------------+
    |200.91520467836258|
    +------------------+
    
    • 1
    • 2
    • 3
    • 4
    • 5

    多个聚合

    需要采用不同的方法:

    # pandas 
    df.agg({'flipper': ['min', 'max']})
    # PySpark 
    from pyspark.sql import functions as F 
    df.agg(F.min('flipper'), F.max('flipper')).show()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    +------------+------------+
    |min(flipper)|max(flipper)|
    +------------+------------+
    |       172.0|       231.0|
    +------------+------------+
    
    • 1
    • 2
    • 3
    • 4
    • 5

    获取唯一值

    # pandas 
    df['species'].unique()
    # PySpark 
    df.select('species').distinct().show()
    
    • 1
    • 2
    • 3
    • 4
    +---------+
    |  species|
    +---------+
    |   Gentoo|
    |   Adelie|
    |Chinstrap|
    +---------+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    要在列中获取多个不同的值:

    # pandas 
    df['species'].nunique()
    # PySpark 
    df.select('species').distinct().count()
    
    • 1
    • 2
    • 3
    • 4
    3
    
    • 1

    按组聚合

    到目前为止,PySpark 使用 camelCase 驼峰命名法来表示方法和函数。.groupBy()这也是如此。这是一个简单的按聚合分组的示例:

    # pandas 
    df.groupby('species')['mass'].mean()
    # PySpark 
    df.groupBy('species').agg({'mass': 'mean'}).show()
    
    • 1
    • 2
    • 3
    • 4
    +---------+------------------+
    |  species|         avg(mass)|
    +---------+------------------+
    |   Gentoo| 5076.016260162602|
    |   Adelie| 3700.662251655629|
    |Chinstrap|3733.0882352941176|
    +---------+------------------+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    这是一个聚合多个选定列的示例:

    # pandas 
    df.groupby('species').agg({'flipper': 'sum',
                               'mass': 'mean'})
    # PySpark 
    df.groupBy('species').agg({'flipper': 'sum',
                               'mass': 'mean'}).show()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    +---------+------------+--------------+
    |  species|sum(flipper)|     avg(mass)|
    +---------+------------+--------------+
    |   Gentoo|     26714.0| 5076.01626016|
    |   Adelie|     28683.0| 3700.66225165|
    |Chinstrap|     13316.0|3733.088235294|
    +---------+------------+--------------+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    如果我们不指定列,它将显示所有数字列的统计信息:

    # pandas 
    df.groupby('species').mean()
    # PySpark 
    df.groupBy('species').mean().show()
    
    • 1
    • 2
    • 3
    • 4
    +---------+--------------+--------------+
    |  species|  avg(flipper)|     avg(mass)|
    +---------+--------------+--------------+
    |   Gentoo| 217.186991869| 5076.01626016|
    |   Adelie|189.9536423841| 3700.66225165|
    |Chinstrap| 195.823529411|3733.088235294|
    +---------+--------------+--------------+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    也可以将.mean()替换为.avg(),即可以使用df.groupBy(‘species’).avg().show()

    以上就是本文的所有内容,希望能够帮到你对 PySpark 语法有所了解。我们注意到,在基本任务方面,这两个库之间有很多相似之处。这使得在熟悉 pandas 工作知识的人更容易开始使用 PySpark,在处理小数据分析与挖掘后,遇到大数据分析与挖掘时,也能够轻松面对。

    参考资料

    [1]

    官方文档: https://spark.apache.org/docs/latest/web-ui.html

    [2]

    企鹅数据集: https://github.com/mwaskom/seaborn-data/blob/master/penguins.csv

  • 相关阅读:
    【Kaggle比赛常用trick】K折交叉验证、TTA
    gin自定义验证器+中文翻译
    FPGA领域入门佳作,简明深入,夯实基础《FPGA原理和结构》(可下载)
    DP28 跳跃游戏(三)
    硅芯思见:“软约束”到底有多“软”
    [英雄星球七月集训LeetCode解题日报] 第20日 BST
    dimp V8:[WARNING]login fail, check your username and password, and check the server status
    Kotlin前置检测判断check,require,requireNotNull
    曾仕强讲道德经系列视频合集百度网盘资源
    【日拱一卒行而不辍20220920】自制操作系统
  • 原文地址:https://blog.csdn.net/qq_34160248/article/details/127949989