并发编程
并发和并行的区别
并发:CPU单核,多个线程操作同一资源,线程间切换快,同一时间只有一个线程执行 并行:CPU多核,多个线程同时执行。
Callable简介
callable和runable的区别就是其可以有返回值,可以抛出异常。其次,callable的启动方法是call方法,runable的启动方法是run方法。
实现了Callable接口的类,若要用Thread类启动,需要用FutureTask去适配,因为FutureTask实现了Runable接口。如下所示
class MyThread implements Callable<String>{
@Override
public String call() throws Exception {
return "callable";
}
}
public static void main(String args[]){
FutureTask futureTask=new FutureTask(new MyThread());
new Thread(futureTask).start();
String s = (String)futureTask.get();
}
若多个线程同时将FutureTask作为参数启动时,只有第一个线程会执行call方法,执行完毕后将返回值保存在futureTask的outcome中,其他的线程会直接从outcome中获取返回值。
Lcok锁(java.util.concurrent.locks.Lock)
ReentrantLock是该接口的实现类。该类可以创建公平锁和非公平锁,默认为公平锁。公平锁就是先来后到,所有线程都有机会获得锁,不会饿死,由于所有线程都会经历阻塞态,因此唤醒阻塞线程的开销会很大;非公平锁就是所有的线程拼运气,谁运气好,谁就获取到锁,可以减少CPU唤醒线程的开销,整体的吞吐效率会高,但缺点是可能会有线程长时间甚至永远获取不到锁,导致饿死。
- lock():获取锁,不可以被打断
- unlock():释放锁
- lockInterruptibly():获取可打断锁,阻塞时可以被其他线程打断
- tryLock(Time):尝试获取锁,指定时间内获取不到的话结束阻塞,阻塞状态可以被打断
synchronized 和 lock锁的区别
-
synchronized 是java内置的关键字,lock是一个java类 -
synchronized 无法获取锁的状态,lock 可以判断是否获得了锁 -
synchronized 会自动释放锁,lock不会自动释放锁,需要自己手动释放锁,如果不释放锁,会造成死锁 -
synchronized 线程一(获得锁,然后锁阻塞了),线程二(等待,然后还是傻傻的等待),lock锁就不一定会等待下去 -
synchronized 可重入锁,不可以被中断,非公平,lock 可重入锁,可判断锁,可以自行设置是否为公平锁 -
synchronized 适合少量的代码同步问题,lock适合大量的代码同步问题
使用Lock实现生产者消费者模型
- 编写资源类
class Data{
private Lock lock=new ReentrantLock();
Condition condition = lock.newCondition();
private int number=1;
public void increment() throws InterruptedException {
lock.lock();
while (number!=0){
condition.await();
}
number++;
System.out.print("加法,number="+number);
condition.signalAll();
lock.unlock();
}
public void decrement() throws InterruptedException {
lock.lock();
while (number==0){
condition.await();
}
number--;
System.out.print("减法,number="+number);
condition.signalAll();
lock.unlock();
}
}
此处资源类中的两个同步方法中,不用if用while的原因可以通过一个例子来理解。拿两个加法线程A、B来说,比如A先执行,执行时调用了wait方法,那它会等待,此时会释放锁,那么线程B获得锁并且也会执行wait方法,两个加线程一起等待被唤醒。此时减线程中的某一个线程执行完毕并且唤醒了这俩加线程,那么这俩加线程不会一起执行,其中A获取了锁并且加1,执行完毕之后B再执行。如果是if的话,那么A修改完num后,B不会再去判断num的值,直接会给num+1。如果是while的话,A执行完之后,B还会去判断num的值,因此就不会执行。
- 编写生产者和消费者
public static void main(String[] args) {
Data2 data=new Data2();
new Thread(()->{
for (int i=1;i<=10;i++){
try {
data.increment();
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"生产者1").start();
new Thread(()->{
for (int i=1;i<=10;i++){
try {
data.increment();
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"生产者2").start();
new Thread(()->{
for (int i=1;i<=10;i++){
try {
data.decrement();
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"消费者1").start();
new Thread(()->{
for (int i=1;i<=10;i++){
try {
data.decrement();
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"消费者2").start();
}
使用传统方法实现顺序执行线程
- 创建打印类
class MyPrint{
private int flag;
private int loopNumber;
public MyPrint(int flag, int loopNumber) {
this.flag = flag;
this.loopNumber = loopNumber;
}
public void print(String str, int flag,int nextFlag){
for (int i=0;i<loopNumber;i++){
synchronized (this){
while (flag!=this.flag){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.print(str);
this.flag=nextFlag;
this.notifyAll();
}
}
}
}
- 测试
public static void main(String[] args) {
MyPrint waitSet=new MyPrint(1,10);
new Thread(()->{
waitSet.print("a",1,2);
},"t1").start();
new Thread(()->{
waitSet.print("b",2,3);
},"t2").start();
new Thread(()->{
waitSet.print("c",3,1);
},"t3").start();
}
使用Lock实现顺序执行线程
- 创建资源类
class Data{
private int number=1;
private Lock lock=new ReentrantLock();
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();
Condition condition3 = lock.newCondition();
public void printA() throws InterruptedException {
lock.lock();
while (number!=1){
condition1.await();
}
number++;
System.out.println("printA:"+number);
condition2.signal();
lock.unlock();
}
public void printB() throws InterruptedException {
lock.lock();
while (number!=2){
condition2.await();
}
number++;
System.out.println("printB:"+number);
condition3.signal();
lock.unlock();
}
public void printC() throws InterruptedException {
lock.lock();
while (number!=3){
condition3.await();
}
number=1;
System.out.println("printB:"+number);
condition1.signal();
lock.unlock();
}
}
- 测试
@Test
public void test(){
Data data=new Data();
new Thread(()->{
try {
for (int i=0;i<5;i++){
data.printA();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(()->{
try {
for (int i=0;i<5;i++){
data.printB();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(()->{
try {
for (int i=0;i<5;i++){
data.printC();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
synchronized理解锁
当同步方法是普通方法时,锁定是对象,当同步方法是静态方法时,锁的是类。 因此,一个对象的两个普通同步方法不会同时执行,先获得锁的执行完毕后另一个才会执行。详情可以参考8锁现象。
8锁现象(狂神说)
JAVA内存模型(JMM)
java内存模型定义了多线程读写共享数据时,对数据的可见性、有序性和原子性的规则和保障。
synchronized: 既可以保证可见性,也可以保证原子性,因此,线程中有同步方法时,那么可以防止去工作缓存中获取变量的值,保证了可见性
volatile: 它可以用来修饰成员变量和静态变量,可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的值,线程操作volatile变量都是直接操作主存。保证可见性,但不保证原子性,适用于一个写线程,多个读线程的情况。其次,volatile可以避免指令重排。因此用双重锁实现单例模式时,可以在静态对象上加一个volatile关键字,这样就不会造成指令重排(因为初始化的语句和给静态变量赋值的语句可能会指令重排,造成未初始化完就把对象返回了),使得返回的对象可能是完整的对象。
保证可见性的原理:在加了volatile修饰的变量的读操作之前会加读屏障,使得之后的代码中的变量都是从主存中读取的,并且不会发生指令重排;写操作之后会加写屏障,使得写屏障之前的变量都是要写入主存的,并且不会发生指令重排
happens-before规则: 规定了哪些写操作对于其他读操作可见,他是可见性与有序性的一套规则总结。
- 一个线程对volatile变量的写对接下来要访问该变量的读操作是可见的
- 一个线程解锁之前对变量进行了写,该变量对接下来加锁(与解锁的对象为同一对象)的其他线程是可见的。
- 线程t2对变量进行了写操作,线程t1打断线程t2,对于其他得知线程t2被打断的线程,可以访问当线程t2修改的那个变量
- 线程start前对变量的写对该线程可见
无锁并发
CAS: 体现了一种乐观锁的思想,当fi中的旧值和变量值相等时,会将变量修改为结果值
while(true){
int 旧值=共享变量
int 结果=旧值+1
if(compareAndSwap(旧值,结果)){
}
}
缺点:
- 循环会耗时
- 一次性只能保证一个共享变量的原子性
- ABA问题:即可能另一个线程将变量修改后又修改了回来,那么该操作对于本线程是不可见的
获取共享变量时,为了保证变量的可见性,需要使用volatile修饰,结合volatile和CAS可以实现无锁并发,适用于竞争不激烈,多核CPU的场景下。JUC中的原子类默认实现了自旋锁。
原子整数:
- incrementAndGet:自增,等价++i
- getAndIncrement:自增,等价于i++
- addAndGet(num):自增num
- get:得到值
- decrementAndGet:自增,等价–i
- getAndDecrement:自增,等价于i–
- getAndUpdate:修改
原子引用:保证引用的安全性
- AtomicReference:原子引用类,将引用传入进去,存在ABA问题
- AtomicMarkableReference:避免了ABA问题,用一个boolean值记录是否被更改过
- AtomicStampedReference:避免了ABA问题,构造方法加入了版本号,每有一个线程修改了原子引用后都会修改版本号
原子数组:保证数组每个元素的安全性
- AtomicIntegerArray
- AtomicLongArray
- AtomicReferenceArray
字段更新器:保证对象的属性是线程安全的
- AtomicReferenceFiledUpdater
- AtomicIntegerFiledUpdater
- AtomicLongFiledUpdater
对应的属性必须配合volatile
class Student{
volatile String name;
@Override
public String toString() {
return "Student{" +
"name='" + name + '\'' +
'}';
}
}
public static void main(String[] args) {
Student student=new Student();
AtomicReferenceFieldUpdater updater=AtomicReferenceFieldUpdater.newUpdater(Student.class,String.class,"name");
System.out.println(updater.compareAndSet(student,null,"asas"));
System.out.println(student);
}
累加器
AQS(AbstractQueuedSynchronizer)
实现的是阻塞式的锁,其中的state属性表示资源的状态,判断是否获取了锁。独占模式表示只允许一个线程访问,共享模式允许多个线程访问。提供了FIFO等待队列,类似于Monitor的EntryList;条件变量来实现等待、唤醒机制,支持多个条件变量,类似于Monitor的WaitSet。
- 获取锁:tryAcquire(arg):返回boolean,获取不到锁时可以入队阻塞,阻塞原理是park和unpark
- 释放锁:tryRelease(arg):让阻塞线程恢复运行
ReentrantLock原理:内部调用AQS的方法实现叫
Lock锁不可打断原理:即其他线程将其打断后,仍然会驻留在AQS队列中,等获得锁以后方能继续运行。而可打断锁的原理是被打断后会抛出异常
非公平锁原理:新来个线程直接抢占锁,不会检查AQS队列是否有等待的线程节点;而公平锁的话会检查AQS队列,如果有节点在排序的话
条件变量原理:创建一个ConditionObject对象,将要等待的线程以链表的形式接起来,然后线程释放锁(重入锁也直接减为0),接着AQS中的等待队列中的第一个线程竞争锁。当先线程调用signal唤醒线程时会将ConditionObject的第一个线程唤醒,调用signalAll时会将所有的线程唤醒,并将唤醒的线程加入到AQS的等待队列中
集合不安全
ArrayList和LinkedList是不安全的,多个线程同时添加的话会出异常(ConcurrentModificationException)。为了解决这种现象,可以有以下几种解决办法
- 用Vector替代ArrayList和LinkedList
- 用Collections的方法synchronizedList生成一个安全的list
- 用CopyOnWriteArrayList类替代ArrayList,与vector相比只有写的方法加了锁
HashSet和TreeSet也是不安全的,和ArrayList一样,也可以解决该问题。
- 用Collections的方法synchronizedSet生成一个安全的Set
- 用CopyOnWriteSet类替代HashSet和TreeSet
HashMap和TreeMap也是不安全的,解决方案如下。
- 用Collections的方法synchronizedMap生成一个安全的map
- 用ConcurrentHashMap替代HashMap和TreeMap
三大常用辅助类
-
CountDownLatch(减法计数器) 相当于一个计数器的功能,创建一个CountDownLatch对象后,构造函数要求赋予一个初值(数字),然后调用CountDownLatch的countDown()方法实现减1的操作。countDown还有await()方法,该方法用来判断countDown的值是否变为了0,如果变为0才会执行await()下面的代码。 CountDownLatch count=new CountDownLatch(6);
for (int i=0;i<6;i++){
new Thread(()->{
count.countDown();
}).start();
}
count.await();
-
CyclicBarrier(加法计数器) 相对于一个加法计数器,每调用一次await()方法,内部就会+1,直到加到设置的阈值,那么就会执行构造函数中的方法。 CyclicBarrier cyclicBarrier=new CyclicBarrier(7,()->{
System.out.println("加到7了,执行");
});
for (int i=0;i<7;i++){
new Thread(()->{
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
-
Semaphore(限制计数器) 同一时间只能让有限个线程获得资源,以下的例子就是在同一时间内只能有三个线程执行。在访问高峰时,让请求线程阻塞,高峰过去再释放即可,只适合限制单机限制线程数。 Semaphore semaphore=new Semaphore(3);
for (int i=1;i<=6;i++){
new Thread(()->{
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release();
}
},String.valueOf(i)).start();
}
ReadWriteLock读写锁
读写锁详情
BlockingDeque阻塞队列
队列满了之后,写操作会阻塞,队列空的话,读操作会阻塞。
方式 | 抛出异常 | 不抛异常 | 阻塞等待 | 超时等待 |
---|
入队 | add(e) | offer(e) | put(e) | offer(e,time,TimeUnit) | 出队 | remove(e) | poll(e) | take(e) | poll(e,time,TimeUnit) | 查看队首元素 | element() | peek() | - | - |
SynchronousQueue
这是BlockingDeque的一个实现类,该类不存储任何元素,相当于只能在存的时候就直接取,否则会造成阻塞。
BlockingQueue<String> queue=new SynchronousQueue<>();
new Thread(()->{
try {
queue.put("1");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(()->{
try {
System.out.println(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
线程池
线程池的好处:
- 降低资源你的消耗
- 提高响应的速度
- 方便管理
- 线程复用、可以控制最大并发数、管理线程
创建线程池的方法:
- Executors.newSingleThreadExecutor():创建只有一个线程的线程池,阻塞队列为LinkedBlockingQueue,无界队列。即时当前线程出异常,线程池会创建一个新的线程接着执行剩余任务
- Executors.newFixedThreadPool(5):创建固定数量线程的线程池
- Executors.newCachedThreadPool():创建任意的线程池,阻塞队列使用SynchronousQueue,当任务执行完毕后,空闲一分钟释放线程
这三个方法最终都调用了ThreadPoolExecutor(…)方法,如下
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
参数介绍
参数名称 | 说明 |
---|
corePoolSize | 核心线程数,核心线程会一直存活,即使没有任务需要执行 | maximumPoolSize | 线程池中能拥有最多线程数 | keepAliveTime | 空闲线程存活时间 | unit | 时间单位 | workQueue | 用于缓存任务的阻塞队列,当任务数量大于核心线程数加阻塞队列大小时会创建新的线程 | threadFactory | 线程池创建线程使用的工厂 | handler | 线程池对拒绝任务的处理策略 ,比如当任务数大于最大线程数+阻塞队列大小的时候会触发拒绝任务的处理。一共四种策略,分别为AbortPolicy(默认策略,会抛出异常)、CallerRunsPolicy(哪来的任务让哪里处理)、DiscardPolicy(丢掉任务,不抛出异常)、DiscardOldestPolicy(队列满了会尝试竞争,不会抛出异常) |
用线程池启动线程:
线程池对象.execute(()->{});
方法介绍:
- execute(Runnable):执行任务
- submit(futureTask):执行任务,返回值为future对象,通过get方法可以拿到线程中的返回值
- invokeAll(Collection<callable>):执行一组集合中的任务
- invokeAll(Collection<callable>,time,timeUnit):执行一组集合中的任务,带超时
- invokeAny(Collection<callable>):执行一组集合中的任务,返回最先执行完的线程返回值,其他线程停止
- shutdown():打断线程,没执行完的线程执行完自动结束
- shutdownNow:立刻结束所有线程
如何定义线程数的最大值:
- CPU密集型:电脑核数为几就是几,保持效率最高。获取cpu的核数:
Runtime.getRuntime().availableProcessors() - IO密集型:判断程序中耗IO的任务数量,让最大线程数大于这个值
传统延时任务:Timer,缺点是串行执行,如果在执行时其中某个任务失败,那么会导致后面的任务也执行失败
Timer timer=new Timer();
TimerTask timerTask1=new TimerTask() {
@Override
public void run() {
System.out.println("asas");
}
};
TimerTask timerTask2=new TimerTask() {
@Override
public void run() {
System.out.println("222");
}
};
timer.schedule(timerTask1,5000);
timer.schedule(timerTask2,5000);
线程池延时任务:克服了timer的缺点
ScheduledExecutorService pool= Executors.newScheduledThreadPool(2);
pool.schedule(()->{
System.out.println("hahah");
},1, TimeUnit.SECONDS);
pool.schedule(()->{
System.out.println("hahah");
},1, TimeUnit.SECONDS);
线程池定时任务
ScheduledExecutorService pool= Executors.newScheduledThreadPool(2);
pool.scheduleAtFixedRate(()->{
System.out.println("hahah");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
},1,2,TimeUnit.SECONDS);
pool.scheduleWithFixedDelay(()->{
System.out.println("hahah");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
},1,2,TimeUnit.SECONDS);
四大函数式接口
- 函数式接口:Function,实现apply方法,传入一个参数,返回一个参数
- 断定式接口:Predicate,实现test方法,传入一个参数,返回boolean类结果
- 消费型接口:Consumer,实现accept方法,只有输入参数,没有返回值
- 供给型接口:Supplier,实现get方法,没有参数,只有返回值
Stream流式计算
已知User类,有如下五个用户,返回id是偶数的、年龄大于22的、按照年龄排序、并且将用户名转化为大写、只输出第一个用户的用户名
User user1 = new User("1","user1",11);
User user2 = new User("2","user2",21);
User user3 = new User("3","user3",22);
User user4 = new User("4","user4",31);
User user5 = new User("6","user5",41);
List<User> users = Arrays.asList(user1, user2, user3, user4, user5);
users.stream()
.filter(user -> Integer.parseInt(user.getId()) % 2 == 0)
.filter(user->user.getAge()>22)
.sorted((u1,u2)-> u2.getAge()-u1.getAge())
.map((u)-> u.getName().toUpperCase())
.limit(1)
.forEach(System.out::println);
ForkJoin
Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。Fork/Join框架要完成两件事情:
- 任务分割:首先Fork/Join框架需要把大的任务分割成足够小的子任务,如果子任务比较大的话还要对子任务进行继续分割
- 执行任务并合并结果:分割的子任务分别放到双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都放在另外一个队列里,启动一个线程从队列里取数据,然后合并这些数据。
ForkJoinTask
使用Fork/Join框架,首先需要创建一个ForkJoin任务。该类提供了在任务中执行fork和join的机制。通常情况下我们不需要直接集成ForkJoinTask类,只需要继承它的子类,Fork/Join框架提供了两个子类:
- RecursiveAction,用于没有返回结果的任务
- RecursiveTask,用于有返回结果的任务
class SumTask extends RecursiveTask<Long> {
static final int THRESHOLD = 20;
int start;
int end;
SumTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if (end - start <= THRESHOLD) {
long sum = 0;
for (int i = start; i <= end; i++) {
sum += i;
}
return sum;
}
int middle = (end + start) / 2;
SumTask subTask1 = new SumTask(start, middle);
SumTask subTask2 = new SumTask(middle+1, end);
invokeAll(subTask1, subTask2);
Long subResult1 = subTask1.join();
Long subResult2 = subTask2.join();
return subResult1 + subResult2;
}
}
ForkJoinPool
ForkJoinTask需要通过ForkJoinPool来执行。任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务(工作窃取算法)
ForkJoinPool pool = new ForkJoinPool();
SumTask task=new SumTask(1,500);
Future<Long> result = pool.submit(task);
System.out.println(result.get());
异步回调CompletableFuture
拿callable接口来说,可以获取返回值,那么有没有一种方法可以异步的获取返回值呢?可以使用线程池的submit方法来获取future对象,然后调用get方法取值。
ExecutorService executor = Executors.newFixedThreadPool(4);
Callable<String> task = new Task();
Future<String> future = executor.submit(task);
String result = future.get();
但是以上的方法有所缺陷,因为get方法会引起主线程阻塞。从Java 8开始引入了CompletableFuture,它针对Future做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。使用的模板如下:
CompletableFuture<String> cfQuery = CompletableFuture.supplyAsync(() -> {
return queryCode("中国石油");
});
CompletableFuture<Double> cfFetch = cfQuery.thenApplyAsync((code) -> {
return fetchPrice(code);
});
cfFetch.thenAccept((result) -> {
System.out.println("price: " + result);
});
cfFetch.exceptionally((e) -> {
e.printStackTrace();
return null;
});
其中:
- 创建一个CompletableFuture是通过CompletableFuture.supplyAsync()实现的,它需要一个实现了Supplier接口的对象,即只有返回值
- 继续一个CompletableFuture是通过thenApplyAsync实现的,它需要实现function类型接口,有参数有返回值。
- 完成时,CompletableFuture会调用Consumer对象
- 异常时,CompletableFuture会调用Function对象
任务合并:即将两个目的相同的任务合并为一个任务,其中的anyOf表示只要有一个执行成功即可,allOf表示每个都成功才可。语法如下:
CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQuery1, cfQuery2);
|