参考:关于mybatis批量插入大批量数据耗时解决
1 思路分析
批量插入这个问题,我们用 JDBC 操作,其实就是两种思路吧:
用一个 for 循环,把数据一条一条的插入(这种需要开启批处理 )。
生成一条插入 sql,类似这种 insert into user(username,address) values(‘aa’,‘bb’),(‘cc’,‘dd’) …。 到底哪种快呢?
我们从两方面来考虑这个问题:
先说第一种方案,就是用 for 循环循环插入:
这种方案的优势在于,JDBC 中的 PreparedStatement 有预编译功能,预编译之后会缓存起来,后面的 SQL 执行会比较快并且 JDBC 可以开启批处理,这个批处理执行非常给力。劣势在于,很多时候我们的 SQL 服务器和应用服务器可能并不是同一台,所以必须要考虑网络 IO,如果网络 IO 比较费时间的话,那么可能会拖慢 SQL 执行的速度。
再来说第二种方案,就是生成一条 SQL 插入:
这种方案的优势在于只有一次网络 IO,即使分片处理也只是数次网络 IO,所以这种方案不会在网络 IO 上花费太多时间。
当然这种方案有好几个劣势,一是 SQL 太长了,甚至可能需要分片后批量处理;二是无法充分发挥 PreparedStatement 预编译的优势,SQL 要重新解析且无法复用;三是最终生成的 SQL 太长了,数据库管理器解析这么长的 SQL 也需要时间。 所以我们最终要考虑的就是我们在网络 IO 上花费的时间,是否超过了 SQL 插入的时间?这是我们要考虑的核心问题。
2.数据测试
接下来我们来做一个简单的测试,批量插入 5 万条数据看下。
首先准备一个简单的测试表:
CREATE TABLE `user` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`username` varchar(255) DEFAULT NULL,
`address` varchar(255) DEFAULT NULL,
`password` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
接下来创建一个 Spring Boot 工程,引入 MyBatis 依赖和 MySQL 驱动,然后 application.properties 中配置一下数据库连接信息:
spring.datasource.username=root
spring.datasource.password=123
spring.datasource.url=jdbc:mysql:/
大家需要注意,这个数据库连接 URL 地址中多了一个参数 rewriteBatchedStatements,这是核心。``
MySQL JDBC 驱动在默认情况下会无视 executeBatch() 语句,把我们期望批量执行的一组 sql 语句拆散,一条一条地发给 MySQL 数据库,批量插入实际上是单条插入,直接造成较低的性能。将 rewriteBatchedStatements 参数置为 true, 数据库驱动才会帮我们批量执行 SQL。
OK,这样准备工作就做好了。
2.1 方案一测试 首先我们来看方案一的测试,即一条一条的插入(实际上是批处理)。
首先创建相应的 mapper,如下:
@Mapper
public interface UserMapper {
Integer addUserOneByOne(User user);
}
对应xml文件
<insert id="addUserOneByOne">
insert into user (username,address,password) values (#{username},#{address},#{password})
</insert>
service 如下:
@Service
public class UserService extends ServiceImpl<UserMapper, User> implements IUserService {
private static final Logger logger = LoggerFactory.getLogger(UserService.class);
@Autowired
UserMapper userMapper;
@Autowired
SqlSessionFactory sqlSessionFactory;
@Transactional(rollbackFor = Exception.class)
public void addUserOneByOne(List<User> users) {
SqlSession session = sqlSessionFactory.openSession(ExecutorType.BATCH);
UserMapper um = session.getMapper(UserMapper.class);
long startTime = System.currentTimeMillis();
for (User user : users) {
um.addUserOneByOne(user);
}
session.commit();
long endTime = System.currentTimeMillis();
logger.info("一条条插入 SQL 耗费时间 {}", (endTime - startTime));
}
}
补充说明:
虽然是一条一条的插入,但是我们要开启批处理模式(BATCH),这样前前后后就只用这一个 SqlSession,如果不采用批处理模式,反反复复的获取 Connection 以及释放 Connection 会耗费大量时间,效率奇低,这种效率奇低的方式不给大家测试了。
接下来写一个简单的测试接口看下:
@RestController
public class HelloController {
private static final Logger logger = getLogger(HelloController.class);
@Autowired
UserService userService;
@GetMapping("/user2")
public void user2() {
List<User> users = new ArrayList<>();
for (int i = 0; i < 50000; i++) {
User u = new User();
u.setAddress("广州:" + i);
u.setUsername("张三:" + i);
u.setPassword("123:" + i);
users.add(u);
}
userService.addUserOneByOne(users);
}
}
经过测试,耗时 901 毫秒,5w 条数据插入不到 1 秒。
2.2 方案二测试
方案二是生成一条 SQL 然后插入。
mapper 如下:
@Mapper
public interface UserMapper {
void addByOneSQL(@Param("users") List<User> users);
}
对应的sql如下:
<insert id="addByOneSQL">
insert into user (username,address,password) values
<foreach collection="users" item="user" separator=",">
(#{user.username},#{user.address},#{user.password})
</foreach>
</insert>
service 如下:
@Service
public class UserService extends ServiceImpl<UserMapper, User> implements IUserService {
private static final Logger logger = LoggerFactory.getLogger(UserService.class);
@Autowired
UserMapper userMapper;
@Autowired
SqlSessionFactory sqlSessionFactory;
@Transactional(rollbackFor = Exception.class)
public void addByOneSQL(List<User> users) {
long startTime = System.currentTimeMillis();
userMapper.addByOneSQL(users);
long endTime = System.currentTimeMillis();
logger.info("合并成一条 SQL 插入耗费时间 {}", (endTime - startTime));
}
}
然后在单元测试中调一下这个方法:
@Test
@Transactional
void addByOneSQL() {
List<User> users = new ArrayList<>();
for (int i = 0; i < 50000; i++) {
User u = new User();
u.setAddress("广州:" + i);
u.setUsername("张三:" + i);
u.setPassword("123:" + i);
users.add(u);
}
userService.addByOneSQL(users);
}
经过测试,可以看到插入 5 万条数据耗时 1805 毫秒。
可以看到,生成一条 SQL 的执行效率还是要差一点。
另外还需要注意,第二种方案还有一个问题,就是当数据量大的时候,生成的 SQL 将特别的长,MySQL 可能一次性处理不了这么大的 SQL,这个时候就需要修改 MySQL 的配置或者对待插入的数据进行分片处理了,这些操作又会导致插入时间更长。
对比分析 很明显,方案一更具优势。当批量插入十万、二十万数据的时候,方案一的优势会更加明显(方案二则需要修改 MySQL 配置或者对待插入数据进行分片)。
1采用线程池批量插入
这里的场景是一次性插入100w条数据
1.先看直接使用批量插入
实体类:
private int id;
private String name;
private String gender;
private int age;
private String qq;
private String email;
private String username;
private String password;
private String address;
2.mapper文件
@Mapper
@Repository
public interface UserMapper {
@Insert({
"<script>",
"insert into user(name,gender,age,address,qq,email,username,password) values ",
"<foreach collection='item' item='item' index='index' separator=','>",
"(#{item.name}, #{item.gender},#{item.age},#{item.address},#{item.qq},#{item.email},#{item.username},#{item.password})",
"</foreach>",
"</script>"
})
int addUserBatch(@Param(value="item") List<User> userList);
}
3.接口测试:
@Controller
public class ForEachController {
@Autowired
private UserMapper userMapper;
private static final Logger logger = LoggerFactory.getLogger(ForEachController.class);
@GetMapping("/foreach/test")
@ResponseBody
public String foreachJoin(){
List<User> userList = new ArrayList<>();
for (int i = 0; i < 1000000; i++) {
User user = new User("zhangsan","男",18,"123456@qq","125","sfsf","4354","fsfsdfs");
userList.add(user);
}
long startTime = System.currentTimeMillis();
userMapper.addUserBatch(userList);
long endTime = System.currentTimeMillis();
logger.info("执行5w条数据插入花费了{}毫秒",endTime-startTime);
return "ok";
}
}
执行结果如下图:
可以看到,这里花费了差不多快要5秒了,这样下去,如果几十万到百万数据插入只会更慢,因此可以采用多线程批量插入来进行优化
1.自定义线程池
@Configuration
@EnableAsync
public class ExecutorConfig {
@Bean
public Executor asyncServiceExecutor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(400);
executor.setThreadNamePrefix("thread-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
2.线程池要干的事情,也就是Runnable
@Service
public class ThreadService {
@Autowired
private UserMapper userMapper;
@Async("asyncServiceExecutor")
public Integer asyncBatchAddUser(CountDownLatch countDownLatch, List<User> userList) {
try {
int count = userMapper.addUserBatch(userList);
if(count>0){
return Integer.valueOf(count);
}else{
return 0;
}
} catch (Exception e) {
System.out.println(e.getLocalizedMessage());
return 0;
}finally {
countDownLatch.countDown();
}
}
}
这里先对CountDownLtch 做一个简单介绍
CountDownLatch的 概念
CountDownLatch 是一个同步工具类,用来协调多个线程之间的同步,或者说起到线程之间的通信(而不是用作互斥的作用)。CountDownLatch 能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行。使用一个计数器进行实现。计数器初始值为线程的数量。当每一个线程完成自己任务后,计数器的值就会减一。当计数器的值为0时,表示所有的线程都已经完成了任务,然后在CountDownLatch 上等待的线程就可以恢复执行任务。 CountDownLatch 的用法
CountDownLatch 典型用法1:某一线程在开始运行前等待n个线程执行完毕。将CountDownLatch 的计数器初始化为n —> new CountDownLatch(n) ,每当一个任务线程执行完毕,就将计数器减1 —> countdownlatch.countDown() ,当计数器的值变为0时,在CountDownLatch上 await() 之后的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。CountDownLatch 典型用法2:实现多个线程开始执行任务的最大并行性。注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的CountDownLatch(1) ,将其计数器初始化为1,多个线程在开始执行任务前首先 coundownlatch.await() ,当主线程调用 countDown() 时,计数器变为0,多个线程同时被唤醒。
3.测试接口
@Controller
@RequestMapping("/cd/test")
public class TestController {
@Autowired
private UserMapper userMapper;
@Autowired
private ThreadService threadService;
private CountDownLatch countDownLatch;
private static final Logger logger = LoggerFactory.getLogger(TestController.class);
@GetMapping("/join")
@ResponseBody
public String testJoin() throws InterruptedException {
List<User> userList = new ArrayList<>();
for (int i = 0; i < 50000; i++) {
User user = new User("zhangsan","男",18,"123456@qq","125","sfsf","4354","fsfsdfs");
userList.add(user);
}
int result = 0;
int total = userList.size();
int batchSize = 2000;
int number =total % batchSize == 0 ? total / batchSize :total / batchSize+1;
countDownLatch = new CountDownLatch(number);
long l1 = System.currentTimeMillis();
for(int i = 0;i<number;i++){
List<User> batchList = new ArrayList<>();
if(i== number-1){
batchList = userList.subList(i*batchSize,total);
}else{
batchList = userList.subList(i*batchSize,(i+1)*batchSize);
}
threadService.asyncBatchAddUser(countDownLatch,batchList);
}
countDownLatch.await();
long l2 = System.currentTimeMillis();
logger.info("主线程阻塞执行到此处花费了{}",l2-l1);
return "ok";
}
4.效果如下:
对比可以看到效率带来显著的提升,感兴趣的可以自己去测试一下
|