• Syncing data from MySQL to HDFS with the data synchronization tool DataX, hands-on


    1. View the data-sync job template

    I added some explanatory comments to the template output below. (Note that JSON does not allow comments, so these annotations must be removed before the file is actually used.)

    [root@bigdata001 datax]# bin/datax.py -r mysqlreader -w hdfswriter
    
    DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
    Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
    
    
    Please refer to the mysqlreader document:
         https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md 
    
    Please refer to the hdfswriter document:
         https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md 
     
    Please save the following configuration as a json file and  use
         python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json 
    to run the job.
    
    {
        "job": {
            "content": [
                {
                    "reader": {
                        "name": "mysqlreader", 
                        "parameter": {
                            "column": [],                                             # ["*"] syncs all columns. Constants and expressions are also supported, e.g. ["1", "2.3", "true", "'bazhen.csy'", "null", "upper('a')"]: integer, float, boolean, string, null, and expression respectively
                            "connection": [
                                {
                                    "jdbcUrl": [],                                     # Multiple connection URLs are supported; DataX tests them in order and uses the first reachable one for the query
                                    "table": []                                         # Multiple tables can be synced; they must all share the same schema
                                }
                            ], 
                            "password": "", 
                            "username": "", 
                            "where": "",
                            "splitPk": "",                                            # Usually the primary key; only integer columns are supported. DataX computes min(splitPk) and max(splitPk), then divides that range into partitions, splitting the job into multiple tasks. If unset, there is only one task
                            "querySql": ["select id, name from person where id < 10;"]    # Added by me; when querySql is present, column, table, and where are ignored automatically
                        }
                    }, 
                    "writer": {
                        "name": "hdfswriter", 
                        "parameter": {
                            "column": [],                                            # Must match the number of columns in the reader
                            "compress": "",                                       # Empty by default, meaning no compression. text files support gzip and bzip2; orc files support NONE and SNAPPY
                            "defaultFS": "", 
                            "fieldDelimiter": "", 
                            "fileName": "", 
                            "fileType": "",                                          # Only text and orc are currently supported; orc requires compress to be set to SNAPPY
                            "path": "",                                                # This path must already exist
                            "writeMode": ""                                       # append: create a new file and write into it; nonConflict: fail immediately if any file with the fileName prefix already exists
                        }
                    }
                }
            ], 
            "setting": {
                "speed": {
                    "channel": ""
                }
            }
        }
    }
    [root@bigdata001 datax]#
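    For comparison, the official mysqlreader documentation places querySql inside the connection object; a reader that selects with querySql instead of column/table/where might look like the sketch below (the JDBC URL and credentials are placeholders, not values from this cluster):

```json
"reader": {
    "name": "mysqlreader",
    "parameter": {
        "username": "root",
        "password": "******",
        "connection": [
            {
                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/test"],
                "querySql": ["select id, name from person where id < 10"]
            }
        ]
    }
}
```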
    

    2. HDFS high-availability (HA) configuration

    A reference configuration:

                            "defaultFS": "hdfs://192.168.8.111:9000", 
                            "hadoopConfig": {
                                "dfs.nameservices": "nnha",
                                "dfs.ha.namenodes.nnha": "nn1,nn2,nn3",
                                "dfs.namenode.rpc-address.nnha.nn1": "192.168.8.111:9870",
                                "dfs.namenode.rpc-address.nnha.nn2": "192.168.8.112:9870",
                                "dfs.namenode.rpc-address.nnha.nn3": "192.168.8.113:9870",
                                "dfs.client.failover.proxy.provider.nnha": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
                            },
                            "fieldDelimiter": "|"
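    Note that when writing to an HA cluster, defaultFS is typically set to the nameservice URI rather than a single NameNode address, so the client can fail over between NameNodes; a sketch based on the nnha nameservice above:

```json
"defaultFS": "hdfs://nnha",
"hadoopConfig": {
    "dfs.nameservices": "nnha",
    "dfs.ha.namenodes.nnha": "nn1,nn2,nn3",
    "dfs.namenode.rpc-address.nnha.nn1": "192.168.8.111:9870",
    "dfs.namenode.rpc-address.nnha.nn2": "192.168.8.112:9870",
    "dfs.namenode.rpc-address.nnha.nn3": "192.168.8.113:9870",
    "dfs.client.failover.proxy.provider.nnha": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
},
"fieldDelimiter": "|"
```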
    

    3. MysqlReader type-conversion notes for MySQL

    DataX internal type | MySQL data types
    --------------------|-----------------------------------------------------------
    Long                | int, tinyint, smallint, mediumint, int, bigint
    Double              | float, double, decimal
    String              | varchar, char, tinytext, text, mediumtext, longtext, year
    Date                | date, datetime, timestamp, time
    Boolean             | bit, bool
    Bytes               | tinyblob, mediumblob, blob, longblob, varbinary
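    The MySQL-to-DataX mapping above can be expressed as a small lookup table (illustrative only, not DataX's actual implementation):

```python
# Lookup table mirroring the MysqlReader type-mapping table above.
MYSQL_TO_DATAX = {}
for internal, mysql_types in [
    ("Long", ["int", "tinyint", "smallint", "mediumint", "bigint"]),
    ("Double", ["float", "double", "decimal"]),
    ("String", ["varchar", "char", "tinytext", "text", "mediumtext", "longtext", "year"]),
    ("Date", ["date", "datetime", "timestamp", "time"]),
    ("Boolean", ["bit", "bool"]),
    ("Bytes", ["tinyblob", "mediumblob", "blob", "longblob", "varbinary"]),
]:
    for t in mysql_types:
        MYSQL_TO_DATAX[t] = internal

print(MYSQL_TO_DATAX["bigint"])   # Long
print(MYSQL_TO_DATAX["varchar"])  # String
```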

    4. HdfsWriter supports most Hive types

    DataX internal type | Hive data types
    --------------------|--------------------------------
    Long                | TINYINT, SMALLINT, INT, BIGINT
    Double              | FLOAT, DOUBLE
    String              | STRING, VARCHAR, CHAR
    Boolean             | BOOLEAN
    Date                | DATE, TIMESTAMP

    5. Prepare data in MySQL

    mysql> create table person(
        -> id bigint,
        -> name varchar(64)
        -> );
    Query OK, 0 rows affected (0.06 sec)
    
    mysql> 
    mysql> insert into person(id, name) values(1, 'yi'), (2, 'er');
    Query OK, 2 rows affected (0.02 sec)
    Records: 2  Duplicates: 0  Warnings: 0
    
    mysql> 
    mysql> select * from person;
    +------+------+
    | id   | name |
    +------+------+
    |    1 | yi   |
    |    2 | er   |
    +------+------+
    2 rows in set (0.00 sec)
    
    mysql> 
    

    6. Create job/mysql2hdfs.json

    Contents:

    [root@bigdata001 datax]# cat job/mysql2hdfs.json 
    {
        "job": {
            "content": [
                {
                    "reader": {
                        "name": "mysqlreader", 
                        "parameter": {
                            "column": ["id", "name"], 
                            "connection": [
                                {
                                    "jdbcUrl": ["jdbc:mysql://192.168.8.115:3306/test"], 
                                    "table": ["person"]
                                }
                            ], 
                            "password": "Root_123", 
                            "username": "root", 
                            "where": ""
                        }
                    }, 
                    "writer": {
                        "name": "hdfswriter", 
                        "parameter": {
                            "column": [{"name": "id", "type": "int"}, {"name": "name", "type": "string"}], 
                            "compress": "", 
                            "defaultFS": "hdfs://192.168.8.111:9000", 
                            "fieldDelimiter": "|", 
                            "fileName": "person.txt", 
                            "fileType": "text", 
                            "path": "/", 
                            "writeMode": "append"
                        }
                    }
                }
            ], 
            "setting": {
                "speed": {
                    "channel": "1"
                }
            }
        }
    }
    [root@bigdata001 datax]#
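    Since HdfsWriter requires the writer's column count to match the reader's, it can be handy to sanity-check a job file before running it; a quick sketch (hypothetical helper, not part of DataX):

```python
import json

def check_column_counts(job: dict) -> bool:
    """Return True when every reader/writer pair declares the same number of columns."""
    for content in job["job"]["content"]:
        reader_cols = content["reader"]["parameter"].get("column", [])
        writer_cols = content["writer"]["parameter"].get("column", [])
        if reader_cols == ["*"]:
            continue  # cannot verify "*" without the table schema
        if len(reader_cols) != len(writer_cols):
            return False
    return True

# Minimal structure mirroring the reader/writer columns of the job above.
sample = json.loads("""
{"job": {"content": [{
    "reader": {"parameter": {"column": ["id", "name"]}},
    "writer": {"parameter": {"column": [{"name": "id", "type": "int"},
                                        {"name": "name", "type": "string"}]}}
}]}}
""")
print(check_column_counts(sample))  # True
```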
    

    7. Run the job

    DataX writes to a temporary file first. If the job succeeds, the temporary file is renamed into place and the temporary directory is then deleted; if it fails, the temporary file is deleted outright.

    [root@bigdata001 datax]# bin/datax.py job/mysql2hdfs.json 
    
    DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
    ......省略部分......
    2022-06-14 10:16:33.551 [0-0-0-writer] INFO  HdfsWriter$Task - begin do write...
    2022-06-14 10:16:33.551 [0-0-0-writer] INFO  HdfsWriter$Task - write to file : [hdfs://192.168.8.111:9000/__af1d80fc_c721_4973_a54f_18d97902156f/person.txt__8191b9fa_361d_435d_ba34_7b361fffb07d]
    ......省略部分......
    2022-06-14 10:16:43.512 [job-0] INFO  HdfsWriter$Job - start rename file [hdfs://192.168.8.111:9000/__af1d80fc_c721_4973_a54f_18d97902156f/person.txt__8191b9fa_361d_435d_ba34_7b361fffb07d] to file [hdfs://192.168.8.111:9000/person.txt__8191b9fa_361d_435d_ba34_7b361fffb07d].
    ......省略部分......
    2022-06-14 10:16:43.915 [job-0] INFO  HdfsWriter$Job - start delete tmp dir [hdfs://192.168.8.111:9000/__af1d80fc_c721_4973_a54f_18d97902156f] .
    ......省略部分......
    2022-06-14 10:16:44.034 [job-0] INFO  JobContainer - 
    Job start time                  : 2022-06-14 10:16:31
    Job end time                    : 2022-06-14 10:16:44
    Total elapsed time              :                 12s
    Average traffic                 :                0B/s
    Record write speed              :              0rec/s
    Total records read              :                   2
    Total read/write failures       :                   0
    
    [root@bigdata001 datax]#
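    The temp-file-then-rename behavior visible in the log can be sketched on a local filesystem (plain Python, not HDFS; the names and paths are illustrative):

```python
import os
import tempfile
import uuid

def atomic_write(path: str, data: str) -> None:
    # Write to a uniquely named temporary file first, like HdfsWriter's tmp file...
    tmp = "{}__{}".format(path, uuid.uuid4().hex)
    try:
        with open(tmp, "w") as f:
            f.write(data)
        # ...and rename it into place only after the write succeeded.
        os.replace(tmp, path)
    except Exception:
        # On failure, delete the temporary file so no partial output survives.
        if os.path.exists(tmp):
            os.remove(tmp)
        raise

out_dir = tempfile.mkdtemp()
target = os.path.join(out_dir, "person.txt")
atomic_write(target, "1|yi\n2|er\n")
with open(target) as f:
    print(f.read(), end="")
```

    Readers of the target path never see a half-written file: the rename makes the complete output visible in one step.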
    

    8. View the file on HDFS

    DataX appends a random suffix to the configured fileName; the result is the actual file name each writer thread writes to.

    [root@bigdata001 ~]# hadoop fs -cat /person.txt__8191b9fa_361d_435d_ba34_7b361fffb07d
    1|yi
    2|er
    [root@bigdata001 ~]#
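    Reading the exported file back amounts to splitting each line on the job's fieldDelimiter; a minimal sketch with the output above inlined:

```python
# Split DataX text output on the fieldDelimiter configured in the job ("|").
exported = "1|yi\n2|er\n"
rows = [line.split("|") for line in exported.splitlines()]
print(rows)  # [['1', 'yi'], ['2', 'er']]
```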
    
  • Original article: https://blog.csdn.net/yy8623977/article/details/125272844