背景
本文基于Flink 1.13.3 Flink计算引擎VVR版本的hbase Connector 具体maven依赖如下:
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-cloudhbase</artifactId>
<version>1.13-vvr-4.0.7</version>
</dependency>
在基于VVR版本的cloudHbase维表查询的时候,发现同步查询的速度很慢,所以我们打算做基于异步的维表查询。
在运行的过程中发现了NPE问题,具体的报错堆栈如下:
2022-06-08 15:01:05
java.lang.Exception: Could not complete the stream element: Record @ (undef) : org.apache.flink.table.data.binary.BinaryRowData@d8014011.
at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.completeExceptionally(AsyncWaitOperator.java:382)
at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner$JoinedRowResultFuture.completeExceptionally(AsyncLookupJoinRunner.java:253)
at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner$JoinedRowResultFuture$DelegateResultFuture.completeExceptionally(AsyncLookupJoinRunner.java:275)
at org.apache.flink.table.runtime.collector.TableFunctionResultFuture.completeExceptionally(TableFunctionResultFuture.java:61)
at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$TemporalTableCalcResultFuture.complete(AsyncLookupJoinWithCalcRunner.java:121)
at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner$JoinedRowResultFuture.complete(AsyncLookupJoinRunner.java:219)
at org.apache.flink.table.runtime.operators.join.lookup.DelegatingResultFuture.accept(DelegatingResultFuture.java:48)
at org.apache.flink.table.runtime.operators.join.lookup.DelegatingResultFuture.accept(DelegatingResultFuture.java:32)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at org.apache.flink.connector.xx.cloudhbase.source.AsyncHBaseLRURowFetcher.lambda$fetchResult$6(AsyncHBaseLRURowFetcher.java:223)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:443)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.NullPointerException
at TableCalcMapFunction$8.flatMap(Unknown Source)
at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$TemporalTableCalcResultFuture.complete(AsyncLookupJoinWithCalcRunner.java:119)
... 15 more
先说结论
- Flink计算引擎VVR版本的hbase Connector把hbase的数据转化为RowData的时候存在多线程问题,这种会导致NPE问题
- 相比Asynchronous I/O for External Data Access 的实现,我们不需要实现RichAsyncFunction类的asyncInvoke方法,只需要实现*eval(CompletableFuture<Collection> future, RowData rowData)*方法即可,因为flink做在codegen的时候做封装
分析
- 初始定位
定位到 AsyncLookupJoinWithCalcRunner 119行如下:
@Override
public void complete(Collection<RowData> result) {
if (result == null || result.size() == 0) {
joinConditionResultFuture.complete(result);
} else {
for (RowData row : result) {
try {
calc.flatMap(row, calcCollector);
} catch (Exception e) {
joinConditionResultFuture.completeExceptionally(e);
}
}
joinConditionResultFuture.complete(calcCollector.collection);
}
}
起初的时候是怀疑 calc是null,后来经过排查不是此问题.
- 再定位
重新定位到自己实现的类 AsyncHBaseLRURowFetcher 223行,如下:
RowData rowData = readHelper.convertToRow(result);
if (cache != null) {
resultFuture.complete(Collections.singletonList(rowData));
cache.put(rowKey, rowData);
} else {
resultFuture.complete(Collections.singletonList(rowData));
}
经过测试发现rowData数据居然为null,也就是说 传给calc.flatMap(row, calcCollector) 中的row是null,难道calc没有对null做处理么?(calc 是flink codegen出来的对象)。我们把codegen的代码打印出来如下:
public class TableCalcMapFunction$8
extends org.apache.flink.api.common.functions.RichFlatMapFunction {
private transient org.apache.flink.table.runtime.typeutils.StringDataSerializer typeSerializer$6;
org.apache.flink.table.data.GenericRowData out = new org.apache.flink.table.data.GenericRowData(1);
public TableCalcMapFunction$8(Object[] references) throws Exception {
typeSerializer$6 = (((org.apache.flink.table.runtime.typeutils.StringDataSerializer) references[0]));
}
@Override
public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
}
@Override
public void flatMap(Object _in1, org.apache.flink.util.Collector c) throws Exception {
org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) _in1;
org.apache.flink.table.data.binary.BinaryStringData field$5;
boolean isNull$5;
org.apache.flink.table.data.binary.BinaryStringData field$7;
isNull$5 = in1.isNullAt(0);
field$5 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
if (!isNull$5) {
field$5 = ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0));
}
field$7 = field$5;
if (!isNull$5) {
field$7 = (org.apache.flink.table.data.binary.BinaryStringData) (typeSerializer$6.copy(field$7));
}
if (isNull$5) {
out.setField(0, null);
} else {
out.setField(0, field$7);
}
c.collect(out);
}
@Override
public void close() throws Exception {
}
}
还真是没有对null进行处理,难道是Flink的codegen的实现有问题?不是的。 再次找到 RowData rowData = readHelper.convertToRow(result) 这段代码,发现该方法存在多线程问题,如下:
this.rowDataGenerator.start();
...
return this.rowDataGenerator.end();
this.rowDataGenerator.start()的实现如下:
this.rowData = genericRowData()
其中this.rowDataGenerator.end()内部实现如下:
GenericRowData localRowData = this.rowData;
this.rowData = null;
return localRowData;
对同一个rowData对象进行操作,这显然在多线程环境下是有问题的。
所以说这块代码进行修改,每次都重新创建对象,即可规避这个问题,也解决了NPE问题。
额外话题
对于Flink SQL在内部是怎么实现异步操作呢?如果按照Asynchronous I/O for External Data Access,我们是应该继承RichAsyncFunction类从而实现asyncInvoke方法,然而我们的实现仅仅是*eval(CompletableFuture<Collection> future, RowData rowData)*方法。
这还是得从AsyncLookupJoinRunner 这个类说起,
public class AsyncLookupJoinRunner extends RichAsyncFunction<RowData, RowData> {
...
private final GeneratedFunction<AsyncFunction<RowData, Object>> generatedFetcher;
...
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.fetcher = generatedFetcher.newInstance(getRuntimeContext().getUserCodeClassLoader());
FunctionUtils.setFunctionRuntimeContext(fetcher, getRuntimeContext());
FunctionUtils.openFunction(fetcher, parameters);
'''
@Override
public void asyncInvoke(RowData input, ResultFuture<RowData> resultFuture) throws Exception {
JoinedRowResultFuture outResultFuture = resultFutureBuffer.take();
// the input row is copied when object reuse in AsyncWaitOperator
outResultFuture.reset(input, resultFuture);
// fetcher has copied the input field when object reuse is enabled
fetcher.asyncInvoke(input, outResultFuture);
}
其中generatedFetcher这个就是codegen生成的代码,我们打印generatedFetcher中的code,如下:
public class LookupFunction$3
extends org.apache.flink.streaming.api.functions.async.RichAsyncFunction {
private transient org.apache.flink.table.runtime.typeutils.StringDataSerializer typeSerializer$1;
private transient org.apache.flink.connector.xx.cloudhbase.source.AsyncLookupFunctionWrapper function_org$apache$flink$connector$xx$cloudhbase$source$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3;
public LookupFunction$3(Object[] references) throws Exception {
typeSerializer$1 = (((org.apache.flink.table.runtime.typeutils.StringDataSerializer) references[0]));
function_org$apache$flink$connector$xx$cloudhbase$source$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3 = (((org.apache.flink.connector.xx.cloudhbase.source.AsyncLookupFunctionWrapper) references[1]));
}
@Override
public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
function_org$apache$flink$connector$xx$cloudhbase$source$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3.open(new org.apache.flink.table.functions.FunctionContext(getRuntimeContext()));
}
@Override
public void asyncInvoke(Object _in1, org.apache.flink.streaming.api.functions.async.ResultFuture c) throws Exception {
org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) _in1;
org.apache.flink.table.data.binary.BinaryStringData field$0;
boolean isNull$0;
org.apache.flink.table.data.binary.BinaryStringData field$2;
isNull$0 = in1.isNullAt(2);
field$0 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
if (!isNull$0) {
field$0 = ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(2));
}
field$2 = field$0;
if (!isNull$0) {
field$2 = (org.apache.flink.table.data.binary.BinaryStringData) (typeSerializer$1.copy(field$2));
}
if (isNull$0) {
c.complete(java.util.Collections.emptyList());
} else {
org.apache.flink.table.runtime.operators.join.lookup.DelegatingResultFuture delegates = new org.apache.flink.table.runtime.operators.join.lookup.DelegatingResultFuture(c);
function_org$apache$flink$connector$xx$cloudhbase$source$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3.eval(
delegates.getCompletableFuture(),
isNull$0 ? null : ((org.apache.flink.table.data.binary.BinaryStringData) field$2));
}
}
@Override
public void close() throws Exception {
function_org$apache$flink$connector$xx$cloudhbase$source$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3.close();
}
}
可以看到:
org.apache.flink.table.runtime.operators.join.lookup.DelegatingResultFuture delegates = new org.apache.flink.table.runtime.operators.join.lookup.DelegatingResultFuture(c);
function_org$apache$flink$connector$xx$cloudhbase$source$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3.eval(
delegates.getCompletableFuture(),
isNull$0 ? null : ((org.apache.flink.table.data.binary.BinaryStringData) field$2));
- 用到了DelegatingResultFuture类作为CompletableFuture到ResultFuture类的转换,所以我们的自己实现的方法签名是CompletableFuture
- function_org
a
p
a
c
h
e
apache
apacheflink
c
o
n
n
e
c
t
o
r
connector
connectorxx
c
l
o
u
d
h
b
a
s
e
cloudhbase
cloudhbasesource$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3 是我们自己写类的对象(要继承自AsyncTableFunction类),这样就只需要实现一个*eval(CompletableFuture<Collection> future, RowData rowData)*方法。
|