前言
java 1.4版本推出了一种新型的IO API,与原来的IO具有相同的作用和目的;可代替标准java IO,只是实现的方式不一样,NIO是面向缓冲区、基于通道的IO操作;通过NIO可以提高对文件的读写操作。基于这种优势,现在使用NIO的场景越来愈多,很多主流行的框架都使用到了NIO技术,如Tomcat、Netty、Jetty等;所以学习和掌握NIO技术已经是一个java开发的必备技能了。
一、IO与NIO
1.面向流与面向缓冲区
在Java IO中读取数据和写入数据是**面向流(Stream)**的,这表示当我们从流中读取数据,写入数据时也将其写入流,流的含义在于没有缓存 ,就好像我们站在流水线前,所有的数据沿着流水线依次到达我们的面前,我们只能读取当前的数据。如果需要获取某个数据的前一项或后一项数据那就必须自己缓存数据,而不能直接从流中获取。
而在Java NIO中数据的读写是面向**缓冲区(Buffer)**的,读取时可以将整块的数据读取到缓冲区中,在写入时则可以将整个缓冲区中的数据一起写入。这就好像是将流水线传输变成了卡车运送,面向流的数据读写只提供了一个数据流切面,而面向缓冲区的IO则使我们能够看到数据的上下文,也就是说在缓冲区中获取某项数据的前一项数据或者是后一项数据十分方便。这种便利是有代价的,因为我们必须管理好缓冲区,这包括不能让新的数据覆盖了缓冲区中还没有被处理的有用数据;将缓冲区中的数据正确的分块,分清哪些被处理过哪些还没有等等。
2.阻塞与非阻塞
传统的 IO 流都是阻塞式的。也就是说,当一个线程调用 read() 或 write()时,该线程被阻塞,直到有一些数据被读取或写入,该线程在此期间不能执行其他任务。因此,在完成网络通信进行 IO 操作时,由于线程会阻塞,所以服务器端必须为每个客户端都提供一个独立的线程进行处理,当服务器端需要处理大量客户端时,性能急剧下降。
Java NIO 是非阻塞模式的。当线程从某通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。线程通常将非阻塞 IO 的空闲时间用于在其他通道上执行 IO 操作,所以单独的线程可以管理多个输入和输出通道。因此,NIO 可以让服务器端使用一个或有限几个线程来同时处理连接到服务器端的所有客户端。
二、TCP聊天程序
1.基于IO
IO服务端:
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class IOServer {
@SuppressWarnings("resource")
public static void main(String[] args) throws Exception {
ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
ServerSocket server=new ServerSocket(8081);
System.out.println("服务器启动!");
int count=0;
while(true){
final Socket socket = server.accept();
System.out.println("欢迎第"+(++count)+"个同学");
newCachedThreadPool.execute(new Runnable() {
@Override
public void run() {
handler(socket);
}
});
}
}
public static void handler(Socket socket){
try {
byte[] bytes = new byte[1024];
InputStream inputStream = socket.getInputStream();
while(true){
int read = inputStream.read(bytes);
if(read != -1){
System.out.println(new String(bytes, 0, read));
}else{
break;
}
}
} catch (Exception e) {
e.printStackTrace();
}finally{
try {
System.out.println("socket关闭");
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
IO客户端:
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
public class IOClient {
public static void main(String[] args) throws IOException {
for (int i=0;i<10;i++){
Socket socket=new Socket("127.0.0.1", 8081);
OutputStream os=socket.getOutputStream();
os.write(("御神楽"+i).getBytes());
socket.close();
}
}
}
效果:
2.基于NIO
NIO服务端:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
public class NIOServer {
private Selector selector;
public static void main(String[] args) throws IOException {
NIOServer server = new NIOServer();
server.initServer(8081);
server.listen();
}
public void initServer(int port) throws IOException {
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.socket().bind(new InetSocketAddress(port));
this.selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
}
public void listen() throws IOException {
System.out.println("服务端启动成功!");
while (true) {
selector.select();
Iterator<?> ite = this.selector.selectedKeys().iterator();
while (ite.hasNext()) {
SelectionKey key = (SelectionKey) ite.next();
ite.remove();
handler(key);
}
}
}
public void handler(SelectionKey key) throws IOException {
if (key.isAcceptable()) {
handlerAccept(key);
} else if (key.isReadable()) {
handelerRead(key);
}
}
public void handlerAccept(SelectionKey key) throws IOException {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel = server.accept();
channel.configureBlocking(false);
System.out.println("检测到新客户连接");
channel.register(this.selector, SelectionKey.OP_READ);
}
public void handelerRead(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int read = channel.read(buffer);
if(read > 0){
byte[] data = buffer.array();
String msg = new String(data).trim();
System.out.println("用户名为:" + msg);
ByteBuffer outBuffer = ByteBuffer.wrap("服务器已接收".getBytes());
channel.write(outBuffer);
}else{
System.out.println("客户端关闭");
key.cancel();
}
}
}
NIO客户端:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
public class NIOClient {
public static void main(String[] args) throws Exception {
final int count[]=new int[1];
count[0]=1;
for(int i=0;i<5;i++){
new Thread(new Runnable() {
@Override
public void run() {
SocketChannel socketChannel = null;
String str = "御神楽"+count[0]++;
ByteBuffer byteBuffer = ByteBuffer.wrap(str.getBytes());
ByteBuffer buffer = ByteBuffer.allocate(1024);
try {
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
if (!socketChannel.connect(new InetSocketAddress("127.0.0.1", 8081))) {
while (!socketChannel.finishConnect()) {
}
}
socketChannel.write(byteBuffer);
} catch (IOException e) {
e.printStackTrace();
}
int time=1;
while (time<10){
time++;
try {
int read=socketChannel.read(buffer);
if(read > 0) {
byte[] data = buffer.array();
String msg = new String(data).trim();
System.out.println("客户端收到信息:" + msg);
}
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
try {
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
Thread.sleep(100);
}
}
}
测试效果: 服务器: 客户端:
3.基于Netty
Netty服务端:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.Scanner;
public class NettyServer {
public static void main(String[] args) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder",new StringDecoder());
pipeline.addLast("encoder",new StringEncoder());
pipeline.addLast(new ServerHandler());
}
});
System.out.println("服务器正在启动");
ChannelFuture cf = bootstrap.bind(8083).sync();
cf.addListener(cd->{
if(cd.isSuccess()){
System.out.println("启动成功");
}else{
System.out.println("启动失败");
}
});
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()){
String msg = scanner.nextLine();
ServerHandler.sendAll(msg);
}
cf.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.text.SimpleDateFormat;
import java.util.Date;
public class ServerHandler extends SimpleChannelInboundHandler<String> {
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
Channel channel = ctx.channel();
System.out.println(channel.remoteAddress()+" == " +msg);
channelGroup.forEach(ch->{
if (channel!=ch) {
ch.writeAndFlush("[ 客户端 ]" + channel.remoteAddress() + "发送了消息 : " + msg + "\n");
}else{
ch.writeAndFlush("[ 客户 ] 发送了消息: " + msg + "\n");
}
});
}
public static void sendAll(String msg){
channelGroup.forEach(channel -> {
channel.writeAndFlush("服务器: "+msg+"\n");
});
}
public void channelActive(ChannelHandlerContext ctx){
Channel channel = ctx.channel();
channelGroup.writeAndFlush("[ 客户端]"+channel.remoteAddress()+" 已连接 "+sf.format(new Date())+"\n");
channelGroup.add(channel);
System.out.println(ctx.channel().remoteAddress()+" 上线了" + "\n");
}
public void channelInactive(ChannelHandlerContext ctx) {
Channel channel = ctx.channel();
channelGroup.writeAndFlush("[ 客户端 ] " +channel.remoteAddress()+ " 断开连接"+"\n");
System.out.println(channel.remoteAddress()+" 下线了.\n");
System.out.println("channelGroup size = "+ channelGroup.size());
}
}
Netty客户端:
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.ArrayList;
import java.util.List;
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
List<ChannelFuture> channelFutures = new ArrayList<ChannelFuture>();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.option(ChannelOption.SO_REUSEADDR, true);
bootstrap.group(group).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("decoder",new StringDecoder());
pipeline.addLast("encoder",new StringEncoder());
pipeline.addLast(new ClientHandler());
}
});
final int count[] =new int[1];
count[0]=0;
for(int i=0;i<3;i++){
channelFutures.add(bootstrap.connect("127.0.0.1",8083).sync());
new Thread(new Runnable() {
@Override
public void run() {
int index=count[0]++;
Channel channel = channelFutures.get(index).channel();
System.out.println( "======"+channel.localAddress()+"======");
int time=0;
while (time++<3){
String msg =" 御神楽 "+(index)+": "+time;
channel.writeAndFlush(msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
channel.close();
}
}).start();
}
int time=0;
while (time++<5){
Thread.sleep(1000);
}
} finally {
group.shutdownGracefully();
}
}
}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class ClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
System.out.println(msg.trim());
}
}
演示 服务端: 客户端
参考
https://blog.csdn.net/linjpg/article/details/80962453 https://blog.csdn.net/qq_47281915/article/details/121802536
|