I. The Flink Column
The Flink column introduces one topic at a time in a systematic way, supplemented with concrete examples.
1. Flink Deployment Series
This part covers the basics of deploying and configuring Flink.
2. Flink Fundamentals Series
This part covers Flink fundamentals such as terminology, architecture, the programming model, the programming guide, basic DataStream API usage, and the four cornerstones.
3. Flink Table API and SQL Basics Series
This part covers basic Flink Table API and SQL usage: creating databases and tables with the Table API and SQL, queries, window functions, catalogs, and so on.
4. Flink Table API and SQL Advanced Topics and Applications Series
This part covers applied Table API and SQL content that is more closely tied to real production use, along with topics of some development difficulty.
5. Flink Monitoring Series
This part relates to day-to-day operations and monitoring work.
II. The Flink Examples Column
The Flink Examples column supplements the Flink column. It generally does not explain concepts; instead it provides concrete, directly usable examples. It is not divided into sub-directories; each article's linked title indicates its content.
For the entry point to all articles in both columns, see: Flink 系列文章汇总索引
This article briefly introduces operating on partitions, functions, and tables through the Java API and SQL, in particular creating Hive tables, illustrated with six examples.
It assumes that working Flink, Hive, and Hadoop clusters are available.
The Java API examples were built against Flink 1.13.5, with Hive 3.1.2 and Hadoop 3.1.4.
// create partition
catalog.createPartition(
new ObjectPath("mydb", "mytable"),
new CatalogPartitionSpec(...),
new CatalogPartitionImpl(...),
false);
// drop partition
catalog.dropPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...), false);
// alter partition
catalog.alterPartition(
new ObjectPath("mydb", "mytable"),
new CatalogPartitionSpec(...),
new CatalogPartitionImpl(...),
false);
// get partition
catalog.getPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
// check if a partition exists or not
catalog.partitionExists(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
// list partitions of a table
catalog.listPartitions(new ObjectPath("mydb", "mytable"));
// list partitions of a table under a given partition spec
catalog.listPartitions(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
// list partitions of a table by expression filter
catalog.listPartitionsByFilter(new ObjectPath("mydb", "mytable"), Arrays.asList(expr1, ...));
This example demonstrates how to create a Hive partitioned table with the Flink API; for how to use Hive partitioned tables, refer to the Hive-related articles. Altering and dropping partitions are comparatively simple and are not covered by the example, but a short sketch follows below.
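A minimal, hedged sketch of those two operations (the table path and partition values are illustrative, and catalog is assumed to be an initialized HiveCatalog like the one in the example class below):
ObjectPath table = new ObjectPath("mydb", "mytable");
CatalogPartitionSpec spec = new CatalogPartitionSpec(new HashMap<String, String>() {
	{
		put("name", "alan");
		put("age", "20");
	}
});
// alterPartition replaces the partition's properties and comment; the final flag is ignoreIfNotExists
catalog.alterPartition(table, spec, new CatalogPartitionImpl(new HashMap<>(), "updated comment"), false);
// dropPartition removes the partition; the final flag is ignoreIfNotExists
catalog.dropPartition(table, spec, false);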
Related articles: 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6) and 3、hive的使用示例详解-建表、数据类型详解、内部外部表、分区表、分桶表.
The dependencies used here are the same as in the previous example; only the main class changes to this example's class, so they are not repeated.
When packaging, decide for your own setup whether the main class to run needs to be changed.
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionImpl;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.module.hive.HiveModule;
/**
* @author alanchan
*
*/
public class TestHivePartitionByAPI {
static final String TEST_COMMENT = "test table comment";
static String databaseName = "viewtest_db";
static String tableName1 = "t1";
static String tableName2 = "t2";
static boolean isGeneric = false;
public static void main(String[] args) throws Exception {
// 0. set up the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
// 1. create the database
// catalog.createDatabase(db1, createDb(), false);
HiveCatalog hiveCatalog = init(tenv);
// 2. create the partitioned tables
// catalog.createTable(path1, createPartitionedTable(), false);
// 2.1 create partitioned table t1
ObjectPath path1 = new ObjectPath(databaseName, tableName1);
hiveCatalog.createTable(path1, createPartitionedTable(), false);
// 2.2 create partitioned table t2; only the table name differs, showing the inline (non-factored) creation style
ObjectPath path2 = new ObjectPath(databaseName, tableName2);
ResolvedSchema resolvedSchema = new ResolvedSchema(
Arrays.asList(Column.physical("id", DataTypes.INT()), Column.physical("name", DataTypes.STRING()), Column.physical("age", DataTypes.INT())),
Collections.emptyList(), null);
// Schema schema,
// @Nullable String comment,
// List partitionKeys,
// Map options
CatalogTable origin = CatalogTable.of(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(), TEST_COMMENT, Arrays.asList("name", "age"),
new HashMap<String, String>() {
{
put("streaming", "false");
putAll(getGenericFlag(isGeneric));
}
});
CatalogTable catalogTable = new ResolvedCatalogTable(origin, resolvedSchema);
hiveCatalog.createTable(path2, catalogTable, false);
// assertion from the original test, kept for reference
// assertThat(catalog.listPartitions(path1)).isEmpty();
// 3. create the partitions
// 3.1 partition creation, style 1
// catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
// ObjectPath tablePath,
// CatalogPartitionSpec partitionSpec,
// CatalogPartition partition,
// boolean ignoreIfExists
hiveCatalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
// 3.2 partition creation, style 2
hiveCatalog.createPartition(path2, new CatalogPartitionSpec(new HashMap<String, String>() {
{
put("name", "alan");
put("age", "20");
}
}), new CatalogPartitionImpl(new HashMap<String, String>() {
{
put("streaming", "false");
putAll(getGenericFlag(isGeneric));
}
}, TEST_COMMENT), false);
System.out.println("path1 listPartitions:"+hiveCatalog.listPartitions(path1));
System.out.println("path2 listPartitions:"+hiveCatalog.listPartitions(path2));
System.out.println("path1 listPartitions:"+hiveCatalog.listPartitions(path1, createPartitionSpecSubset()));
System.out.println("path2 listPartitions:"+hiveCatalog.listPartitions(path2, createPartitionSpecSubset()));
// assertThat(hiveCatalog.listPartitions(path1)).containsExactly(createPartitionSpec());
// assertThat(catalog.listPartitions(path1, createPartitionSpecSubset())).containsExactly(createPartitionSpec());
// 4. check the partition
// CatalogTestUtil.checkEquals(createPartition(), catalog.getPartition(path1, createPartitionSpec()));
// 5. drop the test database
// tenv.executeSql("drop database " + databaseName + " cascade");
}
private static HiveCatalog init(StreamTableEnvironment tenv) throws Exception {
String moduleName = "myhive";
String hiveVersion = "3.1.2";
tenv.loadModule(moduleName, new HiveModule(hiveVersion));
String name = "alan_hive";
String defaultDatabase = "default";
String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";
HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tenv.registerCatalog(name, hiveCatalog);
tenv.useCatalog(name);
tenv.listDatabases();
hiveCatalog.createDatabase(databaseName, new CatalogDatabaseImpl(new HashMap(), hiveConfDir) {
}, true);
// tenv.executeSql("create database "+databaseName);
tenv.useDatabase(databaseName);
return hiveCatalog;
}
static CatalogDatabase createDb() {
return new CatalogDatabaseImpl(new HashMap<String, String>() {
{
put("k1", "v1");
}
}, TEST_COMMENT);
}
static CatalogTable createPartitionedTable() {
final ResolvedSchema resolvedSchema = createSchema();
final CatalogTable origin = CatalogTable.of(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(), TEST_COMMENT, createPartitionKeys(), getBatchTableProperties());
return new ResolvedCatalogTable(origin, resolvedSchema);
}
static ResolvedSchema createSchema() {
return new ResolvedSchema(
Arrays.asList(Column.physical("id", DataTypes.INT()), Column.physical("name", DataTypes.STRING()), Column.physical("age", DataTypes.INT())),
Collections.emptyList(), null);
}
static List<String> createPartitionKeys() {
return Arrays.asList("name", "age");
}
static Map<String, String> getBatchTableProperties() {
return new HashMap<String, String>() {
{
put("streaming", "false");
putAll(getGenericFlag(isGeneric));
}
};
}
static Map<String, String> getGenericFlag(boolean isGeneric) {
return new HashMap<String, String>() {
{
String connector = isGeneric ? "COLLECTION" : "hive";
put(FactoryUtil.CONNECTOR.key(), connector);
}
};
}
static CatalogPartitionSpec createPartitionSpec() {
return new CatalogPartitionSpec(new HashMap<String, String>() {
{
put("name", "alan");
put("age", "20");
}
});
}
static CatalogPartitionSpec createPartitionSpecSubset() {
return new CatalogPartitionSpec(new HashMap<String, String>() {
{
put("name", "alan");
}
});
}
static CatalogPartition createPartition() {
return new CatalogPartitionImpl(getBatchTableProperties(), TEST_COMMENT);
}
}
[alanchan@server2 bin]$ flink run /usr/local/bigdata/flink-1.13.5/examples/table/table_sql-0.0.4-SNAPSHOT.jar
path1 listPartitions:[CatalogPartitionSpec{{name=alan, age=20}}]
path2 listPartitions:[CatalogPartitionSpec{{name=alan, age=20}}]
path1 listPartitions:[CatalogPartitionSpec{{name=alan, age=20}}]
path2 listPartitions:[CatalogPartitionSpec{{name=alan, age=20}}]
0: jdbc:hive2://server4:10000> desc formatted t1;
+-------------------------------+----------------------------------------------------+-----------------------+
| col_name | data_type | comment |
+-------------------------------+----------------------------------------------------+-----------------------+
| # col_name | data_type | comment |
| id | int | |
| | NULL | NULL |
| # Partition Information | NULL | NULL |
| # col_name | data_type | comment |
| name | string | |
| age | int | |
| | NULL | NULL |
| # Detailed Table Information | NULL | NULL |
| Database: | viewtest_db | NULL |
| OwnerType: | USER | NULL |
| Owner: | null | NULL |
| CreateTime: | Tue Oct 17 10:43:55 CST 2023 | NULL |
| LastAccessTime: | UNKNOWN | NULL |
| Retention: | 0 | NULL |
| Location: | hdfs://HadoopHAcluster/user/hive/warehouse/viewtest_db.db/t1 | NULL |
| Table Type: | MANAGED_TABLE | NULL |
| Table Parameters: | NULL | NULL |
| | bucketing_version | 2 |
| | comment | test table comment |
| | numFiles | 0 |
| | numPartitions | 1 |
| | numRows | 0 |
| | rawDataSize | 0 |
| | streaming | false |
| | totalSize | 0 |
| | transient_lastDdlTime | 1697510635 |
| | NULL | NULL |
| # Storage Information | NULL | NULL |
| SerDe Library: | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | NULL |
| InputFormat: | org.apache.hadoop.mapred.TextInputFormat | NULL |
| OutputFormat: | org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat | NULL |
| Compressed: | No | NULL |
| Num Buckets: | -1 | NULL |
| Bucket Columns: | [] | NULL |
| Sort Columns: | [] | NULL |
| Storage Desc Params: | NULL | NULL |
| | serialization.format | 1 |
+-------------------------------+----------------------------------------------------+-----------------------+
// create function
catalog.createFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);
// drop function
catalog.dropFunction(new ObjectPath("mydb", "myfunc"), false);
// alter function
catalog.alterFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);
// get function
catalog.getFunction(new ObjectPath("mydb", "myfunc"));
// check if a function exists or not
catalog.functionExists(new ObjectPath("mydb", "myfunc"));
// list functions in a database
catalog.listFunctions("mydb");
The following example operates on functions through the API: creating, altering, dropping, and querying them.
The dependencies used here are the same as in the previous example; only the main class changes to this example's class, so they are not repeated.
When packaging, decide for your own setup whether the main class to run needs to be changed.
import java.util.HashMap;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogFunctionImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.module.hive.HiveModule;
import org.apache.hadoop.hive.ql.udf.UDFRand;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFAbs;
/**
* @author alanchan
*
*/
public class TestFunctionByAPI {
static String databaseName = "viewtest_db";
static String tableName1 = "t1";
public static void main(String[] args) throws Exception {
// 0. set up the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
// 1. create the database
// catalog.createDatabase(db1, createDb(), false);
HiveCatalog hiveCatalog = init(tenv);
// 2. check whether the function exists (here the ObjectPath identifies a function named t1, not a table)
ObjectPath path1 = new ObjectPath(databaseName, tableName1);
System.out.println("function exists: " + hiveCatalog.functionExists(path1));
// 3. create the function
hiveCatalog.createFunction(path1, new CatalogFunctionImpl(GenericUDFAbs.class.getName()), false);
System.out.println("function exists: " + hiveCatalog.functionExists(path1));
// 4. alter the function
hiveCatalog.alterFunction(path1, new CatalogFunctionImpl(UDFRand.class.getName()), false);
System.out.println("function exists after alter: " + hiveCatalog.functionExists(path1));
System.out.println("get function: " + hiveCatalog.getFunction(path1));
System.out.println("function list: " + hiveCatalog.listFunctions(databaseName));
// 5. drop the function
hiveCatalog.dropFunction(path1, false);
System.out.println("function exists: " + hiveCatalog.functionExists(path1));
// 6. drop the test database
// tenv.executeSql("drop database " + databaseName + " cascade");
}
private static HiveCatalog init(StreamTableEnvironment tenv) throws Exception {
String moduleName = "myhive";
String hiveVersion = "3.1.2";
tenv.loadModule(moduleName, new HiveModule(hiveVersion));
String name = "alan_hive";
String defaultDatabase = "default";
String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";
HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tenv.registerCatalog(name, hiveCatalog);
tenv.useCatalog(name);
tenv.listDatabases();
hiveCatalog.createDatabase(databaseName, new CatalogDatabaseImpl(new HashMap(), hiveConfDir) {
}, true);
tenv.useDatabase(databaseName);
return hiveCatalog;
}
}
[alanchan@server2 bin]$ flink run /usr/local/bigdata/flink-1.13.5/examples/table/table_sql-0.0.5-SNAPSHOT.jar
function exists: false
function exists: true
function exists after alter: true
get function: CatalogFunctionImpl{className='org.apache.hadoop.hive.ql.udf.UDFRand', functionLanguage='JAVA', isGeneric='false'}
function list: [t1]
function exists: false
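As a hedged aside (not part of the original run): the function is registered in the catalog under viewtest_db with the name t1, so in principle it could be invoked from Flink SQL, assuming the Hive connector's function factory resolves catalog functions as expected. A sketch, placed between steps 3 and 4 while the function is still GenericUDFAbs:
// hypothetical: call the registered Hive UDF by its catalog name; abs(-18) should print 18
// (this must run before the alterFunction/dropFunction calls change or remove it)
tenv.executeSql("select t1(-18) as abs_value").print();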
// create table
catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
// drop table
catalog.dropTable(new ObjectPath("mydb", "mytable"), false);
// alter table
catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
// rename table
catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table");
// get table
catalog.getTable(new ObjectPath("mydb", "mytable"));
// check if a table exists or not
catalog.tableExists(new ObjectPath("mydb", "mytable"));
// list tables in a database
catalog.listTables("mydb");
The dependencies used here are the same as in the previous example; only the main class changes to this example's class, so they are not repeated.
When packaging, decide for your own setup whether the main class to run needs to be changed.
import java.util.HashMap;
import java.util.List;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.module.hive.HiveModule;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
/**
* @author alanchan
*
*/
public class TestCreateHiveTableBySQLDemo {
static String databaseName = "viewtest_db";
public static final String tableName = "alan_hivecatalog_hivedb_testTable";
public static final String hive_create_table_sql = "CREATE TABLE " + tableName + " (\n" +
" id INT,\n" +
" name STRING,\n" +
" age INT" + ") " +
"TBLPROPERTIES (\n" +
" 'sink.partition-commit.delay'='5 s',\n" +
" 'sink.partition-commit.trigger'='partition-time',\n" +
" 'sink.partition-commit.policy.kind'='metastore,success-file'" + ")";
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
// 0. set up the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
// 1. create the database
HiveCatalog hiveCatalog = init(tenv);
// 2. create the table (using the Hive SQL dialect)
tenv.getConfig().setSqlDialect(SqlDialect.HIVE);
tenv.executeSql(hive_create_table_sql);
// 3. insert data
String insertSQL = "insert into " + tableName + " values (1,'alan',18)";
tenv.executeSql(insertSQL);
// 4. query the data
List<Row> results = CollectionUtil.iteratorToList(tenv.executeSql("select * from " + tableName).collect());
for (Row row : results) {
System.out.println(tableName + ": " + row.toString());
}
// 5. drop the database
tenv.executeSql("drop database " + databaseName + " cascade");
}
private static HiveCatalog init(StreamTableEnvironment tenv) throws Exception {
String moduleName = "myhive";
String hiveVersion = "3.1.2";
tenv.loadModule(moduleName, new HiveModule(hiveVersion));
String name = "alan_hive";
String defaultDatabase = "default";
String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";
HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tenv.registerCatalog(name, hiveCatalog);
tenv.useCatalog(name);
tenv.listDatabases();
hiveCatalog.createDatabase(databaseName, new CatalogDatabaseImpl(new HashMap(), hiveConfDir) {
}, true);
tenv.useDatabase(databaseName);
return hiveCatalog;
}
}
[alanchan@server2 bin]$ flink run /usr/local/bigdata/flink-1.13.5/examples/table/table_sql-0.0.6-SNAPSHOT.jar
Hive Session ID = eb6579cd-befc-419b-8f95-8fd1e8e287e0
Hive Session ID = be12e47f-d611-4cc4-9be5-8e7628b7c90a
Job has been submitted with JobID 442b113232b8390394587b66b47aebbc
Hive Session ID = b8d772a8-a89d-4630-bbf1-fe5a3e301344
2023-10-17 07:23:31,244 INFO org.apache.hadoop.mapred.FileInputFormat [] - Total input files to process : 0
Job has been submitted with JobID f24c2cc25fa3aba729fc8b27c3edf243
alan_hivecatalog_hivedb_testTable: +I[1, alan, 18]
Hive Session ID = 69fafc9c-f8c0-4f55-b689-5db196a94689
The dependencies used here are the same as in the previous example; only the main class changes to this example's class, so they are not repeated.
When packaging, decide for your own setup whether the main class to run needs to be changed.
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.module.hive.HiveModule;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
/**
* @author alanchan
*
*/
public class TestCreateHiveTableByAPIDemo {
static String TEST_COMMENT = "test table comment";
static String databaseName = "hive_db_test";
static String tableName1 = "t1";
static String tableName2 = "t2";
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
// 0. set up the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
// 1. create the database
HiveCatalog hiveCatalog = init(tenv);
// 2. create the table
ObjectPath path1 = new ObjectPath(databaseName, tableName1);
ResolvedSchema resolvedSchema = new ResolvedSchema(
Arrays.asList(
Column.physical("id", DataTypes.INT()),
Column.physical("name", DataTypes.STRING()),
Column.physical("age", DataTypes.INT())),
Collections.emptyList(), null);
CatalogTable origin = CatalogTable.of(
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
TEST_COMMENT,
Collections.emptyList(),
new HashMap<String, String>() {
{
put("is_streaming", "false");
putAll(new HashMap<String, String>() {
{
put(FactoryUtil.CONNECTOR.key(), "hive");
}
});
}
});
CatalogTable catalogTable = new ResolvedCatalogTable(origin, resolvedSchema);
// non-partitioned (regular) table
hiveCatalog.createTable(path1, catalogTable, false);
CatalogBaseTable tableCreated = hiveCatalog.getTable(path1);
List<String> tables = hiveCatalog.listTables(databaseName);
for (String table : tables) {
System.out.println(" tableNameList : " + table);
}
// 3. insert data
String insertSQL = "insert into " + tableName1 + " values (1,'alan',18)";
tenv.executeSql(insertSQL);
// 4. query the data
List<Row> results = CollectionUtil.iteratorToList(tenv.executeSql("select * from " + tableName1).collect());
for (Row row : results) {
System.out.println(tableName1 + ": " + row.toString());
}
hiveCatalog.dropTable(path1, false);
boolean tableExists = hiveCatalog.tableExists(path1);
System.out.println("表是否drop成功:" + tableExists);
// 5、删除数据库
tenv.executeSql("drop database " + databaseName + " cascade");
}
private static HiveCatalog init(StreamTableEnvironment tenv) throws Exception {
String moduleName = "myhive";
String hiveVersion = "3.1.2";
tenv.loadModule(moduleName, new HiveModule(hiveVersion));
String name = "alan_hive";
String defaultDatabase = "default";
String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";
HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tenv.registerCatalog(name, hiveCatalog);
tenv.useCatalog(name);
tenv.listDatabases();
hiveCatalog.createDatabase(databaseName, new CatalogDatabaseImpl(new HashMap(), hiveConfDir) {
}, true);
// tenv.executeSql("create database "+databaseName);
tenv.useDatabase(databaseName);
return hiveCatalog;
}
}
[alanchan@server2 bin]$ flink run /usr/local/bigdata/flink-1.13.5/examples/table/table_sql-0.0.7-SNAPSHOT.jar
tableNameList : t1
Job has been submitted with JobID b70b8c76fd3f05b9f949a47583596288
2023-10-17 09:01:19,320 INFO org.apache.hadoop.mapred.FileInputFormat [] - Total input files to process : 0
Job has been submitted with JobID 34650c04d0a6fb32f7336f7ccc8b9090
t1: +I[1, alan, 18]
tableExists after drop: false
0: jdbc:hive2://server4:10000> desc formatted t1;
+-------------------------------+----------------------------------------------------+-----------------------+
| col_name | data_type | comment |
+-------------------------------+----------------------------------------------------+-----------------------+
| # col_name | data_type | comment |
| id | int | |
| name | string | |
| age | int | |
| | NULL | NULL |
| # Detailed Table Information | NULL | NULL |
| Database: | hive_db_test | NULL |
| OwnerType: | USER | NULL |
| Owner: | null | NULL |
| CreateTime: | Tue Oct 17 16:55:02 CST 2023 | NULL |
| LastAccessTime: | UNKNOWN | NULL |
| Retention: | 0 | NULL |
| Location: | hdfs://HadoopHAcluster/user/hive/warehouse/hive_db_test.db/t1 | NULL |
| Table Type: | MANAGED_TABLE | NULL |
| Table Parameters: | NULL | NULL |
| | bucketing_version | 2 |
| | comment | test table comment |
| | streaming | false |
| | transient_lastDdlTime | 1697532902 |
| | NULL | NULL |
| # Storage Information | NULL | NULL |
| SerDe Library: | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | NULL |
| InputFormat: | org.apache.hadoop.mapred.TextInputFormat | NULL |
| OutputFormat: | org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat | NULL |
| Compressed: | No | NULL |
| Num Buckets: | -1 | NULL |
| Bucket Columns: | [] | NULL |
| Sort Columns: | [] | NULL |
| Storage Desc Params: | NULL | NULL |
| | serialization.format | 1 |
+-------------------------------+----------------------------------------------------+-----------------------+
The dependencies used here are the same as in the previous example; only the main class changes to this example's class, so they are not repeated.
When packaging, decide for your own setup whether the main class to run needs to be changed.
This example does the same thing as the API-based Hive table creation above; it only factors the logic into helper methods and adds a streaming-table creation style. The run result is identical and is not repeated.
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.module.hive.HiveModule;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
/**
* @author alanchan
*
*/
public class TestCreateHiveTableByAPIDemo {
static String TEST_COMMENT = "test table comment";
static String databaseName = "hive_db_test";
static String tableName1 = "t1";
static String tableName2 = "t2";
static ObjectPath path1 = new ObjectPath(databaseName, tableName1);
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
// 0. set up the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
// 1. create the database
HiveCatalog hiveCatalog = init(tenv);
// 2. create the table
// 2.1 create a batch table
// testCreateTable_Batch(hiveCatalog);
// 2.2 create a streaming table
testCreateTable_Streaming(hiveCatalog);
// 3. insert data
String insertSQL = "insert into " + tableName1 + " values (1,'alan',18)";
tenv.executeSql(insertSQL);
// 4. query the data
List<Row> results = CollectionUtil.iteratorToList(tenv.executeSql("select * from " + tableName1).collect());
for (Row row : results) {
System.out.println(tableName1 + ": " + row.toString());
}
hiveCatalog.dropTable(path1, false);
boolean tableExists = hiveCatalog.tableExists(path1);
System.out.println("表是否drop成功:" + tableExists);
// 5、删除数据库
tenv.executeSql("drop database " + databaseName + " cascade");
}
/**
* Initialize the HiveCatalog and create the test database
*
* @param tenv
* @return
* @throws Exception
*/
private static HiveCatalog init(StreamTableEnvironment tenv) throws Exception {
String moduleName = "myhive";
String hiveVersion = "3.1.2";
tenv.loadModule(moduleName, new HiveModule(hiveVersion));
String name = "alan_hive";
String defaultDatabase = "default";
String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";
HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tenv.registerCatalog(name, hiveCatalog);
tenv.useCatalog(name);
tenv.listDatabases();
// tenv.executeSql("create database "+databaseName);
hiveCatalog.createDatabase(databaseName, new CatalogDatabaseImpl(new HashMap(), hiveConfDir) {
}, true);
tenv.useDatabase(databaseName);
return hiveCatalog;
}
/**
* Create a streaming table in the catalog
*
* @param catalog
* @throws Exception
*/
static void testCreateTable_Streaming(HiveCatalog catalog) throws Exception {
CatalogTable table = createStreamingTable();
catalog.createTable(path1, table, false);
// CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
}
/**
* Create a batch table in the catalog
*
* @param catalog
* @throws Exception
*/
static void testCreateTable_Batch(HiveCatalog catalog) throws Exception {
// Non-partitioned table
CatalogTable table = createBatchTable();
catalog.createTable(path1, table, false);
// CatalogBaseTable tableCreated = catalog.getTable(path1);
// CatalogTestUtil.checkEquals(table, (CatalogTable) tableCreated);
// assertThat(tableCreated.getDescription().isPresent()).isTrue();
// assertThat(tableCreated.getDescription().get()).isEqualTo(TEST_COMMENT);
// List tables = catalog.listTables(databaseName);
// assertThat(tables).hasSize(1);
// assertThat(tables.get(0)).isEqualTo(path1.getObjectName());
// catalog.dropTable(path1, false);
}
/**
* Build the CatalogTable for a streaming table
*
* @return
*/
static CatalogTable createStreamingTable() {
final ResolvedSchema resolvedSchema = createSchema();
final CatalogTable origin = CatalogTable.of(
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
TEST_COMMENT,
Collections.emptyList(),
getStreamingTableProperties());
return new ResolvedCatalogTable(origin, resolvedSchema);
}
/**
* Build the CatalogTable for a batch table
*
* @return
*/
static CatalogTable createBatchTable() {
final ResolvedSchema resolvedSchema = createSchema();
final CatalogTable origin = CatalogTable.of(
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
TEST_COMMENT,
Collections.emptyList(),
getBatchTableProperties());
return new ResolvedCatalogTable(origin, resolvedSchema);
}
/**
* Properties for a batch table
*
* @return
*/
static Map<String, String> getBatchTableProperties() {
return new HashMap<String, String>() {
{
put("is_streaming", "false");
putAll(new HashMap<String, String>() {
{
put(FactoryUtil.CONNECTOR.key(), "hive");
}
});
}
};
}
/**
* Properties for a streaming table
*
* @return
*/
static Map<String, String> getStreamingTableProperties() {
return new HashMap<String, String>() {
{
put("is_streaming", "true");
putAll(new HashMap<String, String>() {
{
put(FactoryUtil.CONNECTOR.key(), "hive");
}
});
}
};
}
static ResolvedSchema createSchema() {
return new ResolvedSchema(
Arrays.asList(
Column.physical("id", DataTypes.INT()),
Column.physical("name", DataTypes.STRING()),
Column.physical("age", DataTypes.INT())),
Collections.emptyList(), null);
}
}
For the run result, see the previous example; it is identical.
The dependencies used here are the same as in the previous example; only the main class changes to this example's class, so they are not repeated.
When packaging, decide for your own setup whether the main class to run needs to be changed.
This example does not load any data; it only demonstrates creating a partitioned table, here with two partition levels. A hedged sketch of loading data into the table follows at the end of this example. For working with Hive partitioned tables, see the article:
3、hive的使用示例详解-建表、数据类型详解、内部外部表、分区表、分桶表
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionImpl;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.module.hive.HiveModule;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
/**
* @author alanchan
*
*/
public class TestCreateHiveTableByAPIDemo {
static String TEST_COMMENT = "test table comment";
static String databaseName = "hive_db_test";
static String tableName1 = "t1";
static String tableName2 = "t2";
static ObjectPath path1 = new ObjectPath(databaseName, tableName1);
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
// 0. set up the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
// 1. create the database
HiveCatalog hiveCatalog = init(tenv);
// 2. create the table
// 2.1 create a batch table
// testCreateTable_Batch(hiveCatalog);
// 2.2 create a streaming table
// testCreateTable_Streaming(hiveCatalog);
// 2.3 create a partitioned batch table
testCreatePartitionTable_Batch(hiveCatalog);
// 2.4 create a batch table with hive properties (not implemented in this example)
// 3. insert data
// a plain INSERT like this does not work against the partitioned table; see the related content
// String insertSQL = "insert into " + tableName1 + " values (1,'alan',18)";
// tenv.executeSql(insertSQL);
// 4. query the data
List<Row> results = CollectionUtil.iteratorToList(tenv.executeSql("select * from " + tableName1).collect());
for (Row row : results) {
System.out.println(tableName1 + ": " + row.toString());
}
hiveCatalog.dropTable(path1, false);
boolean tableExists = hiveCatalog.tableExists(path1);
System.out.println("表是否drop成功:" + tableExists);
// 5、删除数据库
tenv.executeSql("drop database " + databaseName + " cascade");
}
/**
* 初始化hivecatalog
*
* @param tenv
* @return
* @throws Exception
*/
private static HiveCatalog init(StreamTableEnvironment tenv) throws Exception {
String moduleName = "myhive";
String hiveVersion = "3.1.2";
tenv.loadModule(moduleName, new HiveModule(hiveVersion));
String name = "alan_hive";
String defaultDatabase = "default";
String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";
HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tenv.registerCatalog(name, hiveCatalog);
tenv.useCatalog(name);
tenv.listDatabases();
// tenv.executeSql("create database "+databaseName);
hiveCatalog.createDatabase(databaseName, new CatalogDatabaseImpl(new HashMap(), hiveConfDir) {
}, true);
tenv.useDatabase(databaseName);
return hiveCatalog;
}
/**
* Create a streaming table in the catalog
*
* @param catalog
* @throws Exception
*/
static void testCreateTable_Streaming(HiveCatalog catalog) throws Exception {
CatalogTable table = createStreamingTable();
catalog.createTable(path1, table, false);
// CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
}
/**
* Create a batch table in the catalog
*
* @param catalog
* @throws Exception
*/
static void testCreateTable_Batch(HiveCatalog catalog) throws Exception {
// Non-partitioned table
CatalogTable table = createBatchTable();
catalog.createTable(path1, table, false);
// CatalogBaseTable tableCreated = catalog.getTable(path1);
// CatalogTestUtil.checkEquals(table, (CatalogTable) tableCreated);
// assertThat(tableCreated.getDescription().isPresent()).isTrue();
// assertThat(tableCreated.getDescription().get()).isEqualTo(TEST_COMMENT);
// List tables = catalog.listTables(databaseName);
// assertThat(tables).hasSize(1);
// assertThat(tables.get(0)).isEqualTo(path1.getObjectName());
// catalog.dropTable(path1, false);
}
/**
*
* @param catalog
* @throws DatabaseNotExistException
* @throws TableAlreadyExistException
* @throws CatalogException
*/
static void testCreatePartitionTable_Batch(HiveCatalog catalog) throws Exception {
CatalogTable table = createPartitionedTable();
catalog.createTable(path1, table, false);
// create the partition
catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
}
/**
* Build the CatalogTable for a partitioned table
*
* @return
*/
static CatalogTable createPartitionedTable() {
final ResolvedSchema resolvedSchema = createSchema();
final CatalogTable origin = CatalogTable.of(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(), TEST_COMMENT, createPartitionKeys(), getBatchTableProperties());
return new ResolvedCatalogTable(origin, resolvedSchema);
}
/**
* Partition keys for the table
*
* @return
*/
static List<String> createPartitionKeys() {
return Arrays.asList("name", "age");
}
/**
* Create a CatalogPartitionSpec. Represents a partition spec object in catalog.
* Partition columns and values are NOT of strict order, and they need to be
* re-arranged to the correct order by comparing with a list of strictly ordered
* partition keys.
*
* @return
*/
static CatalogPartitionSpec createPartitionSpec() {
return new CatalogPartitionSpec(new HashMap<String, String>() {
{
put("name", "alan");
put("age", "20");
}
});
}
static CatalogPartition createPartition() {
return new CatalogPartitionImpl(getBatchTableProperties(), TEST_COMMENT);
}
/**
* Build the CatalogTable for a streaming table
*
* @return
*/
static CatalogTable createStreamingTable() {
final ResolvedSchema resolvedSchema = createSchema();
final CatalogTable origin = CatalogTable.of(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(), TEST_COMMENT, Collections.emptyList(),
getStreamingTableProperties());
return new ResolvedCatalogTable(origin, resolvedSchema);
}
/**
* Build the CatalogTable for a batch table
*
* @return
*/
static CatalogTable createBatchTable() {
final ResolvedSchema resolvedSchema = createSchema();
final CatalogTable origin = CatalogTable.of(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(), TEST_COMMENT, Collections.emptyList(),
getBatchTableProperties());
return new ResolvedCatalogTable(origin, resolvedSchema);
}
/**
* Properties for a batch table
*
* @return
*/
static Map<String, String> getBatchTableProperties() {
return new HashMap<String, String>() {
{
put("is_streaming", "false");
putAll(new HashMap<String, String>() {
{
put(FactoryUtil.CONNECTOR.key(), "hive");
}
});
}
};
}
/**
* Properties for a streaming table
*
* @return
*/
static Map<String, String> getStreamingTableProperties() {
return new HashMap<String, String>() {
{
put("is_streaming", "true");
putAll(new HashMap<String, String>() {
{
put(FactoryUtil.CONNECTOR.key(), "hive");
}
});
}
};
}
static ResolvedSchema createSchema() {
return new ResolvedSchema(Arrays.asList(Column.physical("id", DataTypes.INT()), Column.physical("name", DataTypes.STRING()), Column.physical("age", DataTypes.INT())),
Collections.emptyList(), null);
}
}
The structure of table t1 on HDFS (screenshots omitted): the table directory sits under the warehouse path, e.g. hdfs://HadoopHAcluster/user/hive/warehouse/hive_db_test.db/t1, with the partition subdirectory name=alan/age=20 created beneath it.
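Since the example itself loads no data, here is the promised hedged sketch (not part of the original run) of inserting one row into a specific partition of t1. It assumes the Hive dialect is switched on for the table environment (SqlDialect from org.apache.flink.table.api would need to be imported), and the partition values for name and age are fixed in the statement:
// hypothetical follow-up: write one row into the (name='alan', age=20) partition
tenv.getConfig().setSqlDialect(SqlDialect.HIVE);
tenv.executeSql("insert into t1 partition(name='alan', age=20) values (1)");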
This example creates a partitioned, ORC-stored table via SQL, then inserts data from a source table into the target partitioned table.
For Hive partitioned table usage, see: 3、hive的使用示例详解-建表、数据类型详解、内部外部表、分区表、分桶表
The dependencies used here are the same as in the previous example; only the main class changes to this example's class, so they are not repeated.
When packaging, decide for your own setup whether the main class to run needs to be changed.
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.module.hive.HiveModule;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
/**
* @author alanchan
*
*/
public class TestCreateHiveTableBySQLDemo2 {
static String databaseName = "viewtest_db";
public static final String sourceTableName = "sourceTable";
public static final String targetPartitionTableName = "targetPartitionTable";
public static final String hive_create_source_table_sql =
"create table "+sourceTableName +"(id int ,name string, age int,province string) \r\n" +
"row format delimited fields terminated by ','\r\n" +
"STORED AS ORC ";
public static final String hive_create_target_partition_table_sql =
"create table "+targetPartitionTableName+" (id int ,name string, age int) \r\n" +
"partitioned by (province string)\r\n" +
"row format delimited fields terminated by ','\r\n" +
"STORED AS ORC "+
"TBLPROPERTIES (\n" +
" 'sink.partition-commit.delay'='5 s',\n" +
" 'sink.partition-commit.trigger'='partition-time',\n" +
" 'sink.partition-commit.policy.kind'='metastore,success-file'" + ")";
public static void main(String[] args) throws Exception {
// 0. set up the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
// 1. create the database
HiveCatalog hiveCatalog = init(tenv);
// 2. create the source and target tables (using the Hive SQL dialect)
tenv.getConfig().setSqlDialect(SqlDialect.HIVE);
tenv.executeSql(hive_create_source_table_sql);
tenv.executeSql(hive_create_target_partition_table_sql);
// 3. insert data into the source table
List<String> insertSQL = Arrays.asList(
"insert into "+sourceTableName+" values(1,'alan',18,'SH')",
"insert into "+sourceTableName+" values(2,'alanchan',18,'SH')",
"insert into "+sourceTableName+" values(3,'alanchanchn',18,'SH')",
"insert into "+sourceTableName+" values(4,'alan_chan',18,'BJ')",
"insert into "+sourceTableName+" values(5,'alan_chan_chn',18,'BJ')",
"insert into "+sourceTableName+" values(6,'alan',18,'TJ')",
"insert into "+sourceTableName+" values(7,'alan',18,'NJ')",
"insert into "+sourceTableName+" values(8,'alan',18,'HZ')"
);
for(String sql :insertSQL) {
tenv.executeSql(sql);
}
// 4. query the source table
List<Row> results = CollectionUtil.iteratorToList(tenv.executeSql("select * from " + sourceTableName).collect());
for (Row row : results) {
System.out.println(sourceTableName + ": " + row.toString());
}
// 5. enable dynamic-partition inserts (printing the settings before and after)
System.out.println("dynamic.partition:["+ hiveCatalog.getHiveConf().get("hive.exec.dynamic.partition")+"]");
System.out.println("dynamic.partition.mode:["+hiveCatalog.getHiveConf().get("hive.exec.dynamic.partition.mode")+"]");
hiveCatalog.getHiveConf().setBoolean("hive.exec.dynamic.partition", true);
hiveCatalog.getHiveConf().set("hive.exec.dynamic.partition.mode", "nonstrict");
System.out.println("dynamic.partition:["+ hiveCatalog.getHiveConf().get("hive.exec.dynamic.partition")+"]");
System.out.println("dynamic.partition.mode:["+hiveCatalog.getHiveConf().get("hive.exec.dynamic.partition.mode")+"]");
// 6. insert into the partitioned table via dynamic partitioning
String insertpartitionsql =
"insert into table "+targetPartitionTableName+" partition(province)\r\n" +
"select id,name,age,province from "+ sourceTableName;
tenv.executeSql(insertpartitionsql);
// 7. query the partitioned table
List<Row> partitionResults = CollectionUtil.iteratorToList(tenv.executeSql(
"select * from " + targetPartitionTableName).collect());
for (Row row : partitionResults) {
System.out.println(targetPartitionTableName + " : " + row.toString());
}
List<Row> partitionResults_SH = CollectionUtil.iteratorToList(tenv.executeSql(
"select * from " + targetPartitionTableName+" where province = 'SH'").collect());
for (Row row : partitionResults_SH) {
System.out.println(targetPartitionTableName + " SH: " + row.toString());
}
// 8. drop the database
// tenv.executeSql("drop database " + databaseName + " cascade");
}
private static HiveCatalog init(StreamTableEnvironment tenv) throws Exception {
String moduleName = "myhive";
String hiveVersion = "3.1.2";
tenv.loadModule(moduleName, new HiveModule(hiveVersion));
String name = "alan_hive";
String defaultDatabase = "default";
String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";
HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tenv.registerCatalog(name, hiveCatalog);
tenv.useCatalog(name);
tenv.listDatabases();
hiveCatalog.createDatabase(databaseName, new CatalogDatabaseImpl(new HashMap(), hiveConfDir) {
}, true);
tenv.useDatabase(databaseName);
return hiveCatalog;
}
}
[alanchan@server2 bin]$ flink run /usr/local/bigdata/flink-1.13.5/examples/table/table_sql-0.0.10-SNAPSHOT.jar
Hive Session ID = ba971dc3-7fa5-4f2c-a872-9200a0396337
Hive Session ID = a3c01c23-9828-4473-96ad-c9dc40b417c0
Hive Session ID = 547668a9-d603-4c1d-ae29-29c4cccd54f0
Job has been submitted with JobID 881de04ddea94f2c7a9f5fb051e1d4af
Hive Session ID = 676c6dfe-11ae-411e-9be7-ddef386fb2ac
Job has been submitted with JobID 0d76f2446d8cdcfd296d82965f9f759b
Hive Session ID = b18c5e00-7da9-4a43-bf50-d6bcb57d45a3
Job has been submitted with JobID 644f094a3c9fadeb0d81b9bcf339a1e7
Hive Session ID = 76f06744-ec5b-444c-a2d3-e22dfb17d83c
Job has been submitted with JobID 1e8d36f0b0961f81a63de4e9f2ce21af
Hive Session ID = 97f14128-1032-437e-b59f-f89a1e331e34
Job has been submitted with JobID 3bbd81cf693279fd8ebe8a889bdb08e3
Hive Session ID = 1456c502-8c30-44c5-94d1-6b2e4bf71bc3
Job has been submitted with JobID 377101faffcc12d3d4638826e004ddc5
Hive Session ID = ef4f659d-735b-44ca-90c0-4e19ba000e37
Job has been submitted with JobID 33d50d9501a83f28068e52f77d0b0f6d
Hive Session ID = fccefaea-5340-422d-b9ed-dd904857346e
Job has been submitted with JobID 4a53753c008f16573ab7c84e8964bc48
Hive Session ID = 5c066f43-57e8-4aba-9c7b-b75caf4f9fe7
2023-10-19 05:49:12,774 INFO org.apache.hadoop.conf.Configuration.deprecation [] - mapred.input.dir is deprecated. Instead, use mapreduce.input.fileinputformat.inputdir
Job has been submitted with JobID b44dd095b7460470c23f8e28243fc895
sourceTable: +I[1, alan, 18, SH]
sourceTable: +I[6, alan, 18, TJ]
sourceTable: +I[4, alan_chan, 18, BJ]
sourceTable: +I[2, alanchan, 18, SH]
sourceTable: +I[3, alanchanchn, 18, SH]
sourceTable: +I[5, alan_chan_chn, 18, BJ]
sourceTable: +I[7, alan, 18, NJ]
sourceTable: +I[8, alan, 18, HZ]
dynamic.partition:[true]
dynamic.partition.mode:[nonstrict]
dynamic.partition:[true]
dynamic.partition.mode:[nonstrict]
Hive Session ID = e63fd003-5d5f-458c-a9bf-e7cbfe51fbf8
Job has been submitted with JobID 59e2558aaf8daced29b7943e12a41164
Hive Session ID = 3111db81-a822-4731-a342-ab32cdc48d86
Job has been submitted with JobID 949435047e324bce96a5aa9e5b6f448d
targetPartitionTable : +I[2, alanchan, 18, SH]
targetPartitionTable : +I[7, alan, 18, NJ]
targetPartitionTable : +I[1, alan, 18, SH]
targetPartitionTable : +I[3, alanchanchn, 18, SH]
targetPartitionTable : +I[5, alan_chan_chn, 18, BJ]
targetPartitionTable : +I[4, alan_chan, 18, BJ]
targetPartitionTable : +I[8, alan, 18, HZ]
targetPartitionTable : +I[6, alan, 18, TJ]
Hive Session ID = 0bfbd60b-da1d-4a44-be23-0bde71e1ad59
Job has been submitted with JobID 49b728c8dc7fdc8037ab72bd6f3c5339
targetPartitionTable SH: +I[1, alan, 18, SH]
targetPartitionTable SH: +I[3, alanchanchn, 18, SH]
targetPartitionTable SH: +I[2, alanchan, 18, SH]
Hive Session ID = 68716de6-fceb-486e-91a8-8e4cf734ecfa
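For contrast, a static-partition insert (a hedged sketch, not part of the original run) names the partition value explicitly in the statement, so it does not depend on the dynamic-partition settings enabled in step 5:
// hypothetical static-partition variant: the province value is fixed, and only the
// non-partition columns are selected from the source table
tenv.executeSql("insert into table " + targetPartitionTableName
    + " partition(province='SH') select id, name, age from " + sourceTableName
    + " where province = 'SH'");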
The above introduced operating on partitions, functions, and tables through the Java API and SQL; table operations in particular were illustrated with six examples.