IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> 用Netty制作的RPC框架 -> 正文阅读

[网络协议]用Netty制作的RPC框架

服务器端

package com.zhao.server;

import com.zhao.protocol.MessageCodecSharable;
import com.zhao.protocol.ProcotolFrameDecoder;
import com.zhao.server.handler.RpcRequestMessageHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class RpcServer {
    public static void main(String[] args) {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
        RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.group(boss, worker);
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ProcotolFrameDecoder());
                    ch.pipeline().addLast(LOGGING_HANDLER);
                    ch.pipeline().addLast(MESSAGE_CODEC);
                    ch.pipeline().addLast(RPC_HANDLER);
                }
            });
            Channel channel = serverBootstrap.bind(8080).sync().channel();
            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("server error", e);
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

服务器端 Handler

package com.zhao.server.handler;

import com.zhao.message.RpcRequestMessage;
import com.zhao.message.RpcResponseMessage;
import com.zhao.server.service.HelloService;
import com.zhao.server.service.ServicesFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

@Slf4j
@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) {

        RpcResponseMessage response = new RpcResponseMessage();

        try {
            HelloService service = (HelloService) ServicesFactory.getService(Class.forName(message.getInterfaceName()));
            Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());
            Object invoke = method.invoke(service, message.getParameterValue());

            response.setReturnValue(invoke);
        } catch (Exception e) {
            e.printStackTrace();
            response.setExceptionValue(e);
        }

        ctx.writeAndFlush(response);

    }

    public static void main(String[] args) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
        RpcRequestMessage message = new RpcRequestMessage(
                1,
                "com.zhao.server.service.HelloService",
                "sayHello",
                String.class,
                new Class[]{String.class},
                new Object[]{"张三"}
        );
        
    }
}

客户端

package com.zhao.client;

import com.zhao.client.handler.RpcResponseMessageHandler;
import com.zhao.message.RpcRequestMessage;
import com.zhao.protocol.MessageCodecSharable;
import com.zhao.protocol.ProcotolFrameDecoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class RpcClient {
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
        RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(group);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ProcotolFrameDecoder());
                    ch.pipeline().addLast(LOGGING_HANDLER);
                    ch.pipeline().addLast(MESSAGE_CODEC);
                    ch.pipeline().addLast(RPC_HANDLER);
                }
            });
            Channel channel = bootstrap.connect("localhost", 8080).sync().channel();

            // 向服务器发送消息
            channel.writeAndFlush(new RpcRequestMessage(
                    1,
                    "com.zhao.server.service.HelloService",
                    "sayHello",
                    String.class,
                    new Class[]{String.class},
                    new Object[]{"张三"}
            ));

            channel.closeFuture().sync();
        } catch (Exception e) {
            log.error("client error", e);
        } finally {
            group.shutdownGracefully();
        }
    }
}

客户端向服务器请求消息,服务器向客户端返回结果

编解码用的是JSON

客户端控制台,结果并没有打印出来

添加异步监听

// 向服务器发送消息
ChannelFuture future = channel.writeAndFlush(new RpcRequestMessage(
        1,
        "com.zhao.server.service.HelloService",
        "sayHello",
        String.class,
        new Class[]{String.class},
        new Object[]{"张三"}
)).addListener(promise -> {
    // 如果返回的结果并不是成功的
    if (!promise.isSuccess()) {

        // 得到错误消息
        Throwable cause = promise.cause();
        // 打印日志
        log.error("error", cause);
    }
});

重新启动服务器和客户端

客户端打印结果

是类型转换异常

?

String 类型gson不会转换

解决方式

方式1,将编解码换成 JDK就可解决

方式2,自定义转换类型,修改编解码的代码

package com.zhao.protocol.impl;

import com.google.gson.*;
import com.zhao.message.Message;
import com.zhao.protocol.Serializer;
import jdk.nashorn.internal.runtime.JSONFunctions;

import java.io.*;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;

/**
 * @Auther: HackerZhao
 * @Date: 2021/11/24 15:11
 * @Description: 枚举类,用来实现序列化算法
 */
@SuppressWarnings({"all"})
public enum Algorithm implements Serializer {
    JAVA {
        // 解码
        @Override
        public <T> T deserialize(Class<T> clazz, byte[] bytes) {
            try {
                ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
                return (T) ois.readObject();
            } catch (IOException | ClassNotFoundException e) {
                throw new RuntimeException("反序列化失败", e);
            }
        }

        @Override
        public <T> byte[] serialize(T object) {

            try {
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                ObjectOutputStream oos = new ObjectOutputStream(bos);
                oos.writeObject(object);
                return bos.toByteArray();

            } catch (IOException e) {
                throw new RuntimeException("序列化失败", e);
            }
        }
    },
    JSON {
        // 解码
        @Override
        public <T> T deserialize(Class<T> clazz, byte[] bytes) {

            Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new Algorithm.ClassCodec()).create();

            // 把 byte 数组转成 JSON格式的字符串
            String json = new String(bytes, StandardCharsets.UTF_8);

            // 把JSON格式的字符串 转成对象
            return gson.fromJson(json, clazz);
        }

