异步无返回
配置
<dubbo:reference id="serviceDemo" interface="com.jiangzheng.course.dubbo.api.service.ServiceDemo">
<dubbo:method name="sayHello" return="false"/>
</dubbo:reference>
注意此处有个坑:如果通过配置的dubbo:reference的话,在使用的时候 请勿在使用@DubboReference去引用接口实例,否则xml的配置不生效
源码解析
org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(PATH_KEY, getUrl().getPath());
inv.setAttachment(VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = calculateTimeout(invocation, methodName);
invocation.put(TIMEOUT_KEY, timeout);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
ExecutorService executor = getCallbackExecutor(getUrl(), inv);
CompletableFuture<AppResponse> appResponseFuture =
currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
FutureContext.getContext().setCompatibleFuture(appResponseFuture);
AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
result.setExecutor(executor);
return result;
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
public static AsyncRpcResult newDefaultAsyncResult(Invocation invocation) {
return newDefaultAsyncResult(null, null, invocation);
}
public static AsyncRpcResult newDefaultAsyncResult(Object value, Throwable t, Invocation invocation) {
CompletableFuture<AppResponse> future = new CompletableFuture<>();
AppResponse result = new AppResponse();
if (t != null) {
result.setException(t);
} else {
result.setValue(value);
}
future.complete(result);
return new AsyncRpcResult(future, invocation);
}
异步
配置
<dubbo:reference id="serviceDemo" interface="com.jiangzheng.course.dubbo.api.service.ServiceDemo">
<dubbo:method name="getSelf" async="true"/>
</dubbo:reference>
调用
import com.jiangzheng.course.dubbo.api.service.ServiceDemo;
import org.apache.dubbo.rpc.RpcContext;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.ImportResource;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@SpringBootApplication
@ImportResource(locations = {"classpath:springContext-dubbo.xml"})
public class DubboDemoApplication {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ConfigurableApplicationContext context = SpringApplication.run(DubboDemoApplication.class, args);
ServiceDemo serviceDemo = context.getBean("serviceDemo", ServiceDemo.class);
serviceDemo.getSelf("hello wwy");
CompletableFuture<String> completableFuture = RpcContext.getContext().getCompletableFuture();
String result = completableFuture.get();
System.out.println(result);
}
}
源码解析
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(PATH_KEY, getUrl().getPath());
inv.setAttachment(VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = calculateTimeout(invocation, methodName);
invocation.put(TIMEOUT_KEY, timeout);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
ExecutorService executor = getCallbackExecutor(getUrl(), inv);
CompletableFuture<AppResponse> appResponseFuture =
currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
FutureContext.getContext().setCompatibleFuture(appResponseFuture);
AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
result.setExecutor(executor);
return result;
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
@Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
return channel.request(request, timeout, executor);
}
@Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {
final DefaultFuture future = new DefaultFuture(channel, request, timeout);
future.setExecutor(executor);
if (executor instanceof ThreadlessExecutor) {
((ThreadlessExecutor) executor).setWaitingFuture(future);
}
timeoutCheck(future);
return future;
}
private DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
this.id = request.getId();
this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
FUTURES.put(id, this);
CHANNELS.put(id, channel);
}
我们直接看received方法,看下如何执行请求返回的
public static void received(Channel channel, Response response, boolean timeout) {
try {
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
Timeout t = future.timeoutCheckTask;
if (!timeout) {
t.cancel();
}
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response status is " + response.getStatus()
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()) + ", please check provider side for detailed result.");
}
} finally {
CHANNELS.remove(response.getId());
}
}
private void doReceived(Response res) {
if (res == null) {
throw new IllegalStateException("response cannot be null");
}
if (res.getStatus() == Response.OK) {
this.complete(res.getResult());
} else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
} else {
this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
}
if (executor != null && executor instanceof ThreadlessExecutor) {
ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
if (threadlessExecutor.isWaiting()) {
threadlessExecutor.notifyReturn(new IllegalStateException("The result has returned, but the biz thread is still waiting" +
" which is not an expected state, interrupt the thread manually by returning an exception."));
}
}
}
异步转同步
源码解析
针对dubbo的异步调用,但是是如何同步返回呢(即async="false"时),我们具体看下
在调用DubboInvoker前,会先调用AbstractInvoker的invoke
@Override
public Result invoke(Invocation inv) throws RpcException {
if (isDestroyed()) {
logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
+ ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
}
RpcInvocation invocation = (RpcInvocation) inv;
invocation.setInvoker(this);
if (CollectionUtils.isNotEmptyMap(attachment)) {
invocation.addObjectAttachmentsIfAbsent(attachment);
}
Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
invocation.addObjectAttachmentsIfAbsent(contextAttachments);
}
invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
AsyncRpcResult asyncResult;
try {
asyncResult = (AsyncRpcResult) doInvoke(invocation);
} catch (InvocationTargetException e) {
Throwable te = e.getTargetException();
if (te == null) {
asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
} else {
if (te instanceof RpcException) {
((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
}
asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, te, invocation);
}
} catch (RpcException e) {
if (e.isBiz()) {
asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
} else {
throw e;
}
} catch (Throwable e) {
asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
}
RpcContext.getContext().setFuture(new FutureAdapter(asyncResult.getResponseFuture()));
waitForResultIfSync(asyncResult, invocation);
return asyncResult;
}
private void waitForResultIfSync(AsyncRpcResult asyncResult, RpcInvocation invocation) {
try {
if (InvokeMode.SYNC == invocation.getInvokeMode()) {
asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
}
} catch (InterruptedException e) {
throw new RpcException("Interrupted unexpectedly while waiting for remote result to return! method: " +
invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (ExecutionException e) {
Throwable t = e.getCause();
if (t instanceof TimeoutException) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " +
invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} else if (t instanceof RemotingException) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " +
invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} else {
throw new RpcException(RpcException.UNKNOWN_EXCEPTION, "Fail to invoke remote method: " +
invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
} catch (Throwable e) {
throw new RpcException(e.getMessage(), e);
}
}
事件回调
配置
<dubbo:reference id="serviceDemo" interface="com.jiangzheng.course.dubbo.api.service.ServiceDemo">
<dubbo:method name="getSelf" async="true"
onreturn="providerCallbackService.onReturn"
onthrow="providerCallbackService.onThrow"
oninvoke="providerCallbackService.onInvoke"/>
</dubbo:reference>
import org.springframework.stereotype.Service;
@Service("providerCallbackService")
public class ProviderCallbackServiceImpl {
public void onReturn(String response) {
System.out.println("onReturn : " + response);
}
public void onThrow(Throwable throwable) {
System.out.println("onThrow : " + throwable.getMessage());
}
public void onInvoke(String message) {
System.out.println("onInvoke : " + message);
}
}
@SpringBootApplication
@ImportResource(locations = {"classpath:springContext-dubbo.xml"})
public class DubboDemoApplication {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ConfigurableApplicationContext context = SpringApplication.run(DubboDemoApplication.class, args);
ServiceDemo serviceDemo = context.getBean("serviceDemo", ServiceDemo.class);
String result = serviceDemo.getSelf("hello wwy");
System.out.println(result);
}
}
onInvoke : hello wwy
onReturn : hello wwy
源码解析
此处实现在org.apache.dubbo.rpc.protocol.dubbo.filter.FutureFilter中
@Activate(group = CommonConstants.CONSUMER)
public class FutureFilter implements ClusterFilter, ClusterFilter.Listener {
@Override
public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
fireInvokeCallback(invoker, invocation);
return invoker.invoke(invocation);
}
@Override
public void onResponse(Result result, Invoker<?> invoker, Invocation invocation) {
if (result.hasException()) {
fireThrowCallback(invoker, invocation, result.getException());
} else {
fireReturnCallback(invoker, invocation, result.getValue());
}
}
@Override
public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
fireThrowCallback(invoker, invocation, t);
}
private void fireInvokeCallback(final Invoker<?> invoker, final Invocation invocation) {
AsyncMethodInfo asyncMethodInfo = (AsyncMethodInfo) invocation.get(ASYNC_METHOD_INFO);
if (asyncMethodInfo != null) {
return asyncMethodInfo;
}
ConsumerModel consumerModel = ApplicationModel.getConsumerModel(invoker.getUrl().getServiceKey());
if (consumerModel == null) {
return null;
}
String methodName = invocation.getMethodName();
if (methodName.equals($INVOKE)) {
methodName = (String) invocation.getArguments()[0];
}
final AsyncMethodInfo asyncMethodInfo = consumerModel.getAsyncInfo(methodName);
if (asyncMethodInfo == null) {
return;
}
final Method onInvokeMethod = asyncMethodInfo.getOninvokeMethod();
final Object onInvokeInst = asyncMethodInfo.getOninvokeInstance();
if (onInvokeMethod == null && onInvokeInst == null) {
return;
}
if (onInvokeMethod == null || onInvokeInst == null) {
throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a oninvoke callback config , but no such " + (onInvokeMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
}
if (!onInvokeMethod.isAccessible()) {
onInvokeMethod.setAccessible(true);
}
Object[] params = invocation.getArguments();
try {
onInvokeMethod.invoke(onInvokeInst, params);
} catch (InvocationTargetException e) {
fireThrowCallback(invoker, invocation, e.getTargetException());
} catch (Throwable e) {
fireThrowCallback(invoker, invocation, e);
}
}
private void fireThrowCallback(final Invoker<?> invoker, final Invocation invocation, final Throwable exception) {
final AsyncMethodInfo asyncMethodInfo = getAsyncMethodInfo(invoker, invocation);
if (asyncMethodInfo == null) {
return;
}
final Method onthrowMethod = asyncMethodInfo.getOnthrowMethod();
final Object onthrowInst = asyncMethodInfo.getOnthrowInstance();
if (onthrowMethod == null && onthrowInst == null) {
return;
}
if (onthrowMethod == null || onthrowInst == null) {
throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onthrow callback config , but no such " + (onthrowMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
}
if (!onthrowMethod.isAccessible()) {
onthrowMethod.setAccessible(true);
}
Class<?>[] rParaTypes = onthrowMethod.getParameterTypes();
if (rParaTypes[0].isAssignableFrom(exception.getClass())) {
try {
Object[] args = invocation.getArguments();
Object[] params;
if (rParaTypes.length > 1) {
if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {
params = new Object[2];
params[0] = exception;
params[1] = args;
} else {
params = new Object[args.length + 1];
params[0] = exception;
System.arraycopy(args, 0, params, 1, args.length);
}
} else {
params = new Object[]{exception};
}
onthrowMethod.invoke(onthrowInst, params);
} catch (Throwable e) {
logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), e);
}
} else {
logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), exception);
}
}
private void fireReturnCallback(final Invoker<?> invoker, final Invocation invocation, final Object result) {
final AsyncMethodInfo asyncMethodInfo = getAsyncMethodInfo(invoker, invocation);
if (asyncMethodInfo == null) {
return;
}
final Method onReturnMethod = asyncMethodInfo.getOnreturnMethod();
final Object onReturnInst = asyncMethodInfo.getOnreturnInstance();
if (onReturnMethod == null && onReturnInst == null) {
return;
}
if (onReturnMethod == null || onReturnInst == null) {
throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onReturnMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
}
if (!onReturnMethod.isAccessible()) {
onReturnMethod.setAccessible(true);
}
Object[] args = invocation.getArguments();
Object[] params;
Class<?>[] rParaTypes = onReturnMethod.getParameterTypes();
if (rParaTypes.length > 1) {
if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {
params = new Object[2];
params[0] = result;
params[1] = args;
} else {
params = new Object[args.length + 1];
params[0] = result;
System.arraycopy(args, 0, params, 1, args.length);
}
} else {
params = new Object[]{result};
}
try {
onReturnMethod.invoke(onReturnInst, params);
} catch (InvocationTargetException e) {
fireThrowCallback(invoker, invocation, e.getTargetException());
} catch (Throwable e) {
fireThrowCallback(invoker, invocation, e);
}
}
}
异步编程的一些其他写法示例可以查看官网:https://dubbo.apache.org/zh/docs/advanced/async-call/
|