Using Elastic-Job: dynamic job CRUD and some problems encountered along the way
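Contents:
1. What Elastic-Job can do
2. How to use Elastic-Job
2.1 Add the dependencies
2.2 yml configuration file
2.3 Custom annotation
2.4 ElasticJob configuration class
2.5 ZooKeeper configuration class
2.6 Handler class for dynamically adding and removing scheduled jobs
2.7 Demo
3. Problems encountered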
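1. What Elastic-Job can do
Elastic-Job is an elastic distributed job scheduling system that Dangdang built on top of Quartz. It is feature-rich, uses ZooKeeper for distributed coordination, and provides high availability and sharding for jobs.
Official site: https://shardingsphere.apache.org/elasticjob/index.html
ElasticJob is a distributed scheduling solution made up of two independent subprojects, ElasticJob-Lite and ElasticJob-Cloud.
ElasticJob-Lite is positioned as a lightweight, decentralized solution that provides distributed job coordination in the form of a jar;
ElasticJob-Cloud is built on Mesos and additionally provides resource governance, application distribution, and process isolation.
The official site has moved on to 3.x, which is not covered here; this article uses the 2.x line (2.1.5).
The newest technology is not necessarily the best fit.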
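To make "sharding" concrete: each running instance is handed one or more shard items (numbered from 0 up to shardingTotalCount - 1), and the job code decides which part of the data belongs to its item. In Elastic-Job 2.x these values are available from ShardingContext via getShardingItem() and getShardingTotalCount(). The sketch below is plain Java with no Elastic-Job dependency; the class and method names are hypothetical and it only illustrates the common "id modulo shard count" split, not anything the library does for you.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of Elastic-Job.
final class ModuloShardingSketch {

    // Returns the ids that the given shard item should process.
    // shardingItem and shardingTotalCount stand in for the values a job
    // would read from its ShardingContext.
    static List<Long> idsForShard(List<Long> allIds, int shardingItem, int shardingTotalCount) {
        List<Long> mine = new ArrayList<>();
        for (long id : allIds) {
            // floorMod keeps the result non-negative even for negative ids
            if (Math.floorMod(id, (long) shardingTotalCount) == shardingItem) {
                mine.add(id);
            }
        }
        return mine;
    }
}
```

With three shards, every id lands in exactly one shard, so the instances never process the same record twice as long as each shard item runs on only one instance at a time.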
2. How to use Elastic-Job
Adding Elastic-Job to a Spring Boot project
2.1 Add the dependencies
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-spring</artifactId>
    <version>2.1.5</version>
    <exclusions>
        <exclusion>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-core</artifactId>
    <version>2.1.5</version>
    <exclusions>
        <exclusion>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
        </exclusion>
    </exclusions>
</dependency>
2.2 yml configuration file
spring:
  elastic-job:
    url: jdbc:mysql:<your database address>?useSSL=false&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&serverTimezone=GMT%2B8&rewriteBatchedStatements=true
    username: xxx
    password: xxxx
    poolName: elasticPool
    connectionTimeout: 60000
    maxLifetime: 1800000
    maximumPoolSize: 15
    minimumIdle: 5
  zk:
    maxRetries: 9
    namespace: elastic-job
    serverList: 127.0.0.1:2181
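2.3 Custom annotation
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.core.annotation.AliasFor;

// Marks a SimpleJob bean that should be registered with Elastic-Job at startup.
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface ElasticSimpleJob {
    // Cron expression; value and cron are aliases of each other.
    @AliasFor("cron")
    String value() default "";
    @AliasFor("value")
    String cron() default "";
    // Defaults to the simple class name of the job when left blank (see 2.4).
    String jobName() default "";
    int shardingTotalCount() default 1;
    String shardingItemParameters() default "";
    String jobParameter() default "";
    String description() default "";
    boolean misfire() default false;
    boolean failover() default false;
    boolean disabled() default false;
}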
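The shardingItemParameters attribute is a comma-separated list of item=value pairs, for example "0=Beijing,1=Shanghai,2=Guangzhou", where the number on the left is the shard item. Elastic-Job parses this itself and hands each shard its own value; the sketch below is only an illustration of the format in plain Java. The class name and the parse method are hypothetical and are not part of the library.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Elastic-Job: illustrates the "item=value" format only.
final class ShardingParamsSketch {

    // Turns "0=Beijing,1=Shanghai" into a map from shard item to its parameter.
    static Map<Integer, String> parse(String shardingItemParameters) {
        Map<Integer, String> result = new LinkedHashMap<>();
        if (shardingItemParameters == null || shardingItemParameters.trim().isEmpty()) {
            return result; // the annotation default "" means no per-shard parameters
        }
        for (String pair : shardingItemParameters.split(",")) {
            String[] kv = pair.trim().split("=", 2);
            if (kv.length != 2) {
                throw new IllegalArgumentException("Expected item=value but got: " + pair);
            }
            result.put(Integer.parseInt(kv[0].trim()), kv[1].trim());
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parse("0=Beijing,1=Shanghai,2=Guangzhou"));
    }
}
```

When shardingTotalCount is 3, the string would normally carry one entry for each of the items 0, 1 and 2.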
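2.4 ElasticJob configuration class
import java.util.Map;
import javax.annotation.PostConstruct; // jakarta.annotation.PostConstruct on newer Spring Boot versions
import org.apache.commons.lang3.StringUtils; // assumed: Apache Commons Lang 3
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Configuration;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration;
import com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import com.zaxxer.hikari.HikariDataSource;
import lombok.RequiredArgsConstructor;

@Configuration
@RequiredArgsConstructor
public class ElasticJobAutoConfiguration {
    private final ApplicationContext applicationContext;
    private final ZookeeperRegistryCenter zookeeperRegistryCenter;
    @Value("${spring.elastic-job.url}")
    private String url;
    @Value("${spring.elastic-job.username}")
    private String username;
    @Value("${spring.elastic-job.password}")
    private String password;
    @Value("${spring.elastic-job.connectionTimeout}")
    private Long connectionTimeout;
    @Value("${spring.elastic-job.maxLifetime}")
    private Long maxLifetime;
    @Value("${spring.elastic-job.minimumIdle}")
    private Integer minimumIdle;
    @Value("${spring.elastic-job.maximumPoolSize}")
    private Integer maximumPoolSize;
    @Value("${spring.elastic-job.poolName}")
    private String poolName;

    // A @Bean method may not return void, so the scan runs as an init callback instead.
    @PostConstruct
    public void initElasticJob() {
        // One shared pool for job event tracing, instead of one pool per job.
        HikariDataSource dataSource = new HikariDataSource();
        dataSource.setJdbcUrl(url);
        dataSource.setUsername(username);
        dataSource.setPassword(password);
        dataSource.setConnectionTimeout(connectionTimeout);
        dataSource.setMaxLifetime(maxLifetime);
        dataSource.setMinimumIdle(minimumIdle);
        dataSource.setMaximumPoolSize(maximumPoolSize);
        dataSource.setPoolName(poolName);
        JobEventRdbConfiguration jobEventRdbConfiguration = new JobEventRdbConfiguration(dataSource);

        Map<String, SimpleJob> map = applicationContext.getBeansOfType(SimpleJob.class);
        for (Map.Entry<String, SimpleJob> entry : map.entrySet()) {
            SimpleJob simpleJob = entry.getValue();
            ElasticSimpleJob elasticSimpleJobAnnotation = simpleJob.getClass().getAnnotation(ElasticSimpleJob.class);
            if (elasticSimpleJobAnnotation == null) {
                continue; // only annotated jobs are registered
            }
            // Fall back to the class name and to value() when jobName or cron is blank.
            String jobName = StringUtils.defaultIfBlank(elasticSimpleJobAnnotation.jobName(), simpleJob.getClass().getSimpleName());
            String cron = StringUtils.defaultIfBlank(elasticSimpleJobAnnotation.cron(), elasticSimpleJobAnnotation.value());
            // Only cron, shardingTotalCount and shardingItemParameters are applied here;
            // the other annotation attributes are declared in 2.3 but not used by this class.
            SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(
                    JobCoreConfiguration.newBuilder(jobName, cron, elasticSimpleJobAnnotation.shardingTotalCount())
                            .shardingItemParameters(elasticSimpleJobAnnotation.shardingItemParameters())
                            .build(),
                    simpleJob.getClass().getCanonicalName());
            LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration)
                    .overwrite(true)
                    .jobShardingStrategyClass(AverageAllocationJobShardingStrategy.class.getCanonicalName())
                    .build();
            SpringJobScheduler jobScheduler = new SpringJobScheduler(simpleJob, zookeeperRegistryCenter, liteJobConfiguration, jobEventRdbConfiguration);
            jobScheduler.init();
        }
    }
}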
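The configuration above selects AverageAllocationJobShardingStrategy, which spreads shard items evenly across the running instances and gives any leftover items to the first instances in order. The sketch below re-implements that idea in plain Java so the resulting assignment is easy to see; it is an illustration written for this article, not the library's source, and the class name, method name and instance labels are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative re-implementation of an "average allocation" split; not Elastic-Job code.
final class AverageAllocationSketch {

    // Assigns shard items 0..shardingTotalCount-1 to the instances, in list order.
    static Map<String, List<Integer>> allocate(List<String> instances, int shardingTotalCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (instances.isEmpty()) {
            return result;
        }
        int perInstance = shardingTotalCount / instances.size(); // evenly divisible part
        int remainder = shardingTotalCount % instances.size();   // leftover items
        for (int i = 0; i < instances.size(); i++) {
            List<Integer> items = new ArrayList<>();
            // contiguous block from the evenly divisible part
            for (int item = i * perInstance; item < (i + 1) * perInstance; item++) {
                items.add(item);
            }
            // the first `remainder` instances each take one leftover item
            if (i < remainder) {
                items.add(shardingTotalCount - remainder + i);
            }
            result.put(instances.get(i), items);
        }
        return result;
    }
}
```

For three instances and 8 shard items this yields [0, 1, 6], [2, 3, 7] and [4, 5]; with 9 items each instance gets exactly three.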
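2.5 ZooKeeper configuration class
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import lombok.Getter;

@Getter
@Configuration
public class ZkRegistryConfiguration {
    @Value("${spring.zk.serverList}")
    private String serverList;
    @Value("${spring.zk.namespace}")
    private String namespace;
    @Value("${spring.zk.maxRetries}")
    private int maxRetries;

    // initMethod = "init" makes Spring open the ZooKeeper connection once the bean is created.
    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter registryCenter() {
        ZookeeperConfiguration configuration = new ZookeeperConfiguration(serverList, namespace);
        configuration.setMaxRetries(maxRetries);
        return new ZookeeperRegistryCenter(configuration);
    }
}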
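2.6 Dynamically adding and removing scheduled jobs
The parameters can be wrapped in a single object.
import org.springframework.stereotype.Component;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.JobTypeConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.internal.schedule.JobRegistry;
import com.dangdang.ddframe.job.lite.internal.schedule.JobScheduleController;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import lombok.RequiredArgsConstructor;

@Component
@RequiredArgsConstructor
public class ElasticJobHandler {
    private final ZookeeperRegistryCenter zookeeperRegistryCenter;

    // Registers a job and starts scheduling it.
    public void addJob(final String jobName, final SimpleJob simpleJob, final String cron, final int shardingTotalCount,
                       final String shardingItemParameters) {
        simpleJobScheduler(jobName, simpleJob, cron, shardingTotalCount, shardingItemParameters).init();
    }

    private JobScheduler simpleJobScheduler(final String jobName, final SimpleJob simpleJob, final String cron, final int shardingTotalCount,
                                            final String shardingItemParameters) {
        LiteJobConfiguration liteJobConfiguration = getLiteJobConfiguration(jobName, simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters);
        return new SpringJobScheduler(simpleJob, zookeeperRegistryCenter, liteJobConfiguration);
    }

    private LiteJobConfiguration getLiteJobConfiguration(final String jobName, final Class<? extends SimpleJob> jobClass, final String cron,
                                                         final int shardingTotalCount, final String shardingItemParameters) {
        JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder(jobName, cron, shardingTotalCount)
                .shardingItemParameters(shardingItemParameters)
                .misfire(true)
                .build();
        JobTypeConfiguration jobTypeConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, jobClass.getCanonicalName());
        return LiteJobConfiguration.newBuilder(jobTypeConfiguration)
                .overwrite(true) // local configuration overwrites what is stored in ZooKeeper
                .jobShardingStrategyClass(AverageAllocationJobShardingStrategy.class.getCanonicalName())
                .build();
    }

    // Stops the local scheduler and deletes the job's node from ZooKeeper (see Problem 1 in section 3).
    public void removeJob(String jobName) {
        JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
        if (jobScheduleController != null) {
            jobScheduleController.pauseJob();
            jobScheduleController.shutdown();
            zookeeperRegistryCenter.remove("/" + jobName);
        }
    }

    public void pauseJob(String jobName) {
        JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
        if (jobScheduleController != null) {
            jobScheduleController.pauseJob();
        }
    }

    // Same as addJob, with the parameters wrapped in ElasticJobPO (the project's own class, not shown here).
    public void addJobToBean(ElasticJobPO elasticJobPO, final SimpleJob simpleJob) {
        simpleJobScheduler(elasticJobPO.getJobName(), simpleJob, elasticJobPO.getCron(), elasticJobPO.getShardingTotalCount(), elasticJobPO.getShardingItemParameters()).init();
    }

    // Triggers the job once immediately; returns false if the job is not registered on this instance.
    public boolean start(String jobName) {
        JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
        if (jobScheduleController != null) {
            jobScheduleController.triggerJob();
            return true;
        }
        return false;
    }
}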
2.7 Example
// elasticJobHandler and defaultTaskJob (a SimpleJob bean) are fields of the test class.
@Test
public void add(ElasticJobPO defaultJob) {
    elasticJobHandler.addJob(defaultJob.getJobName(), defaultTaskJob, defaultJob.getCron(),
            defaultJob.getShardingTotalCount(), defaultJob.getShardingItemParameters());
}
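3. Problems encountered
Problem 1:
Pausing or deleting a job sometimes has no effect (this can be debugged through the operations console).
After studying the source code, and given that Elastic-Job relies on ZooKeeper for distributed coordination, the fix is to delete the job's node from ZooKeeper:
zookeeperRegistryCenter.remove("/" + jobName);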
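Problem 2:
The operations and monitoring console that the official site offers for 3.0.x is not compatible with 2.x.
elasticjob-console-2.1.6 link: https://pan.baidu.com/s/1FKP7AmO58GwIsRD2dfRa6w password: l60p
Console login: username root, password root
Those are the problems I ran into while using Elastic-Job in project development. If you have found others, feel free to leave a comment and discuss.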