IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Netty学习前置知识(一) -> 正文阅读

[大数据]Netty学习前置知识(一)

作者:recommend-item-box type_blog clearfix

前言

此篇博文主要根据尚硅谷的视频课程把使用Netty需要了解的知识进行整理并根据自己的理解加以补充。由于内容较多所以将笔记分为多个部分,同时更新速度可能会比较慢,希望读者多包涵。

Netty 介绍

  • Netty 是由 JBOSS提供的一个Java 开源框架,现为Github上的独立项目。
  • Netty是一个异步的,基于事件驱动的网络应用框架,用一快速开发高性能、高可靠的网络IO程序。
  • Netty 主要针对TCP协议下,面向Clients端高并发应用,或者Peer-to-Peer(简称P2P)场景下的大量数据持续传输的应用。
  • Netty 本质是一个NIO框架,使用与服务器通信相关的多种应用场景。

Netty 使用场景

互联网行业

  1. 在分布式系统中,各个节点之间需要远程服务调用,高性能的RPC框架必不可少,Netty作为异步高性能通信框架,往往作为基础通信组件被这些RPC框架使用。
  2. 典型的应用有:阿里分布式服务框架Dubbo的RPC框架,使用协议进行节点间通信,Dubbo协议默认使用Netty作为基础通信组件,用于实现各个进程节点之间的内部通信。

游戏行业

  1. 无论是手游服务端还是大型网络游戏Java语言得到了越来越广的应用。
  2. Netty作为高性能的基础通信组件,提供了TCP/UDP和HTTP协议栈,方便定制和开发私有协议栈,账号登陆服务器。
  3. 地图服务器之间可以方便的通过Netty进行高性能的通信。

大数据领域

  1. 经典的 Hadoop 的高性能通信和序列化组件 Avro 的 RPC 框架,默认采用 Netty 进行跨界点通信。
  2. 它的 NettyService 基于 Netty 框架二次封装实现。

Java IO模型

IO模型

  1. I/O模型的简单理解:就是用什么样的通道进行数据发送和接受,很大程度上决定了程序的通信性能。

  2. Java 支持3种网络编程模型:BIO/NIO/AIO

  3. Java BIO:同步且阻塞,服务器实现模式是一个连接一个线程,即客户端有连接请求是服务端就需要启一个线程来进行处理,如果这个连接一直不处理会造成不必要的线程开销。
    在这里插入图片描述

  4. Java Nio:同步非阻塞,服务器实现模式为一个线程处理多个连接请求,即客户端发送的连接请求注册多路复用器上,多路复用器轮询到连接有I/O请求进行处理。
    在这里插入图片描述

  5. Java AIO(NIO.2):异步非阻塞,AIO 引入异步通道的概念,采用了 Proactor 模式,简化了程序编写,有效的请求才启动线程,它的特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较多且连接时间较长的应用。

BIO,NIO,AIO场景分析比较

  1. BIO 方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4 以前的唯一选择,但程序简单易理解。
  2. NIO 方式适用于 连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,弹幕系统,服务器间通讯等。编程比较复杂,JDK1.4 开始支持。
  3. AIO 方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用 OS 参与并发操作,编程比较复杂,JDK7 开始支持。

Java BIO 编程

Java BIO介绍

  1. Java BIO 就是传统的 Java I/O 编程,其相关的类和接口在 java.io。
  2. BIO(BlockingI/O):同步阻塞,服务器实现模式为一个连接一个线程, 即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,可以通过线程池机制改善(实现多个客户连接服务器)。
  3. BIO 方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4 以前的唯一选择,程序简单易理解。

Java BIO工作机制

在这里插入图片描述

BIO工作机制梳理

  1. 服务器端启动一个 ServerSocket
  2. 客户端启动 Socket 对服务器进行通信,默认情况下服务器端需要对每个客户建立一个线程与之通讯。
  3. 客户端发出请求后,先咨询服务器是否有线程响应,如果没有则会等待,或者被拒绝。如果有响应,客户端线程会等待请求结束后,再继续执行。

Java BIO编程实例

@Slf4j
public class Server {
    public static void main(String[] args) throws IOException {
        ExecutorService executorService = new ThreadPoolExecutor(4,8,0L,TimeUnit.MILLISECONDS,new LinkedBlockingDeque<>());
        ServerSocket serverSocket = new ServerSocket(12345);
        log.info("服务启动了");
        while (true){
            Socket socket = serverSocket.accept();
            log.info("有客户端连接了");
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    byte[] bytes = new byte[1024];
                    try {
                        while (true){
                        InputStream inputStream = socket.getInputStream();
                        int read = inputStream.read(bytes);
                        if(read!=-1){
                            System.out.println(new String(bytes,0,read));
                        }else {
                            break;
                        }
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }finally {
                        System.out.println("服务端关闭");
                    }
                }
            });
        }
    }
}

