• 数据湖(十二):Spark3.1.2与Iceberg0.12.1整合


    文章目录

    Spark3.1.2与Iceberg0.12.1整合

    一、​​​​​​​向pom文件导入依赖

    二、SparkSQL设置catalog配置

    三、使用Hive Catalog管理Iceberg表

    1、创建表

    2、插入数据

    3、查询数据

    4、删除表

    四、用Hadoop Catalog管理Iceberg表

    1、创建表

    2、插入数据

    3、查询数据

    4、创建对应的Hive表映射数据

    5、删除表


    Spark3.1.2与Iceberg0.12.1整合

    Spark可以操作Iceberg数据湖,这里使用的Iceberg的版本为0.12.1,此版本与Spark2.4版本之上兼容。由于在Spark2.4版本中在操作Iceberg时不支持DDL、增加分区及增加分区转换、Iceberg元数据查询、insert into/overwrite等操作,建议使用Spark3.x版本来整合Iceberg0.12.1版本,这里我们使用的Spark版本是3.1.2版本。

    一、​​​​​​​​​​​​​​向pom文件导入依赖

    在Idea中创建Maven项目,在pom文件中导入以下关键依赖:

    1. <!-- 配置以下可以解决 在jdk1.8环境下打包时报错 “-source 1.5 中不支持 lambda 表达式” -->
    2. <properties>
    3. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    4. <maven.compiler.source>1.8</maven.compiler.source>
    5. <maven.compiler.target>1.8</maven.compiler.target>
    6. </properties>
    7. <dependencies>
    8. <!-- Spark-core -->
    9. <dependency>
    10. <groupId>org.apache.spark</groupId>
    11. <artifactId>spark-core_2.12</artifactId>
    12. <version>3.1.2</version>
    13. </dependency>
    14. <!-- Spark与Iceberg整合的依赖包-->
    15. <dependency>
    16. <groupId>org.apache.iceberg</groupId>
    17. <artifactId>iceberg-spark3</artifactId>
    18. <version>0.12.1</version>
    19. </dependency>
    20. <dependency>
    21. <groupId>org.apache.iceberg</groupId>
    22. <artifactId>iceberg-spark3-runtime</artifactId>
    23. <version>0.12.1</version>
    24. </dependency>
    25. <!-- avro格式 依赖包 -->
    26. <dependency>
    27. <groupId>org.apache.avro</groupId>
    28. <artifactId>avro</artifactId>
    29. <version>1.10.2</version>
    30. </dependency>
    31. <!-- parquet格式 依赖包 -->
    32. <dependency>
    33. <groupId>org.apache.parquet</groupId>
    34. <artifactId>parquet-hadoop</artifactId>
    35. <version>1.12.0</version>
    36. </dependency>
    37. <!-- SparkSQL -->
    38. <dependency>
    39. <groupId>org.apache.spark</groupId>
    40. <artifactId>spark-sql_2.12</artifactId>
    41. <version>3.1.2</version>
    42. </dependency>
    43. <!-- SparkSQL ON Hive-->
    44. <dependency>
    45. <groupId>org.apache.spark</groupId>
    46. <artifactId>spark-hive_2.12</artifactId>
    47. <version>3.1.2</version>
    48. </dependency>
    49. <!--&lt;!&ndash;mysql依赖的jar包&ndash;&gt;-->
    50. <!--<dependency>-->
    51. <!--<groupId>mysql</groupId>-->
    52. <!--<artifactId>mysql-connector-java</artifactId>-->
    53. <!--<version>5.1.47</version>-->
    54. <!--</dependency>-->
    55. <!--SparkStreaming-->
    56. <dependency>
    57. <groupId>org.apache.spark</groupId>
    58. <artifactId>spark-streaming_2.12</artifactId>
    59. <version>3.1.2</version>
    60. </dependency>
    61. <!-- SparkStreaming + Kafka -->
    62. <dependency>
    63. <groupId>org.apache.spark</groupId>
    64. <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    65. <version>3.1.2</version>
    66. </dependency>
    67. <!--&lt;!&ndash; 向kafka 生产数据需要包 &ndash;&gt;-->
    68. <!--<dependency>-->
    69. <!--<groupId>org.apache.kafka</groupId>-->
    70. <!--<artifactId>kafka-clients</artifactId>-->
    71. <!--<version>0.10.0.0</version>-->
    72. <!--&lt;!&ndash; 编译和测试使用jar包,没有传递性 &ndash;&gt;-->
    73. <!--&lt;!&ndash;<scope>provided</scope>&ndash;&gt;-->
    74. <!--</dependency>-->
    75. <!-- StructStreaming + Kafka -->
    76. <dependency>
    77. <groupId>org.apache.spark</groupId>
    78. <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    79. <version>3.1.2</version>
    80. </dependency>
    81. <!-- Scala 包-->
    82. <dependency>
    83. <groupId>org.scala-lang</groupId>
    84. <artifactId>scala-library</artifactId>
    85. <version>2.12.14</version>
    86. </dependency>
    87. <dependency>
    88. <groupId>org.scala-lang</groupId>
    89. <artifactId>scala-compiler</artifactId>
    90. <version>2.12.14</version>
    91. </dependency>
    92. <dependency>
    93. <groupId>org.scala-lang</groupId>
    94. <artifactId>scala-reflect</artifactId>
    95. <version>2.12.14</version>
    96. </dependency>
    97. <dependency>
    98. <groupId>log4j</groupId>
    99. <artifactId>log4j</artifactId>
    100. <version>1.2.12</version>
    101. </dependency>
    102. <dependency>
    103. <groupId>com.google.collections</groupId>
    104. <artifactId>google-collections</artifactId>
    105. <version>1.0</version>
    106. </dependency>
    107. </dependencies>

    二、SparkSQL设置catalog配置

    以下操作主要是SparkSQL操作Iceberg,同样Spark中支持两种Catalog的设置:hive和hadoop,Hive Catalog就是iceberg表存储使用Hive默认的数据路径,Hadoop Catalog需要指定Iceberg格式表存储路径。

    在SparkSQL代码中通过以下方式来指定使用的Catalog:

    1. val spark: SparkSession = SparkSession.builder().master("local").appName("SparkOperateIceberg")
    2. //指定hive catalog, catalog名称为hive_prod
    3. .config("spark.sql.catalog.hive_prod", "org.apache.iceberg.spark.SparkCatalog")
    4. .config("spark.sql.catalog.hive_prod.type", "hive")
    5. .config("spark.sql.catalog.hive_prod.uri", "thrift://node1:9083")
    6. .config("iceberg.engine.hive.enabled", "true")
    7. //指定hadoop catalog,catalog名称为hadoop_prod
    8. .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
    9. .config("spark.sql.catalog.hadoop_prod.type", "hadoop")
    10. .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/sparkoperateiceberg")
    11. .getOrCreate()

    三、使用Hive Catalog管理Iceberg表

    使用Hive Catalog管理Iceberg表默认数据存储在Hive对应的Warehouse目录下,在Hive中会自动创建对应的Iceberg表,SparkSQL 相当于是Hive客户端,需要额外设置“iceberg.engine.hive.enabled”属性为true,否则在Hive对应的Iceberg格式表中查询不到数据。

     

    1、创建表

    1. //创建表 ,hive_pord:指定catalog名称。default:指定Hive中存在的库。test:创建的iceberg表名。
    2. spark.sql(
    3. """
    4. | create table if not exists hive_prod.default.test(id int,name string,age int) using iceberg
    5. """.stripMargin)

    注意:

    1)创建表时,表名称为:${catalog名称}.${Hive中库名}.${创建的Iceberg格式表名}

    2)表创建之后,可以在Hive中查询到对应的test表,创建的是Hive外表,在对应的Hive warehouse 目录下可以看到对应的数据目录。

     

    2、插入数据

    1. //插入数据
    2. spark.sql(
    3. """
    4. |insert into hive_prod.default.test values (1,"zs",18),(2,"ls",19),(3,"ww",20)
    5. """.stripMargin)

    3、查询数据

    1. //查询数据
    2. spark.sql(
    3. """
    4. |select * from hive_prod.default.test
    5. """.stripMargin).show()

    结果如下:

    在Hive对应的test表中也能查询到数据:

     

    4、删除表

    1. //删除表,删除表对应的数据不会被删除
    2. spark.sql(
    3. """
    4. |drop table hive_prod.default.test
    5. """.stripMargin)

    注意:删除表后,数据会被删除,但是表目录还是存在,如果彻底删除数据,需要把对应的表目录删除。

    四、用Hadoop Catalog管理Iceberg表

    使用Hadoop Catalog管理表,需要指定对应Iceberg存储数据的目录。

    1、创建表

    1. //创建表 ,hadoop_prod:指定Hadoop catalog名称。default:指定库名称。test:创建的iceberg表名。
    2. spark.sql(
    3. """
    4. | create table if not exists hadoop_prod.default.test(id int,name string,age int) using iceberg
    5. """.stripMargin)

    注意:

    1)创建表名称为:${Hadoop Catalog名称}.${随意定义的库名}.${Iceberg格式表名}

    2)创建表后,会在hadoop_prod名称对应的目录下创建该表

     

    2、插入数据

    1. //插入数据
    2. spark.sql(
    3. """
    4. |insert into hadoop_prod.default.test values (1,"zs",18),(2,"ls",19),(3,"ww",20)
    5. """.stripMargin)

    3、查询数据

    1. spark.sql(
    2. """
    3. |select * from hadoop_prod.default.test
    4. """.stripMargin).show()

     

     

    4、创建对应的Hive表映射数据

    在Hive表中执行如下建表语句:

    1. CREATE TABLE hdfs_iceberg (
    2. id int,
    3. name string,
    4. age int
    5. )
    6. STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
    7. LOCATION 'hdfs://mycluster/sparkoperateiceberg/default/test'
    8. TBLPROPERTIES ('iceberg.catalog'='location_based_table');

    在Hive中查询“hdfs_iceberg”表数据如下:

    5、删除表

    1. spark.sql(
    2. """
    3. |drop table hadoop_prod.default.test
    4. """.stripMargin)

    注意:删除iceberg表后,数据被删除,对应的库目录存在。


    • 📢博客主页:https://lansonli.blog.csdn.net
    • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
    • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
    • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
  • 相关阅读:
    记一次 Java Testcontainers CPU 100% 问题排查过程
    Azure DevOps(一)基于 Net6.0 的 WPF 程序如何进行持续集成、持续编译
    RK3568平台开发系列讲解(视频篇)视频编码的工作原理
    lv5 嵌入式开发-10 信号机制(下)
    设计模式:单例、原型和生成器
    Linux基础命令4——Linux快捷键与帮助命令
    设计模式之状态模式
    Ubunu安装一个更新版本的gda(2.3.2到 3.0.4)
    (matplotlib)如何不显示x轴或y轴刻度(ticks)
    芯片洁净间的等级是如何划分的
  • 原文地址:https://blog.csdn.net/xiaoweite1/article/details/125571912