一、elastic-job入门
1.1、Elastic-Job介绍
我们可以通过https://shardingsphere.apache.org/elasticjob/
二、快速入门
2.1、环境搭建
- JDK1.7以上
- Maven3.0.4以上
- Zookeeper3.4.6以上
1.Zookeeper安装启动 2.创建Maven工程
2.1.1 导包
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
</dependency>
2.1.2.定义任务
核心:实现SimpleJob接口,重写execute方法
public class FileBackupJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
System.out.println("demo task job .............");
System.out.println("作业名称:"+shardingContext.getJobName());
System.out.println("作业任务ID:"+shardingContext.getTaskId());
System.out.println("分片总数:"+shardingContext.getShardingTotalCount());
System.out.println("作业自定义参数:"+shardingContext.getJobParameter());
System.out.println("分片项:"+shardingContext.getShardingItem());
System.out.println("作业分片参数:"+shardingContext.getShardingParameter());
}
}
2.1.3.任务调度
①配置zookeeper配置中心
ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("127.0.0.1:2181",
"mbw-job-demo-quick-start");
CoordinatorRegistryCenter zookeeperRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
zookeeperRegistryCenter.init();
②配置任务
JobCoreConfiguration jobConfig = JobCoreConfiguration.newBuilder("quick-start-demo", "*/2 * * * * ?",1).build();
SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobConfig, DemoJob.class.getName());
③启动任务
new JobScheduler(zookeeperRegistryCenter, LiteJobConfiguration.newBuilder(simpleJobConfiguration).build()).init();
④测试效果:
三、工作原理
3.1、整体架构
-
App:应用程序,内部包含任务执行业务逻辑和Elastic-Job-Lite组件,其中执行任务需要实现ElasticJob接口完成与Elastic-Job-Lite组件的集成,并进行任务的相关配置。应用程序可启动多个实例,也就出现了多个任务执行实例。 -
Elastic-Job-Lite:Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务,此组件负责任务的调度,并产生日志及任务调度记录。无中心化,是指没有调度中心这一概念,每个运行在集群中的作业服务器都是对等的,各个作业节点是自治的、平等的、节点之间通过注册中心进行分布式协调。我们可以看到里面有Schedule trigger调度策略,当有多个实例去共同处理任务调度时,有一个Leader Election一个Leader的选举,当然作业还可以进行Shard分片等等。 -
Registry:以Zookeeper作为Elastic-Job的注册中心组件,存储了执行任务的相关信息。同时,Elastic-Job利用该组件进行执行任务实例的选举。 -
Console:Elastic-Job提供了运维平台,它通过读取Zookeeper数据展现任务执行状态,或更新Zookeeper数据修改全局配置。通过Elastic-Job-Lite组件产生的数据来查看任务执行历史记录。 应用程序在启动时,在其内嵌的Elastic-Job-Lite组件会向Zookeeper注册该实例的信息,并触发选举(此时可能已经启动了该应用程序的其他实例),从众多实例中选举出一个Leader,让其执行任务。当到达任务执行时间时,Elastic-Job-Lite组件会调用由应用程序实现的任务业务逻辑,任务执行后会产生任务执行记录。当应用程序的某一个实例宕机时,Zookeeper组件会感知到并重新触发leader选举。
3.2、zookeeper的作用
- Elastic-Job依赖ZooKeeper完成对执行任务信息的存储(如任务名称、任务参与实例、任务执行策略等);
首先我们之前执行的demo,其实zookeeper就已经将执行任务的信息存储在里面了,我们可以打开客户端大致看一下: 在命名空间下/任务/config保存了任务的信息,比如任务名称,class等等
- Elastic-Job依赖ZooKeeper实现选举机制,在任务执行实例数量变化时(如在快速上手中的启动新实例或停止实例),会触发选举机制来决定让哪个实例去执行该任务。
四.elasticjob整合springboot
①引入springboot及EJ结合spring的依赖,
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.1.5</version>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.3.2</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>
② 配置文件 主要分为自定义注册中心配置数据和自定义任务相关配置数据
spring:
application:
name: ej-springboot
server:
port: 9090
job:
regCenter:
serverList: 127.0.0.1:2181
namespace: mbw-job-demo-springboot
demojob:
className: com.mbw.job.DemoJob
cron: "*/5 * * * * ?"
shardingTotalCount: 1
jobName: "demo-springboot"
hellojob:
className: com.mbw.job.HelloJob
cron: "*/5 * * * * ?"
shardingTotalCount: 1
jobName: "hello-springboot"
③ 配置类 注册中心配置类: 这里唯一不同的可能就是将之前的注册中心的init初始化方法:
zookeeperRegistryCenter.init();
放到了注解中
@Bean(initMethod = "init")
我们知道,spring在创建bean对象之后就会调用initMethod指定的初始化回调方法。
package com.mbw.job.config;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ZkConfig {
@Bean(initMethod = "init")
public ZookeeperRegistryCenter zookeeperRegistryCenter(@Value("${job.regCenter.serverList}") String serverList,
@Value("${job.regCenter.namespace}")String nameSpace){
ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(serverList, nameSpace);
return new ZookeeperRegistryCenter(zookeeperConfiguration);
}
}
任务配置类: 该配置类用于DemoJob任务的配置,需要任务类类名,任务名,cron表达式,分片总数即可完成LiteJob配置。然后再通过LiteJobConfiguration和任务类,注册中心就可完成该任务类的所有配置。
package com.mbw.job.config;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import com.mbw.job.DemoJob;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DemoJobConfig {
@Value("${job.demojob.cron}")
private String corn;
@Value("${job.demojob.shardingTotalCount}")
private Integer shardingTotalCount;
@Value("${job.demojob.jobName}")
private String jobName;
@Value("${job.demojob.className}")
private String className;
@Bean
public DemoJob demoJob(){
return new DemoJob();
}
@Bean(initMethod = "init")
public JobScheduler demoJobScheduler(DemoJob demoJob, ZookeeperRegistryCenter zookeeperRegistryCenter){
return new SpringJobScheduler(demoJob,zookeeperRegistryCenter,
getLiteJobConfiguration(className,jobName,corn,shardingTotalCount));
}
private LiteJobConfiguration getLiteJobConfiguration(String jobClassName, String jobName, String cron, Integer shardingTotalCount){
JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder(jobName, cron, shardingTotalCount).build();
SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, jobClassName);
return LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build();
}
}
多个job采用多个配置即可: 比方该类就是HelloJob任务类的配置
package com.mbw.job.config;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import com.mbw.job.HelloJob;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class HelloJobConfig {
@Value("${job.hellojob.cron}")
private String corn;
@Value("${job.hellojob.shardingTotalCount}")
private Integer shardingTotalCount;
@Value("${job.hellojob.jobName}")
private String jobName;
@Value("${job.hellojob.className}")
private String className;
@Bean
public HelloJob helloJob(){
return new HelloJob();
}
@Bean(initMethod = "init")
public JobScheduler helloJobScheduler(HelloJob helloJob, ZookeeperRegistryCenter zookeeperRegistryCenter){
return new SpringJobScheduler(helloJob,zookeeperRegistryCenter,
getLiteJobConfiguration(className,jobName,corn,shardingTotalCount));
}
private LiteJobConfiguration getLiteJobConfiguration(String jobClassName, String jobName, String cron, Integer shardingTotalCount){
JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder(jobName, cron, shardingTotalCount).build();
SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, jobClassName);
LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build();
return liteJobConfiguration;
}
}
④任务 DemoJob
package com.mbw.job;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import java.time.LocalDateTime;
public class DemoJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
System.out.println(LocalDateTime.now()+"=========================demo job start =========================");
System.out.println("作业名称:"+shardingContext.getJobName());
System.out.println("作业任务ID:"+shardingContext.getTaskId());
System.out.println("分片总数:"+shardingContext.getShardingTotalCount());
System.out.println("作业自定义参数:"+shardingContext.getJobParameter());
System.out.println("分片项:"+shardingContext.getShardingItem());
System.out.println("作业分片参数:"+shardingContext.getShardingParameter());
System.out.println("=========================demo job end! =========================");
}
}
HelloJob:
package com.mbw.job;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import java.time.LocalDateTime;
public class HelloJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
System.out.println(LocalDateTime.now()+"=========================hello job start =========================");
System.out.println("hello:作业名称:"+shardingContext.getJobName());
System.out.println("hello:作业任务ID:"+shardingContext.getTaskId());
System.out.println("hello:分片总数:"+shardingContext.getShardingTotalCount());
System.out.println("hello:作业自定义参数:"+shardingContext.getJobParameter());
System.out.println("hello:分片项:"+shardingContext.getShardingItem());
System.out.println("hello:作业分片参数:"+shardingContext.getShardingParameter());
System.out.println("=========================hello job end! =========================");
}
}
启动后测试效果: 可以看到启动2个线程分别处理2个任务 zk节点信息:
五.作业分片
5.1 概念
作业分片是指任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的应用实例分别执行某一个或几个分片项。 例如:Elastic-Job快速入门中文件备份的例子,现有2台服务器,每台服务器分别跑一个应用实例。为了快速的执行作业,那么可以将作业分成4片,每个应用实例个执行2片。作业遍历数据的逻辑应为:实例1查找text和image类型文件执行备份;实例2查找radio和vedio类型文件执行备份。如果由于服务器扩容应用实例数量增加为4,则作业遍历数据的逻辑应为:4个实例分别处理text、image、radio、video类型的文件。 可以看到,通过对任务合理的分片化,从而达到任务并行处理的效果,最大限度的提高执行作业的吞吐量。 分片顶与业务处理解耦 Elastic-Job并不直接提供数据处理的功能,框架只会将分片项分配至各个运行中的作业服务器,开发者需要自行处理分片项与真实数据的对应关系。 最大限度利用资源 建议将分片项设置为大于服务器的数量,最好是大于服务器倍数的数量,作业将会合理的利用分布式资源,动态的分配分片项。
5.2 demo代码
在springboot的基础之上,以person表,根据性别 sex(1:男,0:女)来分片,分2个sharding,启动2个实例来运行任务。
5.2.1 maven依赖
新增mysql驱动,mybatis-plus相关依赖。
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.4.3.1</version>
</dependency>
5.2.2 Person 实体类以及建表语句
建person表语句:
CREATE TABLE `person` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
`sex` tinyint(2) DEFAULT NULL COMMENT '1:男 0:女',
`age` int(3) DEFAULT NULL,
`state` int(2) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1001 DEFAULT CHARSET=utf8;
Person实体类:
package com.mbw.job.pojo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Person implements Serializable {
private int id;
private String name;
private int sex;
private int age;
private int state;
}
5.2.3 模拟业务实现
我们首先对mybatis进行相关配置 MybatisConfig
package com.mbw.job.config;
import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;
import com.baomidou.mybatisplus.extension.plugins.inner.OptimisticLockerInnerInterceptor;
import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.annotation.EnableTransactionManagement;
@MapperScan("com.mbw.job.mapper")
@EnableTransactionManagement
@Configuration
public class MybatisConfig {
@Bean
public MybatisPlusInterceptor mybatisPlusInterceptor() {
MybatisPlusInterceptor mybatisPlusInterceptor = new MybatisPlusInterceptor();
mybatisPlusInterceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());
mybatisPlusInterceptor.addInnerInterceptor(new PaginationInnerInterceptor());
return mybatisPlusInterceptor;
}
}
PersonMapper,只需要继承baseMapper即可
package com.mbw.job.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.mbw.job.pojo.Person;
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface PersonMapper extends BaseMapper<Person> {
}
PersonService 主要提供三个方法 第一个插入一次性插入1000条随机数据用以测试 第二个用来查询前5条数据 第三个用来将查询的数据的state从0修改为1
package com.mbw.job.service;
import com.mbw.job.pojo.Person;
import java.util.List;
public interface PersonService {
void insert();
void update(Person person);
List<Person> queryPersonBySex(Integer sex);
}
PersonServiceImpl
package com.mbw.job.service;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.mbw.job.mapper.PersonMapper;
import com.mbw.job.pojo.Person;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.List;
import java.util.Random;
@Service
@Slf4j
public class PersonServiceImpl implements PersonService {
@Resource
private PersonMapper personMapper;
@Override
@Transactional
public void insert() {
Random random = new Random();
for (int i = 0; i < 100; i++) {
Person randomPerson = Person.builder().name("mbw" + i)
.age(random.nextInt(100))
.sex(random.nextInt(2))
.state(0).build();
personMapper.insert(randomPerson);
}
}
@Override
@Transactional
public void update(Person person) {
UpdateWrapper<Person> personUpdateWrapper = new UpdateWrapper<>();
personUpdateWrapper.eq("id",person.getId()).set("state",1);
log.info("处理任务:name:{},id:{},age:{}",person.getName(),person.getId(),person.getAge());
personMapper.update(person,personUpdateWrapper);
}
@Override
public List<Person> queryPersonBySex(Integer sex) {
log.info("开始处理任务:sex:{}",sex==1?"男":"女");
QueryWrapper<Person> personQueryWrapper = new QueryWrapper<>();
personQueryWrapper.eq("sex", sex).eq("state", 0);
Page<Person> personPage = new Page<>(1,5);
Page<Person> personPageResult = personMapper.selectPage(personPage, personQueryWrapper);
log.info("查询条数:num:{}",personPageResult.getRecords().size());
return personPageResult.getRecords();
}
}
5.2.4 application.yml
在之前的基础上加一个任务类的配置以及mybatis和spring.datasource相关配置 端口在启动参数配置,方便启动多个进程实例。
spring:
application:
name: ej-springboot
datasource:
driver-class-name: com.mysql.jdbc.Driver
username: root
password: ***********
url: jdbc:mysql://localhost:3306/es_job?characterEncoding=UTF-8&useSSL=false
server:
port: ${port:9090}
job:
regCenter:
serverList: 127.0.0.1:2181
namespace: mbw-job-demo-springboot
demojob:
className: com.mbw.job.DemoJob
cron: "*/5 * * * * ?"
shardingTotalCount: 1
jobName: "demo-springboot"
hellojob:
className: com.mbw.job.HelloJob
cron: "*/5 * * * * ?"
shardingTotalCount: 1
jobName: "hello-springboot"
personJob:
className: com.mbw.job.PersonJob
cron: "*/5 * * * * ?"
shardingTotalCount: 2
jobName: "job-sharding-person"
shardingParameter: "0=0,1=1" #分片参数,前面是分片序号,后面是分片数据 此处0代表女 1代表男
mybatis-plus:
type-aliases-package: com.mbw.pojo
configuration:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
mapper-locations: classpath*:/mapper/*.xml
global-config:
db-config:
logic-delete-value: 1
logic-not-delete-value: 0
5.2.5 具体任务实现
package com.mbw.job;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.mbw.job.pojo.Person;
import com.mbw.job.service.PersonService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.List;
@Component
@Slf4j
public class PersonJob implements SimpleJob {
@Resource
private PersonService personService;
@Value("${server.port}")
private Integer port;
@Override
public void execute(ShardingContext context) {
String taskId = context.getTaskId();
String shardingParameter = context.getShardingParameter();
log.info("端口:{},定时任务开始:taskId:{},shardingParameter:{}", port, taskId, shardingParameter);
List<Person> people = personService.queryPersonBySex(Integer.valueOf(shardingParameter));
if (!CollectionUtils.isEmpty(people)) {
for (Person person : people) {
personService.update(person);
}
}
}
}
5.2.6 注册中心配置以及任务配置
注册中心配置类仍然与之前保持一致 我们再写一个对PersonJob的任务配置 其实与之前的一致,只是在最后的getLiteJobConfiguration()我们需要通过它的shardingItemParameters()去单独配置分片参数,通过该参数去进行分片。
package com.mbw.job.config;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import com.mbw.job.PersonJob;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class PersonJobConfig {
@Value("${job.personJob.cron}")
private String corn;
@Value("${job.personJob.shardingTotalCount}")
private Integer shardingTotalCount;
@Value("${job.personJob.shardingParameter}")
private String shardingParameter;
@Value("${job.personJob.jobName}")
private String jobName;
@Value("${job.personJob.className}")
private String className;
@Bean(initMethod = "init")
public JobScheduler helloJobScheduler(PersonJob personJob, ZookeeperRegistryCenter zookeeperRegistryCenter){
return new SpringJobScheduler(personJob,zookeeperRegistryCenter,
getLiteJobConfiguration(className,jobName,corn,shardingTotalCount));
}
private LiteJobConfiguration getLiteJobConfiguration(String jobClassName, String jobName, String cron, Integer shardingTotalCount){
JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder(jobName, cron, shardingTotalCount)
.shardingItemParameters(shardingParameter)
.build();
SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, jobClassName);
return LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build();
}
}
5.2.7 测试数据准备
完成后我们先在测试类完成测试随机插入100条数据
package com.mbw.job;
import com.mbw.job.service.PersonService;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
@SpringBootTest
public class testInsert {
@Resource
private PersonService personService;
@Test
public void testInsert(){
personService.insert();
}
}
5.2.8 启动参数配置
配置2个启动参数,端口分别为9090,9091
5.2.9 测试结果
可见9090端口实例处理的是sex为男的分片,9091处理的是女,达到预期效果。 zookeeper节点信息:可以看出有2个sharding 数据库完成效果:
|