说明
1、Netty抽象出两组线程池,BossGroup专门负责接收客户端的链接,WorkerGroup专门负责网络的读写 2、BossGroup和WorkerGroup类型都是NioEventLooGroup 3、NioEventLoogGroup相当于事件循环组,这个组中含有多个事件循环,每个事件循环是NioEventLoop 4、NioEventLoopGroup可以有多个线程,即可以含有多个NioEventLoop 5、NioEventLoop表示一个不断循环的执行处理任务的线程,每个NioEventLoop都有一个Selector,用于监听绑定在其上的socket的网络通讯 6、每个Boss NioEventLoop循环执行的步骤有三步
1)轮询accept事件 2)处理accept事件,与client建立链接,生成NioSocketChannel,并将其注册到某个worker NioEventLoop上的Selector 3)处理任务队列的任务,即runAllTasks
7、每个Worker NioEventLoop循环执行步骤
1)轮询read,write事件 2)处理I/O事件,即read/write事件,在对应NioSocketChannel处理 3)处理任务队列的任务,即runAllTasks
8、每个Worker NioEventLoop处理业务时,会使用pipeline(管道),pipeline中包含了channel
示意图
代码示例
1、服务端代码
1)服务端启动类
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class NettyServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap().group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyServerHandler());
}
});
ChannelFuture future = bootstrap.bind(6688).sync();
System.out.println("---服务器已启动----");
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
2)服务端管道处理器
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("有客户端接入:" + ctx.channel().remoteAddress());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("收到客户信息:" + byteBuf.toString(CharsetUtil.UTF_8));
System.out.println("客户端地址:" + ctx.channel().remoteAddress());
System.out.println("处理线程:" + Thread.currentThread().getName());
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello 客户端", CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
2、客户端代码
1)客户端启动类
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyClient {
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyClientHandler());
}
});
System.out.println("客户端已启动");
ChannelFuture future = bootstrap.connect("127.0.0.1", 6688).sync();
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
2)客户端管道处理器实现
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("客户端已连接成功");
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello 服务端", CharsetUtil.UTF_8));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("收到服务器发送的信息:" + byteBuf.toString(CharsetUtil.UTF_8));
System.out.println("服务器端地址:"+ctx.channel().remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
控制台输出
分别启动服务端和三个客户端,控制台日志如下
---服务器已启动----
有客户端接入:/127.0.0.1:60024
收到客户信息:Hello 服务端
客户端地址:/127.0.0.1:60024
处理线程:nioEventLoopGroup-3-1
有客户端接入:/127.0.0.1:60130
收到客户信息:Hello 服务端
客户端地址:/127.0.0.1:60130
处理线程:nioEventLoopGroup-3-2
有客户端接入:/127.0.0.1:60203
收到客户信息:Hello 服务端
客户端地址:/127.0.0.1:60203
处理线程:nioEventLoopGroup-3-3
有客户端接入:/127.0.0.1:60316
收到客户信息:Hello 服务端
客户端地址:/127.0.0.1:60316
处理线程:nioEventLoopGroup-3-4
有客户端接入:/127.0.0.1:60391
收到客户信息:Hello 服务端
客户端地址:/127.0.0.1:60391
处理线程:nioEventLoopGroup-3-5
客户端已启动
客户端已连接成功
收到服务器发送的信息:Hello 客户端
服务器端地址:/127.0.0.1:6688
|