一、项目需求
- 1)服务端监听8001端口
- 2)客户端能发送消息给服务器
- 3)服务器可以返回消息给客户端
二、代码编辑
1、服务端
public class NettyServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyServerHandler());
}
});
System.out.println("Server is ready");
ChannelFuture cf = bootstrap.bind(8001).sync();
cf.channel().closeFuture().sync();
}finally{
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
2、自定义服务端处理器
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("server ctx = " + ctx);
ByteBuf buf = (ByteBuf) msg;
System.out.println("client sent: " + buf.toString(CharsetUtil.UTF_8));
System.out.println("Address of Client: " + ctx.channel().remoteAddress());
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, Client~~~", CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.channel().close();
}
}
4、客户端
public class NettyClient {
public static void main(String[] args) throws Exception{
EventLoopGroup eventExecutors = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventExecutors)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyClientHandler());
}
});
System.out.println("Client is ready~~~");
ChannelFuture channelFuture =
bootstrap.connect("127.0.0.1", 8001).sync();
channelFuture.channel().closeFuture().sync();
}finally{
eventExecutors.shutdownGracefully();
}
}
}
5、自定义客户端处理器
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client ctx=" + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello Netty Server!!", CharsetUtil.UTF_8));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("from server: " + buf.toString(CharsetUtil.UTF_8));
System.out.println("Server Address: " + ctx.channel().remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println(cause.getMessage());
ctx.close();
}
}
6、输出结果
Server is ready
server ctx = ChannelHandlerContext(NettyServerHandler#0, [id: 0x97209815, L:/127.0.0.1:8001 - R:/127.0.0.1:50900])
client sent: Hello Netty Server!!
Address of Client: /127.0.0.1:50900
Client is ready~~~
client ctx=ChannelHandlerContext(NettyClientHandler#0, [id: 0xbdbdca5e, L:/127.0.0.1:50900 - R:/127.0.0.1:8001])
from server: Hello, Client~~~
Server Address: /127.0.0.1:8001
三、分析
- 1)bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数,默认是CPU核心数*2,循环分配
- 2)Netty 抽象出两组线程池,BossGroup 专门负责接受客户的连接,WorkerGroup负责网络读写操作
- 3)NioEventLoop 表示一个不断循环处理任务的线程,每个都含有用于监听绑定在其上的socket channel 的 selector
- 4)NioEventLoop 内部采用串行化设计,从消息的读取-》解码-》处理-》编码-》发送,始终由 IO 线程的 NioEventLoop 负责
1、Netty 的异步模型(ChannelFuture)
所谓异步,就是指调用异步方法后不会立即得到结果,而是先去执行其他任务,而等到实际处理这个调用的组件完成后,通过状态、通知和回调来通知调用者 ;Netty 的IO操作都是异步的,包括 Bind、Write和Connect 等操作会简单的返回一个ChannelFuture。
Netty 的异步模型,是建立在 future 和 callback 之上的 ,future 的核心思想是:假设一个方法的处理过程非常耗时,显然一直等待方法处理显然不合适,通常都会在调用该方法时立马返回一个future,后续可以通过该future去监控对应方法的处理进度,这就是 Netty 的 Future-Listener 机制。
使用 Netty,拦截操作和转换出入站数据,只需提供对应的callback 或利用 future 即可,这使得链式操作简单、高效,并有利于编写可重用的、通用的代码
1.1、Future
1)表示异步的执行结果,可以通过它提供的方法来检测执行是否完成,如检索计算等 2)ChannelFuture 是一个接口,支持添加监听器,当监听的事件发生时就会通知监听器
|