准备工作
这次我们用RMI+Zookeeper实现一个远程调用的RPC框架,RMI实现远程调用,Zookeeper作为注册中心,具体的操作之前的文章都提到过,这里不再做过多赘述。
自定义 RPC框架2——RMI实现RPC
https://blog.csdn.net/qq_45587153/article/details/124211478?spm=1001.2014.3001.5502
自定义 RPC框架3——JAVA实现Zookeeper节点增删改查
https://blog.csdn.net/qq_45587153/article/details/124225572?spm=1001.2014.3001.5502
代码实现
项目结构
导入POM依赖
导入zookeeper依赖的同时,要排除其中的logback依赖。
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</exclusion>
</exclusions>
</dependency>
ZkConnection
package com.shen.rpc.connection;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
/**
 * Holds the ZooKeeper connect string and session timeout and hands out a
 * client handle built from them.
 *
 * <p>A single {@link ZooKeeper} handle is created lazily and reused by every
 * call to {@link #getConnection()}. Creating a new client per call would open
 * a new session (and its background threads) each time, and none of the
 * callers close the handle they receive.
 */
public class ZkConnection {
    private final String zkServer;
    private final int sessionTimeout;
    /** Lazily created handle shared by all callers; guarded by {@code this}. */
    private ZooKeeper zooKeeper;

    /** Creates a connection descriptor for {@code localhost:2181} with a 10000 ms session timeout. */
    public ZkConnection() {
        this("localhost:2181", 10000);
    }

    /**
     * @param zkServer       ZooKeeper connect string passed to the {@link ZooKeeper} constructor
     * @param sessionTimeout session timeout in milliseconds passed to the {@link ZooKeeper} constructor
     */
    public ZkConnection(String zkServer, int sessionTimeout) {
        this.zkServer = zkServer;
        this.sessionTimeout = sessionTimeout;
    }

    /**
     * Returns the shared ZooKeeper handle, creating it on first use and
     * re-creating it if the previous handle is no longer alive (closed or
     * authentication failed).
     *
     * @return a ZooKeeper client handle for the configured server
     * @throws IOException if the client cannot be created
     */
    public synchronized ZooKeeper getConnection() throws IOException {
        if (zooKeeper == null || !zooKeeper.getState().isAlive()) {
            zooKeeper = new ZooKeeper(this.zkServer, this.sessionTimeout, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    System.out.println("zookeeper监听");
                }
            });
        }
        return zooKeeper;
    }
}
ShenRpcRegistry
package com.shen.rpc.registry;
import com.shen.rpc.connection.ZkConnection;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.rmi.Naming;
import java.rmi.NotBoundException;
import java.rmi.Remote;
import java.util.List;
public class ShenRpcRegistry {
private ZkConnection connection;
private String ip;
private int port;
public void registerService(Class<? extends Remote> serviceInterface, Remote remote) throws IOException, InterruptedException, KeeperException {
String rmi = "rmi://" + ip + ":" + port + "/" + serviceInterface.getName();
String path = "/shen/rpc/" + serviceInterface.getName();
List<String> children = connection.getConnection().getChildren("/shen/rpc",false);
if(children.contains(serviceInterface.getName())){
Stat stat = new Stat();
connection.getConnection().getData(path,false,stat);
connection.getConnection().delete(path,stat.getCversion());
}
connection.getConnection().create(path,rmi.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
Naming.rebind(rmi,remote);
}
public <T extends Remote> T getServiceProxy(Class<T> serviceInterface) throws IOException, InterruptedException, KeeperException, NotBoundException {
String path = "/shen/rpc/" + serviceInterface.getName();
byte[] datas = connection.getConnection().getData(path,false,null);
String rmi = new String(datas);
Object obj = Naming.lookup(rmi);
return (T) obj;
}
public ZkConnection getConnection() {
return connection;
}
public void setConnection(ZkConnection connection) {
this.connection = connection;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
}
ShenRpcFactory
package com.shen.rpc;
import com.shen.rpc.connection.ZkConnection;
import com.shen.rpc.registry.ShenRpcRegistry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import java.io.IOException;
import java.io.InputStream;
import java.rmi.NotBoundException;
import java.rmi.Remote;
import java.rmi.registry.LocateRegistry;
import java.util.List;
import java.util.Properties;
public class ShenRpcFactory {
private static final Properties config = new Properties();
private static final ZkConnection connection;
private static final ShenRpcRegistry registry;
private static final Properties services = new Properties();
static {
try {
InputStream input = ShenRpcFactory.class.getClassLoader().getResourceAsStream("shen.properties");
config.load(input);
String serverIp = config.getProperty("registry.ip") == null ? "localhost" : config.getProperty("registry.ip");
int serverPort = config.getProperty("registry.port") == null ?
9090 : Integer.parseInt(config.getProperty("registry.port"));
String zkServer = config.getProperty("zk.server") == null ? "localhost:2181" : config.getProperty("zk.server");
int zkSessionTimeout = config.getProperty("zk.sessionTimeout") == null ?
10000 : Integer.parseInt(config.getProperty("zk.sessionTimeout"));
connection = new ZkConnection(zkServer,zkSessionTimeout);
registry = new ShenRpcRegistry();
registry.setIp(serverIp);
registry.setConnection(connection);
registry.setPort(serverPort);
LocateRegistry.createRegistry(serverPort);
List<String> children = connection.getConnection().getChildren("/",false);
if(!children.contains("shen")){
connection.getConnection().create("/shen",null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
List<String> shenChildren = connection.getConnection().getChildren("/shen",false);
if(!shenChildren.contains("rpc")){
connection.getConnection().create("/shen/rpc",null,ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
}
InputStream servicesInout = ShenRpcFactory.class.getClassLoader().getResourceAsStream("shen-services.properties");
if(servicesInout != null){
services.load(servicesInout);
for (Object key: services.keySet()) {
Object value = services.get(key);
Class<Remote> serviceInterface = (Class<Remote>) Class.forName(key.toString());
Remote serviceObject = (Remote) Class.forName(value.toString()).newInstance();
registry.registerService(serviceInterface,serviceObject);
}
}
}catch (Exception e){
e.printStackTrace();
throw new ExceptionInInitializerError(e);
}
}
public static void registerSercice(Class<? extends Remote> serviceInterface,Remote remote) throws IOException, InterruptedException, KeeperException {
registry.registerService(serviceInterface,remote);
}
public static <T extends Remote> T getServiceProxy(Class<T> serviceInterface) throws IOException, InterruptedException, KeeperException, NotBoundException{
return registry.getServiceProxy(serviceInterface);
}
}
后续工作
后面我会在博客里写两个使用该框架的案例。