初步尝试
HTTP请求接收
有两种策略,第一种如下,直接使用InputStream 进行接收,将bytes字节数组的长度设置大一点,可以一次性接收请求报文。
public void receive() throws Exception {
ServerSocket serverSocket = new ServerSocket(9000);
Socket socket = serverSocket.accept();
InputStream in = socket.getInputStream();
byte[] bytes = new byte[1024];
int read = in.read(bytes);
System.out.println(read);
System.out.println(new String(bytes, "UTF-8"));
}
第二种策略,使用转换流将InputStream转换为缓冲字符流,然后一行一行读取HTTP请求报文,这种方法无法知道请求报文达到结尾了,所以会一直阻塞,推荐使用上面那种一次读取
public void receive() throws Exception {
ServerSocket serverSocket = new ServerSocket(9000);
Socket socket = serverSocket.accept();
InputStream in = socket.getInputStream();
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(in));
String s = bufferedReader.readLine();
while (s != null) {
System.out.println(s);
s = bufferedReader.readLine();
}
}
HTTP响应构建
响应之前甚至可以不接收请求报文
public void receive() throws Exception {
ServerSocket serverSocket = new ServerSocket(9000);
Socket socket = serverSocket.accept();
OutputStream out = socket.getOutputStream();
PrintStream printStream = new PrintStream(out,true);
printStream.println("HTTP/1.1 200 OK");
printStream.println("Content-Type: text/html;charset=UTF-8" );
printStream.println("Connection: close");
printStream.println("Content-Length: "+"收到".getBytes().length);
printStream.println();
printStream.println("收到");
}
线程池
示例
高性能的HTTP服务器一定要有线程池的参与,可以将ServerSocket和Socket的代码分离开,ServerSocket只负责开启Socket,而将Socket要进行的任务放入线程池中
public void receive() throws IOException {
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(10000);
ServerSocket serverSocket = new ServerSocket(9000);
ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(8,16,60, TimeUnit.SECONDS, queue);
while (true) {
Socket accept = serverSocket.accept();
threadPoolExecutor.execute(new HandlerRunnable(accept));
}
}
线程池参数
上面的线程池不是Executors里面的四个线程池,而是自定义参数的线程池,参数如下
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
参数名 | 意义 |
---|
corePoolSize | 线程池中最小的线程数量,即使这些线程处理空闲状态,他们也不会被销毁,除非设置了allowCoreThreadTimeOut | maximumPoolSize | 线程池最大线程数量,需要等阻塞队列满了才会创建新线程 | keepAliveTime | 空闲线程存活时间,如果线程数大于corePoolSize,且又空闲线程存活时间,则会被销毁 | unit | 空闲线程存活时间单位,由 TimeUnit枚举类表示 | workQueue | 阻塞队列,存放等待执行的Runnable任务 | threadFactory | 线程工厂,配置线程池创建线程时方式以及创建什么样的线程 | handler | 拒绝策略 |
为什么线程池要有阻塞队列呢,因为线程池的原理就是,将任务放入阻塞队列,而其他线程会一直往这个队列中取任务并执行,即线程池中的线程代码都是类似如下的结构
public class HandlerRunnable implements Runnable {
@Override
public void run() {
while(true) {
Runnable task = workQueue.take();
task.run();
}
}
}
阻塞队列BlockingQueue接口的API如下,可以看到take就是该线程一直阻塞直到获取到任务task,然后执行任务的run方法,这样就不用创建新线程了
| 抛出异常 | 返回特殊值 | 阻塞 | 阻塞直到超时 |
---|
插入 | add(e) | offer(e) | put(e) | offer(Object, long, TimeUnit) | 移除 | remove() | poll() | take() | poll(long, TimeUnit) | 检索 | element() | peek() | not applicable | not applicable |
阻塞队列
BlockingQueue是一个接口,一般有五个实现类,通过选择不同的实现类,我们就可以让线程池具有不同的特性
实现类 | 作用 |
---|
LinkedBlockingQuene | 基于链表的无界阻塞队列,无界就是容量可以无限大,最多可以存Integer.MAX_VALUE个任务,但是可能会内存溢出 | ArrayBlockingQueue | 基于数组的有界阻塞队列,有界就是容量有限 | SynchronousQuene | 用于同步的阻塞队列,里面没有空间存储任务,是直接将任务交给线程,如果当前没有空闲线程则创建线程,如果线程数到达最大值则拒绝执行该任务 | PriorityBlockingQueue | 具有优先级的无界阻塞队列,可以根据优先级来优先调度任务 | DelayedWorkQueue | 队列会根据任务延时时间的不同进行排序,延时时间越短地就排在队列的前面,先被获取执行,可以实现延时任务效果 |
拒绝策略
RejectedExecutionHandler handler 就是拒绝策略类的接口,其有四个实现类,也就对应者四种拒绝策略,拒绝策略会在阻塞队列满了的时候执行
实现类 | 作用 |
---|
CallerRunsPolicy | 调用者线程中直接执行被拒绝任务的run方法,除非线程池已经关闭了 | AbortPolicy | 默认值,直接丢弃任务,并抛出RejectedExecutionException异常 | DiscardPolicy | 直接丢弃任务,并且什么都不做 | DiscardOldestPolicy | 抛弃进入队列最早的那个任务,然后尝试把这次拒绝的任务放入队列 |
默认线程池
JDK中给定的四种线程池其实就是帮我们设置好了,上面的参数
线程池 | 特点 |
---|
Executors.newSingleThreadExecutor() | 线程池corePoolSize和maximumPoolSize值都为1,空闲时间线程存活时间为0,即无须创建新线程和淘汰线程,,使用的LinkedBlockingQueue无界队列,淘汰策略是AbortPolicy | Executors.newFixedThreadPool() | 线程池corePoolSize和maximumPoolSize值都是创建时给定的值,空闲时间线程存活时间为0,即无须创建新线程和淘汰线程,使用的LinkedBlockingQueue无界队列,淘汰策略是AbortPolicy | Executors.newCachedThreadPool() | 线程池corePoolSize是0,maximumPoolSize是Integer.MAX_VALUE,空闲时间线程存活时间为60秒,使用SynchronousQueue队列即有空闲线程就使用,没有就创建一个再用,淘汰策略是AbortPolicy | Executors.newScheduledThreadPool() | 线程池corePoolSize是给定值,maximumPoolSize是Integer.MAX_VALUE,使用DelayedWorkQueue实现延时任务,淘汰策略是AbortPolicy |
测试
单线程版本
public class ReceiveHttp {
private Map<String, Handler> handlerMap = new HashMap<>();
public void receive() throws IOException {
ServerSocket serverSocket = new ServerSocket(9000);
while (true) {
Socket accept = serverSocket.accept();
try(Socket socket = accept;
InputStream in = socket.getInputStream();
OutputStream out = socket.getOutputStream();) {
System.out.println("开始");
byte[] bytes = new byte[1024];
int read = in.read(bytes);
System.out.println(read);
if (read != -1) {
System.out.println(new String(bytes, "UTF-8"));
PrintStream printStream = new PrintStream(out,true);
printStream.println("HTTP/1.1 200 OK");
printStream.println("Content-Type: text/html;charset=UTF-8" );
printStream.println("Connection: close");
printStream.println("Content-Length: "+Thread.currentThread().getName().getBytes().length);
printStream.println();
printStream.println(Thread.currentThread().getName());
}
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("运行");
}
}
}
没有使用线程池单线程吞吐量为每秒280
多线程版本
public class ReceiveHttp {
private Map<String, Handler> handlerMap = new HashMap<>();
public void receive() throws IOException {
ServerSocket serverSocket = new ServerSocket(9000);
while (true) {
Socket accept = serverSocket.accept();
Thread thread = new Thread(new HandlerRunnable(accept));
thread.start();
}
}
}
public class HandlerRunnable implements Runnable {
private Socket accept;
public HandlerRunnable(Socket socket) {
this.accept = socket;
}
@Override
public void run() {
try(Socket socket = accept;
InputStream in = socket.getInputStream();
OutputStream out = socket.getOutputStream();) {
System.out.println("开始");
byte[] bytes = new byte[1024];
int read = in.read(bytes);
System.out.println(read);
if (read != -1) {
System.out.println(new String(bytes, "UTF-8"));
PrintStream printStream = new PrintStream(out,true);
printStream.println("HTTP/1.1 200 OK");
printStream.println("Content-Type: text/html;charset=UTF-8" );
printStream.println("Connection: close");
printStream.println("Content-Length: "+Thread.currentThread().getName().getBytes().length);
printStream.println();
printStream.println(Thread.currentThread().getName());
}
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("运行");
}
}
使用多线程,没有使用线程池,吞吐量每秒2700
线程池版本
public class ReceiveHttp {
private Map<String, Handler> handlerMap = new HashMap<>();
public void receive() throws IOException {
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(10000);
ServerSocket serverSocket = new ServerSocket(9000);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,16,60, TimeUnit.SECONDS, queue);
while (true) {
Socket accept = serverSocket.accept();
threadPoolExecutor.execute(new HandlerRunnable(accept));
}
}
}
使用了线程池的吞吐量每秒有6000多 博主将new PrintStream(out,true)的true改为false,关闭自动flush,然后使用手动flush,吞吐量提高到7000
nio版本且没有使用多线程和线程池
以下代码设计至少有两个问题,一个是接收请求报文时,socketChannel.read(byteBuffer),并不能保证一次性将所有数据接收到(主要也不知道HTTP请求报文如何判断接收完毕了),但代码里只接收了一次。第二个问题是socketChannel.write(byteBuffer),写入响应报文时,并不能保证一次性将所有响应报文数据写入,这个有办法解决,因为一开始要写入的报文长度已知,则只需要每个socketChannel维护一个长度变量,写入一点减少一点,直到长度变量等于0,则响应报文写入完毕,再关闭socketChannel即可
public class ReceiveHttp {
public void receive() throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
Selector selector = Selector.open();
serverSocketChannel.bind(new InetSocketAddress(9000));
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
int select = selector.select(2000);
if (select==0) {
continue;
}
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
new NIOServerSocketRunnable(key).run();
} else if (key.isReadable()){
new NIOSocketRequestRunnable(key).run();
} else if (key.isWritable()){
new NIOSocketResponseRunnable(key).run();
}
}
}
}
}
public class NIOSocketRequestRunnable implements Runnable {
private SelectionKey selectionKey;
public NIOSocketRequestRunnable(SelectionKey selectionKey) {
this.selectionKey = selectionKey;
}
@Override
public void run() {
try {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
Selector selector = selectionKey.selector();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
if (socketChannel != null) {
socketChannel.read(byteBuffer);
byteBuffer.flip();
System.out.println(new String(byteBuffer.array(), 0, byteBuffer.limit()));
byteBuffer.clear();
socketChannel.register(selector, SelectionKey.OP_WRITE);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public class NIOSocketResponseRunnable implements Runnable {
private SelectionKey selectionKey;
public NIOSocketResponseRunnable(SelectionKey selectionKey) {
this.selectionKey = selectionKey;
}
@Override
public void run() {
try {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
Selector selector = selectionKey.selector();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
String response = "HTTP/1.1 200 OK\r\n" +
"Content-Type: text/html;charset=UTF-8\r\n" +
"Connection: close" +
"Content-Length: "+Thread.currentThread().getName().getBytes().length+"\r\n" +
"\r\n"+
Thread.currentThread().getName()+"\r\n";
byteBuffer.put(response.getBytes());
byteBuffer.flip();
int length = response.getBytes().length;
int remainder = length;
int write = socketChannel.write(byteBuffer);
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
吞吐量有3600
nio使用多线程版本
nio如果使用多线程,必须让线程数小于等于Selector数,一个线程使用的Selector,另一个线程不能使用,否则会报错,所以我们需要使用单独的一个线程和单独的一个Selector来管理ServerSocketChannel(下面使用主线程),用一个线程和一个Selector管理多个SocketChannel,而后面这种是可以扩展的,可以扩展为多个线程和多个Selector管理更多个SocketChannel,线程和Selector仍然是一对一的模式。
public class ReceiveHttp {
private List<WorkRunnable> workRunnableList = new ArrayList<>();
{
try {
for (int i = 0; i < 8; i++) {
WorkRunnable workRunnable = new WorkRunnable();
workRunnableList.add(workRunnable);
new Thread(workRunnable).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
public void receive() throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
Selector selector = Selector.open();
serverSocketChannel.bind(new InetSocketAddress(9000));
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
int i = 0;
while (true) {
int select = selector.select(2000);
if (select==0) {
continue;
}
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isAcceptable()) {
serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel != null) {
workRunnableList.get(i % workRunnableList.size()).register(socketChannel);
i++;
}
}
iterator.remove();
}
}
}
}
public class WorkRunnable implements Runnable{
private Selector selector;
public WorkRunnable() throws IOException {
selector = Selector.open();
}
public void register(SocketChannel socketChannel) {
try {
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
} catch (ClosedChannelException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
try {
while (true) {
System.out.println(2);
int select = selector.select(1000);
if (select==0) {
continue;
}
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
new NIOSocketRequestRunnable(key).run();
} else if (key.isWritable()) {
new NIOSocketResponseRunnable(key).run();
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
实现了一下,发现性能及其低下,可能因为同时开了9个线程一直循环,有很多上下文切换,使用postman请求发现过三四秒才收到响应,所以基本上全部超时导致Error率几乎百分百,可能NIO就不适合和多线程结合使用
nio使用线程池版本
public class Server4Single {
static final int port = 9000;
static ExecutorService fixPool = Executors.newCachedThreadPool();
private Selector selector;
private List<Server4Worker> workers = new ArrayList<>();
private AtomicInteger i = new AtomicInteger(0);
public Server4Single() throws IOException {
for (int i = 0; i < 10; i++) {
Server4Worker t = new Server4Worker();
workers.add(t);
fixPool.execute(() -> {
t.service();
});
}
selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress("127.0.0.1", port));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务器开启服务端口"+port+"……");
}
public void service() {
try {
while (true) {
selector.select();
Set<SelectionKey> selectionKeySet = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeySet.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel channel = serverSocketChannel.accept();
System.out.println("接受客户端的连接,开始会话……");
workers.get(i.getAndIncrement() % workers.size()).addChannel(channel);
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException {
Server4Single server = new Server4Single();
server.service();
}
}
public class Server4Worker {
private Selector selector;
private List<SocketChannel> bufChannels = new ArrayList<>();
public Server4Worker() throws IOException {
selector = Selector.open();
}
synchronized public void addChannel(SocketChannel channel) {
bufChannels.add(channel);
}
synchronized private void registerAllBufferedChannels() throws IOException {
Iterator<SocketChannel> iterator = bufChannels.iterator();
while (iterator.hasNext()) {
SocketChannel channel = iterator.next();
channel.configureBlocking(false);
channel.register(selector,
SelectionKey.OP_READ);
System.out.println("数据通道注册成功");
iterator.remove();
}
}
private void closeChannel(SelectionKey key) throws Exception {
key.channel().close();
key.cancel();
System.out.println("会话结束,服务端关闭连接");
}
public void service() {
while (true) {
try {
registerAllBufferedChannels();
selector.select(500);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
SocketChannel channel = (SocketChannel) key.channel();
if (key.isReadable()) {
new NIOSocketRequestRunnable(key).run();
} else if (key.isWritable()) {
new NIOSocketResponseRunnable(key).run();
}
}
} catch (IOException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
AIO版本
public class ReceiveHttp {
public void receive() throws IOException {
AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(9000));
serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
@Override
public void completed(AsynchronousSocketChannel result, Void attachment) {
try {
serverSocketChannel.accept(null, this);
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
result.read(byteBuffer);
byteBuffer.flip();
System.out.println(new String(byteBuffer.array(), 0, byteBuffer.limit()));
byteBuffer.clear();
String response = "HTTP/1.1 200 OK\r\n" +
"Content-Type: text/html;charset=UTF-8\r\n" +
"Connection: close" +
"Content-Length: "+Thread.currentThread().getName().getBytes().length+"\r\n" +
"\r\n"+
Thread.currentThread().getName()+"\r\n";
byteBuffer.put(response.getBytes());
byteBuffer.flip();
result.write(byteBuffer);
result.close();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Void attachment) {
exc.printStackTrace();
}
});
try {
Thread.currentThread().join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
没有想到AIO竟然如此之强,吞吐量有9700每秒,但是错误率有点高,百分之1,高于BIO线程池的百分之0.2
|