IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> Flink RPC源码流程 -> 正文阅读

[网络协议]Flink RPC源码流程

需要了解动态反射和RPC

概述

Akka系统的核心ActorSystem和Actor,若需构建一个Akka系统,首先需要创建ActorSystem,创建完ActorSystem后,可通过其创建Actor(注意:Akka不允许直接new一个Actor,只能通过 Akka 提供的某些 API 才能创建或查找 Actor,一般会通过 ActorSystem#actorOf和ActorContext#actorOf来创建 Actor),另外,我们只能通过ActorRef(Actor的引用, 其对原生的 Actor 实例做了良好的封装,外界不能随意修改其内部状态)来与Actor进行通信。
1、ActorSystem 是管理 Actor生命周期的组件, Actor是负责进行通信的组件
2、每个 Actor 都有一个 MailBox,别的 Actor 发送给它的消息都首先储存在 MailBox 中,通过这种
方式可以实现异步通信。
3、每个 Actor 是单线程的处理方式,不断的从 MailBox 拉取消息执行处理,所以对于 Actor 的消息处
理,不适合调用会阻塞的处理方法。
4、Actor 可以改变他自身的状态,可以接收消息,也可以发送消息,还可以生成新的 Actor。
5、如果一个 Actor 要和另外一个 Actor进行通信,则必须先获取对方 Actor 的ActorRef 对象,然后通过该对象发送消息即可。
6、通过 tell 发送异步消息,不接收响应,通过 ask 发送异步消息,得到 Future 返回,通过异步返回处理结果。
7、当在任意地方发现要创建这四个组件的任何一个组件的实例对象的时候,创建成功了之后,都会要去执行他的 onStart() ,在集群启动的源码分析中,其实这些组件的很多的工作流程,都被放在 onStart() 里面。 先执行构造方法,后执行onStart方法。
在这里插入图片描述

四个组件

1、RpcGateway 网关(路由),各种其他RPC组件,都是 RpcGateWay 的子类
2、RpcServer RpcService 和 RpcEndpoint 之间的粘合层
3、RpcEndpoint 业务逻辑载体,对应的 Actor 的封装
4、RpcService 对应 ActorSystem 的封装
在这里插入图片描述

启动流程

在RpcEndpoint中还定义了一些方法如runAsync(Runnable)、callAsync(Callable, Time)方法来执行Rpc调用,值得注意的是在Flink的设计中,对于同一个Endpoint,所有的调用都运行在主线程,因此不会有并发问题,当启动RpcEndpoint/进行Rpc调用时,其会委托RcpServer进行处理。进入rpcService.startServer(this),在RpcService中调用connect()方法与对端的RpcEndpoint(RpcServer)建立连接,connect()方法根据给的地址返回InvocationHandler(AkkaInvocationHandler或FencedAkkaInvocationHandler,也就是对方的代理) xxxRpcGateWay(例如connect(rpcEndpoint.getAddress())) ,连接后返回一个RpcGateway,即是他的实现类CompletableFuture。AkkaRpcService中封装了ActorSystem,并保存了ActorRef到RpcEndpoint的映射关系,在构造RpcEndpoint时会启动指定rpcEndpoint上的RpcServer,其会根据Endpoint类型(FencedRpcEndpoint或其他)来创建不同的Actor(FencedAkkaRpcActor或AkkaRpcActor),并将RpcEndpoint和Actor对应的ActorRef保存起来,然后使用动态代理创建RpcServer,具体代码如下:

rpcService.startServer(this)

RPCEndpointprotected RpcEndpoint(final RpcService rpcService, final String endpointId) {
		this.rpcService = checkNotNull(rpcService, "rpcService");
		this.endpointId = checkNotNull(endpointId, "endpointId");
		//在构造RpcEndpoint时会启动指定rpcEndpoint上的RpcServer
		this.rpcServer = rpcService.startServer(this);
		this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);
	}
public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) {
        checkNotNull(rpcEndpoint, "rpc endpoint");

        CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
final Props akkaRpcActorProps;
// 根据RpcEndpoint类型创建不同类型的Props
if (rpcEndpoint instanceof FencedRpcEndpoint) {
            akkaRpcActorProps = Props.create(
                FencedAkkaRpcActor.class,
                rpcEndpoint,
                terminationFuture,
                getVersion(),
                configuration.getMaximumFramesize());
        } else {
            akkaRpcActorProps = Props.create(
                AkkaRpcActor.class,
                rpcEndpoint,
                terminationFuture,
                getVersion(),
                configuration.getMaximumFramesize());
        }

        ActorRef actorRef;
// 同步块,创建Actor,并获取对应的ActorRef
synchronized (lock) {
            checkState(!stopped, "RpcService is stopped");
            actorRef = actorSystem.actorOf(akkaRpcActorProps, rpcEndpoint.getEndpointId());
            actors.put(actorRef, rpcEndpoint);
        }

        LOG.info("Starting RPC endpoint for {} at {} .", rpcEndpoint.getClass().getName(), actorRef.path());

// 获取Actor的路径
final String akkaAddress = AkkaUtils.getAkkaURL(actorSystem, actorRef);
final String hostname;
        Option<String> host = actorRef.path().address().host();
if (host.isEmpty()) {
            hostname = "localhost";
        } else {
            hostname = host.get();
        }
// 解析该RpcEndpoint实现的所有RpcGateway接口
        Set<Class<?>> implementedRpcGateways = new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()));
        
