第二章
第一章的服务端测试代码只能注册一个服务,这一章对其进行优化,使服务端可以注册多个服务。
服务注册表
首先需要一个注册表来存放注册的服务,并且可以返回需要的服务实例。
package com.lany.rpc.registry;
/**
 * Registry that stores registered service instances and hands back the
 * instance needed to serve a request.
 */
public interface RpcRegistry {
/**
 * Registers a service instance with this registry.
 *
 * @param service the service instance to register
 * @param <T> the concrete type of the service instance
 */
<T> void register(T service);
/**
 * Looks up a previously registered service instance by name.
 *
 * @param serviceName the name the service was registered under
 * @return the registered service instance
 */
Object getService(String serviceName);
}
对该接口进行默认的实现。
package com.lany.rpc.registry;
import com.lany.rpc.enumeration.RpcError;
import com.lany.rpc.exception.RpcException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Default in-memory {@link RpcRegistry}.
 *
 * <p>Services are indexed by the canonical name of every interface they
 * directly implement, so a lookup is done by interface name. Both backing
 * collections are static, so every instance of this class shares the same
 * registrations.
 */
public class DefaultRpcRegistry implements RpcRegistry {

    private static final Logger logger = LoggerFactory.getLogger(DefaultRpcRegistry.class);

    /** Interface canonical name -> service instance that serves it. */
    private static final Map<String, Object> serviceMap = new ConcurrentHashMap<>();

    /** Names of implementation classes that have already been registered. */
    private static final Set<String> registeredService = ConcurrentHashMap.newKeySet();

    /**
     * Registers {@code service} under the canonical name of each interface its
     * class directly implements. Registering the same implementation class
     * again is a no-op. If an interface is already mapped to another
     * implementation, the new one replaces it.
     *
     * @param service the service instance to register
     * @param <T> the concrete type of the service instance
     * @throws RpcException with {@link RpcError#SERVICE_NOT_IMPLEMENT_ANY_INTERFACE}
     *         if the service class implements no interface
     */
    @Override
    public <T> void register(T service) {
        Class<?> serviceClass = service.getClass();

        // Validate first: a service that cannot be registered must not be
        // remembered as "already registered", otherwise a second attempt would
        // silently succeed without anything being stored.
        Class<?>[] interfaces = serviceClass.getInterfaces();
        if (interfaces.length == 0) {
            throw new RpcException(RpcError.SERVICE_NOT_IMPLEMENT_ANY_INTERFACE);
        }

        // getCanonicalName() is null for anonymous/local classes and the
        // concurrent set rejects null elements, so fall back to the binary name.
        String canonicalName = serviceClass.getCanonicalName();
        String serviceName = canonicalName != null ? canonicalName : serviceClass.getName();

        // add() is an atomic check-and-insert; it returns false when the name
        // was already present, i.e. this implementation was registered before.
        if (!registeredService.add(serviceName)) {
            return;
        }

        for (Class<?> i : interfaces) {
            serviceMap.put(i.getCanonicalName(), service);
            logger.info("向接口:{} 注册服务:{}", i.getCanonicalName(), serviceName);
        }
    }

    /**
     * Returns the service instance registered for the given interface name.
     *
     * @param serviceName canonical name of the service interface
     * @return the registered service instance, never {@code null}
     * @throws RpcException with {@link RpcError#SERVICE_NOT_FOUND} if nothing is
     *         registered under {@code serviceName}
     */
    @Override
    public Object getService(String serviceName) {
        Object service = serviceMap.get(serviceName);
        if (service == null) {
            throw new RpcException(RpcError.SERVICE_NOT_FOUND);
        }
        return service;
    }
}
serviceMap用来存储注册进来的服务信息,registeredService用来存储已经注册过的服务,防止重复注册。因为serviceMap中存储的是接口的名字,而不是实现类的名字,所以还需要一个set来记录已经注册过的实现类。
同时,因为服务端是通过接口名来调用服务的,所以这里让一个接口只对应一个实现类;如果同一个接口注册了多个实现类,后注册的会覆盖先注册的。
当通过名字获取实例时,直接将map中的value返回即可。
数据处理
上面这些完成之后就需要对RpcServer进行修改,将注册中心与RpcServer进行关联,而不是第一章中的实例与RpcServer关联。
package com.lany.rpc.server;
import com.lany.rpc.registry.RpcRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.imageio.spi.ServiceRegistry;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.*;
/**
 * Blocking socket server that accepts client connections and hands each one
 * to a {@link RequestHandlerThread} running on a bounded thread pool.
 */
