IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> Scala中使用SOFA jraft 实现rpc的优化 二 -> 正文阅读

[网络协议]Scala中使用SOFA jraft 实现rpc的优化 二

背景

第一版实现Processable宏之后,各方面已经满足需求,也正常使用,并引入进bitlap,使得我们能以方法的形式管理所有Processor对象,而不需要创建太多的类文件。
但是细心的人会发现,为了实现这个小小的功能,我们在scala-macro-tools中引入了重量级包"com.alipay.sofa" % "jraft-core" % "1.3.9"。同时为了测试,我们还引入了protobuf-java

特别是sofa这个包,场景有限,当我们不需要使用Processable宏,却被迫引入了这么大的jar,着实是划不来的。所以接下来优化目的是减小包的依赖。

之所以我们需要依赖jraft,是因为我们期望通过泛型参数限定符([_ <: Rpcxxx])来自动使用编译期的类型检测,现在我们可以换个思路,能不能既保留类型检查又不需要引入包?如果我们在宏的实现中自己检查类型呢?

答案当然有,那就是类似gradle的API依赖,将依赖的导入转交给使用者决定。在宏中这种方式更为彻底,甚至都不需要引入包,只需在代码中直接用全类名即可,因为宏没有调用前不会类型检查。

使用该宏的人不引入jraft则会编译不了

重新设计Processable

为此需要新增泛型参数,同时在宏里面处理类型检查。该实现和上篇文章一样,有三种不同定义,下面先给出最简洁的一种定义:

一 简单粗暴

只有泛型和2个函数参数

宏定义如下:

  def apply[Service, RRC, RRP[_ <: Req], RC, Req, Resp]
    (
    processRequest:   (Service, RRC, Req) ? Resp,
    processException: (Service, RC, Exception) ? Resp
  ): RRP[Req] = macro ProcessorCreatorMacro.OnlyWithFunctionalParameters[Service, RRC, RRP[_ <: Req], RC, Req, Resp]
}

宏实现如下:

  def OnlyWithFunctionalParameters[Service: c.WeakTypeTag, RRC: c.WeakTypeTag, RRP: c.WeakTypeTag, RC: c.WeakTypeTag, Req: c.WeakTypeTag, Resp: c.WeakTypeTag]
    (c: blackbox.Context)(
    processRequest:   c.Expr[(Service, RRC, Req) ? Req],
    processException: c.Expr[(Service, RC, Exception) ? Req]
  ): c.Expr[RRP] = {
    import c.universe._
    // 类型检查,这里是编译期间的检查,如果宏不被调用,则不会执行,这很重要,这样我们才能实现下面的代码,这是前提!!!
    checkTree[RRC, RRP, RC, Service](c)
    // 使用弱类型标记得到类型即可,类型检查会弱点,因为弱标记,serviceType中的属性有的是空的,但是在这里我们不需要那么多信息,有类型就足够。
    val serviceType = weakTypeOf[Service]
    // 类型使用ID自增计数拼接一个前缀
    val className = TypeName(classNamePrefix + MacroCache.getIdentityId)
    val reqProtoType = weakTypeOf[Req]
    val rpcRequestClosureType = weakTypeOf[RRC]
    val rpcContextType = weakTypeOf[RC]
    val respProtoType = weakTypeOf[Resp]
    val respProtoCompanionType = weakTypeOf[Resp].companion //getDefaultInstance is static method, it's in companion
    val processor =
      q"""
       // 前面使用了通用的父类对RpcRequestProcessor做了一层简单包装,这里已经不需要了。
       class $className(private val service: $serviceType, executor: java.util.concurrent.Executor = null)
         // getDefaultInstance是Resp的静态方法,所以使用respProtoCompanionType,这个是Resp的伴生对象
         extends com.alipay.sofa.jraft.rpc.RpcRequestProcessor[$reqProtoType](executor, $respProtoCompanionType.getDefaultInstance)
         with com.typesafe.scalalogging.LazyLogging {
            override def handleRequest(rpcCtx: com.alipay.sofa.jraft.rpc.RpcContext, request: $reqProtoType) {
              try {
                val msg = processRequest(request, new com.alipay.sofa.jraft.rpc.RpcRequestClosure(rpcCtx, this.defaultResp))
                if (msg != null) {
                  rpcCtx.sendResponse(msg)
                }
              } catch {
                case e: Exception =>
                  logger.error("handleRequest" + request + "failed", e)
                  rpcCtx.sendResponse(processError(rpcCtx, e))
              }
            }
           override def interest(): String = classOf[$reqProtoType].getName

           def processRequest(request: $reqProtoType, done: $rpcRequestClosureType): $respProtoType = {
              $processRequest(service, done, request)
           }

           def processError(rpcCtx: $rpcContextType, exception: Exception): $respProtoType = {
              $processException(service, rpcCtx, exception)
           }
       }
       //运行时反射service对象,使用无参构造函数
       val service = new io.github.dreamylost.macros.Creator[$serviceType].createInstance(null)(0)
       new $className(service)
     """
    printTree[RRP](c)(processor)
  }

