IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> Java多线程百万数据快速入库实战 -> 正文阅读

[Java知识库]Java多线程百万数据快速入库实战

出自:

腾讯课堂 700多分钟干货实战Java多线程高并发高性能实战全集 , 我学习完了之后,我做了个笔记.

背景

某应用程序(单台服务器,非分布式的多台服务器),这单台服务器就是你的笔记本电脑了,
?

并发产生100万条数据,这100w条数据是你自己产生的,假设你是架构师,如何运用多线程等基础知识将这100万条数据,快速同步(4分钟以内)到MySQL数据库?
?

分析百万数据快速入库的特点

1.百万数据快速入库的特点:
数据量比较大(高并发),时间很短(性能),
100万条数据如果一条一条的插入到数据库的话,时间是很慢的,所以我们采用批量的方式插入,每次分一两万, 分多个批次,并行的插入到数据库里面.
这就是用并发编程的方式去解决高并发高性能的问题
2.百万数据如何在短时间内入库?如何从架构角度优化性能?
应用程序怎么优化呢? 可以采用并发编程的形式,比如说多线程,线程池去提升性能
在数据连接池这层,我们可以调优,让它的并发量更高,提高数据库连接池的整体性能.
?

代码

Producer

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class Producer {

    public static void main(String[] args) {
        Producer.createData();
    }

    public static void createData() {
        ExecutorService pool = Executors.newFixedThreadPool(100);
        final int totalPageNo = 50; //分50批次

        final int pageSize = 20000; //每页大小是2万条
        //共10w条数据,每页5000条数据,20个线程
        final long start = System.currentTimeMillis();
        final AtomicInteger atomicInt = new AtomicInteger();
        for (int currentPageNo = 0; currentPageNo < totalPageNo; currentPageNo++) {
            final int finalCurrentPageNo = currentPageNo;

            Runnable run = new Runnable() {

                @Override
                public void run() {
                    List userList = new ArrayList<>();
                    for (int i = 1; i <= pageSize; i++) {
                        int id = i + finalCurrentPageNo * pageSize;
                        User user = new User();
                        user.setId(id);
                        user.setName("huanglaoxie:" + id);
                        userList.add(user);
                    }

                    atomicInt.addAndGet(UserBatchHandler.batchSave(userList, Thread.currentThread().getName()));
                    //入库的数据达到一百万条的时候就会有个统计.
                    if (atomicInt.get() == (totalPageNo * pageSize)) {
                        //如果有一百万的时候.就会在这里有个结果
                        System.out.println("同步数据到db,它已经花费 " + ((System.currentTimeMillis() - start) / 1000) + "  秒");
                    }

                }
            };
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {

                e.printStackTrace();
            }
            pool.execute(run);
        }

    }

}

User

import java.sql.Timestamp;

public class User {
    private int id;
    private String name;
    private Timestamp createdTime;
    private Timestamp updatedTime;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Timestamp getCreatedTime() {
        return createdTime;
    }

    public void setCreatedTime(Timestamp createdTime) {
        this.createdTime = createdTime;
    }

    public Timestamp getUpdatedTime() {
        return updatedTime;
    }

    public void setUpdatedTime(Timestamp updatedTime) {
        this.updatedTime = updatedTime;
    }
}

UserBatchHandler

 

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

public class UserBatchHandler {


    public static int batchSave(List userList, String threadName)  {
        String insertSql ="INSERT INTO user(id,name,createdTime,updatedTime) VALUES(?,?,sysdate(),sysdate())";
        //取得发送sql语句的对象
        PreparedStatement pst = null;
        User  user;
        int[] count = new int[0];
        Connection conn = null;
        try {
            conn= DataSourceUtils.getConnection();
            pst = conn.prepareStatement(insertSql);

            long start=System.currentTimeMillis();
            if(null!=userList&&userList.size()>0){
                for(int i=0;i<userList.size();i++){
                    user= (User) userList.get(i);
                    pst.setInt(1,user.getId());
                    pst.setString(2,user.getName());
                    //加入批处理
                    pst.addBatch();
                }

                count= pst.executeBatch();
                System.out.println(count.length);
                System.out.println(" threadName为"+threadName+", sync data to db, it  has spent " +(System.currentTimeMillis()-start)+"  ms");
        }
        } catch (SQLException e) {
            e.printStackTrace();
        }finally {
            //4. 释放资源
            DataSourceUtils.close(conn, pst);
        }

        //获取到数据更新的行数
        return count.length;
    }
}

