1. Introduction to Spring Batch
Translated from the official documentation:
Spring Batch is a lightweight, comprehensive batch framework designed to enable the development of robust batch applications vital to the daily operations of enterprise systems. Spring Batch builds upon the characteristics of the Spring Framework that people have come to expect (productivity, a POJO-based development approach, and general ease of use), while making it easier for developers to access and use more advanced enterprise services when necessary. Spring Batch is not a scheduling framework. There are many good enterprise schedulers (such as Quartz, Tivoli, Control-M, and so on) in both the commercial and open source spaces. It is intended to work in conjunction with a scheduler, not to replace one.
Spring Batch provides reusable functions that are essential for processing large volumes of records, including logging/tracing, transaction management, job processing statistics, job restart, skip, and resource management. It also provides more advanced technical services and features that enable high-volume, high-performance batch jobs through optimization and partitioning techniques. Spring Batch can be used in simple use cases (such as reading a file into a database or running a stored procedure) as well as in complex, high-volume use cases (such as moving large volumes of data between databases, transforming it, and so on). High-volume batch jobs can use the framework in a highly scalable manner to process significant amounts of information.
2. The structure of Spring Batch
- JobRepository: the repository used to register Jobs and persist their metadata
- JobLauncher: the interface used to launch a Job
- Job: the actual task to execute, consisting of one or more Steps
- Step: a phase of a Job, containing an ItemReader, an ItemProcessor, and an ItemWriter
- ItemReader: the interface for reading data
- ItemProcessor: the interface for processing data
- ItemWriter: the interface for writing data
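These components cooperate in Spring Batch's chunk-oriented model: the Step reads items one by one until a chunk is full, processes each item, then writes the whole chunk in one go (one transaction per chunk). A minimal plain-Java sketch of that loop, with simplified stand-in interfaces rather than the real Spring Batch types:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ChunkLoopSketch {
    // Simplified stand-ins for ItemReader / ItemProcessor / ItemWriter
    interface Reader<T> { T read(); }                 // returns null when input is exhausted
    interface Processor<I, O> { O process(I item); }
    interface Writer<T> { void write(List<T> chunk); }

    // Read until a chunk is full, process each item, then write the chunk as a unit
    static <I, O> void run(Reader<I> reader, Processor<I, O> processor,
                           Writer<O> writer, int chunkSize) {
        while (true) {
            List<O> chunk = new ArrayList<>();
            for (int i = 0; i < chunkSize; i++) {
                I item = reader.read();
                if (item == null) break;              // no more input
                chunk.add(processor.process(item));
            }
            if (chunk.isEmpty()) return;
            writer.write(chunk);                      // one "transaction" per chunk
        }
    }

    public static void main(String[] args) {
        Iterator<Integer> source = List.of(1, 2, 3, 4, 5).iterator();
        List<List<Integer>> written = new ArrayList<>();
        ChunkLoopSketch.<Integer, Integer>run(
                () -> source.hasNext() ? source.next() : null,
                item -> item * 10,
                written::add,
                2);
        System.out.println(written); // [[10, 20], [30, 40], [50]]
    }
}
```

Note how the last chunk may be smaller than chunkSize; the real framework behaves the same way.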
3. A Spring Batch demo
- Add the dependencies
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- database access -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.16.10</version>
    </dependency>
</dependencies>
- Configuration
spring.datasource.url=jdbc:mysql://120.55.39.117:3306/springbatch?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=xxx
# Naming strategy that maps Java entity field names to database column names
spring.jpa.hibernate.naming.implicit-strategy=org.hibernate.boot.model.naming.ImplicitNamingStrategyLegacyJpaImpl
# If enabled, the following disables the camelCase-to-underscore conversion
#spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl
spring.jpa.open-in-view=false
spring.jpa.properties.hibernate.enable_lazy_load_no_trans=true
# Whether table structures in the DB may be created or altered automatically from Entity definitions
spring.jpa.properties.hibernate.hbm2ddl.auto=update
# Whether to print the SQL statements and parameters executed at runtime
spring.jpa.show-sql=true
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.hikari.minimum-idle=10
spring.datasource.hikari.maximum-pool-size=20
spring.datasource.hikari.idle-timeout=600000
spring.datasource.hikari.max-lifetime=1800000
# Do not run jobs automatically at startup; they are triggered by the scheduler instead
spring.batch.job.enabled=false
- Create the tables
Spring Batch needs several metadata tables created in advance. The DDL scripts ship inside the org.springframework.batch.core package as schema-*.sql; for MySQL, use schema-mysql.sql.
Alternatively, find the file with CTRL+N (file search in IntelliJ IDEA).
Once found, execute the DDL statements in it.
Then create a student table for the rest of the demo:
CREATE TABLE `student` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(20) NOT NULL,
  `age` int(11) NOT NULL,
  `sex` varchar(20) NOT NULL,
  `address` varchar(100) NOT NULL,
  `cid` int(11) NOT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=19 DEFAULT CHARSET=utf8;
insert into student values
(1, 'name1', 1, '男', '湖南',1),
(2, 'name2', 2, '男', '湖南',2),
(3, 'name3', 3, '男', '湖南',3),
(4, 'name4', 4, '男', '湖南',4),
(5, 'name5', 5, '男', '湖南',5),
(6, 'name6', 6, '男', '湖南',6),
(7, 'name7', 7, '男', '湖南',7),
(8, 'name8', 8, '男', '湖南',8),
(9, 'name9', 9, '男', '湖南',9);
- Configure the Job
package com.wq.batch.model;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import javax.persistence.*;

@Data
@Entity
@Table(name = "student")
@NoArgsConstructor
@AllArgsConstructor
public class Student {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Integer id;
    private String name;
    private Integer age;
    private String sex;
    private String address;
    private Integer cid;
}
Referring to the structure described in section 2: a Job needs to define one or more Steps, and each Step in turn defines the abstractions for reading, processing, and writing data.
package com.wq.batch.task;

import com.wq.batch.model.Student;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.orm.JpaNativeQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.persistence.EntityManagerFactory;

@Component
@Slf4j
public class DataBatchJob {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final EntityManagerFactory entityManagerFactory;
    private final JobListener jobListener;

    @Autowired
    public DataBatchJob(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory,
                        EntityManagerFactory entityManagerFactory, JobListener jobListener) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
        this.entityManagerFactory = entityManagerFactory;
        this.jobListener = jobListener;
    }

    public Job dataHandleJob() {
        return jobBuilderFactory
                .get("dataHandleJob")
                .incrementer(new RunIdIncrementer())
                .start(handleDataStep())
                .listener(jobListener)
                .build();
    }

    private Step handleDataStep() {
        return stepBuilderFactory.get("getData")
                .<Student, Student>chunk(100)   // process 100 items per chunk (one transaction per chunk)
                .faultTolerant()
                .retryLimit(3)                  // retry a failing item up to 3 times
                .retry(Exception.class)
                .skipLimit(100)                 // skip at most 100 failing items
                .skip(Exception.class)
                .reader(getDataReader())
                .processor(getDataProcessor())
                .writer(getDataWriter())
                .build();
    }

    private ItemWriter<? super Student> getDataWriter() {
        return list -> {
            for (Student student : list) {
                log.info("write data : {}", student);
            }
        };
    }

    private ItemProcessor<? super Student, ? extends Student> getDataProcessor() {
        return student -> {
            log.info("process data : {}", student);
            return student;
        };
    }

    private ItemReader<? extends Student> getDataReader() {
        JpaPagingItemReader<Student> reader = new JpaPagingItemReader<>();
        try {
            JpaNativeQueryProvider<Student> queryProvider = new JpaNativeQueryProvider<>();
            queryProvider.setSqlQuery("select * from student");
            queryProvider.setEntityClass(Student.class);
            queryProvider.afterPropertiesSet();
            reader.setEntityManagerFactory(entityManagerFactory);
            reader.setPageSize(3);              // read 3 rows per page
            reader.setQueryProvider(queryProvider);
            reader.setSaveState(true);          // persist reader state so the job can restart
            reader.afterPropertiesSet();
        } catch (Exception e) {
            log.error("failed to build reader", e);
        }
        return reader;
    }
}
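The faultTolerant() settings in the step above mean: an item that throws is retried up to the retry limit, and if it still fails it is skipped, with the whole step failing once more than skipLimit items have been skipped. A plain-Java sketch of that policy (a simplification, not Spring Batch's actual implementation, and the attempt counting here is deliberately naive):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class RetrySkipSketch {
    // Process each item; retry a failing item up to retryLimit attempts,
    // then skip it, aborting once more than skipLimit items have been skipped.
    static <I, O> List<O> process(List<I> items, Function<I, O> processor,
                                  int retryLimit, int skipLimit) {
        List<O> results = new ArrayList<>();
        int skipped = 0;
        for (I item : items) {
            boolean done = false;
            for (int attempt = 1; attempt <= retryLimit && !done; attempt++) {
                try {
                    results.add(processor.apply(item));
                    done = true;
                } catch (RuntimeException e) {
                    if (attempt == retryLimit) {
                        if (++skipped > skipLimit) {
                            throw new IllegalStateException("skip limit exceeded", e);
                        }
                        // give up on this item and move on to the next
                    }
                }
            }
        }
        return results;
    }

    public static void main(String[] args) {
        // item 3 always fails and gets skipped; the rest succeed
        List<Integer> out = process(List.of(1, 2, 3, 4), i -> {
            if (i == 3) throw new RuntimeException("bad item");
            return i * 2;
        }, 3, 100);
        System.out.println(out); // [2, 4, 8]
    }
}
```

In the real framework, retry and skip are also interleaved with transaction rollback and chunk re-processing, which this sketch omits.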
package com.wq.batch.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class ExecutorConfiguration {
    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(50);
        threadPoolTaskExecutor.setMaxPoolSize(200);
        threadPoolTaskExecutor.setQueueCapacity(1000);
        threadPoolTaskExecutor.setThreadNamePrefix("data-job");
        return threadPoolTaskExecutor;
    }
}
A job listener's methods are invoked before and after the batch job runs, so we can add logging or other business logic around job execution as needed:
package com.wq.batch.task;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

/**
 * A job listener. Its methods are invoked before and after the batch job executes,
 * so logging or other business logic can be added around job execution.
 */
@Slf4j
@Component
public class JobListener implements JobExecutionListener {
    private final ThreadPoolTaskExecutor threadPoolTaskExecutor;
    private long startTime;

    @Autowired
    public JobListener(ThreadPoolTaskExecutor threadPoolTaskExecutor) {
        this.threadPoolTaskExecutor = threadPoolTaskExecutor;
    }

    @Override
    public void beforeJob(JobExecution jobExecution) {
        startTime = System.currentTimeMillis();
        log.info("job before {}", jobExecution.getJobParameters());
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        log.info("job status : {}", jobExecution.getStatus());
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            log.info("job finished");
            threadPoolTaskExecutor.destroy();
        } else if (jobExecution.getStatus() == BatchStatus.FAILED) {
            log.info("job failed");
        }
        log.info("job cost time : {} ms", System.currentTimeMillis() - startTime);
    }
}
For now, schedule the job to run every 5 minutes:
package com.wq.batch.task;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.*;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class TimeTask {
    private final JobLauncher jobLauncher;
    private final DataBatchJob dataBatchJob;

    public TimeTask(JobLauncher jobLauncher, DataBatchJob dataBatchJob) {
        this.jobLauncher = jobLauncher;
        this.dataBatchJob = dataBatchJob;
    }

    @Scheduled(cron = "0 0/5 * * * ?")
    public void runBatch() throws JobParametersInvalidException, JobExecutionAlreadyRunningException,
            JobRestartException, JobInstanceAlreadyCompleteException {
        log.info("cron job execute start...");
        // a fresh timestamp parameter makes each run a new JobInstance
        JobParameters jobParameters = new JobParametersBuilder()
                .addLong("timestamp", System.currentTimeMillis())
                .toJobParameters();
        Job job = dataBatchJob.dataHandleJob();
        JobExecution execution = jobLauncher.run(job, jobParameters);
        log.info("cron job execute end..., exit status : {}", execution.getStatus());
    }
}
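The cron expression "0 0/5 * * * ?" fires at second 0 of every minute divisible by 5 (:00, :05, :10, ...). A tiny plain-Java predicate illustrating when it triggers (a simplification of the real cron parser, for intuition only):

```java
import java.time.LocalTime;

public class CronSketch {
    // Simplified check for "0 0/5 * * * ?": second == 0 and minute in 0, 5, 10, ...
    static boolean fires(LocalTime t) {
        return t.getSecond() == 0 && t.getMinute() % 5 == 0;
    }

    public static void main(String[] args) {
        System.out.println(fires(LocalTime.of(20, 10, 0)));  // true
        System.out.println(fires(LocalTime.of(20, 12, 0)));  // false
        System.out.println(fires(LocalTime.of(20, 10, 30))); // false
    }
}
```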
package com.wq.batch;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.data.jpa.repository.config.EnableJpaAuditing;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableJpaAuditing
@EnableScheduling
@EnableBatchProcessing
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
Sample log output when the job runs:
2022-09-16 20:10:16.895 INFO 38900 --- [ data-job9] com.wq.batch.task.DataBatchJob : process data : Student(id=323, name=chen323, age=323, sex=男, address=湖南, cid=323)
2022-09-16 20:10:16.895 INFO 38900 --- [ data-job9] com.wq.batch.task.DataBatchJob : process data : Student(id=334, name=chen334, age=334, sex=男, address=湖南, cid=334)
2022-09-16 20:10:16.895 INFO 38900 --- [ data-job9] com.wq.batch.task.DataBatchJob : process data : Student(id=344, name=chen344, age=344, sex=男, address=湖南, cid=344)
2022-09-16 20:10:16.897 INFO 38900 --- [ data-job9] com.wq.batch.task.DataBatchJob : write data : Student(id=2, name=name2, age=2, sex=男, address=湖南, cid=2)
2022-09-16 20:10:16.897 INFO 38900 --- [ data-job9] com.wq.batch.task.DataBatchJob : write data : Student(id=3, name=name3, age=3, sex=男, address=湖南, cid=3)
2022-09-16 20:10:16.897 INFO 38900 --- [ data-job9] com.wq.batch.task.DataBatchJob : write data : Student(id=11, name=chen11, age=11, sex=男, address=湖南, cid=11)
The above is only a simple demo.
During testing, a handful of rows turned out to be too little data; to bulk-insert some test rows, a stored procedure can be used:
delimiter // -- change the statement delimiter to //
CREATE PROCEDURE BatchInsert(IN init INT, IN loop_time INT) -- first parameter: starting ID; second: number of rows to generate
BEGIN
  DECLARE Var INT;
  DECLARE ID INT;
  SET Var = 0;
  SET ID = init;
  WHILE Var < loop_time DO
    insert into student(id, name, age, sex, address, cid) values (ID, CONCAT('chen', ID), ID, '男', '湖南', ID);
    SET ID = ID + 1;
    SET Var = Var + 1;
  END WHILE;
END;
//
delimiter ; -- change the delimiter back to semicolon
CALL BatchInsert(10, 20000);
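Alternatively, the same test rows can be generated on the Java side as a single multi-row INSERT (the rewriteBatchedStatements=true flag in the JDBC URL above similarly collapses JDBC batches into multi-row inserts). A sketch that only builds the SQL string, mirroring the stored procedure's rows:

```java
import java.util.StringJoiner;

public class InsertSqlSketch {
    // Build one multi-row INSERT matching the stored procedure's output:
    // ids init .. init+count-1, name "chen{id}", age = id, cid = id
    static String batchInsertSql(int init, int count) {
        StringJoiner values = new StringJoiner(",\n");
        for (int id = init; id < init + count; id++) {
            values.add(String.format("(%d, 'chen%d', %d, '男', '湖南', %d)", id, id, id, id));
        }
        return "insert into student(id, name, age, sex, address, cid) values\n" + values + ";";
    }

    public static void main(String[] args) {
        System.out.println(batchInsertSql(10, 3));
    }
}
```

For very large row counts, split the statement into batches of a few thousand rows so the packet stays under MySQL's max_allowed_packet.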
Summary
This article covered what Spring Batch does, its processing flow, and a code example, which should give you an initial picture of Spring Batch. For the details, consult the official Spring Batch documentation.