dubbo 本地调用
?????????
官网:https://dubbo.apache.org/zh/docs/advanced/local-call/
??????????????
???????????
???????????????????????????????
本地调用
????????
本地调用:使用injvm协议,服务在本地内存存储,不打开端口,但执行过滤链
??????????
本地调用配置
# 定义injvm协议
<dubbo:protocol name="injvm" />
# 服务在本地暴露:从dubbo 2.2.0开始,服务默认都会在本地暴露
<dubbo:service protocol="injvm" />
# 优先调用本地服务:如果不设置,默认优先调用本地服务
<dubbo:reference injvm="true" .../>
# 强制调用远程服务
<dubbo:reference ... scope="remote" />
?????????????
???????????????????
???????????????????????????????
实现原理
??????
服务本地暴露调用栈
# ServiceConfig.exportLocal:服务本地暴露
at org.apache.dubbo.config.ServiceConfig.exportLocal(ServiceConfig.java:654)
# ServiceConfig类
at org.apache.dubbo.config.ServiceConfig.exportUrl(ServiceConfig.java:567)
at org.apache.dubbo.config.ServiceConfig.doExportUrlsFor1Protocol(ServiceConfig.java:402)
at org.apache.dubbo.config.ServiceConfig.doExportUrls(ServiceConfig.java:388)
at org.apache.dubbo.config.ServiceConfig.doExport(ServiceConfig.java:363)
- locked <0x1b87> (a org.apache.dubbo.config.spring.ServiceBean)
at org.apache.dubbo.config.ServiceConfig.export(ServiceConfig.java:237)
# DefaultModuleDeployer类
at org.apache.dubbo.config.deploy.DefaultModuleDeployer.exportServiceInternal(DefaultModuleDeployer.java:338)
at org.apache.dubbo.config.deploy.DefaultModuleDeployer.exportServices(DefaultModuleDeployer.java:310)
at org.apache.dubbo.config.deploy.DefaultModuleDeployer.start(DefaultModuleDeployer.java:142)
- locked <0x1f53> (a org.apache.dubbo.config.deploy.DefaultModuleDeployer)
# DubboDeployApplicationListener类
at org.apache.dubbo.config.spring.context.DubboDeployApplicationListener.onContextRefreshedEvent(DubboDeployApplicationListener.java:108)
at org.apache.dubbo.config.spring.context.DubboDeployApplicationListener.onApplicationEvent(DubboDeployApplicationListener.java:98)
at org.apache.dubbo.config.spring.context.DubboDeployApplicationListener.onApplicationEvent(DubboDeployApplicationListener.java:44)
# SimpleApplicationEventMulticaster类
at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:176)
at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:169)
at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:143)
# AbstractApplicationContext类
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:421)
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:378)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:938)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586)
- locked <0x1f54> (a java.lang.Object)
# ServletWebServerApplicationContext类
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:145)
# SpringApplication类
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:730)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:412)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:302)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1301)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1290)
# 项目启动入口
at com.example.demo.DemoApplication.main(DemoApplication.java:12)
???????????
ServiceConfig
public class ServiceConfig<T> extends ServiceConfigBase<T> {
private void exportUrl(URL url, List<URL> registryURLs) {
String scope = url.getParameter(SCOPE_KEY);
// don't export when none is configured
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
// export to local if the config is not remote (export to remote only when config is remote)
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
exportLocal(url);
} //如果scope!=remote,服务就先在本地暴露
// export to remote if the config is not local (export to local only when config is local)
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
url = exportRemote(url, registryURLs);
MetadataUtils.publishServiceDefinition(url);
} //如果scope!=local,服务就在远程暴露
}
this.urls.add(url);
}
/**
* always export injvm
*/
private void exportLocal(URL url) {
URL local = URLBuilder.from(url)
.setProtocol(LOCAL_PROTOCOL) //协议为injvm
.setHost(LOCALHOST_VALUE) //主机:127.0.0.1
.setPort(0) //端口:0
.build();
local = local.setScopeModel(getScopeModel())
.setServiceModel(providerModel);
doExportUrl(local, false);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
}
????????????
本地服务调用栈
# 服务调用
at org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:80)
at org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke(AbstractClusterInvoker.java:332)
at org.apache.dubbo.monitor.support.MonitorFilter.invoke(MonitorFilter.java:99)
at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:313)
at org.apache.dubbo.rpc.protocol.dubbo.filter.FutureFilter.invoke(FutureFilter.java:51)
at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:313)
at org.apache.dubbo.rpc.cluster.filter.support.ConsumerContextFilter.invoke(ConsumerContextFilter.java:108)
at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:313)
at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CallbackRegistrationInvoker.invoke(FilterChainBuilder.java:187)
at org.apache.dubbo.rpc.cluster.support.wrapper.AbstractCluster$ClusterFilterInvoker.invoke(AbstractCluster.java:92)
at org.apache.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker.invoke(MockClusterInvoker.java:95)
at org.apache.dubbo.rpc.proxy.InvokerInvocationHandler.invoke(InvokerInvocationHandler.java:92)
# 反射操作
at org.apache.dubbo.common.bytecode.proxy0.hello(proxy0.java:-1)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:568)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:208)
at jdk.proxy2.$Proxy65.hello(Unknown Source:-1)
# 方法调用入口
at com.example.demo.controller.HelloController.hello(HelloController.java:18)
# 反射操作
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:568)
# web请求处理
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:150)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:117)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:895)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808)
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1067)
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:963)
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:655)
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:764)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:197)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:540)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:135)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357)
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:382)
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:895)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1732)
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
- locked <0x23e1> (a org.apache.tomcat.util.net.NioEndpoint$NioSocketWrapper)
# 线程池处理
at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191)
at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:833)
???????????
FailoverClusterInvoker
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);
public FailoverClusterInvoker(Directory<T> directory) {
super(directory);
}
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyInvokers = invokers;
checkInvokers(copyInvokers, invocation);
String methodName = RpcUtils.getMethodName(invocation);
int len = calculateInvokeTimes(methodName);
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if (i > 0) {
checkWhetherDestroyed();
copyInvokers = list(invocation);
// check again
checkInvokers(copyInvokers, invocation);
}
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
invoked.add(invoker);
RpcContext.getServiceContext().setInvokers((List) invoked);
boolean success = false;
try {
Result result = invokeWithContext(invoker, invocation);
if (le != null && logger.isWarnEnabled()) {
logger.warn("Although retry the method " + methodName
+ " in the service " + getInterface().getName()
+ " was successful by the provider " + invoker.getUrl().getAddress()
+ ", but there have been failed providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion() + ". Last error is: "
+ le.getMessage(), le);
}
success = true;
return result;
} catch (RpcException e) {
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
if (!success) {
providers.add(invoker.getUrl().getAddress());
}
}
}
throw new RpcException(le.getCode(), "Failed to invoke the method "
+ methodName + " in the service " + getInterface().getName()
+ ". Tried " + len + " times of the providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
+ Version.getVersion() + ". Last error is: "
+ le.getMessage(), le.getCause() != null ? le.getCause() : le);
}
private int calculateInvokeTimes(String methodName) {
int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
RpcContext rpcContext = RpcContext.getClientAttachment();
Object retry = rpcContext.getObjectAttachment(RETRIES_KEY);
if (retry instanceof Number) {
len = ((Number) retry).intValue() + 1;
rpcContext.removeAttachment(RETRIES_KEY);
}
if (len <= 0) {
len = 1;
}
return len;
}
}
????????????
? ? ? ? ? ? ? ? ? ? ??
???????????????????
???????????????????????????????
使用示例
??????
?????????????????????????
?????????????????
application.yml
dubbo:
application:
name: dubbo-provider
#register-mode: instance
registry:
address: localhost:2181
protocol: zookeeper
group: dubbo
protocol:
name: dubbo
#port: 20880
????????????
HelloService
public interface HelloService {
String hello();
}
???????????
HelloServiceImpl
@DubboService
public class HelloServiceImpl implements HelloService {
@Override
public String hello() {
return "hello";
}
}
?????????????
HelloController
@RestController
public class HelloController {
@DubboReference
private HelloService helloService;
@RequestMapping("/hello")
public String hello(){
return helloService.hello();
}
}
localhost:8080/hello:可正常返回hello
????????????????
????????????????
|