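Troubleshooting an Apache HttpClient Connection Pool Leak
Background
Where the problem came from
- In production, the full sync job (more than 20 tasks) hung after a little over half of its tasks had run. The test environment never showed the problem.
Sync interface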
public interface SyncHelper {
Order syncOrder();
void syncAllAccount();
void syncSingleAccount(Long accountId);
default boolean enableSync() {
return true;
}
}
Full sync implementation
@Slf4j
@Component
public class SyncAccountResourceListener {
@Autowired
private final List<SyncHelper> helpers;
private static final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
.setDaemon(false)
.setNameFormat("h3c-sync-resource-%d")
.build()
);
public void sync(){
for (SyncHelper helper : helpers) {
if (Thread.currentThread().isInterrupted()) {
log.error("[{}] sync task interrupted,account:[{}]", className, accountId);
continue;
}
Future<?> future = EXECUTOR.submit(() -> helper.syncSingleAccount(accountId));
try {
future.get(helper.getTimeOut(), TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException|TimeoutException e) {
log.error("[{}] sync error,account:[{}]", className, accountId, e);
} finally {
future.cancel(true);
}
}
}
}
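Troubleshooting steps
I wanted to fix this as quickly as possible. The sync progress list in the system showed every run stuck in the same sync class, so I skimmed that class's code, but found nothing that could cause an infinite loop.
Trying to reproduce
- Ran the full sync in the test environment and found no problem (checked the request/response data logs and the printed SQL logs); every sync task completed.
- Wrote a unit test for the stuck sync class and ran it repeatedly; still no problem.
This made the issue even more puzzling: it could not be reproduced in the test environment or in local unit tests, so why did it happen in production?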
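Deadlock
At first I did not look for a deadlock, because most of the sync tasks do not use multiple threads.
Possible causes
- The full sync runs its tasks on a single-thread pool. If a task hits its timeout and does not handle interruption properly, every task queued behind it stays blocked.
- A colleague's sync code uses multiple threads in a questionable way, similar to the following: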
List<SysDept> deptList = ......
List<CompletableFuture<CmdbUsageReport>> futureList = new ArrayList<>();
deptList.forEach(t -> futureList.add(
CompletableFuture.supplyAsync(() -> {
return report;
}, ioPool)));
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[]{})).join();
List<CmdbUsageReport> reportList = futureList.stream().map(CompletableFuture::join).filter(Objects::nonNull).collect(toList());
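The first hypothesis above can be illustrated with a small JDK-only sketch. It is not the project's code: the class name, thread name, and timings are made up for the demo. It shows that `Future.cancel(true)` only delivers an interrupt, so a task that swallows the interrupt keeps occupying the single worker thread and everything queued behind it waits.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SingleThreadTimeoutDemo {

    /** Returns true if a second task can run after the first one timed out. */
    static boolean secondTaskCompletes(boolean firstTaskHonorsInterrupt) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "demo-sync-0"); // hypothetical name
            t.setDaemon(true); // demo only: never keep the JVM alive
            return t;
        });
        CountDownLatch neverReleased = new CountDownLatch(1);
        try {
            Future<?> first = executor.submit(() -> {
                while (neverReleased.getCount() > 0) {
                    try {
                        neverReleased.await();
                    } catch (InterruptedException e) {
                        if (firstTaskHonorsInterrupt) {
                            return; // well-behaved task: stop on interrupt
                        }
                        // badly-behaved task: swallow the interrupt, keep waiting
                    }
                }
            });
            try {
                first.get(200, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                first.cancel(true); // only delivers an interrupt to the worker
            }
            Future<String> second = executor.submit(() -> "done");
            try {
                second.get(500, TimeUnit.MILLISECONDS);
                return true;
            } catch (TimeoutException e) {
                return false; // still queued behind the stuck first task
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            neverReleased.countDown(); // unblock the stuck task so the demo ends
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("honors interrupt:  " + secondTaskCompletes(true));
        System.out.println("ignores interrupt: " + secondTaskCompletes(false));
    }
}
```

Whether a real sync task behaves like the second case depends on whether the blocking call it sits in responds to interruption, which is why the thread dump in the next step matters.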
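Since this was now a threading investigation, the direct route was to use an analysis tool and see what was actually going on.
Analyzing thread state
Use Alibaba Arthas or VisualVM to inspect the state of the sync task's thread.
Start Arthas and attach to the target process: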
java -jar arthas-boot.jar
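thread --all lists summary information for all threads
The sync tasks run on a single-thread pool whose worker thread has a custom name, `h3c-sync-resource-0` (thread ID 250, thread state `WAITING`).
thread 250 shows the details of the sync thread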
"h3c-sync-resource-0" - Thread t@195
java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <66bb3d00> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at org.apache.http.pool.AbstractConnPool.getPoolEntryBlocking(AbstractConnPool.java:393)
at org.apache.http.pool.AbstractConnPool.access$300(AbstractConnPool.java:70)
at org.apache.http.pool.AbstractConnPool$2.get(AbstractConnPool.java:253)
- locked <637b83f5> (a org.apache.http.pool.AbstractConnPool$2)
at org.apache.http.pool.AbstractConnPool$2.get(AbstractConnPool.java:198)
at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:306)
at org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:282)
at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:190)
at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89)
at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
at feign.httpclient.ApacheHttpClient.execute(ApacheHttpClient.java:83)
at feign.SynchronousMethodHandler.executeAndDecode(SynchronousMethodHandler.java:119)
at feign.SynchronousMethodHandler.invoke(SynchronousMethodHandler.java:89)
at feign.ReflectiveFeign$FeignInvocationHandler.invoke(ReflectiveFeign.java:100)
at com.sun.proxy.$Proxy360.osAggregates(Unknown Source)
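The stack trace shows the key point: the thread is stuck inside a Feign request. At first glance it looked like a deadlock inside Apache HttpClient.
AbstractConnPool.java:393 is in org.apache.http.pool.AbstractConnPool.getPoolEntryBlocking
Reading that code shows the thread is blocked waiting for an idle connection from the HttpClient connection pool.
Tracing the source
Start with the simple demo on the HttpClient website: httpcomponents-client-quickstart
It shows a simple usage example: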
try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
HttpGet httpGet = new HttpGet("http://httpbin.org/get");
try (CloseableHttpResponse response1 = httpclient.execute(httpGet)) {
System.out.println(response1.getCode() + " " + response1.getReasonPhrase());
HttpEntity entity1 = response1.getEntity();
EntityUtils.consume(entity1);
}
}
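- The official example uses Java 7 try-with-resources, so resources are released automatically after use.
- The official documentation also explains that, for the connection to be safely returned and reused, the response content must be consumed and the stream closed with EntityUtils.consume.
- So the blocked thread is probably caused by a response whose content was never fully consumed.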
Tracing the Feign source
Now look at roughly how a Feign request is executed through Apache HttpClient. The method that converts the HttpClient response into a Feign Response body is feign.httpclient.ApacheHttpClient.toFeignBody:
Response.Body toFeignBody(HttpResponse httpResponse) {
final HttpEntity entity = httpResponse.getEntity();
if (entity == null) {
return null;
}
return new Response.Body() {
@Override
public Integer length() {
return entity.getContentLength() >= 0 && entity.getContentLength() <= Integer.MAX_VALUE
? (int) entity.getContentLength()
: null;
}
@Override
public boolean isRepeatable() {
return entity.isRepeatable();
}
@Override
public InputStream asInputStream() throws IOException {
return entity.getContent();
}
@SuppressWarnings("deprecation")
@Override
public Reader asReader() throws IOException {
return new InputStreamReader(asInputStream(), UTF_8);
}
@Override
public Reader asReader(Charset charset) throws IOException {
Util.checkNotNull(charset, "charset should not be null");
return new InputStreamReader(asInputStream(), charset);
}
@Override
public void close() throws IOException {
EntityUtils.consume(entity);
}
};
}
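Because the response content must be fully consumed before the connection can return to the pool for reuse, here is roughly what org.apache.http.util.EntityUtils.consume does: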
public static void consume(final HttpEntity entity) throws IOException {
if (entity == null) {
return;
}
if (entity.isStreaming()) {
final InputStream inStream = entity.getContent();
if (inStream != null) {
inStream.close();
}
}
}
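EntityUtils.consume consumes the response content and makes the connection safe to reuse through the call chain below. Read the source if you are interested; it is not discussed at length here:
org.apache.http.util.EntityUtils.consume
org.apache.http.impl.execchain.ResponseEntityProxy.getContent (wraps the stream in an EofSensorInputStream that releases the connection automatically)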
org.apache.http.conn.EofSensorInputStream.close
org.apache.http.conn.EofSensorInputStream.checkClose
org.apache.http.impl.execchain.ResponseEntityProxy.streamClosed
org.apache.http.impl.execchain.ResponseEntityProxy.releaseConnection
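The idea behind this call chain can be sketched with JDK classes only. The following is a toy model, not HttpClient's implementation: `ToyPool` and `ReleasingStream` are hypothetical names, and a `Semaphore` permit stands in for a pooled connection. The point it illustrates is that the connection goes back to the pool as a side effect of closing the body stream, so a body that is never closed keeps its connection leased.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

class ReleaseOnCloseDemo {

    /** Toy pool: one permit stands for one pooled connection. */
    static final class ToyPool {
        private final Semaphore permits;

        ToyPool(int size) {
            this.permits = new Semaphore(size);
        }

        /** Leases a "connection" and hands back its response body stream. */
        InputStream lease(byte[] body) {
            if (!permits.tryAcquire()) {
                throw new IllegalStateException("pool exhausted");
            }
            return new ReleasingStream(new ByteArrayInputStream(body), permits::release);
        }

        int idle() {
            return permits.availablePermits();
        }
    }

    /** Stream wrapper that returns the connection to the pool when closed. */
    static final class ReleasingStream extends FilterInputStream {
        private final Runnable release;
        private final AtomicBoolean released = new AtomicBoolean();

        ReleasingStream(InputStream in, Runnable release) {
            super(in);
            this.release = release;
        }

        @Override
        public void close() throws IOException {
            try {
                super.close();
            } finally {
                // release exactly once, even if close() is called repeatedly
                if (released.compareAndSet(false, true)) {
                    release.run();
                }
            }
        }
    }

    /** Leases one connection from a pool of size 1 and reports idle permits. */
    static int idleAfter(boolean closeStream) {
        ToyPool pool = new ToyPool(1);
        InputStream body = pool.lease("{}".getBytes(StandardCharsets.UTF_8));
        if (closeStream) {
            try {
                body.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return pool.idle();
    }

    public static void main(String[] args) {
        System.out.println("idle after close:    " + idleAfter(true));
        System.out.println("idle without close:  " + idleAfter(false));
    }
}
```

The toy wrapper reacts only to `close()`; the real stream in the chain above has more release paths than this sketch models.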
feign.AsyncResponseHandler#handleResponse
void handleResponse(CompletableFuture<Object> resultFuture,
String configKey,
Response response,
Type returnType,
long elapsedTime) {
boolean shouldClose = true;
try {
if (logLevel != Level.NONE) {
response = logger.logAndRebufferResponse(configKey, logLevel, response,
elapsedTime);
}
if (Response.class == returnType) {
if (response.body() == null) {
resultFuture.complete(response);
} else if (response.body().length() == null
|| response.body().length() > MAX_RESPONSE_BUFFER_SIZE) {
shouldClose = false;
resultFuture.complete(response);
} else {
final byte[] bodyData = Util.toByteArray(response.body().asInputStream());
resultFuture.complete(response.toBuilder().body(bodyData).build());
}
} else if (response.status() >= 200 && response.status() < 300) {
if (isVoidType(returnType)) {
resultFuture.complete(null);
} else {
final Object result = decode(response, returnType);
shouldClose = closeAfterDecode;
resultFuture.complete(result);
}
} else if (decode404 && response.status() == 404 && !isVoidType(returnType)) {
final Object result = decode(response, returnType);
shouldClose = closeAfterDecode;
resultFuture.complete(result);
} else {
resultFuture.completeExceptionally(errorDecoder.decode(configKey, response));
}
} catch (final IOException e) {
if (logLevel != Level.NONE) {
logger.logIOException(configKey, logLevel, e, elapsedTime);
}
resultFuture.completeExceptionally(errorReading(response.request(), response, e));
} catch (final Exception e) {
resultFuture.completeExceptionally(e);
} finally {
if (shouldClose) {
ensureClosed(response.body());
}
}
}
feign.slf4j.Slf4jLogger.logAndRebufferResponse
protected Response logAndRebufferResponse(String configKey,
Level logLevel,
Response response,
long elapsedTime)
throws IOException {
if (logger.isDebugEnabled()) {
return super.logAndRebufferResponse(configKey, logLevel, response, elapsedTime);
}
return response;
}
feign.Logger.logAndRebufferResponse
protected Response logAndRebufferResponse(String configKey,
Level logLevel,
Response response,
long elapsedTime)
throws IOException {
String reason =
response.reason() != null && logLevel.compareTo(Level.NONE) > 0 ? " " + response.reason()
: "";
int status = response.status();
log(configKey, "<--- HTTP/1.1 %s%s (%sms)", status, reason, elapsedTime);
if (logLevel.ordinal() >= Level.HEADERS.ordinal()) {
for (String field : response.headers().keySet()) {
for (String value : valuesOrEmpty(response.headers(), field)) {
log(configKey, "%s: %s", field, value);
}
}
int bodyLength = 0;
if (response.body() != null && !(status == 204 || status == 205)) {
if (logLevel.ordinal() >= Level.FULL.ordinal()) {
log(configKey, "");
}
byte[] bodyData = Util.toByteArray(response.body().asInputStream());
bodyLength = bodyData.length;
if (logLevel.ordinal() >= Level.FULL.ordinal() && bodyLength > 0) {
log(configKey, "%s", decodeOrDefault(bodyData, UTF_8, "Binary data"));
}
log(configKey, "<--- END HTTP (%s-byte body)", bodyLength);
return response.toBuilder().body(bodyData).build();
} else {
log(configKey, "<--- END HTTP (%s-byte body)", bodyLength);
}
}
return response;
}
feign.Util.toByteArray
public static byte[] toByteArray(InputStream in) throws IOException {
checkNotNull(in, "in");
try {
ByteArrayOutputStream out = new ByteArrayOutputStream();
copy(in, out);
return out.toByteArray();
} finally {
ensureClosed(in);
}
}
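Tracing Feign's Slf4jLogger to `feign/Logger.java:84` shows that `feign.Logger#logAndRebufferResponse` reads the `body` content once.
When the level is `HEADERS` or above (`logLevel.ordinal() >= Level.HEADERS.ordinal()`), the later line `byte[] bodyData = Util.toByteArray(response.body().asInputStream());` copies the data and then closes the original `InputStream`.
Thoughts after reading the source
Up to this point, the source analysis suggests that in the normal case the response content is released automatically as well.
Verifying the problem
In production the Feign log level is FULL (above HEADERS), but the Logger level is INFO. I tried to reproduce the problem with the following steps:
- Build an ApacheHttpClient with the connection pool size turned down to the minimum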
CLIENT = new ApacheHttpClient(
HttpClientBuilder.create()
.setSSLSocketFactory(
new SSLConnectionSocketFactory(trustAllSslSocketFactory(),
(hostname, session) -> true)
)
.setMaxConnPerRoute(1)
.setMaxConnTotal(1)
.setDefaultRequestConfig(
RequestConfig.custom()
.setConnectTimeout(CONNECT_TIMEOUT)
.setSocketTimeout(REQ_TIMEOUT)
.build()
)
.useSystemProperties()
.build()
);
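- Local sync unit test with Logger level INFO && Feign log level >= HEADERS: the sync task hangs (appears deadlocked) after a while
- Local sync unit test with Logger level DEBUG && Feign log level < HEADERS: the sync task hangs (appears deadlocked) after a while
- Local sync unit test with Logger level DEBUG && Feign log level >= HEADERS (the configuration used in the test and development environments): no hang observed
After the unit tests and the source analysis, the cause is clear: responses were not fully consumed and released, so pooled connections could not be safely reused.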
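The three test outcomes line up with the source shown earlier and can be condensed into a small predicate. This is a simplified model written for this article, not Feign code: it assumes the response has a body, the status is not 204/205, and `closeAfterDecode` keeps its default. The `Level` enum mirrors the ordering of `feign.Logger.Level`, and the 8192 buffer limit is an assumption about `MAX_RESPONSE_BUFFER_SIZE` that should be checked against the Feign version in use.

```java
class FeignCloseResponsibility {

    /** Mirrors the ordering of feign.Logger.Level. */
    enum Level { NONE, BASIC, HEADERS, FULL }

    /** Assumed value of Feign's MAX_RESPONSE_BUFFER_SIZE; verify for your version. */
    static final long ASSUMED_MAX_RESPONSE_BUFFER_SIZE = 8192L;

    /**
     * Returns true when Feign leaves the body open and the caller must close it.
     *
     * @param returnsResponse   the Feign method's return type is feign.Response
     * @param contentLength     body length, or null when unknown (e.g. chunked)
     * @param feignLevel        configured Feign log level
     * @param slf4jDebugEnabled whether the SLF4J logger has DEBUG enabled
     */
    static boolean callerMustClose(boolean returnsResponse,
                                   Long contentLength,
                                   Level feignLevel,
                                   boolean slf4jDebugEnabled) {
        if (!returnsResponse) {
            return false; // decoded return types are closed after decoding
        }
        // Slf4jLogger only delegates to Logger when DEBUG is enabled, and Logger
        // only re-buffers (reads and closes) the body at HEADERS or above.
        boolean rebuffered = slf4jDebugEnabled
                && feignLevel.ordinal() >= Level.HEADERS.ordinal();
        if (rebuffered) {
            return false; // body is now an in-memory byte array of known length
        }
        return contentLength == null
                || contentLength > ASSUMED_MAX_RESPONSE_BUFFER_SIZE;
    }

    public static void main(String[] args) {
        // Production: Logger INFO, Feign FULL, chunked response
        System.out.println(callerMustClose(true, null, Level.FULL, false));
        // Logger DEBUG, Feign below HEADERS
        System.out.println(callerMustClose(true, null, Level.BASIC, true));
        // Test/dev: Logger DEBUG, Feign FULL
        System.out.println(callerMustClose(true, null, Level.FULL, true));
    }
}
```

Under these assumptions, only the test/dev combination hides the leak, because the logger happens to read and close the stream on the caller's behalf.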
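Tracing the root cause
Looking back at feign.AsyncResponseHandler#handleResponse above, closeAfterDecode defaults to true, so the response content is left unreleased only in this case: Response.class == returnType && (response.body().length() == null || response.body().length() > MAX_RESPONSE_BUFFER_SIZE)
So I searched the project code for Feign methods that return Response and whose response body length is unknown (null).
I recalled that calls to this API asynchronously verify and refresh the token. Verifying the token does not need the return value, so I had used Response and only checked the status code. The key parts of the code:
Token verification Feign interface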
@RequestLine("GET /sys/identity/v2/tokens")
@Headers({"X-Subject-Token: {token}"})
Response verifyToken(@Param("token") String token);
Token verification code
@Override
public boolean verifyToken(H3CClientConfig config, String token) {
IdentityApi identityApi = Feigns.h3c(IdentityApi.class, config);
Response response = identityApi.verifyToken(token);
return response.status() == HttpStatus.OK.value();
}
Request/response log
GET https://19.50.81.200:8100/sys/identity/v2/tokens
HTTP/1.1 200 OK
Date: Sun, 05 Sep 2021 02:55:09 GMT
Content-Type: application/json; charset=UTF-8
Transfer-Encoding: chunked
Connection: keep-alive
X-Subject-Token: eyJjdHkiOiJKV1QiLCJlbmMiOiJBMTI4Q0JDLUhTMjU2IiwiYWxnIjoiZGlyIn0..yWaex8RVEvBvw1D-kk_LMQ.IwiMpoRWSPsUZiEpr09tL7WDZ1-vIRZqFsGqQq2CV4wBp6S8mBIhICI3Ce2sE_TLA_A2oX_NnMpAf5D4C_DwunJaiJ3lnD51Sg1bxWao_gXnPS7JdfpyaRXY-rtPMaxs-0FisUuyVlKfQh3Ab8t3WsCLzU9Yz7sQ367CKtW1z32ttafrWRlotLN0y7XX3ZRz7Ttznm2cZ5Ae79MEPQF1-hbKiGoz4B8kR1NRgeL-arlpa8qtgERYEEtr-VtJgydDpylusItc_uOtPqwEh0HAgYQjJovF75pej5WlCgdzYVQMr08OGT0JnBrReWYxl0h2P0xxZQtNcM2d0T54TebvvRhQKRyywvasQ064FS4B4mGN-8E3TZkxSfSfr4OWZ1Nmwpr3xFGBOSVpKf5-AufCoXPW3yGu3vFSpCahoKq01n9_gd4UbKLE82Cwou4uZf4VMZ7A7hOAdWYo_geb1bTzLUyTdDSUgbS8XiiYCOpaA4euv409ELE22U77F940M2DO2y8lbaDuk4iAv3QIp5gCGg.9pzTvRPM-FAMa-17a2J5kQ
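The code above never handles the Response; it only checks the status code. The request/response log shows something different about this response, though: it has no Content-Length field and carries the header Transfer-Encoding: chunked.
About HTTP chunked encoding: a chunked response has the header Transfer-Encoding: chunked, its length is unknown in advance, and it has no Content-Length field. The last chunk must have length 0, so the message body can be thought of as ending with 0\r\n\r\n.
This exactly matches the leak condition Response.class == returnType && (response.body().length() == null || response.body().length() > MAX_RESPONSE_BUFFER_SIZE): if the Response is not released in time, the pooled connection cannot be safely reused.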
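To make the chunked framing concrete, here is a minimal decoder sketch. It is written for illustration only and is not what HttpClient uses internally: it handles ASCII input, ignores chunk extensions and trailers, and the class and method names are made up. It shows why the reader cannot know the total length up front and only learns the body is complete when it reaches the zero-length chunk.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

class ChunkedBodyDemo {

    /** Decodes an ASCII chunked body such as "4\r\nWiki\r\n0\r\n\r\n". */
    static String decode(String raw) {
        byte[] bytes = raw.getBytes(StandardCharsets.ISO_8859_1);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int pos = 0;
        while (true) {
            int lineEnd = indexOfCrlf(bytes, pos);
            if (lineEnd < 0) {
                throw new IllegalArgumentException("missing chunk-size line");
            }
            String sizeLine = new String(bytes, pos, lineEnd - pos,
                    StandardCharsets.ISO_8859_1);
            int semicolon = sizeLine.indexOf(';'); // drop chunk extensions
            if (semicolon >= 0) {
                sizeLine = sizeLine.substring(0, semicolon);
            }
            int size = Integer.parseInt(sizeLine.trim(), 16); // size is hexadecimal
            pos = lineEnd + 2; // skip the CRLF after the size line
            if (size == 0) {
                break; // zero-length chunk marks the end of the body
            }
            if (pos + size + 2 > bytes.length) {
                throw new IllegalArgumentException("truncated chunk");
            }
            out.write(bytes, pos, size);
            pos += size + 2; // skip chunk data and its trailing CRLF
        }
        return new String(out.toByteArray(), StandardCharsets.ISO_8859_1);
    }

    private static int indexOfCrlf(byte[] bytes, int from) {
        for (int i = from; i + 1 < bytes.length; i++) {
            if (bytes[i] == '\r' && bytes[i + 1] == '\n') {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(decode("4\r\nWiki\r\n5\r\npedia\r\n0\r\n\r\n"));
    }
}
```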
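Summary
The root cause turned out to be a leaked InputStream that kept connections from being returned to the pool. The Feign method returned Response, and the call was not wrapped in try-with-resources, so close was never called.
Keys to troubleshooting this:
- Enough debugging and analysis skill to read the Feign source
- Familiarity with common application performance analysis methods
- Knowing how the production and test environments differ; keep the test configuration as close to production as possible so that problems surface early
Solution
When a Feign method returns the Response type, release it promptly with Java 7 try-with-resources or try-finally.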
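The difference the fix makes at the call site can be shown with a JDK-only sketch. `ToyClient` and `ToyResponse` are hypothetical stand-ins, not Feign types, and the pool of size 1 matches the minimal pool used in the reproduction. The leaky variant checks the status and drops the response; the fixed variant does the same check inside try-with-resources.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class TryWithResourcesDemo {

    /** Stand-in for a response that holds a pooled connection until closed. */
    static final class ToyResponse implements AutoCloseable {
        private final Semaphore pool;
        private final int status;
        private boolean closed;

        ToyResponse(Semaphore pool, int status) {
            this.pool = pool;
            this.status = status;
        }

        int status() {
            return status;
        }

        @Override
        public void close() {
            if (!closed) {
                closed = true;
                pool.release(); // connection goes back to the pool
            }
        }
    }

    /** Stand-in for a client with a connection pool of size 1. */
    static final class ToyClient {
        private final Semaphore pool = new Semaphore(1);

        ToyResponse call() {
            try {
                if (!pool.tryAcquire(100, TimeUnit.MILLISECONDS)) {
                    throw new IllegalStateException("timed out waiting for a connection");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return new ToyResponse(pool, 200);
        }
    }

    /** Leaky: only looks at the status and never closes the response. */
    static boolean verifyLeaky(ToyClient client) {
        ToyResponse response = client.call();
        return response.status() == 200;
    }

    /** Fixed: same check, but the response is always closed. */
    static boolean verifyFixed(ToyClient client) {
        try (ToyResponse response = client.call()) {
            return response.status() == 200;
        }
    }

    /** Counts how many consecutive calls succeed before the pool runs dry. */
    static int successfulCalls(boolean fixed, int attempts) {
        ToyClient client = new ToyClient();
        int ok = 0;
        for (int i = 0; i < attempts; i++) {
            try {
                if (fixed ? verifyFixed(client) : verifyLeaky(client)) {
                    ok++;
                }
            } catch (IllegalStateException e) {
                break; // pool exhausted
            }
        }
        return ok;
    }

    public static void main(String[] args) {
        System.out.println("leaky: " + successfulCalls(false, 5));
        System.out.println("fixed: " + successfulCalls(true, 5));
    }
}
```

Applied to the article's verifyToken method, the same pattern means wrapping the `identityApi.verifyToken(token)` call in try-with-resources and returning the status comparison from inside the block.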