// 额外添加RpcServer和AkkaBasedEnpoint类
        implementedRpcGateways.add(RpcServer.class);
        implementedRpcGateways.add(AkkaBasedEndpoint.class);

final InvocationHandler akkaInvocationHandler;

// 根据不同类型动态创建代理对象
if (rpcEndpoint instanceof FencedRpcEndpoint) {
// a FencedRpcEndpoint needs a FencedAkkaInvocationHandler
            akkaInvocationHandler = new FencedAkkaInvocationHandler<>(
                akkaAddress,
                hostname,
                actorRef,
                configuration.getTimeout(),
                configuration.getMaximumFramesize(),
                terminationFuture,
                ((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken);

            implementedRpcGateways.add(FencedMainThreadExecutable.class);
        } else {
            akkaInvocationHandler = new AkkaInvocationHandler(
                akkaAddress,
                hostname,
                actorRef,
                configuration.getTimeout(),
                configuration.getMaximumFramesize(),
                terminationFuture);
        }

// Rather than using the System ClassLoader directly, we derive the ClassLoader
// from this class . That works better in cases where Flink runs embedded and all Flink
// code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
        ClassLoader classLoader = getClass().getClassLoader();

// 生成RpcServer对象,而后对该server的调用都会进入Handler的invoke方法处理,handler实现了多个接口的方法
@SuppressWarnings("unchecked")
        RpcServer server = (RpcServer) Proxy.newProxyInstance(
            classLoader,
            implementedRpcGateways.toArray(new Class<?>[implementedRpcGateways.size()]),
            akkaInvocationHandler);

return server;
    }
  • 调用RpcEndpoint#start;

  • 委托给RpcServer#start;

  • 调用动态代理的AkkaInvocationHandler#invoke;发现调用的是StartStoppable#start方法,则直接进行本地方法调用;invoke方法的代码如下:

    RPCEndpointpublic final void start() {
    		rpcServer.start();
    	}
    AkkaInvocationHandlerpublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
          Class<?> declaringClass = method.getDeclaringClass();
    
          Object result;
    // 先匹配指定类型(handler已实现接口的方法),若匹配成功则直接进行本地方法调用;若匹配为FencedRpcGateway类型,则抛出异常(应该在FencedAkkaInvocationHandler中处理);其他则进行Rpc调用
    if (declaringClass.equals(AkkaBasedEndpoint.class) ||
              declaringClass.equals(Object.class) ||
              declaringClass.equals(RpcGateway.class) ||
              declaringClass.equals(StartStoppable.class) ||
              declaringClass.equals(MainThreadExecutable.class) ||
              declaringClass.equals(RpcServer.class)) {
              result = method.invoke(this, args);
          } else if (declaringClass.equals(FencedRpcGateway.class)) {
    throw new UnsupportedOperationException("AkkaInvocationHandler does not support the call FencedRpcGateway#" +
                  method.getName() + ". This indicates that you retrieved a FencedRpcGateway without specifying a " +
    "fencing token. Please use RpcService#connect(RpcService, F, Time) with F being the fencing token to " +
    "retrieve a properly FencedRpcGateway.");
          } else {
              result = invokeRpc(method, args);
          }
    
    return result;
      }
    
    • 调用AkkaInvocationHandler#start;

    • 通过ActorRef#tell给对应的Actor发送消息rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());

    • 调用AkkaRpcActor#handleControlMessage处理控制类型消息;

    • 在主线程中将自身状态变更为Started状态;

    经过上述步骤就完成了Actor的启动过程,Actor启动后便可与Acto通信让其执行代码(如runSync/callSync等)和处理Rpc请求了。下面分别介绍处理执行代码和处理Rpc请求;

Rpc调用流程

AkkaInvocationHandler#invokeRpc,其方法如下:

private Object invokeRpc(Method method, Object[] args) throws Exception {
// 获取方法相应的信息
String methodName = method.getName();
      Class<?>[] parameterTypes = method.getParameterTypes();
      Annotation[][] parameterAnnotations = method.getParameterAnnotations();
      Time futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout);

// 创建RpcInvocationMessage(可分为LocalRpcInvocation/RemoteRpcInvocation)
      final RpcInvocation rpcInvocation = createRpcInvocationMessage(methodName, parameterTypes, args);

      Class<?> returnType = method.getReturnType();

      final Object result;

