使用Zookeeper作为注册中心,RMI作为连接技术,手写RPC框架
1.框架结构
● 连接器:提供默认连接信息配置并提供连接 ● 注册器:提供注册服务和获取代理对象(没有具体的注册信息) ● RPC静态工厂:创建注册器、获取连接、注册服务和获取代理对象(已经通过静态初始化注册信息)
2.依赖
<dependencies>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.11</version>
</dependency>
</dependencies>
3.项目
3.1 连接器
package com.fyp.rpc.connection;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
/**
 * Holds the ZooKeeper connect string and session timeout and hands out a ZooKeeper client.
 *
 * <p>The client is created lazily on the first {@link #getConnection()} call and reused by
 * later calls for as long as its state is alive. Opening a fresh session on every call would
 * leave each of them (and their client threads) open until the server times them out.
 */
public class ZkConnection {
    private final String zkServer;
    private final int sessionTimeout;
    /** Lazily created client; guarded by {@code this}. */
    private ZooKeeper zooKeeper;

    /** Creates a connector for {@code localhost:2181} with a 10000 ms session timeout. */
    public ZkConnection() {
        this("localhost:2181", 10000);
    }

    /**
     * Creates a connector for the given server.
     *
     * @param zkServer ZooKeeper connect string, passed unchanged to the ZooKeeper constructor
     * @param sessionTimeout session timeout, passed unchanged to the ZooKeeper constructor
     */
    public ZkConnection(String zkServer, int sessionTimeout) {
        this.zkServer = zkServer;
        this.sessionTimeout = sessionTimeout;
    }

    /**
     * Returns the ZooKeeper client for this connector, creating it when there is none yet or
     * when the previous one is no longer alive (for example because a caller closed it).
     *
     * @return a ZooKeeper client bound to the configured server
     * @throws IOException if the ZooKeeper constructor fails
     */
    public synchronized ZooKeeper getConnection() throws IOException {
        if (zooKeeper == null || !zooKeeper.getState().isAlive()) {
            zooKeeper = new ZooKeeper(zkServer, sessionTimeout, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    // Intentionally empty: no code in this class reacts to watch events.
                }
            });
        }
        return zooKeeper;
    }

    /**
     * Closes the cached client, if any. A later {@link #getConnection()} call opens a new one.
     *
     * @throws InterruptedException if interrupted while closing the session
     */
    public synchronized void close() throws InterruptedException {
        if (zooKeeper != null) {
            zooKeeper.close();
            zooKeeper = null;
        }
    }
}
3.2 注册器
package com.fyp.rpc.registry;
import com.fyp.rpc.connection.ZkConnection;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.rmi.Naming;
import java.rmi.NotBoundException;
import java.rmi.Remote;
import java.util.List;
/**
 * Publishes RMI services under {@code /fyp/rpc/<interface name>} in ZooKeeper and resolves
 * client-side proxies from the RMI URL stored there.
 *
 * <p>{@code connection}, {@code ip} and {@code port} must be set through the setters before
 * {@link #registerService} or {@link #getServiceProxy} is called.
 */
public class FypRpcRegistry {
    private ZkConnection connection;
    private String ip;
    private int port;

