package com.HyChat.server;
import com.HyChat.server.Handle.MegHandel;
import com.HyChat.server.Handle.MegHandelimpl;
import com.HyChat.server.untity.LoggerUntity;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Accepting side of the chat server.
 *
 * <p>Opens a non-blocking {@link ServerSocketChannel} on 127.0.0.1:8888, waits for
 * incoming connections on its own {@link Selector} and hands every accepted
 * {@link SocketChannel} to one of the {@link FollowerServer} instances in
 * round-robin order.
 */
public class HyChatServer {
    /** Selector that only watches the listening channel for OP_ACCEPT. */
    private Selector selector;
    /** Number of follower servers: twice the number of available processors. */
    private final int MaxFollwer = Runtime.getRuntime().availableProcessors() * 2;
    private ServerSocketChannel socketChannel;
    // Created in init(); not otherwise used by this class.
    private MegHandel megHandel;
    /** Result of the most recent select() call. */
    private int keys = 0;
    private FollowerServer[] followerServer = new FollowerServer[MaxFollwer];
    // Not used by this class.
    private ExecutorService factory = Executors.newFixedThreadPool(3);

    /**
     * Initializes the listening socket and the followers, then runs the accept loop.
     * Does not return under normal operation.
     *
     * @throws IOException if the socket or selector cannot be set up, or if
     *                     selecting/accepting fails
     */
    public void Start() throws IOException {
        init();
        HandelConn();
    }

    /**
     * Opens the selector and the listening channel, binds it and creates all followers.
     *
     * @throws IOException if any channel or selector operation fails
     */
    private void init() throws IOException {
        socketChannel = ServerSocketChannel.open();
        selector = Selector.open();
        socketChannel.configureBlocking(false);
        socketChannel.socket().bind(new InetSocketAddress("127.0.0.1", 8888));
        socketChannel.register(selector, SelectionKey.OP_ACCEPT);
        megHandel = new MegHandelimpl();
        for (int i = 0; i < MaxFollwer; i++) {
            followerServer[i] = new FollowerServer();
        }
    }

