需要了解动态反射和RPC
概述
Akka系统的核心ActorSystem和Actor,若需构建一个Akka系统,首先需要创建ActorSystem,创建完ActorSystem后,可通过其创建Actor(注意:Akka不允许直接new一个Actor,只能通过 Akka 提供的某些 API 才能创建或查找 Actor,一般会通过 ActorSystem#actorOf和ActorContext#actorOf来创建 Actor),另外,我们只能通过ActorRef(Actor的引用, 其对原生的 Actor 实例做了良好的封装,外界不能随意修改其内部状态)来与Actor进行通信。 1、ActorSystem 是管理 Actor生命周期的组件, Actor是负责进行通信的组件 2、每个 Actor 都有一个 MailBox,别的 Actor 发送给它的消息都首先储存在 MailBox 中,通过这种 方式可以实现异步通信。 3、每个 Actor 是单线程的处理方式,不断的从 MailBox 拉取消息执行处理,所以对于 Actor 的消息处 理,不适合调用会阻塞的处理方法。 4、Actor 可以改变他自身的状态,可以接收消息,也可以发送消息,还可以生成新的 Actor。 5、如果一个 Actor 要和另外一个 Actor进行通信,则必须先获取对方 Actor 的ActorRef 对象,然后通过该对象发送消息即可。 6、通过 tell 发送异步消息,不接收响应,通过 ask 发送异步消息,得到 Future 返回,通过异步返回处理结果。 7、当在任意地方发现要创建这四个组件的任何一个组件的实例对象的时候,创建成功了之后,都会要去执行他的 onStart() ,在集群启动的源码分析中,其实这些组件的很多的工作流程,都被放在 onStart() 里面。 先执行构造方法,后执行onStart方法。
四个组件
1、RpcGateway 网关(路由),各种其他RPC组件,都是 RpcGateWay 的子类 2、RpcServer RpcService 和 RpcEndpoint 之间的粘合层 3、RpcEndpoint 业务逻辑载体,对应的 Actor 的封装 4、RpcService 对应 ActorSystem 的封装
启动流程
在RpcEndpoint中还定义了一些方法如runAsync(Runnable)、callAsync(Callable, Time)方法来执行Rpc调用,值得注意的是在Flink的设计中,对于同一个Endpoint,所有的调用都运行在主线程,因此不会有并发问题,当启动RpcEndpoint/进行Rpc调用时,其会委托RcpServer进行处理。进入rpcService.startServer(this),在RpcService中调用connect()方法与对端的RpcEndpoint(RpcServer)建立连接,connect()方法根据给的地址返回InvocationHandler(AkkaInvocationHandler或FencedAkkaInvocationHandler,也就是对方的代理) xxxRpcGateWay(例如connect(rpcEndpoint.getAddress())) ,连接后返回一个RpcGateway,即是他的实现类CompletableFuture。AkkaRpcService中封装了ActorSystem,并保存了ActorRef到RpcEndpoint的映射关系,在构造RpcEndpoint时会启动指定rpcEndpoint上的RpcServer,其会根据Endpoint类型(FencedRpcEndpoint或其他)来创建不同的Actor(FencedAkkaRpcActor或AkkaRpcActor),并将RpcEndpoint和Actor对应的ActorRef保存起来,然后使用动态代理创建RpcServer,具体代码如下:
rpcService.startServer(this)
RPCEndpoint:
protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
this.rpcService = checkNotNull(rpcService, "rpcService");
this.endpointId = checkNotNull(endpointId, "endpointId");
this.rpcServer = rpcService.startServer(this);
this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);
}
public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) {
checkNotNull(rpcEndpoint, "rpc endpoint");
CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
final Props akkaRpcActorProps;
if (rpcEndpoint instanceof FencedRpcEndpoint) {
akkaRpcActorProps = Props.create(
FencedAkkaRpcActor.class,
rpcEndpoint,
terminationFuture,
getVersion(),
configuration.getMaximumFramesize());
} else {
akkaRpcActorProps = Props.create(
AkkaRpcActor.class,
rpcEndpoint,
terminationFuture,
getVersion(),
configuration.getMaximumFramesize());
}
ActorRef actorRef;
synchronized (lock) {
checkState(!stopped, "RpcService is stopped");
actorRef = actorSystem.actorOf(akkaRpcActorProps, rpcEndpoint.getEndpointId());
actors.put(actorRef, rpcEndpoint);
}
LOG.info("Starting RPC endpoint for {} at {} .", rpcEndpoint.getClass().getName(), actorRef.path());
final String akkaAddress = AkkaUtils.getAkkaURL(actorSystem, actorRef);
final String hostname;
Option<String> host = actorRef.path().address().host();
if (host.isEmpty()) {
hostname = "localhost";
} else {
hostname = host.get();
}
Set<Class<?>> implementedRpcGateways = new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()));
implementedRpcGateways.add(RpcServer.class);
implementedRpcGateways.add(AkkaBasedEndpoint.class);
final InvocationHandler akkaInvocationHandler;
if (rpcEndpoint instanceof FencedRpcEndpoint) {
akkaInvocationHandler = new FencedAkkaInvocationHandler<>(
akkaAddress,
hostname,
actorRef,
configuration.getTimeout(),
configuration.getMaximumFramesize(),
terminationFuture,
((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken);
implementedRpcGateways.add(FencedMainThreadExecutable.class);
} else {
akkaInvocationHandler = new AkkaInvocationHandler(
akkaAddress,
hostname,
actorRef,
configuration.getTimeout(),
configuration.getMaximumFramesize(),
terminationFuture);
}
ClassLoader classLoader = getClass().getClassLoader();
@SuppressWarnings("unchecked")
RpcServer server = (RpcServer) Proxy.newProxyInstance(
classLoader,
implementedRpcGateways.toArray(new Class<?>[implementedRpcGateways.size()]),
akkaInvocationHandler);
return server;
}
-
调用RpcEndpoint#start; -
委托给RpcServer#start; -
调用动态代理的AkkaInvocationHandler#invoke;发现调用的是StartStoppable#start方法,则直接进行本地方法调用;invoke方法的代码如下: RPCEndpoint:
public final void start() {
rpcServer.start();
}
AkkaInvocationHandler:
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Class<?> declaringClass = method.getDeclaringClass();
Object result;
if (declaringClass.equals(AkkaBasedEndpoint.class) ||
declaringClass.equals(Object.class) ||
declaringClass.equals(RpcGateway.class) ||
declaringClass.equals(StartStoppable.class) ||
declaringClass.equals(MainThreadExecutable.class) ||
declaringClass.equals(RpcServer.class)) {
result = method.invoke(this, args);
} else if (declaringClass.equals(FencedRpcGateway.class)) {
throw new UnsupportedOperationException("AkkaInvocationHandler does not support the call FencedRpcGateway#" +
method.getName() + ". This indicates that you retrieved a FencedRpcGateway without specifying a " +
"fencing token. Please use RpcService#connect(RpcService, F, Time) with F being the fencing token to " +
"retrieve a properly FencedRpcGateway.");
} else {
result = invokeRpc(method, args);
}
return result;
}
-
调用AkkaInvocationHandler#start; -
通过ActorRef#tell给对应的Actor发送消息rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender()); -
调用AkkaRpcActor#handleControlMessage处理控制类型消息; -
在主线程中将自身状态变更为Started状态; 经过上述步骤就完成了Actor的启动过程,Actor启动后便可与Acto通信让其执行代码(如runSync/callSync等)和处理Rpc请求了。下面分别介绍处理执行代码和处理Rpc请求;
Rpc调用流程
AkkaInvocationHandler#invokeRpc,其方法如下:
private Object invokeRpc(Method method, Object[] args) throws Exception {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
Annotation[][] parameterAnnotations = method.getParameterAnnotations();
Time futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout);
final RpcInvocation rpcInvocation = createRpcInvocationMessage(methodName, parameterTypes, args);
Class<?> returnType = method.getReturnType();
final Object result;
if (Objects.equals(returnType, Void.TYPE)) {
tell(rpcInvocation);
result = null;
} else {
CompletableFuture<?> resultFuture = ask(rpcInvocation, futureTimeout);
CompletableFuture<?> completableFuture = resultFuture.thenApply((Object o) -> {
if (o instanceof SerializedValue) {
try {
return ((SerializedValue<?>) o).deserializeValue(getClass().getClassLoader());
} catch (IOException | ClassNotFoundException e) {
throw new CompletionException(
new RpcException("Could not deserialize the serialized payload of RPC method : "
+ methodName, e));
}
} else {
return o;
}
});
if (Objects.equals(returnType, CompletableFuture.class)) {
result = completableFuture;
} else {
try {
result = completableFuture.get(futureTimeout.getSize(), futureTimeout.getUnit());
} catch (ExecutionException ee) {
throw new RpcException("Failure while obtaining synchronous RPC result.", ExceptionUtils.stripExecutionException(ee));
}
}
}
return result;
}
然后转到服务器接收
AkkaRpcActor#handleRpcInvocation,其代码如下:
private void handleRpcInvocation(RpcInvocation rpcInvocation) {
Method rpcMethod = null;
try {
String methodName = rpcInvocation.getMethodName();
Class<?>[] parameterTypes = rpcInvocation.getParameterTypes();
rpcMethod = lookupRpcMethod(methodName, parameterTypes);
} catch (ClassNotFoundException e) {
log.error("Could not load method arguments.", e);
RpcConnectionException rpcException = new RpcConnectionException("Could not load method arguments.", e);
getSender().tell(new Status.Failure(rpcException), getSelf());
} catch (IOException e) {
log.error("Could not deserialize rpc invocation message.", e);
RpcConnectionException rpcException = new RpcConnectionException("Could not deserialize rpc invocation message.", e);
getSender().tell(new Status.Failure(rpcException), getSelf());
} catch (final NoSuchMethodException e) {
log.error("Could not find rpc method for rpc invocation.", e);
RpcConnectionException rpcException = new RpcConnectionException("Could not find rpc method for rpc invocation.", e);
getSender().tell(new Status.Failure(rpcException), getSelf());
}
if (rpcMethod != null) {
try {
rpcMethod.setAccessible(true);
if (rpcMethod.getReturnType().equals(Void.TYPE)) {
rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
}
else {
final Object result;
try {
result = rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
}
catch (InvocationTargetException e) {
log.debug("Reporting back error thrown in remote procedure {}", rpcMethod, e);
getSender().tell(new Status.Failure(e.getTargetException()), getSelf());
return;
}
final String methodName = rpcMethod.getName();
if (result instanceof CompletableFuture) {
final CompletableFuture<?> responseFuture = (CompletableFuture<?>) result;
sendAsyncResponse(responseFuture, methodName);
} else {
sendSyncResponse(result, methodName);
}
}
} catch (Throwable e) {
log.error("Error while executing remote procedure call {}.", rpcMethod, e);
getSender().tell(new Status.Failure(e), getSelf());
}
}
}
- 将结果返回给调用者AkkaInvocationHandler#ask;
补充:
AkkaRpcActor,会根据类型的不同,进行不同的处理
protected void handleRpcMessage(Object message) {
if (message instanceof RunAsync) {
handleRunAsync((RunAsync) message);
} else if (message instanceof CallAsync) {
handleCallAsync((CallAsync) message);
} else if (message instanceof RpcInvocation) {
handleRpcInvocation((RpcInvocation) message);
} else {
log.warn(
"Received message of unknown type {} with value {}. Dropping this message!",
message.getClass().getName(),
message);
sendErrorIfSender(new AkkaUnknownMessageException("Received unknown message " + message +
" of type " + message.getClass().getSimpleName() + '.'));
}
}
|