Tomcat 请求处理流程
请求流程
设计了这么多层次的容器,Tomcat是怎么确定每一个请求应该由哪个Wrapper容器里的 Servlet来处理的呢?
答案是,Tomcat是用Mapper组件来完成这个任务的。
Mapper组件的功能就是将用户请求的URL定位到一个Servlet,它的工作原理是:
Mapper组件里保存了Web应用的配置信息,其实就是容器组件与访问路径的映射关系, 比如Host容器里配置的域名、Context容器里的Web应用路径,以及Wrapper容器里 Servlet映射的路径,你可以想象这些配置信息就是一个多层次的Map。
当一个请求到来时,Mapper组件通过解析请求URL里的域名和路径,再到自己保存的 Map里去查找,就能定位到一个Servlet。请你注意,一个请求URL最后只会定位到一个 Wrapper容器,也就是一个Servlet。
下面的示意图中 , 就描述了 当用户请求链接 http://www.itcast.cn/bbs/findAll 之 后, 是如何找到最终处理业务逻辑的servlet 。
那上面这幅图只是描述了根据请求的URL如何查找到需要执行的Servlet , 那么下面我们 再来解析一下 , 从Tomcat的设计架构层面来分析Tomcat的请求处理
步骤如下:
-
Connector组件Endpoint中的Acceptor监听客户端套接字连接并接收Socket。 -
将连接交给线程池Executor处理,开始执行请求响应任务。 -
Processor组件读取消息报文,解析请求行、请求体、请求头,封装成Request对象。 -
Mapper组件根据请求行的URL值和请求头的Host值匹配由哪个Host容器、Context容器、Wrapper容器处理请求。 -
CoyoteAdaptor组件负责将Connector组件和Engine容器关联起来,把生成的 Request对象和响应对象Response传递到Engine容器中,调用 Pipeline。 -
Engine容器的管道开始处理,管道中包含若干个Valve、每个Valve负责部分处理逻 辑。执行完Valve后会执行基础的 Valve–StandardEngineValve,负责调用Host容器的 Pipeline。 -
Host容器的管道开始处理,流程类似,最后执行 Context容器的Pipeline。 -
Context容器的管道开始处理,流程类似,最后执行 Wrapper容器的Pipeline。 -
Wrapper容器的管道开始处理,流程类似,最后执行 Wrapper容器对应的Servlet对象 的 处理方法。
请求流程源码解析
在前面所讲解的Tomcat的整体架构中,我们发现Tomcat中的各个组件各司其职,组件 之间松耦合,确保了整体架构的可伸缩性和可拓展性,那么在组件内部,如何增强组件 的灵活性和拓展性呢? 在Tomcat中,每个Container组件采用责任链模式来完成具体的 请求处理。
在Tomcat中定义了Pipeline 和 Valve 两个接口,Pipeline 用于构建责任链, 后者代表责 任链上的每个处理器。Pipeline 中维护了一个基础的Valve,它始终位于Pipeline的末端 (最后执行),封装了具体的请求处理和输出响应的过程。当然,我们也可以调用 addValve()方法, 为Pipeline 添加其他的Valve, 后添加的Valve 位于基础的Valve之 前,并按照添加顺序执行。Pipiline通过获得首个Valve来启动整合链条的执行 。
源码研究
建议看源码流程前先去回顾一下责任链模式,因为tomcat的请求流程中主要使用了责任链模式
责任链模式
我们把请求过程的源码分为两部分来进行分析:
第一部分: 请求由Endpoint捕获,并转交给Processor处理.
1.Acceptor.run()
请求的流程由NioEndpoint 中的Accepter 类的run 方法开始.
首先一个浏览器的请求会由tomcat 中的Endpoint 中的Accepter 所捕获并开启会话.
protected class Acceptor extends AbstractEndpoint.Acceptor {
@Override
public void run() {
int errorDelay = 0;
while (running) {
while (paused && running) {
state = AcceptorState.PAUSED;
try {
Thread.sleep(50);
} catch (InterruptedException e) {
}
}
if (!running) {
break;
}
state = AcceptorState.RUNNING;
try {
countUpOrAwaitConnection();
SocketChannel socket = null;
try {
socket = serverSock.accept();
} catch (IOException ioe) {
countDownConnection();
if (running) {
errorDelay = handleExceptionWithDelay(errorDelay);
throw ioe;
} else {
break;
}
}
errorDelay = 0;
if (running && !paused) {
if (!setSocketOptions(socket)) {
closeSocket(socket);
}
} else {
closeSocket(socket);
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error(sm.getString("endpoint.accept.fail"), t);
}
}
state = AcceptorState.ENDED;
}
2.Poller.run()
当接收客户端请求时,NioEndpoint.Poller会获取事件并开始迭代执行.(一下省略部分代码)
public void run() {
while (true) {
.......
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
if (attachment == null) {
iterator.remove();
} else {
iterator.remove();
processKey(sk, attachment);
}
}
timeout(keyCount,hasEvents);
}
getStopLatch().countDown();
}
2.1 Poller.processKey()
processKey方法继续调用其中的processSocket方法,并开始处理会话.
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
try {
if ( close ) {
cancelledKey(sk);
} else if ( sk.isValid() && attachment != null ) {
if (sk.isReadable() || sk.isWritable() ) {
if ( attachment.getSendfileData() != null ) {
processSendfile(sk,attachment, false);
} else {
unreg(sk, attachment, sk.readyOps());
boolean closeSocket = false;
if (sk.isReadable()) {
if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
closeSocket = true;
}
}
if (!closeSocket && sk.isWritable()) {
if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
closeSocket = true;
}
}
if (closeSocket) {
cancelledKey(sk);
}
}
}
} else {
cancelledKey(sk);
}
} catch ( CancelledKeyException ckx ) {
cancelledKey(sk);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error("",t);
}
}
2.2 AbstractEndpoint.processSocket()
当跟踪进processSocket时会发现调用的是AbstractEndpoint的processSocket方法.
他会首先获取Socket的处理器,并获取线程池为处理Socket单独开启一个线程.
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
SocketEvent event, boolean dispatch) {
try {
if (socketWrapper == null) {
return false;
}
SocketProcessorBase<S> sc = processorCache.pop();
if (sc == null) {
sc = createSocketProcessor(socketWrapper, event);
} else {
sc.reset(socketWrapper, event);
}
Executor executor = getExecutor();
if (dispatch && executor != null) {
executor.execute(sc);
} else {
sc.run();
}
} catch (RejectedExecutionException ree) {
getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
return false;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
getLog().error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}
3. NioEndpoint.SocketProcessor.doRun()
经过跟踪会最终发现,sc.run()方法实际上是在调用NioEndpoint.SocketProcessor.doRun();
protected void doRun() {
NioChannel socket = socketWrapper.getSocket();
SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
....
if (handshake == 0) {
SocketState state = SocketState.OPEN;
if (event == null) {
state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
} else {
state = getHandler().process(socketWrapper, event);
}
if (state == SocketState.CLOSED) {
close(socket, key);
}
}
....
}
}
4. AbstractProtocol.ConnectionHandler.process
在doRun的方法中最后会交给AbstractProtocol.ConnectionHandler.process方法执行.
@Override
public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
if (getLog().isDebugEnabled()) {
getLog().debug(sm.getString("abstractConnectionHandler.process",
wrapper.getSocket(), status));
}
if (wrapper == null) {
return SocketState.CLOSED;
}
S socket = wrapper.getSocket();
Processor processor = connections.get(socket);
....
do {
state = processor.process(wrapper, status);
.....
}
} while ( state == SocketState.UPGRADING);
}
至此,请求由Endpoint组件正式转交给Processor进行处理.
主要内容为在请求转交给Coyote适配器后的流程分析,紧接上文中请求交由Processor处理.
5.AbstractProcessorLight.process()
当请求由Endpoint交由Processor处理时,首先经过的就是AbstractProcessorLight.process()
public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status)
throws IOException {
SocketState state = SocketState.CLOSED;
Iterator<DispatchType> dispatches = null;
do {
if (dispatches != null) {
DispatchType nextDispatch = dispatches.next();
if (getLog().isDebugEnabled()) {
getLog().debug("Processing dispatch type: [" + nextDispatch + "]");
}
state = dispatch(nextDispatch.getSocketStatus());
if (!dispatches.hasNext()) {
state = checkForPipelinedData(state, socketWrapper);
}
} else if (status == SocketEvent.DISCONNECT) {
} else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
state = dispatch(status);
state = checkForPipelinedData(state, socketWrapper);
} else if (status == SocketEvent.OPEN_WRITE) {
state = SocketState.LONG;
} else if (status == SocketEvent.OPEN_READ) {
state = service(socketWrapper);
} else if (status == SocketEvent.CONNECT_FAIL) {
logAccess(socketWrapper);
} else {
state = SocketState.CLOSED;
}
....
}
} while (state == SocketState.ASYNC_END ||
dispatches != null && state != SocketState.CLOSED);
return state;
}
6.Http11Processor的service方法
@Override
public SocketState service(SocketWrapperBase<?> socketWrapper)
....
while (!getErrorState().isError() && keepAlive && !isAsync() && upgradeToken == null &&
sendfileState == SendfileState.DONE && !endpoint.isPaused()) {
try {
if (!inputBuffer.parseRequestLine(keptAlive)) {
if (inputBuffer.getParsingRequestLinePhase() == -1) {
return SocketState.UPGRADING;
} else if (handleIncompleteRequestLineRead()) {
break;
}
}
if (endpoint.isPaused()) {
response.setStatus(503);
setErrorState(ErrorState.CLOSE_CLEAN, null);
} else {
keptAlive = true;
request.getMimeHeaders().setLimit(endpoint.getMaxHeaderCount());
if (!inputBuffer.parseHeaders()) {
openSocket = true;
readComplete = false;
break;
}
if (!disableUploadTimeout) {
socketWrapper.setReadTimeout(connectionUploadTimeout);
}
}
} catch (IOException e) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("http11processor.header.parse"), e);
}
setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e);
break;
} catch (Throwable t) {
....
}
response.setStatus(400);
setErrorState(ErrorState.CLOSE_CLEAN, t);
}
Enumeration<String> connectionValues = request.getMimeHeaders().values("Connection");
boolean foundUpgrade = false;
while (connectionValues.hasMoreElements() && !foundUpgrade) {
foundUpgrade = connectionValues.nextElement().toLowerCase(
Locale.ENGLISH).contains("upgrade");
}
if (foundUpgrade) {
String requestedProtocol = request.getHeader("Upgrade");
UpgradeProtocol upgradeProtocol = httpUpgradeProtocols.get(requestedProtocol);
if (upgradeProtocol != null) {
if (upgradeProtocol.accept(request)) {
response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
response.setHeader("Connection", "Upgrade");
response.setHeader("Upgrade", requestedProtocol);
action(ActionCode.CLOSE, null);
getAdapter().log(request, response, 0);
InternalHttpUpgradeHandler upgradeHandler =
upgradeProtocol.getInternalUpgradeHandler(
getAdapter(), cloneRequest(request));
UpgradeToken upgradeToken = new UpgradeToken(upgradeHandler, null, null);
action(ActionCode.UPGRADE, upgradeToken);
return SocketState.UPGRADING;
}
}
}
if (getErrorState().isIoAllowed()) {
rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
try {
prepareRequest();
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
if (log.isDebugEnabled()) {
log.debug(sm.getString("http11processor.request.prepare"), t);
}
response.setStatus(500);
setErrorState(ErrorState.CLOSE_CLEAN, t);
}
}
...
if (getErrorState().isIoAllowed()) {
try {
rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
getAdapter().service(request, response);
...
7.CoyoteAdapter.service()
至此请求就即将被发送给Engine引擎.
在CoyoteAdapter中将Request和Response装换成Servlet容器中处理的Request和Response,然后从service中获取容器,再调用管道Pipeline的阀门Valve的invoke方法
在Coyote适配器中,会获取Engine容器的第一个Valve,即StandardEngineValve,并执行.
@Override
public void service(org.apache.coyote.Request req, org.apache.coyote.Response res)
throws Exception {
Request request = (Request) req.getNote(ADAPTER_NOTES);
Response response = (Response) res.getNote(ADAPTER_NOTES);
.......
if (connector.getXpoweredBy()) {
response.addHeader("X-Powered-By", POWERED_BY);
}
boolean async = false;
boolean postParseSuccess = false;
req.getRequestProcessor().setWorkerThreadName(THREAD_NAME.get());
try {
postParseSuccess = postParseRequest(req, request, res, response);
if (postParseSuccess) {
request.setAsyncSupported(
connector.getService().getContainer().getPipeline().isAsyncSupported());
connector.getService().getContainer().getPipeline().getFirst().invoke(
request, response);
}
.......
整个容器中执行的链路如图:
8.StandardEngineValve#invoke
因为这里默认只有一个标准的Engine value实现:
@Override
public final void invoke(Request request, Response response)
throws IOException, ServletException {
Host host = request.getHost();
if (host == null) {
response.sendError
(HttpServletResponse.SC_BAD_REQUEST,
sm.getString("standardEngine.noHost",
request.getServerName()));
return;
}
if (request.isAsyncSupported()) {
request.setAsyncSupported(host.getPipeline().isAsyncSupported());
}
host.getPipeline().getFirst().invoke(request, response);
}
9.StandardHostValve#invoke
说明一下:在Host管道中,默认还会加入两个Value,一个是accessLogValue,一个是errorLogValue后者用来处理请求处理过程中发生的错误
接下来依次调用context,wrapper的valve并执行.
10.StandardContextValve#invoke
获取当前请求对应的wrapper
Wrapper wrapper = request.getWrapper();
调用wrapper管道中所有value
wrapper.getPipeline().getFirst().invoke(request, response);
10.StandardWrapperValve#invoke
重点代码一:分配一个servlet实例对象,来处理当前请求,这里的servlet其实就是我们自己定义的
try {
if (!unavailable) {
servlet = wrapper.allocate();
}
这里的allocate就是从mapped中获取到当前请求映射的servlet,联系初始化时,扫描web.xml和相关注解
ApplicationFilterChain filterChain =
ApplicationFilterFactory.createFilterChain(request, wrapper, servlet);
给当前请求创建一个过滤器链
public static ApplicationFilterChain createFilterChain(ServletRequest request,
Wrapper wrapper, Servlet servlet) {
if (servlet == null)
return null;
ApplicationFilterChain filterChain = null;
if (request instanceof Request) {
Request req = (Request) request;
if (Globals.IS_SECURITY_ENABLED) {
filterChain = new ApplicationFilterChain();
} else {
filterChain = (ApplicationFilterChain) req.getFilterChain();
if (filterChain == null) {
filterChain = new ApplicationFilterChain();
req.setFilterChain(filterChain);
}
}
} else {
filterChain = new ApplicationFilterChain();
}
filterChain.setServlet(servlet);
filterChain.setServletSupportsAsync(wrapper.isAsyncSupported());
StandardContext context = (StandardContext) wrapper.getParent();
FilterMap filterMaps[] = context.findFilterMaps();
if ((filterMaps == null) || (filterMaps.length == 0))
return (filterChain);
DispatcherType dispatcher =
(DispatcherType) request.getAttribute(Globals.DISPATCHER_TYPE_ATTR);
String requestPath = null;
Object attribute = request.getAttribute(Globals.DISPATCHER_REQUEST_PATH_ATTR);
if (attribute != null){
requestPath = attribute.toString();
}
String servletName = wrapper.getName();
for (int i = 0; i < filterMaps.length; i++) {
if (!matchDispatcher(filterMaps[i] ,dispatcher)) {
continue;
}
if (!matchFiltersURL(filterMaps[i], requestPath))
continue;
ApplicationFilterConfig filterConfig = (ApplicationFilterConfig)
context.findFilterConfig(filterMaps[i].getFilterName());
if (filterConfig == null) {
continue;
}
filterChain.addFilter(filterConfig);
}
for (int i = 0; i < filterMaps.length; i++) {
if (!matchDispatcher(filterMaps[i] ,dispatcher)) {
continue;
}
if (!matchFiltersServlet(filterMaps[i], servletName))
continue;
ApplicationFilterConfig filterConfig = (ApplicationFilterConfig)
context.findFilterConfig(filterMaps[i].getFilterName());
if (filterConfig == null) {
continue;
}
filterChain.addFilter(filterConfig);
}
return filterChain;
}
调用完创建过滤器的方法后,回到invoke方法中
正式对当前请求调用过滤器链
filterChain.doFilter
(request.getRequest(), response.getResponse());
11.ApplicationFilterChain#doFilter
重点代码:
internalDoFilter(request,response);
开始真正执行过滤流程
12.ApplicationFilterChain#internalDoFilter
private void internalDoFilter(ServletRequest request,
ServletResponse response)
throws IOException, ServletException {
if (pos < n) {
ApplicationFilterConfig filterConfig = filters[pos++];
try {
Filter filter = filterConfig.getFilter();
if (request.isAsyncSupported() && "false".equalsIgnoreCase(
filterConfig.getFilterDef().getAsyncSupported())) {
request.setAttribute(Globals.ASYNC_SUPPORTED_ATTR, Boolean.FALSE);
}
if( Globals.IS_SECURITY_ENABLED ) {
final ServletRequest req = request;
final ServletResponse res = response;
Principal principal =
((HttpServletRequest) req).getUserPrincipal();
Object[] args = new Object[]{req, res, this};
SecurityUtil.doAsPrivilege ("doFilter", filter, classType, args, principal);
} else {
filter.doFilter(request, response, this);
}
} catch (IOException | ServletException | RuntimeException e) {
throw e;
} catch (Throwable e) {
e = ExceptionUtils.unwrapInvocationTargetException(e);
ExceptionUtils.handleThrowable(e);
throw new ServletException(sm.getString("filterChain.filter"), e);
}
return;
}
try {
if (ApplicationDispatcher.WRAP_SAME_OBJECT) {
lastServicedRequest.set(request);
lastServicedResponse.set(response);
}
if (request.isAsyncSupported() && !servletSupportsAsync) {
request.setAttribute(Globals.ASYNC_SUPPORTED_ATTR,
Boolean.FALSE);
}
if ((request instanceof HttpServletRequest) &&
(response instanceof HttpServletResponse) &&
Globals.IS_SECURITY_ENABLED ) {
final ServletRequest req = request;
final ServletResponse res = response;
Principal principal =
((HttpServletRequest) req).getUserPrincipal();
Object[] args = new Object[]{req, res};
SecurityUtil.doAsPrivilege("service",
servlet,
classTypeUsedInService,
args,
principal);
} else {
servlet.service(request, response);
}
} catch (IOException | ServletException | RuntimeException e) {
throw e;
} catch (Throwable e) {
e = ExceptionUtils.unwrapInvocationTargetException(e);
ExceptionUtils.handleThrowable(e);
throw new ServletException(sm.getString("filterChain.servlet"), e);
} finally {
if (ApplicationDispatcher.WRAP_SAME_OBJECT) {
lastServicedRequest.set(null);
lastServicedResponse.set(null);
}
}
}
这是过滤器的原理
下面看sevlet的调用,下面演示的是正常流程:
13.HttpServlet#service
protected void service(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
String method = req.getMethod();
if (method.equals(METHOD_GET)) {
long lastModified = getLastModified(req);
if (lastModified == -1) {
doGet(req, resp);
} else {
long ifModifiedSince;
try {
ifModifiedSince = req.getDateHeader(HEADER_IFMODSINCE);
} catch (IllegalArgumentException iae) {
ifModifiedSince = -1;
}
if (ifModifiedSince < (lastModified / 1000 * 1000)) {
maybeSetLastModified(resp, lastModified);
doGet(req, resp);
} else {
resp.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
}
}
} else if (method.equals(METHOD_HEAD)) {
long lastModified = getLastModified(req);
maybeSetLastModified(resp, lastModified);
doHead(req, resp);
} else if (method.equals(METHOD_POST)) {
doPost(req, resp);
} else if (method.equals(METHOD_PUT)) {
doPut(req, resp);
} else if (method.equals(METHOD_DELETE)) {
doDelete(req, resp);
} else if (method.equals(METHOD_OPTIONS)) {
doOptions(req,resp);
} else if (method.equals(METHOD_TRACE)) {
doTrace(req,resp);
} else {
String errMsg = lStrings.getString("http.method_not_implemented");
Object[] errArgs = new Object[1];
errArgs[0] = method;
errMsg = MessageFormat.format(errMsg, errArgs);
resp.sendError(HttpServletResponse.SC_NOT_IMPLEMENTED, errMsg);
}
}
这里的doGet和doPost等方法的调用就是在我们自己的servlet中调用了,因为我们自己的servlet没有重写service方法,所以这里service是调用父类的,而重写的doGet等方法,是调用对应子类的实现
13.DefaultServlet#service
如果没有找到对应的请求映射servlet,会返回DefaultServlet
@Override
protected void service(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
if (req.getDispatcherType() == DispatcherType.ERROR) {
doGet(req, resp);
} else {
super.service(req, resp);
}
}
最终走的是HttpSerlvet的最后一个else分支,因为DefaultServlet是HttpServlet的子类
else {
String errMsg = lStrings.getString("http.method_not_implemented");
Object[] errArgs = new Object[1];
errArgs[0] = method;
errMsg = MessageFormat.format(errMsg, errArgs);
resp.sendError(HttpServletResponse.SC_NOT_IMPLEMENTED, errMsg);
}
到此请求流程代码分析完毕
|