对于Dubbo来说,无论是providers服务提供者,还是consumers服务消费者,都是有提供注册和订阅功能,都拥有注册功能,类似于本地注册中心客户端,真正的“注册中心”服务是其他独立部署的进程,或进程组成的集群,比如 ZooKeeper 集群



 * Node. (API/SPI, Prototype, ThreadSafe)
public interface Node {

     * get url.返回当前节点的URL
     * @return url.
    URL getUrl();

     * is available.判断当前节点是否可用
     * @return available.
    boolean isAvailable();

     * destroy.销毁当前节点,并释放底层资源
    void destroy();




 * RegistryService. (SPI, Prototype, ThreadSafe)
 * @see org.apache.dubbo.registry.Registry
 * @see org.apache.dubbo.registry.RegistryFactory#getRegistry(URL)
public interface RegistryService {
		 void register(URL url);
		 void unregister(URL url);
		 //订阅,NotifyListener 监听node节点的变化,注册中心会根据NotifyListener来通知对于的订阅方节点的变化
		 void subscribe(URL url, NotifyListener listener);
		 void unsubscribe(URL url, NotifyListener listener);
		 List<URL> lookup(URL url);


 * Registry. (SPI, Prototype, ThreadSafe)
 * @see org.apache.dubbo.registry.RegistryFactory#getRegistry(URL)
 * @see
public interface Registry extends Node, RegistryService {
    default void reExportRegister(URL url) {

    default void reExportUnregister(URL url) {

Dubbo SPI默认扩展名为dubbo
Adaptive 适配器会根据URL中的protocol协议来选择对于的实现

 * RegistryFactory. (SPI, Singleton, ThreadSafe)
 * @see
public interface RegistryFactory {

     * Connect to the registry
     * <p>
     * Connecting the registry needs to support the contract: <br>
     * 1. When the check=false is set, the connection is not checked, otherwise the exception is thrown when disconnection <br>
     * 2. Support username:password authority authentication on URL.<br>
     * 3. Support the backup= candidate registry cluster address.<br>
     * 4. Support file=registry.cache local disk file cache.<br>
     * 5. Support the timeout=1000 request timeout setting.<br>
     * 6. Support session=60000 session timeout or expiration settings.<br>
     * @param url Registry address, is not allowed to be empty
     * @return Registry reference, never return empty value
    Registry getRegistry(URL url);



 * AbstractRegistryFactory. (SPI, Singleton, ThreadSafe)
 * @see org.apache.dubbo.registry.RegistryFactory
public abstract class AbstractRegistryFactory implements RegistryFactory {

    // Log output
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRegistryFactory.class);

    // The lock for the acquisition process of the registry
    protected static final ReentrantLock LOCK = new ReentrantLock();
	// 用于缓存Registry对象
    // Registry Collection Map<RegistryAddress, Registry>
    protected static final Map<String, Registry> REGISTRIES = new HashMap<>();


 * AbstractRegistry. (SPI, Prototype, ThreadSafe)
public abstract class AbstractRegistry implements Registry {

    // URL address separator, used in file cache, service provider URL separation
    private static final char URL_SEPARATOR = ' ';
    // URL address separated regular expression for parsing the service provider URL list in the file cache
    private static final String URL_SPLIT = "\\s+";
    // Max times to retry to save properties to local cache file
    private static final int MAX_RETRY_TIMES_SAVE_PROPERTIES = 3;
    // Log output
    protected final Logger logger = LoggerFactory.getLogger(getClass());
    // Local disk cache, where the special key value.registries records the list of registry centers, and the others are the list of notified service providers
    private final Properties properties = new Properties();
    // File cache timing writing
    private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true));
    // Is it synchronized to save the file
    private boolean syncSaveFile;
    private final AtomicLong lastCacheChanged = new AtomicLong();
    private final AtomicInteger savePropertiesRetryTimes = new AtomicInteger();
    private final Set<URL> registered = new ConcurrentHashSet<>();
    private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<>();
    private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<>();
    private URL registryUrl;
    // Local disk cache file
    private File file;

FailbackRegistry继承自AbstractRegistry抽象类,完善注册订阅方法,并且加入了时间轮用于重试机制,一些服务发现组件如zookeeper继承自FailbackRegistry,拥有了重试机制。通过doRegister()/doUnregister()、doSubscribe()/doUnsubscribe() 以及 doNotify()这五个模板方法,用于不同子类组件来实现具体与服务组件交互的操作

 * FailbackRegistry. (SPI, Prototype, ThreadSafe)
public abstract class FailbackRegistry extends AbstractRegistry {

    /*  retry task map */
    private final ConcurrentMap<URL, FailedRegisteredTask> failedRegistered = new ConcurrentHashMap<URL, FailedRegisteredTask>();
    private final ConcurrentMap<URL, FailedUnregisteredTask> failedUnregistered = new ConcurrentHashMap<URL, FailedUnregisteredTask>();
    private final ConcurrentMap<Holder, FailedSubscribedTask> failedSubscribed = new ConcurrentHashMap<Holder, FailedSubscribedTask>();
    private final ConcurrentMap<Holder, FailedUnsubscribedTask> failedUnsubscribed = new ConcurrentHashMap<Holder, FailedUnsubscribedTask>();
    private final ConcurrentMap<Holder, FailedNotifiedTask> failedNotified = new ConcurrentHashMap<Holder, FailedNotifiedTask>();

     * The time in milliseconds the retryExecutor will wait
     // 失败重试操作的间隔
    private final int retryPeriod;

    // Timer for failure retry, regular check if there is a request for failure, and if there is, an unlimited retry
    // 用于定时执行失败重试操作的时间轮
    private final HashedWheelTimer retryTimer;

ZooKeeper 注册中心

Dubbo官方网站Zookeeper 注册中心参考手册

 * ZookeeperRegistryFactory.
public class ZookeeperRegistryFactory extends AbstractRegistryFactory {

    private ZookeeperTransporter zookeeperTransporter;

     * Invisible injection of zookeeper client via IOC/SPI
     * @param zookeeperTransporter
    public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
        this.zookeeperTransporter = zookeeperTransporter;

    public Registry createRegistry(URL url) {
        return new ZookeeperRegistry(url, zookeeperTransporter);


public interface ZookeeperTransporter {

    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    ZookeeperClient connect(URL url);


ZookeeperTransporter负责创建ZookeeperClient 对象,同过SPI扩展名curator对应的实现类CuratorZookeeperTransporter来创建ZookeeperClient对象,根据适配器中的client或者transporter来覆盖SPI指定的默认扩展名


public class CuratorZookeeperTransporter extends AbstractZookeeperTransporter {
    public ZookeeperClient createZookeeperClient(URL url) {
        return new CuratorZookeeperClient(url);

ZookeeperClient ->AbstractZookeeperClient->CuratorZookeeperClient
ZookeeperClient 接口是Dubbo封装的Zookeeper和护短,其中定义的方法都是与zookeeper进行交互的

public interface ZookeeperClient {
    void create(String path, boolean ephemeral);
    void delete(String path);
    List<String> getChildren(String path);
    List<String> addChildListener(String path, ChildListener listener);
     * @param path:    directory. All of child of path will be listened.
     * @param listener
    void addDataListener(String path, DataListener listener);
     * @param path:    directory. All of child of path will be listened.
     * @param listener
     * @param executor another thread
    void addDataListener(String path, DataListener listener, Executor executor);
    void removeDataListener(String path, DataListener listener);

    void removeChildListener(String path, ChildListener listener);

    void addStateListener(StateListener listener);

    void removeStateListener(StateListener listener);

    boolean isConnected();
    void close();

    URL getUrl();

    void create(String path, String content, boolean ephemeral);

    String getContent(String path);



public abstract class AbstractZookeeperClient<TargetDataListener, TargetChildListener> implements ZookeeperClient {

    protected static final Logger logger = LoggerFactory.getLogger(AbstractZookeeperClient.class);

    protected int DEFAULT_CONNECTION_TIMEOUT_MS = 5 * 1000;
    protected int DEFAULT_SESSION_TIMEOUT_MS = 60 * 1000;

    private final URL url;
    private final Set<StateListener> stateListeners = new CopyOnWriteArraySet<StateListener>();
    private final ConcurrentMap<String, ConcurrentMap<ChildListener, TargetChildListener>> childListeners = new ConcurrentHashMap<String, ConcurrentMap<ChildListener, TargetChildListener>>();
    private final ConcurrentMap<String, ConcurrentMap<DataListener, TargetDataListener>> listeners = new ConcurrentHashMap<String, ConcurrentMap<DataListener, TargetDataListener>>();

    private volatile boolean closed = false;
    private final Set<String>  persistentExistNodePath = new ConcurrentHashSet<>();

CuratorZookeeperClient 与 Zookeeper 交互的全部操作,都是围绕着这个 Apache Curator 客户端展开的

    public CuratorZookeeperClient(URL url) {
        try {
            int timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_CONNECTION_TIMEOUT_MS);
            int sessionExpireMs = url.getParameter(ZK_SESSION_EXPIRE_KEY, DEFAULT_SESSION_TIMEOUT_MS);
            CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                    .retryPolicy(new RetryNTimes(1, 1000))
            String authority = url.getAuthority();
            if (authority != null && authority.length() > 0) {
                builder = builder.authorization("digest", authority.getBytes());
            client =;
            client.getConnectionStateListenable().addListener(new CuratorConnectionStateListener(url));
            boolean connected = client.blockUntilConnected(timeout, TimeUnit.MILLISECONDS);
            if (!connected) {
                throw new IllegalStateException("zookeeper not connected");
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);

CuratorWatcherImpl是CuratorZookeeperClient 的内部类,用于监听TreeCache关注的树形结构,当其发生变化时,会调用childEvent方法将变化的路径、节点内容以及事件类型传递给关联的DataListener实例

    protected void addTargetDataListener(String path, CuratorZookeeperClient.CuratorWatcherImpl treeCacheListener, Executor executor) {
        try {
            TreeCache treeCache = TreeCache.newBuilder(client, path).setCacheData(false).build();
            treeCacheMap.putIfAbsent(path, treeCache);//缓存TreeCache

            if (executor == null) {//添加监听
            } else {
                treeCache.getListenable().addListener(treeCacheListener, executor);
        } catch (Exception e) {
            throw new IllegalStateException("Add treeCache listener for path:" + path, e);

CuratorWatcherImpl 实现

static class CuratorWatcherImpl implements CuratorWatcher, TreeCacheListener {

        private CuratorFramework client;
        private volatile ChildListener childListener;
        private volatile DataListener dataListener;
        private String path;

        public CuratorWatcherImpl(CuratorFramework client, ChildListener listener, String path) {
            this.client = client;
            this.childListener = listener;
            this.path = path;

        public CuratorWatcherImpl(CuratorFramework client, DataListener dataListener) {
            this.dataListener = dataListener;

        protected CuratorWatcherImpl() {

        public void unwatch() {
            this.childListener = null;
        public void process(WatchedEvent event) throws Exception {
            // if client connect or disconnect to server, zookeeper will queue
            // watched event(Watcher.Event.EventType.None, .., path = null).
            if (event.getType() == Watcher.Event.EventType.None) {

            if (childListener != null) {
                childListener.childChanged(path, client.getChildren().usingWatcher(this).forPath(path));
        public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
            if (dataListener != null) {
                if (logger.isDebugEnabled()) {
                    logger.debug("listen the zookeeper changed. The changed data:" + event.getData());
                TreeCacheEvent.Type type = event.getType();
                EventType eventType = null;
                String content = null;
                String path = null;
                switch (type) {
                    case NODE_ADDED:
                        eventType = EventType.NodeCreated;
                        path = event.getData().getPath();
                        content = event.getData().getData() == null ? "" : new String(event.getData().getData(), CHARSET);
                    case NODE_UPDATED:

                dataListener.dataChanged(path, content, eventType);


 * ZookeeperRegistry
public class ZookeeperRegistry extends FailbackRegistry {

    private final static Logger logger = LoggerFactory.getLogger(ZookeeperRegistry.class);

    private final static String DEFAULT_ROOT = "dubbo";

    private final String root;

    private final Set<String> anyServices = new ConcurrentHashSet<>();

    private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<>();

    private final ZookeeperClient zkClient;

    public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
        if (!group.startsWith(PATH_SEPARATOR)) {
            group = PATH_SEPARATOR + group;
        this.root = group;
        zkClient = zookeeperTransporter.connect(url);
        zkClient.addStateListener((state) -> {
            if (state == StateListener.RECONNECTED) {
                logger.warn("Trying to fetch the latest urls, in case there're provider changes during connection loss.\n" +
                        " Since ephemeral ZNode will not get deleted for a connection lose, " +
                        "there's no need to re-register url of this instance.");
            } else if (state == StateListener.NEW_SESSION_CREATED) {
                logger.warn("Trying to re-register urls and re-subscribe listeners of this instance to registry...");
                try {
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
            } else if (state == StateListener.SESSION_LOST) {
                logger.warn("Url of this instance will be deleted from registry soon. " +
                        "Dubbo client will try to re-register once a new session is created.");
            } else if (state == StateListener.SUSPENDED) {

            } else if (state == StateListener.CONNECTED) {


1.doRegister() 方法和 doUnregister() 方法的实现都是通过ZookeeperClient 去找到合适的路径,然后创建或者删除响应的ZNode节点(相当于注册节点,注册一个服务)
PS:doRegister() 注册Provider URL的时候,会根据dynamic参数决定创建的是临时的ZNode还是持久的ZNode,默认是临时的,当Provider与zookeeper会话关闭时,可以快速将变更推送到Consumer消费端
2.doSubscribe()方法通过ZookeeperClient 在指定的path上添加一个ChildListener监听器,当订阅的节点发生变化的时候,通过这个监听器触发notify()方法,在notify方法中会触发传入的NotifyListener监听器,doUnsubscribe()方法实现会将URL和NotifyListener对应的ChildListener从相关的path上删除,从而达到不再监听该path的效果

    public void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            if (ANY_VALUE.equals(url.getServiceInterface())) {
                String root = toRootPath();
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
                ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> {
                    for (String child : currentChilds) {
                        child = URL.decode(child);
                        if (!anyServices.contains(child)) {
                            subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
                                    Constants.CHECK_KEY, String.valueOf(false)), k);
                zkClient.create(root, false);//保证根节点存在
                List<String> services = zkClient.addChildListener(root, zkListener);
                if (CollectionUtils.isNotEmpty(services)) {
                    for (String service : services) {
                        service = URL.decode(service);
                        subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
                                Constants.CHECK_KEY, String.valueOf(false)), listener);
            } else {
                List<URL> urls = new ArrayList<>();
                for (String path : toCategoriesPath(url)) {//要订阅的所有path
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
                    ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));
                    zkClient.create(path, false);
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                notify(url, listener, urls);
        } catch (Throwable e) {
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
加:2021-08-29 09:09:56  更:2021-08-29 09:10:28 
