Netty是什么
Netty 是一个利用 Java 的高级网络的能力,隐藏其背后的复杂性而提供一个易于使用的 API 的客户端/服务器框架。 Netty 是一个广泛使用的 Java 网络编程框架(Netty 在 2011 年获得了Duke’s Choice Award,见https://www.java.net/dukeschoice/2011)。它活跃和成长于用户社区,像大型公司 Facebook 和 Instagram 以及流行 开源项目如 Infinispan, HornetQ, Vert.x, Apache Cassandra 和 Elasticsearch 等,都利用其强大的对于网络抽象的核心代码。
Netty的优点
1.并发高 2.传输快 3.封装好
Netty为什么并发高
Netty是一款基于NIO(Nonblocking I/O,非阻塞IO)开发的网络通信框架,对比于BIO(Blocking I/O,阻塞IO),他的并发性能得到了很大提高
创建TCP长连接实例
- 该类的作用是创建客户端连接与创建该链接的监听并无限重连
mport java.util.concurrent.TimeUnit;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
public class CreateTCPConnection {
private String host; //连接主机ip
private Integer port; //连接主机端口
public EventLoopGroup group;
public Channel channel;
public ChannelFuture f;
public CreateTCPConnection(String host, Integer port) {
this.host = host;
this.port = port;
}
/**
* 创建TCP长连接并实例化channel
*/
public void connect() {
if(group!=null){
group.shutdownGracefully(); //这里调用这个看看会不会释放
}
group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group) //1 设置reactor 线程
.option(ChannelOption.SO_KEEPALIVE, true) //1 设置通道选项
.channel(NioSocketChannel.class) //2 设置nio类型的channel
.handler(new ClientChannelInitializer(CreateTCPConnection.this)); //3 装配流水线
//创建连接
f = b.connect(host, port);
//检测并执行断线重连
f.addListener((ChannelFutureListener) channelFuture -> {
if (!channelFuture.isSuccess()) {
final EventLoop loop = channelFuture.channel().eventLoop();
loop.schedule(() -> {
TdLog.error(host + " : 服务端链接不上,开始重连操作...");
connect();
//此处处理重连次数
}, 1L, TimeUnit.SECONDS);
} else {
channel = channelFuture.channel();
TdLog.info("服务端链接成功...");
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
- 客户端通道配置类
import java.util.concurrent.TimeUnit;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
/**
* 类描述:TCP通道配置
*/
public class ClientChannelInitializer extends ChannelInitializer {
private CreateTCPConnection createTCPConnection;
public ClientChannelInitializer(CreateTCPConnection createTCPConnection) {
this.createTCPConnection = createTCPConnection;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//心跳检测
pipeline.addLast(new IdleStateHandler(0, 30, 0, TimeUnit.SECONDS));
//字符串编码(默认编码)
pipeline.addLast("encoder", new StringEncoder());
//字符串解码(重写类,如不需要可使用默认)
pipeline.addLast(new TCPBaseMessageDecoder());
//客户端的逻辑
pipeline.addLast("handler", new NettyClientHandler(createTCPConnection));
}
}
- 重写decode类
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.apache.commons.collections.CollectionUtils;
/**
* TCP消息解码器
* @author 07408
*
*/
@TdClassLog
public class TCPBaseMessageDecoder extends ByteToMessageDecoder {
private List<byte[]> byteList =new ArrayList<>();
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
//处理实际业务
while (in.isReadable()) {
System.err.print((char)in.readByte());
}
}
}
- SimpleChannelInboundHandler实现类能管理并使用该客户端连接
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoop;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
/**
* 类描述:TCP客户端适配器
*/
public class NettyClientHandler extends SimpleChannelInboundHandler {
private CreateTCPConnection nettyClient;
private int heartNumber;
public NettyClientHandler(CreateTCPConnection createTCPConnection) {
nettyClient = createTCPConnection;
}
//这个方法接收不到数据
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg){}
/**
* 长连接读取数据方法-处理业务逻辑
* @param ctx
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIP = insocket.getAddress().getHostAddress();
System.out.println(clientIP+" Server say : " + msg.toString());
}
/**
* 通道连接时调用-处理业务逻辑
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIP = insocket.getAddress().getHostAddress();
TdLog.error(clientIP + " :通道已连接!");
}
/**
* 通道闲置触发-启动断线重连功能
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) {
//使用过程中断线重连
final EventLoop eventLoop = ctx.channel().eventLoop();
eventLoop.schedule(() -> {
TdLog.error("断线连接中...");
nettyClient.connect();
}, 1, TimeUnit.SECONDS);
ctx.fireChannelInactive();
}
/**
* 心跳方法
* @param ctx
* @param evt
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.READER_IDLE)) {
System.out.println("READER_IDLE");
} else if (event.state().equals(IdleState.WRITER_IDLE)) {
/**发送心跳,保持长连接*/
byte[] buff = {'V','Z',1,0,0,0,0,0};
String cmd = new String(buff);
ctx.channel().writeAndFlush(cmd);
} else if (event.state().equals(IdleState.ALL_IDLE)) {
System.out.println("ALL_IDLE");
}
}
super.userEventTriggered(ctx, evt);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
super.handlerAdded(ctx);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
super.handlerRemoved(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
}
- 本地调试测试连接能否正常使用
public static void main(String[] args) throws InterruptedException {
CreateTCPConnection pu = new CreateTCPConnection("10.30.20.185", 8131);
pu.connect();
}
|