DataSourceUtils

 

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

public class DataSourceUtils {

    public static void main(String[] args){
      Connection conn=  DataSourceUtils.getConnection();
      System.out.println("conn is  :  "+conn);
    }

    //创建一个成员变量
    private static DataSource ds;

    /**
     * 加载的代码写在静态代码块中
     */
    static {
        try {
            Properties info = new Properties();
            //加载类路径下,即src目录下的druid.properties这个文件
            info.load(DataSourceUtils.class.getResourceAsStream("/druid.properties"));

            //读取属性文件创建连接池
            ds = DruidDataSourceFactory.createDataSource(info);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    /**
     * 得到数据源
     */
    public static DataSource getDataSource() {
        return ds;
    }

    /**
     * 得到连接对象
     */
    public static Connection getConnection() {
        try {
            return ds.getConnection();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }


    /**
     * 释放资源
     */
    public static void close(Connection conn, Statement stmt, ResultSet rs) {
        if (rs!=null) {
            try {
                rs.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        if (stmt!=null) {
            try {
                stmt.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        if (conn!=null) {
            try {
                conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }


    public static void close(Connection conn, Statement stmt) {
        close(conn, stmt, null);
    }


}

druid.properties

# 配置连接池的参数
initialSize=50
maxActive=200
maxWait=600000
minIdle=5



driverClassName=com.mysql.jdbc.Driver
url=jdbc:mysql://zjj101:3306/test?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC&useSSL=false
username=root
password=root

sql脚本

CREATE TABLE `test`.`user` (
  `id` INT NOT NULL,
  `name` VARCHAR(45) NULL,
  `createdTime` timestamp NULL,
  `updatedTime` timestamp NULL,
  PRIMARY KEY (`id`))
COMMENT = '用户测试表';

ALTER TABLE `test`.`user`
ADD INDEX `index` (`id` ASC);


SELECT count(*) FROM test.user;
# delete  from test.user;
SELECT *  FROM test.user  order by  id desc;

操作说明:

  1. 执行sql脚本
    2.执行Producer类即可
    ?

其它优化:

池技术为什么能提升性能?

连接池:
tomcat连接池,数据库连接池等等,通过复用连接来减少创建和释放连接的时间来提升性能.
线程池:
线程池和连接池也是一样的,通过复用连接来减少创建和释放连接的时间来提升性能.
?

druid数据库连接池性能调优

 # 配置连接池的参数
 initialSize=50
 # 连接池的最大数据库连接数。设为0表示无限制。
 maxActive=200
 # 最大建立连接等待时间。如果超过此时间将接到异常。设为-1表示无限制。
 maxWait=600000
 # 连接池中的最小空闲连接数,Druid会定时扫描连接池的连接,如果空闲的连接数大于该值,则关闭多余的连接,反之则创建更多的连接以满足最小连接数要求。
 minIdle=5
 

MySQL的核心参数优化

配置 “my.cnf” 文件里面的innodb_thread_concurrency 的配置,这个是调整线程的并发数 ,配置完了别忘了重启MySQL服务
?

当 innodb_thread_concurrency=12
?

执行程序结果:
我测试一下,第一次是 79秒 第二次88秒 第三次92秒, 不知道为什么 一次比一次多了.
?

当innodb_thread_concurrency=32的时候
我测试了一下第一次是67秒 ,第二次是62秒
?

另外,其它参数也可以修改:
innodb_buffer_pool_size 参数
max_allowed_packet 参数配置

代码Git地址

https://gitee.com/zjj19941/mutil-thread.git

看case1

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2021-10-14 12:45:25  更:2021-10-14 12:45:35 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 22:14:11-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码