1. 前言
DubboBootstrap启动时,首先会通过initialize() 方法完成初始化,装配各种Config对象,为后续的服务暴露和引用准备好环境。 ?
ServiceConfig对象就是Dubbo对服务的描述对象,服务暴露的逻辑都在ServiceConfig#export() 里面,Dubbo暴露服务也就是遍历所有的ServiceConfig,挨个进行暴露。 上图是官方文档给的图,在Dubbo的架构体系中,像集群容错、负载均衡等逻辑都是客户端实现的,所以服务暴露的过程相对会简单很多。ServiceConfig描述了对外提供的服务,ref属性引用了具体的服务实现对象,当Provider接收到Consumer发起的RPC调用时,会交给ref执行,但是Dubbo不会直接使用ref,因为不管是Provider还是Consumer,Dubbo都在向Invoker靠拢。 Invoker在Dubbo体系中是一个非常重要的概念,它代表一个调用者,可能是本地调用、远程调用、甚至是集群调用。对于Provider而言就是本地调用了,生成Invoker非常简单,字节码技术动态生成Wrapper对象,底层调用的还是ref对象。 ?
将ref包装成Invoker后,接下来就是根据协议进行服务暴露了,对应的方法是Protocol#export() ,会得到一个Exporter。Provider在接收到Consumer的RPC请求时,会根据Invocation参数映射到Exporter,然后获取它关联的Invoker,执行本地调用,最后响应结果。
2. 源码分析
Dubbo服务暴露的入口在ServiceConfig#export() 方法,主要做了三件事:
- 配置的校验和更新
- 暴露服务
- 分发服务暴露事件
代码精简后,如下:
public synchronized void export() {
checkAndUpdateSubConfigs();
if (shouldDelay()) {
DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
} else {
doExport();
}
exported();
}
2.1 checkAndUpdateSubConfigs()
该方法主要是对ServiceConfig对象做一些配置的校验和自动更新。例如使用ProviderConfig的全局默认配置、将protocolIds转换成ProtocolConfig对象、自身的属性按照优先级进行刷新等等。配置更新完了,接下来就是做服务暴露的前置Check,例如注册中心是否有效、ref对象是否符合要求等等。
private void checkAndUpdateSubConfigs() {
completeCompoundConfigs();
checkDefault();
checkProtocol();
if (!isOnlyInJvm()) {
checkRegistry();
}
this.refresh();
checkStubAndLocal(interfaceClass);
ConfigValidationUtils.checkMock(interfaceClass, this);
ConfigValidationUtils.validateServiceConfig(this);
postProcessConfig();
代码有精简...
}
2.2 doExport()
最终会调用doExportUrls() 方法多注册中心多协议暴露服务,Dubbo暴露服务除了会向注册中心注册一份,本地也会注册到ServiceRepository。 ServiceRepository保存了当前应用提供了哪些服务、引用了哪些服务,后续Consumer服务引用时,如果自身已经提供了该服务,就会通过ServiceRepository直接引用本地提供的服务,跳过网络传输。
ServiceRepository repository = ApplicationModel.getServiceRepository();
ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
repository.registerProvider(getUniqueServiceName(),ref,serviceDescriptor,this,serviceMetadata);
接下来,获取当前服务需要注册到哪些注册中心,加载对应的URL。
List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);
在Check那一步,就已经解析了服务需要通过哪些协议进行暴露,所以接下来会遍历protocols,进行单协议多注册中心暴露。
for (ProtocolConfig protocolConfig : protocols) {
String pathKey = URL.buildKey(getContextPath(protocolConfig)
.map(p -> p + "/" + path)
.orElse(path), group, version);
repository.registerService(pathKey, interfaceClass);
serviceMetadata.setServiceKey(pathKey);
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
服务暴露需要用到各种参数,用于构建后续的服务暴露URL,这里会使用HashMap存储。Dubbo的配置粒度是到方法级别的,对应的类是MethodConfig,如果有方法级别的配置,也需要解析到Map中。
Map<String, String> map = new HashMap<String, String>();
map.put(SIDE_KEY, PROVIDER_SIDE);
ServiceConfig.appendRuntimeParameters(map);
AbstractConfig.appendParameters(map, getMetrics());
AbstractConfig.appendParameters(map, getApplication());
AbstractConfig.appendParameters(map, getModule());
......
参数组装完毕,解析出服务暴露的host和port,然后构建URL。
String host = findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = findConfigedPorts(protocolConfig, name, map);
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
服务暴露的范围可以通过scope 属性配置,none 代表不暴露、local 仅暴露到本地JVM、remote 会暴露到远程。Dubbo在服务暴露前会进行判断,默认情况下会同时暴露到本地JVM和远程。
2.2.1 服务本地暴露
exportLocal() 方法用来本地暴露,本地暴露非常的简单,就是injvm 协议暴露,创建InjvmExporter存储到Map。不监听本地端口,不走网络传输,但是会走Filter和Listener。
private void exportLocal(URL url) {
URL local = URLBuilder.from(url)
.setProtocol(LOCAL_PROTOCOL)
.setHost(LOCALHOST_VALUE)
.setPort(0)
.build();
Exporter<?> exporter = PROTOCOL.export(
PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
}
2.2.2 服务远程暴露
暴露完本地,接下来就是远程暴露了。远程暴露前会先将ref包装成Invoker,对应的方法是ProxyFactory#getInvoker() ,Invoker会根据methodName调用ref的方法。 有两种方式,一种是利用Java自带的反射,另一种是利用字节码技术动态生成代理对象。Dubbo默认会选择第二种方式,利用javassist动态创建Class对应的Wrapper对象,动态生成的Wrapper类会根据方法名和参数直接调用ref对应的方法,避免Java反射带来的性能问题。
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
ref包装成了Invoker,然后就可以开始根据协议来暴露服务了。如果服务要注册到注册中心,解析出来的URL协议部分会被改写为registry ,这样SPI触发的就是RegistryProtocol#export() 方法。registry 本身是个伪协议,它只是在原有协议暴露的基础上,增加了服务注册到注册中心的功能。 ?
首先从URL中分别提取出注册中心URL和服务暴露的真实URL。
URL registryUrl = getRegistryUrl(originInvoker);
URL providerUrl = getProviderUrl(originInvoker);
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
providerUrl才是服务暴露的真实协议地址,然后通过doLocalExport() 方法开始根据指定的协议来暴露服务。
要区分服务的暴露和注册,暴露一般是指开始监听本地端口,对外提供服务。注册是指将服务注册到注册中心,让Consumer可以感知到。
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
});
}
在进行具体的协议暴露服务前,需要先经过Protocol的两个Wrapper类,这个是由SPI的自动包装特性支持的。首先是ProtocolFilterWrapper,它的目的就是对Invoker封装一层过滤器链FilterChain,在执行目标方法前先执行Filter。然后是ProtocolListenerWrapper,它的目的是在服务unexport 时触发事件。 ?
经过上面两个包装类后,SPI的自适应调用,根据URL的协议加载对应的Protocol实现,以dubbo协议为例,对应的就是DubboProtocol#export 方法。Dubbo协议暴露服务,首先自然还是创建DubboExporter ,但Dubbo服务是要供Consumer调用的,不开启网络服务,Consumer如何调用呢?所以openServer() 方法会开启服务。
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
openServer(url);
optimizeSerialization(url);
return exporter;
}
同一个address,不管暴露多少服务,都只会也只能开启一个服务,所以会用address作为Key,将服务端缓存到Map容器,address不存在时,才会调用createServer() 创建ProtocolServer。 创建服务需要绑定本地端口,最终调用的是Exchanger#bind() 。Exchanger实现类会通过SPI自适应加载,目前只有一种实现类HeaderExchanger。
public class HeaderExchanger implements Exchanger {
public static final String NAME = "header";
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
}
ExchangerServer依赖Transporter,Transporter是Dubbo对网络传输层的抽象接口,默认使用Netty,其他还有如Mina、Grizzly等,Transporter实现也是通过SPI自适应加载的,可以通过参数server 或transporter 指定,这里我们只看Netty。 NettyTransporter开启服务很简单,就是创建了NettyServer实例,在它的构造函数中,会开启Netty服务端。
public class NettyTransporter implements Transporter {
public static final String NAME = "netty";
@Override
public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
return new NettyServer(url, handler);
}
}
在NettyServer的构造函数中,最终会调用doOpen() 开启服务,熟悉Netty的同学应该很眼熟下面的代码。创建ServerBootstrap,设置EventLoopGroup,编配ChannelHandlerPipeline,最终调用bind() 绑定本地端口。
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
workerGroup = NettyEventLoopFactory.eventLoopGroup(
getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
"NettyServerWorker");
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NettyEventLoopFactory.serverSocketChannelClass())
.option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
ch.pipeline().addLast("negotiation",SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));}
ch.pipeline()
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
}
});
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
至此,Provider就开始监听网络请求了,服务的暴露就完成了。
2.2.3 服务注册
服务暴露完了,接下来就是注册,让Consumer可以感知到。先根据注册中心URL加载对应的注册中心实现类。
protected Registry getRegistry(final Invoker<?> originInvoker) {
URL registryUrl = getRegistryUrl(originInvoker);
return registryFactory.getRegistry(registryUrl);
}
解析出需要注册到注册中心的URL,然后调用RegistryService#register() 完成服务注册。
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {
register(registryUrl, registeredProviderUrl);
}
这里以Nacos为例,最终操作如下:
public void doRegister(URL url) {
final String serviceName = getServiceName(url);
final Instance instance = createInstance(url);
execute(namingService -> namingService.registerInstance(serviceName,getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP), instance));
}
3. 总结
Dubbo服务暴露,先将ref封装成Invoker,Invoker会根据Consumer的Invocation参数对ref发起调用,Dubbo默认使用javassist字节码技术动态生成Wrapper类,避免了Java反射带来的性能问题。 有了Invoker就可以通过Protocol根据协议进行服务暴露,如果服务需要注册,Dubbo会改写URL协议为registry ,这是个伪协议,只是在原服务暴露的基础上,增加了服务注册的功能。 在根据协议暴露服务前,还需要关注两个包装类:ProtocolFilterWrapper和ProtocolListenerWrapper,前者用于构建Filter链,后者用于服务取消暴露时触发事件。 以dubbo协议为例,除了创建DubboExporter,还会根据服务暴露的address创建ProtocolServer。Transporter是dubbo对网络传输层的抽象接口,以Netty为例,底层其实就是创建了ServerBootstrap,然后bind 本地接口监听网络请求。
|