• 离线数仓(四)【数仓数据同步策略】


    前言

            今天来把数仓数据同步解决掉,前面我们已经把日志数据到 Kafka 的通道打通了。

    1、实时数仓数据同步

            关于实时数仓,我们的 Flink 直接去 Kafka 读取即可,我们在学习 Flink 的时候也知道 Flink 提供了 Kafka Source,所以这里不需要再去添加什么额外的配置。

    2、离线数仓数据同步

            Flink 可以从 Kafka 中读取数据,可是 Hive 不行啊,Hive 是从 Hadoop HDFS 中读取数据,所以的离线数仓需要进行一些配置。

    2.1、用户行为日志数据同步

    2.1.1、数据通道选择

            用户行为数据由 Flume 从 Kafka 直接同步到 HDFS,由于离线数仓采用 Hive 的分区表按天统计,所以目标路径要包含一层日期。

            这里,我们的 hive 分区表需要按天分区,那就需要我们 Flume 从 Kafka 读取到的数据包含 Event Header 信息(hdfs sink 默认就是按照 event header 中的 timestamp 来落盘的),但是我们上游把用户行为日志传输到 Kafka Channel 的时候,我们设置了 parseAsFlumeEvent=false,这就导致存储在 Kafka Channel 中的日志只有 Event Body,没有 Event Header。应该怎么把 Kafka Channel 中的数据读取写入到 HDFS 而且还能够给日志数据增加一个 header,我们有两种选择方案:

            1. 如果我们选择了 Kafka Channel 做数据源(我们之前说 Kfaka Channel 一共有 3 种结构:source -> kafka channel 、source -> kafka channel -> sink、kafka channel -> sink),选择了 kafka channel -> sink 结构的话,kafka channel 自己会封装一个 header 发送给 sink,但是这个 header 没有时间信息(timestamp),Event Body 中也可以有时间信息(要求我们日志时产生给每一条日志添加时间信息),但是我们不可以在 kafka channel 和 hdfs sink 之间设置拦截器去提取 body 中的时间信息(因为自定义拦截器只能在 source 和 channel 之间使用),所以这种结构无法实现。

            2. 上一种方案如果可以实现的话,我们就省去了 source 读取,可惜上一种结构无法实现,除非把上游的 parseAsFlumeEvent 设置为 true 。所以我们只能再开一个完整的 flume 作业去 kafka 读取,即 kafak source -> file channel -> hdfs sink。

    2.1.2 日志消费Flume配置概述

            按照规划,该 Flume 需将 Kafka 中 topic_log 的数据发往 HDFS。并且对每天产生的用户行为日志进行区分,将不同天的数据发往HDFS不同天的路径。

            这里我们选择Kafka Source、File Channel(数据比较重要的话一般都用 file channel)、HDFS Sink。

    2.1.3、Flume 配置文件

    1. # 定义组件
    2. a1.sources = r1
    3. a1.channels = c1
    4. a1.sinks = k1
    5. # 配置sources
    6. a1.sources.r1.channels = c1
    7. a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    8. a1.sources.r1.kafka.bootstrap = hadoop102:9092,hadoop103:9092,hadoop104:9092
    9. a1.sources.r1.kafka.topics = topic_log
    10. a1.sources.r1.kafka.consumer.group.id = topic_log
    11. a1.sources.r1.batchSize = 2000
    12. a1.sources.r1.batchDurationMillis = 1000
    13. a1.sources.r1.interceptors = i1
    14. a1.sources.r1.interceptors.i1.type = com.lyh.gmall.interceptor.TimestampInterceptor$Builder
    15. # 配置channels
    16. a1.channels.c1.type = file
    17. a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
    18. a1.channels.c1.useDualCheckpoints = false
    19. a1.channels.c1.dataDirs = /opt/module/flume/checkpoint/behavior1
    20. a1.channels.c1.maxFileSize = 2146435071
    21. a1.channels.c1.capacity = 1000000
    22. a1.channels.c1.keep-alive = 3
    23. # 配置 sinks
    24. a1.sinks.k1.type = hdfs
    25. a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
    26. a1.sinks.k1.hdfs.filePrefix = log
    27. a1.sinks.k1.hdfs.round = false
    28. a1.sinks.k1.hdfs.rollInterval = 10
    29. a1.sinks.k1.hdfs.rollSize = 134217728
    30. a1.sinks.k1.hdfs.rollCount = 0
    31. #组装
    32. a1.sources.r1.channels = c1
    33. a1.sinks.k1.channel = c1

    kafka source 配置:

    • 我们知道,flume 每发送一次数据需要满足的条件是数据量达到 batchSize 条,或者时间达到 batchDurationMillis 。所以这里 batchDurationMillis  的配置尽量和系统达到 batchSize 的时间相近。比如每 2s 生成 2000 条的数据,那我们这里的 batchDurationMillis 最好就配置为 2000
    • kafka.topics 和 kafka.topics.regex 这两个配置虽然都被加粗,但是只需要配置一个即可。
    • kafka.consumer.group.id(默认为 flume),这里我们尽量配置 kafka 消费者组和业务名一样,因为我们实际项目中可能会有很多业务,如果这几个业务都需要消费这个 topic,但是如果不配置消费者组id,那么这些业务的消费者就会默认被分配到一个消费者组(flume 组),而一个 topic 的一个分区只能被一个消费者组的一个消费者所消费(我们这里的主题 topic_log 并只有一个分区),这样的话,只有一个业务的消费者可以消费到,而别的业务的消费者消费不到。

    file channel 配置:

    • checkpointDir:flume 的 file channel 有一个索引机制,它会把读取到的索引保存到内存当中去,但是防止数据丢失,还会再备份一次,这里就是配置备份的路径。
    • useDualCheckpoints(默认为 false):表示是否开启二次备份。因为一次备份即使保存在磁盘,还是有出问题的可能,如果配置这个参数为 true 则必须配置参数 backupCheckpointDir
    • backupCheckpointDir:这个参数就是配置二次备份的地址。
    • dataDirs:flume 的多目录存储,可以把数据存储在服务器的多个磁盘上
    • maxFileSize:我们的 file channel 是要写入文件的,这里配置的是这个文件的最大大小
    • capacity:file channel 容纳数据条数的限制,默认最多 100w 条
    • keep-alive:我们的 file channel 中的数据如果满了的时候,source 是写不进去的,这就需要回滚,还需要 kafka source 再从 kafka 去读一次,这样条浪费性能了。这个参数的作用是等一会,等到 channel 腾出一定空间之后再写进去。

    hdfs sink 配置:

    • hdfs.path:我们的 hdfs 保存路径中包含 %Y-%m-%d ,这意味这这个文件夹中保存的是一天的数据内容,如果我们有要求保存几个小时的内容,就需要设置 round 参数。
    • round(默认是 false):flume 做的是离线的数据传输,我们的日志会每隔一定时间进行落盘。要精确到小时分钟或秒的话,就需要设置 roundValue 和 roundUnit 参数。比如每 6 个小时进行一次落盘的话,我们首先把路径改为 %Y-%m-%d/%h ,然后 roundValue 设置为 6,roundUnit 设置为 hour。
    • roundValue:时间值
    • roundUnit:时间单位
    • rollInterval:hdfs 数据块滚动间隔(默认是 30s,单位是秒),同样我们最好设置这个采纳数的时间刚好差不多生成一个块大小(128MB)
    • rollSize:基于文件的大小进行滚动(一般我们配置为 134217728 也就是 128MB
    • rollCount:基于 event 的条数进行滚动(一般设置为 0,因为用数据条数不太好控制文件的大小

    注意:rollInterval、rollSize、rollCount 如果都设置为 0 则代表该配置参数不生效。配置不当很容易造成大量小文件问题(危害:hdfs中一个文件在namenode中占用 150kb、一个文件会生成一个 map task )。

    1. 数据漂移问题

    当我们有数据比如在 23:59:59 经过 3s 才能发送到 kafka source,这时 kafka source 会在 event header 中封装一个 timestamp 信息,但是这时封装的 timestamp 已经到第二天了。

    所以解决的办法就是,利用 flume 的自定义拦截器去把 kafka source 中 event body 的时间信息读取出来,封装到 header 当中去,这样就不会造成落盘错误了:

    2. 编写拦截器
    1. package com.lyh.gmall.interceptor;
    2. import com.alibaba.fastjson.JSONObject;
    3. import org.apache.flume.Context;
    4. import org.apache.flume.Event;
    5. import org.apache.flume.interceptor.Interceptor;
    6. import java.nio.charset.StandardCharsets;
    7. import java.util.List;
    8. import java.util.Map;
    9. public class TimestampInterceptor implements Interceptor {
    10. @Override
    11. public void initialize() {
    12. }
    13. @Override
    14. public Event intercept(Event event) {
    15. // 1. 获取 header 和 body 中的数据
    16. Map headers = event.getHeaders();
    17. String log = new String(event.getBody(), StandardCharsets.UTF_8);
    18. // 2. 解析 log(json) 中的 ts 字段
    19. String ts = JSONObject.parseObject(log).getString("ts");
    20. // 3. 把解析出来的 ts 值放到 header 中
    21. headers.put("timestamp",ts);
    22. return event;
    23. }
    24. @Override
    25. public List intercept(List list) {
    26. for (Event event : list) {
    27. intercept(event);
    28. }
    29. return list;
    30. }
    31. @Override
    32. public void close() {
    33. }
    34. public static class Builder implements Interceptor.Builder {
    35. @Override
    36. public Interceptor build() {
    37. return new TimestampInterceptor();
    38. }
    39. @Override
    40. public void configure(Context context) {
    41. }
    42. }
    43. }

    完了重新打包到 hadoop104 下 flume 的 lib 目录下

    2.1.4、日志消费测试

    1. 启动 Zookeeper、Kafka
    2. 启动hadoop102的用户日志采集脚本
    f1.sh start

    这样我们的用户行为日志就被 flume 采集到了 kafka

    3. 在 hadoop104 从kafka 采集日志到 hdfs
    bin/flume-ng agent -n a1 -c conf/ -f job/warehouse/kafka_to_hdfs_log.conf -Dflume.root.logger=INFO,console
    4. 模拟数据生成
    mklog.sh
    5. 测试结果

    可以看到,用户行为日志被成功上传到了 hdfs。

    2.1.5、日志启停脚本

    我们在 hadoop102 编写一个脚本 f2.sh:

    1. #!/bin/bash
    2. case $1 in
    3. "start")
    4. echo " --------启动 hadoop104 日志数据flume-------"
    5. ssh hadoop104 "nohup /opt/module/flume-1.9.0/bin/flume-ng agent -n a1 -c /opt/module/flume-1.9.0/conf -f /opt/module/flume-1.9.0/job/warehouse/kafka_to_hdfs_log.conf >/dev/null 2>&1 &"
    6. ;;
    7. "stop")
    8. echo " --------停止 hadoop104 日志数据flume-------"
    9. ssh hadoop104 "ps -ef | grep kafka_to_hdfs_log | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
    10. ;;
    11. esac

    2.2、业务日志数据同步

            对离线数仓来说,业务数据一般都是按天来进行同步的;但对实时数仓来说,来一条业务数据就必须马上同步。所以对于离线数仓,我们可以不使用 MaxWell ,而是通过 DataX 每天全量采集到数仓。

    2.2.1 数据同步策略概述

            业务数据是数据仓库的重要数据来源,我们需要每日定时从业务数据库中抽取数据,传输到数据仓库中,之后再对数据进行分析统计。

            为保证统计结果的正确性,需要保证数据仓库中的数据与业务数据库是同步的,离线数仓的计算周期通常为天,所以数据同步周期也通常为天,即每天同步一次即可。

            数据的同步策略有全量同步增量同步

            全量同步,就是每天都将业务数据库中的全部数据同步一份到数据仓库,这是保证两侧数据同步的最简单的方式。就相当于每天进行一次 select * from xxx;

            那我们的历史数据(比如今天全量同步后,今天之前的数据就是历史数据)就没有意义了吗?其实我们并不会立即删除历史数据,因为数据是有价值的,我们既可以分析其中的变化,也可以作为备份以防不测。

            增量同步,就是每天只将业务数据中的新增及变化数据同步到数据仓库。采用每日增量同步的表,通常需要在首日先进行一次全量同步。

    2.2.2 数据同步策略选择

    两种策略都能保证数据仓库和业务数据库的数据同步,那应该如何选择呢?下面对两种策略进行简要对比。

    同步策略

    优点

    缺点

    全量同步

    逻辑简单

    在某些情况下效率较低(比如我的一张表大小10亿条,但是我每天只增加一条)。例如某张表数据量较大,但是每天数据的变化比例很低,若对其采用每日全量同步,则会重复同步和存储大量相同的数据。

    增量同步

    效率高,无需同步和存储重复数据

    逻辑复杂,需要将每日的新增及变化数据同原来的数据进行整合,才能使用

    根据上述对比,可以得出以下结论:

            通常情况,业务表数据量比较大,优先考虑增量,数据量比较小,优先考虑全量;具体选择由数仓模型决定。

    大表变化多全量
    大表变化少增量
    小表(比如省份表)变化多全量
    小表变化少全量

    我们一般把全量同步的表叫做维度表,把增量同步的表叫做事实表。

    2.2.3 数据同步工具概述

            数据同步工具种类繁多,大致可分为两类,一类是以 DataX、Sqoop 为代表的基于Select查询的离线、批量同步工具,另一类是以 Maxwell、Canal 为代表的基于数据库数据变更日志(例如MySQL 的 binlog,其会实时记录所有的 insert、update 以及 delete操作)的实时流式同步工具。

            全量同步通常使用 DataX、Sqoop 等基于查询的离线同步工具。而增量同步既可以使用DataX、Sqoop等工具,也可使用 Maxwell、Canal 等工具,下面对增量同步不同方案进行简要对比。

    增量同步方案

    DataX/Sqoop

    Maxwell/Canal

    对数据库的要求

    原理是基于查询,故若想通过select查询获取新增及变化数据,就要求数据表中存在create_time、update_time等字段,然后根据这些字段获取变更数据。

    要求数据库记录变更操作,例如MySQL需开启binlog。

    数据的中间状态

    由于是离线批量同步,故若一条数据在一天中变化多次,该方案只能获取最后一个状态,中间状态无法获取。

    由于是实时获取所有的数据变更操作,所以可以获取变更数据的所有中间状态。

    接下来我们选择用 DataX 来做全量数据的同步工作,用 Maxwell 来做增量数据的同步工作。 

    2.2.4、全量数据同步

    全量表数据由 DataX 从 MySQL 业务数据库直接同步到 HDFS,具体数据流向如下图所示:

    1. DataX 配置文件

    回顾我们执行 DataX 脚本的命令:

    python /opt/module/datax/bin/datax.py /opt/module/datax/job/job.json

    我们是通过把配置写进一个 json 文件然后执行的,所以这里,我们需要全量同步的表共 15 张,也就意味着需要写 15 个 json 配置文件,但是毕竟开发中不可能 100个、1000个表我们也都一个个手写,所以这里我们通过一个 python 来自动生成:

    vim ~/bin/gen_import_config.py
    1. # ecoding=utf-8
    2. import json
    3. import getopt
    4. import os
    5. import sys
    6. import MySQLdb
    7. #MySQL相关配置,需根据实际情况作出修改
    8. mysql_host = "hadoop102"
    9. mysql_port = "3306"
    10. mysql_user = "root"
    11. mysql_passwd = "123456"
    12. #HDFS NameNode相关配置,需根据实际情况作出修改
    13. hdfs_nn_host = "hadoop102"
    14. hdfs_nn_port = "8020"
    15. #生成配置文件的目标路径,可根据实际情况作出修改
    16. output_path = "/opt/module/datax/job/import"
    17. def get_connection():
    18. return MySQLdb.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, passwd=mysql_passwd)
    19. def get_mysql_meta(database, table):
    20. connection = get_connection()
    21. cursor = connection.cursor()
    22. sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
    23. cursor.execute(sql, [database, table])
    24. fetchall = cursor.fetchall()
    25. cursor.close()
    26. connection.close()
    27. return fetchall
    28. def get_mysql_columns(database, table):
    29. return map(lambda x: x[0], get_mysql_meta(database, table))
    30. def get_hive_columns(database, table):
    31. def type_mapping(mysql_type):
    32. mappings = {
    33. "bigint": "bigint",
    34. "int": "bigint",
    35. "smallint": "bigint",
    36. "tinyint": "bigint",
    37. "decimal": "string",
    38. "double": "double",
    39. "float": "float",
    40. "binary": "string",
    41. "char": "string",
    42. "varchar": "string",
    43. "datetime": "string",
    44. "time": "string",
    45. "timestamp": "string",
    46. "date": "string",
    47. "text": "string"
    48. }
    49. return mappings[mysql_type]
    50. meta = get_mysql_meta(database, table)
    51. return map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, meta)
    52. def generate_json(source_database, source_table):
    53. job = {
    54. "job": {
    55. "setting": {
    56. "speed": {
    57. "channel": 3
    58. },
    59. "errorLimit": {
    60. "record": 0,
    61. "percentage": 0.02
    62. }
    63. },
    64. "content": [{
    65. "reader": {
    66. "name": "mysqlreader",
    67. "parameter": {
    68. "username": mysql_user,
    69. "password": mysql_passwd,
    70. "column": get_mysql_columns(source_database, source_table),
    71. "splitPk": "",
    72. "connection": [{
    73. "table": [source_table],
    74. "jdbcUrl": ["jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + source_database]
    75. }]
    76. }
    77. },
    78. "writer": {
    79. "name": "hdfswriter",
    80. "parameter": {
    81. "defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,
    82. "fileType": "text",
    83. "path": "${targetdir}",
    84. "fileName": source_table,
    85. "column": get_hive_columns(source_database, source_table),
    86. "writeMode": "append",
    87. "fieldDelimiter": "\t",
    88. "compress": "gzip"
    89. }
    90. }
    91. }]
    92. }
    93. }
    94. if not os.path.exists(output_path):
    95. os.makedirs(output_path)
    96. with open(os.path.join(output_path, ".".join([source_database, source_table, "json"])), "w") as f:
    97. json.dump(job, f)
    98. def main(args):
    99. source_database = ""
    100. source_table = ""
    101. options, arguments = getopt.getopt(args, '-d:-t:', ['sourcedb=', 'sourcetbl='])
    102. for opt_name, opt_value in options:
    103. if opt_name in ('-d', '--sourcedb'):
    104. source_database = opt_value
    105. if opt_name in ('-t', '--sourcetbl'):
    106. source_table = opt_value
    107. generate_json(source_database, source_table)
    108. if __name__ == '__main__':
    109. main(sys.argv[1:])

    由于需要使用Python访问Mysql数据库,所以需安装驱动,命令如下:

    sudo yum install -y MySQL-python

    脚本使用说明:

    python gen_import_config.py -d database -t table

    测试一下:

    python gen_import_config.py -d gmall -t base_province

     

    注意: 这里的 hdfswriter 的 writeMode = append,这个值的意思是即使这个目录下存在文件也继续写入。除了这个参数外,还可以设置 writeMode = nonConflict,这个值的意思是,如果发现目录下有文件则停止写入,直接报错。

    可以看到,生成的 json 配置文件需要我们后期指定 datax 命令时再提供一个 targetdir 参数,也就是同步到我们 HDFS 的哪个目录下。

     测试一下这个配置文件能不能用:

    bin/datax.py -p"-Dtargetdir=/base_province" job/import/gmall.base_province.json

    查看 hdfs 端:

     

            这个配置文件没有 where 限制,所以这里是 34 条数据,到这里,说明我们用脚本生成的配置文件是可以正常用的。


    2. DataX 配置文件生成脚本 

    上面我们用一个 python 脚本来生成 json 文件,生成好的 json 文件还需要通过 shell 脚本去调用执行 DataX 任务,所以我们这里编写一个 Shell 脚本: 

    vim ~/bin/gen_import_config.sh
    1. #!/bin/bash
    2. python ~/bin/gen_import_config.py -d gmall -t activity_info
    3. python ~/bin/gen_import_config.py -d gmall -t activity_rule
    4. python ~/bin/gen_import_config.py -d gmall -t base_category1
    5. python ~/bin/gen_import_config.py -d gmall -t base_category2
    6. python ~/bin/gen_import_config.py -d gmall -t base_category3
    7. python ~/bin/gen_import_config.py -d gmall -t base_dic
    8. python ~/bin/gen_import_config.py -d gmall -t base_province
    9. python ~/bin/gen_import_config.py -d gmall -t base_region
    10. python ~/bin/gen_import_config.py -d gmall -t base_trademark
    11. python ~/bin/gen_import_config.py -d gmall -t cart_info
    12. python ~/bin/gen_import_config.py -d gmall -t coupon_info
    13. python ~/bin/gen_import_config.py -d gmall -t sku_attr_value
    14. python ~/bin/gen_import_config.py -d gmall -t sku_info
    15. python ~/bin/gen_import_config.py -d gmall -t sku_sale_attr_value
    16. python ~/bin/gen_import_config.py -d gmall -t spu_info

    赋予 gen_import_config.sh 执行权限后,执行脚本,生成配置文件

    gen_import_config.sh

    3. 全量数据同步脚本
    1. #!/bin/bash
    2. DATAX_HOME=/opt/module/datax
    3. # 如果传入日期则do_date等于传入的日期,否则等于前一天日期
    4. if [ -n "$2" ] ;then
    5. do_date=$2
    6. else
    7. do_date=`date -d "-1 day" +%F`
    8. fi
    9. #处理目标路径,此处的处理逻辑是,如果目标路径不存在,则创建;若存在,则清空,目的是保证同步任务可重复执行
    10. handle_targetdir() {
    11. hadoop fs -test -e $1
    12. if [[ $? -eq 1 ]]; then
    13. echo "路径$1不存在,正在创建......"
    14. hadoop fs -mkdir -p $1
    15. else
    16. echo "路径$1已经存在"
    17. fs_count=$(hadoop fs -count $1)
    18. content_size=$(echo $fs_count | awk '{print $3}')
    19. if [[ $content_size -eq 0 ]]; then
    20. echo "路径$1为空"
    21. else
    22. echo "路径$1不为空,正在清空......"
    23. hadoop fs -rm -r -f $1/*
    24. fi
    25. fi
    26. }
    27. #数据同步
    28. import_data() {
    29. datax_config=$1
    30. target_dir=$2
    31. handle_targetdir $target_dir
    32. python $DATAX_HOME/bin/datax.py -p"-Dtargetdir=$target_dir" $datax_config
    33. }
    34. case $1 in
    35. "activity_info")
    36. import_data /opt/module/datax/job/import/gmall.activity_info.json /origin_data/gmall/db/activity_info_full/$do_date
    37. ;;
    38. "activity_rule")
    39. import_data /opt/module/datax/job/import/gmall.activity_rule.json /origin_data/gmall/db/activity_rule_full/$do_date
    40. ;;
    41. "base_category1")
    42. import_data /opt/module/datax/job/import/gmall.base_category1.json /origin_data/gmall/db/base_category1_full/$do_date
    43. ;;
    44. "base_category2")
    45. import_data /opt/module/datax/job/import/gmall.base_category2.json /origin_data/gmall/db/base_category2_full/$do_date
    46. ;;
    47. "base_category3")
    48. import_data /opt/module/datax/job/import/gmall.base_category3.json /origin_data/gmall/db/base_category3_full/$do_date
    49. ;;
    50. "base_dic")
    51. import_data /opt/module/datax/job/import/gmall.base_dic.json /origin_data/gmall/db/base_dic_full/$do_date
    52. ;;
    53. "base_province")
    54. import_data /opt/module/datax/job/import/gmall.base_province.json /origin_data/gmall/db/base_province_full/$do_date
    55. ;;
    56. "base_region")
    57. import_data /opt/module/datax/job/import/gmall.base_region.json /origin_data/gmall/db/base_region_full/$do_date
    58. ;;
    59. "base_trademark")
    60. import_data /opt/module/datax/job/import/gmall.base_trademark.json /origin_data/gmall/db/base_trademark_full/$do_date
    61. ;;
    62. "cart_info")
    63. import_data /opt/module/datax/job/import/gmall.cart_info.json /origin_data/gmall/db/cart_info_full/$do_date
    64. ;;
    65. "coupon_info")
    66. import_data /opt/module/datax/job/import/gmall.coupon_info.json /origin_data/gmall/db/coupon_info_full/$do_date
    67. ;;
    68. "sku_attr_value")
    69. import_data /opt/module/datax/job/import/gmall.sku_attr_value.json /origin_data/gmall/db/sku_attr_value_full/$do_date
    70. ;;
    71. "sku_info")
    72. import_data /opt/module/datax/job/import/gmall.sku_info.json /origin_data/gmall/db/sku_info_full/$do_date
    73. ;;
    74. "sku_sale_attr_value")
    75. import_data /opt/module/datax/job/import/gmall.sku_sale_attr_value.json /origin_data/gmall/db/sku_sale_attr_value_full/$do_date
    76. ;;
    77. "spu_info")
    78. import_data /opt/module/datax/job/import/gmall.spu_info.json /origin_data/gmall/db/spu_info_full/$do_date
    79. ;;
    80. "all")
    81. import_data /opt/module/datax/job/import/gmall.activity_info.json /origin_data/gmall/db/activity_info_full/$do_date
    82. import_data /opt/module/datax/job/import/gmall.activity_rule.json /origin_data/gmall/db/activity_rule_full/$do_date
    83. import_data /opt/module/datax/job/import/gmall.base_category1.json /origin_data/gmall/db/base_category1_full/$do_date
    84. import_data /opt/module/datax/job/import/gmall.base_category2.json /origin_data/gmall/db/base_category2_full/$do_date
    85. import_data /opt/module/datax/job/import/gmall.base_category3.json /origin_data/gmall/db/base_category3_full/$do_date
    86. import_data /opt/module/datax/job/import/gmall.base_dic.json /origin_data/gmall/db/base_dic_full/$do_date
    87. import_data /opt/module/datax/job/import/gmall.base_province.json /origin_data/gmall/db/base_province_full/$do_date
    88. import_data /opt/module/datax/job/import/gmall.base_region.json /origin_data/gmall/db/base_region_full/$do_date
    89. import_data /opt/module/datax/job/import/gmall.base_trademark.json /origin_data/gmall/db/base_trademark_full/$do_date
    90. import_data /opt/module/datax/job/import/gmall.cart_info.json /origin_data/gmall/db/cart_info_full/$do_date
    91. import_data /opt/module/datax/job/import/gmall.coupon_info.json /origin_data/gmall/db/coupon_info_full/$do_date
    92. import_data /opt/module/datax/job/import/gmall.sku_attr_value.json /origin_data/gmall/db/sku_attr_value_full/$do_date
    93. import_data /opt/module/datax/job/import/gmall.sku_info.json /origin_data/gmall/db/sku_info_full/$do_date
    94. import_data /opt/module/datax/job/import/gmall.sku_sale_attr_value.json /origin_data/gmall/db/sku_sale_attr_value_full/$do_date
    95. import_data /opt/module/datax/job/import/gmall.spu_info.json /origin_data/gmall/db/spu_info_full/$do_date
    96. ;;
    97. esac

    这里 hadoop fs -test -e /base_province 会有一个返回值,我们需要通过 echo $? 来查看,当返回 0 时代表目录存在,当返回 1 时代表目录不存在。

     这里 hadoop fs -count /base_province 作用是查看目录属性,第一个数字代表目录数(包括自己),第二个参数是该目录下的文件数,第三个参数是该目录的总大小(字节),第四个参数是当前的目录名。

    测试同步脚本:

    mysq_to_hdfs_full.sh all 2020-06-14

     查看结果:

    我们共同步了 15 张表,通过这个命令可以看到,该目录下目录数为 31 除了本目录和子目录下的日期目录外刚好 15 个目录,文件数也刚好 15 个。

    2.2.5、增量数据同步

    需要全量同步的表我们已经同步完了,接下来就剩增量同步的表了,比如一些订单表它会不断的生成数据。 

    1. Flume 配置

            Flume需要将Kafka中topic_db主题的数据传输到HDFS,故其需选用KafkaSource以及HDFSSink,Channel选用FileChannel。

            需要注意的是, HDFSSink需要将不同mysql业务表的数据写到不同的路径,并且路径中应当包含一层日期,用于区分每天的数据。关键配置如下:

    也就是说,首先,我们需要从 kafka 读取时,通过 flume 的拦截器给 Event 增加一个 header 信息,在这里把 json 时间信息(因为 Maxwell 是以 json 格式写到 Kafka 的)提取出来,为的是解决数据漂移的问题。

    其次,在 hdfssink 中我们需要声明写入的文件目录,这个文件目录的格式必须和我们上面全量同步的格式一样,带有日期信息。

    这里还需要注意的是,我们 flume kafka source 的 timestamp 字段需要的是一个 13 位的数据,但是我们 kafka 中的 ts 字段是一个 10 位的数字,所以我们在编写拦截器的时候需要把秒级别转为毫秒级别。

    1. a1.sources = r1
    2. a1.channels = c1
    3. a1.sinks = k1
    4. a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    5. a1.sources.r1.batchSize = 5000
    6. a1.sources.r1.batchDurationMillis = 2000
    7. a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
    8. a1.sources.r1.kafka.topics = topic_db
    9. a1.sources.r1.kafka.consumer.group.id = topic_db
    10. a1.sources.r1.setTopicHeader = true
    11. a1.sources.r1.topicHeader = topic
    12. a1.sources.r1.interceptors = i1
    13. a1.sources.r1.interceptors.i1.type = com.lyh.gmall.interceptor.TimestampAndTableNameInterceptor$Builder
    14. a1.channels.c1.type = file
    15. a1.channels.c1.checkpointDir = /opt/module/flume-1.9.0/checkpoint/behavior2
    16. a1.channels.c1.dataDirs = /opt/module/flume-1.9.0/data/behavior2/
    17. a1.channels.c1.maxFileSize = 2146435071
    18. a1.channels.c1.capacity = 1000000
    19. a1.channels.c1.keep-alive = 6
    20. ## sink1
    21. a1.sinks.k1.type = hdfs
    22. a1.sinks.k1.hdfs.path = /origin_data/gmall/db/%{tableName}_inc/%Y-%m-%d
    23. a1.sinks.k1.hdfs.filePrefix = db
    24. a1.sinks.k1.hdfs.round = false
    25. a1.sinks.k1.hdfs.rollInterval = 10
    26. a1.sinks.k1.hdfs.rollSize = 134217728
    27. a1.sinks.k1.hdfs.rollCount = 0
    28. a1.sinks.k1.hdfs.fileType = CompressedStream
    29. a1.sinks.k1.hdfs.codeC = gzip
    30. ## 拼装
    31. a1.sources.r1.channels = c1
    32. a1.sinks.k1.channel= c1

    souce 配置:

    这里,我们设置 kafak.consumer.group.id = topic_db ,把这个参数值设置为业务名称,防止多个消费者冲突(因为 flume 默认的消费者组是 flume)。这里的 setTopicHeader = true 和 topicHeader = topic 指的是我们的 flume event 中 event header 的信息,这里的意思是设置数据头中包含 topic 的信息(这里的 key 就是 topic value 是 topic_db)。

    channel 配置:

    channel 这里需要注意的是就是检查点目录的名称不能和之前的冲突,之前我们在全量数据同步用户行为日志数据的时候,在 hadoop104 的 flume 作业中设置了检查点为  behavior1。

    sink 配置:

    这里除了设置输出的 hdfs 路径必须包含日期之外,主要就是滚动策略的配置,我们要防止小文件的问题。 

    编写拦截器:

    1. package com.lyh.gmall.interceptor;
    2. import com.alibaba.fastjson.JSONObject;
    3. import org.apache.flume.Context;
    4. import org.apache.flume.Event;
    5. import org.apache.flume.interceptor.Interceptor;
    6. import java.nio.charset.StandardCharsets;
    7. import java.util.List;
    8. import java.util.Map;
    9. public class TimestampAndTableNameInterceptor implements Interceptor {
    10. @Override
    11. public void initialize() {
    12. }
    13. @Override
    14. public Event intercept(Event event) {
    15. // 1. 把 body 中的 timestamp 和 table 字段提取出来 放到 header
    16. Map headers = event.getHeaders();
    17. String log = new String(event.getBody(), StandardCharsets.UTF_8);
    18. // 2. 解析 log 中的 ts 和 table 字段
    19. JSONObject json = JSONObject.parseObject(log);
    20. String ts = json.getString("ts");
    21. String table = json.getString("table");
    22. // 3. 把 ts 和 table 字段放到 header 中的 tableName 和 timestamp 字段
    23. headers.put("tableName",table);
    24. headers.put("timestamp",ts + "000");
    25. return event;
    26. }
    27. @Override
    28. public List intercept(List list) {
    29. for (Event event: list)
    30. intercept(event);
    31. return list;
    32. }
    33. @Override
    34. public void close() {
    35. }
    36. public static class Builder implements Interceptor.Builder{
    37. @Override
    38. public Interceptor build() {
    39. return new TimestampAndTableNameInterceptor();
    40. }
    41. @Override
    42. public void configure(Context context) {
    43. }
    44. }
    45. }

    打包放到 hadoop104 上 flume 的 lib 目录下,开始测试:

    打通通道

    1. myhadoop start
    2. zk start
    3. kf.sh start
    4. mxw.sh start

    启动 flume 作业:

    [lyh@hadoop104 flume-1.9.0]$ bin/flume-ng agent -n a1 -c conf/ -f job/warehouse/kafka_to_hdfs_db.conf -Dflume.root.logger=INFO,console

    模拟业务数据生成:

    1. cd /opt/module/db_log/
    2. java -jar gmall2020-mock-db-2021-11-14.jar

    查看 hdfs:

    可以看到,其中带 inc 后缀的都是我们增量同步进来的数据。

    增量同步文件数 =  总文件数 - 全量同步文件数 = 27 - 15 = 12 ,没有问题

    这里存在一个问题:我们之前在拦截器中设置了 event header 中的 timestamp 为 kafka 中的数据t  ts 字段的时间信息,但是这里却依然是我们机器的时间,这是因为我们 java -jar 操作数据库的时间就是我们服务器当前的时间,所以导致 Maxwelll 读取 binlog 后的数据就是当前服务器的时间。具体解决办法看下面的 Maxwell 配置。

    2. 编写增量数据同步脚本
    vim f3.sh
    1. #!/bin/bash
    2. case $1 in
    3. "start")
    4. echo " --------启动 hadoop104 业务数据flume-------"
    5. ssh hadoop104 "nohup /opt/module/flume-1.9.0/bin/flume-ng agent -n a1 -c /opt/module/flume-1.9.0/conf -f /opt/module/flume-1.9.0/job/warehouse/kafka_to_hdfs_db.conf >/dev/null 2>&1 &"
    6. ;;
    7. "stop")
    8. echo " --------停止 hadoop104 业务数据flume-------"
    9. ssh hadoop104 "ps -ef | grep kafka_to_hdfs_db | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
    10. ;;
    11. esac
    3. Maxwell 配置

    这里主要是解决时间戳的问题:

    生产环境中是不会有这个问题的,这里我们用的是 经过修改源码的 Maxwell,所以只需要修改一下配置文件即可:

    1. cd /opt/module/maxwell-1.29.2/
    2. vim config.properties

     添加配置:

    mock_date=2020-06-14

    4. 增量表首日全量同步

            增量表本来就存在一些数据,但是 Maxwell 在监听的 binlog 的时候是不知道的,所以我们还需要全量同步一次增量表中的历史数据。但是我们用哪个工具呢,我们知道,Maxwell 也可以做全量,DataX也可以。这里我们选择 Maxwell ,因为 DataX 同步到 HDFS 的文件是一个以特定字符分割的文件,而 Maxwell 同步到 HDFS 的文件是 json 格式的,所以我们肯定是希望保存到 HDFS 后的数据格式都是一致的,那我们就自然会联想到学习 Maxwell 说的 bootstrap,它是 Maxwell  的一张元数据表。

    编写初始化脚本:

    vim mysql_to_kafka_inc_init.sh
    1. #!/bin/bash
    2. # 该脚本的作用是初始化所有的增量表,只需执行一次
    3. MAXWELL_HOME=/opt/module/maxwell-1.29.2
    4. import_data() {
    5. $MAXWELL_HOME/bin/maxwell-bootstrap --database gmall --table $1 --config $MAXWELL_HOME/config.properties
    6. }
    7. case $1 in
    8. "cart_info")
    9. import_data cart_info
    10. ;;
    11. "comment_info")
    12. import_data comment_info
    13. ;;
    14. "coupon_use")
    15. import_data coupon_use
    16. ;;
    17. "favor_info")
    18. import_data favor_info
    19. ;;
    20. "order_detail")
    21. import_data order_detail
    22. ;;
    23. "order_detail_activity")
    24. import_data order_detail_activity
    25. ;;
    26. "order_detail_coupon")
    27. import_data order_detail_coupon
    28. ;;
    29. "order_info")
    30. import_data order_info
    31. ;;
    32. "order_refund_info")
    33. import_data order_refund_info
    34. ;;
    35. "order_status_log")
    36. import_data order_status_log
    37. ;;
    38. "payment_info")
    39. import_data payment_info
    40. ;;
    41. "refund_payment")
    42. import_data refund_payment
    43. ;;
    44. "user_info")
    45. import_data user_info
    46. ;;
    47. "all")
    48. import_data cart_info
    49. import_data comment_info
    50. import_data coupon_use
    51. import_data favor_info
    52. import_data order_detail
    53. import_data order_detail_activity
    54. import_data order_detail_coupon
    55. import_data order_info
    56. import_data order_refund_info
    57. import_data order_status_log
    58. import_data payment_info
    59. import_data refund_payment
    60. import_data user_info
    61. ;;
    62. esac

     测试:

    1. f3.sh start
    2. mysql_to_hdfs_full_init.sh all

    这里需要牢记 Maxwell 可以既做全量又做增量为什么还需要 DataX,这是因为 DataX 对于全量同步更加专业,因为它可以进行一些流控,而且支持更多的数据源并且支持并发。所以 Maxwell 只在初始化同步历史数据的时候用一下,所以不用担心它的性能问题。

    2.3、采集通道启/停脚本

    这里只是为了方便学习的时候用的,生产环境千万不敢用:

    1. #!/bin/bash
    2. case $1 in
    3. "start"){
    4. echo ================== 启动 集群 ==================
    5. #启动 Zookeeper集群
    6. zk start
    7. #启动 Hadoop集群
    8. myhadoop start
    9. #启动 Kafka采集集群
    10. kf.sh start
    11. #启动采集 Flume
    12. f1.sh start
    13. #启动日志消费 Flume
    14. f2.sh start
    15. #启动业务消费 Flume
    16. f3.sh start
    17. #启动 maxwell
    18. mxw.sh start
    19. };;
    20. "stop"){
    21. echo ================== 停止 集群 ==================
    22. #停止 Maxwell
    23. mxw.sh stop
    24. #停止 业务消费Flume
    25. f3.sh stop
    26. #停止 日志消费Flume
    27. f2.sh stop
    28. #停止 日志采集Flume
    29. f1.sh stop
    30. #停止 Kafka采集集群
    31. kf.sh stop
    32. #停止 Hadoop集群
    33. myhadoop stop
    34. #停止 Zookeeper集群
    35. zk stop
    36. };;
    37. esac

    总结

            现在是2024-2-27 19:28 。

            到这里,我们的数仓数据同步工作就都做完了,包括全量用户行为日志的同步(用户行为日志数据并没有增量同步)、增量业务数据的同步、全量业务数据的同步以及业务数据的历史数据初始化全量同步。

            接下来就是关于数仓的知识的学习了,这部分也将是最最重要的!不管是理论还是建模方法和编程实践。

            今天额外的好消息就是四级终于过了,这就剩下了很多时间去专心技术啦!

  • 相关阅读:
    platform总线驱动
    搭建基于Apache的Jena图数据平台
    突破编程_C++_设计模式(单例模式)
    【毕业设计】 大数据二手房数据爬取与分析可视化 -python 数据分析 可视化
    Unity开发——XLua热更新之Hotfix配置(包含xlua获取与导入)
    Centos - CA 证书服务
    Linux系统调优详解(一)——系统调优概述与Top命令详解
    第三天:实现网络编程基于tcp/udp协议在Ubuntu与gec6818开发板之间双向通信
    Linux ubuntu 20.04.5 Server安装远程桌面
    【Mybatis编程:统计相册表中的数据的数量】
  • 原文地址:https://blog.csdn.net/m0_64261982/article/details/136192599