JUC学习
生产者和消费者问题
synchronized版本的生产者和消费者
package com.darlene;
public class demo01 {
public static void main(String[] args) {
Data2 data = new Data2();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"C").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"D").start();
}
}
class Data2{
private int number = 0;
public synchronized void increment() throws InterruptedException {
while(number != 0){
this.wait();
}
number++;
System.out.println(Thread.currentThread().getName()+"=>"+number);
this.notifyAll();
}
public synchronized void decrement() throws InterruptedException {
while(number == 0){
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName()+"=>"+number);
this.notifyAll();
}
}
JUC版的生产者和消费者
await signal
package com.darlene;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class demo02 {
public static void main(String[] args) {
Data2 data = new Data2();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"C").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"D").start();
}
}
class Data2{
private int number = 0;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void increment() throws InterruptedException {
lock.lock();
try {
while(number != 0){
condition.await();
}
number++;
System.out.println(Thread.currentThread().getName()+"=>"+number);
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void decrement() throws InterruptedException {
lock.lock();
try {
while(number == 0){
condition.await();
}
number--;
System.out.println(Thread.currentThread().getName()+"=>"+number);
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
任何一个新的技术,不仅仅只是覆盖了原来的技术,有优势和补充
Condition 精准的通知和唤醒线程,随机的状态,有序的执行
package com.darlene;
import javax.swing.plaf.basic.BasicSplitPaneUI;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class demo03 {
public static void main(String[] args) {
Data3 data = new Data3();
new Thread(()->{
for (int i = 0; i < 10; i++) {
data.printA();
}
},"A").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
data.printB();
}
},"B").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
data.printC();
}
},"C").start();
}
}
class Data3{
private Lock lock = new ReentrantLock();
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();
Condition condition3 = lock.newCondition();
private int number = 1;
public void printA(){
lock.lock();
try {
while(number!=1){
condition1.await();
}
System.out.println(Thread.currentThread().getName()+"=>AAA");
number = 2;
condition2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printB(){
lock.lock();
try {
while(number!=2){
condition2.await();
}
System.out.println(Thread.currentThread().getName()+"=>BBB");
number = 3;
condition3.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printC(){
lock.lock();
try {
while(number != 3){
condition3.await();
}
System.out.println(Thread.currentThread().getName()+"=>CCC");
number = 1;
condition1.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
8锁现象
如何判断锁的是谁,永远的直到什么锁,锁到底锁的是谁
对象、class
深刻理解我们的锁
package com.darlene.lock8;
import java.util.concurrent.TimeUnit;
public class Test1 {
public static void main(String[] args) throws InterruptedException {
Phone phone = new Phone();
new Thread(()->{
try {
phone.sendSms();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(()->{
phone.call();
},"B").start();
}
}
class Phone{
public synchronized void sendSms() throws InterruptedException {
TimeUnit.SECONDS.sleep(4);
System.out.println("发短信");
}
public synchronized void call(){
System.out.println("打电话");
}
}
package com.darlene.lock8;
import java.util.concurrent.TimeUnit;
public class Test2 {
public static void main(String[] args) throws InterruptedException {
Phone2 phone1 = new Phone2();
Phone2 phone2 = new Phone2();
new Thread(()->{
try {
phone1.sendSms();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(()->{
phone2.call();
},"B").start();
}
}
class Phone2{
public synchronized void sendSms() throws InterruptedException {
TimeUnit.SECONDS.sleep(4);
System.out.println("发短信");
}
public synchronized void call(){
System.out.println("打电话");
}
public void hello(){
System.out.println("hello");
}
}
package com.darlene.lock8;
import java.util.concurrent.TimeUnit;
public class Test3 {
public static void main(String[] args) throws InterruptedException {
Phone3 phone1 = new Phone3();
Phone3 phone2 = new Phone3();
new Thread(()->{
try {
phone1.sendSms();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(()->{
phone2.call();
},"B").start();
}
}
class Phone3{
public static synchronized void sendSms() throws InterruptedException {
TimeUnit.SECONDS.sleep(4);
System.out.println("发短信");
}
public static synchronized void call(){
System.out.println("打电话");
}
}
package com.darlene.lock8;
import java.util.concurrent.TimeUnit;
public class Test4 {
public static void main(String[] args) throws InterruptedException {
Phone4 phone1 = new Phone4();
Phone4 phone2 = new Phone4();
new Thread(()->{
try {
phone1.sendSms();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(()->{
phone2.call();
},"B").start();
}
}
class Phone4{
public static synchronized void sendSms() throws InterruptedException {
TimeUnit.SECONDS.sleep(4);
System.out.println("发短信");
}
public synchronized void call(){
System.out.println("打电话");
}
}
小结
new this 具体的一个手机
static class唯一的一个模板
集合类不安全
list不安全
package com.darlene.unsafe;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
public class ListTest {
public static void main(String[] args) {
List<String> list = new CopyOnWriteArrayList<>();
for (int i = 0; i < 40; i++) {
new Thread(()->{
list.add(UUID.randomUUID().toString().substring(0,5));
System.out.println("list="+list);
},String.valueOf(i)).start();
}
}
}
学习方法推荐:1.先会用 2,寻找其它解决方案 3.分析源码
Set 不安全
package com.darlene.unsafe;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArraySet;
public class SetTest {
public static void main(String[] args) {
Set<String> set = new CopyOnWriteArraySet<>();
for (int i = 0; i < 40; i++) {
new Thread(()->{
set.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(set);
},String.valueOf(i)).start();
}
}
}
HashSet的底层是什么
HashMap
public HashSet() {
map = new HashMap<>();
}
public boolean add(E e) {
return map.put(e, PRESENT)==null;
}
IO 集合类 常用类
Map不安全
回顾Map的基本操作
package com.darlene.unsafe;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
public class MapTest {
public static void main(String[] args) {
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
for (int i = 1; i < 50; i++) {
new Thread(()->{
map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0,5));
System.out.println(map);
},String.valueOf(i)).start();
}
}
}
public class ConcurrentHashMap<K,V>
extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable支持检索的完全并发性和更新的高预期并发性的哈希表。 该类符合与Hashtable相同的功能规范,并包括与Hashtable每种方法对应的方法的Hashtable 。 不过,尽管所有操作都是线程安全的,检索操作并不意味着锁定,并没有为防止所有访问的方式锁定整个表的任何支持。 这个课程在程序中完全可以与Hashtable进行互操作,程序依赖于其线程安全性,但不依赖于其同步细节。
检索操作(包括get )通常不阻止,因此可能与更新操作重叠(包括put和remove )。 检索反映了最近完成的更新操作的结果。 (更正式地,对于给定密钥的更新操作熊之前发生与任何(非空关系)检索该键报告经更新的值。)对于聚合操作,比如putAll和clear ,并发检索可能反映插入或移除只有一些条目。 类似地,迭代器,分割器和枚举返回在反映迭代器/枚举创建过程中或之后反映哈希表状态的元素。 他们不抛出ConcurrentModificationException 。 然而,迭代器被设计为一次只能由一个线程使用。 请记住,骨料状态方法的结果,包括size , isEmpty ,并containsValue通常是有用的,只有当一个地图没有发生在其他线程并发更新。 否则,这些方法的结果反映了可能足以用于监视或估计目的的瞬态状态,但不适用于程序控制。
当存在太多的冲突(即,具有不同的哈希码但是以表的大小为模数落入相同的时隙的密钥)时,该表被动态扩展,并且每个映射保持大致两个bin的预期平均效果(对应于0.75负载调整大小的因子阈值)。 这个平均值可能会有很多变化,因为映射被添加和删除,但是总的来说,这为哈希表保留了普遍接受的时间/空间权衡。 然而,调整这个或任何其他类型的散列表可能是相对较慢的操作。 如果可能,最好提供一个大小估计作为可选的initialCapacity构造函数参数。 附加的可选的loadFactor构造函数参数提供了通过指定在计算给定数量的元素时要分配的空间量而使用的表密度来定制初始表容量的另一种方法。 此外,为了与此类的以前版本兼容,构造函数可以可选地指定预期的concurrencyLevel作为内部大小调整的附加提示。 请注意,使用完全相同的许多键hashCode()是降低任何哈希表的性能的一种可靠的方法。 为了改善影响,当按键为Comparable时 ,该类可以使用键之间的比较顺序来帮助打破关系。
甲Set投影一个的ConcurrentHashMap可以(使用被创建newKeySet()或newKeySet(int) ),或观察(使用keySet(Object)时仅键是感兴趣的,并且被映射的值是(可能瞬时)不使用或全部取相同的映射值。
ConcurrentHashMap可以通过使用LongAdder值作为可缩放频率映射(直方图或多分集的形式),并通过computeIfAbsent进行初始化。 例如,要添加一个计数到ConcurrentHashMap<String,LongAdder> freqs ,可以使用freqs.computeIfAbsent(key, k -> new LongAdder()).increment();
该类及其视图和迭代器实现了Map和Iterator接口的所有可选方法。
像Hashtable但不像HashMap ,此类不允许 null用作键或值。
ConcurrentHashMaps支持一系列顺序和并行批量操作,与大多数Stream方法不同,它们被设计为安全并且经常明智地应用,即使是由其他线程同时更新的映射; 例如,当计算共享注册表中的值的快照摘要时。 有三种操作,每种具有四种形式,接受键,值,条目和(键,值)对作为参数和/或返回值的函数。 由于ConcurrentHashMap的元素不以任何特定的方式排序,并且可能会在不同的并行执行中以不同的顺序进行处理,因此提供的函数的正确性不应该依赖于任何排序,或者任何其他可能瞬时变化的对象或值计算进行中; 除了每一个行动,理想情况下都是无副作用的。 对Map.Entry对象的批量操作不支持方法setValue 。
forEach:对每个元素执行给定的操作。 变量形式在执行操作之前对每个元素应用给定的变换。
search:返回在每个元素上应用给定函数的第一个可用非空结果; 当找到结果时跳过进一步的搜索。
reduce:累积每个元素。 提供的减少功能不能依赖于排序(更正式地,它应该是关联和交换)。 有五种变体:
平原减少 (由于没有相应的返回类型,因此(key,value)函数参数没有这种方法的形式)
映射的减少积累了应用于每个元素的给定函数的结果。
使用给定的基础值减少到标量双,长和int。
这些批量操作接受一个parallelismThreshold参数。 如果估计当前地图大小小于给定阈值,则方法依次进行。 使用值Long.MAX_VALUE抑制所有的并行性。 使用1的值通过分区到足够的子任务来完全利用用于所有并行计算的ForkJoinPool.commonPool()来实现最大并行度。 通常,您最初将选择其中一个极值,然后测量使用中间值之间的性能,从而降低开销与吞吐量之间的关系。
批量操作的并发属性遵循ConcurrentHashMap的并发属性:从get(key)返回的任何非空结果和相关的访问方法与关联的插入或更新之间发生的事件关系。 任何批量操作的结果反映了这些每个元素关系的组合(但是除非以某种方式已知静止),而且对于整个地图而言并不一定是原子的。 相反,因为映射中的键和值从不为空,所以null作为目前缺乏任何结果的可靠原子指标。 为了保持此属性,null用作所有非标量缩减操作的隐含基础。 对于double,long和int版本,基础应该是当与任何其他值组合时返回其他值(更正式地,它应该是减少的标识元素)。 最常见的减少有这些属性; 例如,使用基数0或最小值与基准MAX_VALUE计算和。
作为参数提供的搜索和转换函数应该类似地返回null以指示缺少任何结果(在这种情况下不被使用)。 在映射缩减的情况下,这也使得转换可以用作过滤器,如果不应该组合元素,返回null(或者在原始专业化的情况下,身份基础)。 在使用它们进行搜索或减少操作之前,您可以通过在“null意味着现在没有任何内容”规则下自行构建复合转换和过滤。
接受和/或返回Entry参数的方法维护键值关联。 例如,当找到最大价值的钥匙时,它们可能是有用的。 请注意,可以使用new AbstractMap.SimpleEntry(k,v)提供“plain”Entry参数。
批量操作可能突然完成,抛出在应用程序中遇到的异常。 在处理这样的异常时,请注意,其他并发执行的函数也可能引发异常,或者如果没有发生第一个异常,则会这样做。
与顺序形式相比,加速比是常见的,但不能保证。 如果并行计算的基础工作比计算本身更昂贵,则涉及小地图上的简短功能的并行操作可能比顺序形式执行得更慢。 类似地,如果所有处理器正忙于执行不相关的任务,并行化可能不会导致太多的实际并行。
所有任务方法的所有参数都必须为非空值。
这个类是Java Collections Framework的成员
Callable
Callable 接口类似于Runnable ,因为它们都是为其实例可能由另一个线程执行的类设计的。 A Runnable ,但是,不返回结果,也不能抛出被检查的异常。
- 可以有返回值
- 可以抛出异常
- 方法不同 run()/call()
代码测试
Interface Runnable
All Known Subinterfaces:
RunnableFuture<V> , RunnableScheduledFuture<V>
所有已知实现类:
AsyncBoxView.ChildState , ForkJoinWorkerThread , FutureTask , RenderableImageProducer , SwingWorker , Task , Thread , TimerTask
Class FutureTask<V>
java.lang.Object
java.util.concurrent.FutureTask<V>
参数类型
V - 该FutureTask的 get方法返回的结果类型
All Implemented Interfaces:
Runnable , Future<V> , RunnableFuture<V>
构造方法详细信息
FutureTask
public FutureTask?(Callable<V> callable)创建一个 FutureTask ,它将在运行时执行给定的 Callable 。
参数
callable - 可调用任务
异常
NullPointerException - 如果可调用为空
package com.darlene.callable;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class CallableTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyThread thread = new MyThread();
FutureTask futureTask = new FutureTask(thread);
new Thread(futureTask,"A").start();
new Thread(futureTask,"B").start();
String o = (String) futureTask.get();
System.out.println(o);
}
}
class MyThread implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println(call());
return "12";
}
}
- 结果会缓存,提高效率
- 结果可能需要缓存,会阻塞
常用的辅助类
CountDownLatch
public class CountDownLatch
extends Object允许一个或多个线程等待直到在其他线程中执行的一组操作完成的同步辅助。
A CountDownLatch用给定的计数初始化。 await方法阻塞,直到由于调用countDown()方法,当前计数达到零,此后所有等待线程被释放,并且任何后续的调用await立即返回。 这是一个一次性的现象 - 计数无法重置。 如果您需要重置计数的版本,请考虑使用CyclicBarrier 。
A CountDownLatch是一种通用的同步工具,可用于多种用途。 用CountDownLatch初始化的CountDownLatch用作简单的开/关锁存器,或者门:所有线程调用await在门口等待,直到被调用countDown()的线程打开 。 一个初始化为N的CountDownLatch可以用来使一个线程等待直到N个线程完成一些动作,或者一些动作已经完成N次。
CountDownLatch一个有用的属性是,它不需要线程调用countDown等待计数到达零,然后继续,它只是阻止任何线程通过await,直到所有线程可以通过。
package com.darlene.add;
import java.util.concurrent.CountDownLatch;
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(6);
countDownLatch.countDown();
for (int i = 1; i <= 6; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName());
countDownLatch.countDown();
},String.valueOf(i)).start();
}
countDownLatch.await();
System.out.println("Close door");
}
}
原理:
countDownLatch.countDown(); //-1
countDownLatch.await(); //等待计数器归0,再向下执行
每次有线程调用countDown()数量-1,假设计数器变为0,countDownLatch.await()会被唤醒,然后继续执行
CyclicBarrier
public class CyclicBarrier
extends Object允许一组线程全部等待彼此达到共同屏障点的同步辅助。 循环阻塞在涉及固定大小的线程方的程序中很有用,这些线程必须偶尔等待彼此。 屏障被称为循环,因为它可以在等待的线程被释放之后重新使用。
A CyclicBarrier支持一个可选的Runnable命令,每个屏障点运行一次,在派对中的最后一个线程到达之后,但在任何线程释放之前。 在任何一方继续进行之前,此屏障操作对更新共享状态很有用。
加法计数器
package com.darlene.add;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
System.out.println("召唤神龙成功");
});
for (int i = 1; i <= 7; i++) {
final int temp = i;
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"收集了"+temp+"个龙珠");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}
Semaphore
信号量
public class Semaphore
extends Object
implements Serializable一个计数信号量。 在概念上,信号量维持一组许可证。 如果有必要,每个acquire()阻止许可证可用,然后取出。 每个release()都添加了一个许可证,潜在地释放一个阻塞获取方。 但是,没有使用实际的许可证对象; Semaphore只保留可用数量的计数,并相应地进行操作。
信号量通常用于限制线程数,而不是访问某些(物理或逻辑)资源。 例如,这是一个使用信号量来控制对一个项目池的访问的类:
class Pool { private static final int MAX_AVAILABLE = 100; private final Semaphore available = new Semaphore(MAX_AVAILABLE, true); public Object getItem() throws InterruptedException { available.acquire(); return getNextAvailableItem(); } public void putItem(Object x) { if (markAsUnused(x)) available.release(); } // Not a particularly efficient data structure; just for demo protected Object[] items = ... whatever kinds of items being managed protected boolean[] used = new boolean[MAX_AVAILABLE]; protected synchronized Object getNextAvailableItem() { for (int i = 0; i < MAX_AVAILABLE; ++i) { if (!used[i]) { used[i] = true; return items[i]; } } return null; // not reached } protected synchronized boolean markAsUnused(Object item) { for (int i = 0; i < MAX_AVAILABLE; ++i) { if (item == items[i]) { if (used[i]) { used[i] = false; return true; } else return false; } } return false; } } 在获得项目之前,每个线程必须从信号量获取许可证,以确保某个项目可用。 当线程完成该项目后,它将返回到池中,并将许可证返回到信号量,允许另一个线程获取该项目。 请注意,当调用acquire()时,不会保持同步锁定,因为这将阻止某个项目返回到池中。 信号量封装了限制对池的访问所需的同步,与保持池本身一致性所需的任何同步分开。
信号量被初始化为一个,并且被使用,使得它只有至多一个允许可用,可以用作互斥锁。 这通常被称为二进制信号量 ,因为它只有两个状态:一个许可证可用,或零个允许可用。 当以这种方式使用时,二进制信号量具有属性(与许多Lock实现不同),“锁”可以由除所有者之外的线程释放(因为信号量没有所有权概念)。 这在某些专门的上下文中是有用的,例如死锁恢复。
此类的构造函数可选择接受公平参数。 当设置为false时,此类不会保证线程获取许可的顺序。 特别是允许进行驳斥 ,也就是说,调用acquire()的线程可以在一直等待的线程之前分配一个许可证 - 逻辑上新的线程将自身置于等待线程队列的头部。 当公平设置为真时,信号量保证调用任何acquire方法的线程被选择以按照它们调用这些方法的顺序获得许可(先进先出; FIFO)。 请注意,FIFO排序必须适用于这些方法中的特定内部执行点。 因此,一个线程可以在另一个线程之前调用acquire ,但是到另一个线程之后的顺序点,并且类似地从方法返回。 另请注意,未定义的tryAcquire方法不符合公平性设置,但将采取任何可用的许可证。
通常,用于控制资源访问的信号量应该被公平地初始化,以确保线程没有被访问资源。 当使用信号量进行其他类型的同步控制时,非正常排序的吞吐量优势往往超过公平性。
本课程同时提供了acquire和release多个许可证的便利方法。 这些方法通常比循环更有效和有效。 但是,他们没有建立任何偏好顺序。 例如,如果线程A调用s.acquire(3 ),并且线程B调用s.acquire(2) ,并且两个许可证变得可用,则不保证线程B将获得它们,除非它的获取是第一,并且信号量s处于公平模式。
内存一致性效果:在另一个线程中成功执行“获取”方法(例如acquire()之前,调用“释放”方法之前的线程中的操作,如release() happen-before 。
抢车位
6车 3个停车车位
package com.darlene.add;
import java.sql.Time;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreDemo {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <= 6; i++) {
new Thread(()->{
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"抢到车位");
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"离开车位");
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
semaphore.release();
}
}).start();
}
}
}
原理:
semaphore.acquire():获得,假设已经满了,等待被释放为止
semaphore.release():释放,会将当前的信号量释放+1,然后唤醒等待的线程
作用:多个共享资源互斥使用,并发限流,控制最大的线程数
读写锁
public interface ReadWriteLockA ReadWriteLock维护一对关联的locks ,一个用于只读操作,一个用于写入。 read lock可以由多个阅读器线程同时进行,只要没有作者。 write lock是独家的。
所有的ReadWriteLock实现都必须保证writeLock操作的内存同步效果(在Lock接口中指定)也适用于相关的readLock 。 也就是说,一个线程成功获取读锁定将会看到在之前发布的写锁定所做的所有更新。
读写锁允许访问共享数据时的并发性高于互斥锁所允许的并发性。 它利用了这样一个事实:一次只有一个线程( 写入线程)可以修改共享数据,在许多情况下,任何数量的线程都可以同时读取数据(因此读取器线程)。 从理论上讲,通过使用读写锁允许的并发性增加将导致性能改进超过使用互斥锁。 实际上,并发性的增加只能在多处理器上完全实现,然后只有在共享数据的访问模式是合适的时才可以。
读写锁是否会提高使用互斥锁的性能取决于数据被读取的频率与被修改的频率相比,读取和写入操作的持续时间以及数据的争用 - 即是,将尝试同时读取或写入数据的线程数。 例如,最初填充数据的集合,然后经常被修改的频繁搜索(例如某种目录)是使用读写锁的理想候选者。 然而,如果更新变得频繁,那么数据的大部分时间将被专门锁定,并且并发性增加很少。 此外,如果读取操作太短,则读写锁定实现(其本身比互斥锁更复杂)的开销可以支配执行成本,特别是因为许多读写锁定实现仍将序列化所有线程通过小部分代码。 最终,只有剖析和测量将确定使用读写锁是否适合您的应用程序。
虽然读写锁的基本操作是直接的,但是执行必须做出许多策略决策,这可能会影响给定应用程序中读写锁定的有效性。 这些政策的例子包括:
在写入器释放写入锁定时,确定在读取器和写入器都在等待时是否授予读取锁定或写入锁定。 作家偏好是常见的,因为写作预计会很短,很少见。 读者喜好不常见,因为如果读者经常和长期的预期,写作可能导致漫长的延迟。 公平的或“按顺序”的实现也是可能的。
确定在读卡器处于活动状态并且写入器正在等待时请求读取锁定的读取器是否被授予读取锁定。 读者的偏好可以无限期地拖延作者,而对作者的偏好可以减少并发的潜力。
确定锁是否可重入:一个具有写锁的线程是否可以重新获取? 持有写锁可以获取读锁吗? 读锁本身是否可重入?
写入锁可以降级到读锁,而不允许插入写者? 读锁可以升级到写锁,优先于其他等待读者或作者吗
Lock readLock?() 返回用于阅读的锁。
Lock writeLock?() 返回用于写入的锁。
package com.darlene.add;
import java.lang.invoke.VarHandle;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCacheLock myCache = new MyCacheLock();
for (int i = 1; i <= 5; i++) {
final int temp = i;
new Thread(()->{
myCache.put(temp+"",temp+"");
},String.valueOf(i)).start();
}
for (int i = 1; i <= 5; i++) {
final int temp = i;
new Thread(()->{
myCache.get(temp+"");
},String.valueOf(i)).start();
}
}
}
class MyCache{
private volatile Map<String,Object> map = new HashMap<>();
public void put(String key,Object value){
System.out.println(Thread.currentThread().getName()+"写入"+key);
map.put(key,value);
System.out.println(Thread.currentThread().getName()+"写入OK");
}
public void get(String key){
System.out.println(Thread.currentThread().getName()+"读取"+key);
Object o = map.get(key);
System.out.println(Thread.currentThread().getName()+"读取OK");
}
}
class MyCacheLock{
private volatile Map<String,Object> map = new HashMap<>();
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
public void put(String key,Object value){
readWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName()+"写入"+key);
map.put(key,value);
System.out.println(Thread.currentThread().getName()+"写入OK");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.writeLock().unlock();
}
}
public void get(String key){
readWriteLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName()+"读取"+key);
Object o = map.get(key);
System.out.println(Thread.currentThread().getName()+"读取OK");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.readLock().unlock();
}
}
}
阻塞队列
写入:如果队列满了,必须阻塞等待
取:如果队列是空的,必须阻塞等待生产
阻塞队列:
Interface BlockingDeque<E>
参数类型
E - 在这个deque中持有的元素的类型
All Superinterfaces:
BlockingQueue<E> , Collection<E> , Deque<E> , Iterable<E> , Queue<E>
所有已知实现类:
LinkedBlockingDeque
什么情况下我们会使用阻塞队列
多线程,A-B
多线程并发处理,线程池
学会使用队列
添加,移除
四组API
方式 | 抛出异常 | 不会抛出异常,有返回值 | 阻塞等待 | 超时等待 |
---|
添加 | add() | offer() | put() | offer() | 移除 | remove() | poll() | take() | poll() | 判断队列首部 | element | peek | - | - |
package com.darlene.bq;
import java.util.concurrent.ArrayBlockingQueue;
public class Test {
public static void main(String[] args) {
test1();
}
public static void test1(){
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
}
}
public static void test2(){
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
}
public static void test3() throws InterruptedException {
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
}
public static void test4() throws InterruptedException {
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.offer("a");
blockingQueue.offer("b");
blockingQueue.offer("c");
blockingQueue.offer("d",2, TimeUnit.SECONDS);
blockingQueue.poll();
blockingQueue.poll();
blockingQueue.poll();
blockingQueue.poll(2,TimeUnit.SECONDS);
}
SynchronousQueue同步队列
没有容量
进去一个元素,必须等待取出来之后,才能再往里面放一个元素
public class SynchronousQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+" put 1");
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName()+" put 2");
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName()+" put 3");
blockingQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T1").start();
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"=>"+blockingQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"=>"+blockingQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"=>"+blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T2").start();
}
}
线程池
线程池:三大方法,7大参数,4种拒绝策略
池化技术
程序的运行,本质:占用系统的资源,优化资源的使用==》池化技术
线程池,连接池,内存池,对象池、、、、
- 最小的值
池化技术:事先准备好一些资源,有人要用,就来我这里拿,用完之后还给我
线程池的好处:
- 降低资源的消耗
- 提高响应的速度
- 方便管理
线程复用,可以控制最大并发数,可以管理线程
线程池:三大方法
线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险
Executors的弊端:
- 请求队列长度大约为Integer.MAX_VALUE(约为21亿),可能会堆积大量的请求,导致OOM
- 允许创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,导致OOM
package com.darlene.pool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Demo01 {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newCachedThreadPool();
try {
for (int i = 0; i < 10; i++) {
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+" ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}
7大参数
源码分析
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
手动创建一个线程池
4种拒绝策略
package com.darlene.pool;
import java.sql.Time;
import java.util.concurrent.*;
public class Demo01 {
public static void main(String[] args) {
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy());
try {
for (int i = 1; i <= 9; i++) {
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+" ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}
小结和拓展:
池的最大大小应该如何设置
了解IO密集型和CPU密集型
最大线程应该如何定义:
-
CPU密集型 几核就是几,可以保证CPU的效率最高
System.out.println(Runtime.getRuntime().availableProcessors());
-
IO密集型,15个大型任务,IO十分占用资源,判断程序中十分耗IO的线程,一般设置线程的两倍
四大函数式接口
新时代程序员:lambda表达式,函数式变成,链式编程,Stream流式计算
函数式接口:只有一个方法的接口
超级多,简化变成模型,在新版本的框架底层大量应用
foreach(消费者类型的函数式接口)的参数
Function函数式接口
public interface Function<T, R> {
R apply(T t);
package com.darlene.function;
import java.util.function.Function;
public class Demo01 {
public static void main(String[] args) {
Function function = new Function<String,String>(){
@Override
public String apply(String s) {
return s;
}
};
System.out.println(function.apply("asd"));
}
}
Predicate 断定型接口
@FunctionalInterface
public interface Predicate<T> {
boolean test(T t);
package com.darlene.function;
import java.util.function.Predicate;
public class Demo02 {
public static void main(String[] args) {
Predicate<String> predicate = (str)->{return str.isEmpty();};
System.out.println(predicate.test(""));
}
}
消费型接口 Consumer
只有输入没有输出
@FunctionalInterface
public interface Consumer<T> {
void accept(T t);
package com.darlene.function;
import java.util.function.Consumer;
public class Demo03 {
public static void main(String[] args) {
Consumer<String> consumer = (str)->{
System.out.println("asd");
};
consumer.accept("asd");
}
}
供给型接口 Supplier
@FunctionalInterface
public interface Supplier<T> {
T get();
}
package com.darlene.function;
import java.util.function.Supplier;
public class Demo04 {
public static void main(String[] args) {
Supplier supplier = ()->{return 1024;};
System.out.println(supplier.get());
}
}
Stream流式计算
什么是Stream流式计算
大数据:存储+计算
存储:集合,MySQL
计算:流计算
public interface Stream<T>
extends BaseStream<T,Stream<T>>支持顺序和并行聚合操作的一系列元素。 以下示例说明了使用Stream和IntStream的汇总操作:
int sum = widgets.stream() .filter(w -> w.getColor() == RED) .mapToInt(w -> w.getWeight()) .sum(); 在这个例子中, widgets是一个Collection<Widget> 。 我们通过Collection.stream()创建一个Widget对象的流,过滤它以生成仅包含红色小部件的流,然后将其转换为表示每个红色小部件的权重的int值。 然后将该流相加以产生总重量。
除了Stream ,这是对象引用的流,存在原语特为IntStream , LongStream ,和DoubleStream ,所有这些都称为“流”和符合此处描述的特征和限制。
为了执行计算,流operations被组合成流管道 。 流管线由源(其可以是阵列,集合,生成函数,I / O通道等),零个或多个中间操作 (其将流转换成另一个流,例如filter(Predicate) )组成,以及终端操作 (产生结果或副作用,如count()或forEach(Consumer) )。 流懒惰 源数据上的计算仅在终端操作启动时执行,源元素仅在需要时才被使用。
允许流实现在优化结果的计算方面具有显着的纬度。 例如,如果流实现可以证明它不会影响计算结果,则流实现可以自由地从流流程中删除操作(或整个阶段),并且因此删除行为参数。 这意味着行为参数的副作用可能并不总是执行,除非另有规定(例如通过终端操作forEach和forEachOrdered ),否则不应被依赖。 (有关这种优化的具体示例,请参阅count()操作中记录的API说明。有关详细信息,请参阅流包文档的side-effects部分。)
集合和流动,同时具有一些表面上的相似之处,具有不同的目标。 集合主要关注其元素的有效管理和访问。 相比之下,流不提供直接访问或操纵其元素的手段,而是关心声明性地描述其源和将在该源上进行聚合的计算操作。 但是,如果提供的流操作不提供所需的功能,则可以使用BaseStream.iterator()和BaseStream.spliterator()操作来执行受控遍历。
流管道,如上面的“小部件”示例,可以被视为流源上的查询 。 除非源是明确设计用于并发修改(如ConcurrentHashMap ),否则在查询流源时可能会导致不可预测或错误的行为。
大多数流操作接受描述用户指定行为的参数,例如上述示例中传递给mapToInt的lambda表达式w -> w.getWeight() 。 为了保持正确的行为,这些行为参数 :
必须是non-interfering (他们不修改流源); 和
在大多数情况下必须是stateless (它们的结果不应该依赖于在流管道的执行期间可能改变的任何状态)。
这样的参数总是functional interface的例子 ,如Function ,并且通常是lambda表达式或方法引用。 除非另有说明,否则这些参数必须为非空值 。
流只能运行(调用中间或终端流操作)一次。 这排除了例如“分叉”流,其中相同的源提供两条或多条流水线,或同一流的多遍。 流实现可能会导致IllegalStateException,如果它检测到流被重用。 然而,由于一些流操作可能返回其接收器而不是新的流对象,所以在所有情况下可能无法检测到重用。
Streams具有BaseStream.close()方法并实现AutoCloseable 。 关闭后在流上操作会抛出IllegalStateException 。 大多数流实例在使用后实际上不需要关闭,因为它们由集合,数组或生成函数支持,这些功能不需要特殊的资源管理。 通常,只有源为IO通道的流,例如由Files.lines(Path)返回的流将需要关闭。 如果一个流确实需要关闭,那么它必须作为资源在资源或者类似的控件结构中打开,以确保在操作完成后立即被关闭。
流管线可以顺序执行,也可以在parallel中执行。 此执行模式是流的属性。 流被创建为具有顺序或并行执行的初始选择。 (例如, Collection.stream()创建一个顺序流,并且Collection.parallelStream()创建一个并行的)。执行模式的选择可以由BaseStream.sequential()或BaseStream.parallel()方法修改,并且可以使用BaseStream.isParallel()方法进行查询。
package com.darlene.stream;
import java.util.Arrays;
import java.util.List;
public class Test {
public static void main(String[] args) {
User u1 = new User(1, "a", 21);
User u2 = new User(2, "b", 22);
User u3 = new User(3, "c", 23);
User u4 = new User(4, "d", 24);
User u5 = new User(5, "e", 25);
List<User> list = Arrays.asList(u1, u2, u3, u4, u5);
list.stream().filter(u->{return u.getId()%2==0;})
.filter(u->{return u.getAge()>21;})
.map(u->{return u.getName().toUpperCase();})
.sorted((uu1,uu2)->{return uu2.compareTo(uu1);})
.limit(1)
.forEach(System.out::println);
}
}
ForkJoin详解
分支合并
什么是ForkJoin
ForkJoin在JDK1.7,并发执行任务 提高效率。大数据量
大数据Map Reduce 把大任务拆分成小任务
ForkJoin特点:工作窃取.在自己任务执行完成后去拿别人的任务执行,双端队列
ForkJoin
package com.darlene.forkjoin;
import java.util.concurrent.RecursiveTask;
public class ForkJoinDemo extends RecursiveTask<Long> {
private Long start;
private Long end;
private Long temp=10000L;
public ForkJoinDemo(Long start, Long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if((end-start)<temp){
Long sum=0L;
for (Long i = start; i <= end; i++) {
sum += i;
}
return sum;
}else{
long mid = (start+end)/2;
ForkJoinDemo task1 = new ForkJoinDemo(start, mid);
task1.fork();
ForkJoinDemo task2 = new ForkJoinDemo(mid+1, end);
task2.fork();
return task1.join()+task2.join();
}
}
}
package com.darlene.forkjoin;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
test3();
}
public static void test1(){
long start = System.currentTimeMillis();
long sum = 0L;
for (long i = 1L; i <= 10_0000_0000; i++) {
sum+=i;
}
long end = System.currentTimeMillis();
System.out.println("sum="+"时间"+(end-start));
}
public static void test2() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Long> task = new ForkJoinDemo(0L, 10_0000_0000L);
ForkJoinTask<Long> submit = forkJoinPool.submit(task);
Long sum = submit.get();
long end = System.currentTimeMillis();
System.out.println("sum="+"时间"+(end-start));
}
public static void test3(){
long start = System.currentTimeMillis();
long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum);
long end = System.currentTimeMillis();
System.out.println("sum="+"时间"+sum);
}
}
异步回调
Future 设计的初衷:对将来的某个事件的结果进行建模
package com.darlene.future;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class Demo01 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("CompletableFuture");
int i=10/0;
return 1024;
});
completableFuture.whenComplete((t,u)->{
System.out.println("t=>"+t);
System.out.println("u=>"+u);
}).exceptionally((e)->{
e.printStackTrace();
return 233;
}).get();
}
}
理解JMM
请你谈谈你对你volatile的理解
Volitile是java虚拟机提供的 轻量级的同步机制
- 保证可见性
- 不保证原子性
- 禁止指令重排
什么是JMM
java内存模型,不存在的东西 概念 约定
关于JMM的一些同步的约定:
- 线程解锁前:必须把共享变量立刻刷回主存
- 线程加锁前:必须读取主存中的最新值到工作内存中
- 加锁和解锁是同一把锁
线程:工作内存,主内存
8种操作:read load,use assign,write store,lock unlock
- lock锁定:作用于主内存的变量,吧一个变量标识为线程独占状态
- unlock解锁:作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定
- read读取:作用于主内存变量,它把一个变量的值从朱北村传输到线程的工作内存中,以便随后的load动作使用
- load载入:作用于工作内存的变量,它把read操作从主存中变量放入工作内存中
- use使用:作用于工作内存中的变量,把它工作内存中的变量传输给执行引擎,每当虚拟机遇到一个需要使用到变量的值,就会使用到这个命令
- assign赋值:作用于工作内存中的变量,它把一个从执行引擎中接受到的值放入工作内存的变量副本中
- store存储:作用于主内存中的变量,它把一个从工作内存中一个变量的值传送到主内存中,以便后续的write使用
- write写入:作用于主内存中的变量,它把store操作从工作内存中得到的变量的值放入主内存的变量中
指令使用规则:
- 不允许read和load,store和write操作之一单独出现
- 不允许线程丢弃它最近的assign操作,即工作变量的数据改变了之后,必须告知主存
- 不允许一个线程将没有assign的数据从工作内存同步回主内存
- 一个新的变量必须在主内存中诞生,不允许工作内存直接使用一个未被初始化的变量,就是对变量实施use,store操作之前,必须经过assign和load操作
- 一个变量同一时间只有一个线程对其进行lock,多次lock后,必须执行相同次数的unlock解锁
- 如果对一个变量进行lock操作,会清空所有工作内存中此变量的值,在执行引擎使用这个变量前,必须重新load和assign操作初始化变量的值
- 如果一个变量没有被lock,就不能对其进行unlock操作,也不能unlock一个被其他线程锁住的变量
- 对一个变量进行unlock操作之前,必须吧此变量同步回主内存
问题:程序不知道主内存中的值已经被修改过了
volatile
1.保证可见性
package com.darlene.volatileTest;
import java.util.concurrent.TimeUnit;
public class JMMDemo {
private volatile static int num = 0;
public static void main(String[] args) throws InterruptedException {
new Thread(()->{
while(num == 0){
}
}).start();
TimeUnit.SECONDS.sleep(1);
num=1;
System.out.println(num);
}
}
2.不保证原子性
原子性:不可分割
线程A在执行任务的时候,不能被打扰的,也不能被分割,同时成功或同时失败
package com.darlene.volatileTest;
import com.sun.nio.sctp.PeerAddressChangeNotification;
public class VDemo02 {
private volatile static int num = 0;
public static void add(){
num++;
}
public static void main(String[] args) {
for (int i = 1; i <= 20; i++) {
new Thread(()->{
for (int j = 0; j < 1000; j++) {
add();
}
}).start();
}
while(Thread.activeCount()>2){
Thread.yield();
}
System.out.println(Thread.currentThread().getName()+" "+num);
}
}
如果加lock和synchronized,怎么样保证原子性
使用原子类解决原子性问题
原子类为什么这么高级
package com.darlene.volatileTest;
import com.sun.nio.sctp.PeerAddressChangeNotification;
import java.util.concurrent.atomic.AtomicInteger;
public class VDemo02 {
private volatile static AtomicInteger num = new AtomicInteger();
public static void add(){
num.getAndIncrement();
}
public static void main(String[] args) {
for (int i = 1; i <= 20; i++) {
new Thread(()->{
for (int j = 0; j < 1000; j++) {
add();
}
}).start();
}
while(Thread.activeCount()>2){
Thread.yield();
}
System.out.println(Thread.currentThread().getName()+" "+num);
}
}
这些类底层都直接和操作系统挂钩,在内存中修改值 Unsafe类是一个很特殊的存在
指令重排
什么是指令重排:你写的程序,计算机并不是按照你写的那样去执行的,
源代码----->编译器优化的重排------------>指令并行也可能会重排----------------->内存系统也会重排------------->执行
int x = 1;
int y = 2;
x = x + 5;
y = x * x;
我们期望的:1234 2134 1324
不可能是4213
处理器在指令重排时,考虑:数据之间的依赖性
可能造成影响的结果:abxy默认都是0
正常的结果:x=0 y=0,可能由于指令重排
x=2 y=1
volatile可以避免指令重排
内存屏障,CPU指令。作用:
- 保证特定的操作执行顺序
- 可以保证某些变量的内存可见性(利用这些特性,就可以保证volatile实现了可见性)
内存屏障:禁止上面指令和下面指令顺序交换
volatile是可以保证可见性,不能保证原子性,由于内存屏障,可以保证避免指令重排的现象产生
单例模式
饿汉式,DCL懒汉式
饿汉式
package com.darlene.single;
public class Hungry {
private byte[] data1 = new byte[1024*1024];
private byte[] data2 = new byte[1024*1024];
private byte[] data3 = new byte[1024*1024];
private byte[] data4 = new byte[1024*1024];
private Hungry(){
}
private final static Hungry HUNGRY = new Hungry();
public static Hungry getInstance(){
return HUNGRY;
}
}
DCL懒汉式
package com.darlene.single;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
public class LazyMan {
private static boolean tl = false;
private LazyMan(){
synchronized (LazyMan.class){
if(tl == false){
tl = true;
}else{
throw new RuntimeException("不要试图使用反射破坏异常");
}
if(lazyMan!=null){
throw new RuntimeException("不要试图使用反射破坏异常");
}
}
System.out.println(Thread.currentThread().getName()+"ok");
}
private volatile static LazyMan lazyMan;
public static LazyMan getInstance(){
if(lazyMan == null){
synchronized (LazyMan.class){
if(lazyMan == null){
lazyMan = new LazyMan();
}
}
}
return lazyMan;
}
public static void main(String[] args) throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException, NoSuchFieldException {
Field tl = LazyMan.class.getDeclaredField("tl");
tl.setAccessible(true);
Constructor<LazyMan> declaredConstructor = LazyMan.class.getDeclaredConstructor(null);
declaredConstructor.setAccessible(true);
LazyMan instance1 = declaredConstructor.newInstance();
tl.set(instance1,false);
LazyMan instance2 = declaredConstructor.newInstance();
System.out.println(instance1);
System.out.println(instance2);
}
}
静态内部类
package com.darlene.single;
public class Holder {
private Holder(){
}
private static Holder getInstance(){
return InnerClass.HOLDER;
}
public static class InnerClass{
private static final Holder HOLDER = new Holder();
}
}
单例不安全,反射
枚举
package com.darlene.single;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
public enum EnumSingle {
INSTANCE;
public EnumSingle getInstance(){
return INSTANCE;
}
}
class Test{
public static void main(String[] args) throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
EnumSingle instance1 = EnumSingle.INSTANCE;
Constructor<EnumSingle> declaredConstructor = EnumSingle.class.getDeclaredConstructor(String.class,int.class);
declaredConstructor.setAccessible(true);
EnumSingle instance2 = declaredConstructor.newInstance();
System.out.println(instance1);
System.out.println(instance2);
}
}
枚举的无参构造和有参构造
枚举类型的最终反编译源码
深入理解CAS
什么是CAS
大厂必须深入研究底层。内功:操作系统,计算机网络原理
java无法操作内存,java可以调用c++ native c++可以操作内存 java的后门,可以通过这个类操作内存
package com.darlene.cas;
import java.lang.invoke.VarHandle;
import java.util.concurrent.atomic.AtomicInteger;
public class CASDemo {
AtomicInteger atomicInteger = new AtomicInteger();
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(2021);
atomicInteger.compareAndSet(2021,2022);
System.out.println(atomicInteger.get());
System.out.println(atomicInteger.compareAndSet(2021, 2022));
System.out.println(atomicInteger.get());
}
}
unsafe
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;
private static final Unsafe U = Unsafe.getUnsafe();
private static final long VALUE
= U.objectFieldOffset(AtomicInteger.class, "value");
private volatile int value;
public final int getAndIncrement() {
return U.getAndAddInt(this, VALUE, 1);
}
自旋锁
@HotSpotIntrinsicCandidate
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!weakCompareAndSetInt(o, offset, v, v + delta));
return v;
}
CAS:比较当前工作内存中的值和主内存中的值,如果这个值是期望的,那么则执行操作,如果不是就一直循环
缺点:
- 循环会耗时
- 一次性只能保证一个共享变量的原子性
- 会存在ABA问题
CAS:ABA问题(狸猫换太子)
package com.darlene.cas;
import java.lang.invoke.VarHandle;
import java.util.concurrent.atomic.AtomicInteger;
public class CASDemo {
AtomicInteger atomicInteger = new AtomicInteger();
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(2021);
atomicInteger.compareAndSet(2021,2022);
System.out.println(atomicInteger.get());
System.out.println(atomicInteger.compareAndSet(2021, 2022));
System.out.println(atomicInteger.get());
System.out.println(atomicInteger.compareAndSet(2020, 6666));
System.out.println(atomicInteger.get());
}
}
原子引用
解决ABA问题引入原子引用,对应乐观锁
带版本号的原子操作
package com.darlene.cas;
import java.lang.invoke.VarHandle;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicStampedReference;
public class CASDemo {
AtomicInteger atomicInteger = new AtomicInteger();
public static void main(String[] args) {
AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(1,1);
new Thread(()->{
int stamp = atomicStampedReference.getStamp();
System.out.println("a1=>"+stamp);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
atomicStampedReference.compareAndSet(1,2,atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1);
System.out.println("a2=>"+atomicStampedReference.getStamp());
atomicStampedReference.compareAndSet(2,1,atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1);
System.out.println("a3=>"+atomicStampedReference.getStamp());
},"a").start();
new Thread(()->{
int stamp = atomicStampedReference.getStamp();
System.out.println("b1=>"+stamp);
atomicStampedReference.compareAndSet(1,6,stamp,stamp+1);
System.out.println("b1=>"+atomicStampedReference.getStamp());
},"b").start();
}
}
可各种锁的理解
公平锁,非公平锁
公平锁:不能插队,先来后到
非公平锁:可以插队,默认都是非公平的锁
可重入锁
可重入锁(递归锁)
拿到了外面的锁,就可以拿到里面的锁,自动获得
lock锁
package com.darlene.lock;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Demo02 {
public static void main(String[] args) {
Phone2 phone = new Phone2();
new Thread(()->{
phone.sms();
},"A").start();
new Thread(()->{
phone.sms();
},"B").start();
}
}
class Phone2{
Lock lock = new ReentrantLock();
public synchronized void sms(){
lock.lock();
try {
System.out.println(Thread.currentThread().getName()+"sms");
call();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public synchronized void call(){
lock.lock();
try {
System.out.println(Thread.currentThread().getName()+"call");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
自旋锁
不断尝试,直到成功为止
自定义一个锁
package com.darlene.lock;
import java.util.concurrent.atomic.AtomicReference;
public class SpinlockDemo {
AtomicReference<Thread> atomicReference = new AtomicReference<>();
public void myLock(){
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName()+"==> mylock");
while(!atomicReference.compareAndSet(null,thread)){
}
}
public void myUnLock(){
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName()+"==> myUnlock");
atomicReference.compareAndSet(thread,null);
}
}
测试
package com.darlene.lock;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
public class TestSpinLock {
public static void main(String[] args) throws InterruptedException {
SpinlockDemo lock = new SpinlockDemo();
new Thread(()->{
lock.myLock();
try {
TimeUnit.SECONDS.sleep(3);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.myUnLock();
}
},"T1").start();
TimeUnit.SECONDS.sleep(1);
new Thread(()->{
lock.myLock();
try {
TimeUnit.SECONDS.sleep(3);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.myUnLock();
}
},"T2").start();
}
}
死锁
死锁是什么
死锁测试,怎么排除死锁:
package com.darlene.lock;
import java.util.concurrent.TimeUnit;
public class DeadLockDemo {
public static void main(String[] args) {
String lockA = "lockA";
String lockB = "lockB";
new Thread(new MyThread(lockA,lockB),"T1").start();
new Thread(new MyThread(lockB,lockA),"T2").start();
}
}
class MyThread implements Runnable{
private String lockA;
private String lockB;
public MyThread(String lockA, String lockB) {
this.lockA = lockA;
this.lockB = lockB;
}
@Override
public void run() {
synchronized (lockA){
System.out.println(Thread.currentThread().getName()+"lock:"+lockA+"=>get"+lockB);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lockB){
System.out.println(Thread.currentThread().getName()+"lock:"+lockB+"=>get"+lockA);
}
}
}
}
解决问题
jdk bin
-
使用jps -l定位进程号 Microsoft Windows [版本 10.0.19042.1110]
(c) Microsoft Corporation。保留所有权利。
E:\IdeaProjects\JUC>jps -l
22000 jdk.jcmd/sun.tools.jps.Jps
2304 org.jetbrains.jps.cmdline.Launcher
18004 com.darlene.lock.DeadLockDemo
13480
19096 org.jetbrains.idea.maven.server.RemoteMavenServer
E:\IdeaProjects\JUC>
-
使用jstack 进程号 Java stack information for the threads listed above:
===================================================
"T1":
at com.darlene.lock.MyThread.run(DeadLockDemo.java:32)
- waiting to lock <0x0000000089ac4fe0> (a java.lang.String
)
- locked <0x0000000089ac4fb0> (a java.lang.String)
at java.lang.Thread.run(java.base@15.0.1/Thread.java:832)
"T2":
at com.darlene.lock.MyThread.run(DeadLockDemo.java:32)
- waiting to lock <0x0000000089ac4fb0> (a java.lang.String
)
- locked <0x0000000089ac4fe0> (a java.lang.String)
at java.lang.Thread.run(java.base@15.0.1/Thread.java:832)
Found 1 deadlock.
面试或者工作中 排查问题:
-
日志 lock.myLock(); try { TimeUnit.SECONDS.sleep(3); } catch (Exception e) { e.printStackTrace(); } finally { lock.myUnLock(); } },“T2”).start(); } }
#### 死锁
> 死锁是什么
死锁测试,怎么排除死锁:
```java
package com.darlene.lock;
import java.util.concurrent.TimeUnit;
public class DeadLockDemo {
public static void main(String[] args) {
String lockA = "lockA";
String lockB = "lockB";
new Thread(new MyThread(lockA,lockB),"T1").start();
new Thread(new MyThread(lockB,lockA),"T2").start();
}
}
class MyThread implements Runnable{
private String lockA;
private String lockB;
public MyThread(String lockA, String lockB) {
this.lockA = lockA;
this.lockB = lockB;
}
@Override
public void run() {
synchronized (lockA){
System.out.println(Thread.currentThread().getName()+"lock:"+lockA+"=>get"+lockB);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lockB){
System.out.println(Thread.currentThread().getName()+"lock:"+lockB+"=>get"+lockA);
}
}
}
}
解决问题
jdk bin
-
使用jps -l定位进程号 Microsoft Windows [版本 10.0.19042.1110]
(c) Microsoft Corporation。保留所有权利。
E:\IdeaProjects\JUC>jps -l
22000 jdk.jcmd/sun.tools.jps.Jps
2304 org.jetbrains.jps.cmdline.Launcher
18004 com.darlene.lock.DeadLockDemo
13480
19096 org.jetbrains.idea.maven.server.RemoteMavenServer
E:\IdeaProjects\JUC>
-
使用jstack 进程号 Java stack information for the threads listed above:
===================================================
"T1":
at com.darlene.lock.MyThread.run(DeadLockDemo.java:32)
- waiting to lock <0x0000000089ac4fe0> (a java.lang.String
)
- locked <0x0000000089ac4fb0> (a java.lang.String)
at java.lang.Thread.run(java.base@15.0.1/Thread.java:832)
"T2":
at com.darlene.lock.MyThread.run(DeadLockDemo.java:32)
- waiting to lock <0x0000000089ac4fb0> (a java.lang.String
)
- locked <0x0000000089ac4fe0> (a java.lang.String)
at java.lang.Thread.run(java.base@15.0.1/Thread.java:832)
Found 1 deadlock.
面试或者工作中 排查问题:
- 日志
- 堆栈信息
|