I am Octopus; the name comes from my Chinese name, 章鱼 (octopus). I love programming, algorithms, and open source. All of my source code is on my personal GitHub. This blog records the bits and pieces of my learning; if you are interested in Python, Java, AI, or algorithms, feel free to follow my updates so we can learn and improve together.
import warnings
warnings.filterwarnings('ignore')
#import pandas as pd
#import numpy as np
from datetime import timedelta, date, datetime
import time
import gc
import os
import argparse
import sys

from pyspark.sql import SparkSession, functions as fn
from pyspark.ml.feature import StringIndexer
from pyspark.ml.recommendation import ALS
from pyspark.sql.types import *
from pyspark import StorageLevel

spark = SparkSession \
    .builder \
    .appName("stockout_test") \
    .config("hive.exec.dynamic.partition.mode", "nonstrict") \
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic") \
    .config("spark.driver.memory", '20g') \
    .config("spark.executor.memory", '40g') \
    .config("spark.yarn.executor.memoryOverhead", '1g') \
    .config("spark.executor.instances", 8) \
    .config("spark.executor.cores", 8) \
    .config("spark.kryoserializer.buffer.max", '128m') \
    .config("spark.yarn.queue", 'root.algo') \
    .config("spark.executorEnv.OMP_NUM_THREADS", 12) \
    .config("spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT", 1) \
    .config("spark.default.parallelism", 800) \
    .enableHiveSupport() \
    .getOrCreate()

spark.sql("set hive.exec.dynamic.partition.mode = nonstrict")
spark.sql("set hive.exec.dynamic.partition=true")
spark.sql("set spark.sql.autoBroadcastJoinThreshold=-1")
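The configuration above targets a YARN/Hive cluster (queue, executor sizing, dynamic partitions) and is not required for the examples that follow. For trying them out locally, a much smaller session is enough; this is only a sketch, not the author's setup, and the app name is arbitrary.

```python
# Minimal local SparkSession, sufficient for the toy DataFrame used below.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("window_functions_demo") \
    .getOrCreate()
```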
employee_salary = [
    ("zhangsan", "IT", 8000),
    ("lisi", "IT", 7000),
    ("wangwu", "IT", 7500),
    ("zhaoliu", "ALGO", 10000),
    ("qisan", "IT", 8000),
    ("bajiu", "ALGO", 12000),
    ("james", "ALGO", 11000),
    ("wangzai", "INCREASE", 7000),
    ("carter", "INCREASE", 8000),
    ("kobe", "IT", 9000)]

columns = ["name", "department", "salary"]
df = spark.createDataFrame(data=employee_salary, schema=columns)
df.show()
+--------+----------+------+
|    name|department|salary|
+--------+----------+------+
|zhangsan|        IT|  8000|
|    lisi|        IT|  7000|
|  wangwu|        IT|  7500|
| zhaoliu|      ALGO| 10000|
|   qisan|        IT|  8000|
|   bajiu|      ALGO| 12000|
|   james|      ALGO| 11000|
| wangzai|  INCREASE|  7000|
|  carter|  INCREASE|  8000|
|    kobe|        IT|  9000|
+--------+----------+------+
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# row_number(): number the rows of each department from the highest salary down.
windowSpec = Window.partitionBy("department").orderBy(F.desc("salary"))
df.withColumn("row_number", F.row_number().over(windowSpec)).show(truncate=False)
+--------+----------+------+----------+
|name    |department|salary|row_number|
+--------+----------+------+----------+
|carter  |INCREASE  |8000  |1         |
|wangzai |INCREASE  |7000  |2         |
|kobe    |IT        |9000  |1         |
|zhangsan|IT        |8000  |2         |
|qisan   |IT        |8000  |3         |
|wangwu  |IT        |7500  |4         |
|lisi    |IT        |7000  |5         |
|bajiu   |ALGO      |12000 |1         |
|james   |ALGO      |11000 |2         |
|zhaoliu |ALGO      |10000 |3         |
+--------+----------+------+----------+
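A very common use of row_number() is keeping only the top N rows of each group. The sketch below reuses df and windowSpec from above; top2 is just an illustrative variable name.

```python
# Top-N per group: rank inside each department, keep the two highest salaries.
top2 = df.withColumn("row_number", F.row_number().over(windowSpec)) \
         .where(F.col("row_number") <= 2) \
         .drop("row_number")
top2.show()
```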
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# rank(): tied salaries share a rank, and a gap is left after the tie.
windowSpec = Window.partitionBy("department").orderBy(F.desc("salary"))
df.withColumn("rank", F.rank().over(windowSpec)).show(truncate=False)
+--------+----------+------+----+
|name    |department|salary|rank|
+--------+----------+------+----+
|carter  |INCREASE  |8000  |1   |
|wangzai |INCREASE  |7000  |2   |
|kobe    |IT        |9000  |1   |
|qisan   |IT        |8000  |2   |
|zhangsan|IT        |8000  |2   |
|wangwu  |IT        |7500  |4   |
|lisi    |IT        |7000  |5   |
|bajiu   |ALGO      |12000 |1   |
|james   |ALGO      |11000 |2   |
|zhaoliu |ALGO      |10000 |3   |
+--------+----------+------+----+
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# dense_rank(): tied salaries share a rank, but no gap is left after the tie.
windowSpec = Window.partitionBy("department").orderBy(F.desc("salary"))
df.withColumn("dense_rank", F.dense_rank().over(windowSpec)).show()
+--------+----------+------+----------+
|    name|department|salary|dense_rank|
+--------+----------+------+----------+
|  carter|  INCREASE|  8000|         1|
| wangzai|  INCREASE|  7000|         2|
|    kobe|        IT|  9000|         1|
|   qisan|        IT|  8000|         2|
|zhangsan|        IT|  8000|         2|
|  wangwu|        IT|  7500|         3|
|    lisi|        IT|  7000|         4|
|   bajiu|      ALGO| 12000|         1|
|   james|      ALGO| 11000|         2|
| zhaoliu|      ALGO| 10000|         3|
+--------+----------+------+----------+
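To see how the three ranking functions treat ties, they can be computed side by side on the same window (a minimal sketch reusing df and windowSpec from above): with the two IT salaries tied at 8000, rank() leaves a gap (1, 2, 2, 4, 5) while dense_rank() does not (1, 2, 2, 3, 4).

```python
# row_number() breaks ties arbitrarily, rank() skips positions after a tie,
# dense_rank() never skips.
df.withColumn("row_number", F.row_number().over(windowSpec)) \
  .withColumn("rank", F.rank().over(windowSpec)) \
  .withColumn("dense_rank", F.dense_rank().over(windowSpec)) \
  .show()
```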
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# lag(salary, 1): the salary of the previous row within the department
# (null for the first row of each partition).
windowSpec = Window.partitionBy("department").orderBy(F.desc("salary"))
df.withColumn("lag", F.lag("salary", 1).over(windowSpec)).show()
+--------+----------+------+-----+
|    name|department|salary|  lag|
+--------+----------+------+-----+
|  carter|  INCREASE|  8000| null|
| wangzai|  INCREASE|  7000| 8000|
|    kobe|        IT|  9000| null|
|zhangsan|        IT|  8000| 9000|
|   qisan|        IT|  8000| 8000|
|  wangwu|        IT|  7500| 8000|
|    lisi|        IT|  7000| 7500|
|   bajiu|      ALGO| 12000| null|
|   james|      ALGO| 11000|12000|
| zhaoliu|      ALGO| 10000|11000|
+--------+----------+------+-----+
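lag() is usually combined with a difference, for example the gap between each salary and the previous (higher) one in the same department. A small sketch; prev_salary and gap_to_prev are illustrative column names.

```python
# Salary gap to the previous row; the first row of each partition stays null.
df.withColumn("prev_salary", F.lag("salary", 1).over(windowSpec)) \
  .withColumn("gap_to_prev", F.col("salary") - F.col("prev_salary")) \
  .show()
```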
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# lead(salary, 1): the salary of the next row within the department
# (null for the last row of each partition).
windowSpec = Window.partitionBy("department").orderBy(F.desc("salary"))
df.withColumn("lead", F.lead("salary", 1).over(windowSpec)).show()
+--------+----------+------+-----+
|    name|department|salary| lead|
+--------+----------+------+-----+
|  carter|  INCREASE|  8000| 7000|
| wangzai|  INCREASE|  7000| null|
|    kobe|        IT|  9000| 8000|
|zhangsan|        IT|  8000| 8000|
|   qisan|        IT|  8000| 7500|
|  wangwu|        IT|  7500| 7000|
|    lisi|        IT|  7000| null|
|   bajiu|      ALGO| 12000|11000|
|   james|      ALGO| 11000|10000|
| zhaoliu|      ALGO| 10000| null|
+--------+----------+------+-----+
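Both lag() and lead() also accept an optional default value that is used for rows with no neighbour. A small sketch, assuming 0 is an acceptable stand-in for null here; next_salary is an illustrative column name.

```python
# The last row of each partition gets 0 instead of null.
df.withColumn("next_salary", F.lead("salary", 1, 0).over(windowSpec)).show()
```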
from pyspark.sql.window import Window
import pyspark.sql.functions as F

windowSpec = Window.partitionBy("department").orderBy(F.desc("salary"))
windowSpecAgg = Window.partitionBy("department")

# windowSpecAgg has no ORDER BY, so each aggregate is computed over the whole
# department partition and repeated on every row of that department.
df.withColumn("row", F.row_number().over(windowSpec)) \
  .withColumn("avg", F.avg("salary").over(windowSpecAgg)) \
  .withColumn("sum", F.sum("salary").over(windowSpecAgg)) \
  .withColumn("min", F.min("salary").over(windowSpecAgg)) \
  .withColumn("max", F.max("salary").over(windowSpecAgg)) \
  .withColumn("count", F.count("salary").over(windowSpecAgg)) \
  .withColumn("distinct_count", F.approx_count_distinct("salary").over(windowSpecAgg)) \
  .show()
+--------+----------+------+---+-------+-----+-----+-----+-----+--------------+
|    name|department|salary|row|    avg|  sum|  min|  max|count|distinct_count|
+--------+----------+------+---+-------+-----+-----+-----+-----+--------------+
|  carter|  INCREASE|  8000|  1| 7500.0|15000| 7000| 8000|    2|             2|
| wangzai|  INCREASE|  7000|  2| 7500.0|15000| 7000| 8000|    2|             2|
|    kobe|        IT|  9000|  1| 7900.0|39500| 7000| 9000|    5|             4|
|zhangsan|        IT|  8000|  2| 7900.0|39500| 7000| 9000|    5|             4|
|   qisan|        IT|  8000|  3| 7900.0|39500| 7000| 9000|    5|             4|
|  wangwu|        IT|  7500|  4| 7900.0|39500| 7000| 9000|    5|             4|
|    lisi|        IT|  7000|  5| 7900.0|39500| 7000| 9000|    5|             4|
|   bajiu|      ALGO| 12000|  1|11000.0|33000|10000|12000|    3|             3|
|   james|      ALGO| 11000|  2|11000.0|33000|10000|12000|    3|             3|
| zhaoliu|      ALGO| 10000|  3|11000.0|33000|10000|12000|    3|             3|
+--------+----------+------+---+-------+-----+-----+-----+-----+--------------+
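Because windowSpecAgg has no ORDER BY, every row of a department sees the same totals. Adding an ORDER BY together with an explicit frame turns the same sum() into a running total; runningSpec below is an illustrative name for such a window.

```python
# Cumulative sum from the highest salary down to the current row, per department.
runningSpec = Window.partitionBy("department") \
                    .orderBy(F.desc("salary")) \
                    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

df.withColumn("running_sum", F.sum("salary").over(runningSpec)).show()
```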
from pyspark.sql.window import Window
import pyspark.sql.functions as F
# Note: approx_count_distinct() is what works inside a window; in a groupBy
# you would normally use countDistinct() instead to count the distinct values
# in each group. approx_count_distinct() only returns an approximate figure,
# so use it with care.

windowSpec = Window.partitionBy("department").orderBy(F.desc("salary"))
windowSpecAgg = Window.partitionBy("department")

# Keep one row per department (row == 1) to get a per-department summary.
df.withColumn("row", F.row_number().over(windowSpec)) \
  .withColumn("avg", F.avg("salary").over(windowSpecAgg)) \
  .withColumn("sum", F.sum("salary").over(windowSpecAgg)) \
  .withColumn("min", F.min("salary").over(windowSpecAgg)) \
  .withColumn("max", F.max("salary").over(windowSpecAgg)) \
  .withColumn("count", F.count("salary").over(windowSpecAgg)) \
  .withColumn("distinct_count", F.approx_count_distinct("salary").over(windowSpecAgg)) \
  .where(F.col("row") == 1) \
  .select("department", "avg", "sum", "min", "max", "count", "distinct_count") \
  .show()
+----------+-------+-----+-----+-----+-----+--------------+
|department|    avg|  sum|  min|  max|count|distinct_count|
+----------+-------+-----+-----+-----+-----+--------------+
|  INCREASE| 7500.0|15000| 7000| 8000|    2|             2|
|        IT| 7900.0|39500| 7000| 9000|    5|             4|
|      ALGO|11000.0|33000|10000|12000|    3|             3|
+----------+-------+-----+-----+-----+-----+--------------+
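As the comment above notes, when only the per-department summary is needed, a plain groupBy is usually simpler, and countDistinct() there is exact rather than approximate. A sketch of the equivalent aggregation:

```python
# Exact per-department summary; countDistinct() replaces approx_count_distinct().
df.groupBy("department") \
  .agg(F.avg("salary").alias("avg"),
       F.sum("salary").alias("sum"),
       F.min("salary").alias("min"),
       F.max("salary").alias("max"),
       F.count("salary").alias("count"),
       F.countDistinct("salary").alias("distinct_count")) \
  .show()
```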