1.服务器代码
/**
 * Single-threaded, selector-driven chat relay server.
 *
 * <p>Wire protocol as implemented below (one message per read, no framing):
 * <ul>
 *   <li>{@code To Server:<username>} registers the sending connection under that name;</li>
 *   <li>any other message is treated as {@code To <recipient>:<content>}: the first three
 *       characters are skipped, the text up to the first {@code ':'} is the recipient and
 *       everything after it is the content, which is forwarded to the recipient's channel
 *       if that recipient is registered.</li>
 * </ul>
 *
 * <p>Every accepted connection carries an {@code Information} attachment holding its read
 * buffer and, once registered, its username.
 */
public class NIOServer {
    /** Prefix of the registration message sent by a client. */
    private static final String REGISTER_PREFIX = "To Server:";
    /** Number of leading characters skipped in front of the recipient name (length of "To "). */
    private static final int RECIPIENT_OFFSET = 3;

    /** Registered users: username to the channel that registered it. */
    Map<String, SocketChannel> users;
    private Selector selector;
    private static final int PORT = 8866;
    private ServerSocketChannel server;
    /** Allocated in {@link #setServer()}; not read anywhere in this class. */
    private ByteBuffer serverBuffer;
    private CharsetDecoder decoder;
    private CharsetEncoder encoder;

    /**
     * Opens the listening channel on {@code PORT}, switches it to non-blocking mode and
     * registers it with a freshly opened selector for accept events.
     *
     * @throws IOException if the channel or selector cannot be opened or the port cannot be bound
     */
    public void setServer() throws IOException {
        server = ServerSocketChannel.open();
        serverBuffer = ByteBuffer.allocate(1024);
        server.socket().bind(new InetSocketAddress(PORT));
        server.configureBlocking(false);
        selector = Selector.open();
        server.register(selector, SelectionKey.OP_ACCEPT);
    }

    /**
     * Creates the server: binds the listening socket and prepares the user registry and the
     * UTF-8 encoder/decoder.
     *
     * @throws IOException if {@link #setServer()} fails
     */
    public NIOServer() throws IOException {
        setServer();
        users = new ConcurrentHashMap<>();
        decoder = Charset.forName("UTF-8").newDecoder();
        encoder = Charset.forName("UTF-8").newEncoder();
    }