    /**
     * Stores the service's RMI URL in ZooKeeper and binds the service object under that URL.
     *
     * <p>If a node for the interface already exists it is deleted first, so a re-registration
     * replaces the previously published URL instead of failing on {@code create}.
     *
     * @param serviceInterface remote interface whose fully qualified name is used as node name
     *     and as the last segment of the RMI URL
     * @param serviceObject remote object to bind
     * @throws IOException if the ZooKeeper client cannot be obtained or the RMI bind fails
     * @throws KeeperException if a ZooKeeper operation fails
     * @throws InterruptedException if a ZooKeeper operation is interrupted
     */
    public void registerService(Class<? extends Remote> serviceInterface, Remote serviceObject) throws IOException, KeeperException, InterruptedException {
        String serviceName = serviceInterface.getName();
        String rmi = "rmi://" + ip + ":" + port + "/" + serviceName;
        String path = "/fyp/rpc/" + serviceName;

        // exists() returns null for a missing node; only an existing node may be deleted.
        Stat existing = connection.getConnection().exists(path, false);
        if (existing != null) {
            // delete() checks the node's data version, not the child version (cversion).
            connection.getConnection().delete(path, existing.getVersion());
        }

        // Fixed charset so the reader decodes the URL the same way on any platform.
        byte[] payload = rmi.getBytes(StandardCharsets.UTF_8);
        connection.getConnection().create(path, payload, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        Naming.rebind(rmi, serviceObject);
    }

    /**
     * Reads the RMI URL published for the interface and looks the remote object up.
     *
     * @param serviceInterface remote interface to resolve
     * @param <T> type of the remote interface
     * @return the object returned by {@code Naming.lookup}, cast to {@code serviceInterface}
     * @throws IOException if the ZooKeeper client cannot be obtained or the RMI lookup fails
     * @throws NotBoundException if nothing is bound under the stored URL
     * @throws KeeperException if the node cannot be read (for example it does not exist)
     * @throws InterruptedException if the ZooKeeper read is interrupted
     * @throws ClassCastException if the looked-up object does not implement the interface
     */
    public <T extends Remote> T getServiceProxy(Class<T> serviceInterface) throws IOException, NotBoundException, KeeperException, InterruptedException {
        String path = "/fyp/rpc/" + serviceInterface.getName();
        byte[] payload = connection.getConnection().getData(path, false, null);
        String rmi = new String(payload, StandardCharsets.UTF_8);
        // Class.cast is a checked cast, so a wrong type fails here rather than at the caller.
        return serviceInterface.cast(Naming.lookup(rmi));
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public ZkConnection getConnection() {
        return connection;
    }

    public void setConnection(ZkConnection connection) {
        this.connection = connection;
    }
}
3.3 RPC静态工厂
package com.fyp.rpc;
import com.fyp.rpc.connection.ZkConnection;
import com.fyp.rpc.registry.FypRpcRegistry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import java.io.IOException;
import java.io.InputStream;
import java.rmi.NotBoundException;
import java.rmi.Remote;
import java.rmi.registry.LocateRegistry;
import java.util.List;
import java.util.Properties;
/**
 * Static entry point of the framework. On class initialization it reads the optional
 * {@code fyp-rpc.properties} classpath resource, builds the shared {@link ZkConnection} and
 * {@link FypRpcRegistry}, starts an RMI registry on the configured port, and makes sure the
 * {@code /fyp} and {@code /fyp/rpc} nodes exist in ZooKeeper.
 *
 * <p>Recognized keys and the defaults used when a key (or the whole file) is missing:
 * {@code registry.ip} = {@code localhost}, {@code registry.port} = {@code 9090},
 * {@code zk.server} = {@code localhost:2181}, {@code zk.sessionTimeout} = {@code 10000}.
 */
public class FypRpcFactory {
    private static final Properties config = new Properties();
    private static final ZkConnection connection;
    private static final FypRpcRegistry registry;

    static {
        try {
            // getResourceAsStream returns null when the resource is absent. Every key has a
            // default, so a missing file is not an error; try-with-resources skips close() for
            // a null resource and closes the stream otherwise.
            try (InputStream input = FypRpcRegistry.class.getClassLoader().getResourceAsStream("fyp-rpc.properties")) {
                if (input != null) {
                    config.load(input);
                }
            }

            String serverIp = config.getProperty("registry.ip", "localhost");
            int serverPort = Integer.parseInt(config.getProperty("registry.port", "9090"));
            String zkServer = config.getProperty("zk.server", "localhost:2181");
            int zkSessionTimeout = Integer.parseInt(config.getProperty("zk.sessionTimeout", "10000"));

            connection = new ZkConnection(zkServer, zkSessionTimeout);
            registry = new FypRpcRegistry();
            registry.setIp(serverIp);
            registry.setPort(serverPort);
            registry.setConnection(connection);

            // Start the RMI registry that registerService() later binds into.
            LocateRegistry.createRegistry(serverPort);

            // Parent nodes must exist before children can be created under /fyp/rpc.
            if (connection.getConnection().exists("/fyp", false) == null) {
                connection.getConnection().create("/fyp", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            if (connection.getConnection().exists("/fyp/rpc", false) == null) {
                connection.getConnection().create("/fyp/rpc", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            // The cause travels with the error, so it is not printed separately here.
            throw new ExceptionInInitializerError(e);
        }
    }

    /**
     * Registers a service through the shared registry.
     *
     * @param serviceInterface remote interface the service is published under
     * @param serviceObject remote object to bind
     * @throws IOException if the ZooKeeper client cannot be obtained or the RMI bind fails
     * @throws InterruptedException if a ZooKeeper operation is interrupted
     * @throws KeeperException if a ZooKeeper operation fails
     */
    public static void registerService(Class<? extends Remote> serviceInterface, Remote serviceObject) throws IOException, InterruptedException, KeeperException {
        registry.registerService(serviceInterface, serviceObject);
    }

    /**
     * Resolves a proxy for a published service through the shared registry.
     *
     * @param serviceInterface remote interface to resolve
     * @param <T> type of the remote interface
     * @return the remote object looked up for {@code serviceInterface}
     * @throws IOException if the ZooKeeper client cannot be obtained or the RMI lookup fails
     * @throws KeeperException if the service node cannot be read
     * @throws InterruptedException if the ZooKeeper read is interrupted
     * @throws NotBoundException if nothing is bound under the published URL
     */
    public static <T extends Remote> T getServiceProxy(Class<T> serviceInterface) throws IOException, KeeperException, InterruptedException, NotBoundException {
        return registry.getServiceProxy(serviceInterface);
    }
}
总结: 说白了,RPC框架已经被实现了,最大众的dubbo大家应该都用过了,这篇文章就是基于RMI技术实现的简易版dubbo,后续会给出优化————服务自动发现注册、服务容错和负载均衡,想了解的不妨加个收藏。 最后,如果有需要先了解dubbo再来学习RPC框架的,可以参考学习下面这篇文章。 《Linux环境下Dubbo环境搭建及启动》
学习参考: 尚学堂RPC远程过程调用:https://www.bilibili.com/video/BV11i4y1N7LQ
|