// 无返回,则使用tell方法
if (Objects.equals(returnType, Void.TYPE)) {
          tell(rpcInvocation);

          result = null;
      } else {
// execute an asynchronous call
// 有返回,则使用ask方法
          CompletableFuture<?> resultFuture = ask(rpcInvocation, futureTimeout);

          CompletableFuture<?> completableFuture = resultFuture.thenApply((Object o) -> {
// 调用返回后进行反序列化
if (o instanceof SerializedValue) {
try {
return  ((SerializedValue<?>) o).deserializeValue(getClass().getClassLoader());
                  } catch (IOException | ClassNotFoundException e) {
throw new CompletionException(
new RpcException("Could not deserialize the serialized payload of RPC method : "
                              + methodName, e));
                  }
              } else {
// 直接返回
return o;
              }
          });

// 若返回类型为CompletableFuture则直接赋值
if (Objects.equals(returnType, CompletableFuture.class)) {
              result = completableFuture;
          } else {
try {
// 从CompletableFuture获取
                  result = completableFuture.get(futureTimeout.getSize(), futureTimeout.getUnit());
              } catch (ExecutionException ee) {
throw new RpcException("Failure while obtaining synchronous RPC result.", ExceptionUtils.stripExecutionException(ee));
              }
          }
      }

return result;
  }

然后转到服务器接收

AkkaRpcActor#handleRpcInvocation,其代码如下:

private void handleRpcInvocation(RpcInvocation rpcInvocation) {
      Method rpcMethod = null;

try {
// 获取方法的信息
          String methodName = rpcInvocation.getMethodName();
          Class<?>[] parameterTypes = rpcInvocation.getParameterTypes();

// 在RpcEndpoint中找指定方法
          rpcMethod = lookupRpcMethod(methodName, parameterTypes);
      } catch (ClassNotFoundException e) {
          log.error("Could not load method arguments.", e);

// 异常处理
          RpcConnectionException rpcException = new RpcConnectionException("Could not load method arguments.", e);
          getSender().tell(new Status.Failure(rpcException), getSelf());
      } catch (IOException e) {
          log.error("Could not deserialize rpc invocation message.", e);
// 异常处理
          RpcConnectionException rpcException = new RpcConnectionException("Could not deserialize rpc invocation message.", e);
          getSender().tell(new Status.Failure(rpcException), getSelf());
      } catch (final NoSuchMethodException e) {
          log.error("Could not find rpc method for rpc invocation.", e);
// 异常处理
          RpcConnectionException rpcException = new RpcConnectionException("Could not find rpc method for rpc invocation.", e);
          getSender().tell(new Status.Failure(rpcException), getSelf());
      }

if (rpcMethod != null) {
try {
// this supports declaration of anonymous classes
              rpcMethod.setAccessible(true);

// 返回类型为空则直接进行invoke
if (rpcMethod.getReturnType().equals(Void.TYPE)) {
// No return value to send back
                  rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
              }
else {
final Object result;
try {
                      result = rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
                  }
catch (InvocationTargetException e) {
                      log.debug("Reporting back error thrown in remote procedure {}", rpcMethod, e);

// tell the sender about the failure
                      getSender().tell(new Status.Failure(e.getTargetException()), getSelf());
						return;
                  }

final String methodName = rpcMethod.getName();

// 方法返回类型为CompletableFuture
if (result instanceof CompletableFuture) {
final CompletableFuture<?> responseFuture = (CompletableFuture<?>) result;
// 发送结果(使用Patterns发送结果给调用者,并会进行序列化并验证结果大小)
                      sendAsyncResponse(responseFuture, methodName);
                  } else {
// 类型非CompletableFuture,发送结果(使用Patterns发送结果给调用者,并会进行序列化并验证结果大小)
                      sendSyncResponse(result, methodName);
                  }
              }
          } catch (Throwable e) {
              log.error("Error while executing remote procedure call {}.", rpcMethod, e);
// tell the sender about the failure
              getSender().tell(new Status.Failure(e), getSelf());
          }
      }
  }
  • 将结果返回给调用者AkkaInvocationHandler#ask;

补充:

AkkaRpcActor,会根据类型的不同,进行不同的处理

protected void handleRpcMessage(Object message) {
// 根据消息类型不同进行不同的处理
if (message instanceof RunAsync) {
          handleRunAsync((RunAsync) message);
      } else if (message instanceof CallAsync) {
          handleCallAsync((CallAsync) message);
      } else if (message instanceof RpcInvocation) {
          handleRpcInvocation((RpcInvocation) message);
      } else {
          log.warn(
"Received message of unknown type {} with value {}. Dropping this message!",
              message.getClass().getName(),
              message);

          sendErrorIfSender(new AkkaUnknownMessageException("Received unknown message " + message +
" of type " + message.getClass().getSimpleName() + '.'));
      }
  }
  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2022-02-26 12:07:35  更:2022-02-26 12:09:17 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/5 8:33:41-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码