        @Override
        public <T> byte[] serialize(T object) {

            Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new Algorithm.ClassCodec()).create();

            // 使用 Gson 把对象转成 JSON格式的字符串
            String json = gson.toJson(object);

            // 把 JSON格式的字符串 转成数组
            return json.getBytes(StandardCharsets.UTF_8);

        }
    };

    public static class ClassCodec implements JsonSerializer<Class<?>>, JsonDeserializer<Class<?>> {

        @Override
        public Class<?> deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
            try {
                String str = json.getAsString();
                return Class.forName(str);
            } catch (ClassNotFoundException e) {
                throw new JsonParseException(e);
            }
        }

        @Override             //   String.class
        public JsonElement serialize(Class<?> src, Type typeOfSrc, JsonSerializationContext context) {
            // class -> json
            return new JsonPrimitive(src.getName());
        }
    }

}

打印结果

上面的发送消息的 channel 已经被嵌入在代码中了,发送消息不应该是这样的,应该是客户端请求一个接口名,通过服务器中的接口调用接口实现类中的方法,channel 是用来发送数据的,每次发送数据共用一个channel就行。使用 单例模式 创建 channel

package com.zhao.client;

import com.zhao.client.handler.RpcResponseMessageHandler;
import com.zhao.message.RpcRequestMessage;
import com.zhao.protocol.MessageCodecSharable;
import com.zhao.protocol.ProcotolFrameDecoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

/**
 * @Auther: HackerZhao
 * @Date: 2021/12/1 18:23
 * @Description: 创建一个 channel,谁想向服务器发送数据,拿着这个 channle去发送
 */
@Slf4j
public class RpcClientManager {

    private static Channel channel = null;
    private static volatile Object obj = new Object();

    // 测试改造后的单例模式,消息能否发送出去
    public static void main(String[] args) {
        getChannel().writeAndFlush(new RpcRequestMessage(
                1,
                "com.zhao.server.service.HelloService",
                "sayHello",
                String.class,
                new Class[]{String.class},
                new Object[]{"张三"}
        ));
    }

    // 添加单例方法,只创建一个 channel
    public static Channel getChannel() {

        if (channel != null) {
            return channel;
        }
        // 创建 channel
        synchronized (obj) {
            if (channel == null) {
                // 创建 channel
                initChannel();
            }
            return channel;
        }
    }

    // 初始化 channel方法
    private static void initChannel() {
        NioEventLoopGroup group = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
        RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.group(group);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new ProcotolFrameDecoder());
                ch.pipeline().addLast(LOGGING_HANDLER);
                ch.pipeline().addLast(MESSAGE_CODEC);
                ch.pipeline().addLast(RPC_HANDLER);
            }
        });
        try {
            channel = bootstrap.connect("localhost", 8080).sync().channel();

            // 不能使用 sync() 方法将 main线程阻塞在这里,这样不到关闭 channel,就永远阻塞在这里。应该使用异步关闭
            // channel.closeFuture().sync();
            channel.closeFuture().addListener(future -> {
                group.shutdownGracefully();
            });
        } catch (Exception e) {
            log.error("client error", e);
        }
    }
}

上面客户端的虽然已经达到 RPC 的远程调用,但每次调用接口,就要修改客户端的代码信息,我们更期望的是在客户端拿到一个接口,通过接口调用它的实现方法,可以通过代理来实现

package com.zhao.client;

import com.zhao.client.handler.RpcResponseMessageHandler;
import com.zhao.message.RpcRequestMessage;
import com.zhao.protocol.MessageCodecSharable;
import com.zhao.protocol.ProcotolFrameDecoder;
import com.zhao.protocol.SequenceIdGenerator;
import com.zhao.server.service.HelloService;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

import java.lang.reflect.Proxy;

/**
 * @Auther: HackerZhao
 * @Date: 2021/12/1 18:23
 * @Description: 创建一个 channel,谁想向服务器发送数据,拿着这个 channle去发送
 */
@Slf4j
public class RpcClientManager {

    private static Channel channel = null;
    private static volatile Object obj = new Object();

    // 测试改造后的单例模式,消息能否发送出去
    public static void main(String[] args) {
        HelloService helloService = getProxyService(HelloService.class);
        helloService.sayHello("zhangsan");
        helloService.sayHello("lisi");
        helloService.sayHello("wangwu");
    }

