一、线程池
ThreadPoolExecutor构造方法的参数
将线程池比作一个公司,每个线程比作一个员工
- corePoolSize: 核心线程的数目(一旦创建,永不销毁,正式员工,不会被辞退)
- maximumPoolSize:核心线程和非核心线程的总数目(非核心线程:一段时间不工作就销毁,临时工:摸鱼太久就滚蛋)
- keepAliveTime:允许非核心线程不工作的时间(允许临时工摸鱼的时间)
- unit:控制keepAliveTime的单位(分,秒,毫秒或者其他值)
- workQueue:阻塞队列,组织了线程要执行的任务
- ThreadFactory:线程的创建方式
- RejectedExecutionHandler:拒绝策略 (当我们的阻塞队列满了的时候又来了一个新的线程)
AbortPolicy() 抛出异常
CallerRunsPolice() 调用者处理
DiscardOldestPolicy() 丢弃队列中最老的任务
DiscardPolicy() 丢弃最新来的任务
Executors
由于ThreadPoolExecutors使用起来比较复杂,标准库又提供了一组Executors类,这个类相当于封装的ThreadPoolExecutors。这个类相当于一个工厂类,可以创建出不同风格的线程池实例。
1.newFixedThreadPool:创建出一个固定线程数量的线程池。(没有临时工版本) 2.newCachedThreadPool: 创建一个线程数量可变的线程池(没有正式工版本) 3.newSingleThreadPool:创建出一个只包含一个线程的线程池 4.newScheduleThreadPool:能够设定延时的线程池(插入的线程可以等一会儿执行)
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadExecutors {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for ( int i = 0; i < 20; i++) {
int finalI = i;
executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println("任务" + finalI);
}
});
}
}
}
到底使用哪种方法建造我们的线程池呢? 答案很简单,复杂问题用复杂版本的,简单问题用简单版本的。
二、如何实现一个线程池
import javafx.concurrent.Worker;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
public class MyThreadPool {
static class Worker extends Thread{
private BlockingQueue<Runnable> blockingQueue=null;
public Worker(BlockingQueue<Runnable> blockingQueue) {
this.blockingQueue=blockingQueue;
}
@Override
public void run() {
while(true){
try {
Runnable command=blockingQueue.take();
command.run();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
static class ThreadPool{
private BlockingQueue<Runnable> blockingQueue= new LinkedBlockingDeque<>();
List<Thread> workers = new ArrayList<>();
private static final int MIX_WORKER_COUNT=10;
public void submit(Runnable command) throws InterruptedException {
if(workers.size()<MIX_WORKER_COUNT){
Worker worker = new Worker(blockingQueue);
worker.start();
workers.add(worker);
}
blockingQueue.put(command);
}
}
public static void main(String[] args) throws InterruptedException {
ThreadPool threadPool = new ThreadPool();
for (int i =0;i<100;i++) {
int finalI = i;
threadPool.submit(new Runnable() {
@Override
public void run() {
System.out.println(finalI);
}
});
}
}
}
三、常见的锁策略
1.乐观锁和悲观锁
乐观锁: 假设锁冲突的概率很低,甚至不会发生冲突,所以在数据提交更新的时候才会进行并发冲突检测,如果发生锁冲突,则返回错误信息,让用户决定如何去做。
乐观锁的一个重要功能就是要检测出数据是否发生冲突,我们需要引入一个‘“版本号”来进行解决,每次线程执行完操作的时候都将自己的版本号加一。 当有两个线程同时想修改一个数据的时候,只有提交的版本号大于当前主内存中存储的版本号才能执行更新。
悲观锁: 假设锁冲突的概率很高,每次拿数据都会被别人修改,所以每次在拿数据的时候都会加锁,这样别人想拿这个数据就要阻塞直到我用完。
2.读写锁
先清晰一个概念:
- 两个线程都只读一个数据,此时不涉及线程安全问题
- 两个线程一个读一个写,有线程安全问题
- 两个线程都写,有线程安全问题
读写锁就是把读操作和写操作区别开,在加锁期间额外表明加锁意图,读操作之间不互斥,写操作与任何人互斥。Java标准库提供了一个ReentrantReadWriteLock类实现读写锁 ReentrantReadWriteLock.ReadLock表示读锁,这个对象提供了lock() 和 unlock() 方法进行加解锁。 ReentrantReadWriteLock.WriteLock表示写锁,这个对象提供了lock() 和 unlock() 方法进行加解锁。
Synchronized不是读写锁,他没有读写的区分,一旦使用必定互斥。
3.重量级锁&轻量级锁
重量级锁: 加锁机制依赖操作系统提供的mutex,它大量的内核态用户态切换导致操作成本非常高!
轻量级锁: 加锁机制尽量不使用mutex,尽量在用户态使用代码完成,实在搞定不了再使用mutex。
Synchronized开始是一个轻量级锁,如果锁冲突较为严重就会变成重量级锁。
4.自旋锁&挂起等待锁
挂起等待锁: 线程在抢锁失败之后会进入阻塞队列,需要过很久才能再次被调度
自旋锁: 如果获取锁失败,立即再次尝试获取锁,无限循环,直到获取到锁为止,一旦锁被其他线程释放,就能立刻获取到锁,节省了操作系统调度线程的开销。 伪代码:
while(抢锁(lock)==失败){}
自旋锁是一种典型的轻量级锁的实现方式。
优点:没有放弃CPU,不涉及线程阻塞和调度,能快速获得锁 缺点:如果其他线程持有锁时间太久,会早场CPU资源的浪费(挂起等待不消耗CPU资源)。
5. 公平锁&非公平锁
假设三个线程同时竞争一把锁,1线程竞争成功,然后2线程尝试获取,获取失败而进入阻塞等待。3线程又来获取,仍然获取失败进入阻塞等待。当1线程释放锁的时候会发生如下情况:
公平锁: 遵循先来后到,2比3先来的,所以当1释放了锁之后2比3先拿到锁。
非公平锁: 不遵循先来后到,2和3都有可能获得锁
操作系统内部的线程调度可以视为是随机的,如果不加其他限制就是非公平锁,如果要实现公平锁就要依赖额外的数据结构来记录线程的先后顺序。
Synchronized是非公平锁
5. 重入锁&不可重入锁
不可重入锁: 如果一个线程对一个对象1加锁,在这个对象内部又再次对2进行加锁,这时因为1的锁没有释放2就会阻塞等待,但是1必须等2执行完才能释放锁,此时就会陷入死锁局面。。。。。
可重入锁: 某个线程已经获得了某个锁,再次加锁不会死锁。 原理是在锁中加入记录该锁持有者的身份,和一个计数器,如果发现当前加锁对象是锁的持有对象,就让计数器自增1。
四、CAS
CAS意为Compare And Swap(比较并交换),相当于一个原子性操作,同时完成“读取内存,比较,修改内存”这三步,本质上需要cpu的指令支撑。
1.CAS的应用
实现原子类: 在Java内部包含了许多基于CAS实现的类,例如AtomicInteger类,其中的getAndIncrement相当于原子性的i++操作!
- 假设线程1先执行CAS操作,由于oldvalue和value都是0,所以直接对value进行赋值(自增1)
- CAS直接读写内存,并且他的读写比较是一条指令,是原子性的
- 自增完之后格局变为了这样
- 当线程2进来之后发现oldvalue和value不相等,于是把value的值赋给了oldvalue此时格局又变了
- 当线程2再来的时候,发现oldvalue和value的值相同了,这时候value又自增1。
- 线程12返回各自的oldvalue值。
这就实现了一个原子类,即使不使用重量级的锁也可以完成多线程的自增操作!
2.CAS的ABA问题
ABA问题描述: 假设有AB两个线程有一个共享变量num,num的值为100,A要使用CAS把num的值改为50
- 此时A线程先读取num的值存到oldNum中
- 然后再用oldNum和num做对比,如果值还是100就变为50
此时出现了一个问题! B线程进来捣乱了,B线程把num的值从100变为了50又变为了100。 线程A无法确定这个num变量始终是100还是经过一系列变化又变成了100。
ABA问题的解决方案: 加入版本号! 给修改的值引入版本号,在CAS比较数据的新旧值的同时比对版本号。
- 在CAS读取数据的时候,如果当前数据的版本号和之前读的版本号相同,则进行修改并让版本号+1。
- 如果当前的数据版本号(3)高于之前读的版本号,则操作失败。
|