Dubbo中registry注册组件向ZooKeeper注册中心注册的流程个人见解:
对于Dubbo来说,无论是providers服务提供者,还是consumers服务消费者,都是有提供注册和订阅功能,都拥有注册功能,类似于本地注册中心客户端,真正的“注册中心”服务是其他独立部署的进程,或进程组成的集群,比如 ZooKeeper 集群
Node
在dubbo中,使用Node来表示节点
public interface Node {
URL getUrl();
boolean isAvailable();
void destroy();
}
dubbo-registry-api
注册服务都是通过RegistryService接口中定义的方法来实现,封装了一些基本行为:
public interface RegistryService {
void register(URL url);
void unregister(URL url);
void subscribe(URL url, NotifyListener listener);
void unsubscribe(URL url, NotifyListener listener);
List<URL> lookup(URL url);
}
Registry接口继承了RegistryService和Node接口,其中reExportRegister和reExportUnregister都是直接委托给RegistryService中的订阅和注册方法
public interface Registry extends Node, RegistryService {
default void reExportRegister(URL url) {
register(url);
}
default void reExportUnregister(URL url) {
unregister(url);
}
}
RegistryFactory是Registry的工厂方法,用于创建Registry对象 Dubbo SPI默认扩展名为dubbo Adaptive 适配器会根据URL中的protocol协议来选择对于的实现
@SPI("dubbo")
public interface RegistryFactory {
@Adaptive({"protocol"})
Registry getRegistry(URL url);
}
AbstractRegistryFactory是RegistryFactory的抽象实现类,用于规范URL操作以及缓存Registry对象的公共能力
public abstract class AbstractRegistryFactory implements RegistryFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRegistryFactory.class);
protected static final ReentrantLock LOCK = new ReentrantLock();
protected static final Map<String, Registry> REGISTRIES = new HashMap<>();
}
AbstractRegistry抽象类实现了Registry接口,实现了注册数据的读写功能,Registry接口所有的实现类都继承了这个抽象类 AbstractRegistry抽象类主要用于缓存当前节点订阅的URL到Properties文件中
public abstract class AbstractRegistry implements Registry {
private static final char URL_SEPARATOR = ' ';
private static final String URL_SPLIT = "\\s+";
private static final int MAX_RETRY_TIMES_SAVE_PROPERTIES = 3;
protected final Logger logger = LoggerFactory.getLogger(getClass());
private final Properties properties = new Properties();
private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true));
private boolean syncSaveFile;
private final AtomicLong lastCacheChanged = new AtomicLong();
private final AtomicInteger savePropertiesRetryTimes = new AtomicInteger();
private final Set<URL> registered = new ConcurrentHashSet<>();
private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<>();
private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<>();
private URL registryUrl;
private File file;
}
FailbackRegistry继承自AbstractRegistry抽象类,完善注册订阅方法,并且加入了时间轮用于重试机制,一些服务发现组件如zookeeper继承自FailbackRegistry,拥有了重试机制。通过doRegister()/doUnregister()、doSubscribe()/doUnsubscribe() 以及 doNotify()这五个模板方法,用于不同子类组件来实现具体与服务组件交互的操作
public abstract class FailbackRegistry extends AbstractRegistry {
private final ConcurrentMap<URL, FailedRegisteredTask> failedRegistered = new ConcurrentHashMap<URL, FailedRegisteredTask>();
private final ConcurrentMap<URL, FailedUnregisteredTask> failedUnregistered = new ConcurrentHashMap<URL, FailedUnregisteredTask>();
private final ConcurrentMap<Holder, FailedSubscribedTask> failedSubscribed = new ConcurrentHashMap<Holder, FailedSubscribedTask>();
private final ConcurrentMap<Holder, FailedUnsubscribedTask> failedUnsubscribed = new ConcurrentHashMap<Holder, FailedUnsubscribedTask>();
private final ConcurrentMap<Holder, FailedNotifiedTask> failedNotified = new ConcurrentHashMap<Holder, FailedNotifiedTask>();
private final int retryPeriod;
private final HashedWheelTimer retryTimer;
}
ZooKeeper 注册中心 ZooKeeper是为分布式引用设计的高可用且一致性的开源协调服务,是一个树型目录服务,类似于文件夹,支持变更推送
ZookeeperRegistryFactory工厂实现了AbstractRegistryFactory实现类,调用createRegistry方法创建ZookeeperRegistry实例,后续由ZookeeperRegistry完成Zookeeper的交互
public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
private ZookeeperTransporter zookeeperTransporter;
public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
this.zookeeperTransporter = zookeeperTransporter;
}
@Override
public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url, zookeeperTransporter);
}
}
dubbo-remoting->dubbo-remoting-zookeeper ZookeeperTransporter和ZookeeperClient
@SPI("curator")
public interface ZookeeperTransporter {
@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
ZookeeperClient connect(URL url);
}
ZookeeperTransporter负责创建ZookeeperClient 对象,同过SPI扩展名curator对应的实现类CuratorZookeeperTransporter来创建ZookeeperClient对象,根据适配器中的client或者transporter来覆盖SPI指定的默认扩展名
ZookeeperTransporter->AbstractZookeeperTransporter->CuratorZookeeperTransporter AbstractZookeeperTransporter抽象类的核心功能如下: 缓存zookeeperClient实例 在某个zookeeper节点无法连接时,切换到备用的zookeeper地址
public class CuratorZookeeperTransporter extends AbstractZookeeperTransporter {
@Override
public ZookeeperClient createZookeeperClient(URL url) {
return new CuratorZookeeperClient(url);
}
}
ZookeeperClient ->AbstractZookeeperClient->CuratorZookeeperClient ZookeeperClient 接口是Dubbo封装的Zookeeper和护短,其中定义的方法都是与zookeeper进行交互的
public interface ZookeeperClient {
void create(String path, boolean ephemeral);
void delete(String path);
List<String> getChildren(String path);
List<String> addChildListener(String path, ChildListener listener);
void addDataListener(String path, DataListener listener);
void addDataListener(String path, DataListener listener, Executor executor);
void removeDataListener(String path, DataListener listener);
void removeChildListener(String path, ChildListener listener);
void addStateListener(StateListener listener);
void removeStateListener(StateListener listener);
boolean isConnected();
void close();
URL getUrl();
void create(String path, String content, boolean ephemeral);
String getContent(String path);
}
AbstractZookeeperClient是ZookeeperClient接口的抽象实现,提供了如下功能: 缓存ZookeeperClient实例创建的持久ZNode节点 管理当前ZookeeperClient实例添加的各类监听器 管理ZookeeperClient的运行状态
public abstract class AbstractZookeeperClient<TargetDataListener, TargetChildListener> implements ZookeeperClient {
protected static final Logger logger = LoggerFactory.getLogger(AbstractZookeeperClient.class);
protected int DEFAULT_CONNECTION_TIMEOUT_MS = 5 * 1000;
protected int DEFAULT_SESSION_TIMEOUT_MS = 60 * 1000;
private final URL url;
private final Set<StateListener> stateListeners = new CopyOnWriteArraySet<StateListener>();
private final ConcurrentMap<String, ConcurrentMap<ChildListener, TargetChildListener>> childListeners = new ConcurrentHashMap<String, ConcurrentMap<ChildListener, TargetChildListener>>();
private final ConcurrentMap<String, ConcurrentMap<DataListener, TargetDataListener>> listeners = new ConcurrentHashMap<String, ConcurrentMap<DataListener, TargetDataListener>>();
private volatile boolean closed = false;
private final Set<String> persistentExistNodePath = new ConcurrentHashSet<>();
}
CuratorZookeeperClient 与 Zookeeper 交互的全部操作,都是围绕着这个 Apache Curator 客户端展开的
public CuratorZookeeperClient(URL url) {
super(url);
try {
int timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_CONNECTION_TIMEOUT_MS);
int sessionExpireMs = url.getParameter(ZK_SESSION_EXPIRE_KEY, DEFAULT_SESSION_TIMEOUT_MS);
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(url.getBackupAddress())
.retryPolicy(new RetryNTimes(1, 1000))
.connectionTimeoutMs(timeout)
.sessionTimeoutMs(sessionExpireMs);
String authority = url.getAuthority();
if (authority != null && authority.length() > 0) {
builder = builder.authorization("digest", authority.getBytes());
}
client = builder.build();
client.getConnectionStateListenable().addListener(new CuratorConnectionStateListener(url));
client.start();
boolean connected = client.blockUntilConnected(timeout, TimeUnit.MILLISECONDS);
if (!connected) {
throw new IllegalStateException("zookeeper not connected");
}
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
CuratorWatcherImpl是CuratorZookeeperClient 的内部类,用于监听TreeCache关注的树形结构,当其发生变化时,会调用childEvent方法将变化的路径、节点内容以及事件类型传递给关联的DataListener实例 在CuratorZookeeperClient中监听TreeCache
@Override
protected void addTargetDataListener(String path, CuratorZookeeperClient.CuratorWatcherImpl treeCacheListener, Executor executor) {
try {
TreeCache treeCache = TreeCache.newBuilder(client, path).setCacheData(false).build();
treeCacheMap.putIfAbsent(path, treeCache);
if (executor == null) {
treeCache.getListenable().addListener(treeCacheListener);
} else {
treeCache.getListenable().addListener(treeCacheListener, executor);
}
treeCache.start();
} catch (Exception e) {
throw new IllegalStateException("Add treeCache listener for path:" + path, e);
}
}
CuratorWatcherImpl 实现
static class CuratorWatcherImpl implements CuratorWatcher, TreeCacheListener {
private CuratorFramework client;
private volatile ChildListener childListener;
private volatile DataListener dataListener;
private String path;
public CuratorWatcherImpl(CuratorFramework client, ChildListener listener, String path) {
this.client = client;
this.childListener = listener;
this.path = path;
}
public CuratorWatcherImpl(CuratorFramework client, DataListener dataListener) {
this.dataListener = dataListener;
}
protected CuratorWatcherImpl() {
}
public void unwatch() {
this.childListener = null;
}
@Override
public void process(WatchedEvent event) throws Exception {
if (event.getType() == Watcher.Event.EventType.None) {
return;
}
if (childListener != null) {
childListener.childChanged(path, client.getChildren().usingWatcher(this).forPath(path));
}
}
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
if (dataListener != null) {
if (logger.isDebugEnabled()) {
logger.debug("listen the zookeeper changed. The changed data:" + event.getData());
}
TreeCacheEvent.Type type = event.getType();
EventType eventType = null;
String content = null;
String path = null;
switch (type) {
case NODE_ADDED:
eventType = EventType.NodeCreated;
path = event.getData().getPath();
content = event.getData().getData() == null ? "" : new String(event.getData().getData(), CHARSET);
break;
case NODE_UPDATED:
。。。
}
dataListener.dataChanged(path, content, eventType);
}
}
}
ZookeeperRegistry 通过ZookeeperRegistryFactory创建完ZookeeperRegistry后,就可以建立与zookeeper的连接了 ZookeeperRegistry继承自FailbackRegistry拥有重试机制 主要关注StateListener监听器中的RECONNECTED和NEW_SESSION_CREATED状态,在当前Dubbo节点与zookeeper连接恢复或者是session恢复的时候,会重新进行订阅和注册,防止数据丢失
public class ZookeeperRegistry extends FailbackRegistry {
private final static Logger logger = LoggerFactory.getLogger(ZookeeperRegistry.class);
private final static String DEFAULT_ROOT = "dubbo";
private final String root;
private final Set<String> anyServices = new ConcurrentHashSet<>();
private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<>();
private final ZookeeperClient zkClient;
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(PATH_SEPARATOR)) {
group = PATH_SEPARATOR + group;
}
this.root = group;
zkClient = zookeeperTransporter.connect(url);
zkClient.addStateListener((state) -> {
if (state == StateListener.RECONNECTED) {
logger.warn("Trying to fetch the latest urls, in case there're provider changes during connection loss.\n" +
" Since ephemeral ZNode will not get deleted for a connection lose, " +
"there's no need to re-register url of this instance.");
ZookeeperRegistry.this.fetchLatestAddresses();
} else if (state == StateListener.NEW_SESSION_CREATED) {
logger.warn("Trying to re-register urls and re-subscribe listeners of this instance to registry...");
try {
ZookeeperRegistry.this.recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
} else if (state == StateListener.SESSION_LOST) {
logger.warn("Url of this instance will be deleted from registry soon. " +
"Dubbo client will try to re-register once a new session is created.");
} else if (state == StateListener.SUSPENDED) {
} else if (state == StateListener.CONNECTED) {
}
});
}
}
ZookeeperRegistry的方法: 1.doRegister() 方法和 doUnregister() 方法的实现都是通过ZookeeperClient 去找到合适的路径,然后创建或者删除响应的ZNode节点(相当于注册节点,注册一个服务) PS:doRegister() 注册Provider URL的时候,会根据dynamic参数决定创建的是临时的ZNode还是持久的ZNode,默认是临时的,当Provider与zookeeper会话关闭时,可以快速将变更推送到Consumer消费端 2.doSubscribe()方法通过ZookeeperClient 在指定的path上添加一个ChildListener监听器,当订阅的节点发生变化的时候,通过这个监听器触发notify()方法,在notify方法中会触发传入的NotifyListener监听器,doUnsubscribe()方法实现会将URL和NotifyListener对应的ChildListener从相关的path上删除,从而达到不再监听该path的效果 doSubscribe()方法分为两个分支: 1.订阅的URL中明确指定了service层接口的订阅请求,该分支会从URL拿到Counsumer关注的category节点集合,然后在每个category节点上添加ChildListener监听器 2.监听所有service层的订阅请求,例如monitor监控就会发出这种订阅请求,因为他需要监控所有的service节点的变化,这个分支的逻辑是直接在根节点上加一个ChildListener监听器,当有service层的节点出现时,就会触发这个ChildListener,其中会重新触发doSubscribe()方法执行上一个分支的逻辑,也就是1的逻辑
@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
try {
if (ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> {
for (String child : currentChilds) {
child = URL.decode(child);
if (!anyServices.contains(child)) {
anyServices.add(child);
subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), k);
}
}
});
zkClient.create(root, false);
List<String> services = zkClient.addChildListener(root, zkListener);
if (CollectionUtils.isNotEmpty(services)) {
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
} else {
List<URL> urls = new ArrayList<>();
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));
zkClient.create(path, false);
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
|