前言
读完这一篇,再也不用担心线程方面的面试了
一、线程的基本概念
在操作系统中有两个混淆视听的概念叫做 进程(Process) 和 线程(Thread)
- 进程(Process):
进程是资源的组织单位。进程有一个包含了程序内容和数据的地址空间,以及其它的资源,包括打开的文件、子进程和信号处理器等。不同进程的地址空间是互相隔离的
- 线程(Thread):
线程表示的是程序的执行流程,是CPU调度的基本单位。线程有自己的程序计数器、寄存器、栈和帧等。引入线程的动机在于操作系统中阻塞式I/O的存在。当一个线程所执行的I/O被阻塞的时候,同一进程中的其它线程可以使用CPU来进行计算。这样的话,就提高了应用的执行效率。
二、在Java中线程的创建方式
注:基于我个人的习惯,对于一件事物的学习顺序来说,我更倾向于先知道怎么用,然后在使用的过程中去思考会有什么问题,然后根据问题去找答案,一步步去解决这个问题。打个比方(比方是谁具体百度),一辆汽车的制造过程 肯定是先制造出车架(车身),然后在去一步一步添加零件,去细化这些东西。
- 继承Thread类,重写run()方法,调用start() 执行线程,其实Thread也是基于Runnable来实现的。
static class ThreadTest extends Thread {
@Override
public void run() {
System.out.println("Thread 线程");
}
}
public static void main(String[] args) {
ThreadTest threadTest = new ThreadTest();
threadTest.start();
}
- 实现Runnable接口,重写run()方法,调用start()执行线程。
static class ThreadTest implements Runnable{
@Override
public void run() {
System.out.println("Runnable 线程");
}
}
public static void main(String[] args) {
ThreadTest threadTest = new ThreadTest();
threadTest.start();
}
- 实现Callable接口,重写call(),并且使用Futrue来提交我们的任务。
static class ThreadTest implements Callable<Integer>{
@Override
public Integer call() throws Exception {
System.out.println("Callable 线程")
return new Integer(10);
}
}
public static void main(String[] args) {
ThreadTest ts = new ThreadTest();
FutureTask<Integer> integerFutureTask = new FutureTask<>(ts);
new Thread(integerFutureTask).start();
}
- 实现线程池来创建线程
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 3000L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10));
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println("线程池 创建....");
}
});
}
- 基于CompletableFuture 对线程的创建,这种方式也是基于Futrue实现的
public static void main(String[] args) {
CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
System.out.println("当前线程名称:"+Thread.currentThread().getName());
}
});
}
三、为什么会出现线程安全的问题呢?
说到线程安全,这里面涉及到两个概念
- 并行
并行是指两个或者多个事件在同一时刻发生,并行是在不同实体上的多个事件,
- 并发
并发是指两个或多个事件在同一时间间隔发生
如上图所述,我们每条线程都有属于自己的一个工作线程,线程A和线程B同时对共享变量i进行操作,在读取的过程中我们没看出什么异样,但在回写的过程中后执行的线程会覆盖先执行线程的数据,这就导致了数据不一致的问题。
由此可总结出引发线程安全的主要几个因素
四、Java中是如何保证线程安全
在java高并发,多线程的程序中,势必会引起数据的一致性问题,在这一系列问题的催生下,锁就此应运而生。(本章主要讲解单机情况下)
public static void main(String[] args) {
Object o = new Object();
String s= ClassLayout.parseInstance(o).toPrintable();
System.out.println("========================对象未上锁之前===================================");
System.out.println(s);
System.out.println("========================对象未上锁之后===================================");
synchronized (o){
System.out.println(ClassLayout.parseInstance(o).toPrintable());
}
}
输出结果: 细心的同学不难发现,对象的头部信息发生了变化,由此可见,我们使用的synchronized锁,是放在了对象的MarkWord分区中,MarkWord中默认存储的是我们的HashCode值,会随着对象的变化而变化,不同的锁状态会对应不同的存储方式,可能发生的值如下图所示:
synchronized锁在java中就是在对象头上面记录一些锁的信息,那最底层的源码是如何实现的呢? 这就要我们去找到C++的源码来看具体的操作
Hotspot源码目录,想自己详细研究的小伙伴可以根据下面目录找到具体的实现
-
Monitor:openjdk\hotspot\src\share\vm\runtime\objectMonitor.hpp -
MarkWord:openjdk\hotspot\src\share\vm\oops\markOop.hpp -
monitorenter|exit指令:openjdk\hotspot\src\share\vm\interpreter\interpreterRuntime.cpp -
偏向锁:openjdk\hotspot\src\share\vm\runtime\biasedLocking.cpp # InterpreterRuntime::monitorenter 当前获取锁的线程
# BasicObjectLock 基础对象锁
IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))
#ifdef ASSERT
thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
if (PrintBiasedLockingStatistics) {
Atomic::inc(BiasedLocking::slow_path_entry_count_addr());
}
Handle h_obj(thread, elem->obj());
assert(Universe::heap()->is_in_reserved_or_null(h_obj()),
"must be NULL or an object");
if (UseBiasedLocking) {
ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
} else {
ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
}
assert(Universe::heap()->is_in_reserved_or_null(elem->obj()),
"must be NULL or an object");
#ifdef ASSERT
thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
IRT_END
偏向锁:ObjectSynchronizer::fast_enter的实现在 synchronizer.cpp void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) {
if (UseBiasedLocking) {
if (!SafepointSynchronize::is_at_safepoint()) {
BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {
return;
}
} else {
assert(!attempt_rebias, "can not rebias toward VM thread");
BiasedLocking::revoke_at_safepoint(obj);
}
assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
}
slow_enter (obj, lock, THREAD) ;
}
偏向锁的获取逻辑:BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD); BiasedLocking::Condition BiasedLocking::revoke_and_rebias(Handle obj, bool attempt_rebias, TRAPS) {
assert(!SafepointSynchronize::is_at_safepoint(), "must not be called while at safepoint");
markOop mark = obj->mark();
if (mark->is_biased_anonymously() && !attempt_rebias) {
markOop biased_value = mark;
markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark);
if (res_mark == biased_value) {
return BIAS_REVOKED;
}
} else if (mark->has_bias_pattern()) {
Klass* k = obj->klass();
markOop prototype_header = k->prototype_header();
if (!prototype_header->has_bias_pattern()) {
markOop biased_value = mark;
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(prototype_header, obj->mark_addr(), mark);
assert(!(*(obj->mark_addr()))->has_bias_pattern(), "even if we raced, should still be revoked");
return BIAS_REVOKED;
} else if (prototype_header->bias_epoch() != mark->bias_epoch()) {
if (attempt_rebias) {
assert(THREAD->is_Java_thread(), "");
markOop biased_value = mark;
markOop rebiased_prototype = markOopDesc::encode((JavaThread*) THREAD, mark->age(), prototype_header->bias_epoch());
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(rebiased_prototype, obj->mark_addr(), mark);
if (res_mark == biased_value) {
return BIAS_REVOKED_AND_REBIASED;
}
} else {
markOop biased_value = mark;
markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark);
if (res_mark == biased_value) {
return BIAS_REVOKED;
}
}
}
}
}
-
CAS全称compare and swap 或者说 compare and set 译为比较并交换
在java中 CAS是一种轻量级锁,业界有人也称之为自旋锁或无锁。在jdk中提供了一些Atomic类来对CAS进行实现,CAS呢提供了三个参数E、V、N,E代表旧值,V代表预期的旧值,N代表修改的值。所谓的比较并交换就是比较E和V的值如果相同就修改N。
大家看如下代码 public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
CAS源码实现: 注:CAS源码有不同的实现方式,这里主要是atomic_linux_x86.inline.phh下的实现
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
int mp = os::is_MP();
__asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
: "=a" (exchange_value)
: "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
: "cc", "memory");
return exchange_value;
}
LOCK_IF_MP实现:
#define LOCK_IF_MP(mp) "cmp $0, " #mp "; je 1f; lock; 1: "
仔细阅读完如上代码,你就会发现CAS的在C++代码层面的实现了
- lock汇编指令:可以理解为CPU指令级的一种锁。它后面可以跟ADD, ADC, AND, BTC, BTR, BTS, CMPXCHG, CMPXCH8B, DEC, INC, NEG, NOT, OR, SBB, SUB, XOR, XADD, and XCHG等指令
- cmpxchgl汇编指令:比较并交换操作数.如:CMPXCHG r/m,r 将累加器AL/AX/EAX/RAX中的值与首操作数(目的操作数)比较,如果相等,第2操作数(源操作数)的值装载到首操作数,zf置1。如果不等,首操作数的值装载到AL/AX/EAX/RAX并将zf清0
总结:在x86架构上,CAS被翻译为”lock cmpxchg…“。cmpxchg是CAS的汇编指令。在CPU架构中依靠lock信号保证可见性并禁止重排序,在java层面利用do{}while循环来控制自旋。
关于锁的实现原理就讲到这里,感兴趣的小伙伴可以自行查看源码,比如说锁的释放,锁膨胀过程,锁竞争,锁升级,带着这些问题去分析源码,或者利用hsdis反编译工具来监控锁的执行状态,我相信你们都能找到属于自己的答案和理解。
五、线程池的实现原理
相信很多朋友在面试的过程中常常被问到线程池这方面的问题,核心线程数设置多少合适,最大线程数为什么要在核心线程数的基础上*2等等一系列的问题。今天我们自己来手写一个自己的线程池,来帮助大家理解线程池的一个实现原理
池化思想,统一管理线程,避免频繁创建线程,线程复用
-
统一管理线程:对线程进行集中管理
private List<WorkQue> workQues;
private BlockingDeque<Runnable> runnableDeque;
-
避免频繁创建线程:避免重复创建线程,顾名思义就是让线程一直在运行状态呗 class WorkQue extends Thread{
@Override
public void run() {
while (){
System.sout.pringln("处于死循环,一直运行的线程")
}
}
-
合并到一块进行优化一下如下 public class MyThreadPool {
private List<WorkQue> workQues;
private boolean isE = true;
private BlockingDeque<Runnable> runnableDeque;
public MyThreadPool(int maxThreadPool,int maxWorkQue){
runnableDeque = new LinkedBlockingDeque<>(maxWorkQue);
workQues = new ArrayList<>(maxWorkQue);
for (int i = 0 ; i< maxThreadPool;i++){
new WorkQue().start();
}
}
class WorkQue extends Thread{
@Override
public void run() {
while (isE || runnableDeque.size()>0){
Runnable poll = runnableDeque.poll();
if (poll!=null)
poll.run();
}
}
}
public boolean execute(Runnable com){
return runnableDeque.offer(com);
}
-
为什么阿里巴巴开发手册不推荐使用JDK自带的线程池呢? public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
}
通过上面的代码我们也能看出来newSingleThreadExecutor创建的线程池,他的缓存队列采用的是LinkedBlockingQueue构造函数中默认的队列长度,Intrger的最大值,采用这种方式会没有限度的缓存我们的任务,可能会导致oom内存溢出的现象,还会导致我们设置的最大线程数失效的问题(关于为什么会导致最大线程数失效的问题,自行百度线程池的7个参数) -
核心线程数的设置
《Java虚拟机并发编程》中提出的一个公式:线程数 = CPU 核心数 / (1 - 阻塞系数) 下图引用自Java并发编程实战
《Java并发编程实战》中也提出了一个公式:线程数 = CPU 核心数 * (1 + IO 耗时/ CPU 耗时)
总结:线程池的核心线程数和Cpu核心数有很大的关联,但实际设置大小要根据压测以及预估来进行决定的,不同的场景应用不同的策略可以得到最合适的选择
|