IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> 御神楽的学习记录之基于IO、NIO、Netty的TCP聊天程序 -> 正文阅读

[网络协议]御神楽的学习记录之基于IO、NIO、Netty的TCP聊天程序


前言

java 1.4版本推出了一种新型的IO API,与原来的IO具有相同的作用和目的;可代替标准java IO,只是实现的方式不一样,NIO面向缓冲区、基于通道的IO操作;通过NIO可以提高对文件的读写操作。基于这种优势,现在使用NIO的场景越来愈多,很多主流行的框架都使用到了NIO技术,如Tomcat、Netty、Jetty等;所以学习和掌握NIO技术已经是一个java开发的必备技能了。


一、IO与NIO

1.面向流与面向缓冲区

Java IO中读取数据和写入数据是**面向流(Stream)**的,这表示当我们从流中读取数据,写入数据时也将其写入流,流的含义在于没有缓存 ,就好像我们站在流水线前,所有的数据沿着流水线依次到达我们的面前,我们只能读取当前的数据。如果需要获取某个数据的前一项或后一项数据那就必须自己缓存数据,而不能直接从流中获取。

而在Java NIO中数据的读写是面向**缓冲区(Buffer)**的,读取时可以将整块的数据读取到缓冲区中,在写入时则可以将整个缓冲区中的数据一起写入。这就好像是将流水线传输变成了卡车运送,面向流的数据读写只提供了一个数据流切面,而面向缓冲区的IO则使我们能够看到数据的上下文,也就是说在缓冲区中获取某项数据的前一项数据或者是后一项数据十分方便。这种便利是有代价的,因为我们必须管理好缓冲区,这包括不能让新的数据覆盖了缓冲区中还没有被处理的有用数据;将缓冲区中的数据正确的分块,分清哪些被处理过哪些还没有等等。

2.阻塞与非阻塞

传统的 IO 流都是阻塞式的。也就是说,当一个线程调用 read() 或 write()时,该线程被阻塞,直到有一些数据被读取或写入,该线程在此期间不能执行其他任务。因此,在完成网络通信进行 IO 操作时,由于线程会阻塞,所以服务器端必须为每个客户端都提供一个独立的线程进行处理,当服务器端需要处理大量客户端时,性能急剧下降。

Java NIO非阻塞模式的。当线程从某通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。线程通常将非阻塞 IO 的空闲时间用于在其他通道上执行 IO 操作,所以单独的线程可以管理多个输入和输出通道。因此,NIO 可以让服务器端使用一个或有限几个线程来同时处理连接到服务器端的所有客户端。

二、TCP聊天程序

1.基于IO

IO服务端

