- package com.atguigu.flink.test_hk;
-
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.table.api.EnvironmentSettings;
- import org.apache.flink.table.api.SqlDialect;
- import org.apache.flink.table.api.TableEnvironment;
- import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
- import org.apache.flink.table.catalog.hive.HiveCatalog;
-
- public class KafkaToHive3 {
-
- public static void main(String[] args) {
-
- System.setProperty("HADOOP_USER_NAME", "atguigu");
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
- env.setParallelism(1);
-
- env.enableCheckpointing(6000);
-
- // to use hive dialect
- tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
-
- // 注册Hive Catalog
- HiveCatalog hiveCatalog = new HiveCatalog("hive", "default", "conf");
- tableEnv.registerCatalog("hive", hiveCatalog);
-
- // 使用默认的Flink catalog创建Kafka表
- tableEnv.useCatalog("hive");
-
- // to use default dialect
- // tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
-
- String hivesql = "CREATE TABLE IF NOT EXISTS hive_table7 (\n" +
- " id int\n" +
- ") ";
-
- tableEnv.executeSql(hivesql);
-
-
-
- // tableEnv.sqlQuery("select * from hive_table7 ").execute().print();
-
- // 使用默认的Flink catalog创建Kafka表
- tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
- tableEnv.useCatalog("default_catalog");
-
- String readSql = " create table t2 (" +
- " id int " +
- ") WITH (" +
- " 'connector' = 'kafka', " +
- " 'topic' = 'topicA', " +
- " 'properties.bootstrap.servers' = 'hadoop102:9092', " +
- " 'properties.group.id' = 'flin_"+System.currentTimeMillis()+"', " +
- " 'scan.startup.mode' = 'latest-offset', " +
- " 'value.format' = 'csv' " +
- ")" ;
- tableEnv.executeSql(readSql);
-
- // tableEnv.sqlQuery("select * from t2 ").execute().print();
-
- tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
- tableEnv.useCatalog("hive");
-
- tableEnv.executeSql("INSERT INTO hive_table7 select id from default_catalog.default_database.t2");
-
-
- }
-
- }
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>BigData230201</artifactId>
        <groupId>com.atguigu</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>Flink</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.17.0</flink.version>
        <scala.version>2.12</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.26</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-datagen</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.3.4</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>3.1.0-1.17</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
            <version>8.0.32</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-changelog</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-uber</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_2.12</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>3.1.3</version>
            <scope>provided</scope>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>

                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers combine.children="append">
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>

    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://hadoop102:3306/metastore?useSSL=false&amp;useUnicode=true&amp;characterEncoding=UTF-8&amp;allowPublicKeyRetrieval=true</value>
    </property>

    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.cj.jdbc.Driver</value>
    </property>

    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>root</value>
    </property>

    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>000000</value>
    </property>

    <property>
        <name>hive.metastore.warehouse.dir</name>
        <value>/user/hive/warehouse</value>
    </property>

    <property>
        <name>hive.metastore.schema.verification</name>
        <value>false</value>
    </property>

    <property>
        <name>hive.server2.thrift.port</name>
        <value>10000</value>
    </property>

    <property>
        <name>hive.server2.thrift.bind.host</name>
        <value>hadoop102</value>
    </property>

    <property>
        <name>hive.metastore.event.db.notification.api.auth</name>
        <value>false</value>
    </property>

    <property>
        <name>hive.cli.print.header</name>
        <value>true</value>
    </property>

    <property>
        <name>hive.cli.print.current.db</name>
        <value>true</value>
    </property>
    <property>
        <name>spark.yarn.jars</name>
        <value>hdfs://hadoop102:8020/spark-jars/*</value>
    </property>

    <property>
        <name>hive.execution.engine</name>
        <value>spark</value>
    </property>

    <property>
        <name>metastore.storage.schema.reader.impl</name>
        <value>org.apache.hadoop.hive.metastore.SerDeStorageSchemaReader</value>
    </property>

    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://hadoop102:9083</value>
    </property>

</configuration>
-