IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> Spring Batch介绍及代码示例 -> 正文阅读

[Java知识库]Spring Batch介绍及代码示例

一、Spirng Batch介绍

来自官网原文机器翻译

Spring Batch是一个轻量级、全面的批处理框架,旨在支持开发对企业系统的日常操作至关重要的健壮批处理应用程序。Spring Batch构建在人们所期望的Spring框架的特征之上(生产力、基于pojo的开发方法和一般的易用性),同时使开发人员在必要时更容易访问和利用更高级的企业服务。Spring Batch不是一个调度框架。在商业和开源领域都有许多优秀的企业调度器(如Quartz、Tivoli、Control-M等)。它旨在与调度器协同工作,而不是替换调度器。

Spring Batch提供了可重用的功能,这些功能对于处理大量记录非常重要,包括日志记录/跟踪、事务管理、作业处理统计数据、作业重新启动、跳过和资源管理。它还提供更先进的技术服务和功能,通过优化和分区技术支持高容量和高性能批处理作业。Spring Batch可以用在简单的用例中(比如将文件读入数据库或运行存储过程),也可以用在复杂的、大容量的用例中(比如在数据库之间移动大容量的数据、转换数据等等)。大容量批处理作业可以以高度可伸缩的方式利用该框架来处理大量信息。

二、Spring Batch的结构

  • JobRepository:用来注册job的容器
  • JobLauncher:用来启动Job的接口
  • Job:实际执行的任务,包含一个或多个Step
  • Step:包含ItemReader、ItemProcessor和ItemWriter
  • ItemReader:用来读取数据的接口
  • ItemProcessor:用来处理数据的接口
  • ItemWriter: 用来输出数据的接口

三、使用Spring Batch的demo

  1. 引入依赖
<dependencies>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-batch</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-web</artifactId>
	</dependency>
	<!-- 数据库相关操作 -->
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-data-jpa</artifactId>
	</dependency>
	<dependency>
		<groupId>mysql</groupId>
		<artifactId>mysql-connector-java</artifactId>
		<scope>runtime</scope>
	</dependency>
	<dependency>
		<groupId>org.projectlombok</groupId>
		<artifactId>lombok</artifactId>
		<version>1.16.10</version>
	</dependency>
</dependencies>
  1. 配置
spring.datasource.url=jdbc:mysql://120.55.39.117:3306/springbatch?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=xxx

#Java代码实体字段命名与数据库表结构字段之间的名称映射策略
spring.jpa.hibernate.naming.implicit-strategy=org.hibernate.boot.model.naming.ImplicitNamingStrategyLegacyJpaImpl
#下面配置开启后,会禁止将驼峰转为下划线
#spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl
spring.jpa.open-in-view=false
spring.jpa.properties.hibernate.enable_lazy_load_no_trans=true
# 控制是否可以基于程序中Entity的定义自动创建或者修改DB中表结构
spring.jpa.properties.hibernate.hbm2ddl.auto=update
# 控制是否打印运行时的SQL语句与参数信息
spring.jpa.show-sql=true

spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.hikari.minimum-idle=10
spring.datasource.hikari.maximum-pool-size=20
spring.datasource.hikari.idle-timeout=600000
spring.datasource.hikari.max-life-time=1800000

spring.batch.job.enabled=false
  1. 建表

spring batch需要提前建一些表,建表语句在 org.springframework.batch.core包下schema-*.sql,如使用的mysql就是schema-mysql.sql。

或者CRTL+N搜索文件找到它。

找到后执行里面的DDL语句

在建一个student表用于后续操作

