• Spark_SQL-DataFrame数据写出以及读写数据库(以MySQl为例)


                      一、数据写出

            (1)SparkSQL统一API写出DataFrame数据

    二、写出MySQL数据库


    一、数据写出

            (1)SparkSQL统一API写出DataFrame数据

            统一API写法:

           常见源写出:

    1. # cording:utf8
    2. from pyspark.sql import SparkSession
    3. from pyspark.sql.types import StructType, IntegerType, StringType
    4. import pyspark.sql.functions as F
    5. if __name__ == '__main__':
    6. spark = SparkSession.builder.\
    7. appName('write').\
    8. master('local[*]').\
    9. getOrCreate()
    10. sc = spark.sparkContext
    11. # 1.读取文件
    12. schema = StructType().add('user_id', StringType(), nullable=True).\
    13. add('movie_id', IntegerType(), nullable=True).\
    14. add('rank', IntegerType(), nullable=True).\
    15. add('ts', StringType(), nullable=True)
    16. df = spark.read.format('csv').\
    17. option('sep', '\t').\
    18. option('header', False).\
    19. option('encoding', 'utf-8').\
    20. schema(schema=schema).\
    21. load('../input/u.data')
    22. # write text 写出,只能写出一个列的数据,需要将df转换为单列df
    23. df.select(F.concat_ws('---', 'user_id', 'movie_id', 'rank', 'ts')).\
    24. write.\
    25. mode('overwrite').\
    26. format('text').\
    27. save('../output/sql/text')
    28. # write csv
    29. df.write.mode('overwrite').\
    30. format('csv').\
    31. option('sep',';').\
    32. option('header', True).\
    33. save('../output/sql/csv')
    34. # write json
    35. df.write.mode('overwrite').\
    36. format('json').\
    37. save('../output/sql/json')
    38. # write parquet
    39. df.write.mode('overwrite').\
    40. format('parquet').\
    41. save('../output/sql/parquet')

    二、写出MySQL数据库

            API写法:

            注意:

            ①jdbc连接字符串中,建议使用useSSL=false 确保连接可以正常连接( 不使用SSL安全协议进行连接)

            ②jdbc连接字符串中,建议使用useUnicode=true 来确保传输中不出现乱码

            ③save()不要填参数,没有路径,是写出数据库

            ④dbtable属性:指定写出的表名

    1. # cording:utf8
    2. from pyspark.sql import SparkSession
    3. from pyspark.sql.types import StructType, IntegerType, StringType
    4. import pyspark.sql.functions as F
    5. if __name__ == '__main__':
    6. spark = SparkSession.builder.\
    7. appName('write').\
    8. master('local[*]').\
    9. getOrCreate()
    10. sc = spark.sparkContext
    11. # 1.读取文件
    12. schema = StructType().add('user_id', StringType(), nullable=True).\
    13. add('movie_id', IntegerType(), nullable=True).\
    14. add('rank', IntegerType(), nullable=True).\
    15. add('ts', StringType(), nullable=True)
    16. df = spark.read.format('csv').\
    17. option('sep', '\t').\
    18. option('header', False).\
    19. option('encoding', 'utf-8').\
    20. schema(schema=schema).\
    21. load('../input/u.data')
    22. # 2.写出df到MySQL数据库
    23. df.write.mode('overwrite').\
    24. format('jdbc').\
    25. option('url', 'jdbc:mysql://pyspark01:3306/bigdata?useSSL=false&useUnicode=true&serverTimezone=GMT%2B8').\
    26. option('dbtable', 'movie_data').\
    27. option('user', 'root').\
    28. option('password', '123456').\
    29. save()
    30. # 读取
    31. df2 = spark.read.format('jdbc'). \
    32. option('url', 'jdbc:mysql://pyspark01:3306/bigdata?useSSL=false&useUnicode=true&serverTimezone=GMT%2B8'). \
    33. option('dbtable', 'movie_data'). \
    34. option('user', 'root'). \
    35. option('password', '123456'). \
    36. load()
    37. # 查看读取结果
    38. df2.printSchema()
    39. df2.show()
    40. '''
    41. JDBC写出,会自动创建表的
    42. 因为DataFrame中的有表结构信息,StructType记录的 各个字段的名称 类型 和是否运行为空
    43. '''

            保存结果:

            读取结果:

  • 相关阅读:
    Java IO中其它字节流简介说明
    NoSuchModuleError: Can‘t load plugin: sqlalchemy.dialects:clickhouse解决方案
    MacOS Sonoma 14更新:优化小组件、升级视频会议、沉浸式游戏体验等
    【51单片机】DS18B20(江科大)
    【ROS2要素】xml、GDF、URDF的关系
    基于C++的配置文件解析器/编码器——toml库
    如何在 Buildroot 中配置 Samba
    34【源码】数据可视化:基于 Echarts + Python 动态实时大屏 - 视频平台
    性能压测工具 —— wrk
    Redis基础知识(四):使用redis-cli命令测试状态
  • 原文地址:https://blog.csdn.net/2202_75347029/article/details/134013915