• 实现HBase表和RDB表的转化(附Java源码资源)


    实现HBase表和RDB表的转化

    在这里插入图片描述
    在这里插入图片描述

    一、引入

    转化为HBase表的三大来源:RDB Table、Client API、Files

    在这里插入图片描述
    如何构造通用性的代码模板实现向HBase表的转换,是一个值得考虑的问题。这篇文章着重讲解RDB表向HBase表的转换。

    首先,我们需要分别构造rdb和hbase的对象,根据批处理的思想,我们可以考虑批量将rdb中的数据导出,并且转化为List的格式,直接导入HBase表中,最后释放资源,伪代码模板如下:

    rdb=...
    hbase=...
    rdb.init();
    hbase.init();
    while(rdb.hasNextBatch()){
    	List<Put> batch = rdb.nextBatch();
    	hbase.putBatch(batch);
    }
    hbase.close();
    rdb.close();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    二、代码讲解

    1. 目录结构

    在这里插入图片描述

    2. 具体实现
    • transfer.properties
      在这里插入图片描述

    内含HBase和RDB转换所有配置信息的配置文件,因为该配置文件是在启动时就需要进行配置,因此我们需要按以下图片进行配置导入配置文件:
    在这里插入图片描述

    1. Run/Debug Configurations中,新建一个Application
    2. 配置好主类
    3. 配置好配置文件的具体路径
    • RDB 接口
    public interface RDB extends Com {
        // 要提升性能,需要使用批处理
        boolean hasNextBatch() throws SQLException;// 是否存在下一个批次
        List<Put> nextBatch() throws SQLException;// 一个put代表往一个hbase表的一行的一个列族的一个列插入一条数据,对Hbase来说,批次就是List
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • RDB 实现类
    public class RDBImpl implements RDB {
        private static Logger logger = Logger.getLogger(RDBImpl.class);
        // JDBC 的基本元素:连接对象(装载[驱动]、[URL]、[账号]、[密码])->执行对象(SQL语句)->结果集
        private Properties config;
        /**
         * 它们需要设置成全局变量的原因是它们需要共享
         */
        private Connection con;
        private PreparedStatement pst;
        private ResultSet rst;
        // 定义每个批次处理的记录数的最大数量
        private int batchSize;
        // hbase的行键对应rdb的列的列名
        private String hbaseRowKeyRdbCol;
        private Map<String,Map<String,String>> hbaseRdbColMapping;
    
        // RDB配置可以灵活地从外部传入(构造方法),从内部读取(config())
        public RDBImpl(Properties config) {
            this.config = config;
        }
    
        @Override
        public Properties config() {
            return config;
        }
    
        /**
         * 内部资源初始化
         */
        @Override
        public void init() throws Exception{
            con = getConnection();
            logger.info("RDB 创建 [ 连接 ] 对象成功");
            pst = getStatement(con);
            logger.info("RDB 创建 [ 执行 ] 对象成功");
            rst = getResult(pst);
            logger.info("RDB 创建 [ 结果集 ] 成功");
            batchSize = batchSize();
            hbaseRdbColMapping = hbaseRdbColumnsMapping();
        }
    
        @Override
        public void close() {
            closeAll(rst,pst,con);
        }
    
    
        private String driver(){
            return checkAndGetConfig("rdb.driver");
        }
    
        private String url(){
            return checkAndGetConfig("rdb.url");
        }
    
        private String username(){
            return checkAndGetConfig("rdb.username");
        }
    
        private String password(){
            return checkAndGetConfig("rdb.password");
        }
    
        private String sql(){
            return checkAndGetConfig("rdb.sql");
        }
    
        private int batchSize(){
            return Integer.parseInt(checkAndGetConfig("rdb.batchSize"));
        }
    
        // java.sql下的Connection
        private Connection getConnection() throws ClassNotFoundException, SQLException {
            // 装载驱动
            Class.forName(driver());
            // 获取并返回连接对象
            return DriverManager.getConnection(url(),username(),password());
        }
        private PreparedStatement getStatement(Connection con) throws SQLException {
            return con.prepareStatement(sql());
        }
        private ResultSet getResult(PreparedStatement statement) throws SQLException {
            return statement.executeQuery();
        }
        /**
         * hbase 列族和列与rdb中列的映射关系
         *             hbase列族   hbase列  rdb列
         * @return Map>
         */
        private Map<String, Map<String,String>> hbaseRdbColumnsMapping(){
            String mapping = checkAndGetConfig("rdb.hbase.columns.mapping");
            Map<String,Map<String,String>> map = new HashMap<>();
            String[] pss = mapping.split(",");
            for(String ps : pss){
                String[] pp = ps.split("->");
                String[] p = pp[0].split(":");
                String rdbCol = pp[1],hbaseColFamily,hbaseColName;
                if(p.length==1){
                    hbaseRowKeyRdbCol = pp[1];
                }else {
                    hbaseColFamily = p[0];
                    hbaseColName = p[1];
                    if(!map.containsKey(hbaseColFamily)){
                        map.put(hbaseColFamily,new HashMap<>());
                    }
                    map.get(hbaseColFamily).put(hbaseColName,rdbCol);
                }
            }
            return map;
        }
    
        /**
         * 将RDB的列转化为字节数组(需要确定列的数据类型)
         * @param rdbColumn
         * @return
         * @throws SQLException
         */
    
        private byte[] toBytesFromRdb(String rdbColumn) throws SQLException {
            Object obj = rst.getObject(rdbColumn);
            if(obj instanceof String){
                return Bytes.toBytes((String)obj);
            } else if(obj instanceof Float){
                return Bytes.toBytes(((Float)obj).floatValue());
            } else if(obj instanceof Double){
                return Bytes.toBytes(((Double)obj).doubleValue());
            } else if(obj instanceof BigDecimal){
                return Bytes.toBytes((BigDecimal)obj);
            } else if(obj instanceof Short){
                return Bytes.toBytes(((Short) obj).shortValue());
            } else if(obj instanceof Integer){
                return Bytes.toBytes(((Integer)obj).intValue());
            } else if(obj instanceof Boolean){
                return Bytes.toBytes((Boolean)((Boolean) obj).booleanValue());
            } else {
                throw new SQLException("HBase不支持转化为字节数组的类型:"+obj.getClass().getName());
            }
        }
    
        /**
         * 将HBase的列名或列族名转化为字节数组
         * @param name
         * @return
         */
        private byte[] toBytes(String name){
            return Bytes.toBytes(name);
        }
    
        // 最后一个批次的数据最少有一条
        @Override
        public boolean hasNextBatch() throws SQLException{
            return rst.next();
        }
    
        @Override
        public List<Put> nextBatch() throws SQLException{
            // 预先分配容量
            List<Put> list = new ArrayList<>(batchSize);
            int count = 0;
            do{
                /**
                 * 如何将一行解析为多个put(结合配置文件)
                 * 对每条数据,创建一个带行键的put,向put中放入HBase列族名,HBase列名,RDB列名
                 */
                Put put = new Put(toBytesFromRdb(hbaseRowKeyRdbCol));
                for (Map.Entry<String, Map<String, String>> e : hbaseRdbColMapping.entrySet()) {
                    String columnFamily = e.getKey();
                    for (Map.Entry<String, String> s : e.getValue().entrySet()) {
                        String hbaseColumn = s.getKey();
                        String rdbColumn = s.getValue();
                        // 需要将内容转变为字节数组传入方法
                        put.addColumn(toBytes(columnFamily),toBytes(hbaseColumn),toBytesFromRdb(rdbColumn));
                    }
                }
                list.add(put);
            }while(++count<batchSize && rst.next());
            return list;
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180

    如何理解一行转化为多个put?
    在这里插入图片描述
    结果集的实质?
    在这里插入图片描述
    rst.next() 的两个作用

    rst.next();
    // 1.判定是否存在下一个有效行
    // 2.若存在下一个有效行,则指向该有效行
    
    • 1
    • 2
    • 3

    a. 只通过config作为参数构造rdb
    b. 以JDBC为核心,需要连接对象(驱动,URL,账号,密码)=>执行对象(SQL)=>结果集,这些都需要被设计为全局变量(因为需要被共享)
    c. 既实现了RDB接口,还实现了RDB的继承接口Com中的init()、close()进行资源的初始化和释放,checkAndGetConfig()根据传入的配置文件获取配置信息并且赋值给全局变量。
    d. 重点:我们还需要对RDB和HBase的映射关系进行解析,最终解析出RDB列名,HBase列族名,HBase列名,具体如何解析参考配置文件transfer.properties,并将解析出来的名字构造成一个Put对象,由于构造Put对象只能放字节数组,所以需要转化为字节数组的方法,又因为解析RDB的列名需要考虑列的数据类型,而解析HBase的列族或列名不需要考虑,因此需要有两个转换方法==ToBytesFromRDB()和ToBytes()==分别实现两种情况的字节数组转化。

    • HBase接口
    public interface HBase extends Com {
        // RDBImpl的nextBatch()返回的就是List,直接放入HBase表即可。
        void putBatch(List<Put> batch) throws IOException;
    }
    
    • 1
    • 2
    • 3
    • 4
    • HBase实现类
    public class HBaseImpl implements HBase {
        private static Logger loggerHBase = Logger.getLogger(HBaseImpl.class);
        private Properties config;
        private Connection con;
        private Table hbaseTable;
    
    
        public HBaseImpl(Properties config) {
            this.config = config;
        }
    
        @Override
        public Properties config() {
            return config;
        }
    
        @Override
        public void init() throws Exception {
            con = getCon();
            loggerHBase.info("HBase 创建 [ 连接 ] 成功");
            hbaseTable = checkAndGetTable(con);
            loggerHBase.info("HBase 创建 [ 数据表 ] 成功");
        }
    
        @Override
        public void close() {
            closeAll(hbaseTable,con);
        }
    
        private String tableName(){
            return checkAndGetConfig("hbase.table.name");
        }
        private String zkUrl(){
            return checkAndGetConfig("hbase.zk");
        }
    
        private Connection getCon() throws IOException {
            // hadoop.conf的configuration
            Configuration config = HBaseConfiguration.create();
            config.set("hbase.zookeeper.quorum",zkUrl());
            return ConnectionFactory.createConnection(config);
        }
    
        private Table checkAndGetTable(Connection con) throws IOException {
            /**
             * Admin : HBase DDL
             */
            Admin admin = con.getAdmin();
            TableName tableName = TableName.valueOf(tableName());
            // 通过tableName判定表是否存在
            if(!admin.tableExists(tableName)){
                throw new IOException("HBase表不存在异常:"+tableName);
            }
            /**
             * Table : HBase DML & DQL
             */
            // 传入的参数可以是TableName tableName,ExecutorService pool(表操作可以并发)
            return con.getTable(tableName);
        }
    
        @Override
        public void putBatch(List<Put> batch) throws IOException{
            hbaseTable.put(batch);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    HBase的实现类和RDB的实现类也非常类似:
    先重写HBase接口中的方法和Com接口中的方法,发现往里放数据需要构造一个Table对象,而Table对象的构建需要一个连接对象和TableName,因此在构造了两个方法tableName()获取配置信息中的TableName(注意:此时的TableName是字符串类型),zkUrl()获取zk.url作为配置构造连接对象。

    • Com接口
    public interface Com {
        Logger logger = Logger.getLogger(Com.class);
        // 获取配置对象
        Properties config();
    
        // 初始化资源
        void init() throws Exception;
    
        // 释放资源
        void close();
    
        default String checkAndGetConfig(String key){
            if(!config().containsKey(key)){
                // 因为该方法可能被用于HBase和RDB
                throw new RuntimeException("配置项缺失异常:"+key);
            }
            String item = config().getProperty(key);
            logger.info(String.format("获取配置项 %s : %s",key,item));
            return item;
        }
    
        default void closeAll(AutoCloseable...acs){
            for (AutoCloseable ac : acs) {
                if (Objects.nonNull(ac)) {
                    try {
                        ac.close();
                        logger.info(String.format("释放 %s 成功",ac.getClass().getName()));
                    } catch (Exception e) {
                        logger.error("释放资源异常:"+e);
                    }
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    在Com接口中,设计了一些普通方法config()实现配置的导出,init()、close()资源的初始化和关闭;同样还设计了一些无需实现的默认方法便于实现init()和close()方法。这些方法适用于RDB和HBase的实现类。

    • RDBToHBase接口
    public interface RDBToHBase {
        // 创建一个RDB对象
        void setRDB(RDB rdb);
        // 创建一个HBase对象
        void setHBase(HBase hbase);
        // 进行数据的传输
        void startTransfer();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • RDBToHBase实现类
    public class RDBToHBaseImpl implements RDBToHBase {
        // 日志显示
        private static Logger loggerRH = Logger.getLogger(RDBToHBaseImpl.class);
        private RDB rdb;
        private HBase hbase;
    
        @Override
        public void setRDB(RDB rdb) {
            this.rdb = rdb;
        }
    
        @Override
        public void setHBase(HBase hbase) {
            this.hbase = hbase;
        }
    
        @Override
        public void startTransfer() {
            try {
                rdb.init();
                loggerRH.info("RDB 初始化成功");
                hbase.init();
                loggerRH.info("HBase 初始化成功");
                loggerRH.info("数据从 RDB 迁移至 HBase 开始...");
                int count = 0;
                while (rdb.hasNextBatch()) {
                    final List<Put> batch = rdb.nextBatch();
                    hbase.putBatch(batch);
                    loggerRH.info(String.format("第 %d 批:%d 条数据插入成功",++count,batch.size()));
                }
                loggerRH.info("数据从 RDB 迁移至 HBase 结束...");
            } catch (Exception e){
                loggerRH.error("将 RDB 数据批量迁移至 HBase 异常",e);
            } finally{
                hbase.close();
                rdb.close();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • AppRDBToHBase 实现类
    public class AppRDBToHBase
    {
        private static Logger logger = Logger.getLogger(AppRDBToHBase.class);
        private static void start(String[] args){
            try {
                if (Objects.isNull(args) || args.length == 0 || Objects.isNull(args[0])) {
                    throw new NullPointerException("配置文件路径空指针异常");
                }
                final String PATH = args[0];
                final File file = new File(PATH);
                if (!file.exists() || file.length() == 0 || !file.canRead()) {
                    throw new IOException("配置文件不存在、不可读、空白");
                }
                Properties config = new Properties();
                // final String path = args[0];
                config.load(new FileReader(file));
    
                RDB rdb = new RDBImpl(config);
                HBase hBase = new HBaseImpl(config);
                RDBToHBase rdbToHBase = new RDBToHBaseImpl();
                rdbToHBase.setRDB(rdb);
                rdbToHBase.setHBase(hBase);
                rdbToHBase.startTransfer();
            }catch(Exception e){
                logger.error("配置异常",e);
            }
        }
        public static void main( String[] args ) {
            start(args);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    对于传入的配置文件路径,既要检查路径本身,也要检查路径代表的文件本身。
    通过流的方式将文件进行配置,并且利用该配置构造RDB和HBase并进行数据的传输

    其他:日志文件系统Log.4j的应用
    • 准备:需要在Resources模块下配置log4j.properties文件
    • 注意:
      • 日志文件信息的输出方式有三种logger.error()、logger.info()、logger.warn() ,除了对错误信息进行输出之外,也要习惯于补充正常信息的输出,以增强代码的可读性。
      • log.4j除了在控制台打印日志信息之外,还能在磁盘下的日志文件中打印日志信息,因此在导入log4j.properties文件之后需要修改日志文件的路径。
      • 对于不同类或接口下的logger,需要注意进行名字的区分。
  • 相关阅读:
    实际工作项目中搭配git托管代码的流程
    华泰证券:京东营收增长或短期承压
    外链建设如何进行?
    PostgreSQL教程(三十四):服务器管理(十六)之逻辑复制
    MapTR v2文章研读
    SpringBoot —— 整合RabbitMQ常见问题及解决方案
    什么是脏读、不可重复读、幻读讲解
    微服务 - 概念 · 应用 · 通讯 · 授权 · 跨域 · 限流
    丁鹿学堂:前端开发基础知识之像素详解
    linux-grep命令
  • 原文地址:https://blog.csdn.net/m0_74120525/article/details/136764873