前提
nacos 2.0.4 个人学习总结,仅供参考,如有问题还请各位老爷指正
1. NotifyCenter
基于发布订阅的方式,进行广播通知,主要涉及
- registerToPublisher
- registerSubscriber
比如关于配置变更的事件-ConfigDataChangeEvent,就是在AsyncNotifyService初始化时进行订阅的
2. AsyncNotifyService
主要涉及异步通知所有节点进行数据更新操作
@Autowired
public AsyncNotifyService(ServerMemberManager memberManager) {
this.memberManager = memberManager;
NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);
NotifyCenter.registerSubscriber(new Subscriber() {
@Override
public void onEvent(Event event) {
if (event instanceof ConfigDataChangeEvent) {
ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
long dumpTs = evt.lastModifiedTs;
String dataId = evt.dataId;
String group = evt.group;
String tenant = evt.tenant;
String tag = evt.tag;
Collection<Member> ipList = memberManager.allMembers();
Queue<NotifySingleTask> httpQueue = new LinkedList<NotifySingleTask>();
Queue<NotifySingleRpcTask> rpcQueue = new LinkedList<NotifySingleRpcTask>();
for (Member member : ipList) {
if (!MemberUtil.isSupportedLongCon(member)) {
httpQueue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),
evt.isBeta));
} else {
rpcQueue.add(
new NotifySingleRpcTask(dataId, group, tenant, tag, dumpTs, evt.isBeta, member));
}
}
if (!httpQueue.isEmpty()) {
ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, httpQueue));
}
if (!rpcQueue.isEmpty()) {
ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue));
}
}
}
@Override
public Class<? extends Event> subscribeType() {
return ConfigDataChangeEvent.class;
}
});
}
AsyncNotifyService.AsyncTask
负责执行http get请求任务,任务由NotifySingleTask定义,调用相应节点的v1/cs/communication/dataChange api,需要注意请求头参数lastModified到毫秒 比如:
curl 'http://localhost:8848/nacos/v1/cs/communication/dataChange?dataId=test&group=DEFAULT_GROUP' --header 'lastModified:1648712946033'
3. 发布配置 POST v1/cs/configs
从代码上看,没有主节点重定向逻辑,只是简单的插入数据库,然后通知各节点
4. 客户端-NacosConfigService
|