背景
我们有些时候会碰到需要在线程池中使用threadLocal的场景,典型的比如分布式链路追踪的traceId,业务场景中的获取用户id等简单信息。如果你去查看下自己公司这块代码的实现,你会发现他们几乎都没有使用jdk自带的ThreadLocal对象,而是使用了alibaba的TransmittleThreadLocal,为什么呢,这就关系到一个线程变量之间的传递性。 在此之前,我们先来了解在java中,一个线程初始化经历了哪些步骤~
线程初始化干了什么
一句话概括,线程初始化时,在参数缺省的情况下,会继承父线程的属性
直接看代码吧,注释在上面。所有的Thread最后都会调用init方法
public Thread() {
init(null, null, "Thread-" + nextThreadNum(), 0);
}
private void init(ThreadGroup g, Runnable target, String name,
long stackSize) {
init(g, target, name, stackSize, null, true);
}
private void init(ThreadGroup g, Runnable target, String name,
long stackSize, AccessControlContext acc,
boolean inheritThreadLocals) {
if (name == null) {
throw new NullPointerException("name cannot be null");
}
this.name = name;
Thread parent = currentThread();
SecurityManager security = System.getSecurityManager();
if (g == null) {
if (security != null) {
g = security.getThreadGroup();
}
if (g == null) {
g = parent.getThreadGroup();
}
}
g.checkAccess();
if (security != null) {
if (isCCLOverridden(getClass())) {
security.checkPermission(SUBCLASS_IMPLEMENTATION_PERMISSION);
}
}
g.addUnstarted();
this.group = g;
this.daemon = parent.isDaemon();
this.priority = parent.getPriority();
if (security == null || isCCLOverridden(parent.getClass()))
this.contextClassLoader = parent.getContextClassLoader();
else
this.contextClassLoader = parent.contextClassLoader;
this.inheritedAccessControlContext =
acc != null ? acc : AccessController.getContext();
this.target = target;
setPriority(priority);
if (inheritThreadLocals && parent.inheritableThreadLocals != null)
this.inheritableThreadLocals =
ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
this.stackSize = stackSize;
tid = nextThreadID();
}
看完了上面的代码和注释,相信大家多半有点数了,其中的重点是,子线程copy了父线程的inheritableThreadLocals到自己的inheritableThreadLocals中。而一个ThreadLocal有2个集合存放ThreadLocal,另一个是ThreadLocals。ThreadLocals不会被继承
如果我们使用ThreadLocal,那set时就会将值放到ThreadLocals中,所以普通的ThreadLocal无法被子线程获取
子线程获取父线程的ThreadLocal
InheritableThreadLocal
InheritableThreadLocal支持在子线程获取父线程中的值
public static void main(String[] args) throws InterruptedException {
ThreadLocal<String> itl = new InheritableThreadLocal<>();
itl.set("part");
new Thread(() -> {
System.out.println(itl.get());
itl.set("son");
System.out.println(itl.get());
}).start();
Thread.sleep(500);
System.out.println("thread:" + itl.get());
}
part
son
thread:part
为什么呢,相比小伙伴们一定猜出来了,InheritableThreadLocal是从inheritableThreadLocals获取的,而InheritableThreadLocal是默认继承的。
package java.lang;
import java.lang.ref.*;
public class InheritableThreadLocal<T> extends ThreadLocal<T> {
protected T childValue(T parentValue) {
return parentValue;
}
ThreadLocalMap getMap(Thread t) {
return t.inheritableThreadLocals;
}
void createMap(Thread t, T firstValue) {
t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
}
}
InheritableThreadLocal本身的设计就很简单,他直接继承ThreadLocal,重写了createMap等方法,从线程的InheritableThreadLocals中获取
线程池中使用ThreadLocal
那InheritableThreadLocal既然能够获取父线程的ThreadLocal了,那为什么还要用TransmittableThreadLocal呢?
因为InheritableThreadLocal不支持池化场景
看懂上面代码的小伙伴一定能够注意到线程变量的继承是发生在线程init这个节点,对于线程池这种已经将线程创建好的场景,并不可能再去调用init。 而TransmittableThreadLocal则克服了这种场景,它在InheritableThreadLocal的基础上使线程池的线程也可以拿到当前线程的变量。
先说明TransmittableThreadLocal的解题思路
包装Runnable,将调用线程的变量取出来,一起传入线程池中
先看一下TransmittableThreadLocal的使用方式
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = new ThreadPoolExecutor(1, 1,
0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
Executor ttlExecutor = TtlExecutors.getTtlExecutor(executorService);
executorService.execute(()->{
log.info("xx");
});
TransmittableThreadLocal<String> ttl = new TransmittableThreadLocal<>();
ttl.set("ps5");
Thread.sleep(1000L);
ttlExecutor.execute(()->{
log.info(ttl.get());
});
}
源码简单解析
由于ttl的代码还挺复杂,我也是一知半解,这里只描述下核心代码和流程,帮助大家理解ttl的原理, 在此先列出要使用ttl的步骤(结合代码)
创建一个线程池
没啥好说的
包装提交的线程池任务
public class TtlExecutors{
public static Executor getTtlExecutor(@Nullable Executor executor) {
if (TtlAgent.isTtlAgentLoaded() || null == executor || executor instanceof TtlEnhanced) {
return executor;
}
return new ExecutorTtlWrapper(executor, true);
}
}
它到底干了什么呢?,看一下ExecutorTtlWrapper,它重写了execute方法,将Runable包装成了 TtlRunnable,相比较Runable,TtlRunable中最重要的一点就是包含了AtomicReference对象,该对象持有ttl的副本的引用,当我们向线程池提交任务的时候,该引用的值会被放入当前线程的InheritableThreadLocals中,所以我们能在方法中引用ttl~
package com.alibaba.ttl.threadpool;
import com.alibaba.ttl.TransmittableThreadLocal;
import com.alibaba.ttl.TtlRunnable;
import com.alibaba.ttl.spi.TtlEnhanced;
import com.alibaba.ttl.spi.TtlWrapper;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.concurrent.Executor;
class ExecutorTtlWrapper implements Executor, TtlWrapper<Executor>, TtlEnhanced {
private final Executor executor;
protected final boolean idempotent;
ExecutorTtlWrapper(@NonNull Executor executor, boolean idempotent) {
this.executor = executor;
this.idempotent = idempotent;
}
@Override
public void execute(@NonNull Runnable command) {
executor.execute(TtlRunnable.get(command, false, idempotent));
}
}
TtlRunnable对象,包装了Runnable,持有AtomicReference
public final class TtlRunnable implements Runnable, TtlWrapper<Runnable>, TtlEnhanced, TtlAttachments {
private final AtomicReference<Object> capturedRef;
private final Runnable runnable;
private final boolean releaseTtlValueReferenceAfterRun;
private TtlRunnable(@NonNull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
this.capturedRef = new AtomicReference<Object>(capture());
this.runnable = runnable;
this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
}
@Override
那capturedRef是什么时候有值的呢?当然是在构造该对象的时候啦~ 来看一下它的构造方法
private TtlRunnable(@NonNull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
this.capturedRef = new AtomicReference<Object>(capture());
this.runnable = runnable;
this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
}
重点,capture方法
@NonNull
public static Object capture() {
return new Snapshot(captureTtlValues(), captureThreadLocalValues());
}
private static HashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() {
HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new HashMap<TransmittableThreadLocal<Object>, Object>();
for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) {
ttl2Value.put(threadLocal, threadLocal.copyValue());
}
return ttl2Value;
}
private static HashMap<ThreadLocal<Object>, Object> captureThreadLocalValues() {
final HashMap<ThreadLocal<Object>, Object> threadLocal2Value = new HashMap<ThreadLocal<Object>, Object>();
for (Map.Entry<ThreadLocal<Object>, TtlCopier<Object>> entry : threadLocalHolder.entrySet()) {
final ThreadLocal<Object> threadLocal = entry.getKey();
final TtlCopier<Object> copier = entry.getValue();
threadLocal2Value.put(threadLocal, copier.copy(threadLocal.get()));
}
return threadLocal2Value;
}
从上面可以看到,capturedRef中保存的就是ttl的副本的AtomicReference引用,其实上面有一个非常核心的holder全局变量,它是用来存放ttl的,稍后细说。到此刻,ttl在线程池的传递的前置工作(指由调用线程执行的动作)都已经全部完成。
总结下这一步干了什么事
- 将线程池包装成ExecutorTtlWrapper,被包装成ExecutorTtlWrapper的线程池会将每一个提交到线程池中的Runnable变成TtlRunnable
- 从当前线程获取ttl上下文的副本,并用AtomicReference存在TtlRunnable中
任务执行
从这里开始,执行线程就是线程池中的线程了
@Override
public void run() {
final Object captured = capturedRef.get();
if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
throw new IllegalStateException("TTL value reference is released after run!");
}
final Object backup = replay(captured);
try {
runnable.run();
} finally {
restore(backup);
}
}
@NonNull
public static Object replay(@NonNull Object captured) {
final Snapshot capturedSnapshot = (Snapshot) captured;
return new Snapshot(replayTtlValues(capturedSnapshot.ttl2Value), replayThreadLocalValues(capturedSnapshot.threadLocal2Value));
}
@NonNull
private static HashMap<TransmittableThreadLocal<Object>, Object> replayTtlValues(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> captured) {
HashMap<TransmittableThreadLocal<Object>, Object> backup = new HashMap<TransmittableThreadLocal<Object>, Object>();
for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {
TransmittableThreadLocal<Object> threadLocal = iterator.next();
backup.put(threadLocal, threadLocal.get());
if (!captured.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
}
}
setTtlValuesTo(captured);
doExecuteCallback(true);
return backup;
}
private static HashMap<ThreadLocal<Object>, Object> replayThreadLocalValues(@NonNull HashMap<ThreadLocal<Object>, Object> captured) {
final HashMap<ThreadLocal<Object>, Object> backup = new HashMap<ThreadLocal<Object>, Object>();
for (Map.Entry<ThreadLocal<Object>, Object> entry : captured.entrySet()) {
final ThreadLocal<Object> threadLocal = entry.getKey();
backup.put(threadLocal, threadLocal.get());
final Object value = entry.getValue();
if (value == threadLocalClearMark) threadLocal.remove();
else threadLocal.set(value);
}
return backup;
}
总结下这一步干了什么事
- 从captureRef中获得ttl的副本
- 备份当前线程的上下文,并将captureRef引用的副本都写入inheritThreadLocal中
- 执行方法
- 还原当前线程的上下文
这些步骤执行完,我们就可以在线程池中拿到ttl变量了
holder的作用
从上面的代码我们可以发现,holder就是存放ttl的地方,我们看一下holder的结构
private static final InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder =
new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() {
@Override
protected WeakHashMap<TransmittableThreadLocal<Object>, ?> initialValue() {
return new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
}
@Override
protected WeakHashMap<TransmittableThreadLocal<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue) {
return new WeakHashMap<TransmittableThreadLocal<Object>, Object>(parentValue);
}
};
holder其实是一个全局静态的InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal, ?>>变量,注意holder是全局静态的变量,但是通过holder只能操作到当前线程的上下文,理解这点很重要。 当我们对ttl进行set时,除了将值存到inheritThreadLocals,还会将值存入到holder中。但需要注意,get时,ttl还是从inheritThreadLocals中读取值,而不是holder。
可以理解为holder就是存取ttl入口,以及为了使ttl具备线程池传递性的缓冲层,而ttl真正存储的位置还是inheritThreadLocals
|