ElasticJob‐Lite:Script & HTTP作业
在上一篇博客中,博主介绍了Simple 和Dataflow 作业:
ElasticJob 的作业分类基于class 和type 两种类型。基于class 的作业需要开发者自行通过实现接口的方式织入业务逻辑;基于type 的作业则无需编码,只需要提供相应配置即可。基于class 的作业接口的方法参数shardingContext 包含作业配置、片和运行时信息。可通过getShardingTotalCount() 、getShardingItem() 等方法分别获取分片总数和运行在本作业服务器的分片序列号等。
ElasticJob 目前提供Simple 、Dataflow 这两种基于class 的作业类型,并提供Script 、HTTP 这两种基于type 的作业类型,用户可通过实现SPI接口自行扩展作业类型。
通过实现SPI 接口自行扩展作业类型以后再进行介绍,本篇博客介绍Script 和HTTP 作业。
添加依赖(3.0.1 是目前最新的Releases 版本):
<dependency>
<groupId>org.apache.shardingsphere.elasticjob</groupId>
<artifactId>elasticjob-lite-core</artifactId>
<version>3.0.1</version>
</dependency>
Script作业
ElasticJob 支持shell 、python 以及perl 等所有类型脚本。可通过属性script.command.line 配置待执行脚本,无需编码。执行脚本路径可包含参数,参数传递完毕后,作业框架会自动追加最后一个参数为作业运行时信息。
package com.kaven.job;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
public class Application {
public static void main(String[] args) {
new ScheduleJobBootstrap(createRegistryCenter(), "SCRIPT",
createJobConfiguration()).schedule();
}
private static CoordinatorRegistryCenter createRegistryCenter() {
ZookeeperConfiguration zc = new ZookeeperConfiguration("192.168.1.184:9000", "my-job");
zc.setConnectionTimeoutMilliseconds(40000);
zc.setMaxRetries(5);
CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zc);
regCenter.init();
return regCenter;
}
private static JobConfiguration createJobConfiguration() {
return JobConfiguration.newBuilder("MyScriptJob", 3)
.description("脚本作业")
.cron("30 * * * * ?")
.setProperty("script.command.line", "python F:\\workspace\\IDEA\\my\\job\\src\\main\\java\\com\\kaven\\job\\script.py")
.overwrite(true)
.failover(true)
.build();
}
}
elasticJobType 参数需要全部大写(比如SCRIPT 和HTTP )。
new ScheduleJobBootstrap(createRegistryCenter(), "SCRIPT",
createJobConfiguration()).schedule();
定义脚本作业只需要一行(这里脚本路径是绝对路径)。
.setProperty("script.command.line", "python F:\\workspace\\IDEA\\my\\job\\src\\main\\java\\com\\kaven\\job\\script.py")
script.py :
import sys
import time
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()), ':', sys.argv[-1])
输出如下图所示: 脚本路径也可以使用相对路径(相对于项目的路径F:\\workspace\\IDEA\\my\\job )。
.setProperty("script.command.line",
"python .\\src\\main\\java\\com\\kaven\\job\\script.py")
参数
给脚本传参数,就跟手动执行脚本一样,在命令后面添加参数即可。
private static JobConfiguration createJobConfiguration() {
return JobConfiguration.newBuilder("MyScriptJob4", 3)
.description("脚本作业")
.cron("30 * * * * ?")
.setProperty("script.command.line",
"python F:\\workspace\\IDEA\\my\\job\\src\\main\\java\\com\\kaven\\job\\script.py kaven")
.overwrite(true)
.failover(true)
.build();
}
修改脚本:
import sys
import time
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()), ':', sys.argv[:])
输出如下图所示:
HTTP作业
先搭建一个Spring Boot 项目,在项目中定义一个接口。
pom.xml :
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.kaven</groupId>
<artifactId>server</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.2.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
</project>
MessageController 接口定义:
package com.kaven.controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RestController;
import java.text.SimpleDateFormat;
import java.util.Date;
@RestController
public class MessageController {
private static final SimpleDateFormat formatter =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
private static final String MESSAGE_HEAD = "MessageController";
@GetMapping("/message")
public String getMessage(String messageBody, @RequestHeader String shardingContext) {
String message = MESSAGE_HEAD + " " + messageBody;
System.out.println(formatter.format(new Date()) + " " + message + " - " + shardingContext);
return message;
}
}
Server 启动类:
package com.kaven;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Server {
public static void main(String[] args) {
SpringApplication.run(Server.class);
}
}
Application 类:
package com.kaven.job;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.http.props.HttpJobProperties;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
public class Application {
public static void main(String[] args) {
new ScheduleJobBootstrap(createRegistryCenter(), "HTTP",
createJobConfiguration()).schedule();
}
private static CoordinatorRegistryCenter createRegistryCenter() {
ZookeeperConfiguration zc = new ZookeeperConfiguration("192.168.1.184:9000", "my-job");
zc.setConnectionTimeoutMilliseconds(40000);
zc.setMaxRetries(5);
CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zc);
regCenter.init();
return regCenter;
}
private static JobConfiguration createJobConfiguration() {
return JobConfiguration.newBuilder("MyHTTPJob", 3)
.description("HTTP作业")
.cron("30 * * * * ?")
.setProperty(HttpJobProperties.URI_KEY, "http://localhost:8080/message?messageBody=MyHTTPJob")
.setProperty(HttpJobProperties.METHOD_KEY, "GET")
.overwrite(true)
.failover(true)
.build();
}
}
定义HTTP 作业也比较简单,只需要设置HTTP 作业的相关参数,比如接口的URI 、类型、参数以及连接超时时间等,HttpJobProperties.DATA_KEY 用于给POST 这类请求设置参数。
.setProperty(HttpJobProperties.URI_KEY, "http://localhost:8080/message?messageBody=MyHTTPJob")
.setProperty(HttpJobProperties.METHOD_KEY, "GET")
HttpJobProperties 类:
package org.apache.shardingsphere.elasticjob.http.props;
public final class HttpJobProperties {
public static final String URI_KEY = "http.uri";
public static final String METHOD_KEY = "http.method";
public static final String DATA_KEY = "http.data";
public static final String CONNECT_TIMEOUT_KEY = "http.connect.timeout.milliseconds";
public static final String READ_TIMEOUT_KEY = "http.read.timeout.milliseconds";
public static final String CONTENT_TYPE_KEY = "http.content.type";
public static final String SHARDING_CONTEXT_KEY = "shardingContext";
}
输出如下图所示:
多参数接口
如果GET 接口有多个参数,在URI 上进行拼接即可。
.setProperty(HttpJobProperties.URI_KEY, "http://localhost:8080/message?arg1=1&arg2=2")
而如果是POST 接口有多个参数,就不能多次调用setProperty(HttpJobProperties.DATA_KEY, "key=value") 来完成多个参数的传值,因为值会被覆盖(即只会取最后一次设置的值,ElasticJob 使用Java 的Properties 来存储参数,而Properties 继承Hashtable ,当键相同时,值会被覆盖,而参数的键都是HttpJobProperties.DATA_KEY )。
private static JobConfiguration createJobConfiguration() {
return JobConfiguration.newBuilder("MyHTTPJob", 3)
.description("HTTP作业")
.cron("30 * * * * ?")
.setProperty(HttpJobProperties.URI_KEY, "http://localhost:8080/message")
.setProperty(HttpJobProperties.METHOD_KEY, "POST")
.setProperty(HttpJobProperties.DATA_KEY, "arg1=1")
.setProperty(HttpJobProperties.DATA_KEY, "arg2=2")
.overwrite(true)
.failover(true)
.build();
}
接口:
package com.kaven.controller;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MessageController {
@PostMapping("/message")
public void getMessage(String arg1, String arg2) {
System.out.println(arg1 + arg2);
}
}
输出如下图所示: 第一个参数被覆盖了,所以为null ,因此给POST 接口传多个参数,博主想到的方法是将多个参数组合成一个实例,再将其转换成JSON 字符串,接口那边再将这个JSON 字符串转换成对应的实例。大家如果有更好的方法可以评论区留言。
添加依赖:
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.9</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
<scope>provided</scope>
</dependency>
存储多个参数的Data 类:
package com.kaven.job.data;
import lombok.*;
@Setter
@AllArgsConstructor
@NoArgsConstructor
@Builder
@ToString
public class Data {
private String username;
private String password;
private School school;
@Setter
@AllArgsConstructor
@NoArgsConstructor
@Builder
@ToString
public static class School{
private String name;
private String address;
}
}
Application 类:
package com.kaven.job;
import com.google.gson.Gson;
import com.kaven.job.data.Data;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.http.props.HttpJobProperties;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
public class Application {
public static void main(String[] args) {
new ScheduleJobBootstrap(createRegistryCenter(), "HTTP",
createJobConfiguration()).schedule();
}
private static CoordinatorRegistryCenter createRegistryCenter() {
ZookeeperConfiguration zc = new ZookeeperConfiguration("192.168.1.184:9000", "my-job");
zc.setConnectionTimeoutMilliseconds(40000);
zc.setMaxRetries(5);
CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zc);
regCenter.init();
return regCenter;
}
private static JobConfiguration createJobConfiguration() {
Data data = Data.builder()
.username("kaven")
.password("itkaven")
.school(Data.School.builder().name("xxx").address("中国").build())
.build();
String dataStr = (new Gson()).toJson(data);
System.out.println(dataStr);
return JobConfiguration.newBuilder("MyHTTPJob", 3)
.description("HTTP作业")
.cron("30 * * * * ?")
.setProperty(HttpJobProperties.URI_KEY, "http://localhost:8080/message")
.setProperty(HttpJobProperties.METHOD_KEY, "POST")
.setProperty(HttpJobProperties.DATA_KEY, "args=" + dataStr)
.overwrite(true)
.failover(true)
.build();
}
}
下面这部分就是将要传的多个参数组合成Data 实例,然后再将其转换成JSON 字符串。
Data data = Data.builder()
.username("kaven")
.password("itkaven")
.school(Data.School.builder().name("xxx").address("中国").build())
.build();
String dataStr = (new Gson()).toJson(data);
接口(将JSON 字符串再转换成想要的包含多个参数信息的Data 实例):
package com.kaven.controller;
import com.google.gson.Gson;
import com.kaven.data.Data;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RestController;
import java.text.SimpleDateFormat;
import java.util.Date;
@RestController
public class MessageController {
private static final SimpleDateFormat formatter =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
private static final String MESSAGE_HEAD = "MessageController";
@PostMapping("/message")
public void getMessage(String args, @RequestHeader String shardingContext) {
Gson gson = new Gson();
Data data = gson.fromJson(args, Data.class);
System.out.println(formatter.format(new Date()) + " " + data + " " + shardingContext);
}
}
输出如下图所示: ElasticJob 的Script 和HTTP 作业就介绍到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。
|