flink table api 报错的原因是
在使用table api 进行数据 group by 之后,做数据的排序。这样写之后,运行 报错。
Table sourceTable = tEnv.from("kafka_source");
Table table = sourceTable.groupBy($("level"))
.select($("msg"));
详细错误信息如下:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Cannot resolve field [timeConsuming], input field list:[level].
at org.apache.flink.table.expressions.resolver.rules.ReferenceResolverRule$ExpressionResolverVisitor.failForField(ReferenceResolverRule.java:93)
at org.apache.flink.table.expressions.resolver.rules.ReferenceResolverRule$ExpressionResolverVisitor.lambda$null$3(ReferenceResolverRule.java:87)
at java.util.Optional.orElseThrow(Optional.java:290)
at org.apache.flink.table.expressions.resolver.rules.ReferenceResolverRule$ExpressionResolverVisitor.lambda$null$4(ReferenceResolverRule.java:85)
at java.util.Optional.orElseGet(Optional.java:267)
at org.apache.flink.table.expressions.resolver.rules.ReferenceResolverRule$ExpressionResolverVisitor.lambda$visit$5(ReferenceResolverRule.java:79)
at java.util.Optional.orElseGet(Optional.java:267)
at org.apache.flink.table.expressions.resolver.rules.ReferenceResolverRule$ExpressionResolverVisitor.visit(ReferenceResolverRule.java:73)
at org.apache.flink.table.expressions.resolver.rules.ReferenceResolverRule$ExpressionResolverVisitor.visit(ReferenceResolverRule.java:51)
at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:29)
at org.apache.flink.table.expressions.UnresolvedReferenceExpression.accept(UnresolvedReferenceExpression.java:59)
at org.apache.flink.table.expressions.resolver.rules.ReferenceResolverRule.lambda$apply$0(ReferenceResolverRule.java:47)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at org.apache.flink.table.expressions.resolver.rules.ReferenceResolverRule.apply(ReferenceResolverRule.java:48)
at org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$2(ExpressionResolver.java:235)
at java.util.function.Function.lambda$andThen$1(Function.java:88)
at java.util.function.Function.lambda$andThen$1(Function.java:88)
at java.util.function.Function.lambda$andThen$1(Function.java:88)
at java.util.function.Function.lambda$andThen$1(Function.java:88)
at org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:198)
at org.apache.flink.table.operations.utils.OperationTreeBuilder.projectInternal(OperationTreeBuilder.java:194)
at org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:169)
at org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:153)
at org.apache.flink.table.api.internal.TableImpl$GroupedTableImpl.select(TableImpl.java:637)
at com.quant.flowcalculation.flinkapi.tableapi.TableApiKafka.main(TableApiKafka.java:64)
报错原因:
Flink SQL的table api只支持 only_full_group_by
也就是 groupy以后剩下的字段就是 groupby里面里面包含的字段。
解决方案:
就是需要查询的字段 要放到group by 里面。不放到里面的字段,要进行 特殊函数处理。
Table table = sourceTable.groupBy($("level"),$("msg")) .select($("msg"),$("timeConsuming"));
特殊的处理如下,
Table table = sourceTable.groupBy($("level"),$("msg"))
.select($("msg"),$("timeConsuming").sum().as("totalNum"));
如果你有疑问为啥在mysql中不满足上述规律也可以正常运行?
因为你的Mysql是5.7以下的.
Flink Table API使用的是mysql5.7以上的语法.
|