    /**
     * Runs the selector loop forever, accepting new connections and serving readable ones.
     *
     * @throws IOException if selecting or accepting a connection fails
     */
    public void startServer() throws IOException {
        System.out.println("***** 服务器开始工作 *****");
        while (true) {
            selector.select();
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                // A channel closed while handling an earlier key of this batch leaves a
                // cancelled key behind; querying its ready ops would throw.
                if (!key.isValid()) {
                    continue;
                }
                if (key.isAcceptable()) {
                    acceptClient(key);
                } else if (key.isReadable()) {
                    readFromClient(key);
                }
            }
        }
    }

    /** Accepts one pending connection and registers it for reads with a fresh attachment. */
    private void acceptClient(SelectionKey key) throws IOException {
        ServerSocketChannel listener = (ServerSocketChannel) key.channel();
        SocketChannel clientChannel = listener.accept();
        if (clientChannel == null) {
            // Non-blocking accept() returns null when no connection is actually pending.
            return;
        }
        clientChannel.configureBlocking(false);
        ByteBuffer clientBuffer = ByteBuffer.allocate(1024);
        Information channelInformation = new Information(null, clientBuffer);
        clientChannel.register(selector, SelectionKey.OP_READ, channelInformation);
        System.out.println("---------------------------------");
        String ip = clientChannel.socket().getInetAddress().getHostAddress();
        System.out.print("获取连接IP: " + ip + " 端口:");
        System.out.println(clientChannel.socket().getPort());
    }

    /**
     * Reads one message from a client and handles end-of-stream, registration or relaying.
     * Any failure while serving the client closes that client's connection only.
     */
    private void readFromClient(SelectionKey key) {
        SocketChannel clientChannel = (SocketChannel) key.channel();
        Information information = (Information) key.attachment();
        try {
            ByteBuffer clientBuffer = information.getBuffer();
            clientBuffer.clear();
            int n = clientChannel.read(clientBuffer);
            if (n == -1) {
                String username = information.getUsername();
                disconnect(key, clientChannel, information);
                System.out.println("---------------------------------");
                System.out.println("用户: " + username + " 已下线");
                System.out.println("---------------------------------");
                return;
            }
            if (n == 0) {
                // Nothing was read; there is no message to parse.
                return;
            }
            clientBuffer.flip();
            String message = decoder.decode(clientBuffer).toString();
            if (message.startsWith(REGISTER_PREFIX)) {
                registerUser(clientChannel, information, message.substring(REGISTER_PREFIX.length()));
            } else {
                relay(information, message);
            }
        } catch (Exception e) {
            // Boundary catch: whatever went wrong while serving this client, drop only this client.
            Object remoteAddress = clientChannel.socket().getInetAddress();
            disconnect(key, clientChannel, information);
            System.out.println("客户端异常断开。。。 服务器关闭连接,客户端IP地址为: " + remoteAddress + " 的连接");
        }
    }

    /** Binds {@code username} to the channel, dropping a name this channel registered earlier. */
    private void registerUser(SocketChannel clientChannel, Information information, String username) {
        String previous = information.getUsername();
        if (previous != null && !previous.equals(username) && users.get(previous) == clientChannel) {
            users.remove(previous);
        }
        information.setUsername(username);
        users.put(username, clientChannel);
        System.out.println("用户名: " + username);
        System.out.println("---------------------------------");
    }

    /** Parses {@code To <recipient>:<content>} and forwards the content to the recipient. */
    private void relay(Information information, String message) throws IOException {
        int separator = message.indexOf(':');
        if (separator < RECIPIENT_OFFSET) {
            // No ':' or no room for a recipient name: ignore instead of failing the connection.
            System.out.println("忽略格式错误的消息: " + message);
            return;
        }
        String fromUsername = information.getUsername();
        String toUsername = message.substring(RECIPIENT_OFFSET, separator);
        // Everything after the first ':' is content, so colons inside the text are preserved.
        String content = message.substring(separator + 1);
        System.out.println(fromUsername + "——>" + toUsername + ": " + content);
        SocketChannel toChannel = users.get(toUsername);
        if (toChannel == null) {
            return;
        }
        // Encoding problems are the sender's; they propagate to the caller.
        ByteBuffer payload = encoder.encode(CharBuffer.wrap("收到来自用户" + fromUsername + "的消息" + ": " + content));
        try {
            // NOTE(review): a non-blocking write may send only part of the payload when the
            // recipient's socket buffer is full; the remainder is not retried here.
            toChannel.write(payload);
        } catch (IOException e) {
            // The recipient's connection is broken: drop the recipient, not the sender.
            users.remove(toUsername);
            closeQuietly(toChannel);
            System.out.println("用户: " + toUsername + " 已下线");
        }
    }

    /** Unregisters the user bound to this channel (if any), cancels the key and closes the channel. */
    private void disconnect(SelectionKey key, SocketChannel channel, Information information) {
        if (information != null) {
            String username = information.getUsername();
            // Remove only if the name still maps to this channel; it may have been re-registered.
            if (username != null && users.get(username) == channel) {
                users.remove(username);
            }
            information.release();
        }
        key.cancel();
        closeQuietly(channel);
    }

    /** Closes a channel, ignoring failures: nothing more can be done for a dying connection. */
    private void closeQuietly(SocketChannel channel) {
        try {
            channel.close();
        } catch (IOException ignored) {
            // The connection is being discarded anyway.
        }
    }

    public static void main(String[] args) throws IOException {
        new NIOServer().startServer();
    }
}
2.附件Information类
import java.nio.ByteBuffer;
/**
 * Per-connection state attached to a channel's selection key: the name the peer registered
 * with and the buffer used for reading from that peer.
 *
 * <p>Accessors are not written out here; they are presumably generated by Lombok's
 * {@code @Data} (its import is not visible in this snippet).
 */
@Data
public class Information {
    /** Name the connection registered under; whatever the creator passed, possibly {@code null}. */
    private String username;
    /** Read buffer of the connection; {@code null} once {@link #release()} has been called. */
    private ByteBuffer buffer;

    /**
     * @param username initial username for the connection
     * @param buffer   buffer to associate with the connection
     */
    public Information(String username, ByteBuffer buffer) {
        this.buffer = buffer;
        this.username = username;
    }

    /** Drops the reference to the buffer. */
    public void release() {
        buffer = null;
    }
}
3.客户端
/**
 * Chat client thread: connects to the server on {@code localhost:8866} in non-blocking mode,
 * announces its username with {@code To Server:<username>} once connected, and prints every
 * message received from the server to standard output.
 *
 * <p>{@link #send(String)} may be called from other threads while this thread runs the
 * selector loop; writes are serialized because the shared {@link CharsetEncoder} is stateful.
 */
public class NIOClient extends Thread {
    private final CharsetDecoder decoder;
    private final CharsetEncoder encoder;
    private final String username;
    private final Selector selector;
    private final SocketChannel clientChannel;
    private final ByteBuffer buffer;
    /** Guards the encoder and channel writes, which are used from more than one thread. */
    private final Object writeLock = new Object();

    /**
     * Opens the channel and selector and starts connecting to the server.
     *
     * @param username name announced to the server once the connection is established
     * @throws IOException if the channel or selector cannot be opened or the connect fails at once
     */
    public NIOClient(String username) throws IOException {
        decoder = Charset.forName("UTF-8").newDecoder();
        encoder = Charset.forName("UTF-8").newEncoder();
        buffer = ByteBuffer.allocate(1024);
        this.username = username;
        clientChannel = SocketChannel.open();
        clientChannel.configureBlocking(false);
        selector = Selector.open();
        SelectionKey key = clientChannel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
        if (clientChannel.connect(new InetSocketAddress("localhost", 8866))) {
            // A local connection may be established immediately; no connect event will
            // follow in that case, so announce the username right away.
            announce(key);
        }
    }

