ElasticsearchSink SocketTimeoutException: Troubleshooting and Resolution
1. Problem Record
Without further ado, here is the exception:
2021-08-05 20:06:12 ERROR:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:734)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
at com.ddmc.streaming.filter.DataSyncBurringPointV2FlatMap.flatMap(DataSyncBurringPointV2FlatMap.java:180)
at com.ddmc.streaming.filter.DataSyncBurringPointV2FlatMap.flatMap(DataSyncBurringPointV2FlatMap.java:47)
at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
2. First Investigation
The exception "Could not forward element to next operator" is familiar to anyone experienced with Flink development: it is typically thrown when an element in the stream is null. Reviewing the code, however, none of the stream records (POJO/JSONObject) could actually be null, and the element reported in the exception parsed correctly and could be forwarded downstream without issue. Stranger still, the job recovered on its own after restarting from a checkpoint. As a first attempt at a fix, all POJOs and JSONObjects were converted to String before being passed between operators, and the log output was improved so the problem could be pinpointed if it recurred (this step turned out to be crucial).
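The logging improvement can be sketched as a small defensive wrapper: forward the record as usual, but when forwarding fails, log the full element rather than only the exception message, so the offending record can be identified on the next occurrence. This is a minimal, hypothetical sketch using a plain `Consumer` in place of Flink's `Collector`; the name `safeEmit` is illustrative, not from the original job.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class SafeEmitSketch {
    // Stand-in for a real logger: collects error lines for inspection.
    static List<String> errorLog = new ArrayList<>();

    // Hypothetical helper: forward the element, but if forwarding throws,
    // record the element itself -- this is what made the second
    // occurrence of the problem diagnosable.
    static void safeEmit(String element, Consumer<String> downstream) {
        try {
            downstream.accept(element);
        } catch (RuntimeException e) {
            errorLog.add("forward failed for element=" + element + ", cause=" + e);
        }
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        safeEmit("{\"eventId\":1}", out::add);                              // forwarded normally
        safeEmit("bad", s -> { throw new RuntimeException("sink down"); }); // captured in errorLog
        System.out.println(out.size() + " forwarded, " + errorLog.size() + " logged");
    }
}
```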
3. Reproducing the Problem
The job ran stably for 10 days, and just when I thought the problem was solved, it appeared yet again. Fortunately, thanks to the improved logging, the underlying exception could now be located:
2021-08-20 07:06:48 ERROR:PascalAlarm:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:734)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
at com.ddmc.streaming.filter.DataSyncBurringPointV2FlatMap.flatMap(DataSyncBurringPointV2FlatMap.java:180)
at com.ddmc.streaming.filter.DataSyncBurringPointV2FlatMap.flatMap(DataSyncBurringPointV2FlatMap.java:47)
at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
Caused by: java.lang.RuntimeException: An error occurred in ElasticsearchSink.
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:383)
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:388)
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:309)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
... 22 common frames omitted
Caused by: java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-3 [ACTIVE]
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387)
at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:261)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:502)
at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:211)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
at java.lang.Thread.run(Thread.java:748)
4. Locating the Problem
Writing from Flink to ES produced a SocketTimeoutException, yet the ES cluster monitoring showed that read/write load was low at that point in time. Why the timeout? The ES sink code is as follows:
private static ElasticsearchSink<String> getEsSinkFunc(String hostName, int port, String schema) {
    List<HttpHost> httpHosts = new ArrayList<>();
    httpHosts.add(new HttpHost(hostName, port, schema));
    ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink
            .Builder<String>(httpHosts, new ElasticsearchSinkFunction<String>() {
        @Override
        public void process(String value, RuntimeContext ctx, RequestIndexer indexer) {
            try {
                DataSyncBurringPointDto element = JSON.parseObject(value, DataSyncBurringPointDto.class);
                indexer.add(createIndexRequest(element));
            } catch (IOException e) {
                log.error("ES sink Error:{}", e.getMessage());
                e.printStackTrace();
            }
        }

        private IndexRequest createIndexRequest(DataSyncBurringPointDto element) throws IOException {
            return Requests.indexRequest()
                    .index(Configurations.getString("es.index.prefix") + "-" + element.getDate())
                    .type("_doc")
                    .id(getESId(element))
                    .source(XContentFactory
                            .jsonBuilder()
                            .startObject()
                            .field("EventId", element.getEventId())
                            // ... additional fields elided ...
                            .field("@timestamp", element.getTime())
                            .endObject()
                    );
        }
    });
    esSinkBuilder.setBulkFlushMaxActions(Configurations.getInteger("es.v2.max.bulk.flush"));
    esSinkBuilder.setBulkFlushInterval(Configurations.getInteger("es.v2.bulk.flush.interval"));
    esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler());
    esSinkBuilder.setRestClientFactory(new RestClientFactory() {
        @Override
        public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY,
                    new UsernamePasswordCredentials(Configurations.getString("es.sensors.untreated.name"),
                            Configurations.getString("es.sensors.untreated.passwd")));
            restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                @Override
                public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                    httpClientBuilder.disableAuthCaching();
                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    return httpClientBuilder;
                }
            });
            restClientBuilder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
                @Override
                public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
                    return requestConfigBuilder
                            .setConnectionRequestTimeout(1000 * 60 * 2)
                            .setSocketTimeout(1000 * 60 * 2);
                }
            });
        }
    });
    return esSinkBuilder.build();
}
Since the data had essentially been ruled out and the ES cluster was not under pressure (apart from brief old-generation GC pauses), the investigation focused on the sink itself. Flink writes to ES over HTTP, so I inspected the parameters of the HttpAsyncClientBuilder class and found: private ConnectionKeepAliveStrategy keepAliveStrategy; — and the ES sink uses the default connection keep-alive time of -1, i.e. connections never expire. The suspicion: during an old-generation GC pause on the ES cluster, the long-lived HTTP connections go dead, but the client keeps reusing them instead of creating new ones, so flushing data to ES times out.
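The decision logic of a keep-alive strategy can be illustrated without the HttpClient dependency. Apache HttpClient's DefaultConnectionKeepAliveStrategy parses the server's Keep-Alive response header for a timeout=N parameter and returns -1 (reuse forever) when it is absent, which is exactly the behavior at issue here. The sketch below, using hypothetical helper names and only stdlib Java, mimics that decision and contrasts it with a bounded variant:

```java
public class KeepAliveSketch {
    // Mimics DefaultConnectionKeepAliveStrategy: scan a Keep-Alive header
    // value for "timeout=N" (seconds) and return it in milliseconds.
    static long defaultStrategy(String keepAliveHeader) {
        if (keepAliveHeader != null) {
            for (String part : keepAliveHeader.split(",")) {
                String[] kv = part.trim().split("=", 2);
                if (kv.length == 2 && kv[0].equalsIgnoreCase("timeout")) {
                    try {
                        return Long.parseLong(kv[1].trim()) * 1000L;
                    } catch (NumberFormatException ignored) { }
                }
            }
        }
        // No hint from the server: keep the connection forever. This is the
        // behavior suspected of holding on to dead connections after a GC pause.
        return -1L;
    }

    // Bounded variant: never keep an idle connection longer than 5 minutes.
    static long boundedStrategy(String keepAliveHeader) {
        long serverHint = defaultStrategy(keepAliveHeader);
        long cap = 5 * 60 * 1000L;
        return (serverHint > 0 && serverHint < cap) ? serverHint : cap;
    }

    public static void main(String[] args) {
        System.out.println(defaultStrategy("timeout=30, max=100")); // honors the server hint
        System.out.println(defaultStrategy(null));                  // -1: never expire
        System.out.println(boundedStrategy(null));                  // capped at 5 minutes
    }
}
```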
5. Resolution
Add a custom keep-alive strategy; here the keep-alive time is set to 5 minutes:
esSinkBuilder.setRestClientFactory(new RestClientFactory() {
    @Override
    public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(Configurations.getString("es.sensors.untreated.name"),
                        Configurations.getString("es.sensors.untreated.passwd")));
        restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                httpClientBuilder.disableAuthCaching();
                httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                // Expire idle keep-alive connections after 5 minutes instead of never (-1)
                httpClientBuilder.setKeepAliveStrategy((response, context) -> Duration.ofMinutes(5).toMillis());
                return httpClientBuilder;
            }
        });
        restClientBuilder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
            @Override
            public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
                return requestConfigBuilder
                        .setConnectionRequestTimeout(1000 * 60 * 2)
                        .setSocketTimeout(1000 * 60 * 2);
            }
        });
    }
});
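Note that the keep-alive strategy callback returns the keep-alive duration in milliseconds. A quick stdlib-only check of the value used here:

```java
import java.time.Duration;

public class KeepAliveValue {
    public static void main(String[] args) {
        // 5 minutes expressed in milliseconds, as returned by the strategy lambda,
        // versus the default of -1 (never expire).
        long keepAliveMs = Duration.ofMinutes(5).toMillis();
        System.out.println(keepAliveMs); // 300000
    }
}
```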
With that, the problem that had haunted me for a month finally seems to be solved... Recording it here for now, and hoping it does not come back!