public class TimeServer {
public static void main(String[] args) {
int port=8080;
MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
new Thread(timeServer,"NIO-MultiplexerTimeServer-001").start();
}
}
服务端实现代码
package Nio;
import com.sun.xml.internal.ws.policy.privateutil.PolicyUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
public class MultiplexerTimeServer implements Runnable {
private Selector selector;
private ServerSocketChannel serverSocketChannel;
private volatile boolean stop;
public MultiplexerTimeServer(int port) {
try {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("The time server is start in port :" + port);
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
public void stop() {
this.stop = true;
}
@Override
public void run() {
while (!stop) {
try {
selector.select(1000);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
SelectionKey selectionKey = null;
while (iterator.hasNext()) {
selectionKey = iterator.next();
iterator.remove();
try {
handleInput(selectionKey);
} catch (Exception e) {
if (selectionKey != null) {
selectionKey.cancel();
if (selectionKey.channel() != null) {
selectionKey.channel().close();
}
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
if (selector != null) {
try {
selector.close();
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}
private void handleInput(SelectionKey selectionKey) throws IOException {
if (selectionKey.isValid()) {
if (selectionKey.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel) selectionKey.channel();
SocketChannel accept = channel.accept();
accept.configureBlocking(false);
accept.register(selector, SelectionKey.OP_READ);
}
if (selectionKey.isReadable()) {
SocketChannel channel = (SocketChannel) selectionKey.channel();
ByteBuffer allocate = ByteBuffer.allocate(1024);
int read = channel.read(allocate);
if (read > 0) {
allocate.flip();
byte[] bytes = new byte[allocate.remaining()];
allocate.get(bytes);
for (byte b :
bytes) {
System.out.print(Integer.toHexString(b));
}
System.out.println();
String body = new String(bytes, "UTF-8");
System.out.println("order:" + body);
String currentTime = body.contains("ORDER") ? new java.util.Date(System.currentTimeMillis()).toString() : "BAD ORDER";
doWrite(channel, currentTime);
} else if (read < 0) {
selectionKey.cancel();
channel.close();
} else {
}
}
}
}
private void doWrite(SocketChannel channel, String currentTime) throws IOException {
if (currentTime != null && currentTime.trim().length() > 0) {
byte[] bytes = currentTime.getBytes();
ByteBuffer write = ByteBuffer.allocate(bytes.length);
write.put(bytes);
write.flip();
channel.write(write);
}
}
}
客户端启动类,此处把发送独立成一个方法可以主动发送
public class Client {
public static void main(String[] args) throws Exception {
int port = 15001;
TimeClientHandle_Demo timeClientHandle = new TimeClientHandle_Demo("127.0.0.1", port, 10000);
new Thread(timeClientHandle, "Time_client").start();
Thread.sleep(5000);
Map<SocketChannel, Integer> map = timeClientHandle.getMap();
Set<Map.Entry<SocketChannel, Integer>> entries = map.entrySet();
for (int i = 0; i < 10; i++) {
long start=System.currentTimeMillis();
Iterator<Map.Entry<SocketChannel, Integer>> iterator = entries.iterator();
while (iterator.hasNext()) {
Map.Entry<SocketChannel, Integer> next = iterator.next();
SocketChannel key = next.getKey();
if (i == 0) {
timeClientHandle.doWrite(key, ProtoModel.Online(UUID.randomUUID().toString()));
}else {
timeClientHandle.doWrite(key, "000000028E8F");
}
}
System.out.println(System.currentTimeMillis()-start);
Thread.sleep(30000);
}
timeClientHandle.setStop(true);
}
}
客户端实现
class TimeClientHandle_Demo implements Runnable {
private String host;
private int port;
private Selector selector;
public void setStop(boolean stop) {
this.stop = stop;
}
private volatile boolean stop;
private ArrayList<SocketChannel> arrayList = new ArrayList<>();
public Map<SocketChannel, Integer> getMap() {
return map;
}
private Map<SocketChannel, Integer> map = new HashMap<>();
public static byte[] hexStrToBinaryStr(String hexString) {
if (hexString.isEmpty()) {
return null;
}
hexString = hexString.replaceAll(" ", "");
int len = hexString.length();
int index = 0;
byte[] bytes = new byte[len / 2];
while (index < len) {
String sub = hexString.substring(index, index + 2);
bytes[index / 2] = (byte) Integer.parseInt(sub, 16);
index += 2;
}
return bytes;
}
public TimeClientHandle_Demo(String ipadress, int port, int count) {
this.host = ipadress == null ? "127.0.0.1" : ipadress;
this.port = port;
try {
selector = Selector.open();
while (count-- > 0) {
arrayList.add(SocketChannel.open());
}
try {
doConnect();
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
@Override
public void run() {
while (!stop) {
try {
selector.select(100);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
SelectionKey key = null;
while (iterator.hasNext()) {
key = iterator.next();
iterator.remove();
try {
handleInput(key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
if (selector != null) {
try {
selector.close();
Iterator<SocketChannel> iterator = arrayList.iterator();
while (iterator.hasNext()) {
SocketChannel next = iterator.next();
next.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void doConnect() throws Exception {
Iterator<SocketChannel> iterator = arrayList.iterator();
while (iterator.hasNext()) {
SocketChannel socketChannel = iterator.next();
socketChannel.configureBlocking(false);
if (socketChannel.connect(new InetSocketAddress(host, port))) {
socketChannel.register(selector, SelectionKey.OP_READ);
} else {
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
}
}
public void doWrite(SocketChannel socketChannel, String msg) throws Exception {
byte[] bytes = hexStrToBinaryStr(msg);
ByteBuffer allocate = ByteBuffer.allocate(bytes.length);
allocate.put(bytes);
allocate.flip();
socketChannel.write(allocate);
}
private void handleInput(SelectionKey selectionKey) throws Exception {
if (selectionKey.isValid()) {
SocketChannel sc = (SocketChannel) selectionKey.channel();
if (selectionKey.isConnectable()) {
if (sc.finishConnect()) {
sc.register(selector, SelectionKey.OP_READ);
} else {
map.remove(sc);
System.exit(1);
}
if (!map.containsKey(sc)) {
map.put(sc, sc.socket().getLocalPort());
}
}
if (selectionKey.isReadable()) {
if (!map.containsKey(sc)) {
map.put(sc, sc.socket().getLocalPort());
}
ByteBuffer allocate = ByteBuffer.allocate(1024);
int read = sc.read(allocate);
if (read > 0) {
allocate.flip();
byte[] bytes = new byte[allocate.remaining()];
allocate.get(bytes);
String body = new String(bytes, "UTF-8");
} else if (read < 0) {
selectionKey.cancel();
sc.close();
} else {
}
}
}
}
}
|