• 24. Flink Table API & SQL Catalogs: operating on partitions, functions, and tables via the Java API (4)


    Flink Series Articles

    I. Flink Column

    The Flink column introduces one topic at a time, each illustrated with concrete examples.

    • 1. Flink deployment series
      Covers the basics of deploying and configuring Flink.

    • 2. Flink fundamentals series
      Covers Flink fundamentals such as terminology, architecture, the programming model, programming guides, basic DataStream API usage, and the four cornerstones.

    • 3. Flink Table API and SQL basics series
      Covers basic usage of the Flink Table API and SQL, such as creating databases and tables, queries, window functions, catalogs, and so on.

    • 4. Flink Table API and SQL advanced and application series
      Covers applied Table API and SQL topics, which are closer to real production work and harder to develop.

    • 5. Flink monitoring series
      Covers topics related to day-to-day operations and monitoring.

    II. Flink Examples Column

    The Flink examples column supplements the Flink column: rather than explaining concepts, it provides concrete, ready-to-run examples. It has no sub-directories; each article's link indicates its content.

    Entry point for all articles in both columns: Flink series article index



    This article briefly introduces operating on partitions, functions, and tables through the Java API and SQL, in particular creating Hive tables, illustrated with six examples.
    It assumes working Flink, Hive, and Hadoop clusters.
    The Java API examples were built with Flink 1.13.5, against Hive 3.1.2 and Hadoop 3.1.4.

    V. Catalog API

    4. Partition operations

    1) Official example

    // create partition
    catalog.createPartition(
        new ObjectPath("mydb", "mytable"),
        new CatalogPartitionSpec(...),
        new CatalogPartitionImpl(...),
        false);
    
    // drop partition
    catalog.dropPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...), false);
    
    // alter partition
    catalog.alterPartition(
        new ObjectPath("mydb", "mytable"),
        new CatalogPartitionSpec(...),
        new CatalogPartitionImpl(...),
        false);
    
    // get partition
    catalog.getPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
    
    // check if a partition exists or not
    catalog.partitionExists(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
    
    // list partitions of a table
    catalog.listPartitions(new ObjectPath("mydb", "mytable"));
    
    // list partitions of a table under a given partition spec
    catalog.listPartitions(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
    
    // list partitions of a table by expression filter
    catalog.listPartitions(new ObjectPath("mydb", "mytable"), Arrays.asList(expr1, ...));
    

    2) Creating Hive partitions via the API

    This example demonstrates how to create a Hive partitioned table with the Flink API; for how to use Hive partitioned tables, see the related Hive articles. Altering and dropping partitions are straightforward and only shown as a brief sketch after the links below.
    16. Flink Table API & SQL, connecting to external systems: connectors and formats for reading and writing external systems, with an Apache Hive example (6)
    3. Hive usage examples in detail: table creation, data types, internal and external tables, partitioned tables, bucketed tables
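
    Since altering and dropping partitions are skipped here, the following is a minimal hedged sketch of both calls. It reuses the hiveCatalog and path1 objects from the full example below; the partition values and the extra property are illustrative assumptions, not part of the original example.

    // hedged sketch: alter, then drop, the partition created in the example below
    CatalogPartitionSpec spec = new CatalogPartitionSpec(new HashMap<String, String>() {
    	{
    		put("name", "alan");
    		put("age", "20");
    	}
    });
    
    // alterPartition replaces the partition's properties and comment with a new CatalogPartition
    Map<String, String> newProps = new HashMap<>();
    newProps.put("note", "updated by alterPartition"); // illustrative property
    hiveCatalog.alterPartition(path1, spec, new CatalogPartitionImpl(newProps, "altered comment"), false);
    
    // dropPartition removes it; the boolean means "ignore if the partition does not exist"
    hiveCatalog.dropPartition(path1, spec, false);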

    1. Maven dependencies

    The dependencies are the same as in the previous example; only the mainClass changes to this example's class, so they are not repeated here.
    When packaging, decide for yourself whether the main class to run needs to be changed for your setup.

    2. Code
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.Schema;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.table.catalog.CatalogDatabase;
    import org.apache.flink.table.catalog.CatalogDatabaseImpl;
    import org.apache.flink.table.catalog.CatalogPartition;
    import org.apache.flink.table.catalog.CatalogPartitionImpl;
    import org.apache.flink.table.catalog.CatalogPartitionSpec;
    import org.apache.flink.table.catalog.CatalogTable;
    import org.apache.flink.table.catalog.Column;
    import org.apache.flink.table.catalog.ObjectPath;
    import org.apache.flink.table.catalog.ResolvedCatalogTable;
    import org.apache.flink.table.catalog.ResolvedSchema;
    import org.apache.flink.table.catalog.hive.HiveCatalog;
    import org.apache.flink.table.factories.FactoryUtil;
    import org.apache.flink.table.module.hive.HiveModule;
    
    /**
     * @author alanchan
     *
     */
    public class TestHivePartitionByAPI {
    	static final String TEST_COMMENT = "test table comment";
    	static String databaseName = "viewtest_db";
    	static String tableName1 = "t1";
    	static String tableName2 = "t2";
    	static boolean isGeneric = false;
    
    	public static void main(String[] args) throws Exception {
    
    		// 0. execution environment
    		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
    
    		// 1. create the database
    //		catalog.createDatabase(db1, createDb(), false);
    		HiveCatalog hiveCatalog = init(tenv);
    
    		// 2. create partitioned tables
    //		catalog.createTable(path1, createPartitionedTable(), false);
    		// 2.1 create partitioned table t1
    		ObjectPath path1 = new ObjectPath(databaseName, tableName1);
    		hiveCatalog.createTable(path1, createPartitionedTable(), false);
    
    		// 2.2 create partitioned table t2; only the name differs, showing the inline (non-factored) creation style
    		ObjectPath path2 = new ObjectPath(databaseName, tableName2);
    		ResolvedSchema resolvedSchema = new ResolvedSchema(
    				Arrays.asList(Column.physical("id", DataTypes.INT()), Column.physical("name", DataTypes.STRING()), Column.physical("age", DataTypes.INT())),
    				Collections.emptyList(), null);
    
    //		   Schema schema,
    //        @Nullable String comment,
    //        List partitionKeys,
    //        Map options
    		CatalogTable origin = CatalogTable.of(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(), TEST_COMMENT, Arrays.asList("name", "age"),
    				new HashMap<String, String>() {
    					{
    						put("streaming", "false");
    						putAll(getGenericFlag(isGeneric));
    					}
    				});
    		CatalogTable catalogTable = new ResolvedCatalogTable(origin, resolvedSchema);
    		hiveCatalog.createTable(path2, catalogTable, false);
    
    		// assertion from the reference test, kept for context
    //		assertThat(catalog.listPartitions(path1)).isEmpty();
    		// 3. create partitions
    		// 3.1 creation style 1: via the helper methods
    //		catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
    //		   ObjectPath tablePath,
    //        CatalogPartitionSpec partitionSpec,
    //        CatalogPartition partition,
    //        boolean ignoreIfExists
    		hiveCatalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
    
    		// 3.2 creation style 2: inline
    		hiveCatalog.createPartition(path2, new CatalogPartitionSpec(new HashMap<String, String>() {
    			{
    				put("name", "alan");
    				put("age", "20");
    			}
    		}), new CatalogPartitionImpl(new HashMap<String, String>() {
    			{
    				put("streaming", "false");
    				putAll(getGenericFlag(isGeneric));
    			}
    		}, TEST_COMMENT), false);
    
    		System.out.println("path1 listPartitions:"+hiveCatalog.listPartitions(path1));
    		System.out.println("path2 listPartitions:"+hiveCatalog.listPartitions(path2));
    
    		System.out.println("path1 listPartitions:"+hiveCatalog.listPartitions(path1, createPartitionSpecSubset()));
    		System.out.println("path2 listPartitions:"+hiveCatalog.listPartitions(path2, createPartitionSpecSubset()));
    
    //		assertThat(hiveCatalog.listPartitions(path1)).containsExactly(createPartitionSpec());
    //		assertThat(catalog.listPartitions(path1, createPartitionSpecSubset())).containsExactly(createPartitionSpec());
    
    		// 4. check the partition
    //		CatalogTestUtil.checkEquals(createPartition(), catalog.getPartition(path1, createPartitionSpec()));
    		
    		// 5. drop the test database
    //		tenv.executeSql("drop database " + databaseName + " cascade");
    	}
    
    	private static HiveCatalog init(StreamTableEnvironment tenv) throws Exception {
    
    		String moduleName = "myhive";
    		String hiveVersion = "3.1.2";
    		tenv.loadModule(moduleName, new HiveModule(hiveVersion));
    
    		String name = "alan_hive";
    		String defaultDatabase = "default";
    		String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";
    
    		HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
    		tenv.registerCatalog(name, hiveCatalog);
    		tenv.useCatalog(name);
    		tenv.listDatabases();
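    		// note: CatalogDatabaseImpl's second argument is the database comment; passing
    		// hiveConfDir below works, but it stores the conf dir path as the comment string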
    		hiveCatalog.createDatabase(databaseName, new CatalogDatabaseImpl(new HashMap(), hiveConfDir) {
    		}, true);
    
    //	tenv.executeSql("create database "+databaseName);
    
    		tenv.useDatabase(databaseName);
    		return hiveCatalog;
    	}
    
    	CatalogDatabase createDb() {
    		return new CatalogDatabaseImpl(new HashMap<String, String>() {
    			{
    				put("k1", "v1");
    			}
    		}, TEST_COMMENT);
    	}
    
    	static CatalogTable createPartitionedTable() {
    		final ResolvedSchema resolvedSchema = createSchema();
    		final CatalogTable origin = CatalogTable.of(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(), TEST_COMMENT, createPartitionKeys(), getBatchTableProperties());
    		return new ResolvedCatalogTable(origin, resolvedSchema);
    	}
    
    	static ResolvedSchema createSchema() {
    		return new ResolvedSchema(
    				Arrays.asList(Column.physical("id", DataTypes.INT()), Column.physical("name", DataTypes.STRING()), Column.physical("age", DataTypes.INT())),
    				Collections.emptyList(), null);
    	}
    
    	static List<String> createPartitionKeys() {
    		return Arrays.asList("name", "age");
    	}
    
    	static Map<String, String> getBatchTableProperties() {
    		return new HashMap<String, String>() {
    			{
    				put("streaming", "false");
    				putAll(getGenericFlag(isGeneric));
    			}
    		};
    	}
    
    	static Map<String, String> getGenericFlag(boolean isGeneric) {
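    		// connector = "hive" marks the table as Hive-compatible (stored in the metastore as a
    		// plain Hive table); any other connector, e.g. COLLECTION, makes it a generic Flink table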
    		return new HashMap<String, String>() {
    			{
    				String connector = isGeneric ? "COLLECTION" : "hive";
    				put(FactoryUtil.CONNECTOR.key(), connector);
    			}
    		};
    	}
    
    	static CatalogPartitionSpec createPartitionSpec() {
    		return new CatalogPartitionSpec(new HashMap<String, String>() {
    			{
    				put("name", "alan");
    				put("age", "20");
    			}
    		});
    	}
    
    	static CatalogPartitionSpec createPartitionSpecSubset() {
    		return new CatalogPartitionSpec(new HashMap<String, String>() {
    			{
    				put("name", "alan");
    			}
    		});
    	}
    
    	static CatalogPartition createPartition() {
    		return new CatalogPartitionImpl(getBatchTableProperties(), TEST_COMMENT);
    	}
    }
    
    3. Run results
    • Flink run output
    [alanchan@server2 bin]$ flink run  /usr/local/bigdata/flink-1.13.5/examples/table/table_sql-0.0.4-SNAPSHOT.jar
    
    path1 listPartitions:[CatalogPartitionSpec{{name=alan, age=20}}]
    path2 listPartitions:[CatalogPartitionSpec{{name=alan, age=20}}]
    path1 listPartitions:[CatalogPartitionSpec{{name=alan, age=20}}]
    path2 listPartitions:[CatalogPartitionSpec{{name=alan, age=20}}]
    
    
    • Viewing the table partitions in Hive
    0: jdbc:hive2://server4:10000> desc formatted t1;
    +-------------------------------+----------------------------------------------------+-----------------------+
    |           col_name            |                     data_type                      |        comment        |
    +-------------------------------+----------------------------------------------------+-----------------------+
    | # col_name                    | data_type                                          | comment               |
    | id                            | int                                                |                       |
    |                               | NULL                                               | NULL                  |
    | # Partition Information       | NULL                                               | NULL                  |
    | # col_name                    | data_type                                          | comment               |
    | name                          | string                                             |                       |
    | age                           | int                                                |                       |
    |                               | NULL                                               | NULL                  |
    | # Detailed Table Information  | NULL                                               | NULL                  |
    | Database:                     | viewtest_db                                        | NULL                  |
    | OwnerType:                    | USER                                               | NULL                  |
    | Owner:                        | null                                               | NULL                  |
    | CreateTime:                   | Tue Oct 17 10:43:55 CST 2023                       | NULL                  |
    | LastAccessTime:               | UNKNOWN                                            | NULL                  |
    | Retention:                    | 0                                                  | NULL                  |
    | Location:                     | hdfs://HadoopHAcluster/user/hive/warehouse/viewtest_db.db/t1 | NULL                  |
    | Table Type:                   | MANAGED_TABLE                                      | NULL                  |
    | Table Parameters:             | NULL                                               | NULL                  |
    |                               | bucketing_version                                  | 2                     |
    |                               | comment                                            | test table comment    |
    |                               | numFiles                                           | 0                     |
    |                               | numPartitions                                      | 1                     |
    |                               | numRows                                            | 0                     |
    |                               | rawDataSize                                        | 0                     |
    |                               | streaming                                          | false                 |
    |                               | totalSize                                          | 0                     |
    |                               | transient_lastDdlTime                              | 1697510635            |
    |                               | NULL                                               | NULL                  |
    | # Storage Information         | NULL                                               | NULL                  |
    | SerDe Library:                | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | NULL                  |
    | InputFormat:                  | org.apache.hadoop.mapred.TextInputFormat           | NULL                  |
    | OutputFormat:                 | org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat | NULL                  |
    | Compressed:                   | No                                                 | NULL                  |
    | Num Buckets:                  | -1                                                 | NULL                  |
    | Bucket Columns:               | []                                                 | NULL                  |
    | Sort Columns:                 | []                                                 | NULL                  |
    | Storage Desc Params:          | NULL                                               | NULL                  |
    |                               | serialization.format                               | 1                     |
    +-------------------------------+----------------------------------------------------+-----------------------+
    
    

    5. Function operations

    1) Official example

    
    // create function
    catalog.createFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);
    
    // drop function
    catalog.dropFunction(new ObjectPath("mydb", "myfunc"), false);
    
    // alter function
    catalog.alterFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);
    
    // get function
    catalog.getFunction(new ObjectPath("mydb", "myfunc"));
    
    // check if a function exists or not
    catalog.functionExists(new ObjectPath("mydb", "myfunc"));
    
    // list functions in a database
    catalog.listFunctions("mydb");
    
    

    2) Operating on functions via the API

    Operate on functions through the API: create, alter, drop, and query them.

    1. Maven dependencies

    The dependencies are the same as in the previous example; only the mainClass changes to this example's class, so they are not repeated here.
    When packaging, decide for yourself whether the main class to run needs to be changed for your setup.

    2. Code
    import java.util.HashMap;
    
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.table.catalog.CatalogDatabaseImpl;
    import org.apache.flink.table.catalog.CatalogFunctionImpl;
    import org.apache.flink.table.catalog.ObjectPath;
    import org.apache.flink.table.catalog.hive.HiveCatalog;
    import org.apache.flink.table.module.hive.HiveModule;
    import org.apache.hadoop.hive.ql.udf.UDFRand;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDFAbs;
    
    /**
     * @author alanchan
     *
     */
    public class TestFunctionByAPI {
    	static String databaseName = "viewtest_db";
    	static String tableName1 = "t1";
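    	// note: the value "t1" is used below as the *function* name, not as a table name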
    
    	public static void main(String[] args) throws Exception {
    		// 0. environment
    		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
    
    		// 1. create the database
    		// catalog.createDatabase(db1, createDb(), false);
    		HiveCatalog hiveCatalog = init(tenv);
    
    		// 2. check whether the function exists
    		ObjectPath path1 = new ObjectPath(databaseName, tableName1);
    		System.out.println("function是否存在 :" + hiveCatalog.functionExists(path1));
    
    		// 3. create the function
    		hiveCatalog.createFunction(path1, new CatalogFunctionImpl(GenericUDFAbs.class.getName()), false);
    		System.out.println("function是否存在 :" + hiveCatalog.functionExists(path1));
    		
    		// 4. alter the function
    		hiveCatalog.alterFunction(path1, new CatalogFunctionImpl(UDFRand.class.getName()), false);
    		System.out.println("修改后的function是否存在 :" + hiveCatalog.functionExists(path1));
    		
    		System.out.println("查询function :" + hiveCatalog.getFunction(path1));
    		System.out.println("function 列表 :" + hiveCatalog.listFunctions(databaseName));
    
    		// 5. drop the function
    		hiveCatalog.dropFunction(path1, false);
    		System.out.println("function是否存在 :" + hiveCatalog.functionExists(path1));
    
    		// 6. drop the test database
    		// tenv.executeSql("drop database " + databaseName + " cascade");
    	}
    
    	private static HiveCatalog init(StreamTableEnvironment tenv) throws Exception {
    		String moduleName = "myhive";
    		String hiveVersion = "3.1.2";
    		tenv.loadModule(moduleName, new HiveModule(hiveVersion));
    
    		String name = "alan_hive";
    		String defaultDatabase = "default";
    		String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";
    
    		HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
    		tenv.registerCatalog(name, hiveCatalog);
    		tenv.useCatalog(name);
    		tenv.listDatabases();
    		hiveCatalog.createDatabase(databaseName, new CatalogDatabaseImpl(new HashMap(), hiveConfDir) {
    		}, true);
    
    		tenv.useDatabase(databaseName);
    		return hiveCatalog;
    	}
    
    }
    
    3. Run results
    [alanchan@server2 bin]$ flink run  /usr/local/bigdata/flink-1.13.5/examples/table/table_sql-0.0.5-SNAPSHOT.jar
    
    function是否存在 :false
    function是否存在 :true
    修改后的function是否存在 :true
    查询function :CatalogFunctionImpl{className='org.apache.hadoop.hive.ql.udf.UDFRand', functionLanguage='JAVA', isGeneric='false'}
    function 列表 :[t1]
    function是否存在 :false
    
    

    6. Table operations (supplement)

    1) Official example

    // create table
    catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
    
    // drop table
    catalog.dropTable(new ObjectPath("mydb", "mytable"), false);
    
    // alter table
    catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
    
    // rename table
    catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table");
    
    // get table
    catalog.getTable(new ObjectPath("mydb", "mytable"));
    
    // check if a table exists or not
    catalog.tableExists(new ObjectPath("mydb", "mytable"));
    
    // list tables in a database
    catalog.listTables("mydb");
    

    2) Creating a Hive table with SQL

    1. Maven dependencies

    The dependencies are the same as in the previous example; only the mainClass changes to this example's class, so they are not repeated here.
    When packaging, decide for yourself whether the main class to run needs to be changed for your setup.

    2. Code
    import java.util.HashMap;
    import java.util.List;
    
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.SqlDialect;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.table.catalog.CatalogDatabaseImpl;
    import org.apache.flink.table.catalog.hive.HiveCatalog;
    import org.apache.flink.table.module.hive.HiveModule;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.CollectionUtil;
    
    /**
     * @author alanchan
     *
     */
    public class TestCreateHiveTableBySQLDemo {
    	static String databaseName = "viewtest_db";
    	public static final String tableName = "alan_hivecatalog_hivedb_testTable";
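    	// note: the sink.partition-commit.* options in the DDL below are Flink streaming
    	// filesystem/Hive sink settings that control when partitions are committed during continuous writes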
    	public static final String hive_create_table_sql = "CREATE  TABLE  " + tableName +  " (\n" + 
    			  "  id INT,\n" + 
    			  "  name STRING,\n" + 
    			  "  age INT" + ") " + 
    			  "TBLPROPERTIES (\n" + 
    			  "  'sink.partition-commit.delay'='5 s',\n" + 
    			  "  'sink.partition-commit.trigger'='partition-time',\n" + 
    			  "  'sink.partition-commit.policy.kind'='metastore,success-file'" + ")";
    
    	/**
    	 * @param args
    	 * @throws Exception
    	 */
    	public static void main(String[] args) throws Exception {
    		// 0. execution environment
    		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
    
    		// 1. create the database
    		HiveCatalog hiveCatalog = init(tenv);
    
    		// 2. create the table
    		tenv.getConfig().setSqlDialect(SqlDialect.HIVE);
    		tenv.executeSql(hive_create_table_sql);
    
    		// 3. insert data
    		String insertSQL = "insert into " + tableName + " values (1,'alan',18)";
    		tenv.executeSql(insertSQL);
    
    		// 4. query data
    		List<Row> results = CollectionUtil.iteratorToList(tenv.executeSql("select * from " + tableName).collect());
    		for (Row row : results) {
    			System.out.println(tableName + ": " + row.toString());
    		}
    
    		// 5. drop the database
    		tenv.executeSql("drop database " + databaseName + " cascade");
    	}
    
    	private static HiveCatalog init(StreamTableEnvironment tenv) throws Exception {
    		String moduleName = "myhive";
    		String hiveVersion = "3.1.2";
    		tenv.loadModule(moduleName, new HiveModule(hiveVersion));
    
    		String name = "alan_hive";
    		String defaultDatabase = "default";
    		String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";
    
    		HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
    		tenv.registerCatalog(name, hiveCatalog);
    		tenv.useCatalog(name);
    		tenv.listDatabases();
    		hiveCatalog.createDatabase(databaseName, new CatalogDatabaseImpl(new HashMap(), hiveConfDir) {
    		}, true);
    
    		tenv.useDatabase(databaseName);
    		return hiveCatalog;
    	}
    }
    
    3. Run results
    [alanchan@server2 bin]$ flink run  /usr/local/bigdata/flink-1.13.5/examples/table/table_sql-0.0.6-SNAPSHOT.jar
    
    Hive Session ID = eb6579cd-befc-419b-8f95-8fd1e8e287e0
    Hive Session ID = be12e47f-d611-4cc4-9be5-8e7628b7c90a
    Job has been submitted with JobID 442b113232b8390394587b66b47aebbc
    Hive Session ID = b8d772a8-a89d-4630-bbf1-fe5a3e301344
    2023-10-17 07:23:31,244 INFO  org.apache.hadoop.mapred.FileInputFormat                     [] - Total input files to process : 0
    Job has been submitted with JobID f24c2cc25fa3aba729fc8b27c3edf243
    alan_hivecatalog_hivedb_testTable: +I[1, alan, 18]
    Hive Session ID = 69fafc9c-f8c0-4f55-b689-5db196a94689
    
    

    3) Creating a Hive table via the API: a regular table

    1. Maven dependencies

    The dependencies are the same as in the previous example; only the mainClass changes to this example's class, so they are not repeated here.
    When packaging, decide for yourself whether the main class to run needs to be changed for your setup.

    2. Code
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.Schema;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.table.catalog.CatalogBaseTable;
    import org.apache.flink.table.catalog.CatalogDatabaseImpl;
    import org.apache.flink.table.catalog.CatalogTable;
    import org.apache.flink.table.catalog.Column;
    import org.apache.flink.table.catalog.ObjectPath;
    import org.apache.flink.table.catalog.ResolvedCatalogTable;
    import org.apache.flink.table.catalog.ResolvedSchema;
    import org.apache.flink.table.catalog.hive.HiveCatalog;
    import org.apache.flink.table.factories.FactoryUtil;
    import org.apache.flink.table.module.hive.HiveModule;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.CollectionUtil;
    
    /**
     * @author alanchan
     *
     */
    public class TestCreateHiveTableByAPIDemo {
    	static String TEST_COMMENT = "test table comment";
    	static String databaseName = "hive_db_test";
    	static String tableName1 = "t1";
    	static String tableName2 = "t2";
    
    	/**
    	 * @param args
    	 * @throws Exception
    	 */
    	public static void main(String[] args) throws Exception {
    		// 0. execution environment
    		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
    
    		// 1. create the database
    		HiveCatalog hiveCatalog = init(tenv);
    
    		// 2. create the table
    		ObjectPath path1 = new ObjectPath(databaseName, tableName1);
    		ResolvedSchema resolvedSchema = new ResolvedSchema(
    				Arrays.asList(
    						Column.physical("id", DataTypes.INT()), 
    						Column.physical("name", DataTypes.STRING()), 
    						Column.physical("age", DataTypes.INT())),
    				Collections.emptyList(), null);
    
    		CatalogTable origin = CatalogTable.of(
    				Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(), 
    				TEST_COMMENT, 
    				Collections.emptyList(), 
    				new HashMap<String, String>() {
    					{
    						put("is_streaming", "false");
    						putAll(new HashMap<String, String>() {
    							{
    								put(FactoryUtil.CONNECTOR.key(), "hive");
    							}
    						});
    					}
    				});
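    		// ResolvedCatalogTable pairs the CatalogTable metadata with its resolved schema,
    		// giving the catalog concrete column types rather than an unresolved Schema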
    		CatalogTable catalogTable = new ResolvedCatalogTable(origin, resolvedSchema);
    
    		// regular (non-partitioned) table
    		hiveCatalog.createTable(path1, catalogTable, false);
    		CatalogBaseTable tableCreated = hiveCatalog.getTable(path1);
    
    		List<String> tables = hiveCatalog.listTables(databaseName);
    		for (String table : tables) {
    			System.out.println(" tableNameList : " + table);
    		}
    
    		// 3. insert data
    		String insertSQL = "insert into " + tableName1 + " values (1,'alan',18)";
    		tenv.executeSql(insertSQL);
    
    		// 4. query data
    		List<Row> results = CollectionUtil.iteratorToList(tenv.executeSql("select * from " + tableName1).collect());
    		for (Row row : results) {
    			System.out.println(tableName1 + ": " + row.toString());
    		}
    
    		hiveCatalog.dropTable(path1, false);
    		boolean tableExists = hiveCatalog.tableExists(path1);
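    		// tableExists is false after a successful drop, so "false" printed below means the drop worked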
    		System.out.println("表是否drop成功:" + tableExists);
    
    		// 5. drop the database
    		tenv.executeSql("drop database " + databaseName + " cascade");
    	}
    
    	private static HiveCatalog init(StreamTableEnvironment tenv) throws Exception {
    		String moduleName = "myhive";
    		String hiveVersion = "3.1.2";
    		tenv.loadModule(moduleName, new HiveModule(hiveVersion));
    
    		String name = "alan_hive";
    		String defaultDatabase = "default";
    		String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";
    
    		HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
    		tenv.registerCatalog(name, hiveCatalog);
    		tenv.useCatalog(name);
    		tenv.listDatabases();
    		hiveCatalog.createDatabase(databaseName, new CatalogDatabaseImpl(new HashMap(), hiveConfDir) {
    		}, true);
    
    		// tenv.executeSql("create database "+databaseName);
    		tenv.useDatabase(databaseName);
    		return hiveCatalog;
    	}
    
    }
    
    3. Run results
    • Flink run output
    [alanchan@server2 bin]$ flink run  /usr/local/bigdata/flink-1.13.5/examples/table/table_sql-0.0.7-SNAPSHOT.jar
    
     tableNameList : t1
    Job has been submitted with JobID b70b8c76fd3f05b9f949a47583596288
    2023-10-17 09:01:19,320 INFO  org.apache.hadoop.mapred.FileInputFormat                     [] - Total input files to process : 0
    Job has been submitted with JobID 34650c04d0a6fb32f7336f7ccc8b9090
    t1: +I[1, alan, 18]
    表是否drop成功:false
    
    
    • Hive table description
      The output below was captured while the table and database still existed, i.e. with the drop statements in the example above commented out.
    0: jdbc:hive2://server4:10000> desc formatted t1;
    +-------------------------------+----------------------------------------------------+-----------------------+
    |           col_name            |                     data_type                      |        comment        |
    +-------------------------------+----------------------------------------------------+-----------------------+
    | # col_name                    | data_type                                          | comment               |
    | id                            | int                                                |                       |
    | name                          | string                                             |                       |
    | age                           | int                                                |                       |
    |                               | NULL                                               | NULL                  |
    | # Detailed Table Information  | NULL                                               | NULL                  |
    | Database:                     | hive_db_test                                       | NULL                  |
    | OwnerType:                    | USER                                               | NULL                  |
    | Owner:                        | null                                               | NULL                  |
    | CreateTime:                   | Tue Oct 17 16:55:02 CST 2023                       | NULL                  |
    | LastAccessTime:               | UNKNOWN                                            | NULL                  |
    | Retention:                    | 0                                                  | NULL                  |
    | Location:                     | hdfs://HadoopHAcluster/user/hive/warehouse/hive_db_test.db/t1 | NULL                  |
    | Table Type:                   | MANAGED_TABLE                                      | NULL                  |
    | Table Parameters:             | NULL                                               | NULL                  |
    |                               | bucketing_version                                  | 2                     |
    |                               | comment                                            | test table comment    |
    |                               | streaming                                          | false                 |
    |                               | transient_lastDdlTime                              | 1697532902            |
    |                               | NULL                                               | NULL                  |
    | # Storage Information         | NULL                                               | NULL                  |
    | SerDe Library:                | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | NULL                  |
    | InputFormat:                  | org.apache.hadoop.mapred.TextInputFormat           | NULL                  |
    | OutputFormat:                 | org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat | NULL                  |
    | Compressed:                   | No                                                 | NULL                  |
    | Num Buckets:                  | -1                                                 | NULL                  |
    | Bucket Columns:               | []                                                 | NULL                  |
    | Sort Columns:                 | []                                                 | NULL                  |
    | Storage Desc Params:          | NULL                                               | NULL                  |
    |                               | serialization.format                               | 1                     |
    +-------------------------------+----------------------------------------------------+-----------------------+
    
    

    4) Creating a Hive table via the API: a streaming table

    1. Maven dependencies

    The dependencies are the same as in the previous example; only the mainClass changes to this example's class, so they are not repeated here.
    When packaging, decide for yourself whether the main class to run needs to be changed for your setup.

    2. Code

    This example does the same thing as the API-based table creation above; it only demonstrates the factored-out helper methods and the streaming-table creation style. The run results are identical and not repeated here.

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.Schema;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.table.catalog.CatalogDatabaseImpl;
    import org.apache.flink.table.catalog.CatalogTable;
    import org.apache.flink.table.catalog.Column;
    import org.apache.flink.table.catalog.ObjectPath;
    import org.apache.flink.table.catalog.ResolvedCatalogTable;
    import org.apache.flink.table.catalog.ResolvedSchema;
    import org.apache.flink.table.catalog.hive.HiveCatalog;
    import org.apache.flink.table.factories.FactoryUtil;
    import org.apache.flink.table.module.hive.HiveModule;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.CollectionUtil;
    
    /**
     * @author alanchan
     *
     */
    public class TestCreateHiveTableByAPIDemo {
    	static String TEST_COMMENT = "test table comment";
    	static String databaseName = "hive_db_test";
    	static String tableName1 = "t1";
    	static String tableName2 = "t2";
    	static ObjectPath path1 = new ObjectPath(databaseName, tableName1);
    
    	/**
    	 * @param args
    	 * @throws Exception
    	 */
    	public static void main(String[] args) throws Exception {
    		// 0. execution environment
    		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
    
    		// 1. create the database
    		HiveCatalog hiveCatalog = init(tenv);
    
    		// 2. create the table
    		// 2.1 create a batch table
    //		testCreateTable_Batch(hiveCatalog);
    		// 2.2 create a streaming table
    		testCreateTable_Streaming(hiveCatalog);
    		
    		
    		// 3. insert data
    		String insertSQL = "insert into " + tableName1 + " values (1,'alan',18)";
    		tenv.executeSql(insertSQL);
    
    		// 4. query data
    		List<Row> results = CollectionUtil.iteratorToList(tenv.executeSql("select * from " + tableName1).collect());
    		for (Row row : results) {
    			System.out.println(tableName1 + ": " + row.toString());
    		}
    
    		hiveCatalog.dropTable(path1, false);
    		boolean tableExists = hiveCatalog.tableExists(path1);
    		System.out.println("表是否drop成功:" + tableExists);
    
    		// 5. drop the database
    		tenv.executeSql("drop database " + databaseName + " cascade");
    	}
    
    	/**
    	 * Initialize the HiveCatalog
    	 * 
    	 * @param tenv
    	 * @return
    	 * @throws Exception
    	 */
    	private static HiveCatalog init(StreamTableEnvironment tenv) throws Exception {
    		String moduleName = "myhive";
    		String hiveVersion = "3.1.2";
    		tenv.loadModule(moduleName, new HiveModule(hiveVersion));
    
    		String name = "alan_hive";
    		String defaultDatabase = "default";
    		String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";
    
    		HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
    		tenv.registerCatalog(name, hiveCatalog);
    		tenv.useCatalog(name);
    		tenv.listDatabases();
    		// tenv.executeSql("create database "+databaseName);
    		hiveCatalog.createDatabase(databaseName, new CatalogDatabaseImpl(new HashMap(), hiveConfDir) {
    		}, true);
    
    		tenv.useDatabase(databaseName);
    		return hiveCatalog;
    	}
    
    	/**
    	 * Create a streaming table
    	 * 
    	 * @param catalog
    	 * @throws Exception
    	 */
    	static void testCreateTable_Streaming(HiveCatalog catalog) throws Exception {
    		CatalogTable table = createStreamingTable();
    		catalog.createTable(path1, table, false);
    
    //		CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
    	}
    
    	/**
    	 * Create a batch table
    	 * 
    	 * @param catalog
    	 * @throws Exception
    	 */
    	static void testCreateTable_Batch(HiveCatalog catalog) throws Exception {
    		// Non-partitioned table
    		CatalogTable table = createBatchTable();
    		catalog.createTable(path1, table, false);
    
    //		CatalogBaseTable tableCreated = catalog.getTable(path1);
    
    //		CatalogTestUtil.checkEquals(table, (CatalogTable) tableCreated);
    //		assertThat(tableCreated.getDescription().isPresent()).isTrue();
    //		assertThat(tableCreated.getDescription().get()).isEqualTo(TEST_COMMENT);
    
    //		List tables = catalog.listTables(databaseName);
    
    //		assertThat(tables).hasSize(1);
    //		assertThat(tables.get(0)).isEqualTo(path1.getObjectName());
    
    //		catalog.dropTable(path1, false);
    	}
    
    	/**
    	 * Create a streaming table
    	 * 
    	 * @return
    	 */
    	static CatalogTable createStreamingTable() {
    		final ResolvedSchema resolvedSchema = createSchema();
    		final CatalogTable origin = CatalogTable.of(
    				Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(), 
    				TEST_COMMENT, 
    				Collections.emptyList(),
    				getStreamingTableProperties());
    		return new ResolvedCatalogTable(origin, resolvedSchema);
    	}
    
    	/**
    	 * Create a batch table
    	 * 
    	 * @return
    	 */
    	static CatalogTable createBatchTable() {
    		final ResolvedSchema resolvedSchema = createSchema();
    		final CatalogTable origin = CatalogTable.of(
    				Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(), 
    				TEST_COMMENT, 
    				Collections.emptyList(),
    				getBatchTableProperties());
    		return new ResolvedCatalogTable(origin, resolvedSchema);
    	}
    
    	/**
    	 * Properties for a batch table
    	 * 
    	 * @return
    	 */
    	static Map<String, String> getBatchTableProperties() {
    		return new HashMap<String, String>() {
    			{
    				put("is_streaming", "false");
    				putAll(new HashMap<String, String>() {
    					{
    						put(FactoryUtil.CONNECTOR.key(), "hive");
    					}
    				});
    			}
    		};
    	}
    
    	/**
    	 * Properties for a streaming table
    	 * 
    	 * @return
    	 */
    	static Map<String, String> getStreamingTableProperties() {
    		return new HashMap<String, String>() {
    			{
    				put("is_streaming", "true");
    				putAll(new HashMap<String, String>() {
    					{
    						put(FactoryUtil.CONNECTOR.key(), "hive");
    					}
    				});
    			}
    		};
    	}
    
    	static ResolvedSchema createSchema() {
    		return new ResolvedSchema(
    				Arrays.asList(
    						Column.physical("id", DataTypes.INT()), 
    						Column.physical("name", DataTypes.STRING()), 
    						Column.physical("age", DataTypes.INT())),
    				Collections.emptyList(), null);
    	}
    }
    
    3. Run results

    See the previous example; the run results are identical.

    5) Creating a Hive table via the API: a partitioned table

    1. Maven dependencies

    The dependencies are the same as in the previous example; only the mainClass changes to this example's class, so they are not repeated here.
    When packaging, decide for yourself whether the main class to run needs to be changed for your setup.

    2. Code

    This example does not load any data; it only demonstrates creating the partitioned table, which uses a two-level partition key. For working with Hive partitioned tables, see:
    3. Hive usage examples in detail: table creation, data types, internal and external tables, partitioned tables, bucketed tables

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.Schema;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.table.catalog.CatalogDatabaseImpl;
    import org.apache.flink.table.catalog.CatalogPartition;
    import org.apache.flink.table.catalog.CatalogPartitionImpl;
    import org.apache.flink.table.catalog.CatalogPartitionSpec;
    import org.apache.flink.table.catalog.CatalogTable;
    import org.apache.flink.table.catalog.Column;
    import org.apache.flink.table.catalog.ObjectPath;
    import org.apache.flink.table.catalog.ResolvedCatalogTable;
    import org.apache.flink.table.catalog.ResolvedSchema;
    import org.apache.flink.table.catalog.exceptions.CatalogException;
    import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
    import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
    import org.apache.flink.table.catalog.hive.HiveCatalog;
    import org.apache.flink.table.factories.FactoryUtil;
    import org.apache.flink.table.module.hive.HiveModule;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.CollectionUtil;
    
    /**
     * @author alanchan
     *
     */
    public class TestCreateHiveTableByAPIDemo {
    	static String TEST_COMMENT = "test table comment";
    	static String databaseName = "hive_db_test";
    	static String tableName1 = "t1";
    	static String tableName2 = "t2";
    	static ObjectPath path1 = new ObjectPath(databaseName, tableName1);
    
    	/**
    	 * @param args
    	 * @throws Exception
    	 */
    	public static void main(String[] args) throws Exception {
    		// 0. execution environment
    		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
    
    		// 1. create the database
    		HiveCatalog hiveCatalog = init(tenv);
    
    		// 2. create the table
    		// 2.1 create a batch table
    //		testCreateTable_Batch(hiveCatalog);
    		// 2.2 create a streaming table
    //		testCreateTable_Streaming(hiveCatalog);
    		// 2.3 create a partitioned batch table
    		testCreatePartitionTable_Batch(hiveCatalog);
    
    		// 2.4 creating a batch table with Hive properties is covered in the next example
    
    		// 3. insert data
    		// a plain INSERT like this does not work for a partitioned table; see below
    //		String insertSQL = "insert into " + tableName1 + " values (1,'alan',18)";
    //		tenv.executeSql(insertSQL);
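    		// Hedged sketch (illustrative, not executed in this example): with the Hive dialect
    		// enabled, a static-partition insert names the partition columns explicitly:
    //		tenv.getConfig().setSqlDialect(SqlDialect.HIVE);
    //		tenv.executeSql("insert into " + tableName1 + " partition (name='alan', age=20) values (1)");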
    
    		// 4. query data
    		List<Row> results = CollectionUtil.iteratorToList(tenv.executeSql("select * from " + tableName1).collect());
    		for (Row row : results) {
    			System.out.println(tableName1 + ": " + row.toString());
    		}
    
    		hiveCatalog.dropTable(path1, false);
    		boolean tableExists = hiveCatalog.tableExists(path1);
    		System.out.println("表是否drop成功:" + tableExists);
    
    		// 5. drop the database
    		tenv.executeSql("drop database " + databaseName + " cascade");
    	}
    
    	/**
    	 * Initialize the HiveCatalog
    	 * 
    	 * @param tenv
    	 * @return
    	 * @throws Exception
    	 */
    	private static HiveCatalog init(StreamTableEnvironment tenv) throws Exception {
    		String moduleName = "myhive";
    		String hiveVersion = "3.1.2";
    		tenv.loadModule(moduleName, new HiveModule(hiveVersion));
    
    		String name = "alan_hive";
    		String defaultDatabase = "default";
    		String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";
    
    		HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
    		tenv.registerCatalog(name, hiveCatalog);
    		tenv.useCatalog(name);
    		tenv.listDatabases();
    		// tenv.executeSql("create database "+databaseName);
    		hiveCatalog.createDatabase(databaseName, new CatalogDatabaseImpl(new HashMap(), hiveConfDir) {
    		}, true);
    
    		tenv.useDatabase(databaseName);
    		return hiveCatalog;
    	}
    
    	/**
    	 * Create a streaming table
    	 * 
    	 * @param catalog
    	 * @throws Exception
    	 */
    	static void testCreateTable_Streaming(HiveCatalog catalog) throws Exception {
    		CatalogTable table = createStreamingTable();
    		catalog.createTable(path1, table, false);
    
    //		CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
    	}
    
    	/**
    	 * Create a batch table
    	 * 
    	 * @param catalog
    	 * @throws Exception
    	 */
    	static void testCreateTable_Batch(HiveCatalog catalog) throws Exception {
    		// Non-partitioned table
    		CatalogTable table = createBatchTable();
    		catalog.createTable(path1, table, false);
    
    //		CatalogBaseTable tableCreated = catalog.getTable(path1);
    
    //		CatalogTestUtil.checkEquals(table, (CatalogTable) tableCreated);
    //		assertThat(tableCreated.getDescription().isPresent()).isTrue();
    //		assertThat(tableCreated.getDescription().get()).isEqualTo(TEST_COMMENT);
    
    //		List tables = catalog.listTables(databaseName);
    
    //		assertThat(tables).hasSize(1);
    //		assertThat(tables.get(0)).isEqualTo(path1.getObjectName());
    
    //		catalog.dropTable(path1, false);
    	}
    
    	/**
    	 * 
    	 * @param catalog
    	 * @throws DatabaseNotExistException
    	 * @throws TableAlreadyExistException
    	 * @throws CatalogException
    	 */
    	static void testCreatePartitionTable_Batch(HiveCatalog catalog) throws Exception {
    		CatalogTable table = createPartitionedTable();
    		catalog.createTable(path1, table, false);
    		// create the partition
    		catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
    
    	}
    
    	/**
    	 * Create a partitioned table
    	 * 
    	 * @return
    	 */
    	static CatalogTable createPartitionedTable() {
    		final ResolvedSchema resolvedSchema = createSchema();
    		final CatalogTable origin = CatalogTable.of(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(), TEST_COMMENT, createPartitionKeys(), getBatchTableProperties());
    		return new ResolvedCatalogTable(origin, resolvedSchema);
    	}
    
    	/**
    	 * Create the partition keys
    	 * 
    	 * @return
    	 */
    	static List<String> createPartitionKeys() {
    		return Arrays.asList("name", "age");
    	}
    
    	/**
    	 * Create a CatalogPartitionSpec. Represents a partition spec object in catalog.
    	 * Partition columns and values are NOT of strict order, and they need to be
    	 * re-arranged to the correct order by comparing with a list of strictly ordered
    	 * partition keys.
    	 * 
    	 * @return
    	 */
    	static CatalogPartitionSpec createPartitionSpec() {
    		return new CatalogPartitionSpec(new HashMap<String, String>() {
    			{
    				put("name", "alan");
    				put("age", "20");
    			}
    		});
    	}
    
    	static CatalogPartition createPartition() {
    		return new CatalogPartitionImpl(getBatchTableProperties(), TEST_COMMENT);
    	}
    
    	/**
    	 * Create a streaming table
    	 * 
    	 * @return
    	 */
    	static CatalogTable createStreamingTable() {
    		final ResolvedSchema resolvedSchema = createSchema();
    		final CatalogTable origin = CatalogTable.of(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(), TEST_COMMENT, Collections.emptyList(),
    				getStreamingTableProperties());
    		return new ResolvedCatalogTable(origin, resolvedSchema);
    	}
    
    	/**
    	 * Create a batch table
    	 * 
    	 * @return
    	 */
    	static CatalogTable createBatchTable() {
    		final ResolvedSchema resolvedSchema = createSchema();
    		final CatalogTable origin = CatalogTable.of(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(), TEST_COMMENT, Collections.emptyList(),
    				getBatchTableProperties());
    		return new ResolvedCatalogTable(origin, resolvedSchema);
    	}
    
    	/**
    	 * Properties for a batch table
    	 * 
    	 * @return
    	 */
    	static Map<String, String> getBatchTableProperties() {
    		return new HashMap<String, String>() {
    			{
    				put("is_streaming", "false");
    				putAll(new HashMap<String, String>() {
    					{
    						put(FactoryUtil.CONNECTOR.key(), "hive");
    					}
    				});
    			}
    		};
    	}
    
    	/**
    	 * Properties for a streaming table
    	 * 
    	 * @return
    	 */
    	static Map<String, String> getStreamingTableProperties() {
    		return new HashMap<String, String>() {
    			{
    				put("is_streaming", "true");
    				putAll(new HashMap<String, String>() {
    					{
    						put(FactoryUtil.CONNECTOR.key(), "hive");
    					}
    				});
    			}
    		};
    	}
    
    	static ResolvedSchema createSchema() {
    		return new ResolvedSchema(Arrays.asList(Column.physical("id", DataTypes.INT()), Column.physical("name", DataTypes.STRING()), Column.physical("age", DataTypes.INT())),
    				Collections.emptyList(), null);
    	}
    }
    
3、Run results

(Screenshots: the structure of the t1 table created on HDFS.)

6)、Creating a Hive table with SQL - a table with Hive properties (delimiter, partitioning, and ORC storage)

This example uses SQL to create a partitioned table stored as ORC, and then inserts rows from a source table into the target partitioned table.
For how Hive partitioned tables are used, see: 3、hive的使用示例详解-建表、数据类型详解、内部外部表、分区表、分桶表

1、Maven dependencies

The dependencies are the same as in the previous example, with the mainClass changed to this example's class, so they are not repeated here.
When packaging, decide whether the main class needs to be changed for your own setup.
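
If the earlier example is not at hand, a minimal sketch of the typical dependencies (artifact names and the Scala suffix are assumptions to adapt to your build; versions follow the environment used in this article, Flink 1.13.5 and Hive 3.1.2):

<!-- sketch: Hive connector and Table API bridge for Flink 1.13.5 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_2.12</artifactId>
    <version>1.13.5</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.12</artifactId>
    <version>1.13.5</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>3.1.2</version>
</dependency>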

2、Code
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.SqlDialect;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.table.catalog.CatalogDatabaseImpl;
    import org.apache.flink.table.catalog.hive.HiveCatalog;
    import org.apache.flink.table.module.hive.HiveModule;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.CollectionUtil;
    
    /**
     * @author alanchan
     *
     */
    public class TestCreateHiveTableBySQLDemo2 {
    	static String databaseName = "viewtest_db";
    	public static final String sourceTableName = "sourceTable";
    	public static final String targetPartitionTableName = "targetPartitionTable";
	// source table: a plain ORC table holding the rows to be redistributed
	public static final String hive_create_source_table_sql = 
			"create table "+sourceTableName +"(id int ,name string, age int,province string) \r\n" + 
			"row format delimited fields terminated by  ','\r\n" + 
			"STORED AS ORC ";
	// target table: partitioned by province and ORC-stored; the sink.partition-commit
	// properties control when a finished partition is committed and how it is
	// published (to the metastore and via a _SUCCESS file)
	public static final String hive_create_target_partition_table_sql = 
			"create table "+targetPartitionTableName+" (id int ,name string, age int) \r\n" + 
			"partitioned by (province string)\r\n" + 
			"row format delimited fields terminated by  ','\r\n" + 
			"STORED AS ORC "+ 
			  "TBLPROPERTIES (\n" + 
			  "  'sink.partition-commit.delay'='5 s',\n" + 
			  "  'sink.partition-commit.trigger'='partition-time',\n" + 
			  "  'sink.partition-commit.policy.kind'='metastore,success-file'" + ")";
    	
    	public static void main(String[] args) throws Exception {
		// 0. set up the execution environment
    		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
    
		// 1. register the Hive catalog and create the database
    		HiveCatalog hiveCatalog = init(tenv);
    
		// 2. create the tables using the Hive SQL dialect
    		tenv.getConfig().setSqlDialect(SqlDialect.HIVE);
    		tenv.executeSql(hive_create_source_table_sql);
    		tenv.executeSql(hive_create_target_partition_table_sql);
    
		// 3. insert rows into the source table
    		List<String> insertSQL = Arrays.asList(
    				"insert into  "+sourceTableName+"  values(1,'alan',18,'SH')", 
    				"insert into  "+sourceTableName+"  values(2,'alanchan',18,'SH')",
    				"insert into  "+sourceTableName+"  values(3,'alanchanchn',18,'SH')", 
    				"insert into  "+sourceTableName+"  values(4,'alan_chan',18,'BJ')",
    				"insert into  "+sourceTableName+"  values(5,'alan_chan_chn',18,'BJ')", 
    				"insert into  "+sourceTableName+"  values(6,'alan',18,'TJ')",
    				"insert into  "+sourceTableName+"  values(7,'alan',18,'NJ')", 
    				"insert into  "+sourceTableName+"  values(8,'alan',18,'HZ')"
    				);
    		for(String sql :insertSQL) {
    			tenv.executeSql(sql);
    		}
    		
		// 4. query the source table
    		List<Row> results = CollectionUtil.iteratorToList(tenv.executeSql("select * from " + sourceTableName).collect());
    		for (Row row : results) {
    			System.out.println(sourceTableName + ": " + row.toString());
    		}
    		
		// 5. enable dynamic partitioning (print the settings before and after changing them)
    		System.out.println("dynamic.partition:["+ hiveCatalog.getHiveConf().get("hive.exec.dynamic.partition")+"]");
    		System.out.println("dynamic.partition.mode:["+hiveCatalog.getHiveConf().get("hive.exec.dynamic.partition.mode")+"]");
    		hiveCatalog.getHiveConf().setBoolean("hive.exec.dynamic.partition", true);
    		hiveCatalog.getHiveConf().set("hive.exec.dynamic.partition.mode", "nonstrict");
    		System.out.println("dynamic.partition:["+ hiveCatalog.getHiveConf().get("hive.exec.dynamic.partition")+"]");
    		System.out.println("dynamic.partition.mode:["+hiveCatalog.getHiveConf().get("hive.exec.dynamic.partition.mode")+"]");
    		
		// 6. insert into the partitioned table using dynamic partitioning
    		String insertpartitionsql = 
    				"insert into table "+targetPartitionTableName+" partition(province)\r\n" + 
    				"select id,name,age,province from "+ sourceTableName;
    		tenv.executeSql(insertpartitionsql);
    		
		// 7. query the partitioned table
    		List<Row> partitionResults = CollectionUtil.iteratorToList(tenv.executeSql(
    				"select * from " + targetPartitionTableName).collect());
    		for (Row row : partitionResults) {
    			System.out.println(targetPartitionTableName + " : " + row.toString());
    		}
    		
    		List<Row> partitionResults_SH = CollectionUtil.iteratorToList(tenv.executeSql(
    				"select * from " + targetPartitionTableName+" where  province = 'SH'").collect());
    		for (Row row : partitionResults_SH) {
    			System.out.println(targetPartitionTableName + " SH: " + row.toString());
    		}
    		
		// 8. drop the database (optional cleanup)
    //		tenv.executeSql("drop database " + databaseName + " cascade");
    	}
    
    	private static HiveCatalog init(StreamTableEnvironment tenv) throws Exception {
    		String moduleName = "myhive";
    		String hiveVersion = "3.1.2";
    		tenv.loadModule(moduleName, new HiveModule(hiveVersion));
    
    		String name = "alan_hive";
    		String defaultDatabase = "default";
    		String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";
    
    		HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
    		tenv.registerCatalog(name, hiveCatalog);
    		tenv.useCatalog(name);
    		tenv.listDatabases();
		// note: the second argument of CatalogDatabaseImpl is the database comment
		hiveCatalog.createDatabase(databaseName, new CatalogDatabaseImpl(new HashMap<>(), hiveConfDir), true);
    
    		tenv.useDatabase(databaseName);
    		return hiveCatalog;
    	}
    }
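
The insert above uses dynamic partitioning: the value of the province partition comes from the selected column, which is why hive.exec.dynamic.partition.mode has to be set to nonstrict first. For comparison, a static-partition insert fixes the partition value in the statement and needs no such setting (a sketch against the same tables):

-- write only the SH rows into the fixed partition province='SH'
insert into targetPartitionTable partition(province='SH')
select id, name, age from sourceTable where province = 'SH';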
    
3、Run results
• Flink job output (+I in the rows below marks an insert in Flink's changelog output)
    [alanchan@server2 bin]$ flink run  /usr/local/bigdata/flink-1.13.5/examples/table/table_sql-0.0.10-SNAPSHOT.jar
    
    Hive Session ID = ba971dc3-7fa5-4f2c-a872-9200a0396337
    Hive Session ID = a3c01c23-9828-4473-96ad-c9dc40b417c0
    Hive Session ID = 547668a9-d603-4c1d-ae29-29c4cccd54f0
    Job has been submitted with JobID 881de04ddea94f2c7a9f5fb051e1d4af
    Hive Session ID = 676c6dfe-11ae-411e-9be7-ddef386fb2ac
    Job has been submitted with JobID 0d76f2446d8cdcfd296d82965f9f759b
    Hive Session ID = b18c5e00-7da9-4a43-bf50-d6bcb57d45a3
    Job has been submitted with JobID 644f094a3c9fadeb0d81b9bcf339a1e7
    Hive Session ID = 76f06744-ec5b-444c-a2d3-e22dfb17d83c
    Job has been submitted with JobID 1e8d36f0b0961f81a63de4e9f2ce21af
    Hive Session ID = 97f14128-1032-437e-b59f-f89a1e331e34
    Job has been submitted with JobID 3bbd81cf693279fd8ebe8a889bdb08e3
    Hive Session ID = 1456c502-8c30-44c5-94d1-6b2e4bf71bc3
    Job has been submitted with JobID 377101faffcc12d3d4638826e004ddc5
    Hive Session ID = ef4f659d-735b-44ca-90c0-4e19ba000e37
    Job has been submitted with JobID 33d50d9501a83f28068e52f77d0b0f6d
    Hive Session ID = fccefaea-5340-422d-b9ed-dd904857346e
    Job has been submitted with JobID 4a53753c008f16573ab7c84e8964bc48
    Hive Session ID = 5c066f43-57e8-4aba-9c7b-b75caf4f9fe7
    2023-10-19 05:49:12,774 INFO  org.apache.hadoop.conf.Configuration.deprecation             [] - mapred.input.dir is deprecated. Instead, use mapreduce.input.fileinputformat.inputdir
    Job has been submitted with JobID b44dd095b7460470c23f8e28243fc895
    sourceTable: +I[1, alan, 18, SH]
    sourceTable: +I[6, alan, 18, TJ]
    sourceTable: +I[4, alan_chan, 18, BJ]
    sourceTable: +I[2, alanchan, 18, SH]
    sourceTable: +I[3, alanchanchn, 18, SH]
    sourceTable: +I[5, alan_chan_chn, 18, BJ]
    sourceTable: +I[7, alan, 18, NJ]
    sourceTable: +I[8, alan, 18, HZ]
    dynamic.partition:[true]
    dynamic.partition.mode:[nonstrict]
    dynamic.partition:[true]
    dynamic.partition.mode:[nonstrict]
    Hive Session ID = e63fd003-5d5f-458c-a9bf-e7cbfe51fbf8
    Job has been submitted with JobID 59e2558aaf8daced29b7943e12a41164
    Hive Session ID = 3111db81-a822-4731-a342-ab32cdc48d86
    Job has been submitted with JobID 949435047e324bce96a5aa9e5b6f448d
    targetPartitionTable : +I[2, alanchan, 18, SH]
    targetPartitionTable : +I[7, alan, 18, NJ]
    targetPartitionTable : +I[1, alan, 18, SH]
    targetPartitionTable : +I[3, alanchanchn, 18, SH]
    targetPartitionTable : +I[5, alan_chan_chn, 18, BJ]
    targetPartitionTable : +I[4, alan_chan, 18, BJ]
    targetPartitionTable : +I[8, alan, 18, HZ]
    targetPartitionTable : +I[6, alan, 18, TJ]
    Hive Session ID = 0bfbd60b-da1d-4a44-be23-0bde71e1ad59
    Job has been submitted with JobID 49b728c8dc7fdc8037ab72bd6f3c5339
    targetPartitionTable SH: +I[1, alan, 18, SH]
    targetPartitionTable SH: +I[3, alanchanchn, 18, SH]
    targetPartitionTable SH: +I[2, alanchan, 18, SH]
    Hive Session ID = 68716de6-fceb-486e-91a8-8e4cf734ecfa
    
    
    
• Data storage on HDFS

(Screenshots: the HDFS directories of the source table and the partitioned target table.)

The above introduced operating on partitions, functions, and tables via the Java API and SQL, with six examples devoted in particular to table operations.