public class RpcServer {

    private static final Logger logger = LoggerFactory.getLogger(RpcServer.class);

    private final ExecutorService threadPool;
    private static final int corePoolSize = 5;
    private static final int maximumPoolSize = 50;
    private static final long keepAliveTime = 60;
    // Bounded queue: once it is full and all maximumPoolSize threads are busy,
    // the pool rejects further tasks.
    BlockingQueue<Runnable> workingQueue = new ArrayBlockingQueue<>(100);
    ThreadFactory threadFactory = Executors.defaultThreadFactory();
    private final RequestHandler requestHandler = new RequestHandler();
    private final RpcRegistry rpcRegistry;

    /**
     * Creates a server that resolves services through the given registry.
     *
     * @param rpcRegistry registry used to look up the service for each request
     */
    public RpcServer(RpcRegistry rpcRegistry) {
        this.rpcRegistry = rpcRegistry;
        this.threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,
                TimeUnit.SECONDS, workingQueue, threadFactory);
    }

    /**
     * Binds to {@code port} and serves connections on the calling thread until
     * an I/O error occurs. I/O errors are logged, not rethrown.
     *
     * @param port TCP port to listen on
     */
    public void start(int port) {
        try (ServerSocket serverSocket = new ServerSocket(port)) {
            logger.info("服务器启动...");
            Socket socket;
            while ((socket = serverSocket.accept()) != null) {
                logger.info("客户端连接!Ip为:{} port为:{}", socket.getInetAddress(), socket.getPort());
                try {
                    threadPool.execute(new RequestHandlerThread(socket, requestHandler, rpcRegistry));
                } catch (RejectedExecutionException e) {
                    // The pool is saturated. Without this catch the unchecked
                    // exception would escape start(), stop the server and leak
                    // the socket that was just accepted.
                    logger.error("线程池已满,拒绝客户端连接:", e);
                    closeQuietly(socket);
                }
            }
            threadPool.shutdown();
        } catch (IOException e) {
            logger.error("连接时有错误发生:", e);
        }
    }

    /** Closes a rejected client socket, logging instead of propagating a close failure. */
    private void closeQuietly(Socket socket) {
        try {
            socket.close();
        } catch (IOException e) {
            logger.error("关闭客户端连接时有错误发生:", e);
        }
    }
}
创建RequestHandler和RequestHandlerThread:前者负责执行对应的方法并返回结果,后者负责处理客户端的连接并传输数据。
对应代码如下:
package com.lany.rpc.server;
import com.lany.rpc.entity.RpcRequest;
import com.lany.rpc.entity.RpcResponse;
import com.lany.rpc.registry.RpcRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.imageio.spi.ServiceRegistry;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.Socket;
/**
 * Task that serves one client connection: reads a single {@link RpcRequest},
 * resolves the target service from the registry, delegates the call to the
 * {@link RequestHandler} and writes the result back to the client.
 */
public class RequestHandlerThread implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(RequestHandlerThread.class);

    private final Socket socket;
    private final RequestHandler requestHandler;
    private final RpcRegistry rpcRegistry;

    /**
     * @param socket         the accepted client connection; closed when {@link #run()} finishes
     * @param requestHandler handler that performs the actual method invocation
     * @param rpcRegistry    registry used to look up the service by interface name
     */
    public RequestHandlerThread(Socket socket, RequestHandler requestHandler, RpcRegistry rpcRegistry) {
        this.socket = socket;
        this.requestHandler = requestHandler;
        this.rpcRegistry = rpcRegistry;
    }

    /**
     * Handles exactly one request on the connection and then closes it.
     * Failures are logged and never propagate to the executing thread.
     */
    @Override
    public void run() {
        // The socket itself is a managed resource so it is closed even when
        // constructing one of the object streams fails.
        try (Socket client = socket;
             ObjectInputStream objectInputStream = new ObjectInputStream(client.getInputStream());
             ObjectOutputStream objectOutputStream = new ObjectOutputStream(client.getOutputStream())) {
            // NOTE(review): readObject() performs Java native deserialization of
            // bytes supplied by the peer; if clients are not trusted, consider an
            // ObjectInputFilter or a different wire format.
            RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
            Object service = rpcRegistry.getService(rpcRequest.getInterfaceName());
            Object result = requestHandler.handle(rpcRequest, service);
            objectOutputStream.writeObject(result);
            objectOutputStream.flush();
        } catch (IOException | ClassNotFoundException e) {
            logger.error("调用或发送时有错误发生:", e);
        } catch (RuntimeException e) {
            // Unchecked failures (e.g. the registry not knowing the requested
            // service, or an unexpected payload type) are logged here rather than
            // escaping from the worker thread.
            logger.error("处理请求时有错误发生:", e);
        }
    }
}
package com.lany.rpc.server;
import com.lany.rpc.entity.RpcRequest;
import com.lany.rpc.entity.RpcResponse;
import com.lany.rpc.enumeration.ResponseCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
/**
 * Executes the method described by an {@link RpcRequest} on a service
 * instance via reflection and wraps the outcome in an {@link RpcResponse}.
 */
