IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> 简易RPC框架实现——2、引人入注册表 -> 正文阅读

[网络协议]简易RPC框架实现——2、引人入注册表

本章引入了注册表来存储服务端注册的服务,并实现了对于反射调用与线程池线程解耦的工作。本章对应commit为2e1351e

在上一章我们实现了一个基于jdk序列化机制的简单BIO RPC框架。但是我么在测试中会发现,由于我们的服务是伴随着服务端开启进行注入的,那么我们每一个服务就只能注册一个服务。我们在本章就引入一个服务注册表,来实现多个服务的注册。

服务注册表

创建一个服务注册的顶级接口,里边分别有着服务注册以及发现服务的方法:

public interface ServerPublisher {

    <T> void publishService(T service);

    Object getService(String serviceName);
}

实现这个服务发现接口,其中用一个ConcurrentHashMap保存服务实现的接口即对应的服务类(你可以根绝自己的需要设计map的容量,防止注入服务过多,发生OOM),这样我们就可以根据RpcRequest中的接口名称找到我们需要调用的服务类:

public class DefaultServerPublisher implements ServerPublisher {

    private static final Logger logger = LoggerFactory.getLogger(DefaultServerPublisher.class);
    private final ConcurrentHashMap<String,Object> serviceMap = new ConcurrentHashMap<>();
    private final Set<String> registeredService = ConcurrentHashMap.newKeySet();
    @Override
    public synchronized <T> void publishService(T service) {
        String serviceName = service.getClass().getCanonicalName();
        if(serviceMap.containsKey(serviceName)) return;
        registeredService.add(serviceName);
        Class<?>[] interfaces = service.getClass().getInterfaces();
        if(interfaces.length == 0){
            throw new RpcException(RpcError.SERVICE_NOT_REGISTERED);
        }
        for (Class<?> anInterface : interfaces) {
            serviceMap.put(anInterface.getCanonicalName(),service);
        }
        logger.info("向接口:{}注册服务:{}",interfaces,serviceName);
    }

    @Override
    public synchronized Object getService(String serviceName) {
        Object o = serviceMap.get(serviceName);
        if(null == o){
            throw new RpcException(RpcError.SERVICE_NOT_FOUND);
        }
        return o;
    }
}

避免多个线程对于服务的重复注册,因此我们将其中的两个方法都是用sychronized关键字修饰。

由于加入了注册表,因此我们只需要在SocketServer的构造方法中传入这个注册表完成注册即可:

public class SocketServer implements CommonServer{

    private final ExecutorService threadPool;
    private static final Logger logger = LoggerFactory.getLogger(SocketServer.class);
    private final ServerPublisher serverPublisher;


    public SocketServer(ServerPublisher serverPublisher){
        this.serverPublisher = serverPublisher;
        int corePoolSize = 5;
        int maximumPoolSize = 50;
        int keepAliveTime = 60;
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100);
        threadPool = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime, TimeUnit.SECONDS,workQueue,Executors.defaultThreadFactory());
    }


    @Override
    public void start(int port){
        try(ServerSocket serverSocket = new ServerSocket(port)){
            logger.info("服务器成功启动。。。。");
            Socket socket;
            while((socket = serverSocket.accept()) != null){
                logger.info("连接成功,客户端ip为:" + socket.getInetAddress());
                threadPool.execute(new RequestHandlerThread(new RequestHandler(),serverPublisher,socket));
            }
            threadPool.shutdown();
        }catch (IOException e){
            logger.error("服务器启动时发生错误:",e);
        }
    }
}

反射解耦

对于线程池部分,我们也可以进行优化,对于需要使用反射调用的部分,我们抽象出一个RequestHandler进行单独处理,之后得到的结果再返回给线程池处理:

public class RequestHandler {

    private static final Logger logger = LoggerFactory.getLogger(RequestHandler.class);

    public Object handle(RpcRequest rpcRequest,Object service){
        Object result = null;
        try{
            result = doMethod(rpcRequest,service);
        }catch (InvocationTargetException|IllegalAccessException e){
            logger.error("反射调用时发生错误:",e);
        }
        return result;
    }

    private Object doMethod(RpcRequest rpcRequest , Object service) throws InvocationTargetException, IllegalAccessException {
        Method method;
        try{
            method = service.getClass().getMethod(rpcRequest.getMethodName(),rpcRequest.getParameterTypes());
        }catch (NoSuchMethodException e){
            logger.error("调用方法时有错误发生:",e);
            return RpcResponse.fail(rpcRequest.getRequestId());
        }
        return method.invoke(service,rpcRequest.getParameters());
    }
}

线程中就只需要处理IO操作即可:

public class RequestHandlerThread implements Runnable{

    private static final Logger logger = LoggerFactory.getLogger(RequestHandlerThread.class);

    private final RequestHandler handler;
    private final ServerPublisher serverPublisher;
    private final Socket socket;

    public RequestHandlerThread(RequestHandler handler,ServerPublisher serverPublisher,Socket socket){
        this.handler = handler;
        this.serverPublisher = serverPublisher;
        this.socket = socket;
    }

    @Override
    public void run() {
        try(ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
            ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream())){
            RpcRequest rpcRequest = (RpcRequest) ois.readObject();
            Object service = serverPublisher.getService(rpcRequest.getInterfaceName());
            Object result = handler.handle(rpcRequest, service);
            oos.writeObject(RpcResponse.success(result,rpcRequest.getRequestId()));
            oos.flush();
        }catch (IOException | ClassNotFoundException e){
            logger.error("调用或发送时发生错误:",e);
        }
    }
}

我们只是修改了服务端的调用逻辑,客户端无需进行任何修改。

测试

客户端代码无需改动,服务端的代码如下:

public class SocketServerTest {
    public static void main(String[] args) {
        ServerPublisher serverPublisher = new DefaultServerPublisher();
        HelloService helloService = new HelloServiceImpl();
        serverPublisher.publishService(helloService);
        SocketServer socketServer = new SocketServer(serverPublisher);
        socketServer.start(9000);
    }
}

运行后得到的结果,客户端:

这是id为:1发送的:This is SocketClient!

服务端:

[main] INFO cn.fzzfrjf.core.DefaultServerPublisher - 向接口:[interface cn.fzzfrjf.entity.HelloService]注册服务:cn.fzzfrjf.service.HelloServiceImpl
[main] INFO cn.fzzfrjf.core.SocketServer - 服务器成功启动。。。。
[main] INFO cn.fzzfrjf.core.SocketServer - 连接成功,客户端ip为:/127.0.0.1
  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2022-05-24 18:32:39  更:2022-05-24 18:34:01 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/12 13:14:23-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码