CREATE TABLE `student` (
	`id` int(11) NOT NULL AUTO_INCREMENT,
	`name` varchar(20) NOT NULL,
	`age` int(11) NOT NULL,
	`sex` varchar(20) NOT NULL,
	`address` varchar(100) NOT NULL,
	`cid` int(11) NOT NULL,
	PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=19 DEFAULT CHARSET=utf8;

insert into student values
(1, 'name1', 1, '男', '湖南',1),
(2, 'name2', 2, '男', '湖南',2),
(3, 'name3', 3, '男', '湖南',3),
(4, 'name4', 4, '男', '湖南',4),
(5, 'name5', 5, '男', '湖南',5),
(6, 'name6', 6, '男', '湖南',6),
(7, 'name7', 7, '男', '湖南',7),
(8, 'name8', 8, '男', '湖南',8),
(9, 'name9', 9, '男', '湖南',9);
  1. 配置Job
  • 创建实体
package com.wq.batch.model;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import javax.persistence.*;

@Data
@Entity
@Table(name = "student")
@NoArgsConstructor
@AllArgsConstructor
public class Student {
	@Id
	@GeneratedValue(strategy = GenerationType.IDENTITY)
	private Integer id;
	
	private String name;
	
	private Integer age;
	
	private String sex;
	
	private String address;
	
	private Integer cid;
}
  • 配置Job

参考第二点的图片,在Job里需要定义Step,而在Step里又需要对定义读数据、处理数据、写数据的抽象。

package com.wq.batch.task;

import com.wq.batch.model.Student;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.orm.JpaNativeQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.persistence.EntityManagerFactory;

@Component
@Slf4j
public class DataBatchJob {

    private final JobBuilderFactory jobBuilderFactory;

    private final StepBuilderFactory stepBuilderFactory;

    private final EntityManagerFactory entityManagerFactory;

    private final JobListener jobListener;

    @Autowired
    public DataBatchJob(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory, EntityManagerFactory entityManagerFactory, JobListener jobListener) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
        this.entityManagerFactory = entityManagerFactory;
        this.jobListener = jobListener;
    }

    public Job dataHandleJob() {
        return jobBuilderFactory
                .get("dataHandleJob")
                .incrementer(new RunIdIncrementer())
                .start(handleDateStep())
                .listener(jobListener)
                .build();


    }

    private Step handleDateStep() {
        return stepBuilderFactory.get("getData")
                .<Student, Student>chunk(100)   //每次取100条处理
                .faultTolerant()
                .retryLimit(3)  // 每次查3条
                .retry(Exception.class)
                .skipLimit(100)
                .skip(Exception.class)
                .reader(getDataReader())
                .processor(getDataProcessor())
                .writer(getDataWriter())
                .build();

    }

    private ItemWriter<? super Student> getDataWriter() {
        return list -> {
            for(Student student : list) {
                log.info("write data : {}", student.toString());
            }
        };
    }

    private ItemProcessor<? super Student,? extends Student> getDataProcessor() {
        return student -> {
            log.info("process data : {}", student.toString());
            return student;
        };
    }

    private ItemReader<? extends Student> getDataReader() {
        JpaPagingItemReader<Student> reader = new JpaPagingItemReader<>();
        try {
            JpaNativeQueryProvider<Student> queryProvider = new JpaNativeQueryProvider<>();
            queryProvider.setSqlQuery("select * from student");
            queryProvider.setEntityClass(Student.class);
            queryProvider.afterPropertiesSet();

            reader.setEntityManagerFactory(entityManagerFactory);
            reader.setPageSize(3);
            reader.setQueryProvider(queryProvider);
            reader.afterPropertiesSet();

            reader.setSaveState(true);
        } catch (Exception e) {
            log.error("error message: {}", e);
        }
        return reader;
    }
}
  • 配置线程池
package com.wq.batch.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class ExecutorConfiguration {

    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(50);
        threadPoolTaskExecutor.setMaxPoolSize(200);
        threadPoolTaskExecutor.setQueueCapacity(1000);
        threadPoolTaskExecutor.setThreadNamePrefix("data-job");

        return threadPoolTaskExecutor;
    }
}
  • 配置Job监视器

批处理作业在执行前后会调用监听器的方法,这样我们就可以根据实际的业务需求在作业执行的前后进行一些日志的打印或者逻辑处理等

package com.wq.batch.task;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

/**
 * 实现一个作业的监听器,批处理作业在执行前后会调用监听器的方法,这样我们就可以根据实际的业务需求在作业执行的前后进行一些日志的打印或者逻辑处理等
 */
@Slf4j
@Component
public class JobListener implements JobExecutionListener {

    private final ThreadPoolTaskExecutor threadPoolTaskExecutor;
    private long startTime;

