IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> Netty框架之多线程的Selector优化 -> 正文阅读

[Java知识库]Netty框架之多线程的Selector优化


学习Selector之后,我们发现单个Selector轮询器在一个线程中既要处理 连接事件(OP_ACCPET),又要处理 IO读写事件(OP_READ,OP_WRITE),效率是很低的,所以利用多线程创建多个Selector分别负责连接事件,IO读写事件,充分利用电脑的CPU资源.

那么这里我们就分类两个线程:
1.Boss线程:专门负责OP_ACCPET事件,也就是连接事件
2.Woker线程:专门负责OP_READ和OP_WRITE事件,也就是IO读写事件

在这里插入图片描述

1.一个Boss线程,一个Woker线程实现服务器

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * 多线程结合selector优化,充分利用CPU资源
 * 一个boss线程,一个woker线程
 * boss专门负责处理连接请求,woker专门负责处理IO读写请求
 */
public class WokerTest {
    public static void main(String[] args) throws Exception {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);

        Selector boss = Selector.open();
        ssc.bind(new InetSocketAddress(8080));
        SelectionKey ssck = ssc.register(boss, 0, null);
        //设置只关心连接事件
        ssck.interestOps(SelectionKey.OP_ACCEPT);
        Woker woker = new Woker("woker-0");
        //处理连接请求
        while(true){
            //如果有连接请求就处理连接,如果没有就阻塞
            boss.select();
            Iterator<SelectionKey> iterator = boss.selectedKeys().iterator();
            while(iterator.hasNext()){
                SelectionKey sk = iterator.next();
                SocketChannel sc =(SocketChannel)sk.channel();
                //设置为非阻塞模式
                sc.configureBlocking(false);
                //注册到woker中,让woker负责此通道的IO处理
                woker.registe(sc);
            }
        }
    }
    static class Woker implements Runnable{
        private String name;
        private Selector WokerSelector;
        private Thread thread;
        private boolean flag=false;
        private ConcurrentLinkedQueue<Runnable>queue = new ConcurrentLinkedQueue<>();
        //注册通道SocketChannel到Selector(一直有,一直做),并且完成WokerSelector和thread的初始化工作(仅一次)
        public void registe(SocketChannel sc) throws Exception {
            //初始化工作还没有做,那么完成初始化工作
            if(flag==false){
                flag = true;
                WokerSelector = Selector.open();
                thread = new Thread(this,name);
                thread.start();
            }
            queue.add(()->{
                try {
                    sc.register(WokerSelector,0,null);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            });
            WokerSelector.wakeup();//唤醒WokerSelector
        }
        public Woker(String name){
            this.name = name;
        }
        @Override
        public void run() {
            SelectionKey sk = null;
            ByteBuffer buf = ByteBuffer.allocate(1024);
            //把读写事件的通道Socketchannel注册到WokerSelector,并且完成读写事件的处理
            while(true){
                try {
                    /*如果有读写事件要处理,没有就阻塞
                     如果有通道需要注册,可以通过WokerSelector.wakeup()唤醒WokerSelector*/
                    WokerSelector.select();
                    //如果有通道Socketchannel需要注册道WokerSelector
                    Runnable task = queue.poll();
                    //队列可能为空,此时poll()不会抛异常而是返回null值
                    if(task!=null){
                        //完成注册事件
                        task.run();
                    }
                    Iterator<SelectionKey> iterator = WokerSelector.selectedKeys().iterator();
                    while(iterator.hasNext()){
                        sk = iterator.next();
                        iterator.remove();
                        SocketChannel sc = (SocketChannel)sk.channel();
                        sc.read(buf);
                        buf.flip();
                        //读取信息
                        System.out.println(Charset.defaultCharset().decode(buf));
                        buf.clear();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                    //事件要么被处理,要么被取消,不然会一直触发WokerSelector.select();
                    if(sk!=null) sk.cancel();
                }

            }
        }
    }
}

2.一个Boss线程,多个Woker线程实现服务器

与一个Woker线程不同的区别在于,如何把IO读写事件均分到多个Woker线程中,使得并发效率得到提升,减少单个Woker的处理压力

/*注册到多个woker中,让woker负责此通道的IO处理
                  这里用pos变量来实现负载均衡*/
                wokers[pos.getAndIncrement()%number].registe(sc);

完整代码

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 多线程结合selector优化,充分利用CPU资源
 * 一个boss线程,多个woker线程
 * boss专门负责处理连接请求,woker专门负责处理IO读写请求
 */
public class WokersTest {
    public static void main(String[] args) throws Exception {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);

        Selector boss = Selector.open();
        ssc.bind(new InetSocketAddress(8080));
        SelectionKey ssck = ssc.register(boss, 0, null);
        //设置只关心连接事件
        ssck.interestOps(SelectionKey.OP_ACCEPT);
        //wokers个数最好设置为物理机的CPU核数,通过Runtime.getRuntime().availableProcessors()获取
        int number = Runtime.getRuntime().availableProcessors();
        Woker[]wokers = new Woker[number];
        AtomicInteger pos = new AtomicInteger(0);
        //处理连接请求
        while(true){
            //如果有连接请求就处理连接,如果没有就阻塞
            boss.select();
            Iterator<SelectionKey> iterator = boss.selectedKeys().iterator();
            while(iterator.hasNext()){
                SelectionKey sk = iterator.next();
                SocketChannel sc =(SocketChannel)sk.channel();
                //设置为非阻塞模式
                sc.configureBlocking(false);
                /*注册到多个woker中,让woker负责此通道的IO处理
                  这里用pos变量来实现负载均衡*/
                wokers[pos.getAndIncrement()%number].registe(sc);
            }
        }
    }
    static class Woker implements Runnable{
        private String name;
        private Selector WokerSelector;
        private Thread thread;
        private boolean flag=false;
        private ConcurrentLinkedQueue<Runnable>queue = new ConcurrentLinkedQueue<>();
        //注册通道SocketChannel到Selector(一直有,一直做),并且完成WokerSelector和thread的初始化工作(仅一次)
        public void registe(SocketChannel sc) throws Exception {
            //初始化工作还没有做,那么完成初始化工作
            if(flag==false){
                flag = true;
                WokerSelector = Selector.open();
                thread = new Thread(this,name);
                thread.start();
            }
            queue.add(()->{
                try {
                    sc.register(WokerSelector,0,null);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            });
            WokerSelector.wakeup();//唤醒WokerSelector
        }
        public Woker(String name){
            this.name = name;
        }
        @Override
        public void run() {
            SelectionKey sk = null;
            ByteBuffer buf = ByteBuffer.allocate(1024);
            //把读写事件的通道Socketchannel注册到WokerSelector,并且完成读写事件的处理
            while(true){
                try {
                    /*如果有读写事件要处理,没有就阻塞
                     如果有通道需要注册,可以通过WokerSelector.wakeup()唤醒WokerSelector*/
                    WokerSelector.select();
                    //如果有通道Socketchannel需要注册道WokerSelector
                    Runnable task = queue.poll();
                    //队列可能为空,此时poll()不会抛异常而是返回null值
                    if(task!=null){
                        //完成注册事件
                        task.run();
                    }
                    Iterator<SelectionKey> iterator = WokerSelector.selectedKeys().iterator();
                    while(iterator.hasNext()){
                        sk = iterator.next();
                        iterator.remove();
                        SocketChannel sc = (SocketChannel)sk.channel();
                        sc.read(buf);
                        buf.flip();
                        //读取信息
                        System.out.println(Charset.defaultCharset().decode(buf));
                        buf.clear();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                    //事件要么被处理,要么被取消,不然会一直触发WokerSelector.select();
                    if(sk!=null) sk.cancel();
                }

            }
        }
    }
}

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-04-27 11:11:09  更:2022-04-27 11:11:36 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 2:31:37-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码