客户端采用telnet:
在这里插入图片描述

问题分析

  1. 每个请求都需要创建独立的线程,与对应的客户端进行数据Read、Write。
  2. 当并发数较大时需要创建大量线程来处理连接,系统资源占用较大。
  3. 连接建立后,如果当前线程暂时没有数据可读,则线程阻塞在Read操作上,造成资源浪费。

Java NIO 编程

Java NIO介绍

  1. Java NIO 全称Java non-blocking IO,是指 JDK 提供的新 API。从 JDK1.4 开始,Java 提供了一系列改进的输入/输出的新特性,被统称为 NIO(即 NewIO),是同步非阻塞的。
  2. NIO 相关类都被放在 java.nio 包及子包下,并且对原 java.io 包中的很多类进行改写。
  3. NIO 有三大核心部分:Channel(通道)、Buffer(缓冲区)、Selector(选择器)
  4. NIO 是面向缓冲区,或者面向块编程的。数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动,这就增加了处理过程中的灵活性,使用它可以提供非阻塞式的高伸缩性网络。
  5. Java NIO 的非阻塞模式,使一个线程从某通道发送请求或者读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。非阻塞写也是如此,一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。
  6. 通俗理解:NIO 是可以做到用一个线程来处理多个操作的。假设有 10000 个请求过来,根据实际情况,可以分配 50 或者 100 个线程来处理。不像之前的阻塞 IO 那样,非得分配 10000 个。
  7. HTTP 2.0使用了多路复用的技术,做到同一个连接并发处理多个请求,而且并发请求的数量比 HTTP 1.1 大了好几个数量级。

