• 数仓开发之DIM层


    目录

    一:DIM层设计要点

     二:DIM层大概实操流程

         2.1 读取数据

       2.2 过滤数据

       2.3 写出数据

     三:配置表

    3.1 配置表设计

     四:实操流程

    4.1 接收Kafka数据,过滤空值数据

    4.2 动态拆分维度表功能

    4.3  把流中的数据保存到对应的维度表

    五:具体代码实现

     5.1 接收Kafka数据,过滤空值数据

    5.2 根据MySQL的配置表,动态进行分流

    5.3 保存维度到HBase(Phoenix)


    一:DIM层设计要点

    (1)DIM层的设计依据是维度建模理论,该层存储维度模型的维度表。

    (2)DIM层的数据存储在 HBase 表中。

    DIM 层表是用于维度关联的,要通过主键去获取相关维度信息,这种场景下 K-V 类型数据库的效率较高。常见的 K-V 类型数据库有 Redis、HBase,而 Redis 的数据常驻内存,会给内存造成较大压力,因而选用 HBase 存储维度数据。

    (3)DIM层表名的命名规范为dim_表名

     二:DIM层大概实操流程

         2.1 读取数据

    Kafka---topic_db(包含所有的46张业务表)

       2.2 过滤数据

    过滤出所需要的维表数据
            过滤条件:在代码中给定十几张维表的表名
            问题:如果增加维表,需要修改代码-重新编译-打包-上传、重启任务
            优化1:不修改代码、只重启任务
                配置信息中保存需要的维表信息,配置信息只在程序启动的时候加载一次
            优化2:不修改代码、不重启任务
                方向:让程序在启动以后还可以获取配置信息中增加的内容
                具体实施:
                    1) 定时任务:每隔一段时间加载一次配置信息
                        将定时任务写在Open方法
                    2) 监控配置信息:一旦配置信息增加了数据,可以立马获取到
                        (1) MySQLBinlog:FlinkCDC监控直接创建流
                            a.将配置信息处理成广播流:缺点 -> 如果配置信息过大,冗余太多
                            b.按照表名进行KeyBy处理:缺点 -> 有可能产生数据倾斜
                        (2) 文件:Flume->Kafka->Flink消费创建流

       2.3 写出数据

    将数据写出到Phoenix、JdbcSink、自定义Sink

     三:配置表

    本层的任务是将业务数据直接写入到不同的 HBase 表中。那么如何让程序知道流中的哪些数据是维度数据?维度数据又应该写到 HBase 的哪些表中?为了解决这个问题,我们选择在 MySQL 中构建一张配置表,通过 Flink CDC 将配置表信息读取到程序中。

    3.1 配置表设计

    1)字段解析

    我们将为配置表设计五个字段

    • source_table:作为数据源的业务数据表名 
    • sink_table:作为数据目的地的 Phoenix 表名
    • sink_columns:Phoenix 表字段
    • sink_pk:Phoenix 表主键
    • sink_extend:Phoenix 建表扩展,即建表时一些额外的配置语句

    source_table 作为配置表的主键,可以通过它获取唯一的目标表名、字段、主键和建表扩展,从而得到完整的 Phoenix 建表语句。

     数据格式:

    1. {"before":null,"after":
    2. {"source_table":"aa","sink_table":"bb","sink_columns":"cc","sink_pk":"id","sink_extend":"xxx"},"source":
    3. {"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":165251303
    4. 9549,"snapshot":"false","db":"gmall-211126-
    5. config","sequence":null,"table":"table_process","server_id":0,"gtid":null,"file":"","pos":0
    6. ,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1652513039551,"transaction":null}

    2)在Mysql中创建数据库建表并开启Binlog

    (1)创建数据库 gmall_config ,注意:和 gmall 业务库区分开

    1. [atguigu@hadoop102 db_log]$ mysql -uroot -p000000 -e"create database gmall_config charset
    2. utf8 default collate utf8_general_ci"

    (2)在 gmall_config 库中创建配置表 table_process

    1. CREATE TABLE `table_process` (
    2. `source_table` varchar(200) NOT NULL COMMENT '来源表',
    3. `sink_table` varchar(200) DEFAULT NULL COMMENT '输出表',
    4. `sink_columns` varchar(2000) DEFAULT NULL COMMENT '输出字段',
    5. `sink_pk` varchar(200) DEFAULT NULL COMMENT '主键字段',
    6. `sink_extend` varchar(200) DEFAULT NULL COMMENT '建表扩展',
    7. PRIMARY KEY (`source_table`)
    8. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

    (3)在MySQL配置文件中增加 gmall_config 开启Binlog

    [axing@hadoop107 ~]$ sudo vim /etc/my.cnf

     (4)为了方便测试,目前就插入两张表名数据,作为维度表

     四:实操流程

    4.1 接收Kafka数据,过滤空值数据

    对Maxwell抓取的数据进行ETL,有用的部分保留,没用的过滤掉。

    4.2 动态拆分维度表功能

    由于Maxwell是把全部数据统一写入一个Topic中, 这样显然不利于日后的数据处理。所以需要把各个维度表拆开处理

    在实时计算中一般把维度数据写入存储容器,一般是方便通过主键查询的数据库比如HBase,Redis,MySQL等。

    这样的配置不适合写在配置文件中,因为这样的话,业务端随着需求变化每增加一张维度表表,就要修改配置重启计算程序。

    所以这里需要一种动态配置方案,把这种配置长期保存起来,一旦配置有变化,实时计算可以自动感知。这种可以有三个方案实现:

    一种是用Zookeeper存储,通过Watch感知数据变化;

    另一种是用mysql数据库存储,周期性的同步;

    再一种是用mysql数据库存储,使用广播流。

    这里选择第三种方案,主要是MySQL对于配置数据初始化和维护管理,使用FlinkCDC读取配置信息表,将配置流作为广播流与主流进行连接。

    4.3  把流中的数据保存到对应的维度表

     维度数据保存到HBase的表中。

    五:具体代码实现

     5.1 接收Kafka数据,过滤空值数据

    1)创建 KafkaUtil 工具类

    和 Kafka 交互要用到 Flink 提供的 FlinkKafkaConsumer、FlinkKafkaProducer 类,为了提高模板代码的复用性,将其封装到 KafkaUtil 工具类中。

    此处从 Kafka 读取数据,创建 getKafkaConsumer(String topic, String groupId) 方法

    1. public class KafkaUtil {
    2. static String BOOTSTRAP_SERVERS = "hadoop102:9092, hadoop103:9092, hadoop104:9092";
    3. static String DEFAULT_TOPIC = "default_topic";
    4. public static FlinkKafkaConsumer getKafkaConsumer(String topic, String groupId) {
    5. Properties prop = new Properties();
    6. prop.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
    7. prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    8. FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<>(topic,
    9. new KafkaDeserializationSchema() {
    10. @Override
    11. public boolean isEndOfStream(String nextElement) {
    12. return false;
    13. }
    14. @Override
    15. public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
    16. if(record != null && record.value() != null) {
    17. return new String(record.value());
    18. }
    19. return null;
    20. }
    21. @Override
    22. public TypeInformation getProducedType() {
    23. return TypeInformation.of(String.class);
    24. }
    25. }, prop);
    26. return consumer;
    27. }
    28. }

    2)主程序

    1. package com.atguigu.app.dim;
    2. import com.alibaba.fastjson.JSON;
    3. import com.alibaba.fastjson.JSONObject;
    4. import com.atguigu.app.func.TableProcessFunction;
    5. import com.atguigu.bean.TableProcess;
    6. import com.atguigu.utils.MyKafkaUtil;
    7. import com.ververica.cdc.connectors.mysql.source.MySqlSource;
    8. import com.ververica.cdc.connectors.mysql.table.StartupOptions;
    9. import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
    10. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    11. import org.apache.flink.api.common.functions.FlatMapFunction;
    12. import org.apache.flink.api.common.state.MapStateDescriptor;
    13. import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
    14. import org.apache.flink.streaming.api.datastream.BroadcastStream;
    15. import org.apache.flink.streaming.api.datastream.DataStreamSource;
    16. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    17. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    18. import org.apache.flink.util.Collector;
    19. public class DimApp {
    20. public static void main(String[] args) throws Exception {
    21. //1.获取执行环境
    22. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    23. env.setParallelism(1);//生产环境,并行度应设置为kafka主题的分区数
    24. /*
    25. //生产环境下使用:
    26. //1.1 开启checkpoint
    27. env.enableCheckpointing(5*6000L, CheckpointingMode.EXACTLY_ONCE);
    28. env.getCheckpointConfig().setCheckpointTimeout(10*6000L);
    29. env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
    30. env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,5000L));
    31. //1.2 设置状态后端
    32. env.setStateBackend(new HashMapStateBackend());
    33. env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop107:8020/211126/ck");
    34. System.setProperty("HADOOP_USER_NAME","atguigu");
    35. */
    36. //2.读取kafka topic_db主题数据创建主流
    37. String topic ="topic_db";
    38. String groupId = "dim_app_211126";
    39. DataStreamSource kafkaDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(topic, groupId));
    40. //3.过滤掉非JSON数据以及保留新增、变化以及初始化数据并将数据转换为JSON格式
    41. SingleOutputStreamOperator filterJsonDS = kafkaDS.flatMap(new FlatMapFunction() {
    42. @Override
    43. public void flatMap(String value, Collector collector) throws Exception {
    44. try {
    45. //将数据装换为JSON格式
    46. JSONObject jsonObject = JSON.parseObject(value);
    47. //获取数据中的操作类型字段
    48. String type = jsonObject.getString("type");
    49. //保留新增、变化、以及初始化数据
    50. if ("insert".equals(type) || "update".equals(type) || "bootstrap-insert".equals(type)) {
    51. collector.collect(jsonObject);
    52. }
    53. } catch (Exception e) {
    54. System.out.println("脏数据:" + value);//或者写入侧输出流
    55. }
    56. }
    57. });
    58. //4.使用FlinkCDC读取mysql配置信息表创建配置流
    59. MySqlSource mySqlSource = MySqlSource.builder()
    60. .hostname("hadoop107")
    61. .port(3306)
    62. .username("root")
    63. .password("000000")
    64. .databaseList("gmall-config")
    65. .tableList("gmall-config.table_process")
    66. .startupOptions(StartupOptions.initial())
    67. .deserializer(new JsonDebeziumDeserializationSchema())
    68. .build();
    69. DataStreamSource mysqlSourceDS = env.fromSource(mySqlSource,
    70. WatermarkStrategy.noWatermarks(),
    71. "MysqlSource"
    72. );
    73. //5.将配置流处理为广播流
    74. MapStateDescriptor mapStateDescriptor = new MapStateDescriptor<>("map-state", String.class, TableProcess.class);
    75. BroadcastStream broadcastStream = mysqlSourceDS.broadcast(mapStateDescriptor);
    76. //6.连接主流和广播流
    77. BroadcastConnectedStream connectedStream = filterJsonDS.connect(broadcastStream);
    78. //7.处理连接流,根据配置信息处理主流数据
    79. SingleOutputStreamOperator dimDS = connectedStream.process(new TableProcessFunction(mapStateDescriptor));
    80. //8.将数据写出到Phoenix
    81. dimDS.print(">>>>>");
    82. //9.启动任务
    83. env.execute();
    84. }
    85. }

    5.2 根据MySQL的配置表,动态进行分流

    1)导入依赖

    1. <dependency>
    2. <groupId>org.apache.flinkgroupId>
    3. <artifactId>flink-connector-jdbc_${scala.version}artifactId>
    4. <version>${flink.version}version>
    5. dependency>
    6. <dependency>
    7. <groupId>com.ververicagroupId>
    8. <artifactId>flink-connector-mysql-cdcartifactId>
    9. <version>2.1.0version>
    10. dependency>
    11. <dependency>
    12. <groupId>org.apache.phoenixgroupId>
    13. <artifactId>phoenix-sparkartifactId>
    14. <version>5.0.0-HBase-2.0version>
    15. <exclusions>
    16. <exclusion>
    17. <groupId>org.glassfishgroupId>
    18. <artifactId>javax.elartifactId>
    19. exclusion>
    20. exclusions>
    21. dependency>
    22. <dependency>
    23. <groupId>org.apache.flinkgroupId>
    24. <artifactId>flink-table-api-java-bridge_2.12artifactId>
    25. <version>1.13.0version>
    26. dependency>

    2)创建配置表实体类

    1. package com.atguigu.gmall.realtime.bean;
    2. import lombok.Data;
    3. @Data
    4. public class TableProcess {
    5. //来源表
    6. String sourceTable;
    7. //输出表
    8. String sinkTable;
    9. //输出字段
    10. String sinkColumns;
    11. //主键字段
    12. String sinkPk;
    13. //建表扩展
    14. String sinkExtend;
    15. }

    3)编写操作读取配置表形成广播流

    1. // TODO 6. FlinkCDC 读取配置流并广播流
    2. // 6.1 FlinkCDC 读取配置表信息
    3. MySqlSource mySqlSource = MySqlSource.builder()
    4. .hostname("hadoop102")
    5. .port(3306)
    6. .databaseList("gmall_config") // set captured database
    7. .tableList("gmall_config.table_process") // set captured table
    8. .username("root")
    9. .password("000000")
    10. .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
    11. .startupOptions(StartupOptions.initial())
    12. .build();
    13. // 6.2 封装为流
    14. DataStreamSource mysqlDSSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MysqlSource");
    15. // 6.3 广播配置流
    16. MapStateDescriptor tableConfigDescriptor = new MapStateDescriptor("table-process-state", String.class, TableProcess.class);
    17. BroadcastStream broadcastDS = mysqlDSSource.broadcast(tableConfigDescriptor);
    18. // TODO 7. 连接流
    19. BroadcastConnectedStream connectedStream = filterDS.connect(broadcastDS);

    4)定义一个项目中常用的配置常量类GmallConfig

    1. package com.atguigu.gmall.realtime.common;
    2. public class GmallConfig {
    3. // Phoenix库名
    4. public static final String HBASE_SCHEMA = "GMALL2022_REALTIME";
    5. // Phoenix驱动
    6. public static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
    7. // Phoenix连接参数
    8. public static final String PHOENIX_SERVER = "jdbc:phoenix:hadoop102,hadoop103,hadoop104:2181";
    9. }

    5)自定义函数MyBroadcastFunction

    (1)定义类MyBroadcastFunction

    1. package com.atguigu.gmall.realtime.app.func;
    2. import com.alibaba.fastjson.JSONObject;
    3. import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    4. import org.apache.flink.util.Collector;
    5. import org.apache.flink.util.OutputTag;
    6. public class MyBroadcastFunction extends BroadcastProcessFunction {
    7. private MapStateDescriptor tableConfigDescriptor;
    8. public MyBroadcastFunction(MapStateDescriptor tableConfigDescriptor) {
    9. this.tableConfigDescriptor = tableConfigDescriptor;
    10. }
    11. @Override
    12. public void processElement(JSONObject jsonObj, ReadOnlyContext readOnlyContext, Collector out) throws Exception {
    13. }
    14. @Override
    15. public void processBroadcastElement(String jsonStr, Context context, Collector out) throws Exception {
    16. }
    17. }

     (2)自定义函数MyBroadcastFunction-open

    1. // 定义Phoenix的连接
    2. private Connection conn;
    3. @Override
    4. public void open(Configuration parameter) throws Exception {
    5. super.open(parameter);
    6. Class.forName(GmallConfig.PHOENIX_DRIVER);
    7. conn = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
    8. }

    (3)自定义函数MyBroadcastFunction-processBroadcastElement

    1. @Override
    2. public void processBroadcastElement(String jsonStr, Context context, Collector out) throws Exception {
    3. JSONObject jsonObj = JSON.parseObject(jsonStr);
    4. BroadcastState tableConfigState = context.getBroadcastState(tableConfigDescriptor);
    5. String op = jsonObj.getString("op");
    6. if ("d".equals(op)) {
    7. TableProcess before = jsonObj.getObject("before", TableProcess.class);
    8. String sourceTable = before.getSourceTable();
    9. tableConfigState.remove(sourceTable);
    10. } else {
    11. TableProcess config = jsonObj.getObject("after", TableProcess.class);
    12. String sourceTable = config.getSourceTable();
    13. String sinkTable = config.getSinkTable();
    14. String sinkColumns = config.getSinkColumns();
    15. String sinkPk = config.getSinkPk();
    16. String sinkExtend = config.getSinkExtend();
    17. tableConfigState.put(sourceTable, config);
    18. checkTable(sinkTable, sinkColumns, sinkPk, sinkExtend);
    19. }
    20. }

    (4)自定义函数MyBroadcastFunction-checkTable

    在 Phoenix 建表之前要先创建命名空间 GMALL2022_REALTIM

    0: jdbc:phoenix:> create schema GMALL2022_REALTIME;

    checkTable() 方法如下

    1. /**
    2. * Phoenix 建表函数
    3. *
    4. * @param sinkTable 目标表名 eg. test
    5. * @param sinkColumns 目标表字段 eg. id,name,sex
    6. * @param sinkPk 目标表主键 eg. id
    7. * @param sinkExtend 目标表建表扩展字段 eg. ""
    8. * eg. create table if not exists mydb.test(id varchar primary key, name varchar, sex varchar)...
    9. */
    10. private void checkTable(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) {
    11. // 封装建表 SQL
    12. StringBuilder sql = new StringBuilder();
    13. sql.append("create table if not exists " + GmallConfig.HBASE_SCHEMA
    14. + "." + sinkTable + "(\n");
    15. String[] columnArr = sinkColumns.split(",");
    16. // 为主键及扩展字段赋默认值
    17. if (sinkPk == null) {
    18. sinkPk = "id";
    19. }
    20. if (sinkExtend == null) {
    21. sinkExtend = "";
    22. }
    23. // 遍历添加字段信息
    24. for (int i = 0; i < columnArr.length; i++) {
    25. sql.append(columnArr[i] + " varchar");
    26. // 判断当前字段是否为主键
    27. if (sinkPk.equals(columnArr[i])) {
    28. sql.append(" primary key");
    29. }
    30. // 如果当前字段不是最后一个字段,则追加","
    31. if (i < columnArr.length - 1) {
    32. sql.append(",\n");
    33. }
    34. }
    35. sql.append(")");
    36. sql.append(sinkExtend);
    37. String createStatement = sql.toString();
    38. // 为数据库操作对象赋默认值,执行建表 SQL
    39. PreparedStatement preparedSt = null;
    40. try {
    41. preparedSt = conn.prepareStatement(createStatement);
    42. preparedSt.execute();
    43. } catch (SQLException sqlException) {
    44. sqlException.printStackTrace();
    45. System.out.println("建表语句\n" + createStatement + "\n执行异常");
    46. } finally {
    47. if (preparedSt != null) {
    48. try {
    49. preparedSt.close();
    50. } catch (SQLException sqlException) {
    51. sqlException.printStackTrace();
    52. throw new RuntimeException("数据库操作对象释放异常");
    53. }
    54. }
    55. }
    56. }

    (5)自定义函数MyBroadcastFunction-processElement()

    1. @Override
    2. public void processElement(JSONObject jsonObj, ReadOnlyContext readOnlyContext, Collector out) throws Exception {
    3. ReadOnlyBroadcastState tableConfigState = readOnlyContext.getBroadcastState(tableConfigDescriptor);
    4. // 获取配置信息
    5. String sourceTable = jsonObj.getString("table");
    6. TableProcess tableConfig = tableConfigState.get(sourceTable);
    7. if (tableConfig != null) {
    8. JSONObject data = jsonObj.getJSONObject("data");
    9. String sinkTable = tableConfig.getSinkTable();
    10. // 根据 sinkColumns 过滤数据
    11. String sinkColumns = tableConfig.getSinkColumns();
    12. filterColumns(data, sinkColumns);
    13. // 将目标表名加入到主流数据中
    14. data.put("sinkTable", sinkTable);
    15. out.collect(data);
    16. }
    17. }

    (6)自定义函数MyBroadcastFunction-filterColumns(),校验字段,过滤掉多余的字段

    1. private void filterColumns(JSONObject data, String sinkColumns) {
    2. Set> dataEntries = data.entrySet();
    3. dataEntries.removeIf(r -> !sinkColumns.contains(r.getKey()));
    4. }

    (7)主程序DimSinkApp中调用MyBroadcastFunction提取维度数据

    1. // TODO 8. 处理维度表数据
    2. SingleOutputStreamOperator dimDS = connectedStream.process(
    3. new MyBroadcastFunction(tableConfigDescriptor)
    4. );

    5.3 保存维度到HBase(Phoenix)

    1)程序流程分析

     

    DimSink 继承了RickSinkFunction,这个function得分两条时间线:

    一条是任务启动时执行open操作(图中紫线),我们可以把连接的初始化工作放在此处一次性执行;

    另一条是随着每条数据的到达反复执行invoke()(图中黑线),在这里面我们要实现数据的保存,主要策略就是根据数据组合成sql提交给hbase。

    2)创建 PhoenixUtil 工具类,在其中创建insertValues()方法

    1. package com.atguigu.gmall.realtime.util;
    2. import com.alibaba.fastjson.JSONObject;
    3. import com.atguigu.gmall.realtime.common.GmallConfig;
    4. import org.apache.commons.beanutils.BeanUtils;
    5. import org.apache.commons.lang3.StringUtils;
    6. import java.sql.*;
    7. import java.util.ArrayList;
    8. import java.util.Collection;
    9. import java.util.List;
    10. import java.util.Set;
    11. public class PhoenixUtil {
    12. /**
    13. * Phoenix 表数据导入方法
    14. *
    15. * @param conn 连接对象
    16. * @param sinkTable 写入数据的 Phoenix 目标表名
    17. * @param data 待写入的数据
    18. */
    19. public static void insertValues(Connection conn, String sinkTable, JSONObject data) {
    20. // 获取字段名
    21. Set columns = data.keySet();
    22. // 获取字段对应的值
    23. Collection values = data.values();
    24. // 拼接字段名
    25. String columnStr = StringUtils.join(columns, ",");
    26. // 拼接字段值
    27. String valueStr = StringUtils.join(values, "','");
    28. // 拼接插入语句
    29. String sql = "upsert into " + GmallConfig.HBASE_SCHEMA
    30. + "." + sinkTable + "(" +
    31. columnStr + ") values ('" + valueStr + "')";
    32. // 为数据库操作对象赋默认值
    33. PreparedStatement preparedSt = null;
    34. // 执行 SQL
    35. try {
    36. preparedSt = conn.prepareStatement(sql);
    37. preparedSt.execute();
    38. // 提交事务
    39. conn.commit();
    40. } catch (SQLException sqlException) {
    41. sqlException.printStackTrace();
    42. throw new RuntimeException("数据库操作对象获取或执行异常");
    43. } finally {
    44. if (preparedSt != null) {
    45. try {
    46. preparedSt.close();
    47. } catch (SQLException sqlException) {
    48. sqlException.printStackTrace();
    49. throw new RuntimeException("数据库操作对象释放异常");
    50. }
    51. }
    52. }
    53. }
    54. }
    55. 3)MyPhoenixSink

      自定义 SinkFunction 子类 MyPhoenixSink,在其中调用 Phoenix 工具类的 insertValues(String sinkTable, JSONObject data) 方法,将维度数据写出到 Phoenix 的维度表中。为了提升效率,减少频繁创建销毁连接带来的性能损耗,创建连接池。

      (1)添加德鲁伊连接池依赖

      1. <dependency>
      2. <groupId>com.alibabagroupId>
      3. <artifactId>druidartifactId>
      4. <version>1.1.16version>
      5. dependency>

      2)连接池创建工具类

      1. package com.atguigu.gmall.realtime.util;
      2. import com.alibaba.druid.pool.DruidDataSource;
      3. public class DruidDSUtil {
      4. private static DruidDataSource druidDataSource;
      5. public static DruidDataSource createDataSource() {
      6. // 创建连接池
      7. druidDataSource = new DruidDataSource();
      8. // 设置驱动全类名
      9. druidDataSource.setDriverClassName(GmallConfig.PHOENIX_DRIVER);
      10. // 设置连接 url
      11. druidDataSource.setUrl(GmallConfig.PHOENIX_SERVER);
      12. // 设置初始化连接池时池中连接的数量
      13. druidDataSource.setInitialSize(5);
      14. // 设置同时活跃的最大连接数
      15. druidDataSource.setMaxActive(20);
      16. // 设置空闲时的最小连接数,必须介于 0 和最大连接数之间,默认为 0
      17. druidDataSource.setMinIdle(1);
      18. // 设置没有空余连接时的等待时间,超时抛出异常,-1 表示一直等待
      19. druidDataSource.setMaxWait(-1);
      20. // 验证连接是否可用使用的 SQL 语句
      21. druidDataSource.setValidationQuery("select 1");
      22. // 指明连接是否被空闲连接回收器(如果有)进行检验,如果检测失败,则连接将被从池中去除
      23. // 注意,默认值为 true,如果没有设置 validationQuery,则报错
      24. // testWhileIdle is true, validationQuery not set
      25. druidDataSource.setTestWhileIdle(true);
      26. // 借出连接时,是否测试,设置为 false,不测试,否则很影响性能
      27. druidDataSource.setTestOnBorrow(false);
      28. // 归还连接时,是否测试
      29. druidDataSource.setTestOnReturn(false);
      30. // 设置空闲连接回收器每隔 30s 运行一次
      31. druidDataSource.setTimeBetweenEvictionRunsMillis(30 * 1000L);
      32. // 设置池中连接空闲 30min 被回收,默认值即为 30 min
      33. druidDataSource.setMinEvictableIdleTimeMillis(30 * 60 * 1000L);
      34. return druidDataSource;
      35. }
      36. }

      3MyPhoenixSink 函数

      1. package com.atguigu.gmall.realtime.app.func;
      2. import com.alibaba.druid.pool.DruidDataSource;
      3. import com.alibaba.druid.pool.DruidPooledConnection;
      4. import com.alibaba.fastjson.JSONObject;
      5. import com.atguigu.gmall.realtime.util.DruidDSUtil;
      6. import com.atguigu.gmall.realtime.util.PhoenixUtil;
      7. import org.apache.flink.configuration.Configuration;
      8. import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
      9. import java.sql.SQLException;
      10. public class MyPhoenixSink extends RichSinkFunction {
      11. private DruidDataSource druidDataSource;
      12. @Override
      13. public void open(Configuration parameters) throws Exception {
      14. super.open(parameters);
      15. // 创建连接池
      16. druidDataSource = DruidDSUtil.createDataSource();
      17. }
      18. @Override
      19. public void invoke(JSONObject jsonObj, Context context) throws Exception {
      20. // 获取目标表表名
      21. String sinkTable = jsonObj.getString("sinkTable");
      22. // 获取 id 字段的值
      23. String id = jsonObj.getString("id");
      24. // 清除 JSON 对象中的 sinkTable 字段
      25. // 以便可将该对象直接用于 HBase 表的数据写入
      26. jsonObj.remove("sinkTable");
      27. // 获取连接对象
      28. DruidPooledConnection conn = druidDataSource.getConnection();
      29. try {
      30. PhoenixUtil.insertValues(conn, sinkTable, jsonObj);
      31. } catch (Exception e) {
      32. System.out.println("维度数据写入异常");
      33. e.printStackTrace();
      34. } finally {
      35. try {
      36. // 归还数据库连接对象
      37. conn.close();
      38. } catch (SQLException sqlException) {
      39. System.out.println("数据库连接对象归还异常");
      40. sqlException.printStackTrace();
      41. }
      42. }
      43. }
      44. }

      4)主程序 DimSinkApp 中调用 MyPhoenixSink

      1. // TODO 9. 将数据写入 Phoenix 表
      2. dimDS.addSink(new MyPhoenixSink());

      6)测试

      (1)启动HDFS、ZK、Kafka、Maxwell、HBase

      (2)运行 IDEA 中的 DimSinkApp

      (3)执行 mysql_to_kafka_init.sh 脚本

      mysql_to_kafka_init.sh all

      (4)通过phoenix查看hbase的schema以及表情况


       附:整个流程的步骤以及所需要的进程

      1. 数据流:web/app -> nginx -> 业务服务器 -> Mysql(binlog) -> Maxwell -> Kafka(ODS) -> FlinkApp -> Phoenix
      2. 程序:Mock -> Mysql(binlog) -> Maxwell -> Kafka(Zk) -> DimApp(FlinkCDC/Mysql) -> Phoenix(HBase/ZK/HDFS)
      3. /**
      4. * 需要启动的进程:
      5. * dfs -> zookeeper -> kafka -> maxwell -> hbase -> phoenix(客户端):bin/sqlline.py
      6. */

    56. 相关阅读:
      【Linux】Linux权限
      金仓数据库KingbaseES客户端编程接口指南-ado.net(7. Kdbnpg支持的类型和类型映射)
      JavaScript -- 05. 对象介绍
      OpenResty的文件IO操作
      全国青少年软件编程等级考试标准(正式级)
      docker:CentOS安装 docker和默认安装目录
      单机部署
      单细胞+RIP-seq项目文章| Cell Reports&hnRNPU蛋白在小鼠精原干细胞池建立的关键作用
      麒麟系统安装找不到安装源!!!!设置基础软件仓库时出错
      掌握基本排序算法:冒泡、选择、插入和快速排序
    57. 原文地址:https://blog.csdn.net/JiaXingNashishua/article/details/127771558