Storage implementation: this article assumes Elasticsearch as the storage backend.
Data model
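- StorageData represents a single record
- Every record is stored in Elasticsearch
- A data class annotated with @Stream is built into a Model object [covered in part 5, annotationScan]
- At startup, StorageModuleProvider creates the ES indices based on those Models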
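The relationship between an annotated data class and its Model can be sketched in plain Java. This is a minimal illustration under stated assumptions, not SkyWalking's code: DemoStream, DemoModel, DemoModelRegistry, and DemoEndpointInventory are hypothetical stand-ins for @Stream, Model, StorageModels, and EndpointInventory, and the scope id 16 is borrowed from the register_lock example in this article.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for @Stream: marks a data class and names its index.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface DemoStream {
    String name();
    int scopeId();
}

// Hypothetical stand-in for Model: the metadata an installer needs to create an index.
final class DemoModel {
    final String name;
    final int scopeId;

    DemoModel(String name, int scopeId) {
        this.name = name;
        this.scopeId = scopeId;
    }
}

// Hypothetical stand-in for StorageModels: keeps one model per stream name.
final class DemoModelRegistry {
    private final Map<String, DemoModel> models = new LinkedHashMap<>();

    // Reads the annotation by reflection and registers a model if none exists yet.
    void scan(Class<?> type) {
        DemoStream stream = type.getAnnotation(DemoStream.class);
        if (stream == null) {
            throw new IllegalArgumentException(type.getName() + " is not annotated with @DemoStream");
        }
        models.putIfAbsent(stream.name(), new DemoModel(stream.name(), stream.scopeId()));
    }

    List<DemoModel> getModels() {
        return new ArrayList<>(models.values());
    }
}

@DemoStream(name = "endpoint_inventory", scopeId = 16)
final class DemoEndpointInventory {
}

class ModelScanDemo {
    public static void main(String[] args) {
        DemoModelRegistry registry = new DemoModelRegistry();
        registry.scan(DemoEndpointInventory.class);
        for (DemoModel model : registry.getModels()) {
            System.out.println(model.name + " scopeId=" + model.scopeId);
        }
    }
}
```

An installer would then iterate over getModels() and create one index per entry, which is the role ModelInstaller plays in the real code.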
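Storage layer architecture
- StorageDAO acts like a factory that hands out the concrete DAO implementations
- The different DAO implementations read and write StorageData
- StorageBuilder is the anti-corruption layer for talking to ES: the ES layer passes all data around as maps, and StorageBuilder converts between a map and StorageData in both directions
- Scanning the @Stream annotation creates the Model; each Model corresponds to one ES index
- ModelInstaller sends index-creation requests to ES based on the Model objects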
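The two-way conversion described for StorageBuilder can be sketched as follows. This is a simplified illustration, not the SkyWalking source: SimpleStorageData, SimpleStorageBuilder, EndpointRecord, and EndpointRecordBuilder are hypothetical types, and the column names service_id and name are assumptions chosen for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for StorageData: anything that can be stored under an id.
interface SimpleStorageData {
    String id();
}

// Hypothetical stand-in for StorageBuilder: converts between entity and map.
interface SimpleStorageBuilder<T extends SimpleStorageData> {
    T map2Data(Map<String, Object> dbMap);

    Map<String, Object> data2Map(T data);
}

// Illustrative entity; the field set is an assumption made for this sketch.
final class EndpointRecord implements SimpleStorageData {
    final int serviceId;
    final String name;

    EndpointRecord(int serviceId, String name) {
        this.serviceId = serviceId;
        this.name = name;
    }

    @Override
    public String id() {
        return serviceId + "_" + name;
    }
}

final class EndpointRecordBuilder implements SimpleStorageBuilder<EndpointRecord> {
    @Override
    public EndpointRecord map2Data(Map<String, Object> dbMap) {
        // The storage layer may hand numbers back as Integer or Long, so go through Number.
        int serviceId = ((Number) dbMap.get("service_id")).intValue();
        return new EndpointRecord(serviceId, (String) dbMap.get("name"));
    }

    @Override
    public Map<String, Object> data2Map(EndpointRecord data) {
        Map<String, Object> map = new HashMap<>();
        map.put("service_id", data.serviceId);
        map.put("name", data.name);
        return map;
    }
}

class StorageBuilderDemo {
    public static void main(String[] args) {
        EndpointRecordBuilder builder = new EndpointRecordBuilder();
        Map<String, Object> asMap = builder.data2Map(new EndpointRecord(3, "/orders"));
        EndpointRecord restored = builder.map2Data(asMap);
        System.out.println(restored.id());
    }
}
```

Keeping the conversion in a dedicated builder means the DAO code only ever sees maps, while the rest of the application only ever sees typed objects.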
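Register data lock: principle diagram
- There is one special DAO called IRegisterLockDAO
- IRegisterLockDAO locks register data
- It is implemented on top of ES document version numbers [optimistic locking]
- Each @Stream carries a scopeId; for example, endpoint registration data acquires the lock before it is written to ES
- [During startup, the endpoint_inventory index is created for EndpointInventory to hold its data, and a document with id 16 is created in register_lock to represent the current version of EndpointInventory]
- Later, a write to endpoint_inventory first tries to increment the version of the document with id 16 in the register_lock index; whether that increment succeeds decides whether the lock was acquired, and therefore whether the write to endpoint_inventory goes ahead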
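The version-based locking described above can be illustrated with an in-memory sketch. It does not talk to Elasticsearch and is not the IRegisterLockDAO implementation: VersionedLockStore is a hypothetical class that keeps one version per scope id and lets a writer proceed only if the version it read is still the current one.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory model of the register_lock index: scope id -> version.
final class VersionedLockStore {
    private final Map<Integer, Long> versions = new ConcurrentHashMap<>();

    // Mirrors the startup step: create the lock entry for a scope if it is missing.
    void install(int scopeId) {
        versions.putIfAbsent(scopeId, 1L);
    }

    long currentVersion(int scopeId) {
        Long version = versions.get(scopeId);
        if (version == null) {
            throw new IllegalStateException("no lock entry for scope " + scopeId);
        }
        return version;
    }

    // Optimistic lock: the increment succeeds only if nobody changed the version
    // since the caller read it. A false result means another writer won.
    boolean tryLock(int scopeId, long expectedVersion) {
        return versions.replace(scopeId, expectedVersion, expectedVersion + 1);
    }
}

class RegisterLockDemo {
    public static void main(String[] args) {
        VersionedLockStore store = new VersionedLockStore();
        store.install(16);

        // Two writers read the same version before either of them writes.
        long seenByA = store.currentVersion(16);
        long seenByB = store.currentVersion(16);

        System.out.println("A acquired lock: " + store.tryLock(16, seenByA));
        System.out.println("B acquired lock: " + store.tryLock(16, seenByB));
    }
}
```

Only the writer whose increment succeeds goes on to write the inventory data; the other one has to re-read the version and try again.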
Source code analysis: StorageModuleElasticsearchProvider
- prepare creates the ES client and registers the services
- start creates the ES indices [init mode]
public class StorageModuleElasticsearch7Provider extends ModuleProvider {
@Override
public void prepare() throws ServiceNotProvidedException {
if (!StringUtil.isEmpty(config.getNameSpace())) {
config.setNameSpace(config.getNameSpace().toLowerCase());
}
elasticSearch7Client = new ElasticSearch7Client(config.getClusterNodes(), config.getProtocol(), config.getTrustStorePath(), config.getTrustStorePass(), config.getNameSpace(), config.getUser(), config.getPassword());
// The line above builds the ES client.
// Register the service implementations; every interface returned by StorageModule.services must be registered.
this.registerServiceImplementation(IBatchDAO.class, new BatchProcessEsDAO(elasticSearch7Client, config.getBulkActions(), config.getFlushInterval(), config.getConcurrentRequests()));
this.registerServiceImplementation(StorageDAO.class, new StorageEs7DAO(elasticSearch7Client));
this.registerServiceImplementation(IRegisterLockDAO.class, new RegisterLockEs77DAOImpl(elasticSearch7Client));
this.registerServiceImplementation(IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(getManager(), elasticSearch7Client, new ElasticsearchStorageTTL()));
this.registerServiceImplementation(IServiceInventoryCacheDAO.class, new ServiceInventoryCacheEs7DAO(elasticSearch7Client, config.getResultWindowMaxSize()));
this.registerServiceImplementation(IServiceInstanceInventoryCacheDAO.class, new ServiceInstanceInventoryCacheEs7DAO(elasticSearch7Client, config.getResultWindowMaxSize()));
this.registerServiceImplementation(IEndpointInventoryCacheDAO.class, new EndpointInventoryCacheEs7DAO(elasticSearch7Client));
this.registerServiceImplementation(INetworkAddressInventoryCacheDAO.class, new NetworkAddressInventoryCacheEs7DAO(elasticSearch7Client, config.getResultWindowMaxSize()));
this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQueryEsDAO(elasticSearch7Client));
this.registerServiceImplementation(IMetricsQueryDAO.class, new MetricsQueryEs7DAO(elasticSearch7Client));
this.registerServiceImplementation(ITraceQueryDAO.class, new TraceQueryEs7DAO(elasticSearch7Client, config.getSegmentQueryMaxSize()));
this.registerServiceImplementation(IMetadataQueryDAO.class, new MetadataQueryEs7DAO(elasticSearch7Client, config.getMetadataQueryMaxSize()));
this.registerServiceImplementation(IAggregationQueryDAO.class, new AggregationQueryEs7DAO(elasticSearch7Client));
this.registerServiceImplementation(IAlarmQueryDAO.class, new AlarmQueryEs7DAO(elasticSearch7Client));
this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new TopNRecordsQueryEsDAO(elasticSearch7Client));
this.registerServiceImplementation(ILogQueryDAO.class, new LogQueryEs7DAO(elasticSearch7Client));
}
@Override
public void start() throws ModuleStartException {
overrideCoreModuleTTLConfig();
try {
elasticSearch7Client.connect();
// Whether the ES indices are initialized depends on the running mode
StorageEs7Installer installer = new StorageEs7Installer(getManager(), config);
installer.install(elasticSearch7Client);
// Initialize the lock index
RegisterLockEs7Installer lockInstaller = new RegisterLockEs7Installer(elasticSearch7Client);
lockInstaller.install();
} catch (StorageException | IOException | KeyStoreException | NoSuchAlgorithmException | KeyManagementException | CertificateException e) {
throw new ModuleStartException(e.getMessage(), e);
}
}
@Override
public void notifyAfterCompleted() {
// Nothing to do
}
@Override
public String[] requiredModules() {
return new String[]{CoreModule.NAME};
}
}
Index creation: ModelInstaller.install
- StorageModels supplies the ES index mapping objects
- In init mode, the ES client is called to create the indices
public final void install(Client client) throws StorageException {
IModelGetter modelGetter = moduleManager.find(CoreModule.NAME).provider().getService(IModelGetter.class);
// StorageModels supplies the ES index mapping objects
List<Model> models = modelGetter.getModels();
// no-init mode: tables, ES indices, etc. are not created here; wait until they exist
if (RunningMode.isNoInitMode()) {
for (Model model : models) {
while (!isExists(client, model)) {
try {
logger.info("table: {} does not exist. OAP is running in 'no-init' mode, waiting... retry 3s later.", model.getName());
Thread.sleep(3000L);
} catch (InterruptedException e) {
logger.error(e.getMessage());
}
}
}
} else {
// init mode: create the index if it does not exist yet
for (Model model : models) {
if (!isExists(client, model)) {
logger.info("table: {} does not exist", model.getName());
createTable(client, model);
}
}
}
}
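Source code analysis: handling the @Stream annotation
- Four kinds of StreamProcessor handle the data [data reported by agents falls into 4 categories]
- The StreamProcessors are what later process the data reported by agents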
public class StreamAnnotationListener implements AnnotationListener {
@SuppressWarnings("unchecked")
@Override public void notify(Class aClass) {
if (aClass.isAnnotationPresent(Stream.class)) {
Stream stream = (Stream)aClass.getAnnotation(Stream.class);
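// Depending on the processor declared on @Stream, create the storage metadata and build the Model; ModelInstaller later uses the Model to initialize the ES index
// Also build the entryWorkers used for stream processing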
if (stream.processor().equals(InventoryStreamProcessor.class)) {
InventoryStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
} else if (stream.processor().equals(RecordStreamProcessor.class)) {
RecordStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
} else if (stream.processor().equals(MetricsStreamProcessor.class)) {
MetricsStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
} else if (stream.processor().equals(TopNStreamProcessor.class)) {
TopNStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
} else {
throw new UnexpectedException("Unknown stream processor.");
}
} else {
throw new UnexpectedException("Stream annotation listener could only parse the class present stream annotation.");
}
}
}
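Example: InventoryStreamProcessor.create
- StorageModels adds a Model based on the @Stream annotation [the installer creates the ES indices from this set of Models]
- Builds the worker chain [RegisterDistinctWorker [L1 aggregation] -> RegisterRemoteWorker -> RegisterPersistentWorker [L2 aggregation]]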
public void create(ModuleDefineHolder moduleDefineHolder, Stream stream,
Class<? extends RegisterSource> inventoryClass) {
StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
IRegisterDAO registerDAO;
try {
registerDAO = storageDAO.newRegisterDao(stream.builder().newInstance());
} catch (InstantiationException | IllegalAccessException e) {
throw new UnexpectedException("Create " + stream.builder().getSimpleName() + " register DAO failure.", e);
}
IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
// Add to StorageModels; the installer later creates the storage-layer tables [ES, DB, etc.] from the Models it holds, depending on the init mode
Model model = modelSetter.putIfAbsent(inventoryClass, stream.scopeId(), new Storage(stream.name(), false, false, Downsampling.None), false);
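// The last worker: deduplicates requests, e.g. when several service instances register the same application name at once; failed service registrations are deduplicated as well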
RegisterPersistentWorker persistentWorker = new RegisterPersistentWorker(moduleDefineHolder, model.getName(), registerDAO, stream.scopeId());
String remoteReceiverWorkerName = stream.name() + "_rec";
IWorkerInstanceSetter workerInstanceSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IWorkerInstanceSetter.class);
workerInstanceSetter.put(remoteReceiverWorkerName, persistentWorker, inventoryClass);
// Sends the data to other nodes for processing
RegisterRemoteWorker remoteWorker = new RegisterRemoteWorker(moduleDefineHolder, remoteReceiverWorkerName);
// The first worker
RegisterDistinctWorker distinctWorker = new RegisterDistinctWorker(moduleDefineHolder, remoteWorker);
entryWorkers.put(inventoryClass, distinctWorker);
}
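The three-stage chain built by create can be sketched with simplified workers. This is an illustration of the chaining idea only, not the SkyWalking classes: DemoWorker, DemoDistinctWorker, DemoForwardingWorker, and DemoPersistentWorker are hypothetical, the forwarding worker calls the next worker directly instead of going over the network, and the explicit flush() stands in for whatever trigger the real distinct worker uses.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical minimal worker contract.
interface DemoWorker<T> {
    void in(T item);
}

// Last stage: assigns an id only the first time a name is seen.
final class DemoPersistentWorker implements DemoWorker<String> {
    final Map<String, Integer> stored = new LinkedHashMap<>();
    private int nextId = 1;

    @Override
    public void in(String name) {
        stored.computeIfAbsent(name, key -> nextId++);
    }
}

// Middle stage: stands in for the remote hop; here it just counts and forwards.
final class DemoForwardingWorker implements DemoWorker<String> {
    private final DemoWorker<String> next;
    int forwarded;

    DemoForwardingWorker(DemoWorker<String> next) {
        this.next = next;
    }

    @Override
    public void in(String name) {
        forwarded++;
        next.in(name);
    }
}

// First stage: collapses duplicates locally before anything is forwarded.
final class DemoDistinctWorker implements DemoWorker<String> {
    private final Set<String> pending = new LinkedHashSet<>();
    private final DemoWorker<String> next;

    DemoDistinctWorker(DemoWorker<String> next) {
        this.next = next;
    }

    @Override
    public void in(String name) {
        pending.add(name);
    }

    // Pushes each distinct pending name downstream once, then clears the buffer.
    void flush() {
        for (String name : pending) {
            next.in(name);
        }
        pending.clear();
    }
}

class WorkerChainDemo {
    public static void main(String[] args) {
        // Wire the chain back to front, the same order create() uses.
        DemoPersistentWorker persistent = new DemoPersistentWorker();
        DemoForwardingWorker forwarding = new DemoForwardingWorker(persistent);
        DemoDistinctWorker distinct = new DemoDistinctWorker(forwarding);

        distinct.in("/orders");
        distinct.in("/orders");
        distinct.in("/users");
        distinct.flush();

        System.out.println("forwarded=" + forwarding.forwarded + " stored=" + persistent.stored);
    }
}
```

Building the chain from the last worker backwards lets each stage receive its downstream worker through the constructor, so no stage needs a setter or a later wiring step.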
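Summary
- Storage creates the ES indices
- Outlined the categories of data reported to Storage and the matching categories of Processor
- Outlined the architecture of Storage
- Explained the locking provided by the special register_lock index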