• 数据采集项目之业务数据(三)


    1. Maxwell框架

    开发公司为Zendesk公司开源,用java编写的MySQL变更数据抓取软件。内部是通过监控MySQL的Binlog日志,并将变更数据以JSON格式发送到Kafka等流处理平台。

    1.1 MySQL主从复制

    主机每次变更数据都会生成对应的Binlog日志,从机可以通过IO流的方式将Binlog日志下载到本地,可以通过它创造和主机一样的环境或者作为热备。

    1.2 安装Maxwell

    1. 解压改名
    2. 启动MySQL Binlog, vim /etc/my.cnf. 增加如下配置:
      • binlog_format 日志类型的三种类型:
        • 基于语句:主机执行了什么语句,在从机里同样执行一遍。如果使用了random语句,会导致主从不一致。但是量级比较低
        • 基于行:主机被改动后,从机同步一份。不会有主从不一致的问题,但是量价比较大,需要将每行修改的数据都拿一份。
        • 混合模式:一般基于语句,但是如果基于语句会导致前后结果产生差异,自动转成基于行。
    #数据库id
    server-id = 1
    #启动binlog,该参数的值会作为binlog的文件名
    log-bin=mysql-bin
    #binlog类型,maxwell要求为row类型
    binlog_format=row
    #启用binlog的数据库,需根据实际情况作出修改
    binlog-do-db=gmall
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    1. 重启MySQL服务
    2. 创建Maxwell所需所需的数据库和用户,用来存储断点续传所需的数据。
    CREATE DATABASE maxwell;
    CREATE USER 'maxwell'@'%' IDENTIFIED BY 'maxwell';
    GRANT ALL ON maxwell.* TO 'maxwell'@'%';//maxwell库的所有权限给maxwell
    GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';//其他库的查询、复制权限给maxwell
    
    • 1
    • 2
    • 3
    • 4
    1. 修改maxwell配置文件
      cp 配置文件,将会复制某个文件并且可以改名。
    producer=kafka
    # 目标Kafka集群地址
    kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
    #目标Kafka topic,可静态配置,例如:maxwell,也可动态配置,例如:%{database}_%{table}
    kafka_topic=topic_db
    # MySQL相关配置
    host=hadoop102
    user=maxwell
    password=maxwell
    jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
    
    # 过滤gmall中的z_log表数据,该表是日志数据的备份,无须采集
    filter=exclude:gmall.z_log
    # 指定数据按照主键分组进入Kafka不同分区,避免数据倾斜
    producer_partition_by=primary_key
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    1.3 Maxwell的使用

    1. 启动zookeeper,kafka
    2. 启动maxwell, bin/maxwell --config config.properties --daemon
    3. 启动kafka消费者进程,用于消费maxwell添加到kafka的变更数据
    4. 启动数据生成jar包,查看消费者进程是否有新数据。
    5. 编写Maxwell启停脚本
    #!/bin/bash
    
    MAXWELL_HOME=/opt/module/maxwell
    
    status_maxwell(){
        result=`ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l`
        return $result
    }
    
    
    start_maxwell(){
        status_maxwell
        if [[ $? -lt 1 ]]; then
            echo "启动Maxwell"
            $MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --daemon
        else
            echo "Maxwell正在运行"
        fi
    }
    
    
    stop_maxwell(){
        status_maxwell
        if [[ $? -gt 0 ]]; then
            echo "停止Maxwell"
            ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9
        else
            echo "Maxwell未在运行"
        fi
    }
    
    
    case $1 in
        start )
            start_maxwell
        ;;
        stop )
            stop_maxwell
        ;;
        restart )
           stop_maxwell
           start_maxwell
        ;;
    esac
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    1.4 Bootstrap全量同步

    Maxwell获取的数据都是后期变更的数据,但没有获取到数据库在开启Binlog日志之前的原始数据。

    全量同步命令:/opt/module/maxwell/bin/maxwell-bootstrap --database gmall --table user_info --config /opt/module/maxwell/config.properties

    2. 数仓数据同步策略

    2.1 用户行为数据

    数据源:Kafka
    目的地:HDFS
    传输方式采用Flume, 其中source为Kafka source, channel为Memmory channel, sink为HDFS sink。

    根据官网查找相应参数:

    1. Kafka Source
      • type = Kafka Source全类名
      • kafka.bootstrap.servers 连接地址
      • kafka.topics = topic_log
      • batchSize: 批次大小
      • batchDurationMillis: 批次间隔2s
    2. File Channel
      • type: file
      • dataDirs: 存储路径
      • checkpointDir: 偏移量存储地址
      • keep-alive: 管道满了后,生产者间隔多少秒再放数据
    3. HDFS Sink
      • hdfs.rollInterval : 文件滚动,解决小文件问题,每隔多久滚动一次
      • rollSize: 文件大小
      • hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d, 文件存放路径
      • hdfs.round = false, 不采用系统本地时间
    #定义组件
    a1.sources=r1
    a1.channels=c1
    a1.sinks=k1
    
    #配置source1
    a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.r1.batchSize = 5000
    a1.sources.r1.batchDurationMillis = 2000
    a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
    a1.sources.r1.kafka.topics=topic_log
    
    
    #配置channel
    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
    a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
    a1.channels.c1.maxFileSize = 2146435071
    a1.channels.c1.capacity = 1000000
    a1.channels.c1.keep-alive = 6
    
    #配置sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
    a1.sinks.k1.hdfs.filePrefix = log
    a1.sinks.k1.hdfs.round = false # 是否获取本地时间
    
    
    a1.sinks.k1.hdfs.rollInterval = 10
    a1.sinks.k1.hdfs.rollSize = 134217728
    a1.sinks.k1.hdfs.rollCount = 0
    
    #控制输出文件类型
    a1.sinks.k1.hdfs.fileType = CompressedStream
    a1.sinks.k1.hdfs.codeC = gzip
    
    #组装 
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    2.2 零点漂移问题

    在HDFS系统存放文件时是按照时间进行分区存放的,存放时查看的是header中的timestamp,但是由于数据传输过程中也需要一段时间,header中的时间并不是数据的实际产生时间,这个就是零点漂移问题。

    解决办法:借助拦截器,修改header中的timestamp的值。编写拦截器代码,需要在IDEA中创建对应的项目并打包。

    1. 导入依赖,flume-ng-core和JSON解析依赖fastjson (1.2.62)
    2. 创建包gmall.interceptor
    3. 创建类TimeStampInterceptor, 继承Interceptor接口
    4. 实现intercept(Event event)和intercept(Event events)
    5. 使用fastjson来解析json文件,得到jsonObject对象,用来获取时间戳ts。将获取到的时间戳覆盖header中的timestamp, 如果数据格式错误会抛异常,使用try-catch来捕获它,并过滤掉该条数据。注意此处不能使用for循环来一边遍历,一边删除集合数据
    @Override
        public Event intercept(Event event) {
        //1、获取header和body的数据
        Map<String, String> headers = event.getHeaders();
        String log = new String(event.getBody(), StandardCharsets.UTF_8);
    
        try {
            //2、将body的数据类型转成jsonObject类型(方便获取数据)
            JSONObject jsonObject = JSONObject.parseObject(log);
    
            //3、header中timestamp时间字段替换成日志生成的时间戳(解决数据漂移问题)
            String ts = jsonObject.getString("ts");
            headers.put("timestamp", ts);
    
            return event;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    @Override
    public List<Event> intercept(List<Event> list) {
        Iterator<Event> iterator = list.iterator();
        while (iterator.hasNext()) {
            Event event = iterator.next();
            if (intercept(event) == null) {
                iterator.remove();//必须使用迭代器删除
            }
        }
        return list;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    1. 打包时注意要带上fastjson依赖,需要在maven中添加配置打包插件。依赖中有flume和fastjson,但在虚拟机上有flume,没有fastjson,所以需要排除flume。可以使用provided标签来排除让打包时排除依赖。

      • compile:在单元测试、编译、运行三种方式都会使用compile表明的依赖;
      • test:在单元测试才会使用test表明的依赖;
      • provided:在编译才会使用test表明的依赖;
    2. Flume配置文件中添加拦截器

    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder # 全类名建议在IDEA中复制,Builder也需要根据自己的代码函数名修改
    
    • 1
    • 2
    1. 重新生成数据,查看是否根据数据本身的时间戳存放到对应的HDFS分区文件中。

    3. 业务数据同步

    3.1 同步策略

    1. 全量同步:每天将所有数据同步一份,业务数据量小,优先考虑全量同步。
    2. 增量同步:每天只将新增和变化进行同步,业务数据量大,优先考虑增量同步。

    3.2 数据同步工具

    全量:DataX、Sqoop
    增量:Maxwell、Canal

    3.3 DataX

    是一个数据同步工具,致力于实现包括关系型数据库HDFS、Hive、ODPS、HBase、MySQL等等数据源之间的互传。

    1. 架构= reader + framework + writer
    2. 运行流程
      • job: 单个数据同步的作业,会启动一个进程。
      • Task: 根据不同数据源的切分策略,一个Job会切分为多个Task,Task是DataX作业的最小单元,每个Task负责一部分,由一个线程执行。
    3. 调度策略:会根据系统资源设置并发度,并发度为线程同时执行的个数,任务会按照并发度一组一组执行。

    3.4 DataX安装

    1. 下载解压DataX安装包
    2. bin/datax.py job/job.json测试安装包是否完整
    3. MySQL Reader配置文件的书写
    4. HDFS Writer配置文件的书写
    5. 执行datax命令python /opt/module/datax/bin/datax.py -p"-Dtargetdir=/origin_data/gmall/db/activity_info_full/2022-06-08" /opt/module/datax/job/import/gmall.activity_info.json
    6. 执行完后可以使用hadoop fs cat 路径名 | zcat,来查看压缩文件是否正确
  • 相关阅读:
    QML的Popup遇到的坑
    [附源码]Java计算机毕业设计SSM动物保护资讯推荐网站
    基于.NetCore开发博客项目 StarBlog - (13) 加入友情链接功能
    SpringBoot整合MongoDB
    13. Spring AOP(一)思想及使用
    京东小程序接入ARVR的技术方案和性能调优
    CC1101一款低成本的sub- 1ghz 收发器 芯片
    vivado时序分析-3时序分析关键概念
    【计算机毕业设计】基于HTML+CSS+JavaScript学生宿舍管理系统
    开启数字消费新纪元
  • 原文地址:https://blog.csdn.net/qq_44273739/article/details/133694008