    // 创建代理类
    // 参数,当传入一个 接口.class,就会返回一个实现该接口的对象
    public static <T> T getProxyService(Class<T> serviceClass) {

        // 使用 JDK自带的代理创建代理对象
        ClassLoader classLoader = serviceClass.getClassLoader();

        // 返回这个接口实现的所有接口,这样写是不对的,如果有目标对象的话,就是返回这个目标对象实现的所有接口
        // 错误写法 Class<?>[] interfaces1 = serviceClass.getInterfaces();
        Class<?>[] interfaces = new Class[]{serviceClass};

        Object o = Proxy.newProxyInstance(classLoader, interfaces, (proxy, method, args) -> {
            // 1、代理类需要执行的操作
            RpcRequestMessage message = new RpcRequestMessage(
                    SequenceIdGenerator.nextId(),
                    serviceClass.getName(),
                    method.getName(),
                    method.getReturnType(),
                    method.getParameterTypes(),
                    args
            );
            // 2、发送消息
            getChannel().writeAndFlush(message);

            // 3、接收消息
            return null;
        });

        return (T) o;
    }

    // 添加单例方法,只创建一个 channel
    public static Channel getChannel() {

        if (channel != null) {
            return channel;
        }
        // 创建 channel
        synchronized (obj) {
            if (channel == null) {
                // 创建 channel
                initChannel();
            }
            return channel;
        }
    }

    // 初始化 channel方法
    private static void initChannel() {
        NioEventLoopGroup group = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
        RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.group(group);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new ProcotolFrameDecoder());
                ch.pipeline().addLast(LOGGING_HANDLER);
                ch.pipeline().addLast(MESSAGE_CODEC);
                ch.pipeline().addLast(RPC_HANDLER);
            }
        });
        try {
            channel = bootstrap.connect("localhost", 8080).sync().channel();

            // 不能使用 sync() 方法将 main线程阻塞在这里,这样不到关闭 channel,就永远阻塞在这里。应该使用异步关闭
            // channel.closeFuture().sync();
            channel.closeFuture().addListener(future -> {
                group.shutdownGracefully();
            });
        } catch (Exception e) {
            log.error("client error", e);
        }
    }
}

返回结果

缺点,以上虽然得到返回消息,但也是在 RpcResponseMessageHandler 中得到的,现在想在客户端消息发送后就等待响应结果(main线程中得到结果),就涉及到两个线程的通信问题,(nio线程与 main线程的相互通信)

添加装盛结果的容器

package com.zhao.client.handler;

import com.zhao.message.RpcResponseMessage;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.Promise;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {

    public static final Map<Integer,Promise<?>> PROMISE = new ConcurrentHashMap<>();

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
        log.debug("{}", msg);

    }
}

每次发送完消息后,开启一个新线程,等待客户端响应结果,等结果返回后,再把结果放入PROMISE容器中,放开正在等待的 nio线程

// 3、准备一个空 Promise 容器,来接收结果             指定 Promise 对象接收结果的线程
DefaultPromise<Object> promise = new DefaultPromise<>(getChannel().eventLoop());
RpcResponseMessageHandler.PROMISE.put(sequenceId,promise);

// 4、等待 promise结果
promise.await();  // await 会让这个 main线程陷入等待,不会立马返回,直到响应结果回来。它和 sync相比不会抛出异常

// 5、处理结果
if (promise.isSuccess()){

    // 调用正常
    return promise.getNow();
}else {
    // 出现异常
    throw new RuntimeException(promise.cause());
}

handler中的代码

package com.zhao.client.handler;

import com.zhao.message.RpcResponseMessage;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.Promise;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {

    public static final Map<Integer, Promise<Object>> PROMISE = new ConcurrentHashMap<>();

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
        log.debug("{}", msg);
        Promise<Object> promise = PROMISE.remove(msg.getSequenceId());

        if (promise != null) {
            Object returnValue = msg.getReturnValue();
            Exception exceptionValue = msg.getExceptionValue();

            // 不管成功,还是异常,都会放开正在等待的 main线程
            if (exceptionValue != null) {
                promise.setFailure(exceptionValue);
            }else {
                promise.setSuccess(returnValue);
            }

        }

    }
}

当?promise 执行?setFailure 或?setSuccess 方法时会唤醒?promise.await() 方法,此时对象信息已经存放到?promise 容器中了,nio线程只管获取就行

客户端完整代码

package com.zhao.client;

