• Writing Kafka data into Hive with Flink's HiveCatalog


    package com.atguigu.flink.test_hk;

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.SqlDialect;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.table.catalog.hive.HiveCatalog;

    public class KafkaToHive3 {
        public static void main(String[] args) {
            System.setProperty("HADOOP_USER_NAME", "atguigu");
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
            env.setParallelism(1);
            // Checkpointing must be enabled: the Hive sink only commits files on checkpoints.
            env.enableCheckpointing(6000);

            // Switch to the Hive dialect so the DDL below is parsed as HiveQL.
            tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

            // Register the Hive catalog; "conf" is the local directory holding hive-site.xml.
            HiveCatalog hiveCatalog = new HiveCatalog("hive", "default", "conf");
            tableEnv.registerCatalog("hive", hiveCatalog);
            tableEnv.useCatalog("hive");

            // Create the target Hive table if it does not exist yet.
            String hiveSql = "CREATE TABLE IF NOT EXISTS hive_table7 (\n" +
                    "  id int\n" +
                    ")";
            tableEnv.executeSql(hiveSql);
            // tableEnv.sqlQuery("select * from hive_table7").execute().print();

            // Switch back to the default dialect and catalog to create the Kafka source table.
            tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
            tableEnv.useCatalog("default_catalog");
            String readSql = "CREATE TABLE t2 (" +
                    "  id int" +
                    ") WITH (" +
                    "  'connector' = 'kafka'," +
                    "  'topic' = 'topicA'," +
                    "  'properties.bootstrap.servers' = 'hadoop102:9092'," +
                    "  'properties.group.id' = 'flink_" + System.currentTimeMillis() + "'," +
                    "  'scan.startup.mode' = 'latest-offset'," +
                    "  'value.format' = 'csv'" +
                    ")";
            tableEnv.executeSql(readSql);
            // tableEnv.sqlQuery("select * from t2").execute().print();

            // Stream the Kafka rows into the Hive table; the source is addressed by its
            // fully qualified path because the current catalog is now "hive".
            tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
            tableEnv.useCatalog("hive");
            tableEnv.executeSql("INSERT INTO hive_table7 SELECT id FROM default_catalog.default_database.t2");
        }
    }
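    The table above is unpartitioned, so rows only become visible once the sink rolls files and a checkpoint commits them. Streaming Hive sinks are more commonly partitioned so that Flink can commit whole partitions to the metastore as they fill. Below is a minimal sketch of such a DDL, reusing tableEnv from the listing above with the Hive dialect still active; the table name hive_table_part and the dt/hr layout are illustrative, while the sink.partition-commit.* options are the ones documented for Flink's Hive connector:

        // Sketch only: a partitioned Hive sink table with streaming partition-commit options.
        // Assumes tableEnv from KafkaToHive3 above, with SqlDialect.HIVE still active.
        String partitionedDdl =
                "CREATE TABLE IF NOT EXISTS hive_table_part (\n" +
                "  id int\n" +
                ") PARTITIONED BY (dt string, hr string)\n" +
                "STORED AS orc TBLPROPERTIES (\n" +
                // map a record's timestamp onto the dt/hr partition values
                "  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',\n" +
                // commit a partition once the watermark passes its end plus the delay
                "  'sink.partition-commit.trigger'='partition-time',\n" +
                "  'sink.partition-commit.delay'='1 h',\n" +
                // add the partition to the metastore and write a _SUCCESS marker file
                "  'sink.partition-commit.policy.kind'='metastore,success-file'\n" +
                ")";
        tableEnv.executeSql(partitionedDdl);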
    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>BigData230201</artifactId>
            <groupId>com.atguigu</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
        <artifactId>Flink</artifactId>

        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <flink.version>1.17.0</flink.version>
            <scala.version>2.12</scala.version>
        </properties>

        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.7.25</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.26</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-runtime-web</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-datagen</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-files</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>3.3.4</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.83</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-jdbc</artifactId>
                <version>3.1.0-1.17</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>com.mysql</groupId>
                <artifactId>mysql-connector-j</artifactId>
                <version>8.0.32</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-statebackend-rocksdb</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-statebackend-changelog</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-java-uber</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-runtime</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-csv</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-json</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-hive_2.12</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.hive</groupId>
                <artifactId>hive-exec</artifactId>
                <version>3.1.3</version>
                <scope>provided</scope>
            </dependency>
        </dependencies>

        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>3.2.4</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <artifactSet>
                                    <excludes>
                                        <exclude>com.google.code.findbugs:jsr305</exclude>
                                        <exclude>org.slf4j:*</exclude>
                                        <exclude>log4j:*</exclude>
                                    </excludes>
                                </artifactSet>
                                <filters>
                                    <filter>
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*.RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                                <transformers combine.children="append">
                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
                                    </transformer>
                                </transformers>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </project>
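    Note that most of the connector dependencies above are marked provided: at runtime the matching jars (for example flink-connector-hive_2.12, hive-exec, and hadoop-client) are expected to already be on the Flink cluster's classpath, typically under $FLINK_HOME/lib, rather than shaded into the job jar. Only the unscoped dependencies here (the table API uber jar, table runtime, planner, lombok, and the csv/json formats) get bundled by the shade plugin.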

    hive-site.xml

    <?xml version="1.0"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <configuration>
        <property>
            <name>javax.jdo.option.ConnectionURL</name>
            <value>jdbc:mysql://hadoop102:3306/metastore?useSSL=false&amp;useUnicode=true&amp;characterEncoding=UTF-8&amp;allowPublicKeyRetrieval=true</value>
        </property>
        <property>
            <name>javax.jdo.option.ConnectionDriverName</name>
            <value>com.mysql.cj.jdbc.Driver</value>
        </property>
        <property>
            <name>javax.jdo.option.ConnectionUserName</name>
            <value>root</value>
        </property>
        <property>
            <name>javax.jdo.option.ConnectionPassword</name>
            <value>000000</value>
        </property>
        <property>
            <name>hive.metastore.warehouse.dir</name>
            <value>/user/hive/warehouse</value>
        </property>
        <property>
            <name>hive.metastore.schema.verification</name>
            <value>false</value>
        </property>
        <property>
            <name>hive.server2.thrift.port</name>
            <value>10000</value>
        </property>
        <property>
            <name>hive.server2.thrift.bind.host</name>
            <value>hadoop102</value>
        </property>
        <property>
            <name>hive.metastore.event.db.notification.api.auth</name>
            <value>false</value>
        </property>
        <property>
            <name>hive.cli.print.header</name>
            <value>true</value>
        </property>
        <property>
            <name>hive.cli.print.current.db</name>
            <value>true</value>
        </property>
        <property>
            <name>spark.yarn.jars</name>
            <value>hdfs://hadoop102:8020/spark-jars/*</value>
        </property>
        <property>
            <name>hive.execution.engine</name>
            <value>spark</value>
        </property>
        <property>
            <name>metastore.storage.schema.reader.impl</name>
            <value>org.apache.hadoop.hive.metastore.SerDeStorageSchemaReader</value>
        </property>
        <property>
            <name>hive.metastore.uris</name>
            <value>thrift://hadoop102:9083</value>
        </property>
    </configuration>
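    The HiveCatalog constructed in the Java listing reads this file from the "conf" directory passed as its third constructor argument, and reaches the metastore through hive.metastore.uris. As a quick sanity check (a sketch, reusing tableEnv and the catalog registration from the listing above), listing the catalog's databases confirms the metastore connection works:

        // Sketch: verify the catalog wiring by listing Hive databases through Flink.
        tableEnv.useCatalog("hive");
        tableEnv.executeSql("SHOW DATABASES").print();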

  • Original article: https://blog.csdn.net/qq_40382400/article/details/133861670