• Datax-异构数据源离线同步


    目录

    一、介绍

    1、Datax介绍

    2、DataX 的设计

    3、支持的数据源

     4、框架设计

     5、运行原理

    六、优势

    二、使用

    1、从stream读取数据并打印到控制台

    2、mysql同步到mysql

     3、读取 DB2 的数据导入 MySQL

    4、读取 SQLServer 的数据导入 MySQL

    5、读取 MongoDB 的数据导入 MySQL

    三、原理

    1、程序入口

    2、Task 切分逻辑

    3.并发数的确定

    4、调度

    5、数据传输

    6、限速的实现 (具体插件)

    四、DataX 使用优化

    速度控制

    关键参数

    一、提升每个 channel 的速度

    二、提升 DataX Job 内 Channel 并发数

    三、提高 JVM 堆内存


    一、介绍

    1、Datax介绍

            DataX 是阿里巴巴开源的一个异构数据源离线同步工具,DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS 等各种异构数据源之间高效的数据同步功能。

    2、DataX 的设计

            为了解决异构数据源同步问题,DataX 将复杂的网状的同步链路变成了星型数据链路, DataX 作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要 将此数据源对接到 DataX,便能跟已有的数据源做到无缝数据同步。

    3、支持的数据源

            DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入,目前支持数据如下图,详情请点击:DataX数据源参考指南 

    类型数据源Reader(读)Writer(写)文档
    RDBMS 关系型数据库MySQL 、
    Oracle 、
    OceanBase 、
    SQLServer 、
    PostgreSQL 、
    DRDS 、
    Kingbase 、
    通用RDBMS(支持所有关系型数据库) 、
    阿里云数仓数据存储ODPS 、
    ADS
    OSS 、
    OCS
    Hologres
    AnalyticDB For PostgreSQL
    阿里云中间件datahub读 、写
    SLS读 、写
    阿里云图数据库GDB 、
    NoSQL数据存储OTS 、
    Hbase0.94 、
    Hbase1.1 、
    Phoenix4.x 、
    Phoenix5.x 、
    MongoDB 、
    Cassandra 、
    数仓数据存储StarRocks读 、
    ApacheDoris
    ClickHouse
    Hive 、
    kudu
    无结构化数据存储TxtFile 、
    FTP 、
    HDFS 、
    Elasticsearch
    时间序列数据库OpenTSDB
    TSDB 、
    TDengine 、

            目前DataX的已有能力已经全部融和进阿里云的数据集成,并且比DataX更加高效、安全,同时数据集成具备DataX不具备的其它高级特性和功能。可以理解为数据集成是DataX的全面升级的商业化用版本,为企业可以提供稳定、可靠、安全的数据传输服务。与DataX相比,数据集成主要有以下几大突出特点:

    支持实时同步:

    离线同步数据源种类大幅度扩充:

     4、框架设计

    DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

    • Reader:Reader 为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
    • Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
    • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

     5、运行原理

            DataX 3.0 开源版本支持单机多线程模式完成同步作业运行,DataX作业生命周期的时序图,从整体架构设计非常简要说明DataX各个模块相互关系。

    核心模块介绍:

    1. Job:单个作业的管理节点,负责数据清理、子任务划分、TaskGroup监控管理。
    2. Task:由Job切分而来,是DataX作业的最小单元,每个Task负责一部分数据的同步工作。
    3. Schedule:将Task组成TaskGroup,单个TaskGroup的并发数量为5。
    4. TaskGroup:负责启动Task。

    DataX调度流程:

            举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。 DataX的调度决策思路是:

    1. DataXJob根据分库分表切分成了100个Task。
    2. 根据20个并发,DataX计算共需要分配4个TaskGroup。
    3. 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。

    六、优势

    • 可靠的数据质量监控

      • 完美解决数据传输个别类型失真问题

                DataX旧版对于部分数据类型(比如时间戳)传输一直存在毫秒阶段等数据失真情况,新版本DataX3.0已经做到支持所有的强数据类型,每一种插件都有自己的数据类型转换策略,让数据可以完整无损的传输到目的端。

      • 提供作业全链路的流量、数据量 运行时监控

        DataX3.0运行过程中可以将作业本身状态、数据流量、数据速度、执行进度等信息进行全面的展示,让用户可以实时了解作业状态。并可在作业执行过程中智能判断源端和目的端的速度对比情况,给予用户更多性能排查信息。

      • 提供脏数据探测

        在大量数据的传输过程中,必定会由于各种原因导致很多数据传输报错(比如类型转换错误),这种数据DataX认为就是脏数据。DataX目前可以实现脏数据精确过滤、识别、采集、展示,为用户提供多种的脏数据处理模式,让用户准确把控数据质量大关!

    • 丰富的数据转换功能

      DataX作为一个服务于大数据的ETL工具,除了提供数据快照搬迁功能之外,还提供了丰富数据转换的功能,让数据在传输过程中可以轻松完成数据脱敏,补全,过滤等数据转换功能,另外还提供了自动groovy函数,让用户自定义转换函数。详情请看DataX3的transformer详细介绍。

    • 精准的速度控制

      还在为同步过程对在线存储压力影响而担心吗?新版本DataX3.0提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制你的作业速度,让你的作业在库可以承受的范围内达到最佳的同步速度。

      "speed": { "channel": 5, "byte": 1048576, "record": 10000 }
      
    • 强劲的同步性能

      DataX3.0每一种读插件都有一种或多种切分策略,都能将作业合理切分成多个Task并行执行,单机多线程执行模型可以让DataX速度随并发成线性增长。在源端和目的端性能都足够的情况下,单个作业一定可以打满网卡。另外,DataX团队对所有的已经接入的插件都做了极致的性能优化,并且做了完整的性能测试。性能测试相关详情可以参照每单个数据源的详细介绍:DataX数据源指南

    • 健壮的容错机制

      DataX作业是极易受外部因素的干扰,网络闪断、数据源不稳定等因素很容易让同步到一半的作业报错停止。因此稳定性是DataX的基本要求,在DataX 3.0的设计中,重点完善了框架和插件的稳定性。目前DataX3.0可以做到线程级别、进程级别(暂时未开放)、作业级别多层次局部/全局的重试,保证用户的作业稳定运行。

      • 线程内部重试

        DataX的核心插件都经过团队的全盘review,不同的网络交互方式都有不同的重试策略。

      • 线程级别重试

        目前DataX已经可以实现TaskFailover,针对于中间失败的Task,DataX框架可以做到整个Task级别的重新调度。

    • 极简的使用体验

      • 易用

        下载即可用,支持linux和windows,只需要短短几步骤就可以完成数据的传输。请点击:Quick Start

      • 详细

        DataX在运行日志中打印了大量信息,其中包括传输速度,Reader、Writer性能,进程CPU,JVM和GC情况等等。

        • 传输过程中打印传输速度、进度等

        • 传输过程中会打印进程相关的CPU、JVM等

        • 在任务结束之后,打印总体运行情况

    二、使用

     官方地址

            下载地址:https://github.com/alibaba/DataX/archive/refs/tags/datax_v202210.tar.gzhttps://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202210/datax.tar.gzhttps://github.com/alibaba/DataX/archive/refs/tags/datax_v202210.tar.gz

             源码地址:GitHub - alibaba/DataX: DataX是阿里云DataWorks数据集成的开源版本。

    wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz 
    tar -zxvf datax.tar.gz -C /opt/
    cd /opt/datax
    bin/datax.py job/job.json

    如果报错 插件[streamreader,streamwriter]加载失败,1s后重试 

    # 先进入安装目录
    cd /opt/datax/
    # 在进入插件目录
    cd plugin/reader
    # 删除
    rm -rf  ./._*
    # 在进入插件目录
    cd plugin/writer
    # 删除
    rm -rf  ./._*

    出现以下界面说明DataX安装成功

     

    1、从stream读取数据并打印到控制台

    查看官方json配置模板 

    python /opt/datax/bin/datax.py -r streamreader -w streamwriter

    DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
    Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


    Please refer to the streamreader document:
         https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md 

    Please refer to the streamwriter document:
         https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md 
     
    Please save the following configuration as a json file and  use
         python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json 
    to run the job.

    {
        "job": {
            "content": [
                {
                    "reader": {
                        "name": "streamreader", 
                        "parameter": {
                            "column": [], 
                            "sliceRecordCount": ""
                        }
                    }, 
                    "writer": {
                        "name": "streamwriter", 
                        "parameter": {
                            "encoding": "", 
                            "print": true
                        }
                    }
                }
            ], 
            "setting": {
                "speed": {
                    "channel": ""
                }
            }
        }
    }

     

     

    根据模板编写json文件: 

    1. {
    2. "job": {
    3. "content": [
    4. {
    5. "reader": {
    6. "name": "streamreader",
    7. "parameter": {
    8. "sliceRecordCount": 10,
    9. "column": [
    10. {
    11. "type": "long",
    12. "value": "11110"
    13. },
    14. {
    15. "type": "string",
    16. "value": "hello,你好,世界-DataX"
    17. }
    18. ]
    19. }
    20. },
    21. "writer": {
    22. "name": "streamwriter",
    23. "parameter": {
    24. "encoding": "UTF-8",
    25. "print": true
    26. }
    27. }
    28. }
    29. ],
    30. "setting": {
    31. "speed": {
    32. "channel": 5
    33. }
    34. }
    35. }
    36. }

    运行Job
    /opt/datax/bin/datax.py ./stream2stream.json

     

    2、mysql同步到mysql

    1. {
    2. "job": {
    3. "content": [{
    4. "reader": {
    5. "name": "mysqlreader",
    6. "parameter": {
    7. "password": "root",
    8. "username": "root",
    9. "connection": [{
    10. "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/test_dev"],
    11. "querySql": ["SELECT id, name FROM test_test"]
    12. }]
    13. }
    14. },
    15. "writer": {
    16. "name": "mysqlwriter",
    17. "parameter": {
    18. "column": ["id", "name"],
    19. "password": "root",
    20. "username": "root",
    21. "writeMode": "insert",
    22. "connection": [{
    23. "table": ["test_test_1"],
    24. "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test_dev"
    25. }]
    26. }
    27. }
    28. }],
    29. "setting": {
    30. "speed": {
    31. "channel": 1
    32. },
    33. "errorLimit": {
    34. "record": 0,
    35. "percentage": 0.02
    36. }
    37. }
    38. }
    39. }

     3、读取 DB2 的数据导入 MySQL

    1. {
    2. "job": {
    3. "content": [{
    4. "reader": {
    5. "name": "rdbmsreader",
    6. "parameter": {
    7. "column":[
    8. "ID",
    9. "NAME"
    10. ],
    11. "password": "root",
    12. "username": "root",
    13. "connection": [{
    14. "jdbcUrl": ["jdbc:db2://127.0.0.1:50000/sample"],
    15. "table": ["STUDENT"]
    16. }]
    17. }
    18. },
    19. "writer": {
    20. "name": "mysqlwriter",
    21. "parameter": {
    22. "column": ["*"],
    23. "password": "root",
    24. "username": "root",
    25. "writeMode": "insert",
    26. "connection": [{
    27. "table": ["student"],
    28. "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/datax"
    29. }]
    30. }
    31. }
    32. }],
    33. "setting": {
    34. "speed": {
    35. "channel": 1
    36. }
    37. }
    38. }
    39. }

    4、读取 SQLServer 的数据导入 MySQL

    1. {
    2. "job": {
    3. "content": [{
    4. "reader": {
    5. "name": "sqlserverreader",
    6. "parameter": {
    7. "column":["id","name"],
    8. "password": "root",
    9. "username": "root",
    10. "connection": [{
    11. "jdbcUrl": ["jdbc:sqlserver://127.0.0.1:1433;DatabaseName=datax"],
    12. "table": ["student"]
    13. }]
    14. }
    15. },
    16. "writer": {
    17. "name": "mysqlwriter",
    18. "parameter": {
    19. "column": ["*"],
    20. "password": "root",
    21. "username": "root",
    22. "writeMode": "insert",
    23. "connection": [{
    24. "table": ["student"],
    25. "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/datax"
    26. }]
    27. }
    28. }
    29. }],
    30. "setting": {
    31. "speed": {
    32. "channel": 1
    33. }
    34. }
    35. }
    36. }

    5、读取 MongoDB 的数据导入 MySQL

    1. {
    2. "job": {
    3. "content": [{
    4. "reader": {
    5. "name": "mongodbreader",
    6. "parameter": {
    7. "address":["127.0.0.1:27017"]
    8. "collectionName":"t_user",
    9. "column": [
    10. {"name":"name","type": "string"},
    11. {"name":"url","type": "string"}
    12. ],
    13. "dbName": "test"
    14. }
    15. },
    16. "writer": {
    17. "name": "mysqlwriter",
    18. "parameter": {
    19. "column": ["*"],
    20. "password": "root",
    21. "username": "root",
    22. "writeMode": "insert",
    23. "connection": [{
    24. "table": ["t_user"],
    25. "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/datax"
    26. }]
    27. }
    28. }
    29. }],
    30. "setting": {
    31. "speed": {
    32. "channel": 1
    33. }
    34. }
    35. }
    36. }

    三、原理

    黄色: Job 部分的执行阶段, 蓝色: Task 部分的执行阶段, 绿色:框架执行阶段。 

    1、程序入口

    datax.py

    ......
    DEFAULT_PROPERTY_CONF = "-Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=%s -Dlogback.configurationFile=%s" % (
        DATAX_HOME, LOGBACK_FILE)
    ENGINE_COMMAND = "java -server ${jvm} %s -classpath %s  ${params} com.alibaba.datax.core.Engine -mode ${mode} -jobid ${jobid} -job ${job}" % (
        DEFAULT_PROPERTY_CONF, CLASS_PATH)
    REMOTE_DEBUG_CONFIG = "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=9999"

    。。。。

    Engine.java

    1. package com.alibaba.datax.core;
    2. .............
    3. /**
    4. * Engine是DataX入口类,该类负责初始化Job或者Task的运行容器,并运行插件的Job或者Task逻辑
    5. */
    6. public class Engine {
    7. private static final Logger LOG = LoggerFactory.getLogger(Engine.class);
    8. private static String RUNTIME_MODE;
    9. /* check job model (job/task) first */
    10. public void start(Configuration allConf) {
    11. // 绑定column转换信息
    12. ColumnCast.bind(allConf);
    13. /**
    14. * 初始化PluginLoader,可以获取各种插件配置
    15. */
    16. LoadUtil.bind(allConf);
    17. boolean isJob = !("taskGroup".equalsIgnoreCase(allConf
    18. .getString(CoreConstant.DATAX_CORE_CONTAINER_MODEL)));
    19. //JobContainer会在schedule后再行进行设置和调整值
    20. int channelNumber =0;
    21. AbstractContainer container;
    22. long instanceId;
    23. int taskGroupId = -1;
    24. if (isJob) {
    25. allConf.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, RUNTIME_MODE);
    26. //job容器
    27. container = new JobContainer(allConf);
    28. instanceId = allConf.getLong(
    29. CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, 0);
    30. } else {
    31. //开始构建task容器
    32. container = new TaskGroupContainer(allConf);
    33. instanceId = allConf.getLong(
    34. CoreConstant.DATAX_CORE_CONTAINER_JOB_ID);
    35. taskGroupId = allConf.getInt(
    36. CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID);
    37. channelNumber = allConf.getInt(
    38. CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL);
    39. }
    40. //缺省打开perfTrace
    41. boolean traceEnable = allConf.getBool(CoreConstant.DATAX_CORE_CONTAINER_TRACE_ENABLE, true);
    42. boolean perfReportEnable = allConf.getBool(CoreConstant.DATAX_CORE_REPORT_DATAX_PERFLOG, true);
    43. //standalone模式的 datax shell任务不进行汇报
    44. if(instanceId == -1){
    45. perfReportEnable = false;
    46. }
    47. int priority = 0;
    48. try {
    49. priority = Integer.parseInt(System.getenv("SKYNET_PRIORITY"));
    50. }catch (NumberFormatException e){
    51. LOG.warn("prioriy set to 0, because NumberFormatException, the value is: "+System.getProperty("PROIORY"));
    52. }
    53. Configuration jobInfoConfig = allConf.getConfiguration(CoreConstant.DATAX_JOB_JOBINFO);
    54. //初始化PerfTrace
    55. PerfTrace perfTrace = PerfTrace.getInstance(isJob, instanceId, taskGroupId, priority, traceEnable);
    56. perfTrace.setJobInfo(jobInfoConfig,perfReportEnable,channelNumber);
    57. //开启容器
    58. container.start();
    59. }
    60. ...........

    JobContainer.java

    1. /**
    2. * jobContainer主要负责的工作全部在start()里面,包括init、prepare、split、scheduler、
    3. * post以及destroy和statistics
    4. */
    5. @Override
    6. public void start() {
    7. LOG.info("DataX jobContainer starts job.");
    8. boolean hasException = false;
    9. boolean isDryRun = false;
    10. try {
    11. this.startTimeStamp = System.currentTimeMillis();
    12. isDryRun = configuration.getBool(CoreConstant.DATAX_JOB_SETTING_DRYRUN, false);
    13. if(isDryRun) {
    14. LOG.info("jobContainer starts to do preCheck ...");
    15. this.preCheck();
    16. } else {
    17. userConf = configuration.clone();
    18. LOG.debug("jobContainer starts to do preHandle ...");
    19. //Job 前置操作
    20. this.preHandle();
    21. LOG.debug("jobContainer starts to do init ...");
    22. //初始化 reader 和 writer
    23. this.init();
    24. LOG.info("jobContainer starts to do prepare ...");
    25. //全局准备工作,比如 odpswriter 清空目标表
    26. this.prepare();
    27. LOG.info("jobContainer starts to do split ...");
    28. //拆分 Task
    29. this.totalStage = this.split();
    30. LOG.info("jobContainer starts to do schedule ...");
    31. //开始调度
    32. this.schedule();
    33. LOG.debug("jobContainer starts to do post ...");
    34. this.post();
    35. LOG.debug("jobContainer starts to do postHandle ...");
    36. //Job 后置操作
    37. this.postHandle();
    38. LOG.info("DataX jobId [{}] completed successfully.", this.jobId);
    39. this.invokeHooks();
    40. }
    41. } catch (Throwable e) {
    42. LOG.error("Exception when job run", e);
    43. hasException = true;
    44. if (e instanceof OutOfMemoryError) {
    45. this.destroy();
    46. System.gc();
    47. }
    48. if (super.getContainerCommunicator() == null) {
    49. // 由于 containerCollector 是在 scheduler() 中初始化的,所以当在 scheduler() 之前出现异常时,需要在此处对 containerCollector 进行初始化
    50. AbstractContainerCommunicator tempContainerCollector;
    51. // standalone
    52. tempContainerCollector = new StandAloneJobContainerCommunicator(configuration);
    53. super.setContainerCommunicator(tempContainerCollector);
    54. }
    55. Communication communication = super.getContainerCommunicator().collect();
    56. // 汇报前的状态,不需要手动进行设置
    57. // communication.setState(State.FAILED);
    58. communication.setThrowable(e);
    59. communication.setTimestamp(this.endTimeStamp);
    60. Communication tempComm = new Communication();
    61. tempComm.setTimestamp(this.startTransferTimeStamp);
    62. Communication reportCommunication = CommunicationTool.getReportCommunication(communication, tempComm, this.totalStage);
    63. super.getContainerCommunicator().report(reportCommunication);
    64. throw DataXException.asDataXException(
    65. FrameworkErrorCode.RUNTIME_ERROR, e);
    66. } finally {
    67. if(!isDryRun) {
    68. this.destroy();
    69. this.endTimeStamp = System.currentTimeMillis();
    70. if (!hasException) {
    71. //最后打印cpu的平均消耗,GC的统计
    72. VMInfo vmInfo = VMInfo.getVmInfo();
    73. if (vmInfo != null) {
    74. vmInfo.getDelta(false);
    75. LOG.info(vmInfo.totalString());
    76. }
    77. LOG.info(PerfTrace.getInstance().summarizeNoException());
    78. this.logStatistics();
    79. }
    80. }
    81. }
    82. }

    2、Task 切分逻辑

    JobContainer.java

    1. /**
    2. * 执行reader和writer最细粒度的切分,需要注意的是,writer的切分结果要参照reader的切分结果,
    3. * 达到切分后数目相等,才能满足1:1的通道模型,所以这里可以将reader和writer的配置整合到一起,
    4. * 然后,为避免顺序给读写端带来长尾影响,将整合的结果shuffler掉
    5. Task 切分逻辑
    6. */
    7. private int split() {
    8. this.adjustChannelNumber();
    9. if (this.needChannelNumber <= 0) {
    10. this.needChannelNumber = 1;
    11. }
    12. List readerTaskConfigs = this
    13. .doReaderSplit(this.needChannelNumber);
    14. int taskNumber = readerTaskConfigs.size();
    15. List writerTaskConfigs = this
    16. .doWriterSplit(taskNumber);
    17. List transformerList = this.configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT_TRANSFORMER);
    18. LOG.debug("transformer configuration: "+ JSON.toJSONString(transformerList));
    19. /**
    20. * 输入是reader和writer的parameter list,输出是content下面元素的list
    21. */
    22. List contentConfig = mergeReaderAndWriterTaskConfigs(
    23. readerTaskConfigs, writerTaskConfigs, transformerList);
    24. LOG.debug("contentConfig configuration: "+ JSON.toJSONString(contentConfig));
    25. this.configuration.set(CoreConstant.DATAX_JOB_CONTENT, contentConfig);
    26. return contentConfig.size();
    27. }

    3.并发数的确定

    JobContainer.java

    1. //并发数的确定
    2. private void adjustChannelNumber() {
    3. int needChannelNumberByByte = Integer.MAX_VALUE;
    4. int needChannelNumberByRecord = Integer.MAX_VALUE;
    5. boolean isByteLimit = (this.configuration.getInt(
    6. CoreConstant.DATAX_JOB_SETTING_SPEED_BYTE, 0) > 0);
    7. if (isByteLimit) {
    8. long globalLimitedByteSpeed = this.configuration.getInt(
    9. CoreConstant.DATAX_JOB_SETTING_SPEED_BYTE, 10 * 1024 * 1024);
    10. // 在byte流控情况下,单个Channel流量最大值必须设置,否则报错!
    11. Long channelLimitedByteSpeed = this.configuration
    12. .getLong(CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_BYTE);
    13. if (channelLimitedByteSpeed == null || channelLimitedByteSpeed <= 0) {
    14. throw DataXException.asDataXException(
    15. FrameworkErrorCode.CONFIG_ERROR,
    16. "在有总bps限速条件下,单个channel的bps值不能为空,也不能为非正数");
    17. }
    18. needChannelNumberByByte =
    19. (int) (globalLimitedByteSpeed / channelLimitedByteSpeed);
    20. needChannelNumberByByte =
    21. needChannelNumberByByte > 0 ? needChannelNumberByByte : 1;
    22. LOG.info("Job set Max-Byte-Speed to " + globalLimitedByteSpeed + " bytes.");
    23. }
    24. boolean isRecordLimit = (this.configuration.getInt(
    25. CoreConstant.DATAX_JOB_SETTING_SPEED_RECORD, 0)) > 0;
    26. if (isRecordLimit) {
    27. long globalLimitedRecordSpeed = this.configuration.getInt(
    28. CoreConstant.DATAX_JOB_SETTING_SPEED_RECORD, 100000);
    29. Long channelLimitedRecordSpeed = this.configuration.getLong(
    30. CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_RECORD);
    31. if (channelLimitedRecordSpeed == null || channelLimitedRecordSpeed <= 0) {
    32. throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR,
    33. "在有总tps限速条件下,单个channel的tps值不能为空,也不能为非正数");
    34. }
    35. needChannelNumberByRecord =
    36. (int) (globalLimitedRecordSpeed / channelLimitedRecordSpeed);
    37. needChannelNumberByRecord =
    38. needChannelNumberByRecord > 0 ? needChannelNumberByRecord : 1;
    39. LOG.info("Job set Max-Record-Speed to " + globalLimitedRecordSpeed + " records.");
    40. }
    41. // 取较小值
    42. this.needChannelNumber = needChannelNumberByByte < needChannelNumberByRecord ?
    43. needChannelNumberByByte : needChannelNumberByRecord;
    44. // 如果从byte或record上设置了needChannelNumber则退出
    45. if (this.needChannelNumber < Integer.MAX_VALUE) {
    46. return;
    47. }
    48. boolean isChannelLimit = (this.configuration.getInt(
    49. CoreConstant.DATAX_JOB_SETTING_SPEED_CHANNEL, 0) > 0);
    50. if (isChannelLimit) {
    51. this.needChannelNumber = this.configuration.getInt(
    52. CoreConstant.DATAX_JOB_SETTING_SPEED_CHANNEL);
    53. LOG.info("Job set Channel-Number to " + this.needChannelNumber
    54. + " channels.");
    55. return;
    56. }
    57. throw DataXException.asDataXException(
    58. FrameworkErrorCode.CONFIG_ERROR,
    59. "Job运行速度必须设置");
    60. }

    4、调度

    1. /**
    2. * schedule首先完成的工作是把上一步reader和writer split的结果整合到具体taskGroupContainer中,
    3. * 同时不同的执行模式调用不同的调度策略,将所有任务调度起来
    4. */
    5. private void schedule() {
    6. /**
    7. * 这里的全局speed和每个channel的速度设置为B/s
    8. */
    9. int channelsPerTaskGroup = this.configuration.getInt(
    10. CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, 5);
    11. int taskNumber = this.configuration.getList(
    12. CoreConstant.DATAX_JOB_CONTENT).size();
    13. this.needChannelNumber = Math.min(this.needChannelNumber, taskNumber);
    14. PerfTrace.getInstance().setChannelNumber(needChannelNumber);
    15. /**
    16. * 通过获取配置信息得到每个taskGroup需要运行哪些tasks任务
    17. */
    18. //确定组数和分组
    19. List taskGroupConfigs = JobAssignUtil.assignFairly(this.configuration,
    20. this.needChannelNumber, channelsPerTaskGroup);
    21. LOG.info("Scheduler starts [{}] taskGroups.", taskGroupConfigs.size());
    22. ExecuteMode executeMode = null;
    23. AbstractScheduler scheduler;
    24. try {
    25. executeMode = ExecuteMode.STANDALONE;
    26. scheduler = initStandaloneScheduler(this.configuration);
    27. //设置 executeMode
    28. for (Configuration taskGroupConfig : taskGroupConfigs) {
    29. taskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, executeMode.getValue());
    30. }
    31. if (executeMode == ExecuteMode.LOCAL || executeMode == ExecuteMode.DISTRIBUTE) {
    32. if (this.jobId <= 0) {
    33. throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,
    34. "在[ local | distribute ]模式下必须设置jobId,并且其值 > 0 .");
    35. }
    36. }
    37. LOG.info("Running by {} Mode.", executeMode);
    38. this.startTransferTimeStamp = System.currentTimeMillis();
    39. //调度实现
    40. scheduler.schedule(taskGroupConfigs);
    41. this.endTransferTimeStamp = System.currentTimeMillis();
    42. } catch (Exception e) {
    43. LOG.error("运行scheduler 模式[{}]出错.", executeMode);
    44. this.endTransferTimeStamp = System.currentTimeMillis();
    45. throw DataXException.asDataXException(
    46. FrameworkErrorCode.RUNTIME_ERROR, e);
    47. }
    48. /**
    49. * 检查任务执行情况
    50. */
    51. this.checkLimit();
    52. }

    1.确定组数和分组:

    JobAssignUtil:

    1. /**
    2. * 公平的分配 task 到对应的 taskGroup 中。
    3. * 公平体现在:会考虑 task 中对资源负载作的 load 标识进行更均衡的作业分配操作。
    4. * TODO 具体文档举例说明
    5. 1)确定 taskGroupNumber,
    6. 2)做分组分配,
    7. 3)做分组优化
    8. */
    9. public static List assignFairly(Configuration configuration, int channelNumber, int channelsPerTaskGroup) {
    10. Validate.isTrue(configuration != null, "框架获得的 Job 不能为 null.");
    11. List contentConfig = configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT);
    12. Validate.isTrue(contentConfig.size() > 0, "框架获得的切分后的 Job 无内容.");
    13. Validate.isTrue(channelNumber > 0 && channelsPerTaskGroup > 0,
    14. "每个channel的平均task数[averTaskPerChannel],channel数目[channelNumber],每个taskGroup的平均channel数[channelsPerTaskGroup]都应该为正数");
    15. int taskGroupNumber = (int) Math.ceil(1.0 * channelNumber / channelsPerTaskGroup);
    16. Configuration aTaskConfig = contentConfig.get(0);
    17. String readerResourceMark = aTaskConfig.getString(CoreConstant.JOB_READER_PARAMETER + "." +
    18. CommonConstant.LOAD_BALANCE_RESOURCE_MARK);
    19. String writerResourceMark = aTaskConfig.getString(CoreConstant.JOB_WRITER_PARAMETER + "." +
    20. CommonConstant.LOAD_BALANCE_RESOURCE_MARK);
    21. boolean hasLoadBalanceResourceMark = StringUtils.isNotBlank(readerResourceMark) ||
    22. StringUtils.isNotBlank(writerResourceMark);
    23. if (!hasLoadBalanceResourceMark) {
    24. // fake 一个固定的 key 作为资源标识(在 reader 或者 writer 上均可,此处选择在 reader 上进行 fake)
    25. for (Configuration conf : contentConfig) {
    26. conf.set(CoreConstant.JOB_READER_PARAMETER + "." +
    27. CommonConstant.LOAD_BALANCE_RESOURCE_MARK, "aFakeResourceMarkForLoadBalance");
    28. }
    29. // 是为了避免某些插件没有设置 资源标识 而进行了一次随机打乱操作
    30. Collections.shuffle(contentConfig, new Random(System.currentTimeMillis()));
    31. }
    32. LinkedHashMap> resourceMarkAndTaskIdMap = parseAndGetResourceMarkAndTaskIdMap(contentConfig);
    33. List taskGroupConfig = doAssign(resourceMarkAndTaskIdMap, configuration, taskGroupNumber);
    34. // 调整 每个 taskGroup 对应的 Channel 个数(属于优化范畴)
    35. adjustChannelNumPerTaskGroup(taskGroupConfig, channelNumber);
    36. return taskGroupConfig;
    37. }

    2.调度实现:

    AbstractScheduler.java

    1. public void schedule(List configurations) {
    2. Validate.notNull(configurations,
    3. "scheduler配置不能为空");
    4. int jobReportIntervalInMillSec = configurations.get(0).getInt(
    5. CoreConstant.DATAX_CORE_CONTAINER_JOB_REPORTINTERVAL, 30000);
    6. int jobSleepIntervalInMillSec = configurations.get(0).getInt(
    7. CoreConstant.DATAX_CORE_CONTAINER_JOB_SLEEPINTERVAL, 10000);
    8. this.jobId = configurations.get(0).getLong(
    9. CoreConstant.DATAX_CORE_CONTAINER_JOB_ID);
    10. errorLimit = new ErrorRecordChecker(configurations.get(0));
    11. /**
    12. * 给 taskGroupContainer 的 Communication 注册
    13. */
    14. this.containerCommunicator.registerCommunication(configurations);
    15. int totalTasks = calculateTaskCount(configurations);
    16. startAllTaskGroup(configurations);
    17. Communication lastJobContainerCommunication = new Communication();
    18. long lastReportTimeStamp = System.currentTimeMillis();
    19. try {
    20. while (true) {
    21. /**
    22. * step 1: collect job stat
    23. * step 2: getReport info, then report it
    24. * step 3: errorLimit do check
    25. * step 4: dealSucceedStat();
    26. * step 5: dealKillingStat();
    27. * step 6: dealFailedStat();
    28. * step 7: refresh last job stat, and then sleep for next while
    29. *
    30. * above steps, some ones should report info to DS
    31. *
    32. */
    33. Communication nowJobContainerCommunication = this.containerCommunicator.collect();
    34. nowJobContainerCommunication.setTimestamp(System.currentTimeMillis());
    35. LOG.debug(nowJobContainerCommunication.toString());
    36. //汇报周期
    37. long now = System.currentTimeMillis();
    38. if (now - lastReportTimeStamp > jobReportIntervalInMillSec) {
    39. Communication reportCommunication = CommunicationTool
    40. .getReportCommunication(nowJobContainerCommunication, lastJobContainerCommunication, totalTasks);
    41. this.containerCommunicator.report(reportCommunication);
    42. lastReportTimeStamp = now;
    43. lastJobContainerCommunication = nowJobContainerCommunication;
    44. }
    45. errorLimit.checkRecordLimit(nowJobContainerCommunication);
    46. if (nowJobContainerCommunication.getState() == State.SUCCEEDED) {
    47. LOG.info("Scheduler accomplished all tasks.");
    48. break;
    49. }
    50. if (isJobKilling(this.getJobId())) {
    51. dealKillingStat(this.containerCommunicator, totalTasks);
    52. } else if (nowJobContainerCommunication.getState() == State.FAILED) {
    53. dealFailedStat(this.containerCommunicator, nowJobContainerCommunication.getThrowable());
    54. }
    55. Thread.sleep(jobSleepIntervalInMillSec);
    56. }
    57. } catch (InterruptedException e) {
    58. // 以 failed 状态退出
    59. LOG.error("捕获到InterruptedException异常!", e);
    60. throw DataXException.asDataXException(
    61. FrameworkErrorCode.RUNTIME_ERROR, e);
    62. }
    63. }

    实现类:ProcessInnerScheduler、StandAloneScheduler

    1. package com.alibaba.datax.core.job.scheduler.processinner;
    2. import com.alibaba.datax.common.exception.DataXException;
    3. import com.alibaba.datax.common.util.Configuration;
    4. import com.alibaba.datax.core.job.scheduler.AbstractScheduler;
    5. import com.alibaba.datax.core.statistics.container.communicator.AbstractContainerCommunicator;
    6. import com.alibaba.datax.core.taskgroup.TaskGroupContainer;
    7. import com.alibaba.datax.core.taskgroup.runner.TaskGroupContainerRunner;
    8. import com.alibaba.datax.core.util.FrameworkErrorCode;
    9. import java.util.List;
    10. import java.util.concurrent.ExecutorService;
    11. import java.util.concurrent.Executors;
    12. public abstract class ProcessInnerScheduler extends AbstractScheduler {
    13. private ExecutorService taskGroupContainerExecutorService;
    14. public ProcessInnerScheduler(AbstractContainerCommunicator containerCommunicator) {
    15. super(containerCommunicator);
    16. }
    17. @Override
    18. public void startAllTaskGroup(List configurations) {
    19. this.taskGroupContainerExecutorService = Executors
    20. .newFixedThreadPool(configurations.size());
    21. for (Configuration taskGroupConfiguration : configurations) {
    22. TaskGroupContainerRunner taskGroupContainerRunner = newTaskGroupContainerRunner(taskGroupConfiguration);
    23. this.taskGroupContainerExecutorService.execute(taskGroupContainerRunner);
    24. }
    25. this.taskGroupContainerExecutorService.shutdown();
    26. }
    27. @Override
    28. public void dealFailedStat(AbstractContainerCommunicator frameworkCollector, Throwable throwable) {
    29. this.taskGroupContainerExecutorService.shutdownNow();
    30. throw DataXException.asDataXException(
    31. FrameworkErrorCode.PLUGIN_RUNTIME_ERROR, throwable);
    32. }
    33. @Override
    34. public void dealKillingStat(AbstractContainerCommunicator frameworkCollector, int totalTasks) {
    35. //通过进程退出返回码标示状态
    36. this.taskGroupContainerExecutorService.shutdownNow();
    37. throw DataXException.asDataXException(FrameworkErrorCode.KILLED_EXIT_VALUE,
    38. "job killed status");
    39. }
    40. private TaskGroupContainerRunner newTaskGroupContainerRunner(
    41. Configuration configuration) {
    42. TaskGroupContainer taskGroupContainer = new TaskGroupContainer(configuration);
    43. return new TaskGroupContainerRunner(taskGroupContainer);
    44. }
    45. }
    1. package com.alibaba.datax.core.job.scheduler.processinner;
    2. import com.alibaba.datax.core.statistics.container.communicator.AbstractContainerCommunicator;
    3. /**
    4. * Created by hongjiao.hj on 2014/12/22.
    5. */
    6. public class StandAloneScheduler extends ProcessInnerScheduler{
    7. public StandAloneScheduler(AbstractContainerCommunicator containerCommunicator) {
    8. super(containerCommunicator);
    9. }
    10. @Override
    11. protected boolean isJobKilling(Long jobId) {
    12. return false;
    13. }
    14. }

    5、数据传输

    TaskGroupContainer.start() -> taskExecutor.doStart() -->插件

    1. @Override
    2. public void start() {
    3. try {
    4. /**
    5. * 状态check时间间隔,较短,可以把任务及时分发到对应channel中
    6. */
    7. int sleepIntervalInMillSec = this.configuration.getInt(
    8. CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_SLEEPINTERVAL, 100);
    9. /**
    10. * 状态汇报时间间隔,稍长,避免大量汇报
    11. */
    12. long reportIntervalInMillSec = this.configuration.getLong(
    13. CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_REPORTINTERVAL,
    14. 10000);
    15. /**
    16. * 2分钟汇报一次性能统计
    17. */
    18. // 获取channel数目
    19. int channelNumber = this.configuration.getInt(
    20. CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL);
    21. int taskMaxRetryTimes = this.configuration.getInt(
    22. CoreConstant.DATAX_CORE_CONTAINER_TASK_FAILOVER_MAXRETRYTIMES, 1);
    23. long taskRetryIntervalInMsec = this.configuration.getLong(
    24. CoreConstant.DATAX_CORE_CONTAINER_TASK_FAILOVER_RETRYINTERVALINMSEC, 10000);
    25. long taskMaxWaitInMsec = this.configuration.getLong(CoreConstant.DATAX_CORE_CONTAINER_TASK_FAILOVER_MAXWAITINMSEC, 60000);
    26. List taskConfigs = this.configuration
    27. .getListConfiguration(CoreConstant.DATAX_JOB_CONTENT);
    28. if(LOG.isDebugEnabled()) {
    29. LOG.debug("taskGroup[{}]'s task configs[{}]", this.taskGroupId,
    30. JSON.toJSONString(taskConfigs));
    31. }
    32. int taskCountInThisTaskGroup = taskConfigs.size();
    33. LOG.info(String.format(
    34. "taskGroupId=[%d] start [%d] channels for [%d] tasks.",
    35. this.taskGroupId, channelNumber, taskCountInThisTaskGroup));
    36. this.containerCommunicator.registerCommunication(taskConfigs);
    37. Map taskConfigMap = buildTaskConfigMap(taskConfigs); //taskId与task配置
    38. List taskQueue = buildRemainTasks(taskConfigs); //待运行task列表
    39. Map taskFailedExecutorMap = new HashMap(); //taskId与上次失败实例
    40. List runTasks = new ArrayList(channelNumber); //正在运行task
    41. Map taskStartTimeMap = new HashMap(); //任务开始时间
    42. long lastReportTimeStamp = 0;
    43. Communication lastTaskGroupContainerCommunication = new Communication();
    44. while (true) {
    45. //1.判断task状态
    46. boolean failedOrKilled = false;
    47. Map communicationMap = containerCommunicator.getCommunicationMap();
    48. for(Map.Entry entry : communicationMap.entrySet()){
    49. Integer taskId = entry.getKey();
    50. Communication taskCommunication = entry.getValue();
    51. if(!taskCommunication.isFinished()){
    52. continue;
    53. }
    54. TaskExecutor taskExecutor = removeTask(runTasks, taskId);
    55. //上面从runTasks里移除了,因此对应在monitor里移除
    56. taskMonitor.removeTask(taskId);
    57. //失败,看task是否支持failover,重试次数未超过最大限制
    58. if(taskCommunication.getState() == State.FAILED){
    59. taskFailedExecutorMap.put(taskId, taskExecutor);
    60. if(taskExecutor.supportFailOver() && taskExecutor.getAttemptCount() < taskMaxRetryTimes){
    61. taskExecutor.shutdown(); //关闭老的executor
    62. containerCommunicator.resetCommunication(taskId); //将task的状态重置
    63. Configuration taskConfig = taskConfigMap.get(taskId);
    64. taskQueue.add(taskConfig); //重新加入任务列表
    65. }else{
    66. failedOrKilled = true;
    67. break;
    68. }
    69. }else if(taskCommunication.getState() == State.KILLED){
    70. failedOrKilled = true;
    71. break;
    72. }else if(taskCommunication.getState() == State.SUCCEEDED){
    73. Long taskStartTime = taskStartTimeMap.get(taskId);
    74. if(taskStartTime != null){
    75. Long usedTime = System.currentTimeMillis() - taskStartTime;
    76. LOG.info("taskGroup[{}] taskId[{}] is successed, used[{}]ms",
    77. this.taskGroupId, taskId, usedTime);
    78. //usedTime*1000*1000 转换成PerfRecord记录的ns,这里主要是简单登记,进行最长任务的打印。因此增加特定静态方法
    79. PerfRecord.addPerfRecord(taskGroupId, taskId, PerfRecord.PHASE.TASK_TOTAL,taskStartTime, usedTime * 1000L * 1000L);
    80. taskStartTimeMap.remove(taskId);
    81. taskConfigMap.remove(taskId);
    82. }
    83. }
    84. }
    85. // 2.发现该taskGroup下taskExecutor的总状态失败则汇报错误
    86. if (failedOrKilled) {
    87. lastTaskGroupContainerCommunication = reportTaskGroupCommunication(
    88. lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
    89. throw DataXException.asDataXException(
    90. FrameworkErrorCode.PLUGIN_RUNTIME_ERROR, lastTaskGroupContainerCommunication.getThrowable());
    91. }
    92. //3.有任务未执行,且正在运行的任务数小于最大通道限制
    93. Iterator iterator = taskQueue.iterator();
    94. while(iterator.hasNext() && runTasks.size() < channelNumber){
    95. Configuration taskConfig = iterator.next();
    96. Integer taskId = taskConfig.getInt(CoreConstant.TASK_ID);
    97. int attemptCount = 1;
    98. TaskExecutor lastExecutor = taskFailedExecutorMap.get(taskId);
    99. if(lastExecutor!=null){
    100. attemptCount = lastExecutor.getAttemptCount() + 1;
    101. long now = System.currentTimeMillis();
    102. long failedTime = lastExecutor.getTimeStamp();
    103. if(now - failedTime < taskRetryIntervalInMsec){ //未到等待时间,继续留在队列
    104. continue;
    105. }
    106. if(!lastExecutor.isShutdown()){ //上次失败的task仍未结束
    107. if(now - failedTime > taskMaxWaitInMsec){
    108. markCommunicationFailed(taskId);
    109. reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
    110. throw DataXException.asDataXException(CommonErrorCode.WAIT_TIME_EXCEED, "task failover等待超时");
    111. }else{
    112. lastExecutor.shutdown(); //再次尝试关闭
    113. continue;
    114. }
    115. }else{
    116. LOG.info("taskGroup[{}] taskId[{}] attemptCount[{}] has already shutdown",
    117. this.taskGroupId, taskId, lastExecutor.getAttemptCount());
    118. }
    119. }
    120. Configuration taskConfigForRun = taskMaxRetryTimes > 1 ? taskConfig.clone() : taskConfig;
    121. TaskExecutor taskExecutor = new TaskExecutor(taskConfigForRun, attemptCount);
    122. taskStartTimeMap.put(taskId, System.currentTimeMillis());
    123. taskExecutor.doStart();
    124. iterator.remove();
    125. runTasks.add(taskExecutor);
    126. //上面,增加task到runTasks列表,因此在monitor里注册。
    127. taskMonitor.registerTask(taskId, this.containerCommunicator.getCommunication(taskId));
    128. taskFailedExecutorMap.remove(taskId);
    129. LOG.info("taskGroup[{}] taskId[{}] attemptCount[{}] is started",
    130. this.taskGroupId, taskId, attemptCount);
    131. }
    132. //4.任务列表为空,executor已结束, 搜集状态为success--->成功
    133. if (taskQueue.isEmpty() && isAllTaskDone(runTasks) && containerCommunicator.collectState() == State.SUCCEEDED) {
    134. // 成功的情况下,也需要汇报一次。否则在任务结束非常快的情况下,采集的信息将会不准确
    135. lastTaskGroupContainerCommunication = reportTaskGroupCommunication(
    136. lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
    137. LOG.info("taskGroup[{}] completed it's tasks.", this.taskGroupId);
    138. break;
    139. }
    140. // 5.如果当前时间已经超出汇报时间的interval,那么我们需要马上汇报
    141. long now = System.currentTimeMillis();
    142. if (now - lastReportTimeStamp > reportIntervalInMillSec) {
    143. lastTaskGroupContainerCommunication = reportTaskGroupCommunication(
    144. lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
    145. lastReportTimeStamp = now;
    146. //taskMonitor对于正在运行的task,每reportIntervalInMillSec进行检查
    147. for(TaskExecutor taskExecutor:runTasks){
    148. taskMonitor.report(taskExecutor.getTaskId(),this.containerCommunicator.getCommunication(taskExecutor.getTaskId()));
    149. }
    150. }
    151. Thread.sleep(sleepIntervalInMillSec);
    152. }
    153. //6.最后还要汇报一次
    154. reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
    155. } catch (Throwable e) {
    156. Communication nowTaskGroupContainerCommunication = this.containerCommunicator.collect();
    157. if (nowTaskGroupContainerCommunication.getThrowable() == null) {
    158. nowTaskGroupContainerCommunication.setThrowable(e);
    159. }
    160. nowTaskGroupContainerCommunication.setState(State.FAILED);
    161. this.containerCommunicator.report(nowTaskGroupContainerCommunication);
    162. throw DataXException.asDataXException(
    163. FrameworkErrorCode.RUNTIME_ERROR, e);
    164. }finally {
    165. if(!PerfTrace.getInstance().isJob()){
    166. //最后打印cpu的平均消耗,GC的统计
    167. VMInfo vmInfo = VMInfo.getVmInfo();
    168. if (vmInfo != null) {
    169. vmInfo.getDelta(false);
    170. LOG.info(vmInfo.totalString());
    171. }
    172. LOG.info(PerfTrace.getInstance().summarizeNoException());
    173. }
    174. }
    175. }
    176. private Map buildTaskConfigMap(List configurations){
    177. Map map = new HashMap();
    178. for(Configuration taskConfig : configurations){
    179. int taskId = taskConfig.getInt(CoreConstant.TASK_ID);
    180. map.put(taskId, taskConfig);
    181. }
    182. return map;
    183. }
    184. private List buildRemainTasks(List configurations){
    185. List remainTasks = new LinkedList();
    186. for(Configuration taskConfig : configurations){
    187. remainTasks.add(taskConfig);
    188. }
    189. return remainTasks;
    190. }
    191. private TaskExecutor removeTask(List taskList, int taskId){
    192. Iterator iterator = taskList.iterator();
    193. while(iterator.hasNext()){
    194. TaskExecutor taskExecutor = iterator.next();
    195. if(taskExecutor.getTaskId() == taskId){
    196. iterator.remove();
    197. return taskExecutor;
    198. }
    199. }
    200. return null;
    201. }
    202. private boolean isAllTaskDone(List taskList){
    203. for(TaskExecutor taskExecutor : taskList){
    204. if(!taskExecutor.isTaskFinished()){
    205. return false;
    206. }
    207. }
    208. return true;
    209. }
    210. private Communication reportTaskGroupCommunication(Communication lastTaskGroupContainerCommunication, int taskCount){
    211. Communication nowTaskGroupContainerCommunication = this.containerCommunicator.collect();
    212. nowTaskGroupContainerCommunication.setTimestamp(System.currentTimeMillis());
    213. Communication reportCommunication = CommunicationTool.getReportCommunication(nowTaskGroupContainerCommunication,
    214. lastTaskGroupContainerCommunication, taskCount);
    215. this.containerCommunicator.report(reportCommunication);
    216. return reportCommunication;
    217. }
    218. private void markCommunicationFailed(Integer taskId){
    219. Communication communication = containerCommunicator.getCommunication(taskId);
    220. communication.setState(State.FAILED);
    221. }
    222. /**
    223. * TaskExecutor是一个完整task的执行器
    224. * 其中包括1:1的reader和writer
    225. */
    226. class TaskExecutor {
    227. private Configuration taskConfig;
    228. private int taskId;
    229. private int attemptCount;
    230. private Channel channel;
    231. private Thread readerThread;
    232. private Thread writerThread;
    233. private ReaderRunner readerRunner;
    234. private WriterRunner writerRunner;
    235. /**
    236. * 该处的taskCommunication在多处用到:
    237. * 1. channel
    238. * 2. readerRunner和writerRunner
    239. * 3. reader和writer的taskPluginCollector
    240. */
    241. private Communication taskCommunication;
    242. public TaskExecutor(Configuration taskConf, int attemptCount) {
    243. // 获取该taskExecutor的配置
    244. this.taskConfig = taskConf;
    245. Validate.isTrue(null != this.taskConfig.getConfiguration(CoreConstant.JOB_READER)
    246. && null != this.taskConfig.getConfiguration(CoreConstant.JOB_WRITER),
    247. "[reader|writer]的插件参数不能为空!");
    248. // 得到taskId
    249. this.taskId = this.taskConfig.getInt(CoreConstant.TASK_ID);
    250. this.attemptCount = attemptCount;
    251. /**
    252. * 由taskId得到该taskExecutor的Communication
    253. * 要传给readerRunner和writerRunner,同时要传给channel作统计用
    254. */
    255. this.taskCommunication = containerCommunicator
    256. .getCommunication(taskId);
    257. Validate.notNull(this.taskCommunication,
    258. String.format("taskId[%d]的Communication没有注册过", taskId));
    259. this.channel = ClassUtil.instantiate(channelClazz,
    260. Channel.class, configuration);
    261. this.channel.setCommunication(this.taskCommunication);
    262. /**
    263. * 获取transformer的参数
    264. */
    265. List transformerInfoExecs = TransformerUtil.buildTransformerInfo(taskConfig);
    266. /**
    267. * 生成writerThread
    268. */
    269. writerRunner = (WriterRunner) generateRunner(PluginType.WRITER);
    270. this.writerThread = new Thread(writerRunner,
    271. String.format("%d-%d-%d-writer",
    272. jobId, taskGroupId, this.taskId));
    273. //通过设置thread的contextClassLoader,即可实现同步和主程序不通的加载器
    274. this.writerThread.setContextClassLoader(LoadUtil.getJarLoader(
    275. PluginType.WRITER, this.taskConfig.getString(
    276. CoreConstant.JOB_WRITER_NAME)));
    277. /**
    278. * 生成readerThread
    279. */
    280. readerRunner = (ReaderRunner) generateRunner(PluginType.READER,transformerInfoExecs);
    281. this.readerThread = new Thread(readerRunner,
    282. String.format("%d-%d-%d-reader",
    283. jobId, taskGroupId, this.taskId));
    284. /**
    285. * 通过设置thread的contextClassLoader,即可实现同步和主程序不通的加载器
    286. */
    287. this.readerThread.setContextClassLoader(LoadUtil.getJarLoader(
    288. PluginType.READER, this.taskConfig.getString(
    289. CoreConstant.JOB_READER_NAME)));
    290. }
    291. public void doStart() {
    292. this.writerThread.start();
    293. // reader没有起来,writer不可能结束
    294. if (!this.writerThread.isAlive() || this.taskCommunication.getState() == State.FAILED) {
    295. throw DataXException.asDataXException(
    296. FrameworkErrorCode.RUNTIME_ERROR,
    297. this.taskCommunication.getThrowable());
    298. }
    299. this.readerThread.start();
    300. // 这里reader可能很快结束
    301. if (!this.readerThread.isAlive() && this.taskCommunication.getState() == State.FAILED) {
    302. // 这里有可能出现Reader线上启动即挂情况 对于这类情况 需要立刻抛出异常
    303. throw DataXException.asDataXException(
    304. FrameworkErrorCode.RUNTIME_ERROR,
    305. this.taskCommunication.getThrowable());
    306. }
    307. }
    308. private AbstractRunner generateRunner(PluginType pluginType) {
    309. return generateRunner(pluginType, null);
    310. }
    311. private AbstractRunner generateRunner(PluginType pluginType, List transformerInfoExecs) {
    312. AbstractRunner newRunner = null;
    313. TaskPluginCollector pluginCollector;
    314. //插件类型
    315. switch (pluginType) {
    316. case READER:
    317. newRunner = LoadUtil.loadPluginRunner(pluginType,
    318. this.taskConfig.getString(CoreConstant.JOB_READER_NAME));
    319. newRunner.setJobConf(this.taskConfig.getConfiguration(
    320. CoreConstant.JOB_READER_PARAMETER));
    321. pluginCollector = ClassUtil.instantiate(
    322. taskCollectorClass, AbstractTaskPluginCollector.class,
    323. configuration, this.taskCommunication,
    324. PluginType.READER);
    325. RecordSender recordSender;
    326. if (transformerInfoExecs != null && transformerInfoExecs.size() > 0) {
    327. recordSender = new BufferedRecordTransformerExchanger(taskGroupId, this.taskId, this.channel,this.taskCommunication ,pluginCollector, transformerInfoExecs);
    328. } else {
    329. recordSender = new BufferedRecordExchanger(this.channel, pluginCollector);
    330. }
    331. ((ReaderRunner) newRunner).setRecordSender(recordSender);
    332. /**
    333. * 设置taskPlugin的collector,用来处理脏数据和job/task通信
    334. */
    335. newRunner.setTaskPluginCollector(pluginCollector);
    336. break;
    337. case WRITER:
    338. newRunner = LoadUtil.loadPluginRunner(pluginType,
    339. this.taskConfig.getString(CoreConstant.JOB_WRITER_NAME));
    340. newRunner.setJobConf(this.taskConfig
    341. .getConfiguration(CoreConstant.JOB_WRITER_PARAMETER));
    342. pluginCollector = ClassUtil.instantiate(
    343. taskCollectorClass, AbstractTaskPluginCollector.class,
    344. configuration, this.taskCommunication,
    345. PluginType.WRITER);
    346. ((WriterRunner) newRunner).setRecordReceiver(new BufferedRecordExchanger(
    347. this.channel, pluginCollector));
    348. /**
    349. * 设置taskPlugin的collector,用来处理脏数据和job/task通信
    350. */
    351. newRunner.setTaskPluginCollector(pluginCollector);
    352. break;
    353. default:
    354. throw DataXException.asDataXException(FrameworkErrorCode.ARGUMENT_ERROR, "Cant generateRunner for:" + pluginType);
    355. }
    356. newRunner.setTaskGroupId(taskGroupId);
    357. newRunner.setTaskId(this.taskId);
    358. newRunner.setRunnerCommunication(this.taskCommunication);
    359. return newRunner;
    360. }
    361. // 检查任务是否结束
    362. private boolean isTaskFinished() {
    363. // 如果reader 或 writer没有完成工作,那么直接返回工作没有完成
    364. if (readerThread.isAlive() || writerThread.isAlive()) {
    365. return false;
    366. }
    367. if(taskCommunication==null || !taskCommunication.isFinished()){
    368. return false;
    369. }
    370. return true;
    371. }
    372. private int getTaskId(){
    373. return taskId;
    374. }
    375. private long getTimeStamp(){
    376. return taskCommunication.getTimestamp();
    377. }
    378. private int getAttemptCount(){
    379. return attemptCount;
    380. }
    381. private boolean supportFailOver(){
    382. return writerRunner.supportFailOver();
    383. }
    384. private void shutdown(){
    385. writerRunner.shutdown();
    386. readerRunner.shutdown();
    387. if(writerThread.isAlive()){
    388. writerThread.interrupt();
    389. }
    390. if(readerThread.isAlive()){
    391. readerThread.interrupt();
    392. }
    393. }
    394. private boolean isShutdown(){
    395. return !readerThread.isAlive() && !writerThread.isAlive();
    396. }
    397. }

    6、限速的实现 (具体插件)

    比如:MysqlReader

    MysqlReader -- startReader()

    com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader -- startRead() -- transportOneRecord() >> com.alibaba.datax.common.plugin.RecordSender -- sendToWriter() >> com.alibaba.datax.core.transport.exchanger.BufferedRecordExchanger --sendToWriter() -- flush() >> com.alibaba.datax.core.transport.channel.Channel -- pushAll() -- statPush()

    1. private void statPush(long recordSize, long byteSize) {
    2. currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_RECORDS,
    3. recordSize);
    4. currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_BYTES,
    5. byteSize);
    6. //在读的时候进行统计waitCounter即可,因为写(pull)的时候可能正在阻塞,但读的时候已经能读到这个阻塞的counter数
    7. currentCommunication.setLongCounter(CommunicationTool.WAIT_READER_TIME, waitReaderTime);
    8. currentCommunication.setLongCounter(CommunicationTool.WAIT_WRITER_TIME, waitWriterTime);
    9. boolean isChannelByteSpeedLimit = (this.byteSpeed > 0);
    10. boolean isChannelRecordSpeedLimit = (this.recordSpeed > 0);
    11. if (!isChannelByteSpeedLimit && !isChannelRecordSpeedLimit) {
    12. return;
    13. }
    14. long lastTimestamp = lastCommunication.getTimestamp();
    15. long nowTimestamp = System.currentTimeMillis();
    16. long interval = nowTimestamp - lastTimestamp;
    17. if (interval - this.flowControlInterval >= 0) {
    18. long byteLimitSleepTime = 0;
    19. long recordLimitSleepTime = 0;
    20. if (isChannelByteSpeedLimit) {
    21. long currentByteSpeed = (CommunicationTool.getTotalReadBytes(currentCommunication) -
    22. CommunicationTool.getTotalReadBytes(lastCommunication)) * 1000 / interval;
    23. if (currentByteSpeed > this.byteSpeed) {
    24. // 计算根据byteLimit得到的休眠时间
    25. byteLimitSleepTime = currentByteSpeed * interval / this.byteSpeed
    26. - interval;
    27. }
    28. }
    29. if (isChannelRecordSpeedLimit) {
    30. long currentRecordSpeed = (CommunicationTool.getTotalReadRecords(currentCommunication) -
    31. CommunicationTool.getTotalReadRecords(lastCommunication)) * 1000 / interval;
    32. if (currentRecordSpeed > this.recordSpeed) {
    33. // 计算根据recordLimit得到的休眠时间
    34. recordLimitSleepTime = currentRecordSpeed * interval / this.recordSpeed
    35. - interval;
    36. }
    37. }
    38. // 休眠时间取较大值
    39. long sleepTime = byteLimitSleepTime < recordLimitSleepTime ?
    40. recordLimitSleepTime : byteLimitSleepTime;
    41. if (sleepTime > 0) {
    42. try {
    43. Thread.sleep(sleepTime);
    44. } catch (InterruptedException e) {
    45. Thread.currentThread().interrupt();
    46. }
    47. }
    48. lastCommunication.setLongCounter(CommunicationTool.READ_SUCCEED_BYTES,
    49. currentCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES));
    50. lastCommunication.setLongCounter(CommunicationTool.READ_FAILED_BYTES,
    51. currentCommunication.getLongCounter(CommunicationTool.READ_FAILED_BYTES));
    52. lastCommunication.setLongCounter(CommunicationTool.READ_SUCCEED_RECORDS,
    53. currentCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS));
    54. lastCommunication.setLongCounter(CommunicationTool.READ_FAILED_RECORDS,
    55. currentCommunication.getLongCounter(CommunicationTool.READ_FAILED_RECORDS));
    56. lastCommunication.setTimestamp(nowTimestamp);
    57. }
    58. }

    四、插件开发

    DataX为什么要使用插件机制?

            应对不同数据源的差异、同时提供一致的同步原语和扩展能力,DataX自然而然地采用了框架 + 插件 的模式,

    • 插件只需关心数据的读取或者写入本身。
    • 而同步的共性问题,比如:类型转换、性能、统计,则交由框架来处理。

    作为插件开发人员,则需要关注两个问题:

    1. 数据源本身的读写数据正确性。
    2. 如何与框架沟通、合理正确地使用框架。

    DataX插件需要遵循统一的目录结构:

     

    步骤文档

    五、DataX 使用优化

    速度控制

            DataX3.0提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制作业速度,让作业在数据库可以承受的范围内达到最佳的同步速度。

    关键参数

    ➢ job.setting.speed.channel : channel 并发数

    ➢ job.setting.speed.record : 2 全局配置 channel 的 record 限速

    ➢ job.setting.speed.byte:全局配置 channel 的 byte 限速

    ➢ core.transport.channel.speed.record:单个 channel 的 record 限速

    ➢ core.transport.channel.speed.byte:单个 channel 的 byte 限速


    一、提升每个 channel 的速度

            在 DataX 内部对每个 Channel 会有严格的速度控制,分两种,一种是控制每秒同步的记 录数,另外一种是每秒同步的字节数,默认的速度限制是 1MB/s,可以根据具体硬件情况设 置这个 byte 速度或者 record 速度,一般设置 byte 速度,比如:我们可以把单个 Channel 的 速度上限配置为 5MB。

    二、提升 DataX Job 内 Channel 并发数

            并发数 = taskGroup 的数量 * 每个 TaskGroup 并发执行的 Task 数 (默认为 5)。 提升 job 内 Channel 并发有三种配置方式:

    1、配置全局 Byte 限速以及单 Channel Byte 限速

            Channel 个数 = 全局 Byte 限速 / 单 Channel Byte 限速

    {
        "core": {
            "transport": {
                "channel": {
                    "speed": {
                        "byte": 1048576
                    }
                }
            }
        },
        "job": {
            "setting": {
                "speed": {
                    "byte": 5242880
                }
            },
            。。。
        }
    }

            core.transport.channel.speed.byte=1048576,job.setting.speed.byte=5242880,所以 Channel 个数 = 全局 Byte 限速 / 单 Channel Byte 限速=5242880/1048576=5 个

    2、配置全局 Record 限速以及单 Channel Record 限速

    Channel 个数 = 全局 Record 限速 / 单 Channel Record 限速

    {
        "core": {
            "transport": {
                "channel": {
                    "speed": {
                        "record": 100
                    }
                }
            }
        },
        "job": {
            "setting": {
                "speed": {
                    "record": 500
                }
            },
            ...
        }
    }

            core.transport.channel.speed.record=100 , job.setting.speed.record=500, 所 以 配 置 全 局 Record 限速以及单 Channel Record 限速,Channel 个数 = 全局 Record 限速 / 单 Channel Record 限速=500/100=5 

    3、直接配置 Channel 个数

            只有在上面两种未设置才生效,上面两个同时设置是取值小的作为最终的 channel 数。

    {
        "job": {
            "setting": {
                "speed": {
                    "channel": 5
                }
            },
            ...
        }
    }

    直接配置 job.setting.speed.channel=5,所以 job 内 Channel 并发=5 个

    三、提高 JVM 堆内存

            当提升 DataX Job 内 Channel 并发数时,内存的占用会显著增加,因为 DataX 作为数据 交换通道,在内存中会缓存较多的数据。例如 Channel 中会有一个 Buffer,作为临时的数据 交换的缓冲区,而在部分 Reader 和 Writer 的中,也会存在一些 Buffer,为了防止 OOM 等错 误,调大 JVM 的堆内存。

            建议将内存设置为 4G 或者 8G,这个也可以根据实际情况来调整。

            调整 JVM xms xmx 参数的两种方式:一种是直接更改 datax.py 脚本;另一种是在启动 的时候,加上对应的参数,如下:

    python datax/bin/datax.py --jvm="-Xms8G -Xmx8G"  XXX.json

    参考源:

        https://github.com/alibaba/DataX     

  • 相关阅读:
    解锁Spring Boot数据映射新利器:深度探索MapperStruct
    保存save_data()数组有[]的解决办法
    花 1 万块做付费咨询,值得吗?
    聊聊视频中的编解码器,你所不知道的h264、h265、vp8、vp9和av1编解码库
    谁说.NET没有GC调优?只改一行代码就让程序不再占用内存
    李宏毅深度学习--《Unsupervised Learning:Neighbor Embedding》
    centos 安装和卸载 webmin
    电动汽车租赁平台【EV Mobility】申请875万美元纳斯达克IPO上市
    ideal 同一项目启动多实列
    LeetCode 1465. 切割后面积最大的蛋糕
  • 原文地址:https://blog.csdn.net/weixin_43549578/article/details/127783865