import com.zhao.client.handler.RpcResponseMessageHandler;
import com.zhao.message.RpcRequestMessage;
import com.zhao.protocol.MessageCodecSharable;
import com.zhao.protocol.ProcotolFrameDecoder;
import com.zhao.protocol.SequenceIdGenerator;
import com.zhao.server.service.HelloService;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.DefaultPromise;
import lombok.extern.slf4j.Slf4j;

import java.lang.reflect.Proxy;

/**
 * @Auther: HackerZhao
 * @Date: 2021/12/1 18:23
 * @Description: 创建一个 channel,谁想向服务器发送数据,拿着这个 channle去发送
 */
@Slf4j
public class RpcClientManager {

    private static Channel channel = null;
    private static volatile Object obj = new Object();

    // 测试改造后的单例模式,消息能否发送出去
    public static void main(String[] args) {
        HelloService helloService = getProxyService(HelloService.class);
        System.out.println(helloService.sayHello("zhangsan"));
        System.out.println(helloService.sayHello("lisi"));
        System.out.println(helloService.sayHello("wangwu"));
    }

    // 创建代理类
    // 参数,当传入一个 接口.class,就会返回一个实现该接口的对象
    public static <T> T getProxyService(Class<T> serviceClass) {

        // 使用 JDK自带的代理创建代理对象
        ClassLoader classLoader = serviceClass.getClassLoader();

        // 返回这个接口实现的所有接口,这样写是不对的,如果有目标对象的话,就是返回这个目标对象实现的所有接口
        // 错误写法 Class<?>[] interfaces1 = serviceClass.getInterfaces();
        Class<?>[] interfaces = new Class[]{serviceClass};

        Object o = Proxy.newProxyInstance(classLoader, interfaces, (proxy, method, args) -> {
            int sequenceId = SequenceIdGenerator.nextId();
            // 1、代理类需要执行的操作
            RpcRequestMessage message = new RpcRequestMessage(
                    sequenceId,
                    serviceClass.getName(),
                    method.getName(),
                    method.getReturnType(),
                    method.getParameterTypes(),
                    args
            );
            // 2、发送消息
            getChannel().writeAndFlush(message);

            // 3、准备一个空 Promise 容器,来接收结果             指定 Promise 对象接收结果的线程
            DefaultPromise<Object> promise = new DefaultPromise<>(getChannel().eventLoop());
            RpcResponseMessageHandler.PROMISE.put(sequenceId,promise);

            // 4、等待 promise结果
            promise.await();  // await 会让这个 main线程陷入等待,不会立马返回,直到响应结果回来。它和 sync相比不会抛出异常

            // 5、处理结果
            if (promise.isSuccess()){

                // 调用正常
                return promise.getNow();
            }else {
                // 出现异常
                throw new RuntimeException(promise.cause());
            }

        });

        return (T) o;
    }

    // 添加单例方法,只创建一个 channel
    public static Channel getChannel() {

        if (channel != null) {
            return channel;
        }
        // 创建 channel
        synchronized (obj) {
            if (channel == null) {
                // 创建 channel
                initChannel();
            }
            return channel;
        }
    }

    // 初始化 channel方法
    private static void initChannel() {
        NioEventLoopGroup group = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
        RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.group(group);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new ProcotolFrameDecoder());
                ch.pipeline().addLast(LOGGING_HANDLER);
                ch.pipeline().addLast(MESSAGE_CODEC);
                ch.pipeline().addLast(RPC_HANDLER);
            }
        });
        try {
            channel = bootstrap.connect("localhost", 8080).sync().channel();

            // 不能使用 sync() 方法将 main线程阻塞在这里,这样不到关闭 channel,就永远阻塞在这里。应该使用异步关闭
            // channel.closeFuture().sync();
            channel.closeFuture().addListener(future -> {
                group.shutdownGracefully();
            });
        } catch (Exception e) {
            log.error("client error", e);
        }
    }
}

mian线程处理完张三,再处理李四,再处理王五,handler处理器是nio线程获取的结果(日志上面有显示)

当异常发生时,当服务器端出现异常,模拟客户端接收到异常的情景

服务器的异常

package com.zhao.server.service;

public class HelloServiceImpl implements HelloService {
    @Override
    public String sayHello(String msg) {
        int i = 1 / 0;
        return "你好, " + msg + " from:服务器";
    }
}

重启客户端和服务器,客户端打印的异常

LengthFieldBasedFrameDecoder 这个类打印的异常,桢大小太小导致的,服务器发送了较多的数据

服务器端打印


服务器把堆栈,对象信息全部发送给了客户端,有?11542 个字节,远远大于 1024,所以才会出现桢长度太小

解决方式

?

完整代码

密码:2nnm

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2021-12-04 13:46:48  更:2021-12-04 13:46:57 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/8 5:12:04-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码