简单 RPC 实现(一)
项目结构
[root@localhost ppr-parent]
├── pom.xml
├── ppr-consumer
├── ppr-provider
├── ppr-sdk
└── ppr-service
ppr-consumer 依赖 ppr-sdk,ppr-service
ppr-provider 依赖 ppr-sdk,ppr-service
展开
├── pom.xml
├── ppr-consumer
│ ├── pom.xml
│ └── src
│ ├── main
│ │ └── java
│ │ └── com.ddup.consumer
│ │ └── service
│ │ └── HelloServicePpr.java
├── ppr-provider
│ ├── pom.xml
│ └── src
│ ├── main
│ │ └── java
│ │ └── com.ddup.provider
│ │ ├── server
│ │ │ └── Server.java
│ │ └── service
│ │ └── HelloServiceImpl.java
├── ppr-sdk
│ ├── pom.xml
│ └── src
│ ├── main
│ │ └── java
│ │ └── com.ddup.sdk
│ │ ├── exception
│ │ │ └── RpcException.java
│ │ ├── handler
│ │ │ ├── NetClient.java
│ │ │ └── PprProxyInvocationHandler.java
│ │ ├── PprRequest.java
│ │ └── server
│ │ └── ProviderServer.java
└── ppr-service
├── pom.xml
└── src
├── main
│ └── java
│ └── com.ddup.service
│ └── HelloService.java
ppr-service 定义接口
/**
 * Service contract declared in the ppr-service module, which both the
 * consumer and the provider modules depend on.
 */
public interface HelloService {

    /**
     * Says hi to the given name.
     *
     * @param name the name supplied by the caller
     * @return the text produced by the implementation
     */
    String sayHi(String name);
}
ppr-consumer 消费者
PprRequest(定义在 ppr-sdk 中,消费者与提供者共用的请求对象)
/**
 * Serializable description of a single remote call: which interface, which
 * method (name plus parameter types) and which arguments.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class PprRequest implements Serializable {

    /**
     * Explicit serial version so that consumer and provider agree on the
     * wire format regardless of compiler-specific default UID computation.
     */
    private static final long serialVersionUID = 1L;

    /** Arguments of the call; written with Java serialization, so each must be serializable. */
    private Object[] args;

    /** The service interface the call is made on. */
    private Class<?> clz;

    /** Name of the method to invoke. */
    private String method;

    /** Declared parameter types, used together with {@link #method} to resolve overloads. */
    private Class<?>[] parameterTypes;
}
请求代理类 PprProxyInvocationHandler
所有接口方法的调用都经由该代理类转换为 PprRequest,再通过 socket 发送给服务提供者
/**
 * JDK dynamic-proxy handler that converts calls on a service interface into
 * {@link PprRequest} objects and sends them through a {@link NetClient}.
 *
 * @param <T> the service interface being proxied
 */
public class PprProxyInvocationHandler<T> implements InvocationHandler {

    private final Class<T> service;
    private final NetClient client = new NetClient();

    /**
     * @param service the service interface to create a proxy for
     */
    public PprProxyInvocationHandler(Class<T> service) {
        this.service = service;
    }

    /**
     * Creates a proxy instance implementing the service interface.
     *
     * @return a proxy whose interface method calls are forwarded by {@link #invoke}
     */
    @SuppressWarnings("unchecked") // safe: the proxy implements exactly the interface `service`
    public T getProxy() {
        return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[]{service}, this);
    }

    /**
     * Forwards an interface method call to the remote side.
     *
     * <p>Methods declared by {@link Object} (equals, hashCode, toString) are
     * answered locally; otherwise merely printing the proxy or using it as a
     * map key would trigger a network call.
     *
     * @return the value returned by {@link NetClient#execute}
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (method.getDeclaringClass() == Object.class) {
            return invokeObjectMethod(proxy, method, args);
        }
        PprRequest request = new PprRequest();
        request.setArgs(args);
        request.setMethod(method.getName());
        request.setParameterTypes(method.getParameterTypes());
        request.setClz(service);
        return client.execute(request);
    }

    /** Handles the java.lang.Object methods that a dynamic proxy dispatches to its handler. */
    private Object invokeObjectMethod(Object proxy, Method method, Object[] args) {
        switch (method.getName()) {
            case "equals":
                return proxy == args[0];
            case "hashCode":
                return System.identityHashCode(proxy);
            case "toString":
                return service.getName() + "@ppr-proxy";
            default:
                throw new UnsupportedOperationException(method.toString());
        }
    }
}
RPC 通信 NetClient
/**
 * Minimal blocking client: opens one socket per call, writes the request
 * with Java serialization and reads back a single result object.
 */
public class NetClient {

    private static final String HOST = "localhost";
    private static final int PORT = 8888;

    /**
     * Sends the request to the provider and returns the deserialized result.
     *
     * @param request the call description to send
     * @return the object read from the provider's response
     * @throws RpcException if connecting, writing, or reading fails; the
     *         underlying exception is attached as the cause
     */
    public Object execute(PprRequest request) {
        // Closing the socket also closes both of its streams, so the socket is
        // the only resource that needs managing; it is released on success too.
        try (Socket socket = new Socket(HOST, PORT)) {
            ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
            oos.writeObject(request);
            oos.flush();
            // Created only after the request is flushed: the server writes its
            // stream header only once it has read and handled the request.
            ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
            return ois.readObject();
        } catch (Exception e) {
            // Broad catch kept on purpose: callers have always received
            // RpcException for any failure of the call.
            RpcException failure = new RpcException("ppr调用异常");
            // Only the String constructor of RpcException is known here, so the
            // cause is attached via initCause instead of a (message, cause) ctor.
            failure.initCause(e);
            throw failure;
        }
    }
}
客户端调用:
/**
 * Consumer-side demo: obtains a proxy for {@link HelloService} and calls it.
 */
