diyTomcat系列三,引入线程池并处理TCP长连接
本节欲解决的几个问题:
线程的频繁创建和销毁、每一次HTTP请求都会产生一个新的Socket连接,当请求数过多时开销较大
解决方案:
引入线程池、利用Connection=keep-alive、连接超时时间控制TCP长连接来处理http请求
1. 引入线程池
在正常的网络请求中,需要同时处理大量的http请求,如果每个http请求都交给一个单独的线程来处理,这个线程处理完后马上又会被销毁掉,这样将会消耗掉性能,造成不必要的开销,所以我们引入线程池 来处理请求
ThreadPoolExecutor(corePoolSize,maxPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler);
/核心线程池的大小,获取服务器处理器的核心数
int corePoolSize = Runtime.getRuntime().availableProcessors();
int maxPoolSize=corePoolSize * 2;
long keepAliveTime=10;
TimeUnit unit=TimeUnit.SECONDS;
BlockingQueue<Runnable> workQueue=new ArrayBlockingQueue<>(maxPoolSize * 4);
ThreadFactory threadFactory = new NameThreadFactory();
RejectedExecutionHandler handler = new MyIgnorePolicy();
? 如果连接数不大的情况下使用线程池其实更浪费资源,所有Tomcat 默认情况下是不使用线程池的,等到了一定的请求数再使用线程池,当然,基于高内聚低耦合 的设计思路,我们也可以在server.xml 第74行的配置文件中手动进行设置,设置默认启动线程池
2. 处理TCP长连接
? 首先我们要知道,HTTP协议 是一个应用层的协议 ,应用层的协议 协议是定义消息对话,确保正在发送的消息得到期待的响应 ,并且在传输数据时调用正确的服务 。HTTP协议是基于请求/响应 模式的。我们平时讨论的长连接是TCP长连接 ,应用层 不关注是长连接还是短连接,只要服务端给了响应 ,本次HTTP连接就结束了,或者更准确的说,是本次HTTP请求就结束了,下一次又是一个新的请求和新的响应,因此根本没有长连接这一说。那么自然也就没有短连接这一说了。
对应到代码上来看TCP长连接也就是socket 一直没有被关闭,一直处于连接状态。
那怎么设置长连接呢?
? 我们只需要在请求头里设置Connection:keep-alive 就可以了,这样当我们的服务器接收到这个请求时,就不会主动去关闭socket连接 ,当一个网页打开完成后,客户端和服务器之间用于传输HTTP数据的 TCP连接 不会关闭,如果客户端再次访问这个服务器上的网页,会继续使用这一条已经建立的连接。当然设置为Keep-Alive 的服务器也不会永久保持连接,它有一个保持时间 ,可以在不同的服务器软件(如Tomcat)中设定这个时间,如果客户端在规定的时间里没有进行http 请求,就好断开连接,落实到代码里也就是关闭socket 。实现长连接要客户端和服务端都支持长连接。HTTP协议的长连接和短连接,实质上是TCP协议的长连接和短连接。
Tomcat中设置连接超时时间可以在server.xml 中第69行Connector 里进行配置,可以看到Tomcat长连接的默认的时间是20s
在笔者的第二篇文章中没有处理长连接 ,可以看到每一次http 请求后都关闭了连接,然后又会进行一次新的连接
接下来我们去优化他(其实也很简单,就是不关掉socket连接并设置超时时间就可以了):
主体代码是在第二篇文章中TaskService 里面进行修改的,这里先贴修改和代码,完整代码在文章最后面后贴出,笔者这里还对连接次数进行了限制,防止一个线程处理过多的请求造成异常,配合线程池可以增加活跃状态的线程数 来处理请求。
其中HttpServletRequest 和HttpServletResponse 里都不能关闭流的连接,因为关闭通过socket 获得的流,socket 也会关闭,这样就会造成异常。
public void run() {
long connectionTime = 0L;
while (flag){
try {
connectionCount++;
logger.info(Thread.currentThread().getName()+",此长连接处理了"+connectionCount+"次请求");
if(connectionCount > MAX_REQUEST_COUNT){
connection = "close";
}
HttpServletRequest req = new HttpServletRequest(socket);
HttpServletResponse resp = new HttpServletResponse(req, socket);
} catch (Exception e) {
logger.error("线程运行错误",e);
connection = "close";
}finally {
if(connectionTime == 0){
connectionTime = System.currentTimeMillis();
}else {
connectionTime = System.currentTimeMillis() - connectionTime;
if(connectionTime > 20000){
flag = false;
connection = "close";
}
}
if("close".equals(connection)){
flag = false;
try {
if(socket != null){
socket.close();
}
} catch (IOException ioException) {
logger.error(ioException.getMessage());
}
try {
if(in != null){
in.close();
}
} catch (IOException ioException) {
logger.error(ioException.getMessage());
}
try {
if(out != null){
out.close();
}
} catch (IOException ioException) {
logger.error(ioException.getMessage());
}
}
}
}
}
3. 项目完整代码
参照第二期的代码,这里只做了少许修改。
这里要切记HttpServletRequest 和HttpServletResponse 里都不能关闭流的连接,因为关闭通过socket 获得的流,socket 也会关闭,这样就很造成异常,关闭Socket 连接在TaskService 进行处理
3.1 MyCatServer
package com.fx.tomcat;
import org.apache.log4j.Logger;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class MyCatServer {
private final static Logger logger = Logger.getLogger(MyCatServer.class);
private static Map<String, Map<String,String>> map;
private static boolean flag=true;
public static void main(String[] args) {
MyCatServer myCatServer = new MyCatServer();
int tomcatServicePort = Integer.parseInt(map.get("Connector").get("port"));
if("true".equalsIgnoreCase(map.get("Connector").get("executor"))){
myCatServer.startServerByThreadPool(tomcatServicePort);
}else {
myCatServer.startServerByThread(tomcatServicePort);
}
}
private void startServerByThread(int port){
try (ServerSocket ss = new ServerSocket(port)) {
while (flag){
Socket socket = ss.accept();
logger.debug("有用户进行请求,他是"+socket.getLocalSocketAddress());
new Thread(new TaskService(socket)).start();
}
} catch (Exception e) {
logger.error("服务器启动异常"+e);
}
}
private void startServerByThreadPool(int port) {
int corePoolSize = Runtime.getRuntime().availableProcessors();
int maxPoolSize=corePoolSize * 2;
long keepAliveTime=10;
TimeUnit unit=TimeUnit.SECONDS;
BlockingQueue<Runnable> workQueue=new ArrayBlockingQueue<>(maxPoolSize * 4);
ThreadFactory threadFactory = new NameThreadFactory();
RejectedExecutionHandler handler = new MyIgnorePolicy();
ThreadPoolExecutor executor;
executor = new ThreadPoolExecutor(corePoolSize,maxPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler);
executor.prestartAllCoreThreads();
try (ServerSocket ss = new ServerSocket(port)) {
while(flag){
Socket socket = ss.accept();
logger.debug("有用户进行请求,他是"+socket.getLocalSocketAddress());
executor.submit(new TaskService(socket));
}
} catch (Exception e) {
logger.error("服务器启动异常:{}",e);
} finally {
executor.shutdown();
}
}
static class NameThreadFactory implements ThreadFactory {
Logger logger = Logger.getLogger(NameThreadFactory.class);
private final AtomicInteger threadId=new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "线程" + threadId.getAndIncrement());
logger.info(t.getName()+"已经被创建了");
return t;
}
}
public static class MyIgnorePolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
logger.error("线程池+"+ e.toString()+r.toString()+"被拒绝");
}
}
static {
map = new ConcurrentHashMap<>();
Map<String,String> sonMap = new HashMap<>();
SAXReader saxReader = new SAXReader();
Document document = null;
try {
document = saxReader.read("conf/server.xml");
} catch (DocumentException e) {
logger.error("配置文件解析失败{}",e);
}
assert document != null;
Element root = document.getRootElement();
Element connector = root.element("Service").element("Connector");
sonMap.put("port",connector.attributeValue("port"));
sonMap.put("protocol",connector.attributeValue("protocol"));
sonMap.put("connectionTimeout",connector.attributeValue("connectionTimeout"));
sonMap.put("redirectPort",connector.attributeValue("redirectPort"));
sonMap.put("executor",connector.attributeValue("executor"));
map.put("Connector",sonMap);
}
}
3.2 TaskService
package com.fx.tomcat;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.time.Duration;
import java.time.Instant;
public class TaskService implements Runnable{
private final Logger logger = Logger.getLogger(TaskService.class);
private Socket socket;
private InputStream in = null;
private OutputStream out = null;
private String connection="keep-alive";
private boolean flag;
private final int MAX_REQUEST_COUNT = 30;
private int connectionCount;
public TaskService(Socket socket){
this.socket=socket;
flag = true;
}
@Override
public void run() {
long connectionTime = 0L;
while (flag){
try {
connectionCount++;
logger.info(Thread.currentThread().getName()+",此长连接处理了"+connectionCount+"次请求");
if(connectionCount > MAX_REQUEST_COUNT){
connection = "close";
}
HttpServletRequest req = new HttpServletRequest(socket);
HttpServletResponse resp = new HttpServletResponse(req, socket);
} catch (Exception e) {
logger.error("线程运行错误",e);
connection = "close";
}finally {
if(connectionTime == 0){
connectionTime = System.currentTimeMillis();
}else {
connectionTime = System.currentTimeMillis() - connectionTime;
if(connectionTime > 20000){
flag = false;
connection = "close";
}
}
if("close".equals(connection)){
flag = false;
try {
if(socket != null){
socket.close();
}
} catch (IOException ioException) {
logger.error(ioException.getMessage());
}
try {
if(in != null){
in.close();
}
} catch (IOException ioException) {
logger.error(ioException.getMessage());
}
try {
if(out != null){
out.close();
}
} catch (IOException ioException) {
logger.error(ioException.getMessage());
}
}
}
}
}
}
HttpServletResponse和HttpServletRequest只是将关闭流的代码全部剔除了,其他未进行修改
4. 项目演示效果和新问题思考
这里又有一个有意思的点
**为什么设置了长连接还是会产生这么多新的socket连接呢?**是不是超时了?
我们将超时时间设置为天长和地久再测试一遍
测试结果:
我们会发现还是会产生6个socket 连接,但是我们多测试几次后就会发现,每次都会产生6个socket 连接
这是为什么呢?
? 那是因为浏览器在请求资源的时候不可能只是一个线程进行资源请求,这样请求的话会导致速度很慢,所以浏览器都会对同一个服务器设置有默认的HTTP最大并发连接数 ,主流浏览器默认并发数如下:
Browser | HTTP/1.1 | HTTP/1.0 |
---|
IE 8,9 | 6 | 6 | IE 6,7 | 2 | 4 | Firefox 17 | 6 | 6 | Firefox 3 | 6 | 6 | Firefox 2 | 2 | 8 | Safari 3,4 | 4 | 4 | Chrome 1,2 | 6 | ? | Chrome 3 | 4 | 4 | Chrome 4+ | 6 | ? |
? 我们都知道在Tomcat中可以使用@WebServlet 代替xml 文件来进行地址映射,下一篇文章笔者将自定义这个注解并实现Servlet动态资源 的访问。自定义注解的过程可以看下笔者的文章,注解学习一、Java内置注解及注解书写和注解学习二、使用注解仿写junit测试框架,先尝试着写几个注解。
|