    /**
     * Selector loop: completes the connection, then prints incoming messages until the server
     * closes the connection, an I/O error occurs, or {@link #close()} is called.
     */
    @Override
    public void run() {
        try {
            while (true) {
                selector.select();
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    if (!key.isValid()) {
                        continue;
                    }
                    SocketChannel channel = (SocketChannel) key.channel();
                    if (key.isConnectable()) {
                        if (channel.isConnectionPending() && !channel.finishConnect()) {
                            // Not connected yet; wait for the next connect event.
                            continue;
                        }
                        announce(key);
                    } else if (key.isReadable() && !receive(channel)) {
                        System.out.println("服务器已关闭连接");
                        return;
                    }
                }
            }
        } catch (java.nio.channels.ClosedSelectorException e) {
            // close() was called from another thread: normal shutdown.
        } catch (IOException e) {
            // Connection refused/reset and similar failures are fatal for this client.
            e.printStackTrace();
        } finally {
            try {
                close();
            } catch (IOException ignored) {
                // Already shutting down; nothing else to release.
            }
        }
    }

    /** Stops watching for connect events and sends the registration message. */
    private void announce(SelectionKey key) throws IOException {
        // Leaving OP_CONNECT in the interest set after the connection is established can
        // make select() return continuously without any ready key.
        key.interestOps(SelectionKey.OP_READ);
        transmit("To Server:" + username);
    }

    /**
     * Reads and prints one chunk of server output.
     *
     * @return {@code false} when the server has closed the connection, {@code true} otherwise
     */
    private boolean receive(SocketChannel channel) throws IOException {
        int n = channel.read(buffer);
        if (n == -1) {
            return false;
        }
        buffer.flip();
        try {
            if (n > 0) {
                String msg = decoder.decode(buffer).toString();
                System.out.println(msg);
            }
        } catch (java.nio.charset.CharacterCodingException e) {
            // Undecodable bytes: report and keep the connection alive.
            e.printStackTrace();
        } finally {
            // Always reset the buffer, otherwise the next read would start from a stale state.
            buffer.clear();
        }
        return true;
    }

    /** Encodes and writes a message completely, serialized against concurrent senders. */
    private void transmit(String msg) throws IOException {
        synchronized (writeLock) {
            ByteBuffer out = encoder.encode(CharBuffer.wrap(msg));
            // A non-blocking write may accept only part of the buffer; keep writing the rest.
            while (out.hasRemaining()) {
                clientChannel.write(out);
            }
        }
    }

    /**
     * Sends a message to the server.
     *
     * @param msg text to send, encoded as UTF-8
     * @throws IOException if encoding or writing fails
     */
    public void send(String msg) throws IOException {
        transmit(msg);
    }

    /**
     * Closes the selector and the connection. Safe to call more than once.
     *
     * @throws IOException if closing either resource fails
     */
    public void close() throws IOException {
        try {
            selector.close();
        } finally {
            clientChannel.socket().close();
        }
    }
}
4.测试类
/**
 * Console driver for {@code NIOClient}: forwards each line typed on standard input to the
 * server until the user types {@code bye} or standard input ends.
 */
public class Client {
    /**
     * @param args optional; {@code args[0]} is the username to register (defaults to "XXX")
     * @throws IOException if the client cannot be created or closed
     */
    public static void main(String[] args) throws IOException {
        String username = args.length > 0 ? args[0] : "XXX";
        NIOClient client = new NIOClient(username);
        client.start();
        BufferedReader sin = new BufferedReader(new InputStreamReader(System.in));
        try {
            String readline;
            while ((readline = sin.readLine()) != null) {
                if (readline.equals("bye")) {
                    break;
                }
                client.send(readline);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Release the connection on every exit path (bye, end of input, I/O error);
            // otherwise the client thread would keep the process alive.
            client.close();
        }
        System.exit(0);
    }
}
5.测试结果
先启动服务器,然后启动两个客户端,一个用户名为kd7,另一个为shstart7
服务器控制台
客户端shstart7控制台
客户端kd7控制台
6.运行流程
- 1.初始化服务器,包括绑定注册Selector,监听accept()事件等,然后开启服务器。
- 2.客户端(绑定了客户端自己的Selector,监听Connect和Read事件)连接服务器,服务器监听到连接,获取SocketChannel,申请一个ByteBuffer,将其与(此时为空的)username封装成一个Information,以附件的形式将SocketChannel注册到Selector上,监听read()事件,并打印客户端IP地址与端口号。
- 3.客户端监听到Connect事件,向服务器发送自己的用户名(固定格式 To Server:username)。
- 4.服务器监听到read()事件,根据前缀To Server判断是用户发来的用户名,然后获取用户名,将<username:SocketChannel>存到Map中,并将获取到的username写入此连接SocketChannel的附件Information中。
- 5.客户端A向客户端B发送消息。
  - 固定格式:To usernameB: msg,服务器监听到read()事件,将消息读取到此连接绑定的buffer中,解析出消息然后根据前缀判断此消息是发给其他客户端的,获取消息中B的username,查询Map获取B的SocketChannel,然后将消息发送给B。服务器打印一下A——>B : msg
|