学习Selector之后,我们发现:如果只用单个Selector在一个线程中轮询,它既要处理
连接事件(OP_ACCEPT),又要处理
IO读写事件(OP_READ、OP_WRITE),效率很低。因此可以利用多线程创建多个Selector,分别负责连接事件和IO读写事件,充分利用CPU的多核资源。
这里我们把线程分为两类: 1. Boss线程:专门负责OP_ACCEPT事件,也就是连接事件; 2. Worker线程(代码中的类名为Woker):专门负责OP_READ和OP_WRITE事件,也就是IO读写事件。
1. 一个Boss线程、一个Worker线程实现服务器
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * Minimal NIO server with one "boss" thread and one worker thread.
 *
 * <p>The boss (the main thread) owns a selector that only watches the listening
 * socket for {@code OP_ACCEPT}. Every accepted connection is handed to a single
 * {@link Woker}, which owns a second selector and handles {@code OP_READ} on its
 * own thread, printing whatever bytes it receives to standard output.
 */
public class WokerTest {

    /**
     * Starts the server on port 8080 and accepts connections forever.
     *
     * @param args unused
     * @throws Exception if the listening socket or a selector cannot be set up,
     *                   or if selecting/accepting fails
     */
    public static void main(String[] args) throws Exception {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        Selector boss = Selector.open();
        ssc.bind(new InetSocketAddress(8080));
        ssc.register(boss, SelectionKey.OP_ACCEPT, null);

        Woker woker = new Woker("woker-0");
        while (true) {
            boss.select();
            Iterator<SelectionKey> iterator = boss.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey sk = iterator.next();
                // The selected-key set is never cleared by the selector itself;
                // a key left in it would be handed back on every later select().
                iterator.remove();
                if (!sk.isAcceptable()) {
                    continue;
                }
                // The key's channel is the listening ServerSocketChannel; the
                // client connection only exists after accept().
                SocketChannel sc = ((ServerSocketChannel) sk.channel()).accept();
                if (sc == null) {
                    // Non-blocking accept() returns null when nothing is pending.
                    continue;
                }
                sc.configureBlocking(false);
                woker.registe(sc);
            }
        }
    }

    /**
     * A worker that owns one selector and one thread and reads from every
     * channel registered with it.
     *
     * <p>The selector and thread are created lazily on the first call to
     * {@link #registe(SocketChannel)}. Registrations are not performed by the
     * caller directly; they are queued and executed on the worker thread.
     */
    static class Woker implements Runnable {
        private final String name;
        private Selector WokerSelector;
        private Thread thread;
        // True once the selector and thread have been created. Only touched by
        // the thread that calls registe().
        private boolean flag = false;
        // Registration tasks handed from the calling thread to the worker thread.
        private final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();

        /**
         * Hands a connected, non-blocking channel to this worker for reading.
         *
         * <p>The actual {@code register} call is queued and run by the worker
         * thread after it is woken up, so it never competes with a
         * {@code select()} in progress on the same selector.
         *
         * @param sc the accepted client channel, already in non-blocking mode
         * @throws Exception if the worker's selector cannot be opened
         */
        public void registe(SocketChannel sc) throws Exception {
            if (!flag) {
                // Open the selector before starting the thread so run() never
                // observes a null selector.
                WokerSelector = Selector.open();
                thread = new Thread(this, name);
                thread.start();
                flag = true;
            }
            queue.add(() -> {
                try {
                    // Interest ops must include OP_READ, otherwise the selector
                    // never reports incoming data for this channel.
                    sc.register(WokerSelector, SelectionKey.OP_READ, null);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            });
            WokerSelector.wakeup();
        }

        /**
         * Creates a worker; no selector or thread is allocated until the first
         * registration.
         *
         * @param name name given to the worker thread
         */
        public Woker(String name) {
            this.name = name;
        }

        /**
         * Worker loop: waits on the selector, runs all pending registrations,
         * then reads from every channel that has data.
         */
        @Override
        public void run() {
            ByteBuffer buf = ByteBuffer.allocate(1024);
            while (true) {
                try {
                    WokerSelector.select();
                    // Drain the whole queue: several registrations may have been
                    // queued before this wakeup was observed.
                    Runnable task;
                    while ((task = queue.poll()) != null) {
                        task.run();
                    }
                    Iterator<SelectionKey> iterator = WokerSelector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey sk = iterator.next();
                        iterator.remove();
                        if (sk.isValid() && sk.isReadable()) {
                            readAndPrint(sk, buf);
                        }
                    }
                } catch (IOException e) {
                    // select() itself failed; log and keep the worker alive.
                    e.printStackTrace();
                }
            }
        }

        /** Reads once from the key's channel and prints the bytes; closes the channel on EOF or error. */
        private void readAndPrint(SelectionKey sk, ByteBuffer buf) {
            SocketChannel sc = (SocketChannel) sk.channel();
            try {
                int n = sc.read(buf);
                if (n == -1) {
                    // Peer closed the connection; without this the key would
                    // stay readable forever.
                    close(sk, sc);
                } else if (n > 0) {
                    buf.flip();
                    System.out.println(Charset.defaultCharset().decode(buf));
                }
            } catch (IOException e) {
                e.printStackTrace();
                close(sk, sc);
            } finally {
                // The buffer is shared by all channels of this worker.
                buf.clear();
            }
        }

        /** Cancels the key and closes the channel, logging any failure to close. */
        private void close(SelectionKey sk, SocketChannel sc) {
            sk.cancel();
            try {
                sc.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
2. 一个Boss线程、多个Worker线程实现服务器
与单个Worker线程的区别在于:需要把新建立的连接(及其IO读写事件)均匀分配给多个Worker线程,从而提升并发效率,减小单个Worker的处理压力。核心是按轮询(round-robin)的方式选择Worker:
wokers[pos.getAndIncrement()%number].registe(sc);
完整代码
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Minimal NIO server with one "boss" thread and several worker threads.
 *
 * <p>The boss (the main thread) owns a selector that only watches the listening
 * socket for {@code OP_ACCEPT}. Accepted connections are distributed round-robin
 * over an array of {@link Woker}s, one per available processor. Each worker owns
 * its own selector and thread and prints whatever bytes it reads to standard
 * output.
 */
public class WokersTest {

    /**
     * Starts the server on port 8080 and accepts connections forever.
     *
     * @param args unused
     * @throws Exception if the listening socket or a selector cannot be set up,
     *                   or if selecting/accepting fails
     */
    public static void main(String[] args) throws Exception {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        Selector boss = Selector.open();
        ssc.bind(new InetSocketAddress(8080));
        ssc.register(boss, SelectionKey.OP_ACCEPT, null);

        int number = Runtime.getRuntime().availableProcessors();
        Woker[] wokers = new Woker[number];
        // The array must be populated before use; each worker still creates its
        // selector and thread lazily on first registration.
        for (int i = 0; i < number; i++) {
            wokers[i] = new Woker("woker-" + i);
        }
        AtomicInteger pos = new AtomicInteger(0);

        while (true) {
            boss.select();
            Iterator<SelectionKey> iterator = boss.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey sk = iterator.next();
                // The selected-key set is never cleared by the selector itself;
                // a key left in it would be handed back on every later select().
                iterator.remove();
                if (!sk.isAcceptable()) {
                    continue;
                }
                // The key's channel is the listening ServerSocketChannel; the
                // client connection only exists after accept().
                SocketChannel sc = ((ServerSocketChannel) sk.channel()).accept();
                if (sc == null) {
                    // Non-blocking accept() returns null when nothing is pending.
                    continue;
                }
                sc.configureBlocking(false);
                // floorMod keeps the index non-negative even after the counter
                // wraps past Integer.MAX_VALUE.
                wokers[Math.floorMod(pos.getAndIncrement(), number)].registe(sc);
            }
        }
    }

    /**
     * A worker that owns one selector and one thread and reads from every
     * channel registered with it.
     *
     * <p>The selector and thread are created lazily on the first call to
     * {@link #registe(SocketChannel)}. Registrations are not performed by the
     * caller directly; they are queued and executed on the worker thread.
     */
    static class Woker implements Runnable {
        private final String name;
        private Selector WokerSelector;
        private Thread thread;
        // True once the selector and thread have been created. Only touched by
        // the thread that calls registe().
        private boolean flag = false;
        // Registration tasks handed from the calling thread to the worker thread.
        private final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();

        /**
         * Hands a connected, non-blocking channel to this worker for reading.
         *
         * <p>The actual {@code register} call is queued and run by the worker
         * thread after it is woken up, so it never competes with a
         * {@code select()} in progress on the same selector.
         *
         * @param sc the accepted client channel, already in non-blocking mode
         * @throws Exception if the worker's selector cannot be opened
         */
        public void registe(SocketChannel sc) throws Exception {
            if (!flag) {
                // Open the selector before starting the thread so run() never
                // observes a null selector.
                WokerSelector = Selector.open();
                thread = new Thread(this, name);
                thread.start();
                flag = true;
            }
            queue.add(() -> {
                try {
                    // Interest ops must include OP_READ, otherwise the selector
                    // never reports incoming data for this channel.
                    sc.register(WokerSelector, SelectionKey.OP_READ, null);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            });
            WokerSelector.wakeup();
        }

        /**
         * Creates a worker; no selector or thread is allocated until the first
         * registration.
         *
         * @param name name given to the worker thread
         */
        public Woker(String name) {
            this.name = name;
        }

        /**
         * Worker loop: waits on the selector, runs all pending registrations,
         * then reads from every channel that has data.
         */
        @Override
        public void run() {
            ByteBuffer buf = ByteBuffer.allocate(1024);
            while (true) {
                try {
                    WokerSelector.select();
                    // Drain the whole queue: several registrations may have been
                    // queued before this wakeup was observed.
                    Runnable task;
                    while ((task = queue.poll()) != null) {
                        task.run();
                    }
                    Iterator<SelectionKey> iterator = WokerSelector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey sk = iterator.next();
                        iterator.remove();
                        if (sk.isValid() && sk.isReadable()) {
                            readAndPrint(sk, buf);
                        }
                    }
                } catch (IOException e) {
                    // select() itself failed; log and keep the worker alive.
                    e.printStackTrace();
                }
            }
        }

        /** Reads once from the key's channel and prints the bytes; closes the channel on EOF or error. */
        private void readAndPrint(SelectionKey sk, ByteBuffer buf) {
            SocketChannel sc = (SocketChannel) sk.channel();
            try {
                int n = sc.read(buf);
                if (n == -1) {
                    // Peer closed the connection; without this the key would
                    // stay readable forever.
                    close(sk, sc);
                } else if (n > 0) {
                    buf.flip();
                    System.out.println(Charset.defaultCharset().decode(buf));
                }
            } catch (IOException e) {
                e.printStackTrace();
                close(sk, sc);
            } finally {
                // The buffer is shared by all channels of this worker.
                buf.clear();
            }
        }

        /** Cancels the key and closes the channel, logging any failure to close. */
        private void close(SelectionKey sk, SocketChannel sc) {
            sk.cancel();
            try {
                sc.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
|