并发整体框架图
实现一个计数器(体现原子性)
方式1:使用count++
public class Counter{
private int count;
public void increase(){
count++;
}
问题 该代码可能在多线程条件下出现问题,因为count++是非原子的。 count++ 实际上等于三个操作:读数据,加1,写回数据。 解决 为了防止多线程下访问increase方法会报错,所以给increase方法加锁。 count变量修改了其他线程可能看不到,所以就加个volatile关键字吧。
方式2:加synchronized 和 volatile
public class LockCounter{
private volatile count;
public synchronized void increase(){
count++;
}
}
问题 加锁会影响效率,可以考虑使用原子操作类的形式。
方式3:AtomicInteger
public class AtomicCounter{
private AtomicInteger count = new AtomicInteger(0);
public void increase(){
count.incrementAndGet();
}
}
线程的并发和同步
区别 | 同步 | 异步 |
---|
概念 | 同步过程调用发出后,在没有得到结果之前,该调用不返回或不继续执行后续操作 多个线程同时访问同一个资源,一个线程访问结束之后,别的线程才能访问 | 异步过程调用后,调用者没有得到结果之前,就可以继续执行后续操作 调用结果,通过状态、通知和回调来通知调用者 | 操作 | 必须执行到底才能执行其他操作 | 操作可以同时执行 | 使用锁 | 多个线程使用同一把锁 | 多个线程在执行过程中使用不同的锁 |
Fork 和 Join
JUC中提供了Fork/Join的并行计算框架,用来处理分治的情况。 分治的思想 = 分而治之,把复杂的问题分解成相似的子问题,然后子问题再分子问题,知道问题分到很简单不必再划分为止,然后层层返回问题的结果。 分治分为两个阶段:分解任务 和 合并结果
第一个阶段:分解任务ForkJoinPool
public class ForkJoinPool extends AbstractExecutorService 把任务分解为一个个小任务直至小任务可以简单的计算返回结果;(Fork分解任务) ForkJoinPool治理分治任务的线程池,它有多个任务队列(区别于ThreadPoolExecutor线程池),通过ForkJoinPool的invoke、submit、execute提交任务的时候会根据一定规则分配给不同的任务队列(该队列是双端队列)。 ForkJoinPool 有一个机制,当某个工作线程对应消费的任务队列空闲的时候它会去别的忙的任务队列的尾部分担任务过来执行,这种执行又被称之为窃取线程。因为这个性质,所以采用双端队列。 窃取线程如何保证不和被窃取任务的线程冲突。队列的绑定工作线程都从队列头部取任务进行执行,窃取线程会从别的队列的尾部取任务执行。 提交任务方法
提交任务方法 | 说明 |
---|
< T > T invoke(ForkJoinTask< T > task) | 指定给定的任务,完成后返回其结果 同步,有返回结果(会阻塞) | Future< ? > submit(Runnable task) | 提交一个可运行的任务执行,返回一个表示该任务的Future 异步,有返回结果(Future<>) | void execute(Runnable task) | 在将来的某个时刻执行给定的命令 异步,无返回结果 |
构造方法 ForkJoinPool(int parallelism) —— 使用指定的并行级别创建一个ForkJoinPool,使用所有其他的参数的默认值。 完整的参数介绍如下:
参数 | 说明 |
---|
int parallelism | 并行级别 = 也就是设置最大并发数 默认值:Runtime.availableProcessors() | ForkJoinPool.ForkJoinWorkerThreadFactory factory | 创建新线程的工厂 默认值 defaultForkJoinWorkerThreadFactory | Thread.UncaughtExceptionHandler Handler | 由于执行任务时遇到不可恢复的错误而终止的内部工作线程的处理程序 默认值 null | boolean asyncMode | true = 为从未连接的分叉任务简历本地先进先出调度模式;默认值false = 基于本地堆栈的模式 | int corePoolSize | 核心线程数 = 保留在线程池中的线程数 ;默认值 = 并行级别数 | int maximumPoolSize | 允许的最大线程数;默认256 | int minimumRunnable | 未被连接组织的核心线程允许的最小数量;默认值是 1 | Predicate< ? super ForkJoinPool > saturate | 未被连接阻止的核心线程允许的最小数量;默认情况下,当一个线程即将被阻止连接或ForkJoinPool.ManagedBlocker,但由于超过maximumPoolSize不能被替换,因此抛出RejectExecutionException | long keepAliveTime | 在线程终止之前自上次使用以来经过的时间;默认值 60 | unit | keepAliveTime 参数的时间单位 |
双端队列 概念: 限定插入和删除操作在表的两端进行的线性表,两端分别称为端点1和端点2,具有队列和栈性质的数据结构。 普通队列:队列头部删除元素、队列尾部添加元素; 双端队列:队列头部添加元素、队列尾部删除元素。
第二个阶段:合并结果
把每个小任务的结果合并返回得到最终结果。(Join合并结果) ForkJoinTask,分治任务,等同于Runnable。 抽象类,核心方法是fork和join。fork用来异步执行一个子任务,join会阻塞当前线程等待子任务返回。 ForkJoinTask有两个子类:RecursiveAction 和 RecursiveTask 都是抽象类,通过递归来执行分治任务。 每个子类都有compute抽象方法(任务执行的主要计算量),区别在于RecursiveAction没有返回值、RecursiveTast有返回值。
代码思路
1 创建ForkJoinPool,用于执行分治任务的线程池,parallelism = 并行级别,并发线程数
ForkJoinPool forkjoinpool = new ForkJoinPool(int parallelism);
2 创建具体操作执行的类,需要继承ForkJoinTask类 或者其子类
public class ForkJoinTaskxhj类 extends RecursiveTask{}
3 创建ForkJoinTaskxhj类的对象
ForkJoinTaskxhj类 obj = new ForkJoinTaskxhj类(参数);
4 通过ForkJoinPool对象的invoke方法提交任务并执行
forkjoinpool.invoke(obj);
案例应用:斐波那契数列
斐波那契数列:1-1-2-3-5-8-13-21-34… 公式:F(1) = 1,F(2) = 1,F(n) = F(n-2) + F(n-1) (n>=3,n为正整数)
package offer2.Test52;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class Test2 {
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
Fibonacci fibonacci = new Fibonacci(20);
long startTime = System.currentTimeMillis();
Integer result = forkJoinPool.invoke(fibonacci);
long endTime = System.currentTimeMillis();
System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms.");
}
static class Fibonacci extends RecursiveTask<Integer> {
final int n;
Fibonacci(int n) {
this.n = n;
}
@Override
protected Integer compute() {
if (n <= 1) {
return n;
}
Fibonacci f1 = new Fibonacci(n - 1);
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
return f2.compute() + f1.join();
}
}
}
阻塞队列BlockingQueue
概念
- 阻塞队列与普通队列的区别:当队列是空的时候,从队列中获取元素的操作将会阻塞;当队列是满的,往队列里面添加元素的操作会被阻塞。
- 是java.util.concurrent包提供的接口,常用于多线程编程中容纳任务队列。
- 适用阻塞队列的好处:不需要关心什么阻塞线程、什么时候唤醒线程,阻塞队列都帮助我们考虑了。
常用实现类
类名 | 说明 |
---|
ArrayBlockingQueue | 数组结构组成的有界阻塞队列 | LinkedBlockingQueue | 链表结构组成的有界阻塞队列 | LinkedTransferQueue | 链表结构组成的无界阻塞队列,和SynchronousQueue类似,还含有非阻塞方式 | LinkedBlockingDeque | 链表解耦组成的双向阻塞队列 | SynchronousQueue | 不存储元素的阻塞队列,即直接提交给线程不保持它们 = 单个元素的队列 | PriorityBlockingQueue | 支持优先级排序的无界阻塞队列 | DelayQueue | 使用优先级队列实现的延迟无界阻塞队列 |
常用方法
方法类型 | 抛出异常 | 特殊值 | 阻塞 | 超时 |
---|
插入 | add(e) 阻塞队列慢,再插入元素抛出IllegalStateException:Queue full… | offer(e) 成功返回true,失败返回false | put(e) 阻塞队列满,生产线程持续执行put操作,队列会阻塞生产线程直到put数据成功或响应中断退出 | offer(e,time,unit) 阻塞队列满,队列会阻塞生产线程一定时间,超时后生产线程会退出 | 移除 | remove() 阻塞队列为空,执行remove操作会抛出NoSuchElementException | poll() 成功返回队列元素,队列没有则返回null | take() 阻塞队列为空,消费者线程从队列里take元素,队列会一直阻塞消费者线程直到队列可用 | poll() 当队列为空,队列会阻塞消费者线程一段时间,超过时限后消费者线程退出 | 检查 | element() 阻塞队列为空,执行element会抛出NoSuchElementException异常 | peek() 返回队列的第一个元素 | 不可用 | 不可用 |
LinkedBlockingQueue
- 使用LinkedBlockingQueue< E > 实现线程同步。
LinkedBlockingQueue< E > 是一个基于已连接节点的,范围任意的阻塞队列。 该队列按照先进先出排序元素。 队列的头部是在队列中时间最长的元素,队列的尾部是在队列中时间最短的元素,新元素插入到队列的尾部。 获取队列元素的操作只会获取头部元素,如果队列满了或者为空会进入阻塞状态。 - 常用方法:
方法名 | 说明 |
---|
LinkedBlockingQueue() | 创建一个容量为Integer.MAX_VALIE的LinkedBlockingQueue | put( E e ) | 在队尾添加一个元素,如果队列满则阻塞当前线程,直到队列有空位 | size() | 返回列表中的元素个数 | take() | 移除并返回头部元素,如果队列空则阻塞当前线程,直到取到元素为止 |
Future
概述
在JUC包中。 public interface Future< V > Future模式很好的解决了那些需要返回值的异步调用。 Future表示异步计算的结果。提供方法来检查计算是否完成,等待其完成,并检索计算结果。只有当计算完成之后,才能使用get方法检索结果;如果需要则阻塞,直到准备就绪。 Future模式本质上是代理模式的一种实际应用。 经常使用Future的场景:计算密集场景、处理大数据量、远程方法调用; 当执行一个长时间运行的任务时,使用Future就可以让我们暂时去执行其他任务,等长任务执行完毕后再返回其结果。
Future模式组成部分
组成 | 说明 |
---|
Main | 系统启动,调用Client发出请求 | Client | 返回Data对象,立即返回FutureData,并开启ClientThread线程装配RealData | Data | 返回数据的接口 | FutureData | Future数据,构造快,虚拟数据,需要装配RealData(订单) | RealData | 真实数据,构造慢(午餐) |
图示:
简单案例
package offer2.Test53.tast1;
public interface Dataxhj {
public String getResult();
}
public class RealDataXhj implements Dataxhj{
protected final String result;
public RealDataXhj(String para){
StringBuffer sb = new StringBuffer(para);
result = sb.toString();
}
@Override
public String getResult() {
return result;
}
}
public class FutureData implements Dataxhj {
protected RealDataXhj realDataXhj = null;
protected boolean isReady = false;
public synchronized void setRealDataXhj(RealDataXhj realDataXhj){
if(isReady){
return;
}
this.realDataXhj = realDataXhj;
isReady = true;
notifyAll();
}
@Override
public synchronized String getResult() {
while(!isReady){
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return realDataXhj.result;
}
}
public class Client {
public Dataxhj request(final String queryStr){
final FutureData future = new FutureData();
new Thread(){
@Override
public void run() {
RealDataXhj realDataXhj = new RealDataXhj(queryStr);
future.setRealDataXhj(realDataXhj);
}
}.start();
System.out.println(future.getResult());
return future;
}
}
public class Mainxhj {
public static void main(String[] args) {
Client client = new Client();
Dataxhj dataxhj = client.request("xhjname");
System.out.println("请求完毕");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
System.out.println("数据 = " + dataxhj.getResult());
}
}
问题:2022/5/3 |
---|
输出结果 dataxhj.getResult显示内容为空。 个人理解输出结果是:数据 = xhjname | 解决:RealData的构造方法并没有使用传入参数para,new StringBuilder(para) 改为这种形式就好了 |
java中的Future模式
JDK内部有一个Future接口,类似之前案例中的订单。 常用方法:
方法名 | 说明 |
---|
boolean cannel(boolean manInterruptIfRunning) | 尝试取消执行此任务 | V get() | 等待计算完成,然后检索其结果 | V get(long timeout,TimeUnit unit) | 如果需要等待最多在给定的时间计算完成,然后索引其结果 | boolean isCancelled() | 如果此任务在正常完成之前被取消,返回true | boolean isDone() | 如果此任务完成,则返回true |
案例:2022/5/3不是很理解
package offer2.Test53.tast1;
public interface Dataxhj {
public String getResult();
}
public class RealDataXhj implements Dataxhj, Callable {
protected final String result;
public RealDataXhj(String para){
StringBuffer sb = new StringBuffer(para);
result = sb.toString();
}
@Override
public String getResult() {
return result;
}
@Override
public Object call() throws Exception {
return result;
}
}
public class Test2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(1);
final Future<?> future = executor.submit((Callable) new RealDataXhj("xhj"));
System.out.println("请求完毕,数据准备中");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("数据 = " + future.get());
}
}
happens-before
happens-before是JMM最核心的概念。
|