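Analysis of the call flow when a service provider handles a request
Service Invocation (Part 6) covered how the service consumer sends a request. This part looks at how the service provider handles that request:
- Netty handlers process the request;
- the invoker is called to run the business logic.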
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
}
});
Handler processing of the request
1.1 ByteToMessageDecoder#channelRead
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
cumulation = data;
} else {
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
}
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
decodeState = STATE_CALLING_CHILD_DECODE;
try {
decode(ctx, in, out);
} finally {
boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
decodeState = STATE_INIT;
if (removePending) {
handlerRemoved(ctx);
}
}
}
1.2 InternalDecoder#decode
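When the Netty server starts, it registers a decoding handler named "decoder" in the pipeline. When a request arrives, this handler decodes the byte stream into Dubbo protocol messages. Parameters: ctx is the handler context; input is the incoming byte stream; out collects the decoded messages.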
private class InternalDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
ChannelBuffer message = new NettyBackedChannelBuffer(input);
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
do {
int saveReaderIndex = message.readerIndex();
Object msg = codec.decode(channel, message);
if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
message.readerIndex(saveReaderIndex);
break;
} else {
if (saveReaderIndex == message.readerIndex()) {
throw new IOException("Decode without read data.");
}
if (msg != null) {
out.add(msg);
}
}
} while (message.readable());
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
}
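The loop in InternalDecoder#decode saves the reader index, tries to decode one message, and rolls back when the codec reports NEED_MORE_INPUT. Below is a minimal sketch of that pattern using java.nio.ByteBuffer instead of Netty. The frame layout (a 4-byte length followed by a UTF-8 body) and the names FrameDecodeSketch, decodeOne and decodeAll are assumptions made for illustration; this is not the Dubbo wire format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class FrameDecodeSketch {
    // Sentinel meaning "the buffer does not yet hold a complete frame".
    static final Object NEED_MORE_INPUT = new Object();

    // Hypothetical codec: a frame is a 4-byte length followed by that many UTF-8 bytes.
    static Object decodeOne(ByteBuffer in) {
        if (in.remaining() < 4) {
            return NEED_MORE_INPUT;
        }
        int length = in.getInt();
        if (in.remaining() < length) {
            return NEED_MORE_INPUT;
        }
        byte[] body = new byte[length];
        in.get(body);
        return new String(body, StandardCharsets.UTF_8);
    }

    // Decode as many complete frames as possible; leave a partial frame untouched.
    static List<Object> decodeAll(ByteBuffer in) {
        List<Object> out = new ArrayList<>();
        while (in.hasRemaining()) {
            int savedReaderIndex = in.position();
            Object msg = decodeOne(in);
            if (msg == NEED_MORE_INPUT) {
                // Roll back so the next read can retry from the start of the frame.
                in.position(savedReaderIndex);
                break;
            }
            out.add(msg);
        }
        return out;
    }
}
```

Calling decodeAll on a buffer that holds two complete frames and half of a third returns the two messages and leaves the buffer positioned at the start of the incomplete frame, so the remaining bytes can be decoded once more data arrives.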
2. NettyServerHandler#channelRead
NettyServerHandler passes the decoded message on by calling received on the next handler; the next handler is MultiMessageHandler.
public class NettyServerHandler extends ChannelDuplexHandler {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
handler.received(channel, msg);
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
}
3. MultiMessageHandler#received
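- Checks whether the message is a MultiMessage; if so, iterates over the messages it contains;
- The message can be used in a for-each loop because MultiMessage implements the Iterable interface;
- Internally it holds a List of messages, so a single MultiMessage can carry several messages to be processed;
- Personal impression: this resembles request merging, where several requests are handled in one go;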
public class MultiMessageHandler extends AbstractChannelHandlerDelegate {
public MultiMessageHandler(ChannelHandler handler) {
super(handler);
}
@SuppressWarnings("unchecked")
@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof MultiMessage) {
MultiMessage list = (MultiMessage) message;
for (Object obj : list) {
handler.received(channel, obj);
}
} else {
handler.received(channel, message);
}
}
}
4. AllChannelHandler#received
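AllChannelHandler is one of Dubbo's threading models: it uses a business thread pool to separate business logic from I/O events. I/O events are handled by the I/O threads, and business execution is handed to the business thread pool, which allows higher throughput.
- Get the executor thread pool;
- Create a ChannelEventRunnable and submit it to the thread pool. Its arguments are: channel, the channel; handler, the next handler in the chain; ChannelState.RECEIVED, the channel state; and message, the message;
- If submission throws, for example a RejectedExecutionException because the pool can no longer accept tasks, the handler checks twoWay. If it is true, the caller expects a return value, so a Response with status SERVER_THREADPOOL_EXHAUSTED_ERROR is built and sent back through the channel;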
public class AllChannelHandler extends WrappedChannelHandler {
@Override
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService executor = getExecutorService();
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
if(message instanceof Request && t instanceof RejectedExecutionException){
Request request = (Request)message;
if(request.isTwoWay()){
String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
Response response = new Response(request.getId(), request.getVersion());
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
response.setErrorMessage(msg);
channel.send(response);
return;
}
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
}
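The dispatch-and-fallback logic above can be sketched with a plain ExecutorService. This is only an illustration: DispatchSketch, the string status values and the boolean twoWay parameter are hypothetical stand-ins for Dubbo's ChannelEventRunnable, Response status codes and Request#isTwoWay.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;

final class DispatchSketch {
    // Hands the business task to the pool. If the pool rejects it and the
    // caller expects a reply, return an error status instead of failing silently.
    static String dispatch(ExecutorService executor, Runnable businessTask, boolean twoWay) {
        try {
            executor.execute(businessTask);
            return "DISPATCHED";
        } catch (RejectedExecutionException e) {
            if (twoWay) {
                // Analogous to replying with SERVER_THREADPOOL_EXHAUSTED_ERROR.
                return "THREADPOOL_EXHAUSTED";
            }
            // One-way calls have nobody to notify, so the error propagates.
            throw e;
        }
    }
}
```

A pool that has been shut down rejects every new task, which makes it an easy way to exercise the fallback branch without actually saturating a pool.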
5. DecodeHandler#received
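- Each handler in the chain ends by calling received on the next handler. Before doing so, this one checks whether the message itself is Decodeable and whether its payload (the Request data or the Response result) is Decodeable. In our case the message is a Request whose data is an RpcInvocation, so effectively nothing happens here;
- If the data is transferred as a Decodeable type, that type's own implementation performs the decoding;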
public class DecodeHandler extends AbstractChannelHandlerDelegate {
@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Decodeable) {
decode(message);
}
if (message instanceof Request) {
decode(((Request) message).getData());
}
if (message instanceof Response) {
decode(((Response) message).getResult());
}
handler.received(channel, message);
}
private void decode(Object message) {
if (message instanceof Decodeable) {
try {
((Decodeable) message).decode();
if (log.isDebugEnabled()) {
log.debug("Decode decodeable message " + message.getClass().getName());
}
} catch (Throwable e) {
if (log.isWarnEnabled()) {
log.warn("Call Decodeable.decode failed: " + e.getMessage(), e);
}
}
}
}
}
6. HeaderExchangeHandler#received
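- Cast the message to Request;
- Check the event flag. It defaults to false, so the else branch is taken;
- Check twoWay: true means a return value is expected, false means it is not. Our call expects a return value,
- so handleRequest is invoked;
- If no return value is expected, the rest of the handler chain simply runs to completion.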
public class HeaderExchangeHandler implements ChannelHandlerDelegate {
@Override
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
handleRequest(exchangeChannel, request);
} else {
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
}
6.2 HeaderExchangeHandler#handleRequest
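Handles the request:
- Create a Response instance and set the request ID and version;
- Check broken, which indicates that the request could not be decoded. It defaults to false and stays false in the normal flow;
- Call reply on the next handler to process the request. It returns a CompletionStage, which represents an asynchronous computation;
- Call CompletionStage#whenComplete to register a callback that fires once the request has finished executing;
- If execution failed with an exception, set the status to SERVICE_ERROR and the error message to the exception details;
- If execution succeeded, set the status to Response.OK and the result to appResult, the data returned by the call;
- Finally call channel#send to send the response to the client;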
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
if (req.isBroken()) {
Object data = req.getData();
String msg;
if (data == null) {
msg = null;
} else if (data instanceof Throwable) {
msg = StringUtils.toString((Throwable) data);
} else {
msg = data.toString();
}
res.setErrorMessage("Fail to decode request due to: " + msg);
res.setStatus(Response.BAD_REQUEST);
channel.send(res);
return;
}
Object msg = req.getData();
try {
CompletionStage<Object> future = handler.reply(channel, msg);
future.whenComplete((appResult, t) -> {
try {
if (t == null) {
res.setStatus(Response.OK);
res.setResult(appResult);
} else {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
channel.send(res);
} catch (RemotingException e) {
logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
} finally {
}
});
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
channel.send(res);
}
}
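The whenComplete branch in handleRequest can be reduced to a small sketch: turn the outcome of a CompletionStage into a reply object. ReplySketch and its Reply class are hypothetical simplifications of Dubbo's Response, and the string statuses stand in for Response.OK and Response.SERVICE_ERROR.

```java
import java.util.concurrent.CompletionStage;

final class ReplySketch {
    // Hypothetical, simplified stand-in for Dubbo's Response.
    static final class Reply {
        String status;
        Object result;
        String errorMessage;
    }

    // Registers a callback that fills the reply once the stage completes.
    // If the stage is already complete, the callback runs immediately on the calling thread.
    static Reply toReply(CompletionStage<Object> stage) {
        Reply reply = new Reply();
        stage.whenComplete((appResult, t) -> {
            if (t == null) {
                reply.status = "OK";
                reply.result = appResult;
            } else {
                reply.status = "SERVICE_ERROR";
                reply.errorMessage = String.valueOf(t);
            }
        });
        return reply;
    }
}
```

In the real handler the callback also sends the response through the channel; the sketch leaves that out so that only the success and failure mapping remains.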
6.3 HeaderExchangeHandler#handleResponse
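This one is a special case because it runs on the client. After the client sends a request and the server returns the response, the message the client receives is a Response, so handleResponse is called to process it;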
static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {
DefaultFuture.received(channel, response);
}
}
- DefaultFuture#received
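- When the client sends a request, Netty's channel#send is asynchronous: it returns immediately and has no return value. To wait for the response, AsyncToSyncInvoker blocks until the result arrives. When the client calls request in HeaderExchangeChannel, a DefaultFuture is created and put into the FUTURES cache, with the request ID as key and the DefaultFuture as value;
- DefaultFuture#received looks up the DefaultFuture in FUTURES by the response ID. If no future is found, the call has already timed out. Otherwise it calls future#doReceived to process the response;
- In doReceived, if the status is Response.OK, complete sets the result on the future. That value is what the blocked caller gets back, i.e. the server's result as seen by the client;
- If a timeout occurred, completeExceptionally is called with a TimeoutException;
- For any other error status, the future is completed exceptionally with a RemotingException that carries the remote error message, which the client then sees;
- This is Dubbo's async-to-sync mechanism. The channel between Spring and Redis and openFeign's async-to-sync handling work in much the same way;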
public static void received(Channel channel, Response response) {
received(channel, response, false);
}
public static void received(Channel channel, Response response, boolean timeout) {
try {
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
Timeout t = future.timeoutCheckTask;
if (!timeout) {
t.cancel();
}
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response " + response
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()));
}
} finally {
CHANNELS.remove(response.getId());
}
}
private void doReceived(Response res) {
if (res == null) {
throw new IllegalStateException("response cannot be null");
}
if (res.getStatus() == Response.OK) {
this.complete(res.getResult());
} else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
} else {
this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
}
}
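The async-to-sync idea behind DefaultFuture can be sketched with a map of pending futures keyed by request ID. PendingCallsSketch, register and the simplified received signature are assumptions for illustration; the real DefaultFuture also handles timeouts and channel bookkeeping.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

final class PendingCallsSketch {
    // Pending calls, keyed by request ID.
    private static final Map<Long, CompletableFuture<Object>> FUTURES = new ConcurrentHashMap<>();

    // Called by the sender just before the request is written to the channel.
    static CompletableFuture<Object> register(long requestId) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        FUTURES.put(requestId, future);
        return future;
    }

    // Called when a response arrives. Returns false if nobody is waiting
    // any more (for example because the call already timed out).
    static boolean received(long requestId, Object result, String errorMessage) {
        CompletableFuture<Object> future = FUTURES.remove(requestId);
        if (future == null) {
            return false;
        }
        if (errorMessage == null) {
            future.complete(result);
        } else {
            future.completeExceptionally(new RuntimeException(errorMessage));
        }
        return true;
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<Object> future = register(1L);
        // Simulate the I/O thread delivering the response a little later.
        new Thread(() -> received(1L, "hello", null)).start();
        // The caller blocks here until the response arrives or the wait times out.
        System.out.println(future.get(1, TimeUnit.SECONDS));
    }
}
```

The sending thread and the receiving thread never talk to each other directly; the request ID is the only link between them, which is why every response must echo the ID of its request.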
7. ExchangeHandlerAdapter#reply
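- Cast the request data to Invocation, call getInvoker to find the invoker, then call invoker#invoke, which runs the provider's chain of wrapped invokers layer by layer;
- invoke returns a Result. Result implements both CompletionStage and Future, so it offers the basic capabilities of an asynchronous type. Calling result.completionFuture().thenApply(Function.identity()) returns it uniformly as a CompletableFuture;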
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
Result result = invoker.invoke(inv);
return result.completionFuture().thenApply(Function.identity());
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
reply((ExchangeChannel) channel, message);
} else {
super.received(channel, message);
}
}
};
The invoker is then called to handle the request. This invoker, however, is a wrapped one;
During service export, protocol#export(invoker) is called. Through the SPI mechanism it passes through ProtocolFilterWrapper and ProtocolListenerWrapper, which wrap it into different Invokers;
The wrapping order is roughly as follows:
- CallbackRegistrationInvoker#invoke
  1.1 EchoFilter#invoke
  1.2 ClassLoaderFilter#invoke
  1.3 GenericFilter#invoke
  1.4 ContextFilter#invoke
  1.5 TimeoutFilter#invoke
  1.6 ExceptionFilter#invoke
- DelegateProviderMetaDataInvoker#invoke
- JavassistProxyFactory#AbstractProxyInvoker#invoke
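To make the nesting concrete, here is a minimal sketch of how a list of filters can be folded around a target invoker so that the first filter ends up outermost. The Invoker and Filter interfaces below are hypothetical, string-based simplifications, not Dubbo's own types, and buildChain only illustrates the general technique.

```java
import java.util.List;

final class FilterChainSketch {
    // Hypothetical simplified interfaces; Dubbo's real ones work on Invocation and Result.
    interface Invoker {
        String invoke(String invocation);
    }

    interface Filter {
        String invoke(Invoker next, String invocation);
    }

    // Wrap from the last filter to the first, so the first filter runs first.
    static Invoker buildChain(Invoker target, List<Filter> filters) {
        Invoker last = target;
        for (int i = filters.size() - 1; i >= 0; i--) {
            Filter filter = filters.get(i);
            Invoker next = last;
            last = invocation -> filter.invoke(next, invocation);
        }
        return last;
    }

    // A filter that records its name and then delegates to the next invoker.
    static Filter tracing(String name, List<String> trace) {
        return (next, invocation) -> {
            trace.add(name);
            return next.invoke(invocation);
        };
    }
}
```

With two tracing filters named "echo" and "context", invoking the chain records "echo", then "context", and only then reaches the target, which matches the outer-to-inner order of the list above.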
8. CallbackRegistrationInvoker#invoke
static class CallbackRegistrationInvoker<T> implements Invoker<T> {
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult = filterInvoker.invoke(invocation);
asyncResult = asyncResult.whenCompleteWithContext((r, t) -> {
for (int i = filters.size() - 1; i >= 0; i--) {
Filter filter = filters.get(i);
if (filter instanceof ListenableFilter) {
Filter.Listener listener = ((ListenableFilter) filter).listener();
if (listener != null) {
if (t == null) {
listener.onResponse(r, filterInvoker, invocation);
} else {
listener.onError(t, filterInvoker, invocation);
}
}
} else {
filter.onResponse(r, filterInvoker, invocation);
}
}
});
return asyncResult;
}
}
9. EchoFilter#invoke
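Provides the echo test, which lets a client check whether a service is available. If the method name equals the $ECHO constant and there is exactly one argument, the call ends here and the result is that argument;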
@Activate(group = CommonConstants.PROVIDER, order = -110000)
public class EchoFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
if (inv.getMethodName().equals($ECHO) && inv.getArguments() != null && inv.getArguments().length == 1) {
return AsyncRpcResult.newDefaultAsyncResult(inv.getArguments()[0], inv);
}
return invoker.invoke(inv);
}
}
10. ClassLoaderFilter#invoke
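Class loader filter: sets the current thread's context class loader to the class loader of the interface that the Invoker proxies, and restores the original one once the call returns.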
@Activate(group = CommonConstants.PROVIDER, order = -30000)
public class ClassLoaderFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
ClassLoader ocl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(invoker.getInterface().getClassLoader());
try {
return invoker.invoke(invocation);
} finally {
Thread.currentThread().setContextClassLoader(ocl);
}
}
}
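The swap-and-restore pattern used by ClassLoaderFilter is generic enough to show on its own. The helper below is a sketch; ContextClassLoaderSketch and callWith are hypothetical names, and only standard Thread APIs are used.

```java
import java.util.function.Supplier;

final class ContextClassLoaderSketch {
    // Runs the action with the given context class loader and always
    // restores the previous one, even if the action throws.
    static <T> T callWith(ClassLoader loader, Supplier<T> action) {
        Thread current = Thread.currentThread();
        ClassLoader original = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return action.get();
        } finally {
            current.setContextClassLoader(original);
        }
    }
}
```

The finally block matters because business threads are pooled: a loader that is not restored would leak into whatever unrelated call the same thread handles next.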
11. GenericFilter#invoke
Used for Dubbo's generic services; not used here for now;
@Activate(group = CommonConstants.PROVIDER, order = -20000)
public class GenericFilter extends ListenableFilter {
public GenericFilter() {
super.listener = new GenericListener();
}
@Override
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
return invoker.invoke(inv);
}
12. ContextFilter#invoke
Populates the provider-side RpcContext with the Invoker, the invocation, and the local IP and port;
@Activate(group = PROVIDER, order = -10000)
public class ContextFilter extends ListenableFilter {
private static final String TAG_KEY = "dubbo.tag";
public ContextFilter() {
super.listener = new ContextListener();
}
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
Map<String, String> attachments = invocation.getAttachments();
if (attachments != null) {
attachments = new HashMap<>(attachments);
attachments.remove(PATH_KEY);
attachments.remove(INTERFACE_KEY);
attachments.remove(GROUP_KEY);
attachments.remove(VERSION_KEY);
attachments.remove(DUBBO_VERSION_KEY);
attachments.remove(TOKEN_KEY);
attachments.remove(TIMEOUT_KEY);
attachments.remove(ASYNC_KEY);
attachments.remove(TAG_KEY);
attachments.remove(FORCE_USE_TAG);
}
RpcContext context = RpcContext.getContext();
context.setInvoker(invoker)
.setInvocation(invocation)
.setLocalAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort());
String remoteApplication = invocation.getAttachment(REMOTE_APPLICATION_KEY);
if (StringUtils.isNotEmpty(remoteApplication)) {
context.setRemoteApplicationName(remoteApplication);
} else {
context.setRemoteApplicationName(RpcContext.getContext().getAttachment(REMOTE_APPLICATION_KEY));
}
if (attachments != null) {
if (RpcContext.getContext().getAttachments() != null) {
RpcContext.getContext().getAttachments().putAll(attachments);
} else {
RpcContext.getContext().setAttachments(attachments);
}
}
if (invocation instanceof RpcInvocation) {
((RpcInvocation) invocation).setInvoker(invoker);
}
try {
return invoker.invoke(invocation);
} finally {
RpcContext.removeContext();
RpcContext.removeServerContext();
}
}
13. TimeoutFilter#invoke
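Provider-side timeout handling: it only logs a warning and does not stop the service method from running;
In TimeoutListener#onResponse, the elapsed time is computed as the current time minus the start time and then compared with the configured timeout. If it exceeds the timeout, a warning is logged.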
@Activate(group = CommonConstants.PROVIDER)
public class TimeoutFilter extends ListenableFilter {
private static final Logger logger = LoggerFactory.getLogger(TimeoutFilter.class);
private static final String TIMEOUT_FILTER_START_TIME = "timeout_filter_start_time";
public TimeoutFilter() {
super.listener = new TimeoutListener();
}
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
invocation.setAttachment(TIMEOUT_FILTER_START_TIME, String.valueOf(System.currentTimeMillis()));
return invoker.invoke(invocation);
}
static class TimeoutListener implements Listener {
@Override
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
String startAttach = invocation.getAttachment(TIMEOUT_FILTER_START_TIME);
if (startAttach != null) {
long elapsed = System.currentTimeMillis() - Long.valueOf(startAttach);
if (invoker.getUrl() != null && elapsed > invoker.getUrl().getMethodParameter(invocation.getMethodName(), "timeout", Integer.MAX_VALUE)) {
if (logger.isWarnEnabled()) {
logger.warn("invoke time out. method: " + invocation.getMethodName() + " arguments: " + Arrays.toString(invocation.getArguments()) + " , url is " + invoker.getUrl() + ", invoke elapsed " + elapsed + " ms.");
}
}
}
}
}
}
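The "log but do not interrupt" behaviour can be sketched in a few lines. TimeoutCheckSketch and its methods are hypothetical; the point is only that the result is returned regardless of how long the call took.

```java
import java.util.function.Supplier;

final class TimeoutCheckSketch {
    // True when the call ran longer than the configured timeout.
    static boolean exceeded(long startMillis, long nowMillis, long timeoutMillis) {
        return nowMillis - startMillis > timeoutMillis;
    }

    // Runs the business logic to completion, then warns if it was too slow.
    // The result is returned either way; nothing is cancelled.
    static String invokeAndCheck(Supplier<String> business, long timeoutMillis) {
        long start = System.currentTimeMillis();
        String result = business.get();
        long now = System.currentTimeMillis();
        if (exceeded(start, now, timeoutMillis)) {
            System.err.println("invoke time out, elapsed " + (now - start) + " ms");
        }
        return result;
    }
}
```

The consumer may have given up on the call long before this warning is printed; the provider still finishes the work.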
14. ExceptionFilter#invoke
@Activate(group = CommonConstants.PROVIDER)
public class ExceptionFilter extends ListenableFilter {
public ExceptionFilter() {
super.listener = new ExceptionListener();
}
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
return invoker.invoke(invocation);
}
15. DelegateProviderMetaDataInvoker#invoke
Does nothing beyond delegating to the wrapped invoker;
public class DelegateProviderMetaDataInvoker<T> implements Invoker {
protected final Invoker<T> invoker;
private ServiceConfig metadata;
@Override
public Result invoke(Invocation invocation) throws RpcException {
return invoker.invoke(invocation);
}
16. AbstractProxyInvoker#invoke
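This is where the business method is actually executed.
- Call doInvoke with the proxy instance, the method name methodName, the parameter types parameterTypes, and the arguments; it returns an Object;
- Wrap the returned Object value in a CompletableFuture, future;
- Create an AsyncRpcResult instance, asyncRpcResult;
- Register a callback with future.whenComplete. Once the method has finished, the callback creates an AppResponse, which represents the content of the response, and fills it with either the value or the exception. AppResponse differs from Response in that Response represents the whole response, including the content, the status code, and so on;
- Why wrap the Object result value in a CompletableFuture?
In Dubbo, a business method whose return value is a CompletableFuture is treated as an asynchronous method. In the common case the call is synchronous and the return value is something like a String, so every result is uniformly wrapped in a CompletableFuture. Because the synchronous path still needs the return value, it is obtained through future#whenComplete.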
public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
@Override
public Result invoke(Invocation invocation) throws RpcException {
try {
Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
CompletableFuture<Object> future = wrapWithFuture(value, invocation);
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation);
future.whenComplete((obj, t) -> {
AppResponse result = new AppResponse();
if (t != null) {
if (t instanceof CompletionException) {
result.setException(t.getCause());
} else {
result.setException(t);
}
} else {
result.setValue(obj);
}
asyncRpcResult.complete(result);
});
return asyncRpcResult;
} catch (InvocationTargetException e) {
return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
} catch (Throwable e) {
}
private CompletableFuture<Object> wrapWithFuture (Object value, Invocation invocation) {
if (RpcContext.getContext().isAsyncStarted()) {
return ((AsyncContextImpl)(RpcContext.getContext().getAsyncContext())).getInternalFuture();
} else if (value instanceof CompletableFuture) {
return (CompletableFuture<Object>) value;
}
return CompletableFuture.completedFuture(value);
}
protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable;
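The core of wrapWithFuture, leaving out the AsyncContext branch, fits in a few lines. The sketch below uses the hypothetical name WrapWithFutureSketch and shows how a plain value and an already asynchronous value end up behind the same CompletableFuture type.

```java
import java.util.concurrent.CompletableFuture;

final class WrapWithFutureSketch {
    // A CompletableFuture returned by the business method is passed through as-is;
    // any other value is wrapped in an already completed future.
    @SuppressWarnings("unchecked")
    static CompletableFuture<Object> wrap(Object value) {
        if (value instanceof CompletableFuture) {
            return (CompletableFuture<Object>) value;
        }
        return CompletableFuture.completedFuture(value);
    }
}
```

For a synchronous method the wrapped future is complete from the start, so a whenComplete callback registered on it runs immediately on the calling thread. For an asynchronous method the callback runs later, when the business code completes its future.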
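doInvoke calls the target method through wrapper.invokeMethod. The Wrapper class is generated at runtime with Javassist, so its implementation cannot be read directly in the source. As far as I can tell, the generated invokeMethod dispatches on the method name and calls the target method directly, rather than going through reflective Method#invoke.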
public class JavassistProxyFactory extends AbstractProxyFactory {
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
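Since the generated Wrapper cannot be read in the source tree, a hand-written equivalent may help to picture it. The sketch below assumes the generated code dispatches on the method name and calls the target directly; GreetingService, its methods and WrapperSketch are hypothetical and written by hand, not produced by Dubbo.

```java
final class WrapperSketch {
    // Hypothetical service implementation, standing in for the exported bean.
    static final class GreetingService {
        String sayHello(String name) {
            return "Hello, " + name;
        }

        int add(int a, int b) {
            return a + b;
        }
    }

    // Hand-written stand-in for a generated invokeMethod:
    // pick the method by name and argument count, cast the arguments, call directly.
    static Object invokeMethod(GreetingService target, String methodName, Object[] args) {
        if ("sayHello".equals(methodName) && args.length == 1) {
            return target.sayHello((String) args[0]);
        }
        if ("add".equals(methodName) && args.length == 2) {
            return target.add((Integer) args[0], (Integer) args[1]);
        }
        throw new IllegalArgumentException("No such method: " + methodName);
    }
}
```

A dispatcher of this shape trades flexibility for speed: each call is an ordinary method invocation after a string comparison, with no Method lookup at call time.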