public class HelloServicePpr {

    /** Calls {@code sayHi} through the proxy and prints the returned text. */
    public void helloTest() {
        String name = "小明";
        // Parameterized instead of raw, so getProxy() is typed and needs no cast.
        PprProxyInvocationHandler<HelloService> invocationHandler =
                new PprProxyInvocationHandler<>(HelloService.class);
        HelloService proxy = invocationHandler.getProxy();
        String s = proxy.sayHi(name);
        System.out.println(s);
    }

    public static void main(String[] args) {
        new HelloServicePpr().helloTest();
    }
}
服务提供者 ppr-provider
- socket 反序列化 生成 PprRequest
- 根据 PprRequest 中的 clz(接口类型)找到具体实现类
- 使用反射执行方法
- 序列化结果返回给消费者
服务端通信模板
/**
 * Template for the provider side: accepts connections one at a time, reads a
 * {@link PprRequest}, invokes the target method reflectively and writes the
 * result back. Subclasses supply the service instances via {@link #getProvider}.
 *
 * <p>NOTE(review): requests are read with Java native deserialization directly
 * from the socket. That is unsafe for untrusted peers; consider an
 * ObjectInputFilter or a different wire format before exposing this port.
 */
public abstract class ProviderServer {

    private static final int DEFAULT_PORT = 8888;

    /** Starts the server on the default port 8888; see {@link #start(int)}. */
    public void start() {
        start(DEFAULT_PORT);
    }

    /**
     * Listens on the given port and serves requests sequentially on the
     * calling thread. Returns only if the server socket cannot be opened
     * (the error is printed).
     *
     * @param port the TCP port to listen on
     */
    public void start(int port) {
        try (ServerSocket serverSocket = new ServerSocket(port)) {
            System.out.println("ppr server start at " + port + "...");
            while (true) {
                // One try-with-resources per connection: the socket (and with it
                // both streams) is closed whether handling succeeds or fails, so
                // a failed request cannot leave the client blocked on a read.
                try (Socket socket = serverSocket.accept()) {
                    handleConnection(socket);
                } catch (Exception e) {
                    // Keep serving other clients, but do not hide the failure.
                    e.printStackTrace();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Reads one request from the socket, invokes it and writes one result back. */
    private void handleConnection(Socket socket) throws IOException, ClassNotFoundException {
        ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
        PprRequest request = (PprRequest) ois.readObject();
        Object result = invoke(request);
        ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
        oos.writeObject(result);
        oos.flush();
    }

    /**
     * Resolves the requested method on the request's interface and invokes it
     * on the provider returned by {@link #getProvider}.
     *
     * @param request the deserialized call description
     * @return the method's return value, or {@code null} when no provider is
     *         registered or when lookup/invocation fails (the error is printed)
     */
    protected Object invoke(PprRequest request) {
        try {
            Method method = request.getClz().getMethod(request.getMethod(), request.getParameterTypes());
            Object provider = getProvider(request);
            if (provider == null) {
                System.out.println("bean 不存在");
                return null;
            }
            return method.invoke(provider, request.getArgs());
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Looks up the object that should serve the given request.
     *
     * @param request the incoming request
     * @return the service instance, or {@code null} if none is available
     */
    protected abstract Object getProvider(PprRequest request);
}
1. 在 ppr-provider 中启动服务:
/**
 * Provider entry point: registers the service implementations by interface
 * name and runs the {@link ProviderServer} loop.
 */
public class Server extends ProviderServer {

    /** Service instances keyed by the fully qualified name of their interface. */
    private final ConcurrentHashMap<String, Object> providers = new ConcurrentHashMap<>();

    /** Registers the HelloService implementation. */
    public Server() {
        providers.put("com.ddup.service.HelloService", new HelloServiceImpl());
    }

    public static void main(String[] args) throws IOException {
        Server server = new Server();
        server.start();
        System.in.read();
    }

    /**
     * Returns the instance registered under the name of the request's interface.
     *
     * @param request the incoming request
     * @return the registered instance, or {@code null} if there is none
     */
    @Override
    protected Object getProvider(PprRequest request) {
        String interfaceName = request.getClz().getName();
        return providers.get(interfaceName);
    }
}
2. HelloServiceImpl 实现业务接口 HelloService
/**
 * Provider-side implementation of {@link HelloService}.
 */
public class HelloServiceImpl implements HelloService {

    /**
     * Builds the reply by appending the name to a fixed prefix.
     *
     * @param name the name received from the caller
     * @return the prefix followed by {@code name}
     */
    @Override
    public String sayHi(String name) {
        StringBuilder reply = new StringBuilder("echo from server, hi: ");
        reply.append(name);
        return reply.toString();
    }
}
启动 ppr-provider 服务
ppr server start at 8888...
ppr-consumer 调用
echo from server, hi: 小明
good luck!
|