import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class IOServer {

    @SuppressWarnings("resource")
    public static void main(String[] args) throws Exception {

        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        //创建socket服务,监听8081端口
        ServerSocket server=new ServerSocket(8081);
        System.out.println("服务器启动!");
        int count=0;
        while(true){
            //获取一个套接字(阻塞)
            final Socket socket = server.accept();
            System.out.println("欢迎第"+(++count)+"个同学");
            newCachedThreadPool.execute(new Runnable() {

                @Override
                public void run() {
                    //业务处理
                    handler(socket);
                }
            });

        }
    }

  
 //读取数据
    
    public static void handler(Socket socket){
        try {
            byte[] bytes = new byte[1024];
            InputStream inputStream = socket.getInputStream();

            while(true){
                //读取数据(阻塞)
                int read = inputStream.read(bytes);
                if(read != -1){
                    System.out.println(new String(bytes, 0, read));
                }else{
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally{
            try {
                System.out.println("socket关闭");

                socket.close();
            } catch (IOException e) {

                e.printStackTrace();
            }
        }
    }
}

IO客户端

import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;

public class IOClient {
    public static void main(String[] args) throws IOException {
        //发送十次
        for (int i=0;i<10;i++){
            Socket socket=new Socket("127.0.0.1", 8081);
            //写数据
            OutputStream os=socket.getOutputStream();
            os.write(("御神楽"+i).getBytes());
            //释放资源
            socket.close();
        }

    }

}

效果:
在这里插入图片描述

2.基于NIO

NIO服务端

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class NIOServer {
    // 通道管理器
    private Selector selector;


     //启动服务端测试

    public static void main(String[] args) throws IOException {
        NIOServer server = new NIOServer();
        server.initServer(8081);
        server.listen();
    }



     // 获得一个ServerSocket通道,并对该通道做一些初始化的工作

    public void initServer(int port) throws IOException {
        // 获得一个ServerSocket通道
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        // 设置通道为非阻塞
        serverChannel.configureBlocking(false);
        // 将该通道对应的ServerSocket绑定到port端口
        serverChannel.socket().bind(new InetSocketAddress(port));
        // 获得一个通道管理器
        this.selector = Selector.open();
        // 将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_ACCEPT事件,注册该事件后,
        // 当该事件到达时,selector.select()会返回,如果该事件没到达selector.select()会一直阻塞。
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    }


     //采用轮询的方式监听selector上是否有需要处理的事件,如果有,则进行处理

    public void listen() throws IOException {
        System.out.println("服务端启动成功!");
        // 轮询访问selector
        while (true) {
            // 当注册的事件到达时,方法返回;否则,该方法会一直阻塞
            selector.select();
            // 获得selector中选中的项的迭代器,选中的项为注册的事件
            Iterator<?> ite = this.selector.selectedKeys().iterator();
            while (ite.hasNext()) {
                SelectionKey key = (SelectionKey) ite.next();
                // 删除已选的key,以防重复处理
                ite.remove();

                handler(key);
            }
        }
    }


     //处理请求

    public void handler(SelectionKey key) throws IOException {

        // 客户端请求连接事件
        if (key.isAcceptable()) {
            handlerAccept(key);
            // 获得了可读的事件
        } else if (key.isReadable()) {
            handelerRead(key);
        }
    }


     // 处理连接请求

    public void handlerAccept(SelectionKey key) throws IOException {
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        // 获得和客户端连接的通道
        SocketChannel channel = server.accept();
        // 设置成非阻塞
        channel.configureBlocking(false);

        // 在这里可以给客户端发送信息哦
        System.out.println("检测到新客户连接");
        // 在和客户端连接成功之后,为了可以接收到客户端的信息,需要给通道设置读的权限。
        channel.register(this.selector, SelectionKey.OP_READ);
    }


     // 处理读的事件

    public void handelerRead(SelectionKey key) throws IOException {
        // 服务器可读取消息:得到事件发生的Socket通道
        SocketChannel channel = (SocketChannel) key.channel();
        // 创建读取的缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int read = channel.read(buffer);
        if(read > 0){
            byte[] data = buffer.array();
            String msg = new String(data).trim();
            System.out.println("用户名为:" + msg);

            //回写数据
            ByteBuffer outBuffer = ByteBuffer.wrap("服务器已接收".getBytes());
            channel.write(outBuffer);// 将消息回送给客户端
        }else{
            System.out.println("客户端关闭");
            key.cancel();
        }
    }
}

NIO客户端

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public class NIOClient {
    public static void main(String[] args) throws Exception {
        final int count[]=new int[1];
        count[0]=1;
        for(int i=0;i<5;i++){
            new Thread(new Runnable() {
                @Override
                public void run() {
                    SocketChannel socketChannel = null;
                    //发送的数据
                    String str = "御神楽"+count[0]++;
                    ByteBuffer byteBuffer = ByteBuffer.wrap(str.getBytes());

                    //接受的数据
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    try {
                        //建立连接
                        socketChannel = SocketChannel.open();
                        socketChannel.configureBlocking(false);
                        if (!socketChannel.connect(new InetSocketAddress("127.0.0.1", 8081))) {
                            //等待连接
                            while (!socketChannel.finishConnect()) {
                            }
                        }
                        //写入数据
                        socketChannel.write(byteBuffer);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }

                    //10s后自动断开连接
                    int time=1;
                    while (time<10){
                        time++;
                        try {
                            //读取数据
                            int read=socketChannel.read(buffer);
                            if(read > 0) {
                                byte[] data = buffer.array();
                                String msg = new String(data).trim();
                                System.out.println("客户端收到信息:" + msg);
                            }
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                    try {
                        socketChannel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
            Thread.sleep(100);
        }
    }

}

测试效果:
服务器:
在这里插入图片描述
客户端:
在这里插入图片描述

3.基于Netty

Netty服务端:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Scanner;

public class NettyServer {
    public static void main(String[] args) {
        //用于处理服务器端接收客户端连接
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        //进行网络通信(读写)
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //辅助工具类,用于服务器通道的一系列配置
            ServerBootstrap bootstrap = new ServerBootstrap();
            //绑定两个线程组
            bootstrap.group(bossGroup,workerGroup)
                    //设置boss selector建立channel使用的对象
                    .channel(NioServerSocketChannel.class)
                    //boss 等待连接的 队列长度
                    .option(ChannelOption.SO_BACKLOG,1024)
                    //处理消息对象
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //创建管道
                            ChannelPipeline pipeline = ch.pipeline();
                            //解码方式
                            pipeline.addLast("decoder",new StringDecoder());
                            //编码方式
                            pipeline.addLast("encoder",new StringEncoder());
                            //自定义处理消息对象
                            pipeline.addLast(new ServerHandler());
                        }
                    });
            System.out.println("服务器正在启动");
            //绑定端口号
            ChannelFuture cf = bootstrap.bind(8083).sync();

            cf.addListener(cd->{
                if(cd.isSuccess()){
                    System.out.println("启动成功");
                }else{
                    System.out.println("启动失败");
                }
            });
            //服务端给所有客户端发信息
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()){
                String msg = scanner.nextLine();
                ServerHandler.sendAll(msg);
            }
            //阻塞当前线程
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }



}
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.text.SimpleDateFormat;
import java.util.Date;

public class ServerHandler extends SimpleChannelInboundHandler<String> {
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress()+" == " +msg);
        channelGroup.forEach(ch->{
            if (channel!=ch) {
                ch.writeAndFlush("[ 客户端 ]" + channel.remoteAddress() + "发送了消息 : " + msg + "\n");
            }else{
                ch.writeAndFlush("[ 客户 ] 发送了消息: " + msg + "\n");
            }
        });

    }
    //用于服务端发信息给所有客户端
    public static void sendAll(String msg){
        channelGroup.forEach(channel -> {
            channel.writeAndFlush("服务器: "+msg+"\n");
        });
    }


     // 当有新的用户连接触发

    public void channelActive(ChannelHandlerContext ctx){
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("[ 客户端]"+channel.remoteAddress()+" 已连接 "+sf.format(new Date())+"\n");
        //把新来的连接加入
        channelGroup.add(channel);
        System.out.println(ctx.channel().remoteAddress()+" 上线了" + "\n");
    }


     //当用户断开连接触发

    public void channelInactive(ChannelHandlerContext ctx) {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("[ 客户端 ] " +channel.remoteAddress()+ " 断开连接"+"\n");
        System.out.println(channel.remoteAddress()+" 下线了.\n");
        System.out.println("channelGroup size = "+ channelGroup.size());
    }


}

Netty客户端:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.ArrayList;
import java.util.List;

public class NettyClient {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        List<ChannelFuture> channelFutures = new ArrayList<ChannelFuture>();
        try {
            Bootstrap bootstrap = new Bootstrap();
            //服务器可以主动断开连接
            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
            //地址复用
            bootstrap.option(ChannelOption.SO_REUSEADDR, true);
            bootstrap.group(group).channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast("decoder",new StringDecoder());
                            pipeline.addLast("encoder",new StringEncoder());
                            pipeline.addLast(new ClientHandler());
                        }
                    });
            final int count[] =new int[1];
            count[0]=0;
            for(int i=0;i<3;i++){
                //添加连接
                channelFutures.add(bootstrap.connect("127.0.0.1",8083).sync());
                //新建线程模拟多用户
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        int index=count[0]++;
                        //获取对应管道
                        Channel channel = channelFutures.get(index).channel();
                        System.out.println( "======"+channel.localAddress()+"======");
                        int time=0;
                        while (time++<3){
                            //发送数据
                            String msg =" 御神楽 "+(index)+": "+time;
                            channel.writeAndFlush(msg);
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        //关闭连接
                        channel.close();

                    }
                }).start();

            }

            //阻塞主线程,否则会直接执行finally关闭EventLoopGroup
            int time=0;
            while (time++<5){
                Thread.sleep(1000);
            }

        } finally {
            //关闭EventLoopGroup
            group.shutdownGracefully();
        }

    }

}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class ClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
        System.out.println(msg.trim());
    }

}

演示
服务端:
在这里插入图片描述
客户端
**加粗样式**


参考

https://blog.csdn.net/linjpg/article/details/80962453
https://blog.csdn.net/qq_47281915/article/details/121802536

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2021-12-16 18:03:49  更:2021-12-16 18:05:53 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/9 1:47:35-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码