DATAX是啥我就不说了 看官方文档
其实下载过datax源码就能从测试用例里找到java调用的方式
例如下面这段
@Test
public void case01() throws Throwable {
// given
prepareTable();
// when
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/csv2t.json"};
System.setProperty("datax.home", "../target/datax/datax");
Engine.entry(params);
}
重点就在于Engine.entry(params);
第一步:
我由于项目原因呢是直接下载了源码 直接复制datax中的common core transformer模块然后pom中依赖他们,其他的部分可以不用拷进来。
第二步:
使用下载源码的方式 虽然是不使用python 但是也需要打包datax;具体方式可以看userGuid.md
第三步:
这里就是直接调用Engine.entry(params);
params是一些必要参数
-mode:standalone 就是单进程方式运行了
-jobid:每次执行都需要一个任务id
-job:就是那个json配置文件了
还需要设置下系统环境变量 设置datax目录 就是datax打包后生成的那个目录
System.setProperty("datax.home", dataXHome);
有时候我们肯定是需要设置动态参数 如何你只是每次简单开个任务导一下数据也可以使用System.setProperty();进行设置。例如你的json配置文件中有个动态参数${ID}
那么你可以直接使用System.setProperty("ID","10")设置。datax后续会进行解析(具体代码再com.alibaba.datax.common.util.StrUtil中)
但是如何像我项目中 可以同时启动多个任务 用system设置不靠谱。
我的解决办法暂时就是依葫芦画瓢 也把参数放在那个param参数数据中 ,修改Engine源码 增加options参数。
解析动态参数实在Engine中的
ConfigParser.parse(jobPath,incParam);这行代码 后续修改源码也很简单我就不贴了 复制粘贴改改就行了
第四步
如果你需要收集datax日志中打印的那些信息 可以看看Communication类
具体使用方法可以使用
Communication jobCommunication = LocalTGCommunicationManager.getJobCommunication();
你也可以轮训调用这个查看其中的信息
和日志中打印的是一样的。
VM信息 可以调用VMInfo.getVmInfo();返回的是单例对象。
第五步
根据自己业务需要增加插件 这个有空再说~
最后贴个测试用例吧(只是测试用例 学习使用的 有些乱我可能不知道改了啥 我也是凑合记录下巴)
@Test
void contextLoads() throws Throwable {
SimpleDateFormat dateFormat = new SimpleDateFormat(
"yyyy-MM-dd HH:mm:ss");
long startTimeStamp = System.currentTimeMillis();
//- 在有总bps限速条件下,单个channel的bps值不能为空,也不能为非正数 datax报此错误 修改datax下的conf/core.json
// core -> transport -> channel -> speed -> "byte": 2000000,将单个channel的大小改为2MB即可。
//如果需要传递一些其他参数 比如sql中 where id = ${id} 这样 再加一个System.setProperty(“id”,1)这样就可以 但是这个系统变量 做个任务呢???
VMInfo vmInfo = VMInfo.getVmInfo();
String s = vmInfo.toString();
System.out.println("初始打印VMINFO-------------------"+s);
Thread thread = new Thread(() -> {
test111();
});
thread.start();
while (true){
Thread.sleep(10000);
System.out.println("十秒打印一次--------------------------");
vmInfo.getDelta(false);
String deltaString = vmInfo.getDeltaString();
System.out.println("VMINFO------------"+deltaString);
Communication jobCommunication = LocalTGCommunicationManager.getJobCommunication();
String snapshot = CommunicationTool.Stringify.getSnapshot(jobCommunication);
System.out.println("executor-INFO----------"+snapshot);
if(jobCommunication.getState().name().equals(State.SUCCEEDED.name())){
long endTimeStamp = System.currentTimeMillis();
if (vmInfo != null) {
vmInfo.getDelta(false);
String s1 = vmInfo.totalString();
System.out.println("执行结束打印vmInfo--------------:"+s1);
String s2 = PerfTrace.getInstance().summarizeNoException();
System.out.println("结束打印summarizeNoException-------------------------------------:"+s2);
long totalCosts = (endTimeStamp - startTimeStamp) / 1000;
long transferCosts = (endTimeStamp - startTimeStamp) / 1000;
if (0L == transferCosts) {
transferCosts = 1L;
}
// 字节速率
long byteSpeedPerSecond = jobCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES)
/ transferCosts;
long recordSpeedPerSecond = jobCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS)
/ transferCosts;
String format = String.format(
"\n" + "%-26s: %-18s\n" + "%-26s: %-18s\n" + "%-26s: %19s\n"
+ "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n"
+ "%-26s: %19s\n",
"任务启动时刻",
dateFormat.format(startTimeStamp),
"任务结束时刻",
dateFormat.format(endTimeStamp),
"任务总计耗时",
String.valueOf(totalCosts) + "s",
"任务平均流量",
StrUtil.stringify(byteSpeedPerSecond)
+ "/s",
"记录写入速度",
String.valueOf(recordSpeedPerSecond)
+ "rec/s", "读出记录总数",
String.valueOf(CommunicationTool.getTotalReadRecords(jobCommunication)),
"读写失败总数",
String.valueOf(CommunicationTool.getTotalErrorRecords(jobCommunication))
);
System.out.println("结束打印format---------------:"+format);
if (jobCommunication.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS) > 0
|| jobCommunication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS) > 0
|| jobCommunication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS) > 0) {
String format1 = String.format(
"\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n",
"Transformer成功记录总数",
jobCommunication.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS),
"Transformer失败记录总数",
jobCommunication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS),
"Transformer过滤记录总数",
jobCommunication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS)
);
System.out.println("结束打印format1---------------:"+format1);
}
}
thread.join();
break;
}
}
}
private static void test111(){
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "D:\\DevelopSoftware\\datax\\datax20210506\\datax\\job\\job.json"};
System.setProperty("datax.home", "D:\\DevelopSoftware\\datax\\datax20210506\\datax");
try {
Engine.entry(params);
} catch (Throwable e) {
e.printStackTrace();
}
}