Flink: Reading Hive Table Data and Writing It to Kafka



    Tech WeChat public account: 后端技术解忧铺
    Follow the WeChat public account CodingTechWork to learn and improve together.

    Introduction

    Scenario

      Data in the Hive data warehouse needs to be read out and written into Kafka for downstream data-service output.

    Technology choice

      Flink was chosen for reading from Hive and writing to Kafka because it offers a rich set of connectors to choose from.

    Development

    pom dependencies

    <dependencies>
    	<dependency>
    		<groupId>org.apache.flink</groupId>
    		<artifactId>flink-table-api-java-bridge_2.11</artifactId>
    		<version>1.13.2</version>
    	</dependency>
    	<dependency>
    		<groupId>org.apache.flink</groupId>
    		<artifactId>flink-connector-hive_2.11</artifactId>
    		<version>1.13.2</version>
    	</dependency>
    	<!-- Hive dependency; the version should match your Hive installation -->
    	<dependency>
    		<groupId>org.apache.hive</groupId>
    		<artifactId>hive-exec</artifactId>
    		<version>2.2.0</version>
    		<scope>provided</scope>
    	</dependency>
    	<!-- Needed for the 'kafka' SQL connector used by the job -->
    	<dependency>
    		<groupId>org.apache.flink</groupId>
    		<artifactId>flink-sql-connector-kafka_2.11</artifactId>
    		<version>1.13.2</version>
    	</dependency>
    	<!-- Needed for the @Slf4j annotation in the job class -->
    	<dependency>
    		<groupId>org.projectlombok</groupId>
    		<artifactId>lombok</artifactId>
    		<version>1.18.24</version>
    		<scope>provided</scope>
    	</dependency>
    </dependencies>
    
    <build>
    	<plugins>
    		<plugin>
    			<groupId>org.apache.maven.plugins</groupId>
    			<artifactId>maven-source-plugin</artifactId>
    			<executions>
    				<execution>
    					<id>attach-sources</id>
    					<goals>
    						<goal>jar</goal>
    					</goals>
    				</execution>
    			</executions>
    			<configuration>
    				<skipSource>true</skipSource>
    			</configuration>
    		</plugin>
    		<plugin>
    			<groupId>org.apache.maven.plugins</groupId>
    			<artifactId>maven-jar-plugin</artifactId>
    			<version>3.2.0</version>
    			<configuration>
    				<archive>
    					<manifest>
    						<mainClass>com.test.demo.flinkhive2kafka.job.Hive2Kafka</mainClass>
    					</manifest>
    				</archive>
    			</configuration>
    		</plugin>
    	</plugins>
    </build>
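
    With the main class declared in the jar manifest, a plain mvn clean package produces the runnable artifact (referred to as flink-hive-2-kafka-1.0.jar in the run command at the end).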
    

    Job class

    
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.catalog.hive.HiveCatalog;
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j
    public class Hive2Kafka {
    	public static void main(String[] args) {
    		// Set up the Flink SQL environment (Blink planner, the default in Flink 1.13)
    		EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
    
    		// Create the table environment
    		TableEnvironment tableEnvironment = TableEnvironment.create(environmentSettings);
    
    		// Fall back to Hadoop's mapred record reader for Hive tables
    		tableEnvironment.getConfig().getConfiguration().setString("table.exec.hive.fallback-mapred-reader", "true");
    
    		// Read external configuration from the command-line arguments
    		ParameterTool parameterTool = ParameterTool.fromArgs(args);
    		log.info("parameters size: {}", parameterTool.getNumberOfParameters());
    
    		// Fetch all settings
    		String hiveCatalogName = parameterTool.get("hive.catalog.name");
    		String hiveConfDir = parameterTool.get("hive.conf.dir");
    		String hiveDatabaseName = parameterTool.get("hive.db.name");
    		String hiveKafkaTable = parameterTool.get("hive.kafka.tb");
    		String kafkaBootstrapServer = parameterTool.get("kafka.bootstrap.server");
    		String kafkaTopic = parameterTool.get("kafka.topic");
    		String kafkaGroupId = parameterTool.get("kafka.group.id");
    		String kafkaUsername = parameterTool.get("kafka.username");
    		String kafkaPassword = parameterTool.get("kafka.password");
    		String insertKafkaTableSql = parameterTool.get("insert.kafka.table.sql");
    
    		// Create the Hive catalog
    		HiveCatalog hiveCatalog = new HiveCatalog(hiveCatalogName, hiveDatabaseName, hiveConfDir);
    		// Register the catalog
    		tableEnvironment.registerCatalog(hiveCatalogName, hiveCatalog);
    		// Make it the current catalog
    		tableEnvironment.useCatalog(hiveCatalogName);
    
    		String createKafkaTableSql = String.format("CREATE TABLE IF NOT EXISTS %s(`field01` STRING) \n" +
    		"WITH('connector' = 'kafka', \n" +
    		"'topic' = '%s', \n" +
    		"'properties.group.id' = '%s', \n" +
    		"'properties.bootstrap.servers' = '%s', \n" +
    		"'scan.startup.mode' = 'group-offsets', \n" +
    		"'properties.auto.offset.reset' = 'earliest', \n" +
    		"'format' = 'raw', \n" +
    		"'properties.security.protocol' = 'SASL_PLAINTEXT', \n" +
    		"'properties.sasl.mechanism' = 'PLAIN', \n" +
    		"'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule " +
    		"required username=\"%s\" password=\"%s\";'\n" +
    		")", hiveKafkaTable, kafkaTopic, kafkaGroupId, kafkaBootstrapServer, kafkaUsername, kafkaPassword);
    		// Create the Kafka table (registered in the Hive catalog)
    		tableEnvironment.executeSql(createKafkaTableSql).print();
    		// Run the INSERT that reads from Hive and writes to Kafka
    		tableEnvironment.executeSql(insertKafkaTableSql).print();
    	}
    }
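
    For reference, the statement passed in via --insert.kafka.table.sql is an ordinary Flink INSERT that selects from the Hive source table into the Kafka table created above. A minimal sketch, where kafka_tb and ods_demo are hypothetical table names:
    
    -- Hypothetical names: kafka_tb is the table created above (hive.kafka.tb),
    -- ods_demo is a Hive source table with a single STRING column
    INSERT INTO kafka_tb
    SELECT field01 FROM ods_demo;
    
    Because the Kafka table is registered in the Hive catalog and the current catalog and database are already set, both names resolve without full qualification.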
    

    Execution

    Run with the yarn-application deployment mode:

    ./flink run-application -t yarn-application flink-hive-2-kafka-1.0.jar --hive.catalog.name xxx --hive.conf.dir xxx --hive.db.name xxx --hive.kafka.tb xxx --kafka.bootstrap.server xxx:9092,xxx:9092 --kafka.topic xxx --kafka.group.id xxx --kafka.username xxx --kafka.password 'xxx' --insert.kafka.table.sql 'xxxxxxx'
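
    Two notes, assuming a standard Flink-on-YARN setup: hive.conf.dir must point to a directory containing hive-site.xml so the HiveCatalog can reach the metastore, and any connector jars not shaded into the job jar (Kafka, Hive) need to be available under Flink's lib/ directory on the cluster.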
    
    Source: https://blog.csdn.net/Andya_net/article/details/126544244