
Calling DataX from Java and returning job execution details


DataX

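DataX is an offline data synchronization tool/platform widely used within Alibaba Group. It provides efficient data synchronization between many heterogeneous data sources, including MySQL, Oracle, SQLServer, PostgreSQL, HDFS, Hive, ADS, HBase, TableStore (OTS), MaxCompute (ODPS), and DRDS.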

More about DataX

See DataX-Introduction.

Background

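Our business required using DataX to write data from text files into a database. Until now we had run the job scripts through datax.py from Python. To manage DataX jobs better, Awen asked us to modify DataX so that it is integrated into and called from Java, and returns detailed information about each job's execution.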

Tracing the DataX source code

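After downloading the source code from GitHub, I started modifying it. DataX's startup class is Engine in the datax-core module, and its entry point is the static method entry: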

// com.alibaba.datax.core.Engine
public static void entry(final String[] args) throws Throwable {
    Options options = new Options();
    options.addOption("job", true, "Job config.");
    options.addOption("jobid", true, "Job unique id.");
    options.addOption("mode", true, "Job runtime mode.");
    BasicParser parser = new BasicParser();
    CommandLine cl = parser.parse(options, args);
    String jobPath = cl.getOptionValue("job");
    // If the user does not specify a jobid explicitly, datax.py passes the default jobid -1
    String jobIdString = cl.getOptionValue("jobid");
    RUNTIME_MODE = cl.getOptionValue("mode");
    Configuration configuration = ConfigParser.parse(jobPath);
    long jobId;
    if (!"-1".equalsIgnoreCase(jobIdString)) {
        jobId = Long.parseLong(jobIdString);
    } else {
        // only for dsc & ds & datax 3 update
        String dscJobUrlPatternString = "/instance/(\\d{1,})/config.xml";
        String dsJobUrlPatternString = "/inner/job/(\\d{1,})/config";
        String dsTaskGroupUrlPatternString = "/inner/job/(\\d{1,})/taskGroup/";
        List<String> patternStringList = Arrays.asList(dscJobUrlPatternString,
                dsJobUrlPatternString, dsTaskGroupUrlPatternString);
        jobId = parseJobIdFromUrl(patternStringList, jobPath);
    }
    boolean isStandAloneMode = "standalone".equalsIgnoreCase(RUNTIME_MODE);
    if (!isStandAloneMode && jobId == -1) {
        // Outside standalone mode, jobId must not be -1
        throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR,
                "A valid jobId must be provided in the URL when not running in standalone mode.");
    }
    configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, jobId);
    // Print VM info
    VMInfo vmInfo = VMInfo.getVmInfo();
    if (vmInfo != null) {
        LOG.info(vmInfo.toString());
    }
    LOG.info("\n" + Engine.filterJobConfiguration(configuration) + "\n");
    LOG.debug(configuration.toJSON());
    ConfigurationValidate.doValidate(configuration);
    Engine engine = new Engine();
    engine.start(configuration);
}
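Calling entry() directly from Java, without going through datax.py, has one pitfall. `"-1".equalsIgnoreCase(null)` is false, so if you omit `-jobid` the code reaches `Long.parseLong(null)`, which throws a NumberFormatException. That is why the test further down passes `"-jobid", "-1"` explicitly. The DataX-free sketch below re-implements the jobId resolution to show this behavior. The class name `JobIdParser` is hypothetical, and `parseJobIdFromUrl` is assumed to try each pattern in order and return -1 when none matches.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical, DataX-free re-implementation of the jobId logic in Engine.entry().
// Assumption: parseJobIdFromUrl tries each pattern in order and returns -1 if none match.
class JobIdParser {
    static final List<String> PATTERNS = Arrays.asList(
            "/instance/(\\d{1,})/config.xml",
            "/inner/job/(\\d{1,})/config",
            "/inner/job/(\\d{1,})/taskGroup/");

    static long parseJobIdFromUrl(List<String> patterns, String url) {
        for (String p : patterns) {
            Matcher m = Pattern.compile(p).matcher(url);
            if (m.find()) {
                return Long.parseLong(m.group(1)); // group 1 holds the numeric id
            }
        }
        return -1L; // no pattern matched
    }

    // An explicit jobid wins; "-1" falls back to parsing the job path.
    // A null jobid (option omitted) reaches Long.parseLong(null) and throws.
    static long resolveJobId(String jobIdArg, String jobPath) {
        if (!"-1".equalsIgnoreCase(jobIdArg)) {
            return Long.parseLong(jobIdArg);
        }
        return parseJobIdFromUrl(PATTERNS, jobPath);
    }

    public static void main(String[] args) {
        System.out.println(resolveJobId("-1", "http://host/inner/job/42/config")); // 42
        System.out.println(resolveJobId("-1", "/opt/datax/job/job2.json"));       // -1
    }
}
```

A local job file path matches none of the patterns, so standalone mode with `-jobid -1` ends up with jobId -1. The standalone check in entry() explicitly allows that value.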

The method ends by calling engine.start(configuration) to start the job. If you step into that call, you will find that it ultimately invokes JobContainer's start() method:

@Override
public void start() {
    LOG.info("DataX jobContainer starts job.");
    boolean hasException = false;
    boolean isDryRun = false;
    try {
        this.startTimeStamp = System.currentTimeMillis();
        isDryRun = configuration.getBool(CoreConstant.DATAX_JOB_SETTING_DRYRUN, false);
        if (isDryRun) {
            LOG.info("jobContainer starts to do preCheck ...");
            this.preCheck();
        } else {
            userConf = configuration.clone();
            LOG.debug("jobContainer starts to do preHandle ...");
            this.preHandle();
            LOG.debug("jobContainer starts to do init ...");
            this.init();
            LOG.info("jobContainer starts to do prepare ...");
            this.prepare();
            LOG.info("jobContainer starts to do split ...");
            this.totalStage = this.split();
            LOG.info("jobContainer starts to do schedule ...");
            this.schedule();
            LOG.debug("jobContainer starts to do post ...");
            this.post();
            LOG.debug("jobContainer starts to do postHandle ...");
            this.postHandle();
            LOG.info("DataX jobId [{}] completed successfully.", this.jobId);
            this.invokeHooks();
        }
    } catch (Throwable e) {
        LOG.error("Exception when job run", e);
        hasException = true;
        if (e instanceof OutOfMemoryError) {
            this.destroy();
            System.gc();
        }
        if (super.getContainerCommunicator() == null) {
            // containerCollector is initialized in scheduler(), so if an exception occurs
            // before scheduler() runs, it has to be initialized here
            AbstractContainerCommunicator tempContainerCollector;
            // standalone
            tempContainerCollector = new StandAloneJobContainerCommunicator(configuration);
            super.setContainerCommunicator(tempContainerCollector);
        }
        Communication communication = super.getContainerCommunicator().collect();
        // The state before reporting does not need to be set manually
        // communication.setState(State.FAILED);
        communication.setThrowable(e);
        communication.setTimestamp(this.endTimeStamp);
        Communication tempComm = new Communication();
        tempComm.setTimestamp(this.startTransferTimeStamp);
        Communication reportCommunication = CommunicationTool.getReportCommunication(communication, tempComm, this.totalStage);
        super.getContainerCommunicator().report(reportCommunication);
        throw DataXException.asDataXException(
                FrameworkErrorCode.RUNTIME_ERROR, e);
    } finally {
        if (!isDryRun) {
            this.destroy();
            this.endTimeStamp = System.currentTimeMillis();
            if (!hasException) {
                // Finally, print average CPU usage and GC statistics
                VMInfo vmInfo = VMInfo.getVmInfo();
                if (vmInfo != null) {
                    vmInfo.getDelta(false);
                    LOG.info(vmInfo.totalString());
                }
                LOG.info(PerfTrace.getInstance().summarizeNoException());
                this.logStatistics();
            }
        }
    }
}
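For this change, the key detail is where the statistics come from. They are computed only inside the finally block, and only when the run is not a dry run and no exception occurred. A failure is rethrown after the finally block runs. The DataX-free model below shows that shape and why a version of start() that returns statistics can only return them on the success path. `MiniJobContainer` and its phase names are hypothetical stand-ins. The real method catches Throwable and rethrows a DataXException, whereas this sketch catches RuntimeException and rethrows IllegalStateException.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, DataX-free model of the try/catch/finally shape of JobContainer.start().
class MiniJobContainer {
    final List<String> trace = new ArrayList<>();
    private final boolean dryRun;
    private final Runnable schedule; // stands in for split()/schedule(); may throw

    MiniJobContainer(boolean dryRun, Runnable schedule) {
        this.dryRun = dryRun;
        this.schedule = schedule;
    }

    /** Returns the statistics on success, null for a dry run; failures are rethrown. */
    String start() {
        boolean hasException = false;
        String stats = null;
        try {
            if (dryRun) {
                trace.add("preCheck");
            } else {
                trace.add("init");
                trace.add("prepare");
                schedule.run();
                trace.add("post");
            }
        } catch (RuntimeException e) {
            hasException = true;
            trace.add("report");
            throw new IllegalStateException("job failed", e);
        } finally {
            if (!dryRun) {
                trace.add("destroy");
                if (!hasException) {
                    trace.add("logStatistics");
                    stats = "stats"; // only a successful, non-dry run produces figures
                }
            }
        }
        return stats; // reached only when no exception escaped
    }
}
```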

The job information we need is computed and logged, but not returned, in this.logStatistics():

private void logStatistics() {
    long totalCosts = (this.endTimeStamp - this.startTimeStamp) / 1000;
    long transferCosts = (this.endTransferTimeStamp - this.startTransferTimeStamp) / 1000;
    if (0L == transferCosts) {
        transferCosts = 1L;
    }
    if (super.getContainerCommunicator() == null) {
        return;
    }
    Communication communication = super.getContainerCommunicator().collect();
    communication.setTimestamp(this.endTimeStamp);
    Communication tempComm = new Communication();
    tempComm.setTimestamp(this.startTransferTimeStamp);
    Communication reportCommunication = CommunicationTool.getReportCommunication(communication, tempComm, this.totalStage);
    // Byte rate
    long byteSpeedPerSecond = communication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES) / transferCosts;
    long recordSpeedPerSecond = communication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS) / transferCosts;
    reportCommunication.setLongCounter(CommunicationTool.BYTE_SPEED, byteSpeedPerSecond);
    reportCommunication.setLongCounter(CommunicationTool.RECORD_SPEED, recordSpeedPerSecond);
    super.getContainerCommunicator().report(reportCommunication);
    LOG.info(String.format(
            "\n" + "%-26s: %-18s\n" + "%-26s: %-18s\n" + "%-26s: %19s\n" + "%-26s: %19s\n"
                    + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n",
            "任务启动时刻", dateFormat.format(startTimeStamp),                    // job start time
            "任务结束时刻", dateFormat.format(endTimeStamp),                      // job end time
            "任务总计耗时", String.valueOf(totalCosts) + "s",                     // total elapsed time
            "任务平均流量", StrUtil.stringify(byteSpeedPerSecond) + "/s",         // average throughput
            "记录写入速度", String.valueOf(recordSpeedPerSecond) + "rec/s",       // record write speed
            "读出记录总数", String.valueOf(CommunicationTool.getTotalReadRecords(communication)),  // total records read
            "读写失败总数", String.valueOf(CommunicationTool.getTotalErrorRecords(communication))  // total read/write failures
    ));
    LOG.info("task-total-info:" + dateFormat.format(startTimeStamp) + "|"
            + dateFormat.format(endTimeStamp) + "|"
            + String.valueOf(totalCosts) + "|"
            + StrUtil.stringify(byteSpeedPerSecond) + "|"
            + String.valueOf(recordSpeedPerSecond) + "|"
            + String.valueOf(CommunicationTool.getTotalReadRecords(communication)) + "|"
            + String.valueOf(CommunicationTool.getTotalErrorRecords(communication)));
    if (communication.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS) > 0
            || communication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS) > 0
            || communication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS) > 0) {
        LOG.info(String.format(
                "\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n",
                "Transformer成功记录总数", communication.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS), // records transformed successfully
                "Transformer失败记录总数", communication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS),  // records that failed in the transformer
                "Transformer过滤记录总数", communication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS)   // records filtered out by the transformer
        ));
    }
}
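The figures logStatistics() reports come from plain integer arithmetic over millisecond timestamps and the READ_SUCCEED_BYTES / READ_SUCCEED_RECORDS counters. The sketch below reproduces that arithmetic outside DataX. `TransferStats` is a hypothetical name. Durations are truncated to whole seconds, and a transfer shorter than one second is clamped to 1 s so the speed calculation never divides by zero.

```java
// Hypothetical re-computation of the numbers logStatistics() derives
// (all timestamps in epoch milliseconds, all divisions are integer divisions).
final class TransferStats {
    final long totalCosts;           // whole seconds from job start to job end
    final long transferCosts;        // whole seconds of transfer, clamped to at least 1
    final long byteSpeedPerSecond;   // READ_SUCCEED_BYTES / transferCosts
    final long recordSpeedPerSecond; // READ_SUCCEED_RECORDS / transferCosts

    TransferStats(long startTs, long endTs, long startTransferTs, long endTransferTs,
                  long readSucceedBytes, long readSucceedRecords) {
        this.totalCosts = (endTs - startTs) / 1000;
        long t = (endTransferTs - startTransferTs) / 1000;
        // A sub-second transfer truncates to 0; clamp to 1 to avoid dividing by zero.
        this.transferCosts = (t == 0L) ? 1L : t;
        this.byteSpeedPerSecond = readSucceedBytes / transferCosts;
        this.recordSpeedPerSecond = readSucceedRecords / transferCosts;
    }
}
```

For example, consider a job that runs 12.5 s and transfers 10,485,760 bytes and 100,000 records over 10 s. It yields `totalCosts = 12`, `byteSpeedPerSecond = 1048576`, and `recordSpeedPerSecond = 10000`. DataX then formats the byte speed for display with StrUtil.stringify.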

Making the changes

Add a new result entity, DataXResult (getters and setters omitted):

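public class DataXResult {
    private long startTimeStamp;            // job start time
    private long endTimeStamp;              // job end time
    private long totalCosts;                // total job duration (s)
    private long byteSpeedPerSecond;        // average throughput (bytes/s)
    private long recordSpeedPerSecond;      // record write speed (records/s)
    private long totalReadRecords;          // total records read
    private long totalErrorRecords;         // total read/write failures
    private long transformerSucceedRecords; // transformer: records succeeded
    private long transformerFailedRecords;  // transformer: records failed
    private long transformerFilterRecords;  // transformer: records filtered out
    private long readSucceedBytes;          // bytes read successfully
    private long endTransferTimeStamp;      // transfer end time
    private long startTransferTimeStamp;    // transfer start time
    private long transferCosts;             // total transfer duration (s)
    // getters, setters and the getResultMsg(...) helper called in logStatistics are omitted
}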
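The rewritten logStatistics calls a `getResultMsg(...)` helper on the entity, but the helper is never shown. The version below is hypothetical. It assumes the helper copies its fourteen arguments into the fields in declaration order and returns `this`, so the caller can return the filled entity directly. Only a few getters are included.

```java
// Hypothetical completion of DataXResult; getResultMsg(...) is an assumed helper that
// copies its arguments into the fields in declaration order and returns this.
class DataXResult {
    private long startTimeStamp;
    private long endTimeStamp;
    private long totalCosts;
    private long byteSpeedPerSecond;
    private long recordSpeedPerSecond;
    private long totalReadRecords;
    private long totalErrorRecords;
    private long transformerSucceedRecords;
    private long transformerFailedRecords;
    private long transformerFilterRecords;
    private long readSucceedBytes;
    private long endTransferTimeStamp;
    private long startTransferTimeStamp;
    private long transferCosts;

    DataXResult getResultMsg(long startTimeStamp, long endTimeStamp, long totalCosts,
                             long byteSpeedPerSecond, long recordSpeedPerSecond,
                             long totalReadRecords, long totalErrorRecords,
                             long transformerSucceedRecords, long transformerFailedRecords,
                             long transformerFilterRecords, long readSucceedBytes,
                             long endTransferTimeStamp, long startTransferTimeStamp,
                             long transferCosts) {
        this.startTimeStamp = startTimeStamp;
        this.endTimeStamp = endTimeStamp;
        this.totalCosts = totalCosts;
        this.byteSpeedPerSecond = byteSpeedPerSecond;
        this.recordSpeedPerSecond = recordSpeedPerSecond;
        this.totalReadRecords = totalReadRecords;
        this.totalErrorRecords = totalErrorRecords;
        this.transformerSucceedRecords = transformerSucceedRecords;
        this.transformerFailedRecords = transformerFailedRecords;
        this.transformerFilterRecords = transformerFilterRecords;
        this.readSucceedBytes = readSucceedBytes;
        this.endTransferTimeStamp = endTransferTimeStamp;
        this.startTransferTimeStamp = startTransferTimeStamp;
        this.transferCosts = transferCosts;
        return this; // lets logStatistics(...) return the filled entity directly
    }

    long getTotalReadRecords() { return totalReadRecords; }
    long getTotalErrorRecords() { return totalErrorRecords; }
    long getReadSucceedBytes() { return readSucceedBytes; }
    long getTransferCosts() { return transferCosts; }
}
```

With fourteen positional `long` parameters, the compiler cannot catch two swapped arguments. Named setters or a builder are a safer choice if you adopt this approach.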

Rewrite the logStatistics method so that it returns this entity:

private DataXResult logStatistics(DataXResult resultMsg) {
    long totalCosts = (this.endTimeStamp - this.startTimeStamp) / 1000;
    long transferCosts = (this.endTransferTimeStamp - this.startTransferTimeStamp) / 1000;
    if (0L == transferCosts) {
        transferCosts = 1L;
    }
    if (super.getContainerCommunicator() == null) {
        return resultMsg; // nothing was collected: return the entity unfilled
    }
    Communication communication = super.getContainerCommunicator().collect();
    long byteSpeedPerSecond = communication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES) / transferCosts;
    long recordSpeedPerSecond = communication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS) / transferCosts;
    // Arguments follow the field order of DataXResult
    return resultMsg.getResultMsg(startTimeStamp, endTimeStamp, totalCosts,
            byteSpeedPerSecond, recordSpeedPerSecond,
            CommunicationTool.getTotalReadRecords(communication),
            CommunicationTool.getTotalErrorRecords(communication),
            communication.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS),
            communication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS),
            communication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS),
            communication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES),
            this.endTransferTimeStamp, this.startTransferTimeStamp, transferCosts);
}

We also need to rewrite JobContainer's start() method so that it returns the result:

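// @Override only compiles if AbstractContainer also declares an abstract start(DataXResult)
@Override
public DataXResult start(DataXResult dataXResult) {
    DataXResult result = dataXResult;
    ...
    // in place of the original this.logStatistics() call in the finally block:
    result = logStatistics(dataXResult);
    ...
    return result;
}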

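Then add mock test methods to the Engine class. mockentry is what the test below calls instead of entry, and mockstart returns whatever the container's start(...) returns: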

public DataXResult mockstart(Configuration allConf) {
    ... // same setup as start(Configuration), which creates the container
    DataXResult dataXResult = new DataXResult();
    return container.start(dataXResult);
}
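The body of mockentry is not shown. Assume it is a copy of entry() whose last line returns `engine.mockstart(configuration)` instead of calling `engine.start(configuration)`. Under that assumption, the result object is created in the engine, filled by the container, and handed back up through each return. The runnable model below uses hypothetical stand-in classes (`MockEngine`, `MockJobContainer`, `MockResult`) and a plain `Map` in place of DataX's `Configuration`.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins for the DataX classes; none of these are the real API.
class MockResult {
    long jobId;
    boolean finished;
}

abstract class MockContainer {
    // assumed counterpart of a start(DataXResult) declared on AbstractContainer
    abstract MockResult start(MockResult result);
}

class MockJobContainer extends MockContainer {
    private final Map<String, String> conf;

    MockJobContainer(Map<String, String> conf) {
        this.conf = conf;
    }

    @Override
    MockResult start(MockResult result) {
        // ... run the job, then fill the result where logStatistics() used to run
        result.jobId = Long.parseLong(conf.get("jobid"));
        result.finished = true;
        return result;
    }
}

class MockEngine {
    // counterpart of mockstart(Configuration): same setup, but returns the result
    MockResult mockstart(Map<String, String> allConf) {
        MockContainer container = new MockJobContainer(allConf);
        return container.start(new MockResult());
    }

    // counterpart of mockentry(String[]): entry() with a return value.
    // Naive "-key value" pair parsing stands in for commons-cli.
    static MockResult mockentry(String[] args) {
        Map<String, String> conf = new HashMap<>();
        for (int i = 0; i + 1 < args.length; i += 2) {
            conf.put(args[i].substring(1), args[i + 1]);
        }
        return new MockEngine().mockstart(conf);
    }
}
```

Changing the start() signatures along the whole chain is one option. A less invasive alternative might be for JobContainer to keep the last result in a field and expose a getter, leaving the original `void start()` untouched.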

Testing

In com.alibaba.datax.core.util.container.CoreConstant, change DATAX_HOME to a local path.

The DATAX_HOME path contains the following directories:

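import java.io.File;
import com.alibaba.datax.core.Engine;
import com.alibaba.datax.core.util.container.CoreConstant;
// plus the import for your DataXResult class
public class Test {
    public static void main(String[] args) {
        // Pass -jobid explicitly: entry() calls Long.parseLong on it unless it is "-1"
        String[] dataxArgs = {
                "-job", CoreConstant.DATAX_HOME + File.separator + "job" + File.separator + "job2.json",
                "-mode", "standalone",
                "-jobid", "-1"};
        try {
            DataXResult dataXResult = Engine.mockentry(dataxArgs);
            System.out.println(dataXResult.getTotalReadRecords());
        } catch (Throwable e) {
            e.printStackTrace();
        }
    }
}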
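The test builds the job path by string concatenation. `java.nio.file.Path` is a portable alternative. In the upstream DataX source, CoreConstant.DATAX_HOME appears to be initialized from the `datax.home` system property, which datax.py passes as `-Ddatax.home=...`. Passing that property as a JVM option may therefore avoid editing the constant. Because the constant is initialized when CoreConstant is first loaded, the property has to be set before that happens. The helper below is a sketch, and `DataxPaths` is a hypothetical name.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper for locating DataX files without hard-coded separators.
final class DataxPaths {
    private DataxPaths() { }

    // Prefer the datax.home system property (as datax.py passes it), else the fallback.
    static Path home(String fallback) {
        return Paths.get(System.getProperty("datax.home", fallback));
    }

    // <home>/job/<jobName>, using the platform's separator.
    static Path jobFile(Path home, String jobName) {
        return home.resolve("job").resolve(jobName);
    }
}
```

In the test, `DataxPaths.jobFile(DataxPaths.home("/opt/datax"), "job2.json").toString()` could then replace the concatenated `-job` argument.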

Execution result:


Done!


