1.RpcServer类:
/**
 * Demo RPC server.
 *
 * Starts a local SparkSession, creates an [[RpcEnv]] from the values in
 * `HelloRpcSettings`, registers a `HelloEndPoint` under the configured service
 * name and then blocks until the RPC environment terminates.
 */
object RpcServerTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()

    // The session is only needed here to obtain the SecurityManager from its SparkEnv.
    val session = SparkSession
      .builder()
      .config(conf)
      .master("local[*]")
      .appName("test rpc")
      .getOrCreate()
    val securityManager = session.sparkContext.env.securityManager

    val serviceName = HelloRpcSettings.getName()
    val host = HelloRpcSettings.getHostname()

    // The same hostname is used as both the bind address and the advertised address.
    val rpcEnv = RpcEnv.create(
      serviceName,
      host,
      host,
      HelloRpcSettings.getPort(),
      conf,
      securityManager,
      numUsableCores = 1,
      clientMode = false)

    // Create the endpoint and register it under the service name.
    rpcEnv.setupEndpoint(serviceName, new HelloEndPoint(rpcEnv))

    // Block the main thread until the RPC environment terminates.
    rpcEnv.awaitTermination()
  }
}
2.RpcClient类
/**
 * Demo RPC client.
 *
 * Looks up the endpoint registered under `HelloRpcSettings.getName()` at the
 * host/port from `HelloRpcSettings` and exercises the three ways of talking to
 * it: `send`, `ask` and `askSync`. The RPC environment and the SparkSession are
 * always released, even when one of the calls fails.
 */
object RpcClientTest {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    val sparkSession =
      SparkSession
        .builder()
        .config(conf)
        .master("local[*]")
        .appName("test rpc")
        .getOrCreate()
    val sparkContext: SparkContext = sparkSession.sparkContext
    val sparkEnv: SparkEnv = sparkContext.env

    // clientMode = true: this process only makes outbound calls, so it must not
    // try to bind a server socket on the host/port the server is listening on.
    val rpcEnv: RpcEnv = RpcEnv.create(
      HelloRpcSettings.getName(),
      HelloRpcSettings.getHostname(),
      HelloRpcSettings.getPort(),
      conf,
      sparkEnv.securityManager,
      clientMode = true)

    try {
      // Reference to the remote endpoint, resolved by address and service name.
      val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef(
        RpcAddress(HelloRpcSettings.getHostname(), HelloRpcSettings.getPort()),
        HelloRpcSettings.getName())

      // Execution context for the onComplete callback below.
      import scala.concurrent.ExecutionContext.Implicits.global

      // Asynchronous, one-way (fire-and-forget): no reply is returned.
      endPointRef.send(SayHi("test send"))

      // Asynchronous with reply: returns a Future that completes with the answer.
      val future: Future[String] = endPointRef.ask[String](SayHi("neo"))
      future.onComplete {
        case scala.util.Success(value) => println(s"Got the result = $value")
        case scala.util.Failure(e) => println(s"Got error: $e")
      }
      // Wait for the reply so the demo does not move on before it arrives.
      Await.result(future, Duration.apply("30s"))

      // Synchronous with reply: blocks the calling thread until the answer arrives.
      val res = endPointRef.askSync[String](SayBye("test askSync"))
      println(res)
    } finally {
      // Release the client RpcEnv first, then the session, regardless of failures.
      try rpcEnv.shutdown()
      finally sparkSession.stop()
    }
  }
}
3.RpcServer处理类
/**
 * Server-side endpoint of the hello RPC demo.
 *
 * One-way `SayHi` messages are only logged; request/reply `SayHi` and `SayBye`
 * messages are logged and answered with a greeting string.
 *
 * @param rpcEnv the RPC environment this endpoint is registered with
 */
class HelloEndPoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {

  /** Prints the RPC environment's address followed by a start-up message. */
  override def onStart(): Unit = {
    println(rpcEnv.address)
    println("start hello endpoint")
  }

  /** Handles messages that expect no reply; only `SayHi` is accepted. */
  override def receive: PartialFunction[Any, Unit] = {
    case SayHi(text) => logReceived(text)
  }

  /**
   * Handles messages that expect a reply.
   *
   * @param context call context used to send the reply back to the caller
   */
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case SayHi(text) =>
      logReceived(text)
      context.reply(s"hi, $text")
    case SayBye(text) =>
      logReceived(text)
      context.reply(s"bye, $text")
  }

  /** Prints a shutdown message. */
  override def onStop(): Unit = println("stop hello endpoint")

  /** Prints the payload of an incoming message. */
  private def logReceived(text: String): Unit = println(s"receive $text")
}
/**
 * Greeting message. `HelloEndPoint` accepts it both as a one-way message and
 * as a request, in which case the reply is `"hi, " + msg`.
 *
 * @param msg text payload of the greeting
 */
case class SayHi(msg: String)
/**
 * Farewell message. `HelloEndPoint` accepts it only as a request and replies
 * with `"bye, " + msg`.
 *
 * @param msg text payload of the farewell
 */
case class SayBye(msg: String)
4.Util
/** Connection settings shared by the demo RPC server and client. */
object HelloRpcSettings {

  /** Name used for the RPC environment and for the registered endpoint. */
  val rpcName: String = "hello-rpc-service"

  /** Port used by the demo RPC service. */
  val port: Int = 9527

  /** Host name used by the demo RPC service. */
  val hostname: String = "localhost"

  /** @return the service name, see [[rpcName]] */
  def getName(): String = rpcName

  /** @return the service port, see [[port]] */
  def getPort(): Int = port

  /** @return the service host name, see [[hostname]] */
  def getHostname(): String = hostname
}
5.代码解析
1. new HelloEndPoint(rpcEnv): Endpoint的构造方法,初始化HelloEndPoint,则这个类中{}里面所有能执行的代码(属性赋值、代码块)都会执行,不会执行的是其内部类和方法。
2. rpcEnv.setupEndpoint(HelloRpcSettings.getName(), helloEndpoint): 注册该endpoint,并调用其onStart()
|