这样就完成了新的宏设计,来看怎么使用:

    // 参数有点多?其实对于ProcessorCreator来说,四个参数对一个服务而言基本是相同的,NetService, RpcRequestClosure, RpcRequestProcessor, RpcContext
    // 可以再用个工具类包层即可,但是考虑到依赖,应该由使用者自己实现,这很简单。
    val openSession = ProcessorCreator[NetService, RpcRequestClosure, RpcRequestProcessor, RpcContext, BOpenSessionReq, BOpenSessionResp](
      (service, rpc, req) => {
        import scala.jdk.CollectionConverters.MapHasAsScala
        val username = req.getUsername
        val password = req.getPassword
        val configurationMap = req.getConfigurationMap
        val ret = service.openSession(username, password, configurationMap.asScala.toMap)
        BOpenSessionResp.newBuilder().setSessionHandle(ret).build()
      },
      (service, rpc, exception) => {
        BOpenSessionResp.newBuilder().setStatus(exception.getLocalizedMessage).build()
      }
    )

二 灵活拓展

灵活又不失优雅

宏定义如下:

  def apply[RRC, RRP[_ <: Req], RC, Req, Resp, Service, E <: Executor]
    (
    defaultResp:      Resp,
    processRequest:   (Service, RRC, Req) ? Resp,
    processException: (Service, RC, Exception) ? Resp
  )(implicit service: Service, executor: E): RRP[Req] = macro ProcessorCreatorMacro.SimpleImpl[RRC, RRP[_ <: Req], RC, Req, Resp, Service, E]

由于serviceexecutor通常是固定的,所以我们使用implicit传入,既支持了自定义参数,又能方便的被别的Processor复用。

宏的使用如下:

    // 注意:这样所以同作用域的Processor对象将使用一个该类型的对象,这样很方便,不用每次都传进去
    implicit val service = new NetService
    implicit val executor: Executor = new Executor {
      override def execute(command: Runnable): Unit = ()
    }

    val openSession = ProcessorCreator[RpcRequestClosure, RpcRequestProcessor, RpcContext, BOpenSessionReq, BOpenSessionResp, NetService, Executor](
      BOpenSessionResp.getDefaultInstance,
      (service, rpcRequestClosure, req) => {
        import scala.jdk.CollectionConverters.MapHasAsScala
        val username = req.getUsername
        val password = req.getPassword
        val configurationMap = req.getConfigurationMap
        val ret = service.openSession(username, password, configurationMap.asScala.toMap)
        BOpenSessionResp.newBuilder().setSessionHandle(ret).build()
      },
      (service, rpcContext, exception) => {
        BOpenSessionResp.newBuilder().setStatus(exception.getLocalizedMessage).build()
      }
    )

宏定义就不贴了,感兴趣可以看源码

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2021-12-08 14:09:52  更:2021-12-08 14:10:58 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/8 5:53:50-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码