服务器端
package com.zhao.server;
import com.zhao.protocol.MessageCodecSharable;
import com.zhao.protocol.ProcotolFrameDecoder;
import com.zhao.server.handler.RpcRequestMessageHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class RpcServer {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProcotolFrameDecoder());
ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(MESSAGE_CODEC);
ch.pipeline().addLast(RPC_HANDLER);
}
});
Channel channel = serverBootstrap.bind(8080).sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
服务器端 Handler
package com.zhao.server.handler;
import com.zhao.message.RpcRequestMessage;
import com.zhao.message.RpcResponseMessage;
import com.zhao.server.service.HelloService;
import com.zhao.server.service.ServicesFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@Slf4j
@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) {
RpcResponseMessage response = new RpcResponseMessage();
try {
HelloService service = (HelloService) ServicesFactory.getService(Class.forName(message.getInterfaceName()));
Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());
Object invoke = method.invoke(service, message.getParameterValue());
response.setReturnValue(invoke);
} catch (Exception e) {
e.printStackTrace();
response.setExceptionValue(e);
}
ctx.writeAndFlush(response);
}
public static void main(String[] args) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
RpcRequestMessage message = new RpcRequestMessage(
1,
"com.zhao.server.service.HelloService",
"sayHello",
String.class,
new Class[]{String.class},
new Object[]{"张三"}
);
}
}
客户端
package com.zhao.client;
import com.zhao.client.handler.RpcResponseMessageHandler;
import com.zhao.message.RpcRequestMessage;
import com.zhao.protocol.MessageCodecSharable;
import com.zhao.protocol.ProcotolFrameDecoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class RpcClient {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProcotolFrameDecoder());
ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(MESSAGE_CODEC);
ch.pipeline().addLast(RPC_HANDLER);
}
});
Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
// 向服务器发送消息
channel.writeAndFlush(new RpcRequestMessage(
1,
"com.zhao.server.service.HelloService",
"sayHello",
String.class,
new Class[]{String.class},
new Object[]{"张三"}
));
channel.closeFuture().sync();
} catch (Exception e) {
log.error("client error", e);
} finally {
group.shutdownGracefully();
}
}
}
客户端向服务器请求消息,服务器向客户端返回结果
编解码用的是JSON
客户端控制台,结果并没有打印出来
添加异步监听
// 向服务器发送消息
ChannelFuture future = channel.writeAndFlush(new RpcRequestMessage(
1,
"com.zhao.server.service.HelloService",
"sayHello",
String.class,
new Class[]{String.class},
new Object[]{"张三"}
)).addListener(promise -> {
// 如果返回的结果并不是成功的
if (!promise.isSuccess()) {
// 得到错误消息
Throwable cause = promise.cause();
// 打印日志
log.error("error", cause);
}
});
重新启动服务器和客户端
客户端打印结果
是类型转换异常
?
String 类型gson不会转换
解决方式
方式1,将编解码换成 JDK就可解决
方式2,自定义转换类型,修改编解码的代码
package com.zhao.protocol.impl;
import com.google.gson.*;
import com.zhao.message.Message;
import com.zhao.protocol.Serializer;
import jdk.nashorn.internal.runtime.JSONFunctions;
import java.io.*;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
/**
* @Auther: HackerZhao
* @Date: 2021/11/24 15:11
* @Description: 枚举类,用来实现序列化算法
*/
@SuppressWarnings({"all"})
public enum Algorithm implements Serializer {
JAVA {
// 解码
@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
try {
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
return (T) ois.readObject();
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException("反序列化失败", e);
}
}
@Override
public <T> byte[] serialize(T object) {
try {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(object);
return bos.toByteArray();
} catch (IOException e) {
throw new RuntimeException("序列化失败", e);
}
}
},
JSON {
// 解码
@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new Algorithm.ClassCodec()).create();
// 把 byte 数组转成 JSON格式的字符串
String json = new String(bytes, StandardCharsets.UTF_8);
// 把JSON格式的字符串 转成对象
return gson.fromJson(json, clazz);
}
@Override
public <T> byte[] serialize(T object) {
Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new Algorithm.ClassCodec()).create();
// 使用 Gson 把对象转成 JSON格式的字符串
String json = gson.toJson(object);
// 把 JSON格式的字符串 转成数组
return json.getBytes(StandardCharsets.UTF_8);
}
};
public static class ClassCodec implements JsonSerializer<Class<?>>, JsonDeserializer<Class<?>> {
@Override
public Class<?> deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
try {
String str = json.getAsString();
return Class.forName(str);
} catch (ClassNotFoundException e) {
throw new JsonParseException(e);
}
}
@Override // String.class
public JsonElement serialize(Class<?> src, Type typeOfSrc, JsonSerializationContext context) {
// class -> json
return new JsonPrimitive(src.getName());
}
}
}
打印结果
上面的发送消息的 channel 已经被嵌入在代码中了,发送消息不应该是这样的,应该是客户端请求一个接口名,通过服务器中的接口调用接口实现类中的方法,channel 是用来发送数据的,每次发送数据共用一个channel就行。使用 单例模式 创建 channel
package com.zhao.client;
import com.zhao.client.handler.RpcResponseMessageHandler;
import com.zhao.message.RpcRequestMessage;
import com.zhao.protocol.MessageCodecSharable;
import com.zhao.protocol.ProcotolFrameDecoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
/**
* @Auther: HackerZhao
* @Date: 2021/12/1 18:23
* @Description: 创建一个 channel,谁想向服务器发送数据,拿着这个 channle去发送
*/
@Slf4j
public class RpcClientManager {
private static Channel channel = null;
private static volatile Object obj = new Object();
// 测试改造后的单例模式,消息能否发送出去
public static void main(String[] args) {
getChannel().writeAndFlush(new RpcRequestMessage(
1,
"com.zhao.server.service.HelloService",
"sayHello",
String.class,
new Class[]{String.class},
new Object[]{"张三"}
));
}
// 添加单例方法,只创建一个 channel
public static Channel getChannel() {
if (channel != null) {
return channel;
}
// 创建 channel
synchronized (obj) {
if (channel == null) {
// 创建 channel
initChannel();
}
return channel;
}
}
// 初始化 channel方法
private static void initChannel() {
NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProcotolFrameDecoder());
ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(MESSAGE_CODEC);
ch.pipeline().addLast(RPC_HANDLER);
}
});
try {
channel = bootstrap.connect("localhost", 8080).sync().channel();
// 不能使用 sync() 方法将 main线程阻塞在这里,这样不到关闭 channel,就永远阻塞在这里。应该使用异步关闭
// channel.closeFuture().sync();
channel.closeFuture().addListener(future -> {
group.shutdownGracefully();
});
} catch (Exception e) {
log.error("client error", e);
}
}
}
上面客户端的虽然已经达到 RPC 的远程调用,但每次调用接口,就要修改客户端的代码信息,我们更期望的是在客户端拿到一个接口,通过接口调用它的实现方法,可以通过代理来实现
package com.zhao.client;
import com.zhao.client.handler.RpcResponseMessageHandler;
import com.zhao.message.RpcRequestMessage;
import com.zhao.protocol.MessageCodecSharable;
import com.zhao.protocol.ProcotolFrameDecoder;
import com.zhao.protocol.SequenceIdGenerator;
import com.zhao.server.service.HelloService;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.Proxy;
/**
* @Auther: HackerZhao
* @Date: 2021/12/1 18:23
* @Description: 创建一个 channel,谁想向服务器发送数据,拿着这个 channle去发送
*/
@Slf4j
public class RpcClientManager {
private static Channel channel = null;
private static volatile Object obj = new Object();
// 测试改造后的单例模式,消息能否发送出去
public static void main(String[] args) {
HelloService helloService = getProxyService(HelloService.class);
helloService.sayHello("zhangsan");
helloService.sayHello("lisi");
helloService.sayHello("wangwu");
}
// 创建代理类
// 参数,当传入一个 接口.class,就会返回一个实现该接口的对象
public static <T> T getProxyService(Class<T> serviceClass) {
// 使用 JDK自带的代理创建代理对象
ClassLoader classLoader = serviceClass.getClassLoader();
// 返回这个接口实现的所有接口,这样写是不对的,如果有目标对象的话,就是返回这个目标对象实现的所有接口
// 错误写法 Class<?>[] interfaces1 = serviceClass.getInterfaces();
Class<?>[] interfaces = new Class[]{serviceClass};
Object o = Proxy.newProxyInstance(classLoader, interfaces, (proxy, method, args) -> {
// 1、代理类需要执行的操作
RpcRequestMessage message = new RpcRequestMessage(
SequenceIdGenerator.nextId(),
serviceClass.getName(),
method.getName(),
method.getReturnType(),
method.getParameterTypes(),
args
);
// 2、发送消息
getChannel().writeAndFlush(message);
// 3、接收消息
return null;
});
return (T) o;
}
// 添加单例方法,只创建一个 channel
public static Channel getChannel() {
if (channel != null) {
return channel;
}
// 创建 channel
synchronized (obj) {
if (channel == null) {
// 创建 channel
initChannel();
}
return channel;
}
}
// 初始化 channel方法
private static void initChannel() {
NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProcotolFrameDecoder());
ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(MESSAGE_CODEC);
ch.pipeline().addLast(RPC_HANDLER);
}
});
try {
channel = bootstrap.connect("localhost", 8080).sync().channel();
// 不能使用 sync() 方法将 main线程阻塞在这里,这样不到关闭 channel,就永远阻塞在这里。应该使用异步关闭
// channel.closeFuture().sync();
channel.closeFuture().addListener(future -> {
group.shutdownGracefully();
});
} catch (Exception e) {
log.error("client error", e);
}
}
}
返回结果
缺点,以上虽然得到返回消息,但也是在 RpcResponseMessageHandler 中得到的,现在想在客户端消息发送后就等待响应结果(main线程中得到结果),就涉及到两个线程的通信问题,(nio线程与 main线程的相互通信)
添加装盛结果的容器
package com.zhao.client.handler;
import com.zhao.message.RpcResponseMessage;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.Promise;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
public static final Map<Integer,Promise<?>> PROMISE = new ConcurrentHashMap<>();
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
log.debug("{}", msg);
}
}
每次发送完消息后,开启一个新线程,等待客户端响应结果,等结果返回后,再把结果放入PROMISE容器中,放开正在等待的 nio线程
// 3、准备一个空 Promise 容器,来接收结果 指定 Promise 对象接收结果的线程
DefaultPromise<Object> promise = new DefaultPromise<>(getChannel().eventLoop());
RpcResponseMessageHandler.PROMISE.put(sequenceId,promise);
// 4、等待 promise结果
promise.await(); // await 会让这个 main线程陷入等待,不会立马返回,直到响应结果回来。它和 sync相比不会抛出异常
// 5、处理结果
if (promise.isSuccess()){
// 调用正常
return promise.getNow();
}else {
// 出现异常
throw new RuntimeException(promise.cause());
}
handler中的代码
package com.zhao.client.handler;
import com.zhao.message.RpcResponseMessage;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.Promise;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
public static final Map<Integer, Promise<Object>> PROMISE = new ConcurrentHashMap<>();
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
log.debug("{}", msg);
Promise<Object> promise = PROMISE.remove(msg.getSequenceId());
if (promise != null) {
Object returnValue = msg.getReturnValue();
Exception exceptionValue = msg.getExceptionValue();
// 不管成功,还是异常,都会放开正在等待的 main线程
if (exceptionValue != null) {
promise.setFailure(exceptionValue);
}else {
promise.setSuccess(returnValue);
}
}
}
}
当?promise 执行?setFailure 或?setSuccess 方法时会唤醒?promise.await() 方法,此时对象信息已经存放到?promise 容器中了,nio线程只管获取就行
客户端完整代码
package com.zhao.client;
import com.zhao.client.handler.RpcResponseMessageHandler;
import com.zhao.message.RpcRequestMessage;
import com.zhao.protocol.MessageCodecSharable;
import com.zhao.protocol.ProcotolFrameDecoder;
import com.zhao.protocol.SequenceIdGenerator;
import com.zhao.server.service.HelloService;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.DefaultPromise;
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.Proxy;
/**
* @Auther: HackerZhao
* @Date: 2021/12/1 18:23
* @Description: 创建一个 channel,谁想向服务器发送数据,拿着这个 channle去发送
*/
@Slf4j
public class RpcClientManager {
private static Channel channel = null;
private static volatile Object obj = new Object();
// 测试改造后的单例模式,消息能否发送出去
public static void main(String[] args) {
HelloService helloService = getProxyService(HelloService.class);
System.out.println(helloService.sayHello("zhangsan"));
System.out.println(helloService.sayHello("lisi"));
System.out.println(helloService.sayHello("wangwu"));
}
// 创建代理类
// 参数,当传入一个 接口.class,就会返回一个实现该接口的对象
public static <T> T getProxyService(Class<T> serviceClass) {
// 使用 JDK自带的代理创建代理对象
ClassLoader classLoader = serviceClass.getClassLoader();
// 返回这个接口实现的所有接口,这样写是不对的,如果有目标对象的话,就是返回这个目标对象实现的所有接口
// 错误写法 Class<?>[] interfaces1 = serviceClass.getInterfaces();
Class<?>[] interfaces = new Class[]{serviceClass};
Object o = Proxy.newProxyInstance(classLoader, interfaces, (proxy, method, args) -> {
int sequenceId = SequenceIdGenerator.nextId();
// 1、代理类需要执行的操作
RpcRequestMessage message = new RpcRequestMessage(
sequenceId,
serviceClass.getName(),
method.getName(),
method.getReturnType(),
method.getParameterTypes(),
args
);
// 2、发送消息
getChannel().writeAndFlush(message);
// 3、准备一个空 Promise 容器,来接收结果 指定 Promise 对象接收结果的线程
DefaultPromise<Object> promise = new DefaultPromise<>(getChannel().eventLoop());
RpcResponseMessageHandler.PROMISE.put(sequenceId,promise);
// 4、等待 promise结果
promise.await(); // await 会让这个 main线程陷入等待,不会立马返回,直到响应结果回来。它和 sync相比不会抛出异常
// 5、处理结果
if (promise.isSuccess()){
// 调用正常
return promise.getNow();
}else {
// 出现异常
throw new RuntimeException(promise.cause());
}
});
return (T) o;
}
// 添加单例方法,只创建一个 channel
public static Channel getChannel() {
if (channel != null) {
return channel;
}
// 创建 channel
synchronized (obj) {
if (channel == null) {
// 创建 channel
initChannel();
}
return channel;
}
}
// 初始化 channel方法
private static void initChannel() {
NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProcotolFrameDecoder());
ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(MESSAGE_CODEC);
ch.pipeline().addLast(RPC_HANDLER);
}
});
try {
channel = bootstrap.connect("localhost", 8080).sync().channel();
// 不能使用 sync() 方法将 main线程阻塞在这里,这样不到关闭 channel,就永远阻塞在这里。应该使用异步关闭
// channel.closeFuture().sync();
channel.closeFuture().addListener(future -> {
group.shutdownGracefully();
});
} catch (Exception e) {
log.error("client error", e);
}
}
}
mian线程处理完张三,再处理李四,再处理王五,handler处理器是nio线程获取的结果(日志上面有显示)
当异常发生时,当服务器端出现异常,模拟客户端接收到异常的情景
服务器的异常
package com.zhao.server.service;
public class HelloServiceImpl implements HelloService {
@Override
public String sayHello(String msg) {
int i = 1 / 0;
return "你好, " + msg + " from:服务器";
}
}
重启客户端和服务器,客户端打印的异常
LengthFieldBasedFrameDecoder 这个类打印的异常,桢大小太小导致的,服务器发送了较多的数据
服务器端打印
服务器把堆栈,对象信息全部发送给了客户端,有?11542 个字节,远远大于 1024,所以才会出现桢长度太小
解决方式
?
完整代码
密码:2nnm
|