1.接受请求
@RequestMapping("/book")
public void getBook(
HttpServletRequest request;
@RequestParam(value="skuId") final Long skuId;
@RequestParam(value="cat1") final Integer cat1;
@RequestParam(value="cat2") final Integer cat2
) throws Exception
{
oneLevelAsyncContext.submitFuture(request,()->bookService.getBook(skuId,cat1,cat2));
}
2.业务线程池封装
public void submitFuture{
final HttpServletRequest req,final Callable<Object> task){
final String uri = req.getRquestURI();
final Map<String,String[]> params = req.getParameterMap();
final AsyncContext asyncContext = req.startAsync();
asyncContext.getRequest().setAttribute("uri",uri);
asyncContext.getRequest().setAttribute("params",params);
asyncContext.setTimeout(asyncTimeoutInSeconds*1000);
if(asyncListener != null){
asyncContext.addListener(asyncListener);
}
executor.submit(new CanceledCallable(asyncContext){
@Override
public Object call() throws Exception{
Object.o = task.call();
if(o == null){
callback(asyncContext,o,uri,params);
}
if(o instanceof CompletableFuture){
CompletableFuture<Object> future = (CompletableFuture<Object>)o;
future.thenAccept(resultObject->
.callback(asyncContext,resultObject,uri,params)
.exceptionally(e->
callback(asynccontext,"",uri,params))
return null;);
})else if(o instanceof String){
callback(asyncContext,o,uri,params);
}
return null;
}
});
}
}
private void callback(AsyncContext asyncContext,Object result,String uri,Map<String,String[]> params){
HttpServletResponse resp = (HttpServletResponse) asyncContext.getResponse();
try{
if(result instanceof String){
write(resp,(String)String);
}else{
write(resp,JSONUtils.toJSON(result));
}
}catch(Throwable e){
resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
try{
LOG.error("get info error,uri:(), params: {}");
}catch(Exception ex){}
}finally{
asyncContext.complete();
}
}
3.线程池初始化
public void afterPropertiesSer() throws Exception{
String[] poolSize = poolSize.aplit("_");
int corePoolSize = Integer.valueOf(poolSize[0]);
int maximumPoolSize = Integer.valueOf(poolSize[1]);
queue = new LinkedBlockingDeque<Runnable>(queueCapacity);
executor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTimeinSeconds,TimeUnit.SECONDS,queue);
executor.allowCoreThreadTimeout(true);
executor.serRejectedExeturionHandler(
new RejectedExecutionHandler(){
@Override
public void rejectedExecution(Runnable r,ThreadPoolExecutor executor){
if(r instanceof CanceledCallable){
CanceledCallable cc = ((CanceledCallable)r)
AsyncContext asyncContext = cc.asyncContext;
if(asyncContext != null){
try{
ServletRequest req = asyncContext.getRequest();
String uri = (String)req.getAttribute("uri");
Map params = (Map)req.getAttribute("params");
LOG.error("async request rejected,uri:{},params{}",uri,JSONUtils.toJSON(params));
}catch(Exception e){}
try{
HttpServletResponse resp = (HttpServletResponse) asynContext.getResponse();
resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
}finally{
asyncContext.complete();
})
}
}
}
}
);
if(asyncListener == null){
asyncListener = new AsyncListener(){
@Override
public void onComplete(AsyncEvent event) throws IOException{};
@Override
public void onTimeout(AsyncEvent event) throws IOException{
AsyncContext asyncContext = event.getAsyncContext();
try{
ServletRequest req = asyncContext.getRequest();
String uri = (String) req.getAttribute("uri");
Map params = (Map)req.getAttribute("params");
LOG.error("async request timeout,uri:{}",uri,JSONUtils.toJSON(params));
}catch(){}
try{
HttpServletResponse resp = (HttpServletResponse) asyncContext.getResponse();
resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
}finally{
asyncContext.complete();
}
}
@Override
public void onError(AsyncEvent event) throws IOException{
}
@Override
public void onStartAsync(AsyncEvent event) throws IOException{}
}
}
}
4.tomcat 的server.xml配置
<connector port="1601" asyncTimeout="100000"
acceptCount="10240" maxConnections="10240" acceptorThreadCount="1" minSpareThreads="1"
maxThreads="1" redirectPort="8443"
processorCache="1024" URIEncoding="UTF-8"
protocol="org.apache.coyote.http11.HttpNioProtocol"
enableLookups="false"/>
|