原理图
- 不同信息的服务注册都是同一套流程
- 先基于缓存和存储获取,获取成功则返回agent交由agent构建字典信息[基于字典传输降低带宽]
- 信息获取失败则异步创建
- agent获取失败则轮询发起服务注册请求
源码分析一RegisterModuleProvider
- 为grpc服务器和jetty服务器注册各类handler
- 其中RegisterServiceHandler用于处理服务注册,服务实例注册以及[endpoint 和networkaddress]同步请求
高版本agent交互基本走grpc,skywalking-ui交互走jetty
public class RegisterModuleProvider extends ModuleProvider {
@Override public void prepare() {
}
高版本agent交互基本走grpc,skywalking-ui交互走jetty
@Override public void start() {
GRPCHandlerRegister grpcHandlerRegister = getManager().find(SharingServerModule.NAME).provider().getService(GRPCHandlerRegister.class);
grpcHandlerRegister.addHandler(new ApplicationRegisterHandler(getManager()));
grpcHandlerRegister.addHandler(new InstanceDiscoveryServiceHandler(getManager()));
grpcHandlerRegister.addHandler(new ServiceNameDiscoveryHandler(getManager()));
grpcHandlerRegister.addHandler(new NetworkAddressRegisterServiceHandler(getManager()));
服务注册 服务实例注册 以及[endpoint 和networkaddress]同步请求
grpcHandlerRegister.addHandler(new RegisterServiceHandler(getManager()));
心跳请求 更新heatbeat
grpcHandlerRegister.addHandler(new ServiceInstancePingServiceHandler(getManager()));
其他handler主要处理低版本协议
JettyHandlerRegister jettyHandlerRegister = getManager().find(SharingServerModule.NAME).provider().getService(JettyHandlerRegister.class);
jettyHandlerRegister.addHandler(new ApplicationRegisterServletHandler(getManager()));
jettyHandlerRegister.addHandler(new InstanceDiscoveryServletHandler(getManager()));
jettyHandlerRegister.addHandler(new InstanceHeartBeatServletHandler(getManager()));
jettyHandlerRegister.addHandler(new NetworkAddressRegisterServletHandler(getManager()));
jettyHandlerRegister.addHandler(new ServiceNameDiscoveryServiceHandler(getManager()));
}
@Override public void notifyAfterCompleted() {
}
}
源码分析一服务注册
注册方法概览
其他注册流程类似,不在重复叙述
RegisterServiceHandler方法 | 作用 |
---|
doServiceRegister | 服务注册 | doServiceInstanceRegister | 服务实例注册 | doEndpointRegister | 端点注册,服务端生成id,返回agent端,agent维护id和name字典 | doNetworkAddressRegister | ip注册,同endpoint流量 | doServiceAndNetworkAddressMappingRegister | 维护服务和ip映射关系 |
什么是端点?
端点名一般由agent插件定义,并且在字典中维护[id和名称的关系],传输时通过id传输避免网络带宽
dubbo的端点为接口名 下图为mysql端点名称 下图为elasticsearch端点名称
什么是字典?
字典维护在agent端,agent注册的EndPoint和ip等信息会在服务端生成唯一id并返回agent,agent依据id和名称的关系构建字典映射
将来agent上报record和metrics等数据则通过字典表将endpoint,ip等信息转成对应的唯一id
作用: 减少网络传输的带宽
源码分析一doServiceRegister服务注册
- 通过IServiceInventoryRegister获取serviceid
- 返回agent相关信息
@Override public void doServiceRegister(Services request, StreamObserver<ServiceRegisterMapping> responseObserver) {
ServiceRegisterMapping.Builder builder = ServiceRegisterMapping.newBuilder();
request.getServicesList().forEach(service -> {
String serviceName = service.getServiceName();
服务注册的service_id 如果不存在则直接返回agentagent会轮询调用直到获取成功
service_id不存在会异步创建
int serviceId = serviceInventoryRegister.getOrCreate(serviceName, null);
if (serviceId != Const.NONE) {
KeyIntValuePair value = KeyIntValuePair.newBuilder().setKey(serviceName).setValue(serviceId).build();
builder.addServices(value);
}
});
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
}
源码分析一ServiceInventoryRegister.getOrCreate服务注册信息缓存获取或者异步创建
- 通过缓存获取,缓存不存在则通过es获取
- 存储层也获取不到则异步构建,agent获取不到注册结果会轮询调用
- InventoryStreamProcessor处理创建serviceId流程
@Override public int getOrCreate(String serviceName, JsonObject properties) {
通过缓存获取,缓存不存在则通过es获取
int serviceId = getServiceInventoryCache().getServiceId(serviceName);
存储层也获取不到则异步构建,返回agent NONE,agent注册后字典填充失败会轮询调用
if (serviceId == Const.NONE) {
对应service_inventory索引
ServiceInventory serviceInventory = new ServiceInventory();
serviceInventory.setName(serviceName);
serviceInventory.setAddressId(Const.NONE);
serviceInventory.setIsAddress(BooleanUtils.FALSE);
long now = System.currentTimeMillis();
serviceInventory.setRegisterTime(now);
serviceInventory.setHeartbeatTime(now);
serviceInventory.setMappingServiceId(Const.NONE);
serviceInventory.setLastUpdateTime(now);
serviceInventory.setProperties(properties);
不存在异步添加
InventoryStreamProcessor.getInstance().in(serviceInventory);
}
return serviceId;
}
源码分析一ServiceInventoryCache服务注册缓存获取
- 缓存获取
- 缓存获取失败则通过elasticSearch获取
public int getServiceId(String serviceName) {
缓存获取
Integer serviceId = serviceNameCache.getIfPresent(ServiceInventory.buildId(serviceName));
if (Objects.isNull(serviceId) || serviceId == Const.NONE) {
缓存不存在则通过存储层查找
serviceId = getCacheDAO().getServiceId(serviceName);
if (serviceId != Const.NONE) {
serviceNameCache.put(ServiceInventory.buildId(serviceName), serviceId);
}
}
return serviceId;
}
源码分析一InventoryStreamProcessor服务注册注册异步创建
RegisterDistinctWorker------RegisterRemoteWorker------RegisterPersistentWorker
流式处理节点名称 | 作用 |
---|
RegisterDistinctWorker | 缓冲注册请求,去重,批量发送下一个worker | RegisterRemoteWorker | 远程通信,发往其他集群节点处理 | RegisterPersistentWorker | 缓冲注册请求,去重,加锁创建注册信息 |
为什么注册需要去重?
serviceId会有一个集群多个agent同时发送该serviceid的创建 agent 第一次注册获取serviceId失败会轮询请求
源码分析一RegisterDistinctWorker
private void onWork(RegisterSource source) {
messageNum++;
if (!sources.containsKey(source)) {
第一次出现则加入缓冲区
sources.put(source, source);
} else {
已经存在则合并
sources.get(source).combine(source);
}
消息数量超过1000或者一个批次的数据 【isEndOfBatch不在分析,其原理为,生产者生产时 endOfBatch =false 消费者消费时,获取缓冲区所有的消息则发送nextWorker】
也就是生产者在持续生产 则缓存区满1000则发送下一个worker, 生产者不在生产,则消费者将所有的消息的最后一条设置成end,触发nextWorker
if (messageNum >= 1000 || source.isEndOfBatch()) {
sources.values().forEach(nextWorker::in);
sources.clear();
messageNum = 0;
}
}
源码分析一RegisterPersistentWorker
- 流式处理L2聚合阶段主要处理注册请求的创建
- 通过registerLockDao进行加锁保障线程安全
private void onWork(RegisterSource registerSource) {
if (!sources.containsKey(registerSource)) {
sources.put(registerSource, registerSource);
} else {
sources.get(registerSource).combine(registerSource);
}
try (HistogramMetrics.Timer timer = workerLatencyHistogram.createTimer()) {
if (sources.size() > 1000 || registerSource.isEndOfBatch()) {
sources.values().forEach(source -> {
try {
存在则更新心跳时间等
RegisterSource dbSource = registerDAO.get(modelName, source.id());
if (Objects.nonNull(dbSource)) {
if (dbSource.combine(source)) {
registerDAO.forceUpdate(modelName, dbSource);
}
} else {
不存在加锁
int sequence;
返回一个自增id 为一个全局锁功能 确保创建流程的独占性 [乐观锁更新失败返回Const.NONE]
if ((sequence = registerLockDAO.getId(scopeId, source)) != Const.NONE) {
加锁成功获取
try {
dbSource = registerDAO.get(modelName, source.id());
获取存在则更新
if (Objects.nonNull(dbSource)) {
if (dbSource.combine(source)) {
registerDAO.forceUpdate(modelName, dbSource);
}
} else {
获取不存在则注册
source.setSequence(sequence);
registerDAO.forceInsert(modelName, source);
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
} else {
加锁失败说明有其他线程在处理注册请求,该线程无需创建
logger.info("{} inventory register try lock and increment sequence failure.", DefaultScopeDefine.nameOf(scopeId));
}
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
});
sources.clear();
}
}
}
总结
- 服务注册将读写分离,读取失败由agent轮询注册
- 异步写时通过registerLockDAO加锁保障线程安全
- 其中不同信息的注册流程大同小异
备注: registerLockDAO原理
- 存储模块启动时会通过RegisterLockEs7Installer完成registerLockDAO对应的es索引初始化
- registerLockDAO通过对注册场景对应的scopeId进行es乐观锁自增更新
- 更新成功加锁成功,更新失败加锁失败
public void install() throws StorageException {
删除其他代码
创建registerLockDAO的底层索引register_lock
createIndex();
遍历每个注册信息的scopeId,在register_lock插入一行数据
主键_document_id = scopeId,乐观锁字段sequence值初始化为1
for (Class registerSource : InventoryStreamProcessor.getInstance().getAllRegisterSources()) {
int scopeId = ((Stream)registerSource.getAnnotation(Stream.class)).scopeId();
putIfAbsent(scopeId);
}
}
register_lock索引概览
__document_id | sequence | 注册类 | 说明 |
---|
14 | 1 | ServiceInventory | |
- DefaultScopeDefine.SERVICE_INVENTORY值为14,代表ServiceInventory服务注册的场景编号为14,则在register_lock索引插入一行主键为14的数据
- sequence为乐观锁机制,初始化为1
- 以上信息都是基于ServiceInventory类的注解@Stream指定
@Stream(name = ServiceInventory.INDEX_NAME, scopeId = DefaultScopeDefine.SERVICE_INVENTORY, builder = ServiceInventory.Builder.class, processor = InventoryStreamProcessor.class)
IRegisterLockDAO乐观锁实现
- 获取register_lock索引sequence字段
- 对sequence字段自增并force强刷es
- 更新成功则加锁成功
@Override public int getId(int scopeId, RegisterSource registerSource) {
String id = scopeId + "";
int sequence = Const.NONE;
try {
GetResponse response = getClient().get(RegisterLockIndex.NAME, id);
if (response.isExists()) {
Map<String, Object> source = response.getSource();
sequence = ((Number)source.get(RegisterLockIndex.COLUMN_SEQUENCE)).intValue();
long version = response.getVersion();
增加版本号
sequence++;
使用es的乐观锁
lock(id, sequence, version);
}
} catch (Throwable t) {
return Const.NONE;
}
return sequence;
}
private void lock(String id, int sequence, long version) throws IOException {
XContentBuilder source = XContentFactory.jsonBuilder().startObject();
es sequence字段
source.field(RegisterLockIndex.COLUMN_SEQUENCE, sequence);
source.endObject();
getClient().forceUpdate(RegisterLockIndex.NAME, id, source, version);
}
|