多线程学习总结:
前置知识
定义
进程:操作系统分配资源的基本单位,内存、io的最小单位 线程:操作系统调度资源的基本单位,cpu的最小单位 纤程:用户态的线程,线程中的线程,切换不需要经过操作系统
线程的数量设置
公式: Nthread = Ncpu * Ucpu * (1+W/C); Ncpu : cpu核数 Ucpu:cpu的利用率 W/C:等待时间/计算时间,得压测得出,压测工具:jprofile、arthes 所以说单核cpu也是可以设置多线程的,为的是解决io等待,提高cpu的运行效率
CPU内部结构
| 速度 | 位置 |
---|
registers | <1ns | 核内部 | L1 cache | 约1ns | 核内部 | L2 cache | 约3ns | 核内部 | L3 cache | 约15ns | cpu内部 | main memory | 约80ns | 机器内部 |

正篇
java内部线程状态
state | 状态 | 方式 |
---|
new | 创建 | new thread() | runnable | 运行 | start()方法 | waiting | 等待 | o.wait()、t.join()、LockSupport.park()、Lock.lock() | time waiting | 延时等待 | sleep(m) | blocked | 阻塞 | synchronized关键字 | terminated | 结束 | |
同步容器
ConcurrentHashMap:继承ConcurrentMap,数据结构采用hash表,get操作不加锁,put操作加锁,锁的是所在链表的头结点,采用的是加synchronized关键字。属性有加volatile,扩容时相关的属性用cas修改,hash表则是将旧hash表的结点复制(new)并加到新的hash表中,采用的也是锁住所在链表的头结点,这样旧数据和put进来的数据进行hash值运算的时候可以保证在相同的链表时,能锁住相同的头节点,保证同步操作 CopyOnWriteList:继承List类,数据结构采用数组,get操作不加锁,add操作用ReentrantLock加锁,add操作只会扩容1 linkedBlockingQueue:继承AbstractQueue类,数据结构采用链表,offer,poll等操作用ReentrantLock加锁,offer操作加的是putLock,poll操作加的是takeLock ArrayBlockingQueue:继承AbstractQueue类,数据结构采用数组,put,poll等操作用ReentrantLock加锁,不同于linkedBlockingQueue,ArrayBlockingQueue只有一个lock DelayQueue:继承AbstractQueue类,内置PriorityQueue,利用PriorityQueue和DelayQueue,实现时间排序,通过ReentrantLock实现同步操作
锁
synchronized:java自带关键字,会有锁升级,锁的升级信息记录在对象的markword里面,用两个位表示,无锁->偏向锁->自选锁->重量级锁,当处于重量级锁时,线程的调度将用操作系统决定 volatile:保证可见性和有序性,具体实现原理是将java指令的read,load,use捆绑在一起以及store,write,assign捆绑在一起 AQS:AbstractQueuedSynchronizer,锁最核心的代码,里面放一等待队列,通过acquire加锁,release解锁,锁的竞争通过一个state判断,所以继承AQS的类需要实现tryAcquire,tryRelease ReentrantLock:最常用的锁,可以用newCondition产生不同的等待队列,进而用await和signalAll操作不同队列 ReentrantReadWriteLock:读写锁,读锁是共享锁,写锁是排他锁,获得排他锁需要等所有共享锁释放成功 CountDownLatch:门闩,先生成一个count数的门闩,使用await时候会阻塞线程,当每次调用countDown会count减1,当count减为0时,await的线程将继续运行下去。可用于子线程全部执行完在执行父线程的场景。
public static void main(String[] args) throws InterruptedException {
int threadMum = 2;
CountDownLatch countDownLatch = new CountDownLatch(threadMum);
for (int i = 0; i < threadMum; i++) {
Thread t = new Thread(() -> {
Util.sleep(1);
System.out.println("new thread endding");
countDownLatch.countDown();
}, "thread" + i);
t.start();
}
countDownLatch.await();
System.out.println("main thread endding");
}
public class Util {
public static void sleep(int second){
try {
TimeUnit.SECONDS.sleep(second);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
CyclicBarrier:调用await使线程进入阻塞,当await的线程达到声明的数量时,唤醒前面所有阻塞的线程,然后重置CyclicBarrier状态。可用于有多个数据源的数据才能进行下一步操作的场景,比如英雄联盟连接进入游戏阶段
public static void main(String[] args) {
int threadMum = 3;
CyclicBarrier cyclicBarrier = new CyclicBarrier(threadMum);
for (int i = 0; i < threadMum; i++) {
final int index = i;
Thread t = new Thread(() -> {
try {
Util.sleep(nums[index]);
cyclicBarrier.await();
System.out.println("new thread endding");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}, "thread" + i);
t.start();
}
}
Phaser:先调用register注册phaser,然后用arriveAndAwaitAdvance控制流程的前进。流程控制,可用场景很多。
public static void main(String[] args) {
CookPhaser phaser = new CookPhaser();
phaser.register();
phaser.arriveAndAwaitAdvance();
phaser.arriveAndAwaitAdvance();
phaser.arriveAndAwaitAdvance();
phaser.arriveAndAwaitAdvance();
}
static class CookPhaser extends Phaser {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
switch (phase) {
case 0:
System.out.println("买菜");
return false;
case 1:
System.out.println("洗菜");
return false;
case 2:
System.out.println("切菜");
return false;
default:
System.out.println("做饭");
return false;
}
}
}
Semaphore:限流,只允许x个线程执行,通过acquire获取锁,release释放锁,其他会陷入阻塞状态。适用于需要做限流的场景。
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(2);
for (int i = 0; i < 10; i++) {
final int index = i;
Thread thread = new Thread(() -> {
try {
semaphore.acquire();
Util.sleep(1);
System.out.println(index);
} catch (Exception e) {
}finally {
semaphore.release();
}
}, "thread " + index);
thread.start();
}
}
Exchanger:只用于两个线程间的通讯 LockSupport:park和unpark方法等待线程和唤醒线程,park操作会释放锁
线程池
七个参数: corePoolSize:核心线程数量 maximumPoolSize:最大线程数量 keepAliveTime:非核心线程最大存活时间 unit:aliveTime的时间单元 workQueue:线程等待队列 threadFactory:线程工厂,jdk默认提供了一个线程工厂,主要用于线程的命名 RejectedExecutionHandler:拒绝策略 线程池执行流程
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
此处是jdk1.8的源码,可以看出线程池的执行顺序是: 1、如果核心线程未满,新建一个线程。若已满,执行2 2、若线程等待队列未满,则加入队列。源码可以看出,在加入队列之后,jdk还对Runnable进行进一步的检测。如果队列已满,执行3 3、新加一个非核心线程,若失败,则执行拒绝策略 拒绝策略jdk提供了4种: 1、CallerRunsPolicy,交个执行execute方法的线程去执行,也就是谁用了线程池谁去执行该线程 2、AbortPolicy,拒绝该任务并抛出RejectedExecutionException 3、DiscardPolicy,默默抛弃该任务 4、DiscardOldestPolicy,默默抛弃掉等待队列里最老的任务 不同场景应用不同的拒绝策略,当然拒绝策略可以自己定义。可以看下ThreadPoolExecutor的源码,这个读起来还是比较容易懂的
|