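27. ExitSpan and LocalSpan
1) ExitSpan
ExitSpan represents the service-consumer side, for example Feign or OkHttp. An ExitSpan is an exit point of the trace, that is, the span through which a call leaves. One RPC call can pass through several layered exit points, and the ExitSpan always represents the first (outermost) one. For example, when Dubbox uses HttpComponent to make a remote call, the ExitSpan represents the Dubbox span, and the HttpComponent span information is ignored.
The difference between EntrySpan and ExitSpan is:
- EntrySpan records information closer to the service (server) side
- ExitSpan records information closer to the consumer (client) side
The ExitSpan code is as follows: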
public class ExitSpan extends StackBasedTracingSpan implements ExitTypeSpan {
public ExitSpan(int spanId, int parentSpanId, String operationName, String peer, TracingContext owner) {
super(spanId, parentSpanId, operationName, peer, owner);
}
public ExitSpan(int spanId, int parentSpanId, String operationName, TracingContext owner) {
super(spanId, parentSpanId, operationName, owner);
}
@Override
public ExitSpan start() {
if (++stackDepth == 1) {
super.start();
}
return this;
}
@Override
public ExitSpan tag(String key, String value) {
if (stackDepth == 1 || isInAsyncMode) {
super.tag(key, value);
}
return this;
}
@Override
public AbstractTracingSpan tag(AbstractTag<?> tag, String value) {
if (stackDepth == 1 || tag.isCanOverwrite() || isInAsyncMode) {
super.tag(tag, value);
}
return this;
}
@Override
public AbstractTracingSpan setLayer(SpanLayer layer) {
if (stackDepth == 1 || isInAsyncMode) {
return super.setLayer(layer);
} else {
return this;
}
}
@Override
public AbstractTracingSpan setComponent(Component component) {
if (stackDepth == 1 || isInAsyncMode) {
return super.setComponent(component);
} else {
return this;
}
}
@Override
public ExitSpan log(Throwable t) {
super.log(t);
return this;
}
@Override
public AbstractTracingSpan setOperationName(String operationName) {
if (stackDepth == 1 || isInAsyncMode) {
return super.setOperationName(operationName);
} else {
return this;
}
}
@Override
public String getPeer() {
return peer;
}
@Override
public ExitSpan inject(final ContextCarrier carrier) {
this.owner.inject(this, carrier);
return this;
}
@Override
public boolean isEntry() {
return false;
}
@Override
public boolean isExit() {
return true;
}
}
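As shown in the figure above, suppose an application deployed on Tomcat exposes a getUser() controller method through SpringMVC, and getUser() calls Redis and MySQL. What does the TraceSegment for this flow look like?
As soon as the request enters Tomcat, an EntrySpan is created, and SpringMVC reuses the EntrySpan created by Tomcat. When Redis is accessed, an ExitSpan is created, and its peer records the Redis address.
Do not read an ExitSpan as the end of the TraceSegment; read it as an operation that leaves the current TraceSegment. When MySQL is accessed, another ExitSpan is created, and its peer records the MySQL address. Because the Redis call and the MySQL call are not nested, the MySQL call does not reuse the earlier ExitSpan.
Notes:
- Like EntrySpan, ExitSpan is reused, but only when plugins are nested
- When multiple ExitSpans are parallel rather than nested, several ExitSpans may exist in the same TraceSegment
- Simply think of an ExitSpan as an operation that leaves the current process/thread
- A TraceSegment does not necessarily contain an ExitSpan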
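The reuse rules above can be modeled with a small, self-contained sketch. It is not SkyWalking's code: `ExitSpanReuseDemo` and `ToySpan` are hypothetical, spans carry only a kind, a name and a stack depth, and only the parent-type check of `createExitSpan()` plus the depth counting of ExitSpan's `start()` are kept. The sequential Redis and MySQL calls get two separate ExitSpans, while the nested Dubbox/HttpComponent call shares one.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of span reuse; ExitSpanReuseDemo and ToySpan are hypothetical, not SkyWalking classes.
class ExitSpanReuseDemo {
    static final class ToySpan {
        final boolean exit;
        final String name;
        int stackDepth; // how many nested plugin calls currently share this span

        ToySpan(boolean exit, String name) {
            this.exit = exit;
            this.name = name;
        }
    }

    private final Deque<ToySpan> activeSpanStack = new ArrayDeque<>();
    private final List<String> finishedNames = new ArrayList<>();

    ToySpan createEntrySpan(String name) {
        ToySpan span = new ToySpan(false, name);
        activeSpanStack.addLast(span);
        span.stackDepth++;
        return span;
    }

    ToySpan createExitSpan(String name) {
        ToySpan parent = activeSpanStack.peekLast();
        ToySpan span;
        if (parent != null && parent.exit) {
            span = parent;                   // nested exit call: reuse the outer ExitSpan, keep its name
        } else {
            span = new ToySpan(true, name);  // sibling exit call: a brand-new ExitSpan
            activeSpanStack.addLast(span);
        }
        span.stackDepth++;
        return span;
    }

    void stopSpan(ToySpan span) {
        if (activeSpanStack.peekLast() != span) {
            throw new IllegalStateException("Stopping the unexpected span = " + span.name);
        }
        if (--span.stackDepth == 0) {        // only the outermost stop really finishes the span
            activeSpanStack.removeLast();
            finishedNames.add(span.name);
        }
    }

    static List<String> run() {
        ExitSpanReuseDemo ctx = new ExitSpanReuseDemo();
        ToySpan entry = ctx.createEntrySpan("/getUser");
        ToySpan redis = ctx.createExitSpan("Redis/GET");
        ctx.stopSpan(redis);
        ToySpan mysql = ctx.createExitSpan("Mysql/query");
        ctx.stopSpan(mysql);
        ToySpan dubbox = ctx.createExitSpan("Dubbox/call");
        ToySpan http = ctx.createExitSpan("HttpComponent/POST"); // returns the Dubbox span
        ctx.stopSpan(http);
        ctx.stopSpan(dubbox);
        ctx.stopSpan(entry);
        return ctx.finishedNames;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch `stopSpan(http)` only lowers the depth of the shared span; the span is recorded once, when the outer Dubbox call stops it, which is why no separate HttpComponent span appears.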
2) LocalSpan
The LocalSpan inheritance hierarchy is shown in the figure below:
The LocalSpan code is as follows:
public class LocalSpan extends AbstractTracingSpan {
public LocalSpan(int spanId, int parentSpanId, String operationName, TracingContext owner) {
super(spanId, parentSpanId, operationName, owner);
}
@Override
public boolean isEntry() {
return false;
}
@Override
public boolean isExit() {
return false;
}
@Override
public AbstractSpan setPeer(String remotePeer) {
return this;
}
}
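Summary:
The Span inheritance hierarchy in SkyWalking is shown in the figure below:
28. Tracing context
1) AbstractTracerContext
AbstractTracerContext represents the context manager of the tracing process: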
public interface AbstractTracerContext {
void inject(ContextCarrier carrier);
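void extract(ContextCarrier carrier);
// ... other methods omitted
}
When a call crosses processes, inject() packs the data of the upstream TraceSegment into a ContextCarrier, which is passed to the downstream TraceSegment; extract() unpacks the data from the ContextCarrier.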
public interface AbstractTracerContext {
ContextSnapshot capture();
void continued(ContextSnapshot snapshot);
// ... other methods omitted
}
inject() and extract() are used to propagate data across processes, while capture() and continued() are used to propagate data across threads.
public interface AbstractTracerContext {
String getReadablePrimaryTraceId();
String getSegmentId();
int getSpanId();
AbstractSpan createEntrySpan(String operationName);
AbstractSpan createLocalSpan(String operationName);
AbstractSpan createExitSpan(String operationName, String remotePeer);
AbstractSpan activeSpan();
boolean stopSpan(AbstractSpan span);
AbstractTracerContext awaitFinishAsync();
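void asyncStop(AsyncSpan span);
// ... other methods omitted
}
2) TracingContext
TracingContext is the core tracing logic controller. It implements the AbstractTracerContext interface and builds up the trace using a stack.
TracingContext manages:
- the current segment and its references (TraceSegmentRef) linking it to the segments it continues from
- all spans inside the current segment
The fields defined in TracingContext are as follows: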
public class TracingContext implements AbstractTracerContext {
private TraceSegment segment;
private LinkedList<AbstractSpan> activeSpanStack = new LinkedList<>();
private AbstractSpan firstSpan = null;
private int spanIdGenerator;
@SuppressWarnings("unused")
private volatile int asyncSpanCounter;
private static final AtomicIntegerFieldUpdater<TracingContext> ASYNC_SPAN_COUNTER_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(TracingContext.class, "asyncSpanCounter");
private volatile boolean isRunningInAsyncMode;
private volatile ReentrantLock asyncFinishLock;
private volatile boolean running;
private final long createTime;
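private final SpanLimitWatcher spanLimitWatcher;
// ... other members omitted
}
Every span created in the TraceSegment is pushed onto activeSpanStack and popped when it finishes; the span at the top of the stack is the activeSpan.
Getter methods in TracingContext: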
public class TracingContext implements AbstractTracerContext {
@Override
public String getReadablePrimaryTraceId() {
return getPrimaryTraceId().getId();
}
private DistributedTraceId getPrimaryTraceId() {
return segment.getRelatedGlobalTrace();
}
@Override
public String getSegmentId() {
return segment.getTraceSegmentId();
}
@Override
public int getSpanId() {
return activeSpan().getSpanId();
}
@Override
public AbstractSpan activeSpan() {
AbstractSpan span = peek();
if (span == null) {
throw new IllegalStateException("No active span.");
}
return span;
}
private AbstractSpan peek() {
if (activeSpanStack.isEmpty()) {
return null;
}
return activeSpanStack.getLast();
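}
// ... other members omitted
}
activeSpan() simply returns the element at the top of activeSpanStack without removing it, which is why the span at the top of activeSpanStack is called the activeSpan.
Span-creation methods in TracingContext: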
public class TracingContext implements AbstractTracerContext {
@Override
public AbstractSpan createEntrySpan(final String operationName) {
if (isLimitMechanismWorking()) {
NoopSpan span = new NoopSpan();
return push(span);
}
AbstractSpan entrySpan;
TracingContext owner = this;
final AbstractSpan parentSpan = peek();
final int parentSpanId = parentSpan == null ? -1 : parentSpan.getSpanId();
if (parentSpan != null && parentSpan.isEntry()) {
profilingRecheck(parentSpan, operationName);
parentSpan.setOperationName(operationName);
entrySpan = parentSpan;
return entrySpan.start();
} else {
entrySpan = new EntrySpan(
spanIdGenerator++, parentSpanId,
operationName, owner
);
entrySpan.start();
return push(entrySpan);
}
}
private boolean isLimitMechanismWorking() {
if (spanIdGenerator >= spanLimitWatcher.getSpanLimit()) {
long currentTimeMillis = System.currentTimeMillis();
if (currentTimeMillis - lastWarningTimestamp > 30 * 1000) {
LOGGER.warn(
new RuntimeException("Shadow tracing context. Thread dump"),
"More than {} spans required to create", spanLimitWatcher.getSpanLimit()
);
lastWarningTimestamp = currentTimeMillis;
}
return true;
} else {
return false;
}
}
@Override
public AbstractSpan createLocalSpan(final String operationName) {
if (isLimitMechanismWorking()) {
NoopSpan span = new NoopSpan();
return push(span);
}
AbstractSpan parentSpan = peek();
final int parentSpanId = parentSpan == null ? -1 : parentSpan.getSpanId();
AbstractTracingSpan span = new LocalSpan(spanIdGenerator++, parentSpanId, operationName, this);
span.start();
return push(span);
}
@Override
public AbstractSpan createExitSpan(final String operationName, final String remotePeer) {
if (isLimitMechanismWorking()) {
NoopExitSpan span = new NoopExitSpan(remotePeer);
return push(span);
}
AbstractSpan exitSpan;
AbstractSpan parentSpan = peek();
TracingContext owner = this;
if (parentSpan != null && parentSpan.isExit()) {
exitSpan = parentSpan;
} else {
final int parentSpanId = parentSpan == null ? -1 : parentSpan.getSpanId();
exitSpan = new ExitSpan(spanIdGenerator++, parentSpanId, operationName, remotePeer, owner);
push(exitSpan);
}
exitSpan.start();
return exitSpan;
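}
// ... other members omitted
}
The span-creation methods in TracingContext work as follows:
- Check whether the current TraceSegment may create more spans; if the span limit has been reached, create a NoopSpan (a NoopExitSpan for exit spans) and push it instead
- Otherwise, peek at the span at the top of activeSpanStack (the activeSpan, which is read but not popped) and use it as the parent of the new span, taking its span ID as parentSpanId; if there is no parent, parentSpanId = -1
- When creating an EntrySpan or ExitSpan, the parent span is reused if it is of the same type; otherwise a new span is created and pushed. A LocalSpan is always created and pushed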
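The limit check and the parent lookup can be illustrated with a minimal sketch, assuming a fixed span limit passed to the constructor (in the agent it comes from SpanLimitWatcher). `SpanCreationDemo` and `ToySpan` are hypothetical; span reuse is left out, and the `-1` span ID for no-op spans is only a marker chosen for this sketch.

```java
import java.util.LinkedList;

// Toy model of span creation in TracingContext; all names here are hypothetical.
class SpanCreationDemo {
    static final class ToySpan {
        final String kind;
        final int spanId;       // -1 marks a no-op span in this sketch
        final int parentSpanId;

        ToySpan(String kind, int spanId, int parentSpanId) {
            this.kind = kind;
            this.spanId = spanId;
            this.parentSpanId = parentSpanId;
        }
    }

    private final LinkedList<ToySpan> activeSpanStack = new LinkedList<>();
    private final int spanLimit;
    private int spanIdGenerator;

    SpanCreationDemo(int spanLimit) {
        this.spanLimit = spanLimit;
    }

    // Same order of checks as createLocalSpan(): limit first, then peek at the parent.
    ToySpan createSpan(String kind) {
        if (spanIdGenerator >= spanLimit) {
            return push(new ToySpan("noop", -1, -1));
        }
        ToySpan parent = activeSpanStack.peekLast(); // read the activeSpan, do not pop it
        int parentSpanId = parent == null ? -1 : parent.spanId;
        return push(new ToySpan(kind, spanIdGenerator++, parentSpanId));
    }

    int depth() {
        return activeSpanStack.size();
    }

    private ToySpan push(ToySpan span) {
        activeSpanStack.addLast(span);
        return span;
    }
}
```

With a limit of 2, the entry span gets ID 0 (parent -1), a local span gets ID 1 (parent 0) while the entry span stays on the stack, and a third request receives a no-op placeholder.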
The stopSpan() method in TracingContext:
public class TracingContext implements AbstractTracerContext {
@Override
public boolean stopSpan(AbstractSpan span) {
AbstractSpan lastSpan = peek();
if (lastSpan == span) {
if (lastSpan instanceof AbstractTracingSpan) {
AbstractTracingSpan toFinishSpan = (AbstractTracingSpan) lastSpan;
if (toFinishSpan.finish(segment)) {
pop();
}
} else {
pop();
}
} else {
throw new IllegalStateException("Stopping the unexpected span = " + span);
}
finish();
return activeSpanStack.isEmpty();
}
private void finish() {
if (isRunningInAsyncMode) {
asyncFinishLock.lock();
}
try {
boolean isFinishedInMainThread = activeSpanStack.isEmpty() && running;
if (isFinishedInMainThread) {
TracingThreadListenerManager.notifyFinish(this);
}
if (isFinishedInMainThread && (!isRunningInAsyncMode || asyncSpanCounter == 0)) {
TraceSegment finishedSegment = segment.finish(isLimitMechanismWorking());
TracingContext.ListenerManager.notifyFinish(finishedSegment);
running = false;
}
} finally {
if (isRunningInAsyncMode) {
asyncFinishLock.unlock();
}
}
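}
// ... other members omitted
}
stopSpan() works as follows:
- The span passed in must be the span at the top of activeSpanStack; otherwise an IllegalStateException is thrown
- If the top span is an AbstractTracingSpan, its own finish(segment) method is called, and the span is popped only when finish returns true (a reused EntrySpan/ExitSpan finishes only when the outermost plugin stops it); any other span, such as a NoopSpan, is popped directly
- If the stack is now empty and the TracingContext is still running (and, in async mode, no async span is still pending): 1) the current TraceSegment is finished; 2) the TraceSegment is handed to the TracingContextListeners, which eventually send it to the OAP; 3) the running flag of the TracingContext is set to false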
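The finishing rule can be sketched as follows, assuming spans are identified by plain strings and async mode is ignored. `StopSpanDemo` is hypothetical, and a `Consumer` stands in for TracingContextListener; the point is that the segment is reported exactly once, when the stack empties.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Consumer;

// Toy model of stopSpan()/finish() without async mode; names are hypothetical.
class StopSpanDemo {
    private final LinkedList<String> activeSpanStack = new LinkedList<>();
    private final List<String> finishedSpans = new ArrayList<>();
    private final List<Consumer<List<String>>> listeners = new ArrayList<>();
    private boolean running = true;

    void addListener(Consumer<List<String>> listener) {
        listeners.add(listener);
    }

    void startSpan(String name) {
        activeSpanStack.addLast(name);
    }

    // Returns true when the stack is empty, like TracingContext.stopSpan().
    boolean stopSpan(String name) {
        if (!name.equals(activeSpanStack.peekLast())) {
            throw new IllegalStateException("Stopping the unexpected span = " + name);
        }
        activeSpanStack.removeLast();
        finishedSpans.add(name);
        finish();
        return activeSpanStack.isEmpty();
    }

    private void finish() {
        if (activeSpanStack.isEmpty() && running) {
            // The whole "segment" is handed to every listener exactly once.
            List<String> segment = List.copyOf(finishedSpans);
            for (Consumer<List<String>> listener : listeners) {
                listener.accept(segment);
            }
            running = false;
        }
    }
}
```

Stopping the entry span while an exit span is still open fails, mirroring the "Stopping the unexpected span" check.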
The inject() and extract() methods in TracingContext:
public class TracingContext implements AbstractTracerContext {
@Override
public void inject(ContextCarrier carrier) {
this.inject(this.activeSpan(), carrier);
}
public void inject(AbstractSpan exitSpan, ContextCarrier carrier) {
if (!exitSpan.isExit()) {
throw new IllegalStateException("Inject can be done only in Exit Span");
}
ExitTypeSpan spanWithPeer = (ExitTypeSpan) exitSpan;
String peer = spanWithPeer.getPeer();
if (StringUtil.isEmpty(peer)) {
throw new IllegalStateException("Exit span doesn't include meaningful peer information.");
}
carrier.setTraceId(getReadablePrimaryTraceId());
carrier.setTraceSegmentId(this.segment.getTraceSegmentId());
carrier.setSpanId(exitSpan.getSpanId());
carrier.setParentService(Config.Agent.SERVICE_NAME);
carrier.setParentServiceInstance(Config.Agent.INSTANCE_NAME);
carrier.setParentEndpoint(first().getOperationName());
carrier.setAddressUsedAtClient(peer);
this.correlationContext.inject(carrier);
this.extensionContext.inject(carrier);
}
@Override
public void extract(ContextCarrier carrier) {
TraceSegmentRef ref = new TraceSegmentRef(carrier);
this.segment.ref(ref);
this.segment.relatedGlobalTrace(new PropagatedTraceId(carrier.getTraceId()));
AbstractSpan span = this.activeSpan();
if (span instanceof EntrySpan) {
span.ref(ref);
}
carrier.extractExtensionTo(this);
carrier.extractCorrelationTo(this);
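}
// ... other members omitted
}
Suppose there are two applications, client A and server B. When A calls B, cross-process propagation works as follows:
- Client A creates an ExitSpan and calls the inject() method of TracingContext to fill in a ContextCarrier
- The items() method of ContextCarrier is used to put all ContextCarrier entries into the outgoing request, such as HTTP request headers, Dubbo attachments, or Kafka message headers
- The ContextCarrier travels to the server along with the request
- Server B receives the request carrying the ContextCarrier and extracts all ContextCarrier information from it
- Server B creates an EntrySpan and calls the extract() method of TracingContext, which binds the TraceSegmentRef and traceId to the current TraceSegment and the ref to the EntrySpan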
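The steps above can be sketched with a plain `Map` standing in for request headers. This is a minimal illustration under stated assumptions: `CrossProcessDemo`, its `Carrier` class and the `demo-*` header names are hypothetical, and the real agent serializes these fields into its own header format through `ContextCarrier.items()`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified propagation; header names and fields are illustrative only.
class CrossProcessDemo {
    static final class Carrier {
        String traceId;
        String parentSegmentId;
        int parentSpanId;
        String parentService;
        String addressUsedAtClient;
    }

    // Client A: the ExitSpan fills the carrier (inject), then the carrier travels as headers.
    static Map<String, String> clientSend(String traceId, String segmentId, int exitSpanId,
                                          String service, String peer) {
        Carrier carrier = new Carrier();
        carrier.traceId = traceId;
        carrier.parentSegmentId = segmentId;
        carrier.parentSpanId = exitSpanId;
        carrier.parentService = service;
        carrier.addressUsedAtClient = peer;

        Map<String, String> headers = new HashMap<>();
        headers.put("demo-trace-id", carrier.traceId);
        headers.put("demo-parent-segment", carrier.parentSegmentId);
        headers.put("demo-parent-span", Integer.toString(carrier.parentSpanId));
        headers.put("demo-parent-service", carrier.parentService);
        headers.put("demo-peer", carrier.addressUsedAtClient);
        return headers;
    }

    // Server B: rebuild the carrier from the headers (extract) before linking the new segment.
    static Carrier serverReceive(Map<String, String> headers) {
        Carrier carrier = new Carrier();
        carrier.traceId = headers.get("demo-trace-id");
        carrier.parentSegmentId = headers.get("demo-parent-segment");
        carrier.parentSpanId = Integer.parseInt(headers.get("demo-parent-span"));
        carrier.parentService = headers.get("demo-parent-service");
        carrier.addressUsedAtClient = headers.get("demo-peer");
        return carrier;
    }

    // The downstream segment keeps the upstream trace ID and records a ref to the caller's ExitSpan.
    static String segmentRef(Carrier carrier) {
        return carrier.parentService + ":" + carrier.parentSegmentId + "#" + carrier.parentSpanId;
    }
}
```

The server ends up with the same trace ID as the client plus a pointer to the exact ExitSpan that made the call, which is what lets the OAP stitch the two segments together.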
The capture() and continued() methods in TracingContext:
public class TracingContext implements AbstractTracerContext {
@Override
public ContextSnapshot capture() {
ContextSnapshot snapshot = new ContextSnapshot(
segment.getTraceSegmentId(),
activeSpan().getSpanId(),
getPrimaryTraceId(),
first().getOperationName(),
this.correlationContext,
this.extensionContext
);
return snapshot;
}
@Override
public void continued(ContextSnapshot snapshot) {
if (snapshot.isValid()) {
TraceSegmentRef segmentRef = new TraceSegmentRef(snapshot);
this.segment.ref(segmentRef);
this.activeSpan().ref(segmentRef);
this.segment.relatedGlobalTrace(snapshot.getTraceId());
this.correlationContext.continued(snapshot);
this.extensionContext.continued(snapshot);
this.extensionContext.handle(this.activeSpan());
}
}
// ... other members omitted
}
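A minimal cross-thread sketch, assuming one ThreadLocal context per thread as in ContextManager. `CrossThreadDemo`, `Snapshot` and `ToyContext` are hypothetical simplifications of ContextSnapshot and TracingContext: the snapshot is taken on the parent thread and passed to the worker, which keeps its own segment but joins the parent's trace.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Toy model of capture()/continued(); classes and IDs are hypothetical.
class CrossThreadDemo {
    static final class Snapshot {
        final String traceId;
        final String segmentId;
        final int spanId;

        Snapshot(String traceId, String segmentId, int spanId) {
            this.traceId = traceId;
            this.segmentId = segmentId;
            this.spanId = spanId;
        }
    }

    static final class ToyContext {
        String traceId;
        final String segmentId;
        String ref; // "parentSegmentId/parentSpanId" after continued()

        ToyContext(String traceId, String segmentId) {
            this.traceId = traceId;
            this.segmentId = segmentId;
        }

        Snapshot capture(int activeSpanId) {
            return new Snapshot(traceId, segmentId, activeSpanId);
        }

        void continued(Snapshot snapshot) {
            ref = snapshot.segmentId + "/" + snapshot.spanId; // link to the parent thread's segment
            traceId = snapshot.traceId;                       // join the parent's trace
        }
    }

    static final ThreadLocal<ToyContext> CONTEXT = new ThreadLocal<>();

    static String[] run() {
        CONTEXT.set(new ToyContext("trace-1", "seg-main"));
        Snapshot snapshot = CONTEXT.get().capture(3); // taken on the parent thread
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String[]> result = pool.submit(() -> {
                // The worker thread has its own ThreadLocal slot, hence its own segment.
                ToyContext child = new ToyContext("trace-new", "seg-worker");
                CONTEXT.set(child);
                try {
                    child.continued(snapshot);
                    return new String[] {child.traceId, child.segmentId, child.ref};
                } finally {
                    CONTEXT.remove();
                }
            });
            return result.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
            CONTEXT.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(String.join(", ", run()));
    }
}
```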
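Summary:
29. The context adapter ContextManager
A TraceSegment and all of its spans live in the same thread, so ContextManager uses a ThreadLocal to manage the context of the TraceSegment (that is, the AbstractTracerContext).
The getOrCreate() method in ContextManager: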
public class ContextManager implements BootService {
private static ThreadLocal<AbstractTracerContext> CONTEXT = new ThreadLocal<AbstractTracerContext>();
private static AbstractTracerContext getOrCreate(String operationName, boolean forceSampling) {
AbstractTracerContext context = CONTEXT.get();
if (context == null) {
if (StringUtil.isEmpty(operationName)) {
if (LOGGER.isDebugEnable()) {
LOGGER.debug("No operation name, ignore this trace.");
}
context = new IgnoredTracerContext();
} else {
if (EXTEND_SERVICE == null) {
EXTEND_SERVICE = ServiceManager.INSTANCE.findService(ContextManagerExtendService.class);
}
context = EXTEND_SERVICE.createTraceContext(operationName, forceSampling);
}
CONTEXT.set(context);
}
return context;
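}
// ... other members omitted
}
getOrCreate() works as follows:
- Get the AbstractTracerContext from the ThreadLocal; return it if present, otherwise create a new one
- If operationName is empty, create an IgnoredTracerContext
- Otherwise, call the createTraceContext() method of ContextManagerExtendService to create the AbstractTracerContext
- In both cases, the newly created context is stored in the ThreadLocal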
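The per-thread caching described above can be shown with a minimal sketch. `GetOrCreateDemo` and its `Ctx`, `IgnoredCtx` and `TracingCtx` types are hypothetical stand-ins for AbstractTracerContext, IgnoredTracerContext and TracingContext, and the sampling decision is left out.

```java
// Toy model of ContextManager.getOrCreate(); the context classes are hypothetical stand-ins.
class GetOrCreateDemo {
    interface Ctx {}

    static final class IgnoredCtx implements Ctx {}

    static final class TracingCtx implements Ctx {
        final String firstOperation;

        TracingCtx(String firstOperation) {
            this.firstOperation = firstOperation;
        }
    }

    private static final ThreadLocal<Ctx> CONTEXT = new ThreadLocal<>();

    static Ctx getOrCreate(String operationName) {
        Ctx ctx = CONTEXT.get();
        if (ctx == null) {
            ctx = (operationName == null || operationName.isEmpty())
                ? new IgnoredCtx()
                : new TracingCtx(operationName);
            CONTEXT.set(ctx); // both branches are cached for the rest of this thread's trace
        }
        return ctx;
    }

    static void clear() {
        CONTEXT.remove();
    }

    // Runs getOrCreate() on a fresh thread to show that contexts are per thread.
    static Ctx fromOtherThread(String operationName) {
        Ctx[] holder = new Ctx[1];
        Thread t = new Thread(() -> holder[0] = getOrCreate(operationName));
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return holder[0];
    }
}
```

Within one thread, later calls return the first context regardless of their operation name; another thread gets a context of its own.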
The createTraceContext() method of ContextManagerExtendService:
@DefaultImplementor
public class ContextManagerExtendService implements BootService, GRPCChannelListener {
private volatile String[] ignoreSuffixArray = new String[0];
public AbstractTracerContext createTraceContext(String operationName, boolean forceSampling) {
AbstractTracerContext context;
if (!Config.Agent.KEEP_TRACING && GRPCChannelStatus.DISCONNECT.equals(status)) {
return new IgnoredTracerContext();
}
int suffixIdx = operationName.lastIndexOf(".");
if (suffixIdx > -1 && Arrays.stream(ignoreSuffixArray)
.anyMatch(a -> a.equals(operationName.substring(suffixIdx)))) {
context = new IgnoredTracerContext();
} else {
SamplingService samplingService = ServiceManager.INSTANCE.findService(SamplingService.class);
if (forceSampling || samplingService.trySampling(operationName)) {
context = new TracingContext(operationName, spanLimitWatcher);
} else {
context = new IgnoredTracerContext();
}
}
return context;
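}
// ... other members omitted
}
createTraceContext() works as follows:
- If the agent is configured not to keep tracing while the OAP is unreachable (Config.Agent.KEEP_TRACING is false) and the gRPC channel is disconnected, create an IgnoredTracerContext
- If the suffix of operationName is in ignoreSuffixArray (the configured suffixes whose requests are not traced), create an IgnoredTracerContext
- If sampling is forced, or a sampling attempt (the trySampling() method of SamplingService) returns true, create a TracingContext; otherwise create an IgnoredTracerContext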
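The decision order can be sketched as below. This is an assumption-laden toy: `TraceDecisionDemo` is hypothetical, and a simple countdown `samplingBudget` replaces SamplingService, whose real policy is configurable. Note that the forced path skips the sampling attempt entirely, which is what a downstream EntrySpan with a valid ContextCarrier relies on.

```java
import java.util.Set;

// Toy decision logic; the sampling budget is a hypothetical stand-in for SamplingService.
class TraceDecisionDemo {
    enum Kind { TRACING, IGNORED }

    private final boolean keepTracing;
    private final boolean connected;
    private final Set<String> ignoreSuffixes;
    private int samplingBudget;

    TraceDecisionDemo(boolean keepTracing, boolean connected, Set<String> ignoreSuffixes, int samplingBudget) {
        this.keepTracing = keepTracing;
        this.connected = connected;
        this.ignoreSuffixes = ignoreSuffixes;
        this.samplingBudget = samplingBudget;
    }

    Kind createTraceContext(String operationName, boolean forceSampling) {
        if (!keepTracing && !connected) {
            return Kind.IGNORED; // OAP unreachable and not configured to keep tracing
        }
        int suffixIdx = operationName.lastIndexOf('.');
        if (suffixIdx > -1 && ignoreSuffixes.contains(operationName.substring(suffixIdx))) {
            return Kind.IGNORED; // e.g. static resources such as ".jpg"
        }
        if (forceSampling || trySampling()) {
            return Kind.TRACING;
        }
        return Kind.IGNORED;
    }

    private boolean trySampling() {
        if (samplingBudget > 0) {
            samplingBudget--;
            return true;
        }
        return false;
    }
}
```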
The createEntrySpan() method in ContextManager:
public class ContextManager implements BootService {
public static AbstractSpan createEntrySpan(String operationName, ContextCarrier carrier) {
AbstractSpan span;
AbstractTracerContext context;
operationName = StringUtil.cut(operationName, OPERATION_NAME_THRESHOLD);
if (carrier != null && carrier.isValid()) {
SamplingService samplingService = ServiceManager.INSTANCE.findService(SamplingService.class);
samplingService.forceSampled();
context = getOrCreate(operationName, true);
span = context.createEntrySpan(operationName);
context.extract(carrier);
} else {
context = getOrCreate(operationName, false);
span = context.createEntrySpan(operationName);
}
return span;
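}
// ... other members omitted
}
createEntrySpan() works as follows:
- If the ContextCarrier is non-null and valid, force sampling, get or create the TracingContext, create the EntrySpan, and extract the data from the ContextCarrier into the TracingContext
- Otherwise, sampling is not forced, and the sampling rate decides whether the current trace is sampled
There are two cases when an EntrySpan is created:
- The request has just arrived and sits in the first TraceSegment of the trace, like the TraceSegment on the left of the figure above. Sampling is not forced; the sampling rate decides whether the trace is sampled
- Like the TraceSegment on the right of the figure above: the ExitSpan of the left TraceSegment called into the right TraceSegment, so the upstream data must be passed downstream, and the downstream side calls extract() to move it from the ContextCarrier into the TracingContext. Sampling must be forced here: the upstream TraceSegment already exists, so if the sampling attempt (the trySampling() method of SamplingService) returned false, the trace would be broken
Methods in ContextManager for creating LocalSpans and ExitSpans: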
public class ContextManager implements BootService {
public static AbstractSpan createLocalSpan(String operationName) {
operationName = StringUtil.cut(operationName, OPERATION_NAME_THRESHOLD);
AbstractTracerContext context = getOrCreate(operationName, false);
return context.createLocalSpan(operationName);
}
public static AbstractSpan createExitSpan(String operationName, ContextCarrier carrier, String remotePeer) {
if (carrier == null) {
throw new IllegalArgumentException("ContextCarrier can't be null.");
}
operationName = StringUtil.cut(operationName, OPERATION_NAME_THRESHOLD);
AbstractTracerContext context = getOrCreate(operationName, false);
AbstractSpan span = context.createExitSpan(operationName, remotePeer);
context.inject(carrier);
return span;
}
public static AbstractSpan createExitSpan(String operationName, String remotePeer) {
operationName = StringUtil.cut(operationName, OPERATION_NAME_THRESHOLD);
AbstractTracerContext context = getOrCreate(operationName, false);
return context.createExitSpan(operationName, remotePeer);
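}
// ... other members omitted
}
Summary:
30. DataCarrier Buffer
Trace data collected by the agent is first placed in a DataCarrier; consumer threads then read the data from the DataCarrier and report it to the OAP.
1) QueueBuffer
DataCarrier stores data in buffers, and the underlying buffer interface is QueueBuffer: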
public interface QueueBuffer<T> {
boolean save(T data);
void setStrategy(BufferStrategy strategy);
void obtain(List<T> consumeList);
int getBufferSize();
}
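BufferStrategy defines what happens when the queue is full (BLOCKING waits for free space, IF_POSSIBLE gives up and drops the data):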
public enum BufferStrategy {
BLOCKING,
IF_POSSIBLE
}
QueueBuffer has two implementations: Buffer and ArrayBlockingQueueBuffer.
2) Buffer
Buffer is a ring (circular) buffer:
public class Buffer<T> implements QueueBuffer<T> {
private final Object[] buffer;
private BufferStrategy strategy;
private AtomicRangeInteger index;
Buffer(int bufferSize, BufferStrategy strategy) {
buffer = new Object[bufferSize];
this.strategy = strategy;
index = new AtomicRangeInteger(0, bufferSize);
}
@Override
public void setStrategy(BufferStrategy strategy) {
this.strategy = strategy;
}
@Override
public boolean save(T data) {
int i = index.getAndIncrement();
if (buffer[i] != null) {
switch (strategy) {
case IF_POSSIBLE:
return false;
default:
}
}
buffer[i] = data;
return true;
}
@Override
public int getBufferSize() {
return buffer.length;
}
@Override
public void obtain(List<T> consumeList) {
this.obtain(consumeList, 0, buffer.length);
}
void obtain(List<T> consumeList, int start, int end) {
for (int i = start; i < end; i++) {
if (buffer[i] != null) {
consumeList.add((T) buffer[i]);
buffer[i] = null;
}
}
}
}
The data structure of Buffer is shown in the figure below:
AtomicRangeInteger is the index of the queue:
public class AtomicRangeInteger extends Number implements Serializable {
private static final long serialVersionUID = -4099792402691141643L;
private AtomicIntegerArray values;
private static final int VALUE_OFFSET = 15;
private int startValue;
private int endValue;
public AtomicRangeInteger(int startValue, int maxValue) {
this.values = new AtomicIntegerArray(31);
this.values.set(VALUE_OFFSET, startValue);
this.startValue = startValue;
this.endValue = maxValue - 1;
}
public final int getAndIncrement() {
int next;
do {
next = this.values.incrementAndGet(VALUE_OFFSET);
if (next > endValue && this.values.compareAndSet(VALUE_OFFSET, next, startValue)) {
return endValue;
}
}
while (next > endValue);
return next - 1;
}
public final int get() {
return this.values.get(VALUE_OFFSET);
}
@Override
public int intValue() {
return this.values.get(VALUE_OFFSET);
}
@Override
public long longValue() {
return this.values.get(VALUE_OFFSET);
}
@Override
public float floatValue() {
return this.values.get(VALUE_OFFSET);
}
@Override
public double doubleValue() {
return this.values.get(VALUE_OFFSET);
}
}
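AtomicRangeInteger is built on the JDK's AtomicIntegerArray. It creates an array of length 31 and uses the middle element (index 15) to hold the index value, which starts at 0. getAndIncrement() first increments the index value; if the result exceeds endValue, the index has run past the end of the buffer, so a CAS resets it to startValue (0 here) and endValue is returned. If the CAS fails because another thread got there first, the loop simply retries. This wrap-around is what makes the queue circular.
Why does AtomicRangeInteger use an AtomicIntegerArray of length 31? If atomicity were the only goal, an AtomicInteger would be enough.
SkyWalking did use an AtomicInteger originally and later switched to AtomicIntegerArray to avoid false sharing and improve performance: the 15 unused ints (60 bytes) on each side of the middle slot ensure that, with typical 64-byte cache lines, no other data shares the cache line holding the index.
Corresponding PR: https://github.com/apache/skywalking/pull/2930
An article on false sharing: https://blog.csdn.net/qq_40378034/article/details/101383233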
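The wrap-around index and the IF_POSSIBLE save rule can be combined into one small sketch. `RingBufferDemo` is hypothetical and deliberately simplified: it uses a plain AtomicInteger (so it has none of the padding discussed above), and, like the original Buffer, slot writes are not synchronized.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified: a plain AtomicInteger instead of the padded AtomicIntegerArray.
class RingBufferDemo {
    static final class RangeIndex {
        private final AtomicInteger value;
        private final int startValue;
        private final int endValue;

        RangeIndex(int startValue, int maxValue) {
            this.value = new AtomicInteger(startValue);
            this.startValue = startValue;
            this.endValue = maxValue - 1;
        }

        // Same loop as AtomicRangeInteger.getAndIncrement().
        int getAndIncrement() {
            int next;
            do {
                next = value.incrementAndGet();
                if (next > endValue && value.compareAndSet(next, startValue)) {
                    return endValue; // this caller wrapped the index back to the start
                }
            } while (next > endValue);
            return next - 1;
        }
    }

    private final String[] slots;
    private final RangeIndex index;

    RingBufferDemo(int size) {
        slots = new String[size];
        index = new RangeIndex(0, size);
    }

    // IF_POSSIBLE: refuse to overwrite a slot the consumer has not drained yet.
    boolean save(String data) {
        int i = index.getAndIncrement();
        if (slots[i] != null) {
            return false;
        }
        slots[i] = data;
        return true;
    }

    List<String> obtain() {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < slots.length; i++) {
            if (slots[i] != null) {
                out.add(slots[i]);
                slots[i] = null;
            }
        }
        return out;
    }
}
```

With three slots the index yields 0, 1, 2, 0, 1, ...; a fourth save before any drain lands on the still-occupied slot 0 and is rejected, so that item is lost.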
3) ArrayBlockingQueueBuffer
ArrayBlockingQueueBuffer is built on the JDK's ArrayBlockingQueue:
public class ArrayBlockingQueueBuffer<T> implements QueueBuffer<T> {
private BufferStrategy strategy;
private ArrayBlockingQueue<T> queue;
private int bufferSize;
ArrayBlockingQueueBuffer(int bufferSize, BufferStrategy strategy) {
this.strategy = strategy;
this.queue = new ArrayBlockingQueue<T>(bufferSize);
this.bufferSize = bufferSize;
}
@Override
public boolean save(T data) {
try {
queue.put(data);
} catch (InterruptedException e) {
return false;
}
return true;
}
@Override
public void setStrategy(BufferStrategy strategy) {
this.strategy = strategy;
}
@Override
public void obtain(List<T> consumeList) {
queue.drainTo(consumeList);
}
@Override
public int getBufferSize() {
return bufferSize;
}
}
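ArrayBlockingQueueBuffer relies on two JDK behaviours: `put()` waits while the queue is full (which is why Channels uses this class for BLOCKING), and `drainTo()` moves everything out in one call. The hypothetical `BlockingBufferDemo` below illustrates both; it uses a timed `offer()` to observe the full state without hanging, which is a choice of this sketch rather than what the buffer does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrates the JDK behaviour ArrayBlockingQueueBuffer relies on; not SkyWalking code.
class BlockingBufferDemo {
    // With a full queue, put() would block; a timed offer() waits the same way but gives up.
    static boolean offerWhenFull() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        queue.add("a");
        queue.add("b");
        try {
            return queue.offer("c", 50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Like obtain(): a single drainTo() empties the queue into the consumer's list.
    static List<String> drainAll(ArrayBlockingQueue<String> queue) {
        List<String> consumeList = new ArrayList<>();
        queue.drainTo(consumeList);
        return consumeList;
    }
}
```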
Summary:
32. DataCarrier in depth
1) Channels
Channels manages multiple buffers:
public class Channels<T> {
private final QueueBuffer<T>[] bufferChannels;
private IDataPartitioner<T> dataPartitioner;
private final BufferStrategy strategy;
private final long size;
public Channels(int channelSize, int bufferSize, IDataPartitioner<T> partitioner, BufferStrategy strategy) {
this.dataPartitioner = partitioner;
this.strategy = strategy;
bufferChannels = new QueueBuffer[channelSize];
for (int i = 0; i < channelSize; i++) {
if (BufferStrategy.BLOCKING.equals(strategy)) {
bufferChannels[i] = new ArrayBlockingQueueBuffer<>(bufferSize, strategy);
} else {
bufferChannels[i] = new Buffer<>(bufferSize, strategy);
}
}
size = 1L * channelSize * bufferSize;
}
public boolean save(T data) {
int index = dataPartitioner.partition(bufferChannels.length, data);
int retryCountDown = 1;
if (BufferStrategy.IF_POSSIBLE.equals(strategy)) {
int maxRetryCount = dataPartitioner.maxRetryCount();
if (maxRetryCount > 1) {
retryCountDown = maxRetryCount;
}
}
for (; retryCountDown > 0; retryCountDown--) {
if (bufferChannels[index].save(data)) {
return true;
}
}
return false;
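}
// ... other members (getChannelSize(), getBuffer(), size(), ...) omitted
}
A Channels instance contains multiple buffers; its data structure is shown in the figure below:
The code of the data partitioner interface IDataPartitioner is as follows: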
public interface IDataPartitioner<T> {
int partition(int total, T data);
int maxRetryCount();
}
IDataPartitioner has two implementations: SimpleRollingPartitioner and ProducerThreadPartitioner.
SimpleRollingPartitioner increments a counter on every call and takes it modulo total:
public class SimpleRollingPartitioner<T> implements IDataPartitioner<T> {
@SuppressWarnings("NonAtomicVolatileUpdate")
private volatile int i = 0;
@Override
public int partition(int total, T data) {
return Math.abs(i++ % total);
}
@Override
public int maxRetryCount() {
return 3;
}
}
ProducerThreadPartitioner takes the ID of the current thread modulo total:
public class ProducerThreadPartitioner<T> implements IDataPartitioner<T> {
public ProducerThreadPartitioner() {
}
@Override
public int partition(int total, T data) {
return (int) Thread.currentThread().getId() % total;
}
@Override
public int maxRetryCount() {
return 1;
}
}
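The two partition rules can be compared side by side in a small sketch. `PartitionerDemo` and its nested classes are hypothetical re-implementations, not the SkyWalking classes. One consequence of the Channels.save() code is worth noting: the retry loop calls save() on the same channel again, but because Buffer.save() advances its ring index on every call, each retry probes the next slot of that buffer.

```java
import java.util.ArrayList;
import java.util.List;

// Re-implements the two partition rules for illustration; not SkyWalking classes.
class PartitionerDemo {
    interface Partitioner {
        int partition(int total);
    }

    // Round robin: counter++ modulo total (single-threaded sketch, so a plain int is enough).
    static final class Rolling implements Partitioner {
        private int i = 0;

        @Override
        public int partition(int total) {
            return Math.abs(i++ % total);
        }
    }

    // Thread affinity: the same producer thread always maps to the same channel.
    static final class ByThread implements Partitioner {
        @Override
        public int partition(int total) {
            // Modulo on the long ID before narrowing to int.
            return (int) (Thread.currentThread().getId() % total);
        }
    }

    static List<Integer> sample(Partitioner partitioner, int total, int calls) {
        List<Integer> picks = new ArrayList<>();
        for (int k = 0; k < calls; k++) {
            picks.add(partitioner.partition(total));
        }
        return picks;
    }
}
```

Round robin spreads one producer's data over all channels, while thread affinity keeps each producer on a single channel, which reduces contention between producer threads.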
2) Consumers
Consumers read data from the DataCarrier and report it to the OAP. IConsumer is the top-level consumer interface:
public interface IConsumer<T> {
void init();
void consume(List<T> data);
void onError(List<T> data, Throwable t);
void onExit();
default void nothingToConsume() {
return;
}
}
The ConsumerThread code is as follows:
public class ConsumerThread<T> extends Thread {
private volatile boolean running;
private IConsumer<T> consumer;
private List<DataSource> dataSources;
private long consumeCycle;
ConsumerThread(String threadName, IConsumer<T> consumer, long consumeCycle) {
super(threadName);
this.consumer = consumer;
running = false;
dataSources = new ArrayList<DataSource>(1);
this.consumeCycle = consumeCycle;
}
void addDataSource(QueueBuffer<T> sourceBuffer) {
this.dataSources.add(new DataSource(sourceBuffer));
}
@Override
public void run() {
running = true;
final List<T> consumeList = new ArrayList<T>(1500);
while (running) {
if (!consume(consumeList)) {
try {
Thread.sleep(consumeCycle);
} catch (InterruptedException e) {
}
}
}
consume(consumeList);
consumer.onExit();
}
private boolean consume(List<T> consumeList) {
for (DataSource dataSource : dataSources) {
dataSource.obtain(consumeList);
}
if (!consumeList.isEmpty()) {
try {
consumer.consume(consumeList);
} catch (Throwable t) {
consumer.onError(consumeList, t);
} finally {
consumeList.clear();
}
return true;
}
consumer.nothingToConsume();
return false;
}
void shutdown() {
running = false;
}
class DataSource {
private QueueBuffer<T> sourceBuffer;
DataSource(QueueBuffer<T> sourceBuffer) {
this.sourceBuffer = sourceBuffer;
}
void obtain(List<T> consumeList) {
sourceBuffer.obtain(consumeList);
}
}
}
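The data structure of ConsumerThread is shown in the figure below:
A ConsumerThread holds multiple DataSources, each wrapping a buffer. Each ConsumerThread is also bound to one consumer, which consumes the data from the DataSources of that ConsumerThread.
MultipleChannelsConsumer is a single consumer thread that supports multiple Channels and their consumers: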
public class MultipleChannelsConsumer extends Thread {
private volatile boolean running;
private volatile ArrayList<Group> consumeTargets;
@SuppressWarnings("NonAtomicVolatileUpdate")
private volatile long size;
private final long consumeCycle;
public MultipleChannelsConsumer(String threadName, long consumeCycle) {
super(threadName);
this.consumeTargets = new ArrayList<Group>();
this.consumeCycle = consumeCycle;
}
@Override
public void run() {
running = true;
final List consumeList = new ArrayList(2000);
while (running) {
boolean hasData = false;
for (Group target : consumeTargets) {
boolean consume = consume(target, consumeList);
hasData = hasData || consume;
}
if (!hasData) {
try {
Thread.sleep(consumeCycle);
} catch (InterruptedException e) {
}
}
}
for (Group target : consumeTargets) {
consume(target, consumeList);
target.consumer.onExit();
}
}
private boolean consume(Group target, List consumeList) {
for (int i = 0; i < target.channels.getChannelSize(); i++) {
QueueBuffer buffer = target.channels.getBuffer(i);
buffer.obtain(consumeList);
}
if (!consumeList.isEmpty()) {
try {
target.consumer.consume(consumeList);
} catch (Throwable t) {
target.consumer.onError(consumeList, t);
} finally {
consumeList.clear();
}
return true;
}
target.consumer.nothingToConsume();
return false;
}
public void addNewTarget(Channels channels, IConsumer consumer) {
Group group = new Group(channels, consumer);
ArrayList<Group> newList = new ArrayList<Group>();
for (Group target : consumeTargets) {
newList.add(target);
}
newList.add(group);
consumeTargets = newList;
size += channels.size();
}
public long size() {
return size;
}
void shutdown() {
running = false;
}
private static class Group {
private Channels channels;
private IConsumer consumer;
public Group(Channels channels, IConsumer consumer) {
this.channels = channels;
this.consumer = consumer;
}
}
}
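The data structure of Group is shown in the figure below:
A Group contains one consumer and one Channels; the Channels contains multiple buffers, and the consumer consumes all of them.
A MultipleChannelsConsumer contains multiple Groups, so it effectively manages multiple consumers and their corresponding buffers; its data structure is shown in the figure below: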
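One pass of the MultipleChannelsConsumer loop can be sketched as follows. `MultiGroupConsumerDemo` and its `Group` are hypothetical stand-ins: ConcurrentLinkedQueues play the role of the buffers of one Channels and a `Consumer` plays IConsumer. Unlike the original, which reuses a single list and clears it, this sketch hands each consumer a fresh list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// One pass of a MultipleChannelsConsumer-style loop; Group here is a hypothetical stand-in.
class MultiGroupConsumerDemo {
    static final class Group {
        final List<ConcurrentLinkedQueue<String>> buffers; // stand-in for the buffers of one Channels
        final Consumer<List<String>> consumer;

        Group(List<ConcurrentLinkedQueue<String>> buffers, Consumer<List<String>> consumer) {
            this.buffers = buffers;
            this.consumer = consumer;
        }
    }

    private final List<Group> groups = new ArrayList<>();

    void addGroup(Group group) {
        groups.add(group);
    }

    // Returns whether any group had data; the real run() loop sleeps for consumeCycle when this is false.
    boolean consumeOnce() {
        boolean hasData = false;
        for (Group group : groups) {
            List<String> batch = new ArrayList<>();
            for (ConcurrentLinkedQueue<String> buffer : group.buffers) {
                String item;
                while ((item = buffer.poll()) != null) {
                    batch.add(item);
                }
            }
            if (!batch.isEmpty()) {
                group.consumer.accept(batch); // each group's data goes only to its own consumer
                hasData = true;
            }
        }
        return hasData;
    }
}
```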
3) Consumer drivers
The IDriver code is as follows:
public interface IDriver {
boolean isRunning(Channels channels);
void close(Channels channels);
void begin(Channels channels);
}
The IDriver implementation hierarchy is shown in the figure below:
The ConsumeDriver code is as follows:
public class ConsumeDriver<T> implements IDriver {
private boolean running;
private ConsumerThread[] consumerThreads;
private Channels<T> channels;
private ReentrantLock lock;
public ConsumeDriver(String name, Channels<T> channels, Class<? extends IConsumer<T>> consumerClass, int num,
long consumeCycle) {
this(channels, num);
for (int i = 0; i < num; i++) {
consumerThreads[i] = new ConsumerThread("DataCarrier." + name + ".Consumer." + i + ".Thread", getNewConsumerInstance(consumerClass), consumeCycle);
consumerThreads[i].setDaemon(true);
}
}
public ConsumeDriver(String name, Channels<T> channels, IConsumer<T> prototype, int num, long consumeCycle) {
this(channels, num);
prototype.init();
for (int i = 0; i < num; i++) {
consumerThreads[i] = new ConsumerThread("DataCarrier." + name + ".Consumer." + i + ".Thread", prototype, consumeCycle);
consumerThreads[i].setDaemon(true);
}
}
private ConsumeDriver(Channels<T> channels, int num) {
running = false;
this.channels = channels;
consumerThreads = new ConsumerThread[num];
lock = new ReentrantLock();
}
private IConsumer<T> getNewConsumerInstance(Class<? extends IConsumer<T>> consumerClass) {
try {
IConsumer<T> inst = consumerClass.getDeclaredConstructor().newInstance();
inst.init();
return inst;
} catch (InstantiationException e) {
throw new ConsumerCannotBeCreatedException(e);
} catch (IllegalAccessException e) {
throw new ConsumerCannotBeCreatedException(e);
} catch (NoSuchMethodException e) {
throw new ConsumerCannotBeCreatedException(e);
} catch (InvocationTargetException e) {
throw new ConsumerCannotBeCreatedException(e);
}
}
@Override
public void begin(Channels channels) {
if (running) {
return;
}
lock.lock();
try {
this.allocateBuffer2Thread();
for (ConsumerThread consumerThread : consumerThreads) {
consumerThread.start();
}
running = true;
} finally {
lock.unlock();
}
}
@Override
public boolean isRunning(Channels channels) {
return running;
}
private void allocateBuffer2Thread() {
int channelSize = this.channels.getChannelSize();
for (int channelIndex = 0; channelIndex < channelSize; channelIndex++) {
int consumerIndex = channelIndex % consumerThreads.length;
consumerThreads[consumerIndex].addDataSource(channels.getBuffer(channelIndex));
}
}
@Override
public void close(Channels channels) {
lock.lock();
try {
this.running = false;
for (ConsumerThread consumerThread : consumerThreads) {
consumerThread.shutdown();
}
} finally {
lock.unlock();
}
}
}
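The modulo rule in `allocateBuffer2Thread()` decides which ConsumerThread drains which buffer. The hypothetical `AllocationDemo` below reproduces only that rule: every buffer is owned by exactly one thread, so no two ConsumerThreads drain the same buffer, and if there are more threads than channels, the extra threads receive no buffers at all.

```java
import java.util.ArrayList;
import java.util.List;

// Reproduces only the modulo rule of allocateBuffer2Thread(); class name is hypothetical.
class AllocationDemo {
    static List<List<Integer>> allocate(int channelSize, int consumerThreads) {
        List<List<Integer>> perThread = new ArrayList<>();
        for (int t = 0; t < consumerThreads; t++) {
            perThread.add(new ArrayList<>());
        }
        for (int channelIndex = 0; channelIndex < channelSize; channelIndex++) {
            perThread.get(channelIndex % consumerThreads).add(channelIndex);
        }
        return perThread;
    }
}
```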
A ConsumeDriver contains multiple ConsumerThreads.
The BulkConsumePool code is as follows:
public class BulkConsumePool implements ConsumerPool {
private List<MultipleChannelsConsumer> allConsumers;
private volatile boolean isStarted = false;
public BulkConsumePool(String name, int size, long consumeCycle) {
size = EnvUtil.getInt(name + "_THREAD", size);
allConsumers = new ArrayList<MultipleChannelsConsumer>(size);
for (int i = 0; i < size; i++) {
MultipleChannelsConsumer multipleChannelsConsumer = new MultipleChannelsConsumer("DataCarrier." + name + ".BulkConsumePool." + i + ".Thread", consumeCycle);
multipleChannelsConsumer.setDaemon(true);
allConsumers.add(multipleChannelsConsumer);
}
}
@Override
synchronized public void add(String name, Channels channels, IConsumer consumer) {
MultipleChannelsConsumer multipleChannelsConsumer = getLowestPayload();
multipleChannelsConsumer.addNewTarget(channels, consumer);
}
private MultipleChannelsConsumer getLowestPayload() {
MultipleChannelsConsumer winner = allConsumers.get(0);
for (int i = 1; i < allConsumers.size(); i++) {
MultipleChannelsConsumer option = allConsumers.get(i);
if (option.size() < winner.size()) {
winner = option;
}
}
return winner;
}
@Override
public boolean isRunning(Channels channels) {
return isStarted;
}
@Override
public void close(Channels channels) {
for (MultipleChannelsConsumer consumer : allConsumers) {
consumer.shutdown();
}
}
@Override
public void begin(Channels channels) {
if (isStarted) {
return;
}
for (MultipleChannelsConsumer consumer : allConsumers) {
consumer.start();
}
isStarted = true;
}
public static class Creator implements Callable<ConsumerPool> {
private String name;
private int size;
private long consumeCycle;
public Creator(String name, int poolSize, long consumeCycle) {
this.name = name;
this.size = poolSize;
this.consumeCycle = consumeCycle;
}
@Override
public ConsumerPool call() {
return new BulkConsumePool(name, size, consumeCycle);
}
public static int recommendMaxSize() {
return Runtime.getRuntime().availableProcessors() * 2;
}
}
}
A BulkConsumePool contains multiple MultipleChannelsConsumers.
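When a new Channels is added, `getLowestPayload()` picks the MultipleChannelsConsumer whose `size()` (the total capacity, channelSize * bufferSize, of the Channels it already serves) is smallest. The hypothetical `LowestPayloadDemo` mirrors that rule with plain arrays; because the comparison is a strict `<`, ties go to the lower-indexed thread.

```java
// Mirrors the selection rule of getLowestPayload(); class name and inputs are hypothetical.
class LowestPayloadDemo {
    // Returns, for each Channels added in order, the index of the consumer thread it is assigned to.
    static int[] assign(int consumerThreads, long[] channelsSizes) {
        long[] payload = new long[consumerThreads];
        int[] assignment = new int[channelsSizes.length];
        for (int c = 0; c < channelsSizes.length; c++) {
            int winner = 0;
            for (int i = 1; i < consumerThreads; i++) {
                if (payload[i] < payload[winner]) { // strict '<': ties keep the lower index
                    winner = i;
                }
            }
            payload[winner] += channelsSizes[c];    // like size += channels.size() in addNewTarget()
            assignment[c] = winner;
        }
        return assignment;
    }
}
```

For two threads and Channels of sizes 100, 50, 50 and 10, the assignment is thread 0, 1, 1, 0: after the third Channels both threads carry 100, so the tie goes to thread 0.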
Summary:
33. Sending trace data to the OAP
public class TracingContext implements AbstractTracerContext {
private void finish() {
if (isRunningInAsyncMode) {
asyncFinishLock.lock();
}
try {
boolean isFinishedInMainThread = activeSpanStack.isEmpty() && running;
if (isFinishedInMainThread) {
TracingThreadListenerManager.notifyFinish(this);
}
if (isFinishedInMainThread && (!isRunningInAsyncMode || asyncSpanCounter == 0)) {
TraceSegment finishedSegment = segment.finish(isLimitMechanismWorking());
TracingContext.ListenerManager.notifyFinish(finishedSegment);
running = false;
}
} finally {
if (isRunningInAsyncMode) {
asyncFinishLock.unlock();
}
}
}
public static class ListenerManager {
private static List<TracingContextListener> LISTENERS = new LinkedList<>();
public static synchronized void add(TracingContextListener listener) {
LISTENERS.add(listener);
}
static void notifyFinish(TraceSegment finishedSegment) {
for (TracingContextListener listener : LISTENERS) {
listener.afterFinished(finishedSegment);
}
}
public static synchronized void remove(TracingContextListener listener) {
LISTENERS.remove(listener);
}
}
// ... other fields and methods omitted
}
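TracingContext's finish() method closes the current TraceSegment only when activeSpanStack is empty and, in async mode, no async span is still open (asyncSpanCounter == 0). It then calls ListenerManager.notifyFinish() with the just-finished TraceSegment, and notifyFinish() iterates over all registered TracingContextListener instances, calling each one's afterFinished() method.
TraceSegmentServiceClient implements the TracingContextListener interface and registers itself with ListenerManager. Its afterFinished() method is shown below: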
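The finish condition and the listener fan-out can be modeled together in a small single-threaded sketch. `SegmentListener`, `MiniListenerManager`, and `MiniContext` are hypothetical names, a `String` stands in for the TraceSegment, and the async lock and the later re-check triggered when async spans stop are not modeled beyond the counter test.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical listener interface playing the role of TracingContextListener.
interface SegmentListener {
    void afterFinished(String segment);
}

// Hypothetical registry playing the role of TracingContext.ListenerManager.
class MiniListenerManager {
    private static final List<SegmentListener> LISTENERS = new ArrayList<>();

    static synchronized void add(SegmentListener listener) {
        LISTENERS.add(listener);
    }

    static synchronized void remove(SegmentListener listener) {
        LISTENERS.remove(listener);
    }

    // Fan out the finished segment to every registered listener.
    static void notifyFinish(String segment) {
        for (SegmentListener listener : LISTENERS) {
            listener.afterFinished(segment);
        }
    }
}

// Hypothetical, single-threaded model of TracingContext's finish logic.
class MiniContext {
    private final Deque<String> activeSpanStack = new ArrayDeque<>();
    private final String segment;
    private boolean running = true;
    boolean asyncMode;
    int asyncSpanCounter;

    MiniContext(String segment) {
        this.segment = segment;
    }

    void push(String span) {
        activeSpanStack.push(span);
    }

    void stopSpan() {
        activeSpanStack.pop();
        finish();
    }

    // Same condition as TracingContext.finish(): all main-thread spans stopped
    // and, in async mode, no async span still open.
    private void finish() {
        boolean finishedInMainThread = activeSpanStack.isEmpty() && running;
        if (finishedInMainThread && (!asyncMode || asyncSpanCounter == 0)) {
            MiniListenerManager.notifyFinish(segment);
            running = false;
        }
    }
}

class ListenerDemo {
    public static void main(String[] args) {
        List<String> reported = new ArrayList<>();
        MiniListenerManager.add(reported::add);
        MiniContext ctx = new MiniContext("segment-1");
        ctx.push("EntrySpan");
        ctx.push("ExitSpan");
        ctx.stopSpan(); // stack not empty yet: nothing reported
        ctx.stopSpan(); // stack empty: every listener is notified once
        System.out.println(reported); // [segment-1]
    }
}
```

The static registry decouples TracingContext from its consumers: a reporter such as TraceSegmentServiceClient only needs to register itself once.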
@DefaultImplementor
public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSegment>, TracingContextListener, GRPCChannelListener {
private volatile DataCarrier<TraceSegment> carrier;
@Override
public void afterFinished(TraceSegment traceSegment) {
if (traceSegment.isIgnore()) {
return;
}
if (!carrier.produce(traceSegment)) {
if (LOGGER.isDebugEnable()) {
LOGGER.debug("One trace segment has been abandoned, cause by buffer is full.");
}
}
}
// ... other members omitted
}
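afterFinished() skips ignored segments and puts every other TraceSegment into the DataCarrier. If produce() returns false because the buffer is full, the segment is discarded (with a debug log).
TraceSegmentServiceClient also implements the IConsumer interface and consumes the TraceSegment data held in the DataCarrier. Its consume() method is shown below: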
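The non-blocking produce used by afterFinished() can be sketched with a bounded queue. This is a hypothetical `BoundedCarrier` that swaps in `java.util.concurrent.ArrayBlockingQueue` for DataCarrier's own channels and buffers, so it illustrates the "accept or drop" behavior rather than SkyWalking's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical bounded carrier; ArrayBlockingQueue stands in for DataCarrier's buffers.
class BoundedCarrier<T> {
    private final ArrayBlockingQueue<T> queue;
    private final AtomicLong abandoned = new AtomicLong();

    BoundedCarrier(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Never blocks the caller: offer() returns false at once when the queue is full.
    boolean produce(T item) {
        boolean accepted = queue.offer(item);
        if (!accepted) {
            abandoned.incrementAndGet();
        }
        return accepted;
    }

    // Takes everything currently buffered, as a consumer thread would once per cycle.
    List<T> drain() {
        List<T> batch = new ArrayList<>();
        queue.drainTo(batch);
        return batch;
    }

    long abandoned() {
        return abandoned.get();
    }
}

class CarrierDemo {
    public static void main(String[] args) {
        BoundedCarrier<String> carrier = new BoundedCarrier<>(2);
        System.out.println(carrier.produce("seg-1")); // true
        System.out.println(carrier.produce("seg-2")); // true
        System.out.println(carrier.produce("seg-3")); // false: buffer full, segment abandoned
        System.out.println(carrier.drain());          // [seg-1, seg-2]
    }
}
```

Presumably the point of this design is that the traced business thread never waits on the reporter: under pressure, a segment is dropped instead of slowing the application.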
@DefaultImplementor
public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSegment>, TracingContextListener, GRPCChannelListener {
private long lastLogTime;
private long segmentUplinkedCounter;
private long segmentAbandonedCounter;
private volatile TraceSegmentReportServiceGrpc.TraceSegmentReportServiceStub serviceStub;
private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
@Override
public void consume(List<TraceSegment> data) {
if (CONNECTED.equals(status)) {
final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
StreamObserver<SegmentObject> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(
Config.Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
).collect(new StreamObserver<Commands>() {
@Override
public void onNext(Commands commands) {
ServiceManager.INSTANCE.findService(CommandService.class)
.receiveCommand(commands);
}
@Override
public void onError(
Throwable throwable) {
status.finished();
if (LOGGER.isErrorEnable()) {
LOGGER.error(
throwable,
"Send UpstreamSegment to collector fail with a grpc internal exception."
);
}
ServiceManager.INSTANCE
.findService(GRPCChannelManager.class)
.reportError(throwable);
}
@Override
public void onCompleted() {
status.finished();
}
});
try {
for (TraceSegment segment : data) {
SegmentObject upstreamSegment = segment.transform();
upstreamSegmentStreamObserver.onNext(upstreamSegment);
}
} catch (Throwable t) {
LOGGER.error(t, "Transform and send UpstreamSegment to collector fail.");
}
upstreamSegmentStreamObserver.onCompleted();
status.wait4Finish();
segmentUplinkedCounter += data.size();
} else {
segmentAbandonedCounter += data.size();
}
printUplinkStatus();
}
// ... other members omitted
}
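Stripped of gRPC, consume() follows a simple pattern: when the channel is connected, stream every segment in the batch, complete the stream, and add the batch size to segmentUplinkedCounter; otherwise drop the batch and add its size to segmentAbandonedCounter. The sketch below models that with a hypothetical `Sender` interface standing in for the gRPC `StreamObserver`; `transform()`, the upstream timeout, and `wait4Finish()` are omitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the gRPC StreamObserver<SegmentObject>.
interface Sender {
    void onNext(String segment);

    void onCompleted();
}

// Hypothetical, gRPC-free model of TraceSegmentServiceClient.consume().
class SegmentUploader {
    volatile boolean connected;
    long uplinked;
    long abandoned;
    private final Sender sender;

    SegmentUploader(Sender sender) {
        this.sender = sender;
    }

    void consume(List<String> data) {
        if (connected) {
            try {
                for (String segment : data) {
                    sender.onNext(segment);
                }
            } catch (RuntimeException e) {
                // Like the original, a failure is only logged; the stream is still completed.
                System.err.println("Transform and send failed: " + e);
            }
            sender.onCompleted();
            uplinked += data.size(); // counted per batch, as in the original
        } else {
            abandoned += data.size(); // not connected: the whole batch is dropped
        }
    }
}

class UploaderDemo {
    public static void main(String[] args) {
        List<String> wire = new ArrayList<>();
        SegmentUploader uploader = new SegmentUploader(new Sender() {
            @Override
            public void onNext(String segment) {
                wire.add(segment);
            }

            @Override
            public void onCompleted() {
                wire.add("<completed>");
            }
        });
        uploader.consume(Arrays.asList("seg-1", "seg-2")); // disconnected: dropped
        uploader.connected = true;
        uploader.consume(Arrays.asList("seg-3", "seg-4"));
        System.out.println(wire); // [seg-3, seg-4, <completed>]
        System.out.println(uploader.uplinked + " uplinked, " + uploader.abandoned + " abandoned"); // 2 uplinked, 2 abandoned
    }
}
```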
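Summary:
34. Trace example
The figure above shows a trace in the SkyWalking UI. The flow of this trace is:
- The entry point is demo1's /api/demo1 endpoint. demo1 first calls MySQL, then calls demo2's /api/demo2 endpoint through HttpClient
- demo2's /api/demo2 endpoint returns a response directly
- After demo1 receives the response from demo2's /api/demo2 endpoint, it returns, and the whole trace ends
Now let's walk through how the SkyWalking Agent traces this request:
1) demo1 receives the incoming request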
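- When the request reaches demo1, it first passes through Tomcat. The Tomcat plugin (TomcatInvokeInterceptor) creates an EntrySpan (ContextManager.createEntrySpan()). Because the ThreadLocal holds no TracingContext yet, one is created and stored in the ThreadLocal, and that TracingContext then creates the EntrySpan (TracingContext.createEntrySpan()). Since activeSpanStack is empty, the first EntrySpan (spanId=0, parentSpanId=-1) is created and pushed onto activeSpanStack.
  [Figure: activeSpanStack after the Tomcat plugin's EntrySpan is pushed]
- When the request reaches SpringMVC, the SpringMVC plugin (AbstractMethodInterceptor) uses the TracingContext in the ThreadLocal to create an EntrySpan. The span on top of activeSpanStack is already an EntrySpan, so it is reused directly, and the information recorded by the Tomcat plugin is overwritten.
  [Figure: the SpringMVC plugin reusing the EntrySpan created by the Tomcat plugin]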
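The EntrySpan reuse in this step can be sketched as follows. `MiniSpan` and `MiniTracingContext` are hypothetical simplifications: a reused span only gets its operation name overwritten and a depth counter incremented, so that it is popped after both the SpringMVC and the Tomcat plugin have stopped it. The real EntrySpan tracks more state than this.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical simplified span: only the fields this step talks about.
class MiniSpan {
    final int spanId;
    final int parentSpanId;
    final boolean entry;
    String operationName;
    int stackDepth; // number of plugins currently sharing this span

    MiniSpan(int spanId, int parentSpanId, boolean entry, String operationName) {
        this.spanId = spanId;
        this.parentSpanId = parentSpanId;
        this.entry = entry;
        this.operationName = operationName;
    }
}

// Hypothetical model of TracingContext.createEntrySpan()/stopSpan() on one thread.
class MiniTracingContext {
    private final Deque<MiniSpan> activeSpanStack = new ArrayDeque<>();
    private int spanIdGenerator = 0;

    MiniSpan createEntrySpan(String operationName) {
        MiniSpan parent = activeSpanStack.peek();
        if (parent != null && parent.entry) {
            // Top of the stack is already an EntrySpan: reuse it, overwriting the outer plugin's info.
            parent.operationName = operationName;
            parent.stackDepth++;
            return parent;
        }
        int parentSpanId = parent == null ? -1 : parent.spanId;
        MiniSpan span = new MiniSpan(spanIdGenerator++, parentSpanId, true, operationName);
        span.stackDepth = 1;
        activeSpanStack.push(span);
        return span;
    }

    // A shared span is popped only after every plugin sharing it has stopped it.
    void stopSpan() {
        MiniSpan top = activeSpanStack.peek();
        if (top != null && --top.stackDepth == 0) {
            activeSpanStack.pop();
        }
    }

    int stackSize() {
        return activeSpanStack.size();
    }
}

class EntryReuseDemo {
    public static void main(String[] args) {
        MiniTracingContext ctx = new MiniTracingContext();
        MiniSpan tomcat = ctx.createEntrySpan("Tomcat /api/demo1");
        MiniSpan mvc = ctx.createEntrySpan("SpringMVC /api/demo1");
        System.out.println(tomcat == mvc);                      // true: one span object
        System.out.println(mvc.spanId + " " + mvc.parentSpanId); // 0 -1
        System.out.println(mvc.operationName);                  // SpringMVC /api/demo1
        System.out.println(ctx.stackSize());                    // 1
    }
}
```

The operation-name strings are illustrative only; they are not the exact names the real plugins record.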
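2) demo1 calls MySQL
- demo1 calls MySQL. The MySQL plugin (PreparedStatementExecuteMethodsInterceptor) uses the TracingContext in the ThreadLocal to create an ExitSpan. It takes the span on top of activeSpanStack (EntrySpan#SpringMVC) as the parent span, creates an ExitSpan (spanId=1, parentSpanId=0), and pushes it onto activeSpanStack.
  [Figure: activeSpanStack after the MySQL plugin's ExitSpan is pushed]
- When the MySQL access finishes, the MySQL plugin's post-processing calls stopSpan() on the TracingContext in the ThreadLocal (TracingContext.stopSpan()). The span on top of activeSpanStack is popped and added to the spans collection of the TracingContext's TraceSegment (finished spans are kept in TraceSegment's spans collection until they are sent to the OAP).
  [Figure: activeSpanStack after the MySQL plugin's ExitSpan is popped]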
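The push/pop sequence for the MySQL ExitSpan, including the hand-off of finished spans to the segment's span list, can be modeled with the hypothetical sketch below. `SpanRecord` and `SegmentContext` are illustrative names, the IDs follow the numbering in the text (EntrySpan 0, ExitSpan 1), and EntrySpan reuse is left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified span record.
class SpanRecord {
    final int spanId;
    final int parentSpanId;
    final String kind;

    SpanRecord(int spanId, int parentSpanId, String kind) {
        this.spanId = spanId;
        this.parentSpanId = parentSpanId;
        this.kind = kind;
    }

    @Override
    public String toString() {
        return kind + "#" + spanId + "(parent=" + parentSpanId + ")";
    }
}

// Hypothetical model: an active span stack plus the TraceSegment's list of finished spans.
class SegmentContext {
    final Deque<SpanRecord> activeSpanStack = new ArrayDeque<>();
    final List<SpanRecord> finishedSpans = new ArrayList<>(); // role of TraceSegment's spans
    private int nextSpanId = 0;

    // The current stack top becomes the parent; an empty stack means a root span (-1).
    SpanRecord createSpan(String kind) {
        SpanRecord parent = activeSpanStack.peek();
        SpanRecord span = new SpanRecord(nextSpanId++, parent == null ? -1 : parent.spanId, kind);
        activeSpanStack.push(span);
        return span;
    }

    // Pop the top span and keep it until the segment is sent to the OAP.
    void stopSpan() {
        finishedSpans.add(activeSpanStack.pop());
    }
}

class ExitSpanDemo {
    public static void main(String[] args) {
        SegmentContext ctx = new SegmentContext();
        ctx.createSpan("EntrySpan");      // #0, parent -1
        ctx.createSpan("ExitSpan/MySQL"); // #1, parent 0
        ctx.stopSpan();                   // MySQL call finished
        System.out.println(ctx.activeSpanStack); // [EntrySpan#0(parent=-1)]
        System.out.println(ctx.finishedSpans);   // [ExitSpan/MySQL#1(parent=0)]
    }
}
```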
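3) demo1 calls the demo2 endpoint
- demo1 calls the demo2 endpoint through HttpClient. The HttpClient plugin (HttpClientExecuteInterceptor) uses the TracingContext in the ThreadLocal to create an ExitSpan. It takes the span on top of activeSpanStack (EntrySpan#SpringMVC) as the parent span, creates an ExitSpan (spanId=2, parentSpanId=0), and pushes it onto activeSpanStack.
  [Figure: activeSpanStack after the HttpClient plugin's ExitSpan is pushed]
- After creating the ExitSpan, the plugin calls TracingContext.inject() to populate a ContextCarrier with the TraceId, TraceSegmentId, SpanId (the ID of the current ExitSpan), ParentService, ParentServiceInstance, and other fields. The ContextCarrier's data is then written into the HTTP request headers, which is how the trace context is propagated downstream.
- When demo2 receives demo1's request, it creates its EntrySpan the same way demo1 did on entry, with one extra step: it reads the trace context that demo1 put in the HTTP request headers into a ContextCarrier, then calls TracingContext.extract() to bind the current TraceSegment's traceSegmentRef and traceId, as well as the EntrySpan's ref.
- After demo2's response returns, demo1's plugins run their post-processing and call TracingContext.stopSpan() in turn. The spans in activeSpanStack are popped one by one; once activeSpanStack is empty, the TracingContext finishes.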
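The cross-process hand-off in this step (inject on demo1, HTTP headers on the wire, extract on demo2) can be sketched as a round trip through a header map. The header name `sw8-demo`, the `|`-separated encoding, and the classes `Carrier`, `SegmentRef`, and `Propagation` are all hypothetical; SkyWalking 8.x propagates context through its own `sw8` header, whose encoding this sketch does not reproduce.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical carrier with the fields listed in the text.
class Carrier {
    String traceId;
    String traceSegmentId;
    int spanId; // ID of the caller's ExitSpan
    String parentService;
    String parentServiceInstance;
}

// Hypothetical reference the callee keeps, pointing back at the caller's segment and span.
class SegmentRef {
    final String traceId;
    final String parentSegmentId;
    final int parentSpanId;
    final String parentService;

    SegmentRef(Carrier c) {
        this.traceId = c.traceId;
        this.parentSegmentId = c.traceSegmentId;
        this.parentSpanId = c.spanId;
        this.parentService = c.parentService;
    }
}

class Propagation {
    static final String HEADER = "sw8-demo"; // hypothetical header name, not SkyWalking's

    // Caller side: flatten the carrier into one header (assumes no field contains '|').
    static void inject(Carrier c, Map<String, String> headers) {
        headers.put(HEADER, String.join("|", c.traceId, c.traceSegmentId,
                String.valueOf(c.spanId), c.parentService, c.parentServiceInstance));
    }

    // Callee side: rebuild the carrier, or return null if the header is missing or malformed.
    static Carrier extract(Map<String, String> headers) {
        String value = headers.get(HEADER);
        if (value == null) {
            return null;
        }
        String[] parts = value.split("\\|", -1);
        if (parts.length != 5) {
            return null;
        }
        Carrier c = new Carrier();
        try {
            c.spanId = Integer.parseInt(parts[2]);
        } catch (NumberFormatException e) {
            return null;
        }
        c.traceId = parts[0];
        c.traceSegmentId = parts[1];
        c.parentService = parts[3];
        c.parentServiceInstance = parts[4];
        return c;
    }
}

class PropagationDemo {
    public static void main(String[] args) {
        Carrier out = new Carrier();
        out.traceId = "trace-1";
        out.traceSegmentId = "segment-demo1";
        out.spanId = 2; // the HttpClient ExitSpan in demo1
        out.parentService = "demo1";
        out.parentServiceInstance = "demo1-instance";

        Map<String, String> headers = new HashMap<>(); // stands in for the HTTP request headers
        Propagation.inject(out, headers);
        System.out.println(headers.get(Propagation.HEADER)); // trace-1|segment-demo1|2|demo1|demo1-instance

        SegmentRef ref = new SegmentRef(Propagation.extract(headers));
        System.out.println(ref.traceId + " " + ref.parentSegmentId + " " + ref.parentSpanId); // trace-1 segment-demo1 2
    }
}
```

demo2 keeps the same traceId but records its own segment; the ref (parent segment ID plus parent span ID 2) is what lets the OAP stitch demo2's segment under demo1's HttpClient ExitSpan.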
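The whole trace described above is shown in the figure below:
References:
SkyWalking 8.7.0 source code analysis (if you are interested in the SkyWalking Agent source code, this tutorial is strongly recommended)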