本博客文章(学习笔记)导航 (点击这里访问)
一、RPC概况
1.1 RPC简介
RPC:远程过程调用,是分布式系统常见的一种通信方法,从跨进程到跨物理机已经有十几年的历史 优点:可以将远程调用变成像调用本地方法一样简单
1.2 系统交互
系统的交互方式有两种:直接交互 和 间接交互(中间件交互),下面介绍一下这两种交互方式
1.2.1直接交互
1.2.2 间接交互
1.3 各种RPC框架的对比
1.4 RPC核心原理
1.4.1 RPC调用的原理
step1 server把自己的服务注册到registery step2 client定于redistry,获取自己想知道的服务信息 step3 如果server信息发生了改变,registory会通知订阅者信息发生了改变 step4 client要发起调用,就可以根据从registory中获取的信息直接调用即可
1.4.2 Call的调用过程
step1 client调用接口方法(stub中的接口方法) step2 将调用信息序列号,以便于在网络上传输 step3 client和server之间建立网络连接 step4 server反序列化传输对象 step5 server的stub查找要调用的方法以及参数 step6 server找到实际实现类的对象,通过反射获取执行结果,再次发送到stub上 step7 stub序列化传输对象 step8 server和client建立网络连接 step9 client反序列化传输对象 step10 client获取调用结果
1.5 技术栈
二、RPC手动实现
2.1 创建工程、制定协议、通用工具方法
2.1.1 项目类图
一共5大模块
2.1.2 项目搭建
step1 新建项目
step2 在项目下,新建6个模块,删除src文件 client:客户端模块 server:服务端模块 codec:编码解码模块 common:通用模块 propto:协议模块 transport:网络通信模块
2.1.3 父依赖编写以及导入
<dependencyManagement>
<dependencies>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<version>9.4.19.v20190610</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.44</version>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.8</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.26</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
</dependencies>
2.1.4 编译的版本控制
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
2.1.5 子模块版本统一配置
<properties>
<java.version>1.8</java.version>
<common.version>2.5</common.version>
<jetty.version>9.4.19.v20190610</jetty.version>
<fastjson.version>1.2.44</fastjson.version>
<lombok.version>1.18.8</lombok.version>
<slf4j.version>1.7.26</slf4j.version>
<logback.version>1.2.3</logback.version>
<junit.version>4.12</junit.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${common.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
</plugins>
</build>
2.1.6 Lombok插件下载
去IDEA中的plugin中查找下载即可
2.1.8 设置annotation
2.2 协议模块编写
协议模块主要包括 网络结点、请求对象、响应对象、服务类
2.2.1 网络通信端点类
package com.smgeek.gkrpc;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class Peer {
private String host;
private int port;
}
2.2.2 服务类
package com.smgeek.gkrpc;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ServiceDescriptor {
private String clazz;
private String method;
private String returnType;
private String[] parameterTypes;
}
2.2.3 请求类
package com.smgeek.gkrpc;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
public class Request {
private ServiceDescriptor service;
private Object[] parameters;
}
2.2.4 响应类
package com.smgeek.gkrpc;
import lombok.Data;
@Data
public class Response {
private int code=0;
private String message="ok";
private Object data;
}
2.3 通用模块编写
通用模块只有一个动态代理
2.3.1 JDK动态代理类
动态代理
package com.smgeek.gkrpc.common.utils;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;
public class ReflectionUtils {
public static <T>T newInstance(Class<T> clazz){
try{
return clazz.newInstance();
}catch (Exception e){
throw new IllegalStateException(e);
}
}
public static Method[] getPublicMethods(Class clazz){
Method[] methods=clazz.getDeclaredMethods();
List<Method> pmethods=new ArrayList<>();
for(Method m:methods){
if(Modifier.isPublic(m.getModifiers())){
pmethods.add(m);
}
}
return pmethods.toArray(new Method[0]);
}
public static Object invoke(Object obj,Method method,Object... args){
try{
return method.invoke(obj,args);
}catch (Exception e){
throw new IllegalStateException(e);
}
}
}
2.3.2 代理类测试
package com.smgeek.gkrpc.common.utils;
import org.junit.Test;
public class TestClass {
private String a(){
return "a";
}
public String b(){
return "b";
}
protected String c(){
return "c";
}
}
package com.smgeek.gkrpc.common.utils;
import org.junit.Test;
import java.lang.reflect.Method;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class ReflectionUtilsTest {
@Test
public void newInstance(){
TestClass t=ReflectionUtils.newInstance(TestClass.class);
assertNotNull(t);
}
@Test
public void getPublicMethods(){
Method[] methods=ReflectionUtils.getPublicMethods(TestClass.class);
assertEquals(1,methods.length);
String mname=methods[0].getName();
assertEquals("b",mname);
}
@Test
public void invoke(){
Method[] methods=ReflectionUtils.getPublicMethods(TestClass.class);
Method b=methods[0];
TestClass t=new TestClass();
Object r = ReflectionUtils.invoke(t, b);
assertEquals("b",r);
}
}
2.4 序列化模块编写
便于对象在网络上传输,需要序列化和反序列化
2.4.1 编码和解码接口
package com.smgeek.gkrpc.codec;
public interface Encoder {
byte[] encode(Object obj);
}
package com.smgeek.gkrpc.codec;
public interface Decoder {
<T>T decode(byte[] bytes,Class<T> clazz);
}
2.4.2 pom中引入fastjson
<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
</dependencies>
2.4.3 序列化、反序列化接口实现
package com.smgeek.gkrpc.codec;
import com.alibaba.fastjson.JSON;
public class JSONEncoder implements Encoder {
@Override
public byte[] encode(Object obj) {
return JSON.toJSONBytes(obj);
}
}
package com.smgeek.gkrpc.codec;
import com.alibaba.fastjson.JSON;
public class JSONDecoder implements Decoder{
@Override
public <T> T decode(byte[] bytes, Class<T> clazz) {
return JSON.parseObject(bytes,clazz);
}
}
2.4.4 接口实现的测试
package com.smgeek.gkrpc.codec;
import lombok.Data;
@Data
public class TestBean {
private String name;
private int age;
}
package com.smgeek.gkrpc.codec;
import org.junit.Test;
import java.lang.reflect.Method;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class JSONEncoderTest {
@Test
public void encode(){
Encoder encoder=new JSONEncoder();
TestBean bean=new TestBean();
bean.setName("smgeek");
bean.setAge(18);
byte[] bytes=encoder.encode(bean);
assertNotNull(bytes);
}
}
package com.smgeek.gkrpc.codec;
import java.lang.reflect.Method;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import org.junit.Test;
public class JSONDecoderTest {
@Test
public void decode() {
Encoder encoder=new JSONEncoder();
TestBean bean=new TestBean();
bean.setName("smgeek");
bean.setAge(18);
byte[] bytes=encoder.encode(bean);
Decoder decoder=new JSONDecoder();
TestBean bean2 = decoder.decode(bytes, TestBean.class);
assertEquals(bean.getName(),bean2.getName());
assertEquals(bean.getAge(),bean2.getAge());
}
}
2.5 网络模块
2.5.1 依赖引入
<dependencies>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
</dependency>
<dependency>
<groupId>hand.candy</groupId>
<artifactId>gk-rpc-proto</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
2.5.2 客户端网络传输接口
客户端主要作用
- 1 创建连接
- 2 发送数据 并且等待响应
- 3 关闭连接
package com.smgeek.gkrpc.transport;
import com.smgeek.gkrpc.Peer;
import java.io.InputStream;
public interface TransportClient {
void connect(Peer peer);
InputStream write(InputStream data);
void close();
}
2.5.3 服务端网络传输接口
服务端主要作用
package com.smgeek.gkrpc.transport;
public interface TransportServer {
void init(int port,RequestHandler handler);
void start();
void stop();
}
2.5.4 请求处理类接口
package com.smgeek.gkrpc.transport;
import java.io.InputStream;
import java.io.OutputStream;
public interface RequestHandler {
void onRequest(InputStream recive, OutputStream toResp);
}
2.5.5 接口实现
package com.smgeek.gkrpc.transport;
import com.smgeek.gkrpc.Peer;
import org.apache.commons.io.IOUtils;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
public class HTTPTransportClient implements TransportClient {
private String url;
@Override
public void connect(Peer peer) {
this.url="http://"+peer.getHost()+":"+peer.getPort();
}
@Override
public InputStream write(InputStream data) {
try {
HttpURLConnection httpConn =(HttpURLConnection)new URL(url).openConnection();
httpConn.setDoOutput(true);
httpConn.setDoInput(true);
httpConn.setRequestMethod("POST");
httpConn.connect();
IOUtils.copy(data,httpConn.getOutputStream());
int resultCode=httpConn.getResponseCode();
if(resultCode==HttpURLConnection.HTTP_OK){
return httpConn.getInputStream();
}else{
return httpConn.getErrorStream();
}
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
@Override
public void close() {
}
}
package com.smgeek.gkrpc.transport;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@Slf4j
public class HTTPTransportServer implements TransportServer{
private RequestHandler handler;
private Server server;
@Override
public void init(int port, RequestHandler handler) {
this.handler=handler;
this.server=new Server(port);
ServletContextHandler ctx=new ServletContextHandler();
server.setHandler(ctx);
ServletHolder holder=new ServletHolder(new ResquestServlet());
ctx.addServlet(holder,"/*");
}
@Override
public void start() {
try {
server.start();
server.join();
} catch (Exception e) {
log.error(e.getMessage(),e);
}
}
@Override
public void stop() {
try {
server.stop();
} catch (Exception e) {
log.error(e.getMessage(),e);
}
}
class ResquestServlet extends HttpServlet{
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
log.info("client connect");
InputStream in=req.getInputStream();
OutputStream out=resp.getOutputStream();
if(handler!=null){
handler.onRequest(in,out);
}
out.flush();
}
}
}
2.6 服务端模块
2.6.1 依赖引入
<dependencies>
<dependency>
<groupId>hand.candy</groupId>
<artifactId>gk-rpc-proto</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>hand.candy</groupId>
<artifactId>gk-rpc-codec</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>hand.candy</groupId>
<artifactId>gk-rpc-transport</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>hand.candy</groupId>
<artifactId>gk-rpc-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
</dependencies>
2.6.2 重写ServiceDescriptor
package com.smgeek.gkrpc;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Objects;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ServiceDescriptor {
private String clazz;
private String method;
private String returnType;
private String[] parameterTypes;
public static ServiceDescriptor from(Class clazz, Method method){
ServiceDescriptor sdp=new ServiceDescriptor();
sdp.setClazz(clazz.getName());
sdp.setMethod(method.getName());
sdp.setReturnType(method.getReturnType().getName());
Class[] parameterClasses=method.getParameterTypes();
String[] parameterTypes=new String[parameterClasses.length];
for(int i=0;i<parameterClasses.length;i++){
parameterTypes[i]=parameterClasses[i].getName();
}
sdp.setParameterTypes(parameterTypes);
return sdp;
}
@Override
public String toString() {
return "ServiceDescriptor{" +
"clazz='" + clazz + '\'' +
", method='" + method + '\'' +
", returnType='" + returnType + '\'' +
", parameterTypes=" + Arrays.toString(parameterTypes) +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ServiceDescriptor that = (ServiceDescriptor) o;
return Objects.equals(clazz, that.clazz) &&
Objects.equals(method, that.method) &&
Objects.equals(returnType, that.returnType) &&
Arrays.equals(parameterTypes, that.parameterTypes);
}
@Override
public int hashCode() {
return toString().hashCode();
}
}
2.6.3 写服务端配置
package com.smgeek.gkrpc.server;
import com.smgeek.gkrpc.codec.Decoder;
import com.smgeek.gkrpc.codec.Encoder;
import com.smgeek.gkrpc.codec.JSONEncoder;
import com.smgeek.gkrpc.transport.HTTPTransportServer;
import com.smgeek.gkrpc.transport.TransportServer;
import lombok.Data;
@Data
public class RpcServerConfig {
private Class<? extends TransportServer> transportClass = HTTPTransportServer.class;
private Class<? extends Encoder> encoderClass= JSONEncoder.class;
private Class<? extends Decoder> decoderClass=Decoder.class;
private int port=3000;
}
2.6.4 写服务实例
package com.smgeek.gkrpc.server;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.lang.reflect.Method;
@Data
@AllArgsConstructor
public class ServiceInstance {
private Object target;
private Method method;
}
2.6.5 写服务管理
package com.smgeek.gkrpc.server;
import com.smgeek.gkrpc.Request;
import com.smgeek.gkrpc.ServiceDescriptor;
import com.smgeek.gkrpc.common.utils.ReflectionUtils;
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
public class ServiceManager {
private Map<ServiceDescriptor ,ServiceInstance> sercices;
public ServiceManager(){
this.sercices=new ConcurrentHashMap<>();
}
public <T> void register(Class<T> interfaceClass,T bean ){
Method[] methods = ReflectionUtils.getPublicMethods(interfaceClass);
for(Method method:methods){
ServiceInstance sis=new ServiceInstance(bean,method);
ServiceDescriptor sdp=ServiceDescriptor.from(interfaceClass,method);
sercices.put(sdp,sis);
log.info("register service: {} {}",sdp.getClazz(),sdp.getMethod());
}
}
public ServiceInstance lookup(Request request){
ServiceDescriptor sdp=request.getService();
return sercices.get(sdp);
}
}
2.6.6 注册与发现测试
package com.smgeek.gkrpc.server;
public interface TestInterface {
void hello();
}
package com.smgeek.gkrpc.server;
public class TestClass implements TestInterface {
@Override
public void hello() {
}
}
package com.smgeek.gkrpc.server;
import com.smgeek.gkrpc.Request;
import com.smgeek.gkrpc.ServiceDescriptor;
import com.smgeek.gkrpc.common.utils.ReflectionUtils;
import org.junit.Before;
import org.junit.Test;
import java.lang.reflect.Method;
import static org.junit.Assert.assertNotNull;
public class ServiceManagerTest {
ServiceManager sm;
@Before
public void init(){
sm=new ServiceManager();
TestInterface bean=new TestClass();
sm.register(TestInterface.class,bean);
}
@Test
public void register(){
TestInterface bean=new TestClass();
sm.register(TestInterface.class,bean);
}
@Test
public void lookup(){
Method method = ReflectionUtils.getPublicMethods(TestInterface.class)[0];
ServiceDescriptor sdp=ServiceDescriptor.from(TestInterface.class,method);
Request request=new Request();
request.setService(sdp);
ServiceInstance sis = sm.lookup(request);
assertNotNull(sis);
}
}
2.6.7 RPC Server代码实现
package com.smgeek.gkrpc.server;
import com.smgeek.gkrpc.Request;
import com.smgeek.gkrpc.Response;
import com.smgeek.gkrpc.ServiceDescriptor;
import com.smgeek.gkrpc.codec.Decoder;
import com.smgeek.gkrpc.codec.Encoder;
import com.smgeek.gkrpc.common.utils.ReflectionUtils;
import com.smgeek.gkrpc.transport.RequestHandler;
import com.smgeek.gkrpc.transport.TransportServer;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Method;
@Slf4j
public class RpcServer {
private RpcServerConfig config;
private TransportServer net;
private Encoder encoder;
private Decoder decoder;
private ServiceManager serviceManager;
private ServiceInvoker serviceInvoker;
public RpcServer(RpcServerConfig config) {
this.config = config;
this.net = ReflectionUtils.newInstance(config.getTransportClass());
this.net.init(config.getPort(),handler);
this.encoder = ReflectionUtils.newInstance(config.getEncoderClass());;
this.decoder = ReflectionUtils.newInstance(config.getDecoderClass());
this.serviceManager = new ServiceManager();
this.serviceInvoker = new ServiceInvoker();
}
public <T> void register(Class<T> interfaceClass,T bean ){
serviceManager.register(interfaceClass,bean);
}
public void start(){
this.net.start();;
}
public void stop(){
this.net.stop();
}
private RequestHandler handler= new RequestHandler() {
@Override
public void onRequest(InputStream recive, OutputStream toResp) {
Response resp=new Response();
try {
byte[] inBytes= IOUtils.readFully(recive,recive.available());
Request request=decoder.decode(inBytes,Request.class);
log.info("get request:{}",request);
ServiceInstance sis=serviceManager.lookup(request);
Object ret=serviceInvoker.invoke(sis,request);
resp.setData(ret);
} catch (Exception e) {
log.warn(e.getMessage(),e);
resp.setCode(1);
resp.setMessage("RescServer got error:"+e.getClass().getName()+":"+e.getMessage());
}finally {
try {
byte[] outBytes=encoder.encode(resp);
toResp.write(outBytes);
} catch (IOException e) {
log.warn(e.getMessage(),e);
}
}
}
};
}
2.7 客户端模块
2.7.1 依赖引入
<dependencies>
<dependency>
<groupId>hand.candy</groupId>
<artifactId>gk-rpc-proto</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>hand.candy</groupId>
<artifactId>gk-rpc-codec</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>hand.candy</groupId>
<artifactId>gk-rpc-transport</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>hand.candy</groupId>
<artifactId>gk-rpc-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
</dependencies>
2.7.2 网络连接接口以及实现
package com.smgeek.gkrpc.client;
import com.smgeek.gkrpc.Peer;
import com.smgeek.gkrpc.transport.TransportClient;
import java.util.List;
public interface TransportSelector {
void init(List<Peer> peers, int count,Class<? extends TransportClient> clazz);
TransportClient select();
void release(TransportClient client);
void close();
}
package com.smgeek.gkrpc.client;
import com.smgeek.gkrpc.Peer;
import com.smgeek.gkrpc.common.utils.ReflectionUtils;
import com.smgeek.gkrpc.transport.TransportClient;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
@Slf4j
public class RandomTransportSelector implements TransportSelector {
private List<TransportClient> clients;
public RandomTransportSelector() {
clients=new ArrayList<>();
}
@Override
public synchronized void init(List<Peer> peers, int count, Class<? extends TransportClient> clazz) {
count=Math.max(count,1);
for(Peer peer:peers){
for(int i=0;i<count;i++){
TransportClient client= ReflectionUtils.newInstance(clazz);
client.connect(peer);
clients.add(client);
log.info("connect server:{} ",peer);
}
}
}
@Override
public synchronized TransportClient select() {
int i=new Random().nextInt(clients.size());
return clients.remove(i);
}
@Override
public synchronized void release(TransportClient client) {
clients.add(client);
}
@Override
public synchronized void close() {
for(TransportClient client:clients){
client.close();
}
clients.clear();
}
}
2.7.3 配置类
package com.smgeek.gkrpc.client;
import com.smgeek.gkrpc.Peer;
import com.smgeek.gkrpc.codec.Decoder;
import com.smgeek.gkrpc.codec.Encoder;
import com.smgeek.gkrpc.codec.JSONDecoder;
import com.smgeek.gkrpc.codec.JSONEncoder;
import com.smgeek.gkrpc.transport.HTTPTransportClient;
import com.smgeek.gkrpc.transport.TransportClient;
import lombok.Data;
import java.util.Arrays;
import java.util.List;
@Data
public class RpcClientConfig {
private Class<? extends TransportClient> transportClass= HTTPTransportClient.class;
private Class<? extends Encoder> encoderClass =JSONEncoder.class;
private Class<? extends Decoder> decoderClass = JSONDecoder.class;
private Class<? extends TransportSelector> selectorClass=RandomTransportSelector.class;
private int connectCount=1;
private List<Peer> severs=Arrays.asList(new Peer("127.0.0.1",3000));
}
2.7.4 Rpc客户端
package com.smgeek.gkrpc.client;
import com.smgeek.gkrpc.codec.Decoder;
import com.smgeek.gkrpc.codec.Encoder;
import com.smgeek.gkrpc.common.utils.ReflectionUtils;
import java.lang.reflect.Proxy;
public class RpcClient {
private RpcClientConfig config;
private Encoder encoder;
private Decoder decoder;
private TransportSelector selector;
public RpcClient() {
}
public RpcClient(RpcClientConfig config) {
this.config = config;
this.encoder= ReflectionUtils.newInstance(this.config.getEncoderClass());
this.decoder= ReflectionUtils.newInstance(this.config.getDecoderClass());
this.selector= ReflectionUtils.newInstance(this.config.getSelectorClass());
this.selector.init(this.config.getSevers(),this.config.getConnectCount(),this.config.getTransportClass());
}
public <T> T getProxy(Class<T> clazz){
return (T) Proxy.newProxyInstance(
getClass().getClassLoader(),
new Class[]{clazz},
new RemoteInvoker(clazz,encoder,decoder,selector)
);
}
}
package com.smgeek.gkrpc.client;
import com.smgeek.gkrpc.Request;
import com.smgeek.gkrpc.Response;
import com.smgeek.gkrpc.ServiceDescriptor;
import com.smgeek.gkrpc.codec.Decoder;
import com.smgeek.gkrpc.codec.Encoder;
import com.smgeek.gkrpc.transport.TransportClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
@Slf4j
public class RemoteInvoker implements InvocationHandler {
private Class clazz;
private Encoder encoder;
private Decoder decoder;
private TransportSelector selector;
public RemoteInvoker(Class clazz, Encoder encoder, Decoder decoder,TransportSelector selector){
this.decoder=decoder;
this.encoder=encoder;
this.selector=selector;
this.clazz=clazz;
};
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Request request=new Request();
request.setService(ServiceDescriptor.from(clazz,method));
request.setParameters(args);
Response resp = invokeRemote(request);
if(resp==null || resp.getCode()!=0){
throw new IllegalStateException("fail to invoke remotr"+resp);
}
return resp.getData();
}
private Response invokeRemote(Request request) {
TransportClient client=null;
Response resp=null;
try{
client=selector.select();
byte[] outBytes=encoder.encode(request);
InputStream revice = client.write(new ByteArrayInputStream(outBytes));
byte[] inBytes=IOUtils.readFully(revice,revice.available());
resp=decoder.decode(inBytes,Response.class);
}catch (Exception e){
log.warn(e.getMessage(),e);
resp=new Response();
resp.setCode(1);
resp.setMessage("RpcClient got error :"+e.getClass()+":"+e.getMessage());
} finally {
if(client!=null){
selector.release(client);
}
}
return resp;
}
}
2.8 RPC使用
2.8.1 新建模块example
2.8.2 引入依赖
<dependencies>
<dependency>
<groupId>hand.candy</groupId>
<artifactId>gk-rpc-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>hand.candy</groupId>
<artifactId>gk-rpc-server</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
2.8.3 Client
package single.rpc.example;
import single.rpc.client.RpcClient;
public class Client {
public static void main(String[] args) {
RpcClient client = new RpcClient();
CalcService service = client.getProxy(CalcService.class);
int add = service.add(1, 2);
int minus = service.minus(1, 2);
System.out.println(add);
System.out.println(minus);
}
}
2.8.4 Server
package single.rpc.example;
import single.rpc.server.RpcServer;
import single.rpc.server.RpcServerConfig;
public class Server {
public static void main(String[] args) {
RpcServer server = new RpcServer(new RpcServerConfig());
server.register(CalcService.class, new CalcServiceImpl());
server.start();
}
}
2.8.5 CascService
package single.rpc.example;
public interface CalcService {
int add(int a, int b);
int minus(int a, int b);
}
2.8.6 CalcServiceImpl
package single.rpc.example;
public class CalcServiceImpl implements CalcService {
@Override
public int add(int a, int b) {
return a + b;
}
@Override
public int minus(int a, int b) {
return a - b;
}
}
2.8.7 运行结果
视频课里面的代码有问题:
修正后的代码可以参考https://https://gitee.com/candydingding/rpc
先启动server 再启动client
server结果
client结果
三、参考
视频连接:https://www.imooc.com/video/20219 文档笔记:https://www.yuque.com/lililil-9bxsv/kb/tg9xha 代码地址:https://gitee.com/candydingding/rpc
|