IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> Netty手写RPC框架 -> 正文阅读

[网络协议]Netty手写RPC框架

首发于Enaium的个人博客


协议就用上篇文章的协议

public class Message implements Serializable {
    private final long order;

    public Message(long order) {
        this.order = order;
    }

    public long getOrder() {
        return order;
    }
}

只不过Message加了个Order熟悉,

创建Request类,继承Message,klass是调用的Class目标,name,parameterType,argument分别是方法名称,参数类型,参数

public class Request extends Message {
    private final String klass;
    private final String name;
    private final Class<?>[] parameterType;
    private final Object[] argument;

    public Request(long order, String klass, String name, Class<?>[] parameterType, Object[] argument) {
        super(order);
        this.klass = klass;
        this.name = name;
        this.parameterType = parameterType;
        this.argument = argument;
    }

    public String getKlass() {
        return klass;
    }

    public String getName() {
        return name;
    }

    public Class<?>[] getParameterType() {
        return parameterType;
    }

    public Object[] getArgument() {
        return argument;
    }
}

创建Response类继承Message,result调用的结果,throwable调用的异常

public class Response extends Message {
    private final Object result;
    private final Throwable throwable;

    public Response(long order, Object result, Throwable throwable) {
        super(order);
        this.result = result;
        this.throwable = throwable;
    }

    public Object getResult() {
        return result;
    }

    public Throwable getThrowable() {
        return throwable;
    }
}

创建一个PRCHandler类,来处理请求,用反射调用即可

public class PRCHandler extends SimpleChannelInboundHandler<Request> {
    @Override
    public void channelRead0(ChannelHandlerContext channelHandlerContext, Request request) {
        try {
            Class<?> aClass = Class.forName(request.getKlass());
            Object o = aClass.getConstructor().newInstance();
            Object invoke = aClass.getMethod(request.getName(), request.getParameterType()).invoke(o, request.getArgument());
            channelHandlerContext.channel().writeAndFlush(new Response(request.getOrder(), invoke, null));
        } catch (ClassNotFoundException | InvocationTargetException | InstantiationException | IllegalAccessException | NoSuchMethodException e) {
            e.printStackTrace();
            channelHandlerContext.channel().writeAndFlush(new Response(request.getOrder(), null, e.getCause()));
        }
    }
}

接着启动服务器,服务器就这样写好了

public class RPCServer {

    private static final LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.INFO);
    private static final MessageCodec MESSAGE_CODEC = new MessageCodec();
    private static final PRCHandler PRC_HANDLER = new PRCHandler();

    public static void main(String[] args) {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            Channel localhost = new ServerBootstrap().group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel channel) {
                    channel.pipeline().addLast(LOGGING_HANDLER);
                    channel.pipeline().addLast(MESSAGE_CODEC);
                    channel.pipeline().addLast(PRC_HANDLER);
                }
            }).bind("localhost", 3828).sync().channel();
            System.out.println("Runnable...");
            localhost.closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

现在在test里测试一下,写好客户端连接,Hanlder先不用太关注

public class Main {

    private static final LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.INFO);
    private static final MessageCodec MESSAGE_CODEC = new MessageCodec();
    private static final Handler HANDLER = new Handler();

    private static Channel channel;

    public static void main(String[] args) {
        NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
        try {
            channel = new Bootstrap().group(nioEventLoopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel channel) {
                    channel.pipeline().addLast(LOGGING_HANDLER);
                    channel.pipeline().addLast(MESSAGE_CODEC);
                    channel.pipeline().addLast(HANDLER);
                }
            }).connect("localhost", 3828).sync().channel();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        Runtime.getRuntime().addShutdownHook(new Thread(nioEventLoopGroup::shutdownGracefully));
    }
}

创建一个Call注解,klass是目标类,name是目标类的方法

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Call {
    String klass();

    String name();
}

现在创建一个目标类

public class Target {
    public String render() {
        return "RENDER HELLO WORLD!";
    }
}

创建个一个Service接口

public interface Service {
    @Call(klass = "cn.enaium.Target", name = "render")
    String render();
}

接着使用动态代理

@SuppressWarnings("unchecked")
private static <T> T getService(Class<T> klass) {

}

没有Call注解的返回null

Object o = Proxy.newProxyInstance(klass.getClassLoader(), new Class<?>[]{klass}, (proxy, method, args) -> {
    if (!method.isAnnotationPresent(Call.class)) {
        return null;
    }
}

使用Promise来获取结果

Object o = Proxy.newProxyInstance(klass.getClassLoader(), new Class<?>[]{klass}, (proxy, method, args) -> {
    if (!method.isAnnotationPresent(Call.class)) {
        return null;
    }
    Promise<Object> promise = new DefaultPromise<>(channel.eventLoop());
    Call annotation = method.getAnnotation(Call.class);
    long increment = Util.increment();
    channel.writeAndFlush(new Request(increment, annotation.klass(), annotation.name(), method.getParameterTypes(), args));
    Main.HANDLER.getPromiseMap().put(increment, promise);
    promise.await();
    if (promise.cause() != null) {
        return new RuntimeException(promise.cause());
    }
    return promise.getNow();
});
@SuppressWarnings("unchecked")
private static <T> T getService(Class<T> klass) {
    Object o = Proxy.newProxyInstance(klass.getClassLoader(), new Class<?>[]{klass}, (proxy, method, args) -> {
        if (!method.isAnnotationPresent(Call.class)) {
            return null;
        }
        Promise<Object> promise = new DefaultPromise<>(channel.eventLoop());
        Call annotation = method.getAnnotation(Call.class);
        long increment = Util.increment();
        channel.writeAndFlush(new Request(increment, annotation.klass(), annotation.name(), method.getParameterTypes(), args));
        Main.HANDLER.getPromiseMap().put(increment, promise);
        promise.await();
        if (promise.cause() != null) {
            return new RuntimeException(promise.cause());
        }
        return promise.getNow();
    });
    return (T) o;
}

序号自增

public class Util {
    private static final AtomicLong atomicLong = new AtomicLong();

    public static long increment() {
        return atomicLong.incrementAndGet();
    }
}

Handler来处理响应,根据请求的order获取返回值

public class Handler extends SimpleChannelInboundHandler<Response> {

    private final Map<Long, Promise<Object>> promiseMap = new HashMap<>();

    @Override
    public void channelRead0(ChannelHandlerContext channelHandlerContext, Response response) throws Exception {

        if (null == promiseMap.get(response.getOrder())) {
            return;
        }

        Promise<Object> promise = promiseMap.remove(response.getOrder());

        if (response.getResult() != null) {
            promise.setSuccess(response.getResult());
        } else {
            promise.setFailure(response.getThrowable());
        }
    }

    public Map<Long, Promise<Object>> getPromiseMap() {
        return promiseMap;
    }
}

现在来运行服务器和客户端测试一下

源码

视频

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2022-03-15 23:04:40  更:2022-03-15 23:04:52 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/4 19:40:32-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码