

    Using pyarrow to transfer files between HDFS and the local filesystem

    Several interfaces are available in Python for syncing files with HDFS, e.g. pyhdfs, hdfs, libhdfs, pyarrow, or plain shell commands. Considering ease of use and the local environment (the HTTP service is disabled on our company cluster), this article shows how to use pyarrow to sync files between the local filesystem and HDFS in a reasonably elegant way.

    1. Installing pyarrow

    After downloading the .whl file, install it with pip install pyarrow-9.0.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl

    • Configure environment variables
      pyarrow needs the jar packages under the Hadoop installation directory, so they must be on the CLASSPATH.
    # Adjust to your own setup; the following is for a Linux environment
    export PATH=$JAVA_HOME/bin:$HBASE_HOME/bin:$HADOOP_HOME/bin:$HIVE_HOME/bin:$SPARK_HOME/bin:$SQOOP_HOME/bin:$PATH:${HOME}/.local/bin:${DISTCP_HOME}:$JAVA_HOME/jre/bin:$PATH
    export CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath --glob`
    export HADOOP_LIBEXEC_DIR=${HADOOP_HOME}/libexec
    
    

    As shown in the official documentation:

    export CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath --glob`
    # or on Windows
    %HADOOP_HOME%/bin/hadoop classpath --glob > %CLASSPATH%
    
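    Once the environment variables are in place, a quick way to check that pyarrow can reach the cluster is to connect and list a directory. A minimal sketch; the namenode host, port, user, and the /tmp path below are placeholders for your own cluster:

    from pyarrow import fs

    # Placeholders: replace with your namenode host, port and user
    hdfs = fs.HadoopFileSystem(host="namenode-host", port=8020, user="your-user")

    # List the entries directly under /tmp as a smoke test
    for info in hdfs.get_file_info(fs.FileSelector("/tmp", recursive=False)):
        print(info.path, info.type)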

    2. The code

    2.1. HDFS -> local

    HdfsPath and LocalPath are custom classes whose main job is to hold a fs.HadoopFileSystem(host="***", port=, user="***", kerb_ticket=None) and a fs.LocalFileSystem object respectively; a minimal sketch of them is given right below.
    The examples in 2.1 and 2.2 are fairly complete; a simpler test example is given in 2.3.
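    The original post does not include these two classes, so the following is only a hypothetical reconstruction inferred from how they are used in 2.1 and 2.2 (the host, port, and user defaults are placeholders):

    import logging
    from pathlib import Path

    from pyarrow import fs

    _logger = logging.getLogger(__name__)


    class HdfsPath:
        """Hypothetical wrapper around an HDFS path (reconstructed, not the original class)."""

        def __init__(self, path, host="***", port=8020, user="***"):
            self.path = path
            self._hdfs = fs.HadoopFileSystem(host=host, port=port, user=user, kerb_ticket=None)

        def get_hdfs(self):
            return self._hdfs

        def isFile(self):
            return self._hdfs.get_file_info(self.path).type == fs.FileType.File

        def isDir(self):
            return self._hdfs.get_file_info(self.path).type == fs.FileType.Directory


    class LocalPath:
        """Hypothetical wrapper around a local path (reconstructed, not the original class)."""

        def __init__(self, path):
            self.path = path
            self._local = fs.LocalFileSystem()

        def get_local(self):
            return self._local

        def isFile(self):
            return Path(self.path).is_file()

        def isDir(self):
            return Path(self.path).is_dir()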

    def hdfs_to_local(hdfs_path: HdfsPath, local_path: LocalPath):
        """
        Sync from HDFS to the local filesystem. Two cases:
        1) a single file: open it and write it straight out as a stream;
        2) a directory: build a file selector, list every entry under the directory,
           and transfer each one in turn as in case 1).
        :param hdfs_path:
        :param local_path:
        :return:
        """
        hdfs = hdfs_path.get_hdfs()
        local = local_path.get_local()
        _logger.info(f"Copying HDFS path {hdfs_path.path} to local path {local_path.path}.")
        # Normalize the HDFS and local paths (strip trailing slashes)
        if hdfs_path.path.endswith('/'):
            hdfs_path.path = hdfs_path.path[:-1]
        if local_path.path.endswith('/'):
            local_path.path = local_path.path[:-1]
        len_hdfs_path = len(str(Path(hdfs_path.path).absolute().parent))
        file_list = []
        # If the source is a single file, transfer it relative to its parent directory
        if hdfs_path.isFile():
            file_list.append(hdfs.get_file_info(hdfs_path.path))
        elif hdfs_path.isDir():
            file_select = fs.FileSelector(hdfs_path.path, allow_not_found=True, recursive=True)
            file_list = hdfs.get_file_info(file_select)

        _logger.info(f"{len(file_list)} entries to copy...")
        for file in file_list:
            local_filename = local_path.path + file.path[len_hdfs_path:]
            # Directory entries (FileType.Directory, value 3) vs. files (FileType.File, value 2)
            if file.type.value == 3:
                local.create_dir(local_filename)
            elif file.type.value == 2:
                with hdfs.open_input_stream(file.path) as read_file, local.open_append_stream(
                        local_filename) as write_file:
                    write_file.write(read_file.readall())
            else:
                print("hdfs_file -> local_file download error!")
                raise Exception(f"\n{file} was not found or has an unknown type.\n")
    
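    A hypothetical call, assuming the sketched HdfsPath and LocalPath classes above (the paths are examples only):

    hdfs_to_local(HdfsPath("/user/example/data"), LocalPath("/home/example/data"))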

    2.2. Local -> HDFS

    def local_to_hdfs(local_path: LocalPath, hdfs_path: HdfsPath):
        """
        Same approach as the method above, in the opposite direction.
        :param local_path:
        :param hdfs_path:
        :return:
        """
        _logger.info(f"Copying local path {local_path.path} to HDFS path {hdfs_path.path}.")
        local = local_path.get_local()
        hdfs = hdfs_path.get_hdfs()
        # Normalize the HDFS and local paths (strip trailing slashes)
        if hdfs_path.path.endswith('/'):
            hdfs_path.path = hdfs_path.path[:-1]
        if local_path.path.endswith('/'):
            local_path.path = local_path.path[:-1]

        len_local_path = len(str(Path(local_path.path).absolute().parent))
        file_list = []
        # If the source is a single file, transfer it relative to its parent directory
        if local_path.isFile():
            file_list.append(local.get_file_info(local_path.path))
        elif local_path.isDir():
            file_select = fs.FileSelector(local_path.path, allow_not_found=True, recursive=True)
            file_list = local.get_file_info(file_select)

        _logger.info(f"{len(file_list)} entries to copy...")
        for file in file_list:
            hdfs_filename = hdfs_path.path + file.path[len_local_path:]
            # Directory entries (FileType.Directory, value 3) vs. files (FileType.File, value 2)
            if file.type.value == 3:
                hdfs.create_dir(hdfs_filename)
            elif file.type.value == 2:
                with local.open_input_stream(file.path) as read_file, hdfs.open_output_stream(
                        hdfs_filename) as write_file:
                    write_file.write(read_file.readall())
            else:
                print("local_file -> hdfs_file upload error!")
                raise Exception(f"\n{file} was not found or has an unknown type.\n")
    
    
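    And the reverse direction, again with example paths only:

    local_to_hdfs(LocalPath("/home/example/data"), HdfsPath("/user/example/data"))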

    2.3. A simple example

    This can be used to copy a single file.

    from pyarrow import fs


    class FileOperation:
        """Thin wrapper holding an HDFS filesystem and a local filesystem."""

        def __init__(self, host='localhost', port=9000, user='hdp-loan-dataai'):
            self.host = host
            self.port = port
            self.user = user
            self.hdfs = fs.HadoopFileSystem(host=host, port=port, user=user)
            self.local = fs.LocalFileSystem()

        def hdfs2local(self, src_path, des_path):
            """
            Copy a single file from HDFS to the local filesystem.
            :param src_path: HDFS file path
            :param des_path: local destination directory
            :return:
            """
            filename = src_path.split('/')[-1]
            try:
                with self.hdfs.open_input_stream(src_path) as read_file, self.local.open_append_stream(
                        des_path + '/' + filename) as write_file:
                    write_file.write(read_file.readall())
            except Exception as e:
                print(f"hdfs_file -> local_file download error: {e}")

        def local2hdfs(self, src_path, des_path):
            """
            Copy a single file from the local filesystem to HDFS.
            :param src_path: local file path
            :param des_path: HDFS destination directory
            :return:
            """
            filename = src_path.split('/')[-1]
            # self.hdfs.create_dir(des_path + '/' + filename)
            with self.local.open_input_stream(src_path) as read_file, self.hdfs.open_output_stream(
                    des_path + '/' + filename) as write_file:
                write_file.write(read_file.readall())
    
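    A hypothetical usage of the class above; host, port, user, and the paths are examples only:

    op = FileOperation(host="namenode-host", port=8020, user="your-user")
    op.hdfs2local("/user/example/data/part-00000.csv", "/home/example/downloads")
    op.local2hdfs("/home/example/uploads/report.csv", "/user/example/data")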

    References

    pyarrow downloads (official site)
    pyarrow API documentation (official site)
