资源链接在文章末尾,包含文件:
1.安装Kettle 并启动carte 服务
1.1 Kettle安装
Kettle 是解压即用的,这次是在 Linux 环境下部署,所以需要解压工具rar ,没有安装的小伙伴可以自行安装或参考《Linux环境 rar工具安装使用》进行安装,这里不再赘述。
unrar x data-integration_v9.rar
chmod +x ./*.sh
1.2 carte服务启用
carte 服务的运行参数:
[root@tcloud data-integration_v9]
WARNING: no libwebkitgtk-1.0 detected, some features will be unavailable
Consider installing the package with apt-get or yum.
e.g. 'sudo apt-get install libwebkitgtk-1.0-0'
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=256m; support was removed in 8.0
Usage: Carte <Interface address> <Port> [-h] [-p <arg>] [-s] [-u <arg>]
or
Usage: Carte <Configuration File>
Starts or stops the carte server.
-h,--help This help text
-p,--password <arg> The administrator password. Required only if
stopping the Carte server.
-s,--stop Stop the running carte server. This is only
allowed when using the hostname/port form of the
command.
-u,--userName <arg> The administrator user name. Required only if
stopping the Carte server.
Example: Carte 127.0.0.1 8080
Example: Carte 192.168.1.221 8081
Example: Carte /foo/bar/carte-config.xml
Example: Carte http://www.example.com/carte-config.xml
Example: Carte 127.0.0.1 8080 -s -u cluster -p cluster
开启 carte 服务:
./carte.sh tcloud 8081
2022/03/11 10:00:37 - Carte - Installing timer to purge stale objects after 1440 minutes.
2022/03/11 10:00:38 - Carte - Created listener for webserver @ address : tcloud:8081
开启后访问 Created listener for webserver @ address : tcloud:8081 用户名和密码均为cluster :
登录成功后显示Slave server menu 点击Show status 后状态页面是相当简洁的:
2.Linux下脚本运行测试
2.1 ktr脚本内容
手动执行test.ktr 这个脚本的内容很简单:生成10个16位的 MasterCard 号码并输出成 txt 文件。
2.2 执行及结果
sh ./pan.sh -file:./test.ktr
2022/03/11 10:31:29 - Pan - Start of run.
2022/03/11 10:31:29 - Carte - Installing timer to purge stale objects after 1440 minutes.
2022/03/11 10:31:30 - test - Dispatching started for transformation [test]
2022/03/11 10:31:30 - 生成随机的信用卡号.0 - Finished processing (I=0, O=0, R=10, W=10, U=0, E=0)
2022/03/11 10:31:30 - 文本文件输出.0 - Finished processing (I=0, O=11, R=10, W=10, U=0, E=0)
2022/03/11 10:31:30 - Pan - Finished!
2022/03/11 10:31:30 - Pan - Start=2022/03/11 10:31:29.728, Stop=2022/03/11 10:31:30.311
2022/03/11 10:31:30 - Pan - Processing ended after 0 seconds.
2022/03/11 10:31:30 - test -
2022/03/11 10:31:30 - test - Step 生成随机的信用卡号.0 ended successfully, processed 10 lines. ( - lines/s)
2022/03/11 10:31:30 - test - Step 文本文件输出.0 ended successfully, processed 10 lines. ( - lines/s)
脚本正常执行,数据也正常输出了,但是 😢 Kettle slave server status 页面并没有这个转换的状态日志信息。
3.Java代码实现远程调用
3.1 依赖
Java代码远程执行转换或作业要添加新的依赖,其他依赖可查看《Java环境实现KJB和KTR脚本文件执行v9版本9.0.0.0-423相关依赖说明》。
<dependency>
<groupId>org.owasp.encoder</groupId>
<artifactId>encoder</artifactId>
<scope>system</scope>
<version>1.2</version>
<systemPath>${project.basedir}/lib/encoder-1.2.jar</systemPath>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<scope>runtime</scope>
<version>4.5.9</version>
<systemPath>${project.basedir}/lib/httpclient-4.5.9.jar</systemPath>
</dependency>
3.2 脚本
一个简单的test.kjb 脚本,生成 kettle-job-test.txt 文件:
3.2 Job远程调用源码
Demo源码:
KettleEnvironment.init();
SlaveServer remoteSlaveServer = new SlaveServer();
remoteSlaveServer.setHostname("tcloud");
remoteSlaveServer.setPort("8081");
remoteSlaveServer.setUsername("cluster");
remoteSlaveServer.setPassword("cluster");
JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration();
jobExecutionConfiguration.setRemoteServer(remoteSlaveServer);
String path = "E:\\yuanzheng-codebase\\m-framework\\kettle-git\\file\\test.kjb";
FileSystemResource r = new FileSystemResource(path);
JobMeta jobMeta = new JobMeta(r.getInputStream(), null, null);
String carteObjectId = Job.sendToSlaveServer(jobMeta, jobExecutionConfiguration, null, null);
System.out.println("carteObjectId=" + carteObjectId);
SlaveServerJobStatus jobStatus;
do {
Thread.sleep(5000);
jobStatus = remoteSlaveServer.getJobStatus(jobMeta.getName(), carteObjectId, 0);
System.out.println(jobStatus.getResult());
} while (jobStatus.isRunning());
System.out.println(jobStatus.getResult());
KettleEnvironment.shutdown();
调用结果:
carteObjectId=3a41513f-6d02-4e25-be6f-f599e37870bb
nr=0, errors=0, exit_status=0, result=true
nr=0, errors=0, exit_status=0, result=true
Kettle slave server status 页面也看到了Job的状态:
执行详情:
在服务器上也看到了生成的文件:
删掉生成的文件后,在web页面上可以再次执行:
但是在服务器上找不到 test.kjb 文件,重启 carte 服务之后发现 Jobs 信息没有了,可以判断 test.kjb 是放在内存中的。
3.3 Trans远程调用代码
KettleEnvironment.init();
SlaveServer remoteSlaveServer = new SlaveServer();
remoteSlaveServer.setHostname("tcloud");
remoteSlaveServer.setPort("8081");
remoteSlaveServer.setUsername("cluster");
remoteSlaveServer.setPassword("cluster");
TransExecutionConfiguration transExecutionConfiguration = new TransExecutionConfiguration();
transExecutionConfiguration.setRemoteServer(remoteSlaveServer);
String path = "E:\\yuanzheng-codebase\\m-framework\\kettle-git\\file\\test.ktr";
TransMeta transMeta = new TransMeta(path, new Variables());
String carteObjectId = Trans.sendToSlaveServer(transMeta, transExecutionConfiguration, null, null);
System.out.println("carteObjectId=" + carteObjectId);
SlaveServerTransStatus transStatus;
do {
Thread.sleep(5000);
transStatus = remoteSlaveServer.getTransStatus(transMeta.getName(), carteObjectId, 0);
System.out.println(transStatus.getResult());
} while (transStatus.isRunning());
System.out.println(transStatus.getResult());
KettleEnvironment.shutdown();
web页面状态信息:
比Job要多出来 Step detail 信息。
【资源链接】
链接:https://pan.baidu.com/s/19Jv18ZG_QrXnXmdR7NbQKw 提取码:ti4y
|