Quick-start notes on Spark for big data processing.
from pyspark.sql import SparkSession

# Create (or reuse) a SparkSession with Hive support; the resource values below are examples to tune per cluster.
spark = SparkSession \
    .builder \
    .appName(APP_NAME) \
    .enableHiveSupport() \
    .config("spark.executor.instances", "50") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.maxResultSize", "50g") \
    .config("spark.executor.cores", "2") \
    .config("spark.driver.memory", "15g") \
    .config("spark.rpc.message.maxSize", "512") \
    .config("spark.sql.shuffle.partitions", "1000") \
    .getOrCreate()
Reference link: http://www.manongjc.com/detail/31-joiyyxzllmgmbeb.html
# Arguments: 1. whether to sample with replacement 2. fraction of rows to return 3. seed for the pseudo-random generator
data_sample = data.sample(False, 0.2, 500)
def func(row):
    # row is a (key, values) pair produced by groupByKey(); row[1] is the iterable of grouped values
    print("row[1]", row[1])
    return row[1]

data = [['Alice', 26]]
df = spark.createDataFrame(data, ['Name', 'age'])
df.collect()
df.rdd.map(lambda x: (x, 1)).collect()
df.rdd.map(lambda x: (x, 1)).groupByKey().flatMap(lambda row: func(row)).collect()

Using the key-value form
from pyspark.sql.types import StructType, LongType, StringType, StructField, BooleanType, IntegerType, FloatType

# Column definitions
# Schema for the estimated final-price calculation
MODEL_SCHEMA = StructType([
    StructField('name', StringType()),
    StructField('age', FloatType()),
    StructField('id', LongType()),
])
import pandas as pd

def func(row, df_columns):
    # row is a (key, rows) pair from groupByKey(); rebuild the grouped rows as a pandas DataFrame
    val = row[1]
    df_val = pd.DataFrame(list(val), columns=df_columns)
    name = df_val['Name'].tolist()[0]
    age = df_val['age'].tolist()[0]
    if age > 25:
        age = 27
    return [[name, age]]

data = [['Alice', 26]]
df = spark.createDataFrame(data, ['Name', 'age'])
df_columns = df.columns
# df.rdd.map(lambda x: (x, 1)).collect()
df.rdd.map(lambda row: (row["Name"], row)).groupByKey().flatMap(lambda row: func(row, df_columns)).collect()
groupBy() groups rows by a user-defined criterion on the DataFrame, whereas groupByKey() groups an RDD of (key, value) pairs by the key.
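A minimal sketch of the difference, reusing the Name/age DataFrame df built above (the aggregation chosen here is just illustrative):
import pyspark.sql.functions as F

# DataFrame API: groupBy() on a column, then aggregate
df.groupBy("Name").agg(F.max("age").alias("max_age")).show()

# RDD API: groupByKey() groups (key, value) pairs by the key
df.rdd.map(lambda row: (row["Name"], row["age"])).groupByKey() \
    .mapValues(lambda ages: max(ages)).collect()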
from pyspark.sql.functions import monotonically_increasing_id
# Add a unique, monotonically increasing (but not consecutive) id column
df = df.withColumn("id", monotonically_increasing_id())
Reference link: https://blog.csdn.net/wj1298250240/article/details/103944979
spark.sql(f"ALTER TABLE {table_name} SET TBLPROPERTIES ('comment'='表注释信息')")
# Set a comment on every column, keeping each column's existing type
for i in range(len(spark_df.dtypes)):
    col_name = spark_df.dtypes[i][0]
    type_name = spark_df.dtypes[i][1]
    spark.sql(f"ALTER TABLE {table_name} CHANGE COLUMN {col_name} {col_name} {type_name} COMMENT 'column comment'")
spark_df.write.format("hive") \
.option("tableName", "test_name") \
.mode("overwrite") \
.saveAsTable(f"xxx.xxx")
Writing into a table
# Align the column order with the target table before insertInto
spark_df_res = data.select(
    spark.sql(
        '''
        select *
        from {table_name}
        limit 1
        '''.format(table_name=XXXX)
    ).columns
)
spark_df_res.repartition('dt').write.insertInto(tableName=XXXX, overwrite=True)
data = [['Alice', 26], ['Jessica', 23], ['Shirely', 33]]
df = spark.createDataFrame(data, ['Name', 'age'])

df = spark.createDataFrame([(1, None), (None, 2)], ("a", "b"))
# Keep only the rows where column a is not null
df.filter(df.a.isNotNull()).show()
Convert the query result to a pandas DataFrame
data = data.toPandas()
import pyspark.sql.functions as func
# Cast a column to string
func.col('id').cast('string')
Deduplicate on the name column
data.select(name").dropDuplicates()
data.dropDuplicates(["col_name_a","col_name_b"])
Deduplicate the whole DataFrame
data = data.distinct()
data.withColumn("new_col", func.lit(1))  # add a new column (or replace an existing one with the same name)
DataFrame.fillna(value, subset=None)
Rename column A to column B:
spark_data.withColumnRenamed('col_A', 'col_B')
data.filter(func.col('id').isin(KU_LIST)).count()
dataa.merge(datab, how="inner")  # pandas-style merge; Spark DataFrames use join() instead, as below
dataa.join(datab, ['col1', 'col2'], 'inner')
dataA.join(dataB, "id", how='left')
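A minimal sketch of the two join types on small throwaway DataFrames (dataA/dataB here are illustrative):
dataA = spark.createDataFrame([(1, 'Alice'), (2, 'Bob')], ['id', 'name'])
dataB = spark.createDataFrame([(1, 88)], ['id', 'score'])
dataA.join(dataB, 'id', how='inner').show()   # only id=1 survives
dataA.join(dataB, 'id', how='left').show()    # id=2 is kept, its score is null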
df.groupBy('Job', 'Country').agg(func.max('salary'), func.max('seniority')).show()
https://blog.csdn.net/qq_42363032/article/details/115614675
Counting rows per group
data.groupBy(["name"]).agg(func.count("id").alias("id_count"))
alias: returns a new Column with the given name (here it names the aggregated column id_count)
Use the subset argument to restrict fillna to specific columns.
withColumn adds or updates a column; the first argument is the column name.
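A short sketch of both on a throwaway DataFrame (the column names here are illustrative):
import pyspark.sql.functions as func

tmp = spark.createDataFrame([(1, None), (None, 2)], ('a', 'b'))
tmp = tmp.withColumn('a_plus_one', func.col('a') + 1)   # add a derived column
tmp = tmp.fillna(0, subset=['b'])                       # fill nulls only in column b
tmp.show()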
Select specific columns:
spark_data.select(['col1', 'col2', 'col3'])
data.filter(func.col("col_name") > 0)
from pyspark.sql.functions import concat  # concat joins column values within a row, not rows

# Stack all rows of df1 and df2; union matches columns by position, so the schemas must line up
df3 = df1.union(df2)
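If the two DataFrames share column names but in a different order, unionByName is the safer choice (a sketch, assuming df1 and df2 as above):
df3 = df1.unionByName(df2)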
# Register a DataFrame as a temporary view so it can be queried with spark.sql
spark_df.createOrReplaceTempView('table_name')
tlike_df = spark.sql(
    '''
    select product_name
    from {table_name}
    where product_name like '%name%'
    '''.format(table_name="table_name")
)
data_spark.cache()
Reference link
https://blog.csdn.net/u010003835/article/details/106726826
import pyspark.sql.functions as F
F.collect_set()  # collect the distinct values of a column within each group (used after groupBy)
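A small sketch (the DataFrame here is illustrative):
orders = spark.createDataFrame([('Alice', 'book'), ('Alice', 'pen'), ('Alice', 'book')], ['name', 'item'])
orders.groupBy('name').agg(F.collect_set('item').alias('items')).show()
# -> items = ['book', 'pen'] (duplicates removed; element order is not guaranteed)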
Use a UDF to operate directly on a column.
data = [['Alice', 26], ['Jessica', 23], ['Shirely', 33]]
df = spark.createDataFrame(data, ['Name', 'age'])
df.cache()
df.count()

def upperCase(s):
    return s.upper()

# Wrap the plain Python function as a UDF (the default return type is StringType)
upperCaseUDF = func.udf(lambda z: upperCase(z))
df.show()
df.withColumn("Curated Name", upperCaseUDF(func.col("Name"))).show()
Passing multiple arguments to a UDF:
https://blog.csdn.net/sunflower_sara/article/details/104044412
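A minimal sketch of a two-argument UDF, reusing df and the func alias from the example above (the function and column names are illustrative):
from pyspark.sql.types import StringType

def describe(name, age):
    return f"{name} is {age}"

describeUDF = func.udf(describe, StringType())
df.withColumn("desc", describeUDF(func.col("Name"), func.col("age"))).show()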
cache() followed by count() materializes the DataFrame, which makes it easy to measure how long the query takes:
spark_data.cache()
spark_data.count()
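A rough timing sketch with Python's time module (spark_data stands in for whatever DataFrame is being measured):
import time
spark_data.cache()
start = time.time()
spark_data.count()          # triggers the actual computation
print("seconds:", time.time() - start)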
Extract everything after a specific substring:
https://blog.51cto.com/dream666uping/5445603
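One way to do this with built-in Spark SQL functions is substring_index (a sketch; the column and values are illustrative):
import pyspark.sql.functions as F

# Everything after the last '/' in a path-like column
df_paths = spark.createDataFrame([('a/b/c.txt',)], ['path'])
df_paths.select(F.substring_index('path', '/', -1).alias('tail')).show()   # -> 'c.txt'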
https://cloud.tencent.com/developer/article/1847068 (a fairly detailed guide to PySpark operations)
https://www.cnblogs.com/yyy-blog/p/10249646.html (on spark-submit)