• DataFrame窗口函数操作


    文章最前: 我是Octopus,这个名字来源于我的中文名--章鱼;我热爱编程、热爱算法、热爱开源。所有源码在我的个人github ;这博客是记录我学习的点点滴滴,如果您对 Python、Java、AI、算法有兴趣,可以关注我的动态,一起学习,共同进步。

       相关文章:

    1. PySpark 概述
    2. Spark连接快速入门
    3. Spark上使用pandas API快速入门

    创建pyspark对象

    # --- Environment setup -------------------------------------------------
    # Silence warnings, import the job's dependencies, build a Hive-enabled
    # SparkSession tuned for the stockout job, and apply session-level
    # Hive/SQL settings.
    import warnings
    warnings.filterwarnings('ignore')
    from datetime import timedelta, date, datetime
    import time
    import gc
    import os
    import argparse
    import sys
    from pyspark.sql import SparkSession, functions as fn
    from pyspark.ml.feature import StringIndexer
    from pyspark.ml.recommendation import ALS
    from pyspark.sql.types import *
    from pyspark import StorageLevel

    # Builder options gathered in one mapping; values match the original
    # job tuning byte-for-byte.
    _SPARK_CONF = {
        "hive.exec.dynamic.partition.mode": "nonstrict",
        "spark.sql.sources.partitionOverwriteMode": "dynamic",
        "spark.driver.memory": '20g',
        "spark.executor.memory": '40g',
        "spark.yarn.executor.memoryOverhead": '1g',
        "spark.executor.instances": 8,
        "spark.executor.cores": 8,
        "spark.kryoserializer.buffer.max": '128m',
        "spark.yarn.queue": 'root.algo',
        "spark.executorEnv.OMP_NUM_THREADS": 12,
        "spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT": 1,
        "spark.default.parallelism": 800,
    }

    _builder = SparkSession.builder.appName("stockout_test")
    for _key, _value in _SPARK_CONF.items():
        _builder = _builder.config(_key, _value)
    spark = _builder.enableHiveSupport().getOrCreate()

    # Session-level settings: enable dynamic partitioning and disable
    # automatic broadcast joins.
    spark.sql("set hive.exec.dynamic.partition.mode = nonstrict")
    spark.sql("set hive.exec.dynamic.partition=true")
    spark.sql("set spark.sql.autoBroadcastJoinThreshold=-1")

    创建DataFrame

    # Build the demo (name, department, salary) DataFrame used by every
    # window-function example below.
    EMPLOYEE_ROWS = [
        ("zhangsan", "IT", 8000),
        ("lisi", "IT", 7000),
        ("wangwu", "IT", 7500),
        ("zhaoliu", "ALGO", 10000),
        ("qisan", "IT", 8000),
        ("bajiu", "ALGO", 12000),
        ("james", "ALGO", 11000),
        ("wangzai", "INCREASE", 7000),
        ("carter", "INCREASE", 8000),
        ("kobe", "IT", 9000),
    ]
    COLUMN_NAMES = ["name", "department", "salary"]
    df = spark.createDataFrame(data=EMPLOYEE_ROWS, schema=COLUMN_NAMES)
    df.show()
    +--------+----------+------+
    |    name|department|salary|
    +--------+----------+------+
    |zhangsan|        IT|  8000|
    |    lisi|        IT|  7000|
    |  wangwu|        IT|  7500|
    | zhaoliu|      ALGO| 10000|
    |   qisan|        IT|  8000|
    |   bajiu|      ALGO| 12000|
    |   james|      ALGO| 11000|
    | wangzai|  INCREASE|  7000|
    |  carter|  INCREASE|  8000|
    |    kobe|        IT|  9000|
    +--------+----------+------+

    row_number()

    # row_number(): a unique, gap-free sequence (1, 2, 3, ...) per
    # department, ordered by salary descending; ties still get distinct
    # numbers.
    from pyspark.sql.window import Window
    import pyspark.sql.functions as F

    by_salary_desc = Window.partitionBy("department").orderBy(F.desc("salary"))
    df.withColumn("row_number", F.row_number().over(by_salary_desc)).show(truncate=False)
    +--------+----------+------+----------+
    |name    |department|salary|row_number|
    +--------+----------+------+----------+
    |carter  |INCREASE  |8000  |1         |
    |wangzai |INCREASE  |7000  |2         |
    |kobe    |IT        |9000  |1         |
    |zhangsan|IT        |8000  |2         |
    |qisan   |IT        |8000  |3         |
    |wangwu  |IT        |7500  |4         |
    |lisi    |IT        |7000  |5         |
    |bajiu   |ALGO      |12000 |1         |
    |james   |ALGO      |11000 |2         |
    |zhaoliu |ALGO      |10000 |3         |
    +--------+----------+------+----------+

    Rank()

    # rank(): ties share the same rank and the following rank is skipped
    # (e.g. 1, 2, 2, 4) — compare with dense_rank() below.
    from pyspark.sql.window import Window
    import pyspark.sql.functions as F

    by_salary_desc = Window.partitionBy("department").orderBy(F.desc("salary"))
    df.withColumn("rank", F.rank().over(by_salary_desc)).show(truncate=False)
    +--------+----------+------+----+
    |name    |department|salary|rank|
    +--------+----------+------+----+
    |carter  |INCREASE  |8000  |1   |
    |wangzai |INCREASE  |7000  |2   |
    |kobe    |IT        |9000  |1   |
    |qisan   |IT        |8000  |2   |
    |zhangsan|IT        |8000  |2   |
    |wangwu  |IT        |7500  |4   |
    |lisi    |IT        |7000  |5   |
    |bajiu   |ALGO      |12000 |1   |
    |james   |ALGO      |11000 |2   |
    |zhaoliu |ALGO      |10000 |3   |
    +--------+----------+------+----+

    dense_rank()

    # dense_rank(): ties share the same rank but no ranks are skipped
    # (e.g. 1, 2, 2, 3) — compare with rank() above.
    from pyspark.sql.window import Window
    import pyspark.sql.functions as F

    by_salary_desc = Window.partitionBy("department").orderBy(F.desc("salary"))
    df.withColumn("dense_rank", F.dense_rank().over(by_salary_desc)).show()
    +--------+----------+------+----------+
    |    name|department|salary|dense_rank|
    +--------+----------+------+----------+
    |  carter|  INCREASE|  8000|         1|
    | wangzai|  INCREASE|  7000|         2|
    |    kobe|        IT|  9000|         1|
    |   qisan|        IT|  8000|         2|
    |zhangsan|        IT|  8000|         2|
    |  wangwu|        IT|  7500|         3|
    |    lisi|        IT|  7000|         4|
    |   bajiu|      ALGO| 12000|         1|
    |   james|      ALGO| 11000|         2|
    | zhaoliu|      ALGO| 10000|         3|
    +--------+----------+------+----------+

    lag()

    # lag(): pulls the salary from 1 row earlier within the window; the
    # first row of each department has no predecessor, so it gets null.
    from pyspark.sql.window import Window
    import pyspark.sql.functions as F

    by_salary_desc = Window.partitionBy("department").orderBy(F.desc("salary"))
    df.withColumn("lag", F.lag("salary", 1).over(by_salary_desc)).show()
    +--------+----------+------+-----+
    |    name|department|salary|  lag|
    +--------+----------+------+-----+
    |  carter|  INCREASE|  8000| null|
    | wangzai|  INCREASE|  7000| 8000|
    |    kobe|        IT|  9000| null|
    |zhangsan|        IT|  8000| 9000|
    |   qisan|        IT|  8000| 8000|
    |  wangwu|        IT|  7500| 8000|
    |    lisi|        IT|  7000| 7500|
    |   bajiu|      ALGO| 12000| null|
    |   james|      ALGO| 11000|12000|
    | zhaoliu|      ALGO| 10000|11000|
    +--------+----------+------+-----+

    lead()

    # lead(): pulls the salary from 1 row later within the window; the
    # last row of each department has no successor, so it gets null.
    from pyspark.sql.window import Window
    import pyspark.sql.functions as F

    by_salary_desc = Window.partitionBy("department").orderBy(F.desc("salary"))
    df.withColumn("lead", F.lead("salary", 1).over(by_salary_desc)).show()
    +--------+----------+------+-----+
    |    name|department|salary| lead|
    +--------+----------+------+-----+
    |  carter|  INCREASE|  8000| 7000|
    | wangzai|  INCREASE|  7000| null|
    |    kobe|        IT|  9000| 8000|
    |zhangsan|        IT|  8000| 8000|
    |   qisan|        IT|  8000| 7500|
    |  wangwu|        IT|  7500| 7000|
    |    lisi|        IT|  7000| null|
    |   bajiu|      ALGO| 12000|11000|
    |   james|      ALGO| 11000|10000|
    | zhaoliu|      ALGO| 10000| null|
    +--------+----------+------+-----+

    Aggregate Functions

    # Aggregate functions over windows: the unordered window computes the
    # per-department aggregate on every row; the ordered window supplies a
    # row number within each department.
    from pyspark.sql.window import Window
    import pyspark.sql.functions as F

    ordered_win = Window.partitionBy("department").orderBy(F.desc("salary"))
    agg_win = Window.partitionBy("department")

    result = df.withColumn("row", F.row_number().over(ordered_win))
    result = result.withColumn("avg", F.avg("salary").over(agg_win))
    result = result.withColumn("sum", F.sum("salary").over(agg_win))
    result = result.withColumn("min", F.min("salary").over(agg_win))
    result = result.withColumn("max", F.max("salary").over(agg_win))
    result = result.withColumn("count", F.count("salary").over(agg_win))
    result = result.withColumn("distinct_count", F.approx_count_distinct("salary").over(agg_win))
    result.show()
    +--------+----------+------+---+-------+-----+-----+-----+-----+--------------+
    |    name|department|salary|row|    avg|  sum|  min|  max|count|distinct_count|
    +--------+----------+------+---+-------+-----+-----+-----+-----+--------------+
    |  carter|  INCREASE|  8000|  1| 7500.0|15000| 7000| 8000|    2|             2|
    | wangzai|  INCREASE|  7000|  2| 7500.0|15000| 7000| 8000|    2|             2|
    |    kobe|        IT|  9000|  1| 7900.0|39500| 7000| 9000|    5|             4|
    |zhangsan|        IT|  8000|  2| 7900.0|39500| 7000| 9000|    5|             4|
    |   qisan|        IT|  8000|  3| 7900.0|39500| 7000| 9000|    5|             4|
    |  wangwu|        IT|  7500|  4| 7900.0|39500| 7000| 9000|    5|             4|
    |    lisi|        IT|  7000|  5| 7900.0|39500| 7000| 9000|    5|             4|
    |   bajiu|      ALGO| 12000|  1|11000.0|33000|10000|12000|    3|             3|
    |   james|      ALGO| 11000|  2|11000.0|33000|10000|12000|    3|             3|
    | zhaoliu|      ALGO| 10000|  3|11000.0|33000|10000|12000|    3|             3|
    +--------+----------+------+---+-------+-----+-----+-----+-----+--------------+
    
    # Collapse the windowed aggregates to one row per department by keeping
    # only the row numbered 1.
    #
    # NOTE: approx_count_distinct() is the distinct-count usable in a window
    # spec; in a groupBy, countDistinct() is normally used instead to get the
    # exact number of distinct values. approx_count_distinct() returns an
    # approximate value and is not exact — use with care.
    from pyspark.sql.window import Window
    import pyspark.sql.functions as F

    ordered_win = Window.partitionBy("department").orderBy(F.desc("salary"))
    agg_win = Window.partitionBy("department")

    per_dept = (
        df.withColumn("row", F.row_number().over(ordered_win))
        .withColumn("avg", F.avg("salary").over(agg_win))
        .withColumn("sum", F.sum("salary").over(agg_win))
        .withColumn("min", F.min("salary").over(agg_win))
        .withColumn("max", F.max("salary").over(agg_win))
        .withColumn("count", F.count("salary").over(agg_win))
        .withColumn("distinct_count", F.approx_count_distinct("salary").over(agg_win))
    )
    per_dept.where(F.col("row") == 1).select(
        "department", "avg", "sum", "min", "max", "count", "distinct_count"
    ).show()

     +----------+-------+-----+-----+-----+-----+--------------+
     |department|    avg|  sum|  min|  max|count|distinct_count|
     +----------+-------+-----+-----+-----+-----+--------------+
     |  INCREASE| 7500.0|15000| 7000| 8000|    2|             2|
     |        IT| 7900.0|39500| 7000| 9000|    5|             4|
     |      ALGO|11000.0|33000|10000|12000|    3|             3|
     +----------+-------+-----+-----+-----+-----+--------------+

  • 相关阅读:
    树的递归算法与非递归(迭代)的转化重点理解代码(上篇)
    数据库存储架构
    基础语法——组合与继承
    天然气网络潮流计算模型研究(Matlab代码实现)
    计算机毕业设计python基于django租房系统-房屋租赁系统
    【论文精读】Fast R-CNN
    【Rust日报】2023-10-16 为什么要异步 Rust
    【【RAM的verilog 代码 + testbench】】
    基于ADS的marx雪崩电路设计-设计实践(射频脉冲源)
    数据库设计中如何选择主键 (字符串或数值数据类型 )| Part 2
  • 原文地址:https://blog.csdn.net/zy345293721/article/details/133936349