• JAVA环境下使用DATAX(不使用python调用:也是个人记录一下)


    DATAX是啥我就不说了 看官方文档

    其实下载过datax源码就能从测试用例里找到java调用的方式

    例如下面这段

    @Test
    public void case01() throws Throwable {
        // given
        prepareTable();
        // when
        String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/csv2t.json"};
        System.setProperty("datax.home", "../target/datax/datax");
        Engine.entry(params);
    }

    重点就在于Engine.entry(params);

    第一步:

    我由于项目原因呢是直接下载了源码 直接复制datax中的common core transformer模块然后pom中依赖他们,其他的部分可以不用拷进来。

    第二步:
    使用下载源码的方式 虽然是不使用python 但是也需要打包datax;具体方式可以看userGuid.md

    第三步:

    这里就是直接调用Engine.entry(params);

    params是一些必要参数

    -mode:standalone 就是单进程方式运行了

    -jobid:每次执行都需要一个任务id

    -job:就是那个json配置文件了

    还需要设置下系统环境变量 设置datax目录 就是datax打包后生成的那个目录

    System.setProperty("datax.home", dataXHome);

    有时候我们肯定是需要设置动态参数 如何你只是每次简单开个任务导一下数据也可以使用System.setProperty();进行设置。例如你的json配置文件中有个动态参数${ID}

    那么你可以直接使用System.setProperty("ID","10")设置。datax后续会进行解析(具体代码再com.alibaba.datax.common.util.StrUtil中)

    但是如何像我项目中 可以同时启动多个任务 用system设置不靠谱。

    我的解决办法暂时就是依葫芦画瓢 也把参数放在那个param参数数据中 ,修改Engine源码 增加options参数。

    解析动态参数实在Engine中的

    ConfigParser.parse(jobPath,incParam);这行代码  后续修改源码也很简单我就不贴了 复制粘贴改改就行了

    第四步

    如果你需要收集datax日志中打印的那些信息 可以看看Communication类

    具体使用方法可以使用

    Communication jobCommunication = LocalTGCommunicationManager.getJobCommunication();

    你也可以轮训调用这个查看其中的信息

    和日志中打印的是一样的。

    VM信息 可以调用VMInfo.getVmInfo();返回的是单例对象。

    第五步

    根据自己业务需要增加插件 这个有空再说~

    最后贴个测试用例吧(只是测试用例 学习使用的 有些乱我可能不知道改了啥 我也是凑合记录下巴)

     @Test
        void contextLoads() throws Throwable {
           SimpleDateFormat dateFormat = new SimpleDateFormat(
                    "yyyy-MM-dd HH:mm:ss");
            long startTimeStamp = System.currentTimeMillis();
    //- 在有总bps限速条件下,单个channel的bps值不能为空,也不能为非正数   datax报此错误 修改datax下的conf/core.json
    //        core -> transport -> channel -> speed -> "byte": 2000000,将单个channel的大小改为2MB即可。
            //如果需要传递一些其他参数 比如sql中 where id = ${id} 这样 再加一个System.setProperty(“id”,1)这样就可以 但是这个系统变量  做个任务呢???
            VMInfo vmInfo = VMInfo.getVmInfo();
    
            String s = vmInfo.toString();
            System.out.println("初始打印VMINFO-------------------"+s);
            Thread thread = new Thread(() -> {
                test111();
            });
            thread.start();
    
            while (true){
                Thread.sleep(10000);
                System.out.println("十秒打印一次--------------------------");
                vmInfo.getDelta(false);
                String deltaString = vmInfo.getDeltaString();
                System.out.println("VMINFO------------"+deltaString);
                Communication jobCommunication = LocalTGCommunicationManager.getJobCommunication();
                String snapshot = CommunicationTool.Stringify.getSnapshot(jobCommunication);
                System.out.println("executor-INFO----------"+snapshot);
                if(jobCommunication.getState().name().equals(State.SUCCEEDED.name())){
    
                        long endTimeStamp = System.currentTimeMillis();
                    if (vmInfo != null) {
                        vmInfo.getDelta(false);
                        String s1 = vmInfo.totalString();
                        System.out.println("执行结束打印vmInfo--------------:"+s1);
                        String s2 = PerfTrace.getInstance().summarizeNoException();
                        System.out.println("结束打印summarizeNoException-------------------------------------:"+s2);
    
                        long totalCosts = (endTimeStamp - startTimeStamp) / 1000;
                        long transferCosts = (endTimeStamp - startTimeStamp) / 1000;
                        if (0L == transferCosts) {
                            transferCosts = 1L;
                        }
                        // 字节速率
                        long byteSpeedPerSecond = jobCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES)
                                / transferCosts;
    
                        long recordSpeedPerSecond = jobCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS)
                                / transferCosts;
                        String format = String.format(
                                "\n" + "%-26s: %-18s\n" + "%-26s: %-18s\n" + "%-26s: %19s\n"
                                        + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n"
                                        + "%-26s: %19s\n",
                                "任务启动时刻",
                                dateFormat.format(startTimeStamp),
    
                                "任务结束时刻",
                                dateFormat.format(endTimeStamp),
    
                                "任务总计耗时",
                                String.valueOf(totalCosts) + "s",
                                "任务平均流量",
                                StrUtil.stringify(byteSpeedPerSecond)
                                        + "/s",
                                "记录写入速度",
                                String.valueOf(recordSpeedPerSecond)
                                        + "rec/s", "读出记录总数",
                                String.valueOf(CommunicationTool.getTotalReadRecords(jobCommunication)),
                                "读写失败总数",
                                String.valueOf(CommunicationTool.getTotalErrorRecords(jobCommunication))
                        );
                        System.out.println("结束打印format---------------:"+format);
    
                        if (jobCommunication.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS) > 0
                                || jobCommunication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS) > 0
                                || jobCommunication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS) > 0) {
                            String format1 = String.format(
                                    "\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n",
                                    "Transformer成功记录总数",
                                    jobCommunication.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS),
    
                                    "Transformer失败记录总数",
                                    jobCommunication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS),
    
                                    "Transformer过滤记录总数",
                                    jobCommunication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS)
                            );
                            System.out.println("结束打印format1---------------:"+format1);
                        }
    
                    }
                    thread.join();
                    break;
                }
            }
    
        }

    private static void test111(){
        String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "D:\\DevelopSoftware\\datax\\datax20210506\\datax\\job\\job.json"};
        System.setProperty("datax.home", "D:\\DevelopSoftware\\datax\\datax20210506\\datax");
        try {
            Engine.entry(params);
        } catch (Throwable e) {
            e.printStackTrace();
        }
    }
  • 相关阅读:
    Leetcode刷题详解——解码方法
    网络-fetch
    企业数字化转型,为何大部分数据中台项目都最终烂尾?
    [附源码]java毕业设计朋辈帮扶系统
    DELL设备维保查询方法
    Cilium系列-14-Cilium NetworkPolicy 简介
    OpenGL教程(五)
    Spring1
    Bazel 安装
    java连接kubernete
  • 原文地址:https://blog.csdn.net/weixin_40790313/article/details/127754081