本文代码地址:https://github.com/MSC419/msc-rpc-framework
0.实现的改进
1.重构部分代码,将RPC消息包装成RpcMessage
2.增加Netty心跳机制确保TCP长连接有效
3.通过注解自动扫描并注册服务
1.RPC消息通用格式RpcMessage
这一部分是为了给后续增加Netty心跳机制做准备。有了RPC消息通用格式之后,可以将RpcRequest、RpcResponse、心跳信息都封装成RpcMessage
具体步骤如下:
1.增加RpcMessage类
2.修改Netty客户端发送RpcRequest与接收RpcResponse的NettyClientClientTransport,将RpcRequest包装成RpcMessage再发送
3.修改Netty客户端与服务端处理业务的Handler:NettyClientHandler与NettyServerHandler,入站数据先是RpcMessage,再根据MessageType判断是RpcRequest、RpcResponse或心跳信息,做对应处理
4.修改序列化与反序列化过程:NettyEncoder与NettyDecoder
2.Netty心跳机制
本次改进的代码参考自:从零开始实现简单 RPC 框架 9:网络通信之心跳与重连机制_小新是也的博客-CSDN博客_rpc心跳机制
2.1 心跳
在TCP保持长连接的过程中,可能会由于突发情况例如断电或者网线被拔出等因素,导致服务端和客户端的连接中断。在这种情况下,如果服务端和客户端之间没有交互的话,就不能发现连接已经中断。为了解决这一问题,我们引入心跳机制。
心跳机制:为了确保长连接有效,客户端与服务端之间一种通知机制告知对方存活状态的机制。
心跳机制的工作原理:客户端和服务端在一定时间段内没有交互后,某一端向另一端发送消息类型为PING消息,另一端接收到PING消息,返回PONG消息,此即一个PING-PONG交互,说明对方仍然在线,长连接有效;如果没有收到PONG消息,则长连接无效,断开该连接。
2.2 IdleStateHandler
Netty提供了实现心跳机制的Handler:IdleStateHandler
构造函数
public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}
readerIdleTime , 读超时. 即当在指定的时间间隔内没有从 Channel 读取到数据时, 会触发一个 READER_IDLE 的 IdleStateEvent 事件.writerIdleTime , 写超时. 即当在指定的时间间隔内没有数据写入到 Channel 时, 会触发一个 WRITER_IDLE 的 IdleStateEvent 事件.allIdleTime , 读/写超时. 即当在指定的时间间隔内没有读或写操作时, 会触发一个 ALL_IDLE 的 IdleStateEvent 事件.unit ,前三者的单位
IdleStateHandler 的源码阅读参考:从零开始实现简单 RPC 框架 9:网络通信之心跳与重连机制_小新是也的博客
2.3 实现Netty心跳机制
2.3.1 客户端
IdleStateHandler 放到启动类的 PipleLine 注册上
public class NettyClient {
static {
...
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS));
...
NettyClientHandler 设置处理心跳事件
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
...
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{
if (evt instanceof IdleStateEvent) {
IdleState state = ((IdleStateEvent) evt).state();
if (state == IdleState.WRITER_IDLE) {
log.info("write idle happen [{}]", ctx.channel().remoteAddress());
Channel channel = ctx.channel();
RpcMessage rpcMessage = new RpcMessage();
rpcMessage.setSerializeType(SerializerCode.KRYO.getCode());
rpcMessage.setMessageType(PackageType.HEARTBEAT_PACK.getCode());
channel.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
} else {
super.userEventTriggered(ctx, evt);
}
}
...
2.3.3 服务端
服务端的 IdleStateHandler 放到启动类的 PipleLine 注册上
public class NettyServer {
...
public void start(){
...
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
ch.pipeline().addLast(new NettyDecoder());
...
服务端收到超过 30 秒没有读请求的事件后,调用 ctx.close 将连接关闭。
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcMessage requestMsg) {
if (requestMsg.getMessageType() != MessageType.REQUEST_PACK.getType()) {
return;
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleState state = ((IdleStateEvent) evt).state();
if (state == IdleState.READER_IDLE) {
log.info("idle check happen, so close the connection");
ctx.close();
}
} else {
super.userEventTriggered(ctx, evt);
}
}
案例里单位是秒(TimeUnit.SECONDS),如果把单位换成TimeUnit.MILLISECONDS,就能在控制台看见效果
3.通过注解自动扫描注册服务
参考:
(1条消息) 使用注解的方式来自动注册服务_米拉娜的博客-CSDN博客
一起写个Dubbo——7. 服务端自动注册服务_何人听我楚狂声的博客-CSDN博客_dubbo自动注册服务
现在我们是在服务端手动注册服务,要想通过注解自动注册服务,首先我们需要一个注解@RpcService来标识一个类是服务类。还需要一个注解@RpcScan 放在启动的入口类上(main 方法所在的类),标识服务的扫描的包的范围。
@RpcService注解的值定义为该服务的名称,默认值是该类的完整类名。
@RpcScan的值定义为扫描范围的根包,默认值为空(如果为空,其值被赋为入口类所在的包名),扫描时会扫描该包及其子包下所有的类,找到标记有 Service 的类,并注册。
3.1 注解类
@RpcScan
@Inherited
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcScan {
String value() default "";
}
@RpcService
@Inherited
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcService {
String name() default "";
}
3.2 工具类 ReflectUtil
主要作用是扫描该包及其子包下所有的类,并将其 Class 对象放入一个 Set 中返回。
public class ReflectUtil {
public static String getStackTrace() {
StackTraceElement[] stack = new Throwable().getStackTrace();
return stack[stack.length - 1].getClassName();
}
public static Set<Class<?>> getClasses(String packageName) {
Set<Class<?>> classes = new LinkedHashSet<>();
boolean recursive = true;
String packageDirName = packageName.replace('.', '/');
Enumeration<URL> dirs;
try {
dirs = Thread.currentThread().getContextClassLoader().getResources(
packageDirName);
while (dirs.hasMoreElements()) {
URL url = dirs.nextElement();
String protocol = url.getProtocol();
if ("file".equals(protocol)) {
String filePath = URLDecoder.decode(url.getFile(), "UTF-8");
findAndAddClassesInPackageByFile(packageName, filePath,
recursive, classes);
} else if ("jar".equals(protocol)) {
JarFile jar;
try {
jar = ((JarURLConnection) url.openConnection())
.getJarFile();
Enumeration<JarEntry> entries = jar.entries();
while (entries.hasMoreElements()) {
JarEntry entry = entries.nextElement();
String name = entry.getName();
if (name.charAt(0) == '/') {
name = name.substring(1);
}
if (name.startsWith(packageDirName)) {
int idx = name.lastIndexOf('/');
if (idx != -1) {
packageName = name.substring(0, idx)
.replace('/', '.');
}
if ((idx != -1) || recursive) {
if (name.endsWith(".class")
&& !entry.isDirectory()) {
String className = name.substring(
packageName.length() + 1, name
.length() - 6);
try {
classes.add(Class
.forName(packageName + '.'
+ className));
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
return classes;
}
private static void findAndAddClassesInPackageByFile(String packageName,
String packagePath, final boolean recursive, Set<Class<?>> classes) {
File dir = new File(packagePath);
if (!dir.exists() || !dir.isDirectory()) {
return;
}
File[] dirfiles = dir.listFiles(new FileFilter() {
public boolean accept(File file) {
return (recursive && file.isDirectory())
|| (file.getName().endsWith(".class"));
}
});
for (File file : dirfiles) {
if (file.isDirectory()) {
findAndAddClassesInPackageByFile(packageName + "."
+ file.getName(), file.getAbsolutePath(), recursive,
classes);
} else {
String className = file.getName().substring(0,
file.getName().length() - 6);
try {
classes.add(Thread.currentThread().getContextClassLoader().loadClass(packageName + '.' + className));
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
}
}
}
3.3 扫描服务
将服务端抽象成RpcServer 接口
public interface RpcServer {
void start();
<T> void publishService(Object service, Class<T> serviceClass);
}
AbstractRpcServer继承RpcServer 接口,并实现扫描注册的功能:扫描启动包下所有带@RpcService 的类,将其实例化并注册
@Slf4j
public abstract class AbstractRpcServer implements RpcServer{
protected String host;
protected int port;
protected ServiceRegistry serviceRegistry;
protected ServiceProvider serviceProvider;
public void scanServices() {
String mainClassName = ReflectUtil.getStackTrace();
Class<?> startClass;
try {
startClass = Class.forName(mainClassName);
if(!startClass.isAnnotationPresent(RpcScan.class)) {
log.error("启动类缺少 @RpcScan 注解");
throw new RpcException(RpcErrorMessageEnum.SERVICE_SCAN_PACKAGE_NOT_FOUND);
}
} catch (ClassNotFoundException e) {
log.error("出现未知错误");
throw new RpcException(RpcErrorMessageEnum.UNKNOWN_ERROR);
}
String basePackage = startClass.getAnnotation(RpcScan.class).value();
if("".equals(basePackage)) {
basePackage = mainClassName.substring(0, mainClassName.lastIndexOf("."));
}
log.info("basePackage:{}",basePackage);
Set<Class<?>> classSet = ReflectUtil.getClasses(basePackage);
for(Class<?> clazz : classSet) {
if(clazz.isAnnotationPresent(RpcService.class)) {
String serviceName = clazz.getAnnotation(RpcService.class).name();
Object obj;
try {
obj = clazz.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
log.error("创建 " + clazz + " 时有错误发生");
continue;
}
if("".equals(serviceName)) {
Class<?>[] interfaces = clazz.getInterfaces();
for (Class<?> oneInterface: interfaces){
publishService(obj, oneInterface);
}
} else {
publishService(obj, clazz);
}
}
}
}
public <T> void publishService(Object service, Class<T> serviceClass) {
serviceProvider.addServiceProvider(service);
serviceRegistry.registerService(serviceClass.getCanonicalName(), new InetSocketAddress(host, port));
}
}
3.4 服务端开启自动注册
在NettyServer 构造方法后面加上scanServices();
public class NettyServer extends AbstractRpcServer {
...
public NettyServer(String host, int port) {
this.host = host;
this.port = port;
serviceRegistry = new ZkServiceRegistry();
serviceProvider = new ServiceProviderImpl();
scanServices();
}
...
在服务类HelloServiceImpl 和UserServiceImpl 加上注解@RpcService
3.5 测试
不用手动注册。服务端代码清爽好多!
@RpcScan
public class NettyServerMain {
public static void main(String[] args) {
NettyServer nettyServer = new NettyServer("127.0.0.1",9999);
nettyServer.start();
}
}
4.总结
4.1 学到的知识
1.行号附近的小图标
实现接口类方法:
重载方法:
2.查看类图:右键单击你要查看的类,选择Diagrams中的show Diagrams
3.多看几个同类型开源项目(视野要开阔),看看他们的异同点
4.{@link}注解的使用:将内容相互关联起来
5.抽象类的使用
|