    /**
     * Accept loop: blocks in {@code select()}, accepts every pending connection and
     * distributes the accepted channels across the followers in round-robin order.
     *
     * @throws IOException if {@code select()} or {@code accept()} itself fails
     */
    private void HandelConn() throws IOException {
        System.out.println("链接启动");
        // Index of the follower that receives the next accepted connection.
        int index = 0;
        while (true) {
            keys = selector.select();
            if (keys <= 0) {
                continue;
            }
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey currKey = iterator.next();
                iterator.remove();
                if (!currKey.isValid() || !currKey.isAcceptable()) {
                    continue;
                }
                ServerSocketChannel listener = (ServerSocketChannel) currKey.channel();
                SocketChannel channel = listener.accept();
                if (channel == null) {
                    // In non-blocking mode accept() returns null when no connection is pending.
                    continue;
                }
                try {
                    channel.configureBlocking(false);
                    followerServer[index].Regist(channel);
                    index = (index + 1) % followerServer.length;
                    LoggerUntity.LogInfo("收到链接");
                } catch (IOException e) {
                    // A single connection that cannot be set up must not stop the accept loop.
                    LoggerUntity.LogWaring("链接注册失败: " + e);
                    closeQuietly(channel);
                }
            }
        }
    }

    /** Closes a channel whose setup failed; a failure to close is not actionable here. */
    private static void closeQuietly(SocketChannel channel) {
        try {
            channel.close();
        } catch (IOException ignored) {
            // Nothing more can be done for a channel that is already being discarded.
        }
    }

    public static void main(String[] args) {
        try {
            new HyChatServer().Start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
package com.HyChat.server;
import com.HyChat.server.Handle.MegHandel;
import com.HyChat.server.Handle.MegHandelimpl;
import com.HyChat.server.Message.ReqMessage;
import com.HyChat.server.ThreadPool.TaskPool;
import com.HyChat.server.untity.LoggerUntity;
import lombok.SneakyThrows;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.*;
import java.util.concurrent.*;
/**
 * Read side of the chat server.
 *
 * <p>Each instance owns a {@link Selector} and a polling thread started from the
 * constructor. Channels handed over through {@link #Regist(SocketChannel)} are
 * watched for OP_READ; every chunk read is passed to {@code TaskPool.Sumit} where it
 * is parsed into a {@code ReqMessage.MegBody} and given to the message handler.
 */
public class FollowerServer {
    private Selector selector;
    private MegHandel megHandel;
    // Created in the constructor; not otherwise used by this class.
    private ExecutorService factory;
    /**
     * Channels waiting to be registered. Registration is performed by the polling
     * thread itself so that callers of Regist never contend with an in-progress select().
     */
    private final Queue<SocketChannel> pendingChannels = new ConcurrentLinkedQueue<SocketChannel>();

    /**
     * Opens the selector and starts the polling thread.
     *
     * @throws IOException if the selector cannot be opened
     */
    public FollowerServer() throws IOException {
        this.selector = Selector.open();
        factory = Executors.newCachedThreadPool();
        megHandel = new MegHandelimpl();
        new Thread(new Runnable() {
            @SneakyThrows
            @Override
            public void run() {
                ReadPoll();
            }
        }).start();
    }

    /**
     * Hands a connected, non-blocking channel to this follower.
     *
     * <p>The channel is queued and the selector is woken up; the actual
     * {@code register} call happens on the polling thread.
     *
     * @param channel the accepted channel
     * @throws ClosedChannelException if the channel is already closed
     */
    public void Regist(SocketChannel channel) throws ClosedChannelException {
        try {
            // Passing the address object (not toString()) avoids an NPE when it is null.
            LoggerUntity.LogInfo(String.format("收到新的链接%s", channel.getRemoteAddress()));
        } catch (IOException e) {
            e.printStackTrace();
        }
        if (!channel.isOpen()) {
            throw new ClosedChannelException();
        }
        if (channel.isBlocking()) {
            // Same failure register() would report, raised on the caller's thread.
            throw new java.nio.channels.IllegalBlockingModeException();
        }
        pendingChannels.add(channel);
        selector.wakeup();
    }

    /**
     * Polling loop: registers queued channels, waits up to one second for readable
     * keys and processes each of them.
     *
     * @throws IOException if {@code select} fails
     */
    private void ReadPoll() throws IOException {
        while (true) {
            registerPending();
            selector.select(1000);
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey currKey = iterator.next();
                iterator.remove();
                if (currKey.isValid() && currKey.isReadable()) {
                    handleRead(currKey);
                }
            }
        }
    }

    /** Registers every queued channel for OP_READ; channels closed meanwhile are dropped. */
    private void registerPending() {
        SocketChannel channel;
        while ((channel = pendingChannels.poll()) != null) {
            try {
                channel.register(selector, SelectionKey.OP_READ);
            } catch (ClosedChannelException e) {
                // The peer went away between accept and registration; nothing to watch.
                e.printStackTrace();
            }
        }
    }

    /**
     * Reads one chunk (at most 1024 bytes) from the key's channel and dispatches it.
     * End-of-stream and read errors are both treated as the user going offline.
     */
    private void handleRead(final SelectionKey currKey) {
        SocketChannel channel = (SocketChannel) currKey.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int length;
        try {
            length = channel.read(buffer);
        } catch (IOException e) {
            // A failed read on one connection must not end the polling thread.
            length = -1;
        }
        if (length == -1) {
            disconnect(currKey, channel);
            return;
        }
        if (length == 0) {
            // Nothing was read; there is no message to parse.
            return;
        }
        // NOTE(review): each read is parsed as exactly one message — confirm the
        // protocol guarantees messages are not split or merged across reads.
        final byte[] payload = Arrays.copyOfRange(buffer.array(), 0, length);
        try {
            TaskPool.Sumit(new Runnable() {
                @Override
                public void run() {
                    processMessage(payload, currKey);
                }
            });
        } catch (Exception e) {
            LoggerUntity.LogWaring("消息类型错误");
        }
    }

    /** Parses the payload and forwards it to the handler; runs inside the submitted task. */
    private void processMessage(byte[] payload, SelectionKey currKey) {
        ReqMessage.MegBody megBody;
        try {
            megBody = ReqMessage.MegBody.parseFrom(payload);
        } catch (Exception e) {
            // The parse happens inside the task, so the failure has to be reported here.
            LoggerUntity.LogWaring("消息类型错误");
            return;
        }
        LoggerUntity.LogInfo(String.format("发送给 %s 的消息", megBody.getTarget()));
        try {
            megHandel.DoHandel(megBody, currKey);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Reports the user as offline, cancels the key and closes the channel. */
    private void disconnect(SelectionKey currKey, SocketChannel channel) {
        LoggerUntity.LogInfo("用户下线了");
        try {
            MegHandelimpl.Offline(currKey);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            currKey.cancel();
            try {
                // Cancelling the key alone leaves the socket open.
                channel.close();
            } catch (IOException ignored) {
                // The connection is being discarded anyway.
            }
        }
    }
}
|