• pyspark常用语法


    一、背景

    大数据处理快速上手spark要点。

    二、操作技术点

    2.0 参数配置
    from pyspark.sql import SparkSession
    spark = SparkSession \
            .builder \
            .appName(APP_NAME) \
            .enableHiveSupport() \
            .config("spark.executor.instances", "50") \
            .config("spark.executor.memory", "4g") \
            .config("spark.driver.maxResultSize","50g")
            .config("spark.executor.cores", "2") \
            .config("spark.driver.memory", "15g") \
            .config("spark.rpc.message.maxSize",512) \
            .config("spark.sql.shuffle.partitions", "1000") \
            .getOrCreate()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    2.1 采样

    参考链接:http://www.manongjc.com/detail/31-joiyyxzllmgmbeb.html

    # 参数说明:1. 指定采样是否应该替换 2. 返回数据的分数 3. 伪随机数产生器的种子
    data_sample = data.sample(False, 0.2, 500)
    
    • 1
    • 2
    2.2 Map与FlatMap联合使用
    def func(row):
        print("row[1]", row[1])
        return row[1]
    data=[['Alice',26]]
    df=spark.createDataFrame(data,['Name','age'])
    df.collect()
    df.rdd.map(lambda x : (x,1)).collect()
    df.rdd.map(lambda x : (x,1)).groupByKey().flatMap(lambda row : func(row)).collect()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    在这里插入图片描述
    使用KeyValue的形式

    from pyspark.sql.types import StructType, LongType, StringType, StructField, BooleanType, IntegerType
    #列名
    # 预估到手价计算的schema
    MODEL_SCHEMA = StructType([
        StructField('name', StringType()),
        StructField('age', StringType()),
        StructField('age',  FloatType()),
        StructField('id',  LongType()),
    ])
    
    def func(row,df_columns):
        #print("row[1]", row[1])
        val = row[1]
        df_val = pd.DataFrame(list(val), columns=df_columns)
        name = df_val['Name'].tolist()[0]
        age = df_val['age'].tolist()[0]
        if age>25:
            age = 27
        return [[name,age]]
    data=[['Alice',26]]
    df=spark.createDataFrame(data,['Name','age'])
    df_columns = df.columns
    #df.rdd.map(lambda x : (x,1)).collect()
    df.rdd.map(lambda row : (row["Name"],row)).groupByKey().flatMap(lambda row : func(row, df_columns)).collect()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    2.2.1 rdd.map. GroupyByKey和GroupBy的区别

    groupBy()方法是根据用户自定义的情况进行分组,而groupByKey()则是根据key值进行分组的.

    2.2.3 生成唯一自增id
    from pyspark.sql.functions import monotonically_increasing_id
    df = df.withColumn("id",monotonically_increasing_id())
    
    • 1
    • 2

    https://blog.csdn.net/wj1298250240/article/details/103944979

    2.3 添加表注释
    spark.sql(f"ALTER TABLE {table_name} SET TBLPROPERTIES ('comment'='表注释信息')")
    
    • 1
    2.4 修改列名注释
    for i in range(len(spark_df.dtypes)):
        col_name = spark_df.dtypes[i][0]
        type_name = spark_df.dtypes[i][1]
        spark.sql(f"ALTER TABLE {table_name} CHANGE COLUMN {col_name} {col_name} {type_name} COMMENT '编号2'")
    
    • 1
    • 2
    • 3
    • 4
    2.5 不建表直接插入数据
    spark_df.write.format("hive") \
    .option("tableName", "test_name") \
    .mode("overwrite") \
    .saveAsTable(f"xxx.xxx")
    
    • 1
    • 2
    • 3
    • 4

    写入表

    spark_df_res = data.select(
        spark.sql(
            '''
                select *
                from {table_name}
                limit 1
            '''.format(table_name=XXXX)
        ).columns
    )
    spark_df_res.repartition('dt').write.insertInto(tableName=XXXX, overwrite=True)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    DataFrame转为Spark
    data=[['Alice',26],['Jessica',23],['Shirely',33]]
    df=spark.createDataFrame(data,['Name','age'])
    
    • 1
    • 2
    删除为空的数据
    df = spark.createDataFrame([(1, None), (None, 2)], ("a", "b"))
    df.filter(df.a.isNotNull()).show()
    
    • 1
    • 2
    转化为pandas

    将查询到的数据转化为pandas

    data = data.toPandas()
    
    • 1
    转换某一列的数据类型
     func.col('id').cast('string')
    
    • 1
    去重

    对name列去重

    data.select(name").dropDuplicates()
    data.dropDuplicates(["col_name_a","col_name_b"])
    
    • 1
    • 2

    对整个数据去重

    data = data.distinct()
    
    • 1
    增加一列
    data.withColumn() #测试一下不知道对否
    
    • 1
    空值填充
    DataFrame.fillna(value, subset=None)
    
    • 1
    重命名列

    将A列的名字命名为B列名:

    spark_data.withColumnRenamed('A列名', 'B列名')
    
    • 1
    isin过滤
    data.filter(func.col('id').isin(KU_LIST)).count()
    
    • 1
    求交集
    dataa.merge(datab, how="inner")
    
    • 1
    dataa.join(datab,['列名1''列名2'], 'inner')
    
    • 1
    左拼接
     dataA.join(dataB, "id",how='left')
    
    • 1
    agg
    df.groupBy('Job','Country').agg(fn.max('salary'), fn.max('seniority')).show()
    
    • 1

    https://blog.csdn.net/qq_42363032/article/details/115614675

    统计数量

    data.groupBy(["name"]).agg(func.count("id").alias("id_count"))
    
    • 1
    alias

    alias:返回一个设置别名的新DataFrame

    subset

    使用subset得到对应的列名称。

    withColumns

    添加或更新列,第一个参数表示的是列名

    select

    选择某几个字段:

    spark_data.select(['列名1','列名2','列名3'])
    
    • 1
    依据某一列进行过滤
    data.filter(func.col("列名")>0)
    
    • 1
    数据拼接
    from pyspark.sql.functions import concat
    
    # 将 df1 和 df2 的所有行拼接在一起
    df3 = df1.union(df2)
    
    • 1
    • 2
    • 3
    • 4
    创建临时视图后使用SQL语句
    table_name.createOrReplaceTempView('table_name')
    tlike_df = spark.sql(
        '''
        select product_name
        from {table_name}
        where product_name like '%名字%'
        '''.format(table_name="table_name")
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    cache
    data_spark.cache()
    
    • 1

    参考链接

    https://blog.csdn.net/u010003835/article/details/106726826

    函数使用

    F.collect_set()

    import pyspark.sql.functions as F
    F.collect_set() #通过groupby后去重
    
    • 1
    • 2
    UDF

    使用UDF直接对某一列进行操作。

    data=[['Alice',26],['Jessica',23],['Shirely',33]]
    df=spark.createDataFrame(data,['Name','age'])
    df.cache()
    df.count()
    def upperCase(str):
        return str.upper()
    upperCaseUDF = func.udf(lambda z:upperCase(z))
    df.show()
    df.withColumn("Cureated Name", upperCaseUDF(func.col("Name"))).show()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    传出多个参数:

    https://blog.csdn.net/sunflower_sara/article/details/104044412

    加速查询

    这样可以计算查询速度

    spark_data.cache()
    spark_data.count()
    
    • 1
    • 2

    正则

    截取特定字符串之后的全部内容:

    https://blog.51cto.com/dream666uping/5445603

    三、提交技术点

    相关资料

    https://cloud.tencent.com/developer/article/1847068 (比较详细关于pyspark的操作)
    https://www.cnblogs.com/yyy-blog/p/10249646.html(spark的submit)

  • 相关阅读:
    【HTML实战】把爱心代码放在自己的网站上是一种什么体验?
    Hystirx熔断降级机制
    QQ隐藏福利二-----------------那些免费的挂件和气泡
    LNK2001 __GSHandlerCheck【error】
    同样是Java程序员,年薪10W和35W的差别在哪?
    C++ 学习笔记(五)(泛型算法篇)
    今天又做了三个梦,其中一个梦梦里的我还有意识会思考?
    技术面试面面观
    NoSQL Redis
    浅析RocketMQ-存储文件
  • 原文地址:https://blog.csdn.net/doswynkfsw/article/details/126033125