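Apache Dubbo provider-side async
- Apache Dubbo added provider-side async execution in its newer versions
- Provider-side async does nothing to improve performance or response time, because the business logic still has to run on some thread
- Its purpose is to move blocking business logic off Dubbo's own worker threads, so that Dubbo's task processing is less likely to become the bottleneck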
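The idea behind these points can be illustrated without Dubbo at all. The following is a minimal plain-JDK sketch, not Dubbo code: `dubboPool` stands in for Dubbo's worker thread pool, `bizPool` for a business-owned pool, and every name in it is hypothetical. The "Dubbo" thread only dispatches the work and returns, so it is free for the next request, while the amount of work per request stays the same.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-JDK model of provider-side async; none of these names are Dubbo APIs.
class ProviderAsyncSketch {

    // Runs one simulated request and returns the response text,
    // which includes the name of the thread that produced it.
    static String handle(String request) {
        // Stand-in for Dubbo's worker pool (deliberately tiny)
        ExecutorService dubboPool =
                Executors.newFixedThreadPool(1, r -> new Thread(r, "dubbo-worker"));
        // Stand-in for the business pool that runs the slow work
        ExecutorService bizPool =
                Executors.newFixedThreadPool(2, r -> new Thread(r, "biz-worker"));
        try {
            CompletableFuture<String> response = new CompletableFuture<>();
            // The "Dubbo" thread only hands the task over, then returns immediately
            dubboPool.execute(() -> bizPool.execute(() -> {
                // Blocking business logic would run here, off the Dubbo thread
                response.complete("Hello " + request + " from "
                        + Thread.currentThread().getName());
            }));
            // Wait for the business thread to complete the response
            return response.join();
        } finally {
            dubboPool.shutdown();
            bizPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(handle("wwy")); // Hello wwy from biz-worker
    }
}
```

The response is completed on `biz-worker`, not on `dubbo-worker`. Nothing gets faster; the blocking time is simply spent in a pool the application sizes and monitors itself.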
Diagram:
Code demo
Interface implementation on the provider side
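import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.rpc.AsyncContext;
import org.apache.dubbo.rpc.RpcContext;

public class ServiceDemoImpl implements ServiceDemo {
    // Business thread pool, separate from Dubbo's own worker pool
    private final Executor executor = new ThreadPoolExecutor(2, 4, 1,
            TimeUnit.MINUTES, new ArrayBlockingQueue<>(1000), new NamedThreadFactory("wwy-"),
            new ThreadPoolExecutor.CallerRunsPolicy());

    @Override
    public String getSelf(String context) {
        // Switch this invocation to async mode before leaving the Dubbo thread
        AsyncContext asyncContext = RpcContext.startAsync();
        executor.execute(() -> {
            // Restore the request's RpcContext on the business thread
            asyncContext.signalContextSwitch();
            String result = "Provider response : Hello " + context + ", Thread name is " + Thread.currentThread().getName();
            // Complete the response; this is the value the consumer receives
            asyncContext.write(result);
        });
        // This return value is not the response; the result is delivered by asyncContext.write
        return null;
    }
}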
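The call to `signalContextSwitch()` in the provider matters because `RpcContext` is stored per thread, so a task running on the business pool does not see the request's context unless it is restored there. The following is a minimal plain-JDK sketch of that effect; `CONTEXT` is a hypothetical stand-in for `RpcContext`, and the value `request-42` is made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-JDK model of a thread-local context; CONTEXT is a stand-in, not a Dubbo API.
class ContextSwitchSketch {
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Returns what a worker thread sees in CONTEXT, with or without restoring it first.
    static String seenOnWorker(boolean restore) {
        CONTEXT.set("request-42");         // set on the calling ("Dubbo") thread
        String stored = CONTEXT.get();     // captured on the calling thread
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            return CompletableFuture.supplyAsync(() -> {
                if (restore) {
                    CONTEXT.set(stored);   // plays the role of signalContextSwitch()
                }
                return String.valueOf(CONTEXT.get());
            }, worker).join();
        } finally {
            worker.shutdown();
            CONTEXT.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(seenOnWorker(false)); // null
        System.out.println(seenOnWorker(true));  // request-42
    }
}
```

Without the restore step the worker thread sees an empty context, which is why the provider restores the context first thing inside the submitted task.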
Dubbo call on the consumer side
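import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.ImportResource;

@SpringBootApplication
@ImportResource(locations = {"classpath:springContext-dubbo.xml"})
public class DubboDemoApplication {
    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(DubboDemoApplication.class, args);
        // "serviceDemo" is expected to be the Dubbo reference bean from springContext-dubbo.xml
        ServiceDemo serviceDemo = context.getBean("serviceDemo", ServiceDemo.class);
        // For the consumer this is an ordinary synchronous call
        String result = serviceDemo.getSelf("hello wwy");
        System.out.println(result);
    }
}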
Output:
onReturn : Provider response : Hello hello wwy, Thread name is wwy--thread-1
Source code analysis
// RpcContext: create the AsyncContext for the current invocation (once) and start it
public static AsyncContext startAsync() throws IllegalStateException {
    RpcContext currentContext = getContext();
    if (currentContext.asyncContext == null) {
        currentContext.asyncContext = new AsyncContextImpl();
    }
    currentContext.asyncContext.start();
    return currentContext.asyncContext;
}

// AsyncContextImpl: capture the contexts of the thread that called startAsync()
public AsyncContextImpl() {
    this.storedContext = RpcContext.getContext();
    this.storedServerContext = RpcContext.getServerContext();
}

// Only the first call creates the future that will carry the response
@Override
public void start() {
    if (this.started.compareAndSet(false, true)) {
        this.future = new CompletableFuture<>();
    }
}

// Restore the captured contexts on whichever thread calls this method
@Override
public void signalContextSwitch() {
    RpcContext.restoreContext(storedContext);
    RpcContext.restoreServerContext(storedServerContext);
}

// Complete the future, normally or exceptionally; a repeated write is rejected
@Override
public void write(Object value) {
    if (isAsyncStarted() && stop()) {
        if (value instanceof Throwable) {
            Throwable bizExe = (Throwable) value;
            future.completeExceptionally(bizExe);
        } else {
            future.complete(value);
        }
    } else {
        throw new IllegalStateException("The async response has probably been wrote back by another thread, or the asyncContext has been closed.");
    }
}
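The `start()`/`write()` pair above behaves like a small write-once state machine. The sketch below models it with JDK classes only. It is not the real `AsyncContextImpl`: the excerpt does not show `isAsyncStarted()` or `stop()`, so the sketch assumes they read the `started` flag and compare-and-set a `stopped` flag respectively, and the class name `WriteOnceSketch` is hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified model of the start/write state machine; not the real AsyncContextImpl.
class WriteOnceSketch {
    private final AtomicBoolean started = new AtomicBoolean(false);
    // Assumption: stop() in the real class is a compare-and-set on a flag like this one
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private CompletableFuture<Object> future;

    void start() {
        // Only the first start() creates the future
        if (started.compareAndSet(false, true)) {
            future = new CompletableFuture<>();
        }
    }

    void write(Object value) {
        // Only the first write after start() wins the compare-and-set on `stopped`
        if (started.get() && stopped.compareAndSet(false, true)) {
            if (value instanceof Throwable) {
                future.completeExceptionally((Throwable) value);
            } else {
                future.complete(value);
            }
        } else {
            throw new IllegalStateException("already written, or not started");
        }
    }

    CompletableFuture<Object> future() {
        return future;
    }

    public static void main(String[] args) {
        WriteOnceSketch ctx = new WriteOnceSketch();
        ctx.start();
        ctx.write("first");
        System.out.println(ctx.future().join()); // first
        try {
            ctx.write("second");
        } catch (IllegalStateException e) {
            System.out.println("second write rejected");
        }
    }
}
```

Under these assumptions, exactly one thread can complete the response, and a `Throwable` passed to `write` surfaces as an exceptional completion rather than as a normal value.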