介绍
- Netty是由JBoos提供的一个Java网络应用开源框架,它是一个异步的,基于事件驱动的网络应用框架,用于开发高性能,高可靠的网络IO程序
- Netty主要针对TCP协议下,面向客户端的高并发应用,或者在Peer0toPeer场景下的大量数据持续传输的应用。
- Netty本质上是一个NIO框架,适用于服务器通信相关的多种应用场景,使用Netty可以帮助我们快速,简单的开发出一个网络应用;
Netty核心API介绍
ChannelHandler及其实现类 ChannelHandler接口定义了许多事件处理的方法,我们可以通过重写这些方法实现具体的业务逻辑,以下是ChannelHandler接口的类图继承关系 Netty开发中需要自定义一个Handler类去实现ChannelHandler接口或者子接口或者接口的实现类,然后通过重写相应的方法来实现业务逻辑,下面是一般都需要重写的方法
public void channelActive(ChannelHandlerContext ctx);
public void channelRead(ChannelHandlerContext ctx,Object msg);
public void channelReadComplete(ChannelHandlerContext ctx);
public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause);
ChannelHandlerContext常用方法
ChannelFuture close()
ChannelOutboundInvoker flush()
ChannelFuture writeAndFlush(Object msg)
ChannelPipeline
ChannelPipeline 是一个ChannelHandler的集合,它负责处理和拦截 inbound(请求入站)和 outbound(请求出站)的事件和操作,相当于一个贯穿Netty的责任链 ChannelOption Netty在创建Channel实例后,一般都需要设置ChannelOption 参数。ChannelOption是Socket的标准参数,而非Netty独创。常用的参数配置有:
● ChannelOption.SO_BACKLOG 对应TCP/IP协议listen函数中的backlog参数,用来初始化服务器的可连接队列大小。服务端处理客户端请求是顺序处理的,所以同一时间只能处理一个客户端连接。多个客户端来的时候,服务端将不能处理的客户端请求放在队列中等待处理,backlog参数指定了队列大小
● ChannelOption.SO_KEEPALIVE 一直保持连接的活动状态,该参数用于设置TCP连接,当设置该选项以后,链接时会测试连接的状态,这个选项用于可能长时间没有数据交流的连接,当设置这个选项以后,如果两个小时没有数据通信时,TCP会自动发送一个活动探测数据报文
ChannelFuture 表示Channel通道中一步IO操作的结果,在Netty中所有的IO操作都是异步的,调用者并不能立刻获取结果,但是可以通过ChannelFuture来获取IO操作的处理状态 常用方法如下:
Channel channel();
ChannelFuture sync();
EventLoopGroup和实现类NioEventLoopGroup
EventLoopGroup是一组EventLoop的抽象(可以将EventLoopGroup理解为一个线程池,EventLoop理解为线程池中的线程,Netty为了更好的利用多核CPU的资源,会有多个EventLoop 同时工作,每个EventLoop 维护着一个Selector 实例。
EventLoopGroup提供next接口,可以从组中按照一定规则获取EventLoop 来处理任务,在Netty服务端编程中我们需要提供两个EventLoopGroup:BossEventLoopGroup 和 WorkerEventLoopGroup , BossEventLoop负责接收处理客户端连接,将SocketChannel 封装成 NioSocketChannel 交给 WorkerEventLoop 进行处理,WorkerEventLoop只负责处理读写事件;
一般情况下我们都是用实现类NioEventLoopGroup:
常用方法:
public NioEventLoopGroup();
public Future<?> shutdownGracefully();
ServerBootstrap和Bootstrap ServerBootstrap 是 Netty 中的服务器端启动助手,通过它可以完成服务器端的各种配置; Bootstrap 是 Netty 中的客户端启动助手,通过它可以完成客户端的各种配置。
常用方法:
# 该方法用于服务器端,用来设置两个 EventLoop
# 其中:parentGroup 对应的就是:BossEventLoopGroup
# childGroup 对应的就是:WorkerEventLoopGroup
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup)
#该方法用于客户端,用来设置一个 EventLoop
public B group(EventLoopGroup group)
#该方法用来设置一个服务器端的通道 实现
public B channel(Class<? extends C> channelClass)
#用来给 ServerChannel 添加配置
public B option(ChannelOption option, T value)
#用来给接收到的通道添加配置
public ServerBootstrap childOption(ChannelOption childOption, T value)
#该方法用来设置业务 处理类(自定义的 handler)
public ServerBootstrap childHandler(ChannelHandler childHandler)
# 该方法用于服务器端,用来设置占用的端口号
public ChannelFuture bind(int inetPort)
# 该方法用于客户端,用来连 接服务器端
public ChannelFuture connect(String inetHost, int inetPort)
Unpooled类(缓冲区Buffer工具类) 这是 Netty 提供的一个专门用来操作缓冲区的工具类, 常用方法如下:
#通过给定的数据 和 字符编码返回一个 ByteBuf 对象(类似于 NIO 中的 ByteBuffer 对象)
public static ByteBuf copiedBuffer(CharSequence string, Charset charset)
Netty入门案例
引入Netty依赖坐标
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.69.Final</version>
</dependency>
服务端实现
package cn.hu.netty.primer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(20);
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.option(ChannelOption.SO_BACKLOG,128);
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE,true);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) {
channel.pipeline().addLast(new MyServerHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8989).sync();
System.err.println("Netty服务端启动成功");
channelFuture.channel().closeFuture().sync();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
服务端业务类
package cn.hu.netty.primer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.charset.StandardCharsets;
public class MyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg){
ByteBuf byteBuf= (ByteBuf) msg;
System.err.println("客户端说:"+byteBuf.toString(StandardCharsets.UTF_8));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx){
ctx.writeAndFlush(Unpooled.copiedBuffer("你好我是Netty服务端".getBytes(StandardCharsets.UTF_8)));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
System.err.println("出现异常了。。。。。。。。。。。");
cause.printStackTrace();
ctx.close();
}
}
客户端实现
package cn.hu.netty.primer;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup eventExecutors = new NioEventLoopGroup(5);
Bootstrap bootstrap=new Bootstrap();
bootstrap.group(eventExecutors);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new MyClientandler());
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8989).sync();
channelFuture.channel().closeFuture().sync();
eventExecutors.shutdownGracefully();
}
}
客户端业务类
package cn.hu.netty.primer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.charset.StandardCharsets;
public class MyClientandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx){
ctx.writeAndFlush(Unpooled.copiedBuffer("你好,我是Netty客户端".getBytes(StandardCharsets.UTF_8)));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg){
ByteBuf byteBuf= (ByteBuf) msg;
System.err.println("服务端说:"+byteBuf.toString(StandardCharsets.UTF_8));
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
System.err.println("出现异常了。。。。。。。。。。。");
cause.printStackTrace();
ctx.close();
}
}
|