NIO 与 BIO比较

  1. BIO流的方式处理数据,而 NIO的方式处理数据,块 I/O 的效率比流 I/O 高很多。
  2. BIO阻塞的,NIO则是非阻塞的。
  3. BIO基于字节流和字符流进行操作,而 NIO基于 Channel(通道)和 Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。```Selector````(选择器)用于监听多个通道的事件(比如:连接请求,数据到达等),因此使用单个线程就可以监听多个客户端通道。
  4. Buffer和Channel之间的数据流向是双向
    在这里插入图片描述

NIO三大核心原理图

NIO 的 Selector、Channel 和、Buffer 之间关系

在这里插入图片描述

三大核心关系解读

1. 每个 Channel 都会对应一个 Buffer。
2. Selector 对应一个线程,一个线程对应多个 Channel(连接)。
3. 该图反应了有三个 Channel 注册到该 Selector //程序
4. 程序切换到哪个 Channel 是由事件决定的,Event 就是一个重要的概念。
5. Selector 会根据不同的事件,在各个通道上切换。
6. Buffer 就是一个内存块,底层是有一个数组。
7. 数据的读取写入是通过 Buffer,这个和 BIO是不同的,BIO 中要么是输入流,或者是输出流,不能双向,但是 NIO 的 Buffer 是可以读也可以写,需要 flip 方法切换 Channel 是双向的,可以返回底层操作系统的情况,比如 Linux,底层的操作系统通道就是双向的。

缓冲区Buffer

基本介绍

缓冲区(Buffer):
??缓冲区本质上是一个可以读写数据的内存块,Jdk1.8里面是这么说的A container for data of a specific primitive type也可以理解成是一个容器对象(含数组),该对象提供了一组方法,可以更轻松地使用内存块,,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况。Channel 提供从文件、网络读取数据的渠道,但是读取或写入的数据都必须经由 Buffer。
在这里插入图片描述

Buffer及其子类

1.在 NIO 中,Buffer 是一个顶层父类,它是一个抽象类,类的层级关系图:
在这里插入图片描述

2.Buffer 类定义了所有的缓冲区都具有的四个属性来提供关于其所包含的数据元素的信息:
在这里插入图片描述

基本属性: capacity, limit, and position

??初始化元素状态:新建的buffer的position始终是0,mark是未被标记,limit是0也可以是其他值这依赖于buffer的类型和使用方式

A newly-created buffer always has a position of zero and a mark that is
undefined.  The initial limit may be zero, or it may be some other value
that depends upon the type of the buffer and the manner in which it is
constructed.  Each element of a newly-allocated buffer is initialized
to zero.

Clearing, flipping, and rewinding

除了访问position, limit, capacity,mark,reset之外,此buffer类也定义了如下操作:
clear():为一个新的通道读取序列或相对于put操作准备好缓冲区:limit,capacity,position都设为0
flip():为一个新的通道写入序列或相对于get操作准备好缓冲区:此时limit设置为当前的positionposition设置为0
rewind():为重复读取它已经包含的数据准备一个缓冲区。此时limit不变position设置为0

线程安全(Thread safety)

  1. 多线程访问时缓冲区不是线程安全,如果要使用多线程访问缓冲区则适当使用同步控制
  2. 允许链式调用:b.flip().position(23).limit(42);

Channel(通道)

channel基本介绍

  • NIO 的通道类似于流,但有些区别如下:
    • 通道可以同时进行读写,而流只能读或者只能写
    • 通道可以实现异步读写数据
    • 通道可以从缓冲读数据,也可以写数据到缓冲:
  • BIO 中的 Stream 是单向的,例如 FileInputStream 对象只能进行读取数据的操作,而 NIO 中的通道(Channel)是双向的,可以读操作,也可以写操作。
  • Channel 在 NIO 中是一个接口 public interface Channel extends Closeable{}
    常用的 Channel 类有:FileChannel、DatagramChannel、ServerSocketChannel 和 SocketChannel。【ServerSocketChanne 类似 ServerSocket、SocketChannel 类似 Socket】
  • FileChannel 用于文件的数据读写,DatagramChannel 用于 UDP 的数据读写,ServerSocketChannel 和 SocketChannel 用于 TCP 的数据读写。

FileChannel 类

FileChannel 主要用来对本地文件进行 IO 操作,常见的方法有:
public int read(ByteBuffer dst),从通道读取数据并放到缓冲区中
public int write(ByteBuffer src),把缓冲区的数据写到通道中
public long transferFrom(ReadableByteChannel src, long position, long count),从目标通道中复制数据到当前通道
public long transferTo(long position, long count, WritableByteChannel target),把数据从当前通道复制给目标通道

FileChannel 向本地文件写入数据:实例一

/**
 * @ClassName: FIleServer
 * @Description: 使用NIO向文件写数据:将数据写入filechannel.txt文件中
 * @Author: admin
 */
public class FIleServer {
    public static void main(String[] args) {
        try {
            // 创建文本
            FileOutputStream fileOutputStream = new FileOutputStream("d:\\filechannel.txt");
            // 获取channel
            FileChannel fileChannel = fileOutputStream.getChannel();
            // 创建一个缓存区
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            // 将数据写入buffer
            byteBuffer.put("FileChannel 用于文件的数据读写,DatagramChannel 用于 UDP 的数据读写,ServerSocketChannel 和 SocketChannel 用于 TCP 的数据读写。".getBytes());
            // 反转
            byteBuffer.flip();
            // 将数据写入channel中
            fileChannel.write(byteBuffer);
            // 关闭channel
            fileOutputStream.close();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

FileChannel 向本地文件写入数据:实例二

/**
 * @ClassName: FileInputServer
 * @Description: 通过channel读取d:\filechannel.txt中的数据并输出
 * @Author: admin
 */
public class FileInputServer {
    public static void main(String[] args) {
        String pathname = "d:\\filechannel.txt";
        File file = new File(pathname);
        try {
            // 通过输入流获取数据
            FileInputStream fileInputStream = new FileInputStream(file);
            // 获得channel
            FileChannel fileChannel = fileInputStream.getChannel();
            // 创建buffer
            ByteBuffer byteBuffer = ByteBuffer.allocate((int) file.length());
            // 读取数据到buffer中
            fileChannel.read(byteBuffer);
            // 输出数据
            System.out.println(new String(byteBuffer.array()));
            // 关闭流
            fileInputStream.close();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

FileChannel 向本地文件写入数据:实例三

实例说明:
使用一个 Buffer 完成文件读取、写入 从filechannel.txt中读取数据。
将读取的数据写到file03.txt。
实例演示。

在这里插入图片描述

/**
 * @ClassName: FileChannel03
 * @Description: 从filechannel.txt中读取数据并写入file03.txt中
 * @Author: admin
 */
public class FileChannel03 {
    public static void main(String[] args) {
        try {
            // 从filechannel.txt读取数据
            String pathname = "d:\\filechannel.txt";
            File file = new File(pathname);
            FileInputStream fileInputStream = new FileInputStream(file);
            FileChannel channel1 = fileInputStream.getChannel();
            // 创建输出流写入数据到file03.txt中
            FileOutputStream fileOutputStream = new FileOutputStream("d:\\file03.txt");
            FileChannel channel2 = fileOutputStream.getChannel();
            ByteBuffer allocate = ByteBuffer.allocate((int)file.length());
            // 循环读取
            while(true){
                allocate.clear();
                int read = channel1.read(allocate);
                if(read==-1){
                  break;
                }
                // 写入数据到file03.txt中
                allocate.flip();
                channel2.write(allocate);
            }
            fileOutputStream.close();
            fileInputStream.close();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

FileChannel 向本地文件写入数据:实例四

使用 FileChannel(通道)和方法 transferFrom,完成文件的拷贝 实例演示

/**
 * @ClassName: FileChannel04
 * @Description: 使用transferFrom将图片从D:\images\buffer.jpg拷贝到d:\buffer.jpg
 * @Author: admin
 */
public class FileChannel04 {
    public static void main(String[] args) {
        try {
            // 从D:\images\buffer.jpg读取文件
            String pathname = "D:\\images\\buffer.jpg";
            File file = new File(pathname);
            FileInputStream fileInputStream = new FileInputStream(file);
            FileChannel channel1 = fileInputStream.getChannel();
            // 创建输出流写入数据到file03.txt中
            FileOutputStream fileOutputStream = new FileOutputStream("d:\\buffer.jpg");
            FileChannel channel2 = fileOutputStream.getChannel();
            channel2.transferFrom(channel1,0,channel1.size());
            fileOutputStream.close();
            fileInputStream.close();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

重点方法说明:

  • **transferFrom**是一个抽象方法,主要是将字节从给定的可读字节传输到此通道的文件中。
 public abstract long transferFrom(ReadableByteChannel src,long position, long count)throws IOException; ReadableByteChannel继承Channel,一个可读取字节的通道(A channel that can read bytes)
 - <p> Only one read operation upon a readable channel may be in progress at
 - any given time.  If one thread initiates a read operation upon a channel
 - then any other thread that attempts to initiate another read operation will
 - block until the first operation is complete.  Whether or not other kinds of
 - I/O operations may proceed concurrently with a read operation depends upon
 - the type of the channel. </p>
  • 任何时间可读信道上只能进行一次读取操作。
  • 如果一个线程在通道上启动读操作,则尝试启动另一个读操作的任何其他线程将阻塞,直到第一个操作完成。
  • 其他类型的I / O操作是否可以与读操作同时进行取决于通道的类型。

Buffer 和 Channel的附加说明

  1. **Buffer**存数据和取数据类型要一致。
  2. Buffer可以转成**只读**Buffer。
  3. 可以使用**MappedByteBuffer**让文件直接在内存(堆外的内存)中进行修改,而如何同步到文件由 NIO 来完成。
  4. Nio支持多个**Buffer**来完成数据读写,即 Scattering 和 Gathering。见ServerSocketChannel实例演示

ServerSocketChannel 实例演示

/**
 * @ClassName: ScatteringAndGathering05
 * @Description:使用Buffer数组读写数据
 * @Author: admin
 */
public class ScatteringAndGathering05 {
    public static void main(String[] args) {
        try {
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            // 设置地址
            InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 1234);
            ServerSocket socket = serverSocketChannel.socket();
            // 绑定端口
            socket.bind(inetSocketAddress);

            // 创建Buffer
            ByteBuffer[] byteBuffers = new ByteBuffer[2];
            byteBuffers[0] = ByteBuffer.allocate(5);
            byteBuffers[1] = ByteBuffer.allocate(3);

            // 等待连接
            SocketChannel accept = serverSocketChannel.accept();

            // 初始化消息长度
            int mesLength = 8;
            while (true){
                int readLen = 0;
                while (readLen < mesLength){
                    long read = accept.read(byteBuffers);
                    readLen += read;
                    System.out.println("readLen = " + readLen);
                    //使用流打印,看看当前的这个 buffer 的 position 和 limit
                    Arrays.asList(byteBuffers).stream().map(buffer -> "position = " + buffer.position() + ", limit = " + buffer.limit()).forEach(System.out::println);
                }
                // 反转数据
               Arrays.asList(byteBuffers).forEach(v->v.flip());
                //将数据读出显示到客户端
                long byteWirte = 0;
                while (byteWirte < mesLength) {
                    long l = accept.write(byteBuffers);
                    byteWirte += l;
                }

                //将所有的buffer进行clear
                Arrays.asList(byteBuffers).forEach(buffer -> {
                    buffer.clear();
                });
                System.out.println("readLen = " + readLen + ", byteWrite = " + byteWirte + ", mesLength = " + mesLength);
            }

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

在这里插入图片描述

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-11-15 15:56:05  更:2021-11-15 15:57:38 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 5:36:49-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码