public class RequestHandler {

    private static final Logger logger = LoggerFactory.getLogger(RequestHandler.class);

    /**
     * Invokes the requested method on {@code service}.
     *
     * @param rpcRequest request carrying the method name, parameter types and arguments
     * @param service    the service instance to invoke the method on
     * @return a success response wrapping the method's return value, a
     *         {@code METHOD_NOT_FOUND} failure response if the method cannot be
     *         resolved, or {@code null} if the invocation itself failed
     */
    public Object handle(RpcRequest rpcRequest, Object service) {
        Object result = null;
        try {
            result = invokeTargetMethod(rpcRequest, service);
        } catch (InvocationTargetException | IllegalAccessException e) {
            // NOTE(review): result stays null here, so the caller writes null to
            // the client; a dedicated failure response would be friendlier.
            logger.error("调用或发送时有错误发生:", e);
        }
        return result;
    }

    /**
     * Resolves the public method named in the request and invokes it.
     * The success message is logged only after the invocation actually
     * returned, so a failed lookup is never reported as a successful call.
     */
    private Object invokeTargetMethod(RpcRequest rpcRequest, Object service)
            throws InvocationTargetException, IllegalAccessException {
        Method method;
        try {
            method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes());
        } catch (NoSuchMethodException | SecurityException e) {
            logger.warn("服务:{} 未找到方法:{}", rpcRequest.getInterfaceName(), rpcRequest.getMethodName());
            return RpcResponse.fail(ResponseCode.METHOD_NOT_FOUND);
        }
        Object returnValue = method.invoke(service, rpcRequest.getParameters());
        logger.info("服务:{} 成功调用方法:{}", rpcRequest.getInterfaceName(), rpcRequest.getMethodName());
        return RpcResponse.success(returnValue);
    }
}
测试运行
客户端的测试代码没有变化,主要就是服务端的测试修改了。
package com.lany.test;
import com.lany.rpc.registry.DefaultRpcRegistry;
import com.lany.rpc.registry.RpcRegistry;
import com.lany.rpc.server.RpcServer;
/**
 * Manual server-side smoke test: registers two service implementations in a
 * {@link DefaultRpcRegistry} and starts an {@link RpcServer} on a fixed port.
 */
public class TestServer {

    /** Port the test server listens on. */
    private static final int SERVER_PORT = 9000;

    public static void main(String[] args) {
        // Build both implementations up front, then the registry that holds them.
        HelloServiceImpl firstImpl = new HelloServiceImpl();
        HelloServiceImpl2 secondImpl = new HelloServiceImpl2();
        RpcRegistry registry = new DefaultRpcRegistry();

        registry.register(firstImpl);
        registry.register(secondImpl);

        // start() blocks the main thread while the server accepts connections.
        new RpcServer(registry).start(SERVER_PORT);
    }
}
testServer:
[main] INFO com.lany.rpc.server.RpcServer - 服务器启动...
[main] INFO com.lany.rpc.server.RpcServer - 客户端连接!Ip为:/127.0.0.1 port为:53812
[pool-1-thread-1] INFO com.lany.test.HelloServiceImpl2 - 接收到2:this is a message
[pool-1-thread-1] INFO com.lany.rpc.server.RequestHandler - 服务:com.lany.api.HelloService 成功调用方法:hello
testClient:
这时调用的返回值2,id2=12
|