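Metrics: indicators or measurements; a measure of some dimension over a period of time.
Trace: the request call chain; information about the path a request takes.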
Receiver: submodules
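- The receiver accepts external requests and hands the data to stream processing across all nodes of the cluster
- Receivers are implemented as plugins, which makes them highly extensible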
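Maven submodule | Description |
---|---|
skywalking-register-receiver-plugin | Receives agent registration, synchronization and similar information |
skywalking-sharing-server-plugin | Additional server module; depending on configuration it either starts its own gRPC and Jetty servers or shares the gRPC and Jetty servers of the core module |
skywalking-trace-receiver-plugin | Receiver for reported trace data |
skywalking-jvm-receiver-plugin | Receiver for Metrics reported by JVM processes |
… | Other submodules |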
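As a rough illustration of why a plugin design is easy to extend, the sketch below shows a host that starts whatever receivers have been installed. `ReceiverHost`, `ReceiverPlugin` and the service-name strings are hypothetical and are not SkyWalking's module API; they only model the idea that adding a receiver means adding a plugin, not changing the host.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a plugin-style receiver host; not SkyWalking's real API.
public class ReceiverHost {

    // Contract every (hypothetical) receiver plugin implements.
    interface ReceiverPlugin {
        String name();

        void start(Map<String, String> handlerRegistry);
    }

    private final List<ReceiverPlugin> plugins = new ArrayList<>();
    // Shared registry: service name -> plugin that handles it.
    private final Map<String, String> handlerRegistry = new LinkedHashMap<>();

    // Adding a receiver means installing a plugin; the host code does not change.
    void install(ReceiverPlugin plugin) {
        plugins.add(plugin);
    }

    // Starts every installed plugin and returns the resulting registry.
    Map<String, String> startAll() {
        for (ReceiverPlugin plugin : plugins) {
            plugin.start(handlerRegistry);
        }
        return handlerRegistry;
    }

    // Convenience factory for a plugin that registers a single service.
    static ReceiverPlugin plugin(String name, String service) {
        return new ReceiverPlugin() {
            @Override public String name() { return name; }

            @Override public void start(Map<String, String> registry) { registry.put(service, name); }
        };
    }

    public static void main(String[] args) {
        ReceiverHost host = new ReceiverHost();
        host.install(plugin("jvm", "JVMMetricReportService"));
        host.install(plugin("trace", "TraceReportService")); // illustrative service name
        System.out.println(host.startAll());
    }
}
```

In a real system the plugins would more likely be discovered at runtime (for example through `java.util.ServiceLoader`) than installed by hand.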
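Introduction to stream processing
- Stream processing is triggered by a StreamProcessor
- A StreamProcessor holds a chain of workers (a chain of responsibility)
- Data flows from worker to worker and is processed along the way, hence the name stream processing
- The chain may also contain a RemoteWorker, which sends data to other oapServer nodes for processing
- Work done by the workers before the RemoteWorker is generally called L1 aggregation
- Work done by the workers after the RemoteWorker is generally called L2 aggregation
- A worker's name indicates whether it belongs to L1 or L2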
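The worker chain described above can be modeled in a few dozen lines. This is a minimal in-process sketch under simplifying assumptions: the class names are hypothetical, "sending" to another OAP node is just a method call, and values are plain sums keyed by a string. It shows why L1 aggregation before the RemoteWorker reduces traffic and why routing by key lets L2 finish the aggregation on a single node.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Minimal sketch of a worker chain: L1 aggregation -> remote worker -> L2 aggregation.
// All names are hypothetical; real OAP workers add buffering, persistence and gRPC.
public class WorkerChainSketch {

    // L1 worker: merges values with the same key inside one node before passing them on.
    static class L1AggregationWorker {
        private final Map<String, Long> buffer = new HashMap<>();
        private final Consumer<Map.Entry<String, Long>> next;

        L1AggregationWorker(Consumer<Map.Entry<String, Long>> next) { this.next = next; }

        void in(String key, long value) { buffer.merge(key, value, Long::sum); }

        // Sends the locally merged values to the next worker in the chain.
        void flush() {
            buffer.entrySet().forEach(next);
            buffer.clear();
        }
    }

    // L2 worker: final aggregation on the node that owns the key.
    static class L2AggregationWorker {
        final Map<String, Long> result = new HashMap<>();

        void in(String key, long value) { result.merge(key, value, Long::sum); }
    }

    // Remote worker: picks the owning node by hashing the key, so every partial
    // value for a given key reaches the same L2 worker.
    static class RemoteWorker implements Consumer<Map.Entry<String, Long>> {
        private final List<L2AggregationWorker> nodes;

        RemoteWorker(List<L2AggregationWorker> nodes) { this.nodes = nodes; }

        @Override public void accept(Map.Entry<String, Long> entry) {
            int owner = Math.floorMod(entry.getKey().hashCode(), nodes.size());
            nodes.get(owner).in(entry.getKey(), entry.getValue());
        }
    }

    // Two nodes, each with its own L1 worker; returns the L2 totals from both nodes.
    static Map<String, Long> run() {
        List<L2AggregationWorker> l2Nodes = List.of(new L2AggregationWorker(), new L2AggregationWorker());
        RemoteWorker remote = new RemoteWorker(l2Nodes);
        L1AggregationWorker nodeA = new L1AggregationWorker(remote);
        L1AggregationWorker nodeB = new L1AggregationWorker(remote);
        nodeA.in("instance-1:cpu", 10);
        nodeA.in("instance-1:cpu", 20);
        nodeB.in("instance-1:cpu", 30);
        nodeB.in("instance-2:cpu", 5);
        nodeA.flush();
        nodeB.flush();
        Map<String, Long> merged = new HashMap<>();
        l2Nodes.forEach(n -> merged.putAll(n.result)); // each key lives on exactly one node
        return merged;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Node A's two samples for `instance-1:cpu` leave L1 as a single value, and because the remote worker hashes the key, both nodes' partial sums meet on the same L2 worker.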
Figure: How the JVM Metrics module works
Source code analysis: JVMModule
- On startup, JVMModuleProvider registers JVMMetricReportServiceHandler with the gRPC server
- JVMMetricReportServiceHandler receives the JVM metrics reported by agents
public class JVMModuleProvider extends ModuleProvider {
    @Override public void start() {
        GRPCHandlerRegister grpcHandlerRegister = getManager().find(SharingServerModule.NAME).provider().getService(GRPCHandlerRegister.class);
        grpcHandlerRegister.addHandler(new JVMMetricsServiceHandler(getManager()));
        // v2 handler [the subject of this article]
        grpcHandlerRegister.addHandler(new JVMMetricReportServiceHandler(getManager()));
    }
}
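The lookup in `start()` follows a chain: find a module by name, take its provider, then ask the provider for a service by type. The sketch below mimics that chain with hypothetical `Manager`, `Provider` and `HandlerRegister` classes and a made-up module name `"sharing-server"`. It collapses the module and provider layers into one step and is not the real implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for getManager().find(NAME).provider().getService(Type.class).
public class ModuleLookupSketch {

    // A provider exposes services keyed by their type.
    static class Provider {
        private final Map<Class<?>, Object> services = new HashMap<>();

        <T> void registerService(Class<T> type, T impl) { services.put(type, impl); }

        <T> T getService(Class<T> type) {
            Object impl = services.get(type);
            if (impl == null) {
                throw new IllegalStateException("Service not found: " + type.getName());
            }
            return type.cast(impl);
        }
    }

    // The manager finds a module's provider by module name.
    static class Manager {
        private final Map<String, Provider> modules = new HashMap<>();

        void addModule(String name, Provider provider) { modules.put(name, provider); }

        Provider find(String name) {
            Provider provider = modules.get(name);
            if (provider == null) {
                throw new IllegalStateException("Module not found: " + name);
            }
            return provider;
        }
    }

    // Stand-in for GRPCHandlerRegister: just records handler names.
    static class HandlerRegister {
        final List<String> handlers = new ArrayList<>();

        void addHandler(String handler) { handlers.add(handler); }
    }

    // Same shape as JVMModuleProvider.start(): look up the shared register, add handlers.
    static void startJvmModule(Manager manager) {
        HandlerRegister register = manager.find("sharing-server").getService(HandlerRegister.class);
        register.addHandler("JVMMetricsServiceHandler");
        register.addHandler("JVMMetricReportServiceHandler");
    }

    // Wires one shared-server module, starts the JVM module, returns what was registered.
    static List<String> demo() {
        Manager manager = new Manager();
        Provider sharingServer = new Provider();
        HandlerRegister register = new HandlerRegister();
        sharingServer.registerService(HandlerRegister.class, register);
        manager.addModule("sharing-server", sharingServer);
        startJvmModule(manager);
        return register.handlers;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Looking services up by type keeps the JVM module decoupled from whichever module actually owns the gRPC server.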
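Source code analysis: JVMMetricReportServiceHandler
- Computes the minute-level downsampling window (time bucket) from the metric timestamp
- jvmSourceDispatcher separates the CPU, memory pool, memory and GC data and dispatches each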
public class JVMMetricReportServiceHandler extends JVMMetricReportServiceGrpc.JVMMetricReportServiceImplBase implements GRPCHandler {
    // Invoked over gRPC when an agent reports a batch of JVM metrics
    @Override public void collect(JVMMetricCollection request, StreamObserver<Commands> responseObserver) {
        int serviceInstanceId = request.getServiceInstanceId();
        request.getMetricsList().forEach(metrics -> {
            // Compute the minute-level downsampling window from the metric timestamp
            long minuteTimeBucket = TimeBucket.getMinuteTimeBucket(metrics.getTime());
            // jvmSourceDispatcher splits the data into CPU, memory pool, memory and GC and dispatches each
            jvmSourceDispatcher.sendMetric(serviceInstanceId, minuteTimeBucket, metrics);
        });
        // Reply with an empty Commands message and close the stream
        responseObserver.onNext(Commands.newBuilder().build());
        responseObserver.onCompleted();
    }
}
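To make the downsampling window concrete, the sketch below assumes the minute bucket is the timestamp truncated to the minute and encoded as a `yyyyMMddHHmm` number. That is our reading of `TimeBucket.getMinuteTimeBucket` and has not been checked against every version. The explicit `ZoneId` parameter is an addition for reproducibility; the real method may use the server's default time zone.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

// Sketch of a minute-level time bucket, assuming a yyyyMMddHHmm encoding.
public class MinuteBucketSketch {

    // Encodes the minute that contains epochMillis as a yyyyMMddHHmm number.
    // Seconds and milliseconds are dropped, which is what makes it a downsampling window.
    static long minuteTimeBucket(long epochMillis, ZoneId zone) {
        ZonedDateTime t = Instant.ofEpochMilli(epochMillis).atZone(zone);
        return t.getYear() * 100_000_000L
                + t.getMonthValue() * 1_000_000L
                + t.getDayOfMonth() * 10_000L
                + t.getHour() * 100L
                + t.getMinute();
    }

    public static void main(String[] args) {
        ZoneId utc = ZoneId.of("UTC");
        // 2019-12-04T15:30:12.345Z and 2019-12-04T15:30:59.999Z share one bucket.
        System.out.println(minuteTimeBucket(1575473412345L, utc)); // 201912041530
        System.out.println(minuteTimeBucket(1575473459999L, utc)); // 201912041530
    }
}
```

All samples from the same minute map to the same bucket, so the later aggregation steps can merge them.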
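Source code analysis: JVMSourceDispatcher
- Looks up the service ID from the service instance ID reported by the agent
- Converts the agent-side metric model into server-side models ServiceInstanceJVMXXX that carry serviceId and serviceInstanceId
- sourceReceiver.receive processes the resulting metric sources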
public class JVMSourceDispatcher {
    void sendMetric(int serviceInstanceId, long minuteTimeBucket, JVMMetric metrics) {
        // Look up the service that this agent instance belongs to
        ServiceInstanceInventory serviceInstanceInventory = instanceInventoryCache.get(serviceInstanceId);
        int serviceId;
        if (Objects.nonNull(serviceInstanceInventory)) {
            serviceId = serviceInstanceInventory.getServiceId();
        } else {
            logger.warn("Can't find service by service instance id from cache, service instance id is: {}", serviceInstanceId);
            return;
        }
        // Build ServiceInstanceJVMCPU and hand it to sourceReceiver.receive
        this.sendToCpuMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getCpu());
        // Build ServiceInstanceJVMMemory and hand it to sourceReceiver.receive
        this.sendToMemoryMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getMemoryList());
        // Build ServiceInstanceJVMMemoryPool and hand it to sourceReceiver.receive
        this.sendToMemoryPoolMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getMemoryPoolList());
        // Build ServiceInstanceJVMGC and hand it to sourceReceiver.receive
        this.sendToGCMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getGcList());
        // ServiceInstanceJVMCPU, ServiceInstanceJVMMemory, ServiceInstanceJVMMemoryPool and ServiceInstanceJVMGC
        // all extend Source; compared with the raw JVM metrics they add origin information such as
        // serviceId and serviceInstanceId. sourceReceiver.receive handles every Source the same way.
    }
    private void sendToCpuMetricProcess(int serviceId, int serviceInstanceId, long timeBucket, CPU cpu) {
        // ServiceInstanceJVMXXX converts one part of JVMMetrics (here CPU) from the agent-side model to the oapServer-side model
        ServiceInstanceJVMCPU serviceInstanceJVMCPU = new ServiceInstanceJVMCPU();
        serviceInstanceJVMCPU.setId(serviceInstanceId);
        serviceInstanceJVMCPU.setName(Const.EMPTY_STRING);
        serviceInstanceJVMCPU.setServiceId(serviceId);
        serviceInstanceJVMCPU.setServiceName(Const.EMPTY_STRING);
        serviceInstanceJVMCPU.setUsePercent(cpu.getUsagePercent());
        serviceInstanceJVMCPU.setTimeBucket(timeBucket);
        sourceReceiver.receive(serviceInstanceJVMCPU);
    }
}
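The control flow of `sendMetric` boils down to three steps: resolve the service from the instance ID, drop the metric if the instance is unknown, and otherwise wrap the agent value in a server-side source that also records where it came from. The sketch below keeps only the CPU path and replaces `instanceInventoryCache` and `sourceReceiver` with plain collections. All names here are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified version of the sendMetric flow (CPU path only).
public class SourceDispatchSketch {

    // Server-side source: the agent's value plus its origin (serviceId, serviceInstanceId).
    static class InstanceCpuSource {
        final int serviceId;
        final int serviceInstanceId;
        final long timeBucket;
        final double usePercent;

        InstanceCpuSource(int serviceId, int serviceInstanceId, long timeBucket, double usePercent) {
            this.serviceId = serviceId;
            this.serviceInstanceId = serviceInstanceId;
            this.timeBucket = timeBucket;
            this.usePercent = usePercent;
        }
    }

    // Stand-in for instanceInventoryCache: service instance id -> service id.
    private final Map<Integer, Integer> instanceToService = new HashMap<>();
    // Stand-in for sourceReceiver: collects the sources it is given.
    final List<InstanceCpuSource> received = new ArrayList<>();

    void registerInstance(int serviceInstanceId, int serviceId) {
        instanceToService.put(serviceInstanceId, serviceId);
    }

    // Returns false and drops the metric when the instance is unknown.
    boolean sendMetric(int serviceInstanceId, long minuteTimeBucket, double cpuUsagePercent) {
        Integer serviceId = instanceToService.get(serviceInstanceId);
        if (serviceId == null) {
            System.err.println("Can't find service by service instance id: " + serviceInstanceId);
            return false;
        }
        received.add(new InstanceCpuSource(serviceId, serviceInstanceId, minuteTimeBucket, cpuUsagePercent));
        return true;
    }

    public static void main(String[] args) {
        SourceDispatchSketch dispatcher = new SourceDispatchSketch();
        dispatcher.registerInstance(7, 3);
        System.out.println(dispatcher.sendMetric(7, 201912041530L, 42.5));  // true
        System.out.println(dispatcher.sendMetric(99, 201912041530L, 10.0)); // false (unknown instance)
        System.out.println(dispatcher.received.size());                     // 1
    }
}
```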
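Source code analysis: DispatcherManager forwarding
- SourceReceiver delegates forwarding to dispatcherManager
- dispatcherManager looks up the dispatchers registered for Source.scope and lets them process the source
- A scope uniquely identifies one business (monitoring) scenario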
public class SourceReceiverImpl implements SourceReceiver {
    @Override public void receive(Source source) {
        // dispatcherManager holds both OAL-generated dispatchers and hand-written ones
        // (the OAL engine parses the Metrics and Dispatchers defined in official_analysis.oal)
        dispatcherManager.forward(source);
    }
}
public class DispatcherManager implements DispatcherDetectorListener {
    public void forward(Source source) {
        // Look up dispatchers by scope, the unique identifier of the current monitoring scenario
        List<SourceDispatcher> dispatchers = dispatcherMap.get(source.scope());
        // Each dispatcher converts the Source into a Metrics model and hands it to StreamProcessor
        if (dispatchers != null) {
            for (SourceDispatcher dispatcher : dispatchers) {
                dispatcher.dispatch(source);
            }
        }
    }
}
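Scope-based routing is essentially a map from scope ID to a list of dispatchers. In this hypothetical sketch the scope constants and the two CPU dispatchers are invented for illustration. It shows that one source can fan out to several dispatchers, and that a source whose scope has no registered dispatcher is silently dropped, mirroring the `null` check in `forward`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of scope-based routing; scope ids and dispatcher names are invented.
public class ScopeRoutingSketch {

    static final int SCOPE_JVM_CPU = 1;
    static final int SCOPE_JVM_GC = 2;

    interface Source {
        int scope();
    }

    interface SourceDispatcher {
        void dispatch(Source source);
    }

    static class DispatcherManager {
        private final Map<Integer, List<SourceDispatcher>> dispatcherMap = new HashMap<>();

        // Several dispatchers may share one scope (e.g. several metrics built from one source).
        void addDispatcher(int scope, SourceDispatcher dispatcher) {
            dispatcherMap.computeIfAbsent(scope, k -> new ArrayList<>()).add(dispatcher);
        }

        // Every dispatcher registered for the source's scope receives it; unknown scopes are ignored.
        void forward(Source source) {
            List<SourceDispatcher> dispatchers = dispatcherMap.get(source.scope());
            if (dispatchers != null) {
                for (SourceDispatcher dispatcher : dispatchers) {
                    dispatcher.dispatch(source);
                }
            }
        }
    }

    // Records which dispatchers fired for one CPU source and one unknown-scope source.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        DispatcherManager manager = new DispatcherManager();
        manager.addDispatcher(SCOPE_JVM_CPU, s -> log.add("cpu-avg"));
        manager.addDispatcher(SCOPE_JVM_CPU, s -> log.add("cpu-max"));
        manager.addDispatcher(SCOPE_JVM_GC, s -> log.add("gc-count"));
        manager.forward(() -> SCOPE_JVM_CPU);
        manager.forward(() -> 42); // no dispatcher registered for this scope: dropped
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [cpu-avg, cpu-max]
    }
}
```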
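Source code analysis: Dispatcher.dispatch
- The Dispatchers for JVM metrics are usually generated dynamically by the OAL engine
- A Dispatcher converts the Source model into a Metrics model
- It then calls StreamProcessor.in (here MetricsStreamProcessor) to start stream processing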
public class ServiceInstanceJVMCPUDispatcher implements SourceDispatcher<ServiceInstanceJVMCPU> {
    private void doInstanceJvmCpu(ServiceInstanceJVMCPU var1) {
        // Convert the server-side Source model into the server-side Metrics model
        InstanceJvmCpuMetrics var2 = new InstanceJvmCpuMetrics();
        var2.setTimeBucket(var1.getTimeBucket());
        var2.setEntityId(var1.getEntityId());
        var2.setServiceId(var1.getServiceId());
        var2.combine(var1.getUsePercent(), (long)1);
        // Start stream processing
        MetricsStreamProcessor.getInstance().in(var2);
    }

    public void dispatch(Source var1) {
        ServiceInstanceJVMCPU var2 = (ServiceInstanceJVMCPU)var1;
        this.doInstanceJvmCpu(var2);
    }
}
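The call `combine(var1.getUsePercent(), (long)1)` suggests that the CPU metric keeps a running sum and a count rather than a single value. The sketch below assumes an average metric of that shape (`DoubleAvgMetric` is a hypothetical name). It shows why that representation is useful: partial results from L1 and L2 workers can be merged exactly before the final average is computed.

```java
// Hypothetical sketch of an average metric in the spirit of InstanceJvmCpuMetrics.
public class AvgMetricSketch {

    static class DoubleAvgMetric {
        final String entityId;
        final long timeBucket;
        double summation; // sum of all observed values
        long count;       // number of observed values
        double value;     // final average, set by calculate()

        DoubleAvgMetric(String entityId, long timeBucket) {
            this.entityId = entityId;
            this.timeBucket = timeBucket;
        }

        // Adds raw observations: combine(usePercent, 1) records one CPU sample.
        void combine(double summation, long count) {
            this.summation += summation;
            this.count += count;
        }

        // Merges a partial result for the same entity and minute, as L1/L2 workers would.
        void merge(DoubleAvgMetric other) {
            if (!entityId.equals(other.entityId) || timeBucket != other.timeBucket) {
                throw new IllegalArgumentException("can only merge the same entity and time bucket");
            }
            combine(other.summation, other.count);
        }

        // Turns the running sum and count into the average.
        void calculate() {
            value = count == 0 ? 0.0 : summation / count;
        }
    }

    // Two partial results for one instance and minute, merged and then averaged.
    static double demo() {
        DoubleAvgMetric nodeA = new DoubleAvgMetric("instance-7", 201912041530L);
        nodeA.combine(30.0, 1);
        nodeA.combine(50.0, 1);
        DoubleAvgMetric nodeB = new DoubleAvgMetric("instance-7", 201912041530L);
        nodeB.combine(70.0, 1);
        nodeA.merge(nodeB);
        nodeA.calculate();
        return nodeA.value; // (30 + 50 + 70) / 3
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 50.0
    }
}
```

Averaging the two partial averages instead, (40 + 70) / 2, would give 55 rather than the correct 50. That is why the sum and the count are carried separately until `calculate()`.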
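Summary
- server-receiver-plugin contains many plugins
- skywalking-register-receiver-plugin is mainly used for service registration and discovery
- skywalking-trace-receiver-plugin mainly processes trace data
- The JVM plugin analyzed here mainly handles metrics and alerting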