• 从Oracle迁移数据到Hadoop


    目标:把Oracle 下的数据文件迁移到 Hadoop , 数据表都是上亿条,每个数据表的空间都超过100G。

    关键是建立数据文件是UTF8格式,这样数据中的汉字在Hadoop 里显示才正常。

    平台:CentOS 6.10

    工具:Python 2.7 cx_Oracle 库

    Oracle10.2.0.4.0

    Hadoop 2.7.2-transwarp-5.2.1

    1. 查看表结构

    如果是分区表, 直接按分区导出 , 如果是非分区表,查看索引字段, 按索引字段分段导出。

    记录DDL 语句。

    2.编写Python 脚本。

    oralce_lib.py

    # coding:utf-8
    # 服务器上的Oracle接口
    # By 陈年椰子
    
    import cx_Oracle
    import socket
    import os
    import time
    
    os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'
    
    
    # 输出日志
    def proc_log(log_info):
        t_now = time.strftime('%H:%M:%S', time.localtime(time.time()))
        log_str = t_now + log_info
        print(log_str)
        log_file = open('exp_work.log', 'a')
        log_file.write("{}
    ".format(log_str))
        log_file.close()
    
    # 连接XXX平台
    def Conncet_JZXZ():
        conn_dict = {'ip': '10.X.X.X', 'port': 1521, 'user': 'fx_user', 'pw': 'x123456x', 'sid': 'orcl'}
        return Connect(conn_dict)
    
    
    # 连接ORACLE
    def Connect(conn_dict):
        sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sk.settimeout(3)
        try:
            sk.connect((conn_dict['ip'], conn_dict['port']))
        except Exception as e:
            proc_log('Server({0}) port {1} connect Fail!'.format(conn_dict['ip'], conn_dict['port']))
            proc_log(repr(e))
            return ""
        sk.close()
        try:
            server_str = '{0}/{1}'.format(conn_dict['ip'], conn_dict['sid'])
            oracle_db = cx_Oracle.connect(conn_dict['user'], conn_dict['pw'], server_str)
            return oracle_db
        except Exception as e:
            proc_log("数据库Connect出错
    " + repr(e))
            return ""
    
    
    def pack_file(data_file):
        command = "gzip {}".format(data_file)
        if os.system(command) == 0:
            proc_log('	压缩{}成功'.format(data_file))
        else:
            proc_log('	压缩{}失败'.format(data_file))
    
    
    
    def exp_table_func_s(str_sql, file_name):
        proc_log("{} -> {}".format(str_sql, file_name))
        proc_log('	开始获取数据')
        try:
            cnxn = Conncet_JZXZ()
            cursor = cnxn.cursor()
            cursor.execute(str_sql)
            proc_log('	开始写入数据')
            w_i = 0
            file_w = file_name
            file_list = []
            file_i = 0
            for row in cursor:
                l_str = ""
                for x in row:
                    l_str = l_str + ',{}'.format('' if x is None else x)
                w_i = w_i + 1
                if w_i % 10000000 == 1:
                    if file_i > 0:
                        proc_log('	关闭文件 {}'.format(file_w))
                        f.close()
                        pack_file(file_w)
                    file_i = file_i + 1
                    file_w = "{}_{}.{}".format(file_name[:-4], file_i, file_name[-3:])
                    f = open(file_w, 'w')
                    file_list.append(file_w)
                    proc_log('	建立文件 {}'.format(file_w))
                if w_i % 500000 == 0:
                    proc_log("		写入第{}行".format(w_i))
                f.write("{}
    ".format(l_str[1:].replace("'", "").replace("None", "")))
            f.close()
            pack_file(file_w)
            proc_log('	写入数据完成,{}条'.format(w_i))
            cursor.close()
            cnxn.close()
            # for f in file_list:
    
        except Exception as e:
            proc_log("导出平台数据出错" + repr(e))
            return "fail"
        return "ok"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100

    exp.py

    # coding=utf-8
    
    """
    模块:批量导出平台数据
    功能:
    开发人员:陈年椰子
    建立时间:2019/5/16
    最后修改:2019/5/22
    说明:
    
    """
    
    import oracle_lib as ol
    
    # 这里可以建立工作任务列表
    
    db_src = '''select  *  from TBL_XXXXXX  
    where  CLR_DATE  < to_date('20190401','YYYYMMDD')   
    AND  CLR_DATE  >= to_date('20190101','YYYYMMDD')  '''
    ol.exp_table_func_s(db_src, 'tbl_q1.txt')
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    3. 运行导出脚本

    直接用 python exp.py 或者用 nohup python exp.py & 挂到后台运行。

    数据文件会压缩成gz 文件

    4. 上传 hadoop 服务器

    用 scp 工具上传

    5. 上传到hadoop 目录

    为了省事,也把在hadoop 下建目录,解压,上传文件写了脚本。

    # coding:utf-8
    # code by 陈年椰子  hndm@qq.com
    
    
    import os
    import sys
    
    
    
    def run_cmd(cmd):
        p = os.popen(cmd)
        x = p.read()
        print(x)
        p.close()
    
    
    
    if len(sys.argv) < 3:
        print("使用方法: python hadoop_table.py table_name txt_file_head [new]")
    else:
    
        table_name = sys.argv[1]
        file_name = sys.argv[2]
        create_table = sys.argv[3] if len(sys.argv)==4 else ''
        if create_table == 'new':
            print('建立新目录:{}'.format(table_name))
        # 把youdir 替换成你的hadoop 用户目录名
        cmd_list = [
        'gunzip {}*.gz ./'.format(file_name),
        'hadoop fs -put {}*.txt /user/youdir/{}'.format(file_name, table_name),
        'hadoop fs -count -q -h /user/youdir/{}'.format(table_name),
        'hadoop dfs -ls /user/youdir/{}'.format(table_name)]
    
        if create_table == 'new':
            create_cmd = 'hadoop fs -mkdir {}'.format(table_name)
            run_cmd(create_cmd)
        for c in cmd_list:
            print('运行命令:{}'.format(c))
            run_cmd(c)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    运行脚本

    新建目录 用

    python hadoop_table.py 你的目录名 数据文件开头 new

    已有目录 追加用

    python hadoop_table.py 你的目录名 数据文件开头

    6. 到SQL 界面去建立文本数据外表。 建立语句用 ORACLE里的DDL ,加上外表文件路径即可。

    CREATE TABLE finance.T98_INT_ORG_APP_RELA_H 
       (	
    ....
    )
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    LOCATION '/user/youdir/youtbldatadir';
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
  • 相关阅读:
    html结构
    杂记 | 使用gitlab-runner将python项目以docker镜像方式流水线部署到服务器(解决部署缓慢和时区不对的问题)
    git常用命令和开发常用场景
    Go核心编程(四) -- 内存模型
    计算机视觉基础(7)——相机基础
    UOS服务器操作系统搭建离线yum仓库
    React 全栈体系(十一)
    jenkins使用注意问题
    Docker 搭建 Nexus3 私服 | 基本操作
    【C语言】内存函数的详细教学和模拟实现
  • 原文地址:https://blog.csdn.net/web13985085406/article/details/126553589