netty实现心跳检测
检测逻辑:
1) 服务端启动,客户端建立连接,连接的目的是互相发送消息。 2) 如果客户端在工作,服务端一定能收到数据,如果客户端空闲,服务端会出现资源浪费。 3) 服务端需要一种检测机制,验证客户端的活跃状态,不活跃则关闭。
需求设计:
1) 客户端向服务端发送 “I am alive” , sleep一个随机时间,模拟空闲状态 2) 服务端收到消息后,返回“over”, 客户端有空闲,记录空闲次数 3) 设定阈值,达到阈值时主动关闭连接
服务端编写
public class HreatBeatServer {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new IdleStateHandler(5, 10, 20, TimeUnit.SECONDS));
socketChannel.pipeline().addLast(new HreatBeatServerHandler());
}
});
System.out.println("服务端初始化完成");
try {
ChannelFuture future = serverBootstrap.bind(2020).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
IdleStateHandler , 是netty提供的处理器
1)超过多长时间没有读 readerIdleTime 2) 超过多长时间没有写 writerIdleTime 3) 超过多长时间没有读和写 allIdleTime
底层实现检测的是 IdleStateEvent事件,通过管道传递给下一个handler处理,处理方法是userEventTriggered。
处理器编写
public class HreatBeatServerHandler extends SimpleChannelInboundHandler<String> {
private int times;
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
if ("I am alive".equals(msg)) {
ctx.writeAndFlush(Unpooled.copiedBuffer("over", CharsetUtil.UTF_8));
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
String eventDesc = null;
switch (event.state()) {
case READER_IDLE:
eventDesc = "读空闲";
break;
case WRITER_IDLE:
eventDesc = "写空闲";
break;
case ALL_IDLE:
eventDesc = "读写空闲";
break;
}
System.out.println(ctx.channel().remoteAddress() + "发生超时事件--" + eventDesc);
times++;
if (times > 3) {
System.out.println("空闲次数超过三次 关闭连接");
ctx.writeAndFlush("you are out");
ctx.channel().close();
}
}
}
其中IdleStateEvent事件,分为READER_IDLE、WRITER_IDLE、ALL_IDLE三大类
客户端编写
客户端不断循环给服务端发消息确认存活的期间 线程睡眠 模拟失去心跳场景
package com.hyc.netty.Hreatbeat;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.Random;
public class HreatbeatClient {
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
socketChannel.pipeline().addLast(new HreatbeatClientHandler());
}
});
System.out.println("客户端初始化完成");
try {
ChannelFuture future = bootstrap.connect("127.0.0.1", 2020).sync();
String data = "I am alive";
while (future.channel().isActive()) {
int num = new Random().nextInt(10);
Thread.sleep(num * 1000);
future.channel().writeAndFlush(data);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
static class HreatbeatClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
System.out.println("server data:" + s);
if ("you are out".equals(s)) {
System.out.println("关闭");
channelHandlerContext.channel().close();
}
}
}
}
客户端随机线程睡眠 一旦接受到 服务端返回的you are out 代表空闲次数超过了 3次 则关闭客户端连接
|