    @Autowired
    public JobListener (ThreadPoolTaskExecutor threadPoolTaskExecutor) {
        this.threadPoolTaskExecutor = threadPoolTaskExecutor;
    }

    @Override
    public void beforeJob(JobExecution jobExecution) {
        startTime = System.currentTimeMillis();
        log.info("job before {}" , jobExecution.getJobParameters());
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        log.info("job status : {}", jobExecution.getStatus());
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            log.info("job finished");
            threadPoolTaskExecutor.destroy();
        } else if (jobExecution.getStatus() == BatchStatus.FAILED) {
            log.info("job failed");
        }
        log.info("job cost time : {} ms", System.currentTimeMillis() - startTime);
    }
}
  • 配置定时器来启动任务

这里暂定5分钟运行一次

package com.wq.batch.task;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.*;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class TimeTask {
    private final JobLauncher jobLauncher;
    private final DataBatchJob dataBatchJob;

    public TimeTask(JobLauncher jobLauncher, DataBatchJob dataBatchJob) {
        this.jobLauncher = jobLauncher;
        this.dataBatchJob = dataBatchJob;
    }

    @Scheduled(cron = "0 0/5 * * * ?")
    public void runBatch () throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
        log.info("corn job execute start...");
        JobParameters jobParameters = new JobParametersBuilder()
                .addLong("timestamp", System.currentTimeMillis())
                .toJobParameters();

        Job job = dataBatchJob.dataHandleJob();
        JobExecution execution = jobLauncher.run(job, jobParameters);
        log.info("corn job execute end..., exit status : {}", execution.getStatus());
    }
}
  • 最后注意在启动类上加上注解
@SpringBootApplication
@EnableJpaAuditing
@EnableScheduling
@EnableBatchProcessing
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
  • 启动测试
2022-09-16 20:10:16.895  INFO 38900 --- [      data-job9] com.wq.batch.task.DataBatchJob           : process data : Student(id=323, name=chen323, age=323, sex=男, address=湖南, cid=323)
2022-09-16 20:10:16.895  INFO 38900 --- [      data-job9] com.wq.batch.task.DataBatchJob           : process data : Student(id=334, name=chen334, age=334, sex=男, address=湖南, cid=334)
2022-09-16 20:10:16.895  INFO 38900 --- [      data-job9] com.wq.batch.task.DataBatchJob           : process data : Student(id=344, name=chen344, age=344, sex=男, address=湖南, cid=344)
2022-09-16 20:10:16.897  INFO 38900 --- [      data-job9] com.wq.batch.task.DataBatchJob           : write data : Student(id=2, name=name2, age=2, sex=男, address=湖南, cid=2)
2022-09-16 20:10:16.897  INFO 38900 --- [      data-job9] com.wq.batch.task.DataBatchJob           : write data : Student(id=3, name=name3, age=3, sex=男, address=湖南, cid=3)
2022-09-16 20:10:16.897  INFO 38900 --- [      data-job9] com.wq.batch.task.DataBatchJob           : write data : Student(id=11, name=chen11, age=11, sex=男, address=湖南, cid=11)

上述只是一个简单的demo。

在测试时,发现10条数据太少了,想批量插入一些数据进行处理,查询发现可以使用存储过程进行批量数据插入。

delimiter //   -- 把界定符改成双斜杠
CREATE PROCEDURE BatchInsert(IN init INT, IN loop_time INT)  -- 第一个参数为初始ID号(可自定义),第二个位生成MySQL记录个数
BEGIN
DECLARE Var INT;
DECLARE ID INT;
SET Var = 0;
SET ID = init;
WHILE Var < loop_time DO
insert into student(id, name, age, sex, address, cid) values (ID, CONCAT('chen', ID), ID, '男', '湖南',ID);
SET ID = ID + 1;
SET Var = Var + 1;
END WHILE;
END;
//
delimiter ;  -- 界定符改回分号
CALL BatchInsert(10, 20000)

总结

本文主要讲述了Spring Batch的功能,处理流程及代码示例,通过本文能对Spring Batch有一个初步了解。想详细了解Spirng Batch建议查询官网。Spring Batch

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-09-21 00:12:55  更:2022-09-21 00:18:47 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 9:00:10-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码