一、实现简单服务注册功能
\quad\quad
RPC框架一般由服务端,消费端,注册中心三部分组成。注册中心负责持久化服务名称,IP地址以及端口等。本次只实现简单的服务注册功能。
\quad\quad
本篇文章实现简单的注册中心,即注册中心中仅仅存放服务,没有存放地址、端口信息。而接口、实现类和序列化在前面两篇文章中已经实现,不再赘述。
1.1 服务端
1.1.1 发布rpc服务
public class RpcBootStrap {
public static void main(String[] args) throws IOException {
CalculatorService calculator=new CalculatorServiceImpl();
RpcProvider.export(8081,calculator);
}
}
1.2 注册中心
- 服务端发布服务后,以列表形式在注册中心进行注册,只存放服务信息,并传入服务地址。
- 根据服务地址,监听socket端口,是否收到客户端请求信息。
- 若收到客户端请求,开启新的线程,首先检查注册中心中是否有客户端请求的服务,若无则返回没有服务。
- 若有,则处理序列化后的请求信息(此时数据是通过网络发到服务端处理的),通过反射调用服务端相关服务方法,并返回服务信息处理结果。
1.2.1 实现rpc服务注册
public class RpcProvider {
private static List<Object> serviceList;
public static void export(int port,Object... service) throws IOException {
serviceList= Arrays.asList(service);
ServerSocket server = new ServerSocket(port);
Socket client=null;
while (true){
client=server.accept();
new Thread(new ServerThread(client,serviceList)).start();
}
}
1.2.2 注册服务同时,开启线程,监听客户端请求
- 此时服务端客户端通过socket进行RPC通信
- 接收客户端请求服务信息
- 检查注册中心是否存有客户端存有的请求服务信息
- 若有通过反射调用相关服务,并将计算结果序列化后传回客户端
public class ServerThread implements Runnable{
private Socket client=null;
private List<Object> serviceList=null;
public ServerThread(Socket client,List<Object> services){
this.client=client;
this.serviceList=services;
}
public void run(){
ObjectInputStream objectInputStream=null;
ObjectOutputStream objectOutputStream=null;
try{
objectInputStream=new ObjectInputStream(client.getInputStream());
objectOutputStream=new ObjectOutputStream(client.getOutputStream());
RpcSerializable rpcSerializable=(RpcSerializable)objectInputStream.readObject();
Class serviceClass=(Class) rpcSerializable.getClassName();
Object obj=getService(serviceClass);
if(obj==null){
throw new Exception("not service");
}
String methodName=rpcSerializable.getMethodName();
Class<?>[] parameterTypes=rpcSerializable.getParameterTypes();
Object[] parameters=rpcSerializable.getArguments();
System.out.println(String.format("收到消费者远程调用请求:类名 = {%s},方法名 = {%s},调用入参 = %s,方法入参列表 = %s",
serviceClass, methodName, Arrays.toString(parameters), Arrays.toString(parameterTypes)));
Method method = obj.getClass().getMethod(methodName,parameterTypes);
Object invoke = method.invoke(obj, parameters);
System.out.println("方法调用结果:" + invoke);
objectOutputStream.writeObject(invoke);
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
public Object getService(Class servicesClass){
for(Object obj:serviceList){
boolean isFather=servicesClass.isAssignableFrom(obj.getClass());
if(isFather){
return obj;
}
}
return null;
}
}
1.3 客户端
1.3.1 实现代理类,在代理类中请求服务
public class ProxyHandler implements InvocationHandler {
private String ip;
private int port;
public ProxyHandler(String ip, int port) {
this.ip = ip;
this.port = port;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Exception {
Socket socket = new Socket(ip,port);
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
RpcSerializable rpcSerializable = new RpcSerializable(proxy.getClass().getInterfaces()[0], method.getName(), method.getParameterTypes(), args);
objectOutputStream.writeObject(rpcSerializable);
ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream());
return inputStream.readObject();
}
}
1.3.2 获取代理对象
public class RpcConsumer {
public static <T> T getService(Class<T> clazz,String ip,int port) {
ProxyHandler proxyHandler =new ProxyHandler(ip,port);
return (T) Proxy.newProxyInstance(RpcConsumer.class.getClassLoader(), new Class<?>[] {clazz}, proxyHandler);
}
}
1.4 通过RPC远程调用(测试示例,客户端)
public class RpcTest {
public static void main(String[] args) {
CalculatorService calculator= RpcConsumer.getService(CalculatorService.class,"127.0.0.1",8081);
int res=calculator.add(100,86);
System.out.println(res);
}
}
|