• DataX: Ⅱ


    序言

    这里使用的是master分支,因为官网上并没有release分支,所以先用master分支吧,可能会有问题cuiyaonan2000@163.com

    参考资料:

    1. https://github.com/alibaba/DataX
    2. https://github.com/alibaba/DataX/blob/master/introduction.md    --插件说明文档
    3. https://github.com/alibaba/DataX/blob/master/dataxPluginDev.md

    源码打包

    1. 首先下载 GitHub - alibaba/DataX: DataX是阿里云DataWorks数据集成的开源版本。代码
    2. 首先如果是JDK17则会报错,后来选择JDK1.8
    3. Datax的运行依赖于python所以需要安装python2或者python3,centos7自带的有python2.7.5
    4. 然后打包生成可执行的文件 mvn -U clean package assembly:assembly -Dmaven.test.skip=true
    5. 成功后在根目录下的target中有相关的打包结果,如果包含所有Reader和Writer则打包会慢一点,但是还是有必要的

    执行命令

    datax的bin目录下 

    1. python datax.py -r {YOUR_READER} -w {YOUR_WRITER}   该命令是显示对应的json模板,也可以直接从source或者reader的文档中查看
    2. python datax.py json文件   该命令就是执行对应的json文件

    用例:Stream To Stream 

    1. {
    2. "job": {
    3. "content": [
    4. {
    5. "reader": {
    6. "name": "streamreader",
    7. "parameter": {
    8. "sliceRecordCount": 10,
    9. "column": [
    10. {
    11. "type": "long",
    12. "value": "10"
    13. },
    14. {
    15. "type": "string",
    16. "value": "hello,你好,世界-DataX"
    17. }
    18. ]
    19. }
    20. },
    21. "writer": {
    22. "name": "streamwriter",
    23. "parameter": {
    24. "encoding": "UTF-8",
    25. "print": true
    26. }
    27. }
    28. }
    29. ],
    30. "setting": {
    31. "speed": {
    32. "channel": 5
    33. }
    34. }
    35. }
    36. }

    执行结果

    MysqlReader To Stream 

    通过命令python datax.py -r mysqlreader -w streamwriter 查看相关的模板为

    1. DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
    2. Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
    3. Please refer to the mysqlreader document:
    4. https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md
    5. Please refer to the streamwriter document:
    6. https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md
    7. Please save the following configuration as a json file and use
    8. python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
    9. to run the job.
    10. {
    11. "job": {
    12. "content": [
    13. {
    14. "reader": {
    15. "name": "mysqlreader",
    16. "parameter": {
    17. "column": [],
    18. "connection": [
    19. {
    20. "jdbcUrl": [],
    21. "table": []
    22. }
    23. ],
    24. "password": "",
    25. "username": "",
    26. "where": ""
    27. }
    28. },
    29. "writer": {
    30. "name": "streamwriter",
    31. "parameter": {
    32. "encoding": "",
    33. "print": true
    34. }
    35. }
    36. }
    37. ],
    38. "setting": {
    39. "speed": {
    40. "channel": ""
    41. }
    42. }
    43. }
    44. }

    然后编辑该json

    1. {
    2. "job": {
    3. "content": [
    4. {
    5. "reader": {
    6. "name": "mysqlreader",
    7. "parameter": {
    8. "column": ["Name","GroupName"],
    9. "connection": [
    10. {
    11. "jdbcUrl": ["jdbc:mysql://192.168.137.2:3306/test"],
    12. "table": ["employee"]
    13. }
    14. ],
    15. "password": "root",
    16. "username": "root"
    17. }
    18. },
    19. "writer": {
    20. "name": "streamwriter",
    21. "parameter": {
    22. "encoding": "",
    23. "print": true
    24. }
    25. }
    26. }
    27. ],
    28. "setting": {
    29. "speed": {
    30. "channel": "1"
    31. }
    32. }
    33. }
    34. }

    自定义插件

    从设计之初,DataX就把异构数据源同步作为自身的使命,为了应对不同数据源的差异、同时提供一致的同步原语和扩展能力,DataX自然而然地采用了框架 + 插件 的模式:

    • 插件只需关心数据的读取或者写入本身。
    • 而同步的共性问题,比如:类型转换、性能、统计,则交由框架来处理。

    作为插件开发人员,则需要关注两个问题----自定义插件要考虑的点感觉很简单啊cuiyaonan2000@163.com

    1. 数据源本身的读写数据正确性。
    2. 如何与框架沟通、合理正确地使用框架。

    逻辑执行模型

    插件开发者不用关心太多,基本只需要关注特定系统读和写,以及自己的代码在逻辑上是怎样被执行的,哪一个方法是在什么时候被调用的。在此之前,需要明确以下概念:

    • JobJob是DataX用以描述从一个源头到一个目的端的同步作业,是DataX数据同步的最小业务单元。比如:从一张mysql的表同步到odps的一个表的特定分区。
    • TaskTask是为最大化而把Job拆分得到的最小执行单元。比如:读一张有1024个分表的mysql分库分表的Job,拆分成1024个读Task,用若干个并发执行。
    • TaskGroup: 描述的是一组Task集合。在同一个TaskGroupContainer执行下的Task集合称之为TaskGroup
    • JobContainerJob执行器,负责Job全局拆分、调度、前置语句和后置语句等工作的工作单元。类似Yarn中的JobTracker
    • TaskGroupContainerTaskGroup执行器,负责执行一组Task的工作单元,类似Yarn中的TaskTracker。

    简而言之, Job拆分成Task,在分别在框架提供的容器中执行,插件只需要实现JobTask两部分逻辑

    物理执行模型

    框架为插件提供物理上的执行能力(线程)。DataX框架有三种运行模式:

    • Standalone: 单进程运行,没有外部依赖。
    • Local: 单进程运行,统计信息、错误信息汇报到集中存储。
    • Distrubuted: 分布式多进程运行,依赖DataX Service服务。

    当然,上述三种模式对插件的编写而言没有什么区别,你只需要避开一些小错误,插件就能够在单机/分布式之间无缝切换了。 JobContainerTaskGroupContainer运行在同一个进程内时,就是单机模式(StandaloneLocal);当它们分布在不同的进程中执行时,就是分布式(Distributed)模式。

    编程接口

    那么,JobTask的逻辑应是怎么对应到具体的代码中的?

    首先,插件的入口类必须扩展ReaderWriter抽象类,并且实现分别实现JobTask两个内部抽象类,JobTask的实现必须是 内部类 的形式,原因见 加载原理 一节。

    以Reader为例:

    1. public class SomeReader extends Reader {
    2. public static class Job extends Reader.Job {
    3. @Override
    4. public void init() {
    5. }
    6. @Override
    7. public void prepare() {
    8. }
    9. @Override
    10. public List split(int adviceNumber) {
    11. return null;
    12. }
    13. @Override
    14. public void post() {
    15. }
    16. @Override
    17. public void destroy() {
    18. }
    19. }
    20. public static class Task extends Reader.Task {
    21. @Override
    22. public void init() {
    23. }
    24. @Override
    25. public void prepare() {
    26. }
    27. @Override
    28. public void startRead(RecordSender recordSender) {
    29. }
    30. @Override
    31. public void post() {
    32. }
    33. @Override
    34. public void destroy() {
    35. }
    36. }
    37. }

    Job接口功能如下:

    • init: Job对象初始化工作,此时可以通过super.getPluginJobConf()获取与本插件相关的配置。读插件获得配置中reader部分,写插件获得writer部分-----获取插件配置信息cuiyaonan2000@163.com
    • prepare: 全局准备工作,比如odpswriter清空目标表。
    • split: 拆分Task。参数adviceNumber框架建议的拆分数,一般是运行时所配置的并发度。值返回的是Task的配置列表。
    • post: 全局的后置工作,比如mysqlwriter同步完影子表后的rename操作。
    • destroy: Job对象自身的销毁工作。

    Task接口功能如下:

    • init:Task对象的初始化。此时可以通过super.getPluginJobConf()获取与本Task相关的配置。这里的配置是Jobsplit方法返回的配置列表中的其中一个。
    • prepare:局部的准备工作。
    • startRead: 从数据源读数据,写入到RecordSender中。RecordSender会把数据写入连接Reader和Writer的缓存队列。
    • startWrite:从RecordReceiver中读取数据,写入目标数据源。RecordReceiver中的数据来自Reader和Writer之间的缓存队列。
    • post: 局部的后置工作。
    • destroy: Task象自身的销毁工作。

    需要注意的是:

    • JobTask之间一定不能有共享变量,因为分布式运行时不能保证共享变量会被正确初始化。两者之间只能通过配置文件进行依赖。
    • preparepostJobTask中都存在,插件需要根据实际情况确定在什么地方执行操作。

    插件定义

    码写好了,有没有想过框架是怎么找到插件的入口类的?框架是如何加载插件的呢?

    在每个插件的项目中,都有一个plugin.json文件,这个文件定义了插件的相关信息,包括入口类。例如:

    {
        "name": "mysqlwriter",
        "class": "com.alibaba.datax.plugin.writer.mysqlwriter.MysqlWriter",
        "description": "Use Jdbc connect to database, execute insert sql.",
        "developer": "alibaba"
    }

    • name: 插件名称,大小写敏感。框架根据用户在配置文件中指定的名称来搜寻插件。 十分重要 。
    • class: 入口类的全限定名称,框架通过反射插件入口类的实例。十分重要 。
    • description: 描述信息。
    • developer: 开发人员。

    例如:

    插件打包

    打包后的位置跟目录其它插件一样,举例来说:

    插件目录分为readerwriter子目录,读写插件分别存放。插件目录规范如下:

    • ${PLUGIN_HOME}/libs: 插件的依赖库。
    • ${PLUGIN_HOME}/plugin-name-version.jar: 插件本身的jar。
    • ${PLUGIN_HOME}/plugin.json: 插件描述文件。

    尽管框架加载插件时,会把${PLUGIN_HOME}下所有的jar放到classpath,但还是推荐依赖库的jar和插件本身的jar分开存放

    插件配置文件设计

    DataX使用json作为配置文件的格式。一个典型的DataX任务配置如下:

    1. {
    2. "job": {
    3. "content": [
    4. {
    5. "reader": {
    6. "name": "odpsreader",
    7. "parameter": {
    8. "accessKey": "",
    9. "accessId": "",
    10. "column": [""],
    11. "isCompress": "",
    12. "odpsServer": "",
    13. "partition": [
    14. ""
    15. ],
    16. "project": "",
    17. "table": "",
    18. "tunnelServer": ""
    19. }
    20. },
    21. "writer": {
    22. "name": "oraclewriter",
    23. "parameter": {
    24. "username": "",
    25. "password": "",
    26. "column": ["*"],
    27. "connection": [
    28. {
    29. "jdbcUrl": "",
    30. "table": [
    31. ""
    32. ]
    33. }
    34. ]
    35. }
    36. }
    37. }
    38. ]
    39. }
    40. }

    DataX框架有core.json配置文件,指定了框架的默认行为。任务的配置里头可以指定框架中已经存在的配置项,而且具有更高的优先级,会覆盖core.json中的默认值。

    配置中job.content.reader.parameter的value部分会传给Reader.Jobjob.content.writer.parameter的value部分会传给Writer.Job ,Reader.JobWriter.Job可以通过super.getPluginJobConf()来获取。----------就是我们自定义插件也要满足core.json的整体格式,在规定的key下编辑自己的插件属性cuiyaonan2000@163.com

    DataX框架支持对特定的配置项进行RSA加密,例子中以*开头的项目便是加密后的值。 配置项加密解密过程对插件是透明,插件仍然以不带*的key来查询配置和操作配置项 。-------这个可定时有用的cuiyaonan2000@163.com

    工具Configuration

    为了简化对json的操作,DataX提供了简单的DSL配合Configuration类使用。---提供了一个解析Json的工具类

    Configuration提供了常见的get带类型get带默认值getset等读写配置项的操作,以及clonetoJSON等方法。配置项读写操作都需要传入一个path做为参数,这个path就是DataX定义的DSL。语法有两条:

    1. 子map用.key表示,path的第一个点省略。
    2. 数组元素用[index]表示。

    比如操作如下json:

    {
      "a": {
        "b": {
          "c": 2
        },
        "f": [
          1,
          2,
          {
            "g": true,
            "h": false
          },
          4
        ]
      },
      "x": 4
    }

    比如调用configuration.get(path)方法,当path为如下值的时候得到的结果为:

    • x4
    • a.b.c2
    • a.b.c.dnull
    • a.b.f[0]1
    • a.b.f[2].gtrue

    注意,因为插件看到的配置只是整个配置的一部分。使用Configuration对象时,需要注意当前的根路径是什么。

    更多Configuration的操作请参考ConfigurationTest.java

    Channel

    跟一般的生产者-消费者模式一样,Reader插件和Writer插件之间也是通过channel来实现数据的传输的。channel可以是内存的,也可能是持久化的,插件不必关心。插件通过RecordSenderchannel写入数据通过RecordReceiverchannel读取数据

    channel中的一条数据为一个Record的对象,Record中可以放多个Column对象,这可以简单理解为数据库中的记录和列。

    Record有如下方法(感觉最外层是个数组cuiyaonan2000@163.com):

    1. public interface Record {
    2. // 加入一个列,放在最后的位置
    3. void addColumn(Column column);
    4. // 在指定下标处放置一个列
    5. void setColumn(int i, final Column column);
    6. // 获取一个列
    7. Column getColumn(int i);
    8. // 转换为json String
    9. String toString();
    10. // 获取总列数
    11. int getColumnNumber();
    12. // 计算整条记录在内存中占用的字节数
    13. int getByteSize();
    14. }
    1. 因为Record是一个接口,Reader插件首先调用RecordSender.createRecord()创建一个Record实例,然后把Column一个个添加到Record中。
    2. Writer插件调用RecordReceiver.getFromReader()方法获取Record,然后把Column遍历出来,写入目标存储中。当Reader尚未退出,传输还在进行时,如果暂时没有数据RecordReceiver.getFromReader()方法会阻塞直到有数据。如果传输已经结束,会返回nullWriter插件可以据此判断是否结束startWrite方法。-----------结束标致太够模糊了,没有固定的标识么cuiyaonan2000@163.com

    Column的构造和操作,我们在《类型转换》一节介绍。

    类型转换

    为了规范源端和目的端类型转换操作,保证数据不失真,DataX支持六种内部数据类型:

    • Long:定点数(Int、Short、Long、BigInteger等)。
    • Double:浮点数(Float、Double、BigDecimal(无限精度)等)。
    • String:字符串类型,底层不限长,使用通用字符集(Unicode)。
    • Date:日期类型。
    • Bool:布尔值。
    • Bytes:二进制,可以存放诸如MP3等非结构化数据。

    对应地,有DateColumnLongColumnDoubleColumnBytesColumnStringColumnBoolColumn六种Column的实现。

    Column除了提供数据相关的方法外,还提供一系列以as开头的数据类型转换转换方法。

    DataX的内部类型在实现上会选用不同的java类型:

    内部类型实现类型备注
    Datejava.util.Date
    Longjava.math.BigInteger使用无限精度的大整数,保证不失真
    Doublejava.lang.String用String表示,保证不失真
    Bytesbyte[]
    Stringjava.lang.String
    Booljava.lang.Boolean

    类型之间相互转换的关系如下:

    from\toDateLongDoubleBytesStringBool
    Date-使用毫秒时间戳不支持不支持使用系统配置的date/time/datetime格式转换不支持
    Long作为毫秒时间戳构造Date-BigInteger转为BigDecimal,然后BigDecimal.doubleValue()不支持BigInteger.toString()0为false,否则true
    Double不支持内部String构造BigDecimal,然后BigDecimal.longValue()-不支持直接返回内部String
    Bytes不支持不支持不支持-按照common.column.encoding配置的编码转换为String,默认utf-8不支持
    String按照配置的date/time/datetime/extra格式解析用String构造BigDecimal,然后取longValue()用String构造BigDecimal,然后取doubleValue(),会正确处理NaN/Infinity/-Infinity按照common.column.encoding配置的编码转换为byte[],默认utf-8-"true"为true, "false"为false,大小写不敏感。其他字符串不支持
    Bool不支持true1L,否则0Ltrue1.0,否则0.0不支持-

    脏数据处理

    目前主要有三类脏数据:

    1. Reader读到不支持的类型、不合法的值。
    2. 不支持的类型转换,比如:Bytes转换为Date
    3. 写入目标端失败,比如:写mysql整型长度超长。

    Reader.TaskWriter.Task中,通过AbstractTaskPlugin.getTaskPluginCollector()可以拿到一个TaskPluginCollector,它提供了一系列collectDirtyRecord的方法。当脏数据出现时,只需要调用合适的collectDirtyRecord方法,把被认为是脏数据的Record传入即可。

    用户可以在任务的配置中指定脏数据限制条数或者百分比限制,当脏数据超出限制时,框架会结束同步任务,退出。插件需要保证脏数据都被收集到,其他工作交给框架就好

  • 相关阅读:
    Kaggle - LLM Science Exam(二):Open Book QA&debertav3-large详解
    算法思想之回溯法
    算法刷题:位运算及其他拓展
    怎么样学习pmp知识?
    更健康舒适更科技的照明体验!书客SKY护眼台灯SUKER L1上手体验
    统一缓存库jetcache和SpringBoot整合
    【GD32F427开发板试用】+demo的正确打开方式(一)
    Python下载及环境的安装
    Redis:分布式锁误删原因分析
    Linux 进程信号深剖
  • 原文地址:https://blog.csdn.net/cuiyaonan2000/article/details/133297849