多线程与高并发
多线程
一. 线程历史
相关文章:https://zhuanlan.zhihu.com/p/83525853
单进程人工切换(纸带机):一个cpu,一段程序放进去执行一段
多进程批处理(多个任务批量执行):一个cpu,多段程序写一起执行
多线程并行处理(程序写在不同内存位置上来回切换)
二. 多线程程序意义
多线程程序:压榨cpu,提高资源调度利用率
2.1 单核cpu设定多线程是否有意义
有意义,如果有2个线程,一个线程在磁盘io/计算,此时可让另一个线程继续占用cpu执行
2.2 多个线程压榨cpu,减少时间案例
public class T02_increaseOperationRate {
private static double[] nums= new double[1_0000_0000];
private static Random r = new Random();
private static DecimalFormat df = new DecimalFormat("0.00");
static{
for (int i = 0; i < nums.length; i++) {
nums[i] = r.nextDouble();
}
}
private static void m1(){
long start = System.currentTimeMillis();
double result = 0.0;
for (int i = 0; i < nums.length; i++) {
result += nums[i];
}
long end = System.currentTimeMillis();
System.out.println("m1:" + (end - start) + " result:" + df.format(result));
}
static double result1 = 0.0, result2 = 0.0, result = 0.0;
private static void m2() throws InterruptedException {
Thread thread1 = new Thread(()->{
for (int i = 0; i < nums.length / 2; i++) {
result1 += nums[i];
}
});
Thread thread2 = new Thread(()->{
for (int i = nums.length/2; i < nums.length ; i++) {
result2 += nums[i];
}
});
long start = System.currentTimeMillis();
thread1.start();
thread2.start();
thread1.join();
thread2.join();
long end = System.currentTimeMillis();
result = result1 +result2;
System.out.println("m2:" + (end - start) + " result:" + df.format(result));
}
public static void main(String[] args) throws InterruptedException {
m1();
m2();
}
}
------控制台结果显示
m1:94 result:50006672.82
m2:57 result:50006672.82
2.3 线程越多,利用率就越高?
不是线程数越大越好,os切换线程也是耗费资源和时间的
文章:https://zhuanlan.zhihu.com/p/305035862
2.4 工作线程数/线程池中的数量,设置多少合适
Ncpu:cpu核数
Ucpu:cpu利用率
W:wait(等待时间)
C:compute(计算时间)
W/C的计算可用profiler,jprofiler,ali的arthas等工具,也可手写日志
2.5 多线程和单线程执行程序时间问题
# 1.同样的程序,相对比单线程,为什么多线程分段执行这个程序的时间减少了
- 不一定,单线程也许也是时间最少的(多线程之间,cpu切换线程占用时间片也是耗费时间的)
- m1:0 result:5057.89 (单线程)
- m2:1 result:5057.89 (2线程)
- 若同一个程序,多线程使用的时间较少,是因为计算/磁盘io占的较长,分线程压榨cpu资源达到时间减少
2.6 从底层来看多线程
CPU缓存
? CPU每次去内存里面读写数据太慢了,为了更快的读写数据,所以CPU在自己家里加了一层缓存,每次都是先从自己的缓存读写数据,这样CPU的利用率又提高了,所以下图的内存不是电脑的内存,cpu自己的内存测试
切换线程相关知识
-
程序计数器 因为CPU切换任务后,下次回来需要知道当前任务已经执行到了哪一步,所以对任务节点进行一个标记,回来的时候可以从上次标记的位置继续工作,而这个标记叫做程序计数器。 -
堆栈指针 每个进程分配的内存是独立的,所以切换进程任务回来后我们需要知道当前进程分配的内存地址是那一块,而这个地址就是堆栈指针。
三.创建线程的方式
- 继承Thread创建线程
- 实现Runnable接口创建线程
- lambda创建线程
- ThreadPool
- 实现Callable(利用单线程,FutureTask)
- 实现Callable(利用线程池,Future)
public static class createThread extends Thread{
@Override
public void run() {
System.out.println("通过继承Thread创建线程");
}
}
public static class createThreadByRunnable implements Runnable{
@Override
public void run() {
System.out.println("通过实现Runnable接口创建线程");
}
}
public static class createThreadByCallable implements Callable<String>{
@Override
public String call() throws Exception {
return "通过实现Callable创建线程";
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
new createThread().start();
new Thread(new createThreadByRunnable()).start();
new Thread(() -> {
System.out.println("通过lambda创建线程");
}).start();
ExecutorService service = Executors.newCachedThreadPool();
service.execute(() -> {
System.out.println("通过ThreadPool创建线程");
});
Future<String> submit = service.submit(new createThreadByCallable());
System.out.println(submit.get());
new Thread(new FutureTask<String>(new createThreadByCallable())).start();
service.shutdown();
}
四. 线程状态
4.1 状态种类
- NEW : 线程刚刚创建,还没有启动
- RUNNABLE : 可运行状态,由线程调度器可以安排执行
- WAITING: 等待被唤醒(盲等待,juc是cas,抢不到锁就为盲等待)
- TIMED WAITING: 隔一段时间后自动唤醒
- BLOCKED: 被阻塞,正在等待锁(经过操作系统的调度才会进入此状态,用synchronized就会进入Blocked状态)
- TERMINATED: 线程结束
4.2 代码
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(() -> {
System.out.println(Thread.currentThread().getState());
});
System.out.println("1." + t.getState());
t.start();
t.join();
System.out.println("4." + t.getState());
Thread t2 = new Thread(() -> {
LockSupport.park();
TimeSleeper.sleep(5);
});
t2.start();
TimeSleeper.sleep(1);
System.out.println("4." + t2.getState());
LockSupport.unpark(t2);
TimeSleeper.sleep(1);
System.out.println("5." + t2.getState());
final Object o = new Object();
Thread block_A = new Thread(() -> {
synchronized (o) {
TimeSleeper.sleep(10);
}
});
TimeSleeper.sleep(1);
block_A.start();
Thread block_B = new Thread(() -> {
synchronized (o) {
}
});
block_B.start();
TimeSleeper.sleep(1);
System.out.println("6." + block_B.getState());
}
@Slf4j
public class TimeSleeper {
public static void sleep(int seconds){
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
log.error("sleep 异常: {}", e.getMessage());
}
}
}
五. interrupt,sleep,join,yield
5.1 interrupt
5.1.1 interrupt(),isInterrupted(),static Interrupted()
- interrupt():打断某个线程(设置标志位)
- isInterrupted():查询某线程是否被打断过(是否有标志位)
- static interrupted():查询当前线程是否被打断过(是否有标志位),并重置打断标志(false)
代码
public static void main(String[] args) {
Thread thread = new Thread(() ->{
for (;;){
if (Thread.currentThread().isInterrupted()) {
System.out.println("thread 被打断了");
System.out.println(Thread.currentThread().isInterrupted());
if (!Thread.currentThread().isInterrupted()) {
break;
}
}
}
});
thread.start();
TimeSleeper.sleep(5);
thread.interrupt();
Thread thread2 = new Thread(() ->{
for (;;){
if (Thread.interrupted()) {
System.out.println("thread2 被打断了");
System.out.println(Thread.interrupted());
if (!Thread.interrupted()) {
break;
}
}
}
});
thread2.start();
TimeSleeper.sleep(5);
thread2.interrupt();
}
5.1.2 interrupt and sleep
public static void main(String[] args) {
Thread thread = new Thread(() -> {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
System.out.println("Thread is Interrupted");
System.out.println(Thread.currentThread().isInterrupted());
}
});
thread.start();
TimeSleeper.sleep(1);
thread.interrupt();
}
5.1.3 interrupt and wait
private static final Object o = new Object();
public static void main(String[] args) {
Thread t = new Thread(() -> {
synchronized (o) {
try {
o.wait();
} catch (InterruptedException e) {
System.out.println("thread is interrupted");
System.out.println(Thread.currentThread().isInterrupted());
}
}
});
t.start();
t.interrupt();
}
5.1.4 interrupt and synchronized
private static Object o = new Object();
public static void main(String[] args) {
Thread thread_1 = new Thread(()->{
synchronized (o){
TimeSleeper.sleep(5);
}
});
thread_1.start();
TimeSleeper.sleep(1);
Thread thread_2 = new Thread(() -> {
synchronized (o){
}
System.out.println("thread_2 end");
});
thread_2.start();
TimeSleeper.sleep(1);
thread_2.interrupt();
}
5.1.5 interrupt and lock
private static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
Thread thread_1 = new Thread(() -> {
lock.lock();
try {
TimeSleeper.sleep(5);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
});
thread_1.start();
TimeSleeper.sleep(1);
Thread thread_2 = new Thread(() -> {
lock.lock();
try {
System.out.println("thread_2 have the lock");
}catch (Exception e){
System.out.println("thread_2 is interrupted");
System.out.println(Thread.currentThread().isInterrupted());
}finally {
lock.unlock();
}
System.out.println("thread_2 end");
});
thread_2.start();
thread_2.interrupt();
}
5.1.6 interrupt and lockInterruptibly
private static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
Thread thread_1 = new Thread(() -> {
lock.lock();
try {
TimeSleeper.sleep(10);
} catch (Exception e) {
} finally {
lock.unlock();
}
});
thread_1.start();
TimeSleeper.sleep(1);
Thread thread_2 = new Thread(() -> {
try {
lock.lockInterruptibly();
System.out.println("thread_2 have the lock");
}catch (InterruptedException e){
System.out.println("thread_2 is interrupted");
System.out.println(Thread.currentThread().isInterrupted());
}finally {
lock.unlock();
}
System.out.println("thread_2 end");
});
thread_2.start();
TimeSleeper.sleep(1);
thread_2.interrupt();
}
5.2 sleep
static void testSleep(){
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
System.out.println("sleep start");
testSleep();
System.out.println("sleep end");
}
5.3 join
join:等待线程执行结束
static void testJoin() {
Thread thread_A = new Thread(() -> {
for (int i = 0; i < 5; i++) {
System.out.println("A");
TimeSleeper.sleepMicro(500);
}
});
Thread thread_B = new Thread(() -> {
try {
thread_A.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i < 5; i++) {
System.out.println("B");
TimeSleeper.sleepMicro(500);
}
});
thread_A.start();
thread_B.start();
}
public static void main(String[] args) {
testJoin();
}
5.4 yield
将当前线程所分配的时间片让给其他线程执行
让当前处于运行状态 的线程退回到可运行状态 ,让出抢占资源的机会
static void testYield() {
new Thread(()->{
for(int i=0; i<10; i++) {
System.out.println("A" + i);
if(i%10 == 0) {Thread.yield();}
}
}).start();
new Thread(()->{
for(int i=0; i<10; i++) {
System.out.println("------------B" + i);
if(i%10 == 0) {Thread.yield();}
}
}).start();
}
public static void main(String[] args) {
testYield();
}
六. 结束线程方式
6.1 stop(不建议使用)
stop不建议使用,stop后会释放所有锁,导致数据不一致性 比如:给线程上锁完成某个功能的数据一致性,如果没来得及设置剩下的数据,就stop,会导 致数据不一致性
public static void main(String[] args) {
Thread thread = new Thread(() -> {
while (true){
System.out.println("go on");
TimeSleeper.sleepSeconds(1);
}
});
thread.start();
TimeSleeper.sleepSeconds(5);
thread.stop();
}
6.2 suspend and resume(不建议使用)
不建议使用,suspend不会释放锁,若忘记resume,会产生死锁现象
public static void main(String[] args) {
Thread thread = new Thread(() -> {
while (true) {
System.out.println("go on");
TimeSleeper.sleepSeconds(1);
}
});
thread.start();
TimeSleeper.sleepSeconds(5);
thread.suspend();
TimeSleeper.sleepSeconds(5);
thread.resume();
}
6.3 volatile
缓存一致性
private static volatile boolean flag = true;
public static void main(String[] args) {
Thread thread = new Thread(() -> {
while (flag) {
System.out.println("go on");
TimeSleeper.sleepSeconds(1);
}
});
thread.start();
TimeSleeper.sleepSeconds(5);
flag = false;
}
6.4 interrupt
public class T04_Interrupt {
public static void main(String[] args) {
Thread thread = new Thread(() -> {
while (!Thread.interrupted()) {
System.out.println("go on");
}
System.out.println(Thread.interrupted());
});
thread.start();
TimeSleeper.sleepSeconds(5);
thread.interrupt();
}
}
并发
三大特性:可见性,原子性,有序性
一. 可见性
本地缓存数据,造成数据修改不可见
A B线程都用到一个变量,java默认是A线程中保留一份copy,这样如果B线程修改了该变量,则A线程未必知道
要想保证可见,要么触发同步指令,要么加上volatile,被修饰的内存,只要有修改,马上同步涉及到的每个线程
1.1 volatile 保证一致性
public class T02_Volatile2 {
private static boolean flag = true;
public static void m(){
System.out.println("m start");
while (flag) {
}
System.out.println("m end");
}
public static void main(String[] args) {
new Thread(T02_Volatile2::m,"thread").start();
flag = false;
}
}
1.2 println保持一致性
println 源代码 是synchronized ,也会保持一致性,会触发本地缓存和主缓存的刷新与同步
但不建议这么使用,因为println变相加了一把锁,降低了效率
private static boolean flag = true;
public static void m(){
System.out.println("m start");
while (flag) {
System.out.println("hello");
}
System.out.println("m end");
}
public static void main(String[] args) {
new Thread(T03_Println::m,"thread").start();
TimeSleeper.sleepSeconds(1);
flag = false;
}
1.3 volatile 不保证一致性的场景
volatile 修饰引用类型只能保证引用类型本身的可见性,不能保证内部字段可见性
public static class A{
volatile boolean flag = true;
void m(){
System.out.println("m start");
while (flag) {
}
System.out.println("m end");
}
}
static A a = new A();
public static void main(String[] args) {
new Thread(a::m,"t1").start();
TimeSleeper.sleepSeconds(1);
a.flag = false;
}
thread 线程的内存,也没通过主内存同步,为什么还是能改变flag状态?
问题答案地址
private static boolean flag = true;
public static void main(String[] args) {
Thread thread = new Thread(() -> {
while(flag){
System.out.println("go on");
TimeSleeper.sleepSeconds(1);
}
});
thread.start();
TimeSleeper.sleepSeconds(1);
flag = false;
}
因为sout输出语句中有synchronized,他会造成数据的同步,从而flag更改为false就会被thread线程看到了。
1.4 三级缓存及缓存行
1.4.1 三级缓存
registers ->L1,L2,L3,主内存顺序若L1没缓存,则读L2,顺序读取
主内存 往L3,L2,L1顺序存缓存
1.4.2 缓存行
1.4.3硬件层级缓存一致性
intel
volatile的底层不是MESI
1.4.4 缓存行为什么一行是64字节
缓存行越大,局部性空间效率越高,读取速度慢
缓存行越小,局部性空间效率越低,读取速度快
取一个折中,64
二. 有序性
2.1 乱序
出现乱序的概率较小
乱序的组合:x=b,y=a / y=a,x=b执行
private static int x = 0, y = 0;
private static int a = 0, b = 0;
public static void main(String[] args) throws InterruptedException {
for (long i = 0; i < Long.MAX_VALUE; i++) {
x = 0; y = 0; a = 0; b = 0;
CountDownLatch latch = new CountDownLatch(2);
Thread t1 = new Thread(() -> {
a = 1;
x = b;
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread t2 = new Thread(() -> {
b = 1;
y = a;
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
t1.start();
t2.start();
latch.await();
String result = "第" + i + "次("+x + y +")";
if (x == 0 && y ==0) {
System.out.println(result);
break;
}
}
}
2.1.1 乱序存在的原因
乱序的存在的目的:提高效率
ex: 指令1去内存读取数据,指令2做本地寄存器的一些操作(cpu/寄存器的速度 比 内存速度快很多)
等待返回指令1所需的数据时候,可以先做第二条指令的计算
2.1.2 乱序的原则
- 前后指令没有依赖关系
- 不影响单线程的最终一致性
2.1.3 可见性,有序性的小示例
代码相关问题
- 可见性:volatile没加
- 有序性:num 和 ready2条指令没有依赖关系,有可能换顺序
private static boolean ready = false;
private static int num;
private static class ReaderThread extends Thread{
@Override
public void run() {
while (!ready) {
Thread.yield();
}
System.out.println(num);
}
}
public static void main(String[] args) throws InterruptedException {
ReaderThread t = new ReaderThread();
t.start();
num = 10;
ready = true;
t.join();
}
2.2 对象的半初始化状态
new:申请一块内存,设置变量m为0(默认值,半初始化状态)
invokespecial:调用构造方法,m赋值为8(构造器的作用:给属性进行赋值操作)
astore:建立关联
2.3 this溢出问题
this的中间状态,溢出:
- 下面代码,有可能导致上图的4,7调整顺序
- 而线程启动的时候,这个num的值很有可能还没有变成8
- 导致this建立关联了num为0,线程有可能读到中间状态0的值
public static int num = 8;
public T08_ThisEscape() {
new Thread(() -> System.out.println(this.num)).start();
}
public static void main(String[] args) throws IOException {
new T08_ThisEscape();
System.in.read();
}
让在构造方法里不让线程启动,这样会让乱序的指令执行完并赋值,从而避免乱序
public static int num = 8;
Thread t;
public T09_Resolve_ThisEscape() {
t = new Thread(() -> System.out.println(num));
}
public void start(){
t.start();
}
public static void main(String[] args) throws IOException {
T09_Resolve_ThisEscape x = new T09_Resolve_ThisEscape();
x.start();
System.in.read();
}
2.4 乱序小结
为了提高执行效率,cpu指令可能会乱序执行
乱序执行不得影响单线程的最终一致性
乱序在多线程并发的情况下可能会产生很难察觉的错误
2.5 阻止乱序
内存屏障
内存屏障是特殊的指令,前面的必须执行完,后面的才能执行
所有实现Jvm规范虚拟机,必须实现四个屏障
LoadLoadBarrier,LoadStoreBarrier,StoreLoadBarrier,StoreStoreBarrier
三. 原子性
多个线程访问共享数据的时候产生竞争
多线程之间改变同一个变量,把变量从主内存中缓存到线程自己的寄存器里,完成对该变量操作后,在写回主内存中
3.1 保证原子性的操作
悲观锁和乐观锁
- 悲观的认为这个操作会被别的线程打断(悲观锁)synchronized(上一个小程序)
- 乐观的认为这个做不会被别的线程打断(乐观锁 自旋锁 无锁)cas操作
上锁能使并发保障序列化,可见性,原子性
不保障有序性,单线程保障最终一致性和上锁没关系
cas会产生的问题
-
如果线程1比较后,还是那个值后,这时,被其他线程2打断,改变了这个值,但线程1因为比较后是相等的,又改变了这个值,会产生误操作 -
ABA,原来的引用地址被别的线程改变内容,但引用地址还是那个
cas底层
cas的atomic底层,调用native的c语言,而c语言调用的unsafe_compareAndSwapInt,最终通过指令cmpxchg = compare and exchange set swap改变变量值,而cmpxchg在cpu手册不是原子性的指令,但有lock cmpxchg,对这个指令上锁
虽然cas叫乐观锁,但在底层的实现是悲观锁(通过c语言指令,把这个总线/所在的缓存行上给锁住了)
但如果是单核的就不用加lock,那么单核并发呢,因为cmpxchg是一条指令,上面/下面不存在指令,所以不会不打断
3.2 悲观锁和乐观锁的效率
悲观锁:上锁后,会有对应的队列等待这把锁,被阻塞住,通过OS调度后,才能抢锁,等待队列中的线程不消耗cpu资源
乐观锁:自旋式等待,线程是运行的,cpu需要线程的循环,又要对线程进行切换,是消耗cpu的资源的
线程执行的时间长:用悲观,否则用乐观
而实际中,用synchronized,底层代码又一系列锁升级
|