代码块
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
DataStreamSource<PageVisit> input = env.fromElements(
new PageVisit("2017-09-16 09:00:00", 1001, "/page1"),
new PageVisit("2017-09-16 09:00:00", 1001, "/page2"),
new PageVisit("2017-09-16 09:04:00", 1001, "/page1"),
new PageVisit("2017-09-16 09:05:00", 1001, "/page2"),
new PageVisit("2017-09-16 09:05:00", 1002, "/page2"),
new PageVisit("2017-09-16 09:05:00", 1002, "/page1"),
new PageVisit("2017-09-16 10:30:00", 1005, "/page1"),
new PageVisit("2017-09-16 10:30:00", 1005, "/page1"),
new PageVisit("2017-09-16 10:30:00", 1005, "/page2"));
Table table = tEnv.fromDataStream(input);
Table select = table.groupBy($("visitTime"))
.select($("visitTime"),
$("userId").count().distinct().as("uv")
, $("userId").isEqual(1005).count().as("suv")
)
;
DataStream<Tuple2<Boolean, Row>> dataStream = tEnv.toRetractStream(select, Row.class);
dataStream.print();
异常
Exception in thread "main" org.apache.flink.table.api.TableException: suv is not found in visitTime, EXPR$0, EXPR$1
at org.apache.flink.table.planner.codegen.SinkCodeGenerator$$anonfun$1.apply(SinkCodeGenerator.scala:82)
at org.apache.flink.table.planner.codegen.SinkCodeGenerator$$anonfun$1.apply(SinkCodeGenerator.scala:79)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.flink.table.planner.codegen.SinkCodeGenerator$.generateRowConverterOperator(SinkCodeGenerator.scala:79)
at org.apache.flink.table.planner.codegen.SinkCodeGenerator.generateRowConverterOperator(SinkCodeGenerator.scala)
at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLegacySink.translateToTransformation(CommonExecLegacySink.java:190)
at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLegacySink.translateToPlanInternal(CommonExecLegacySink.java:141)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:70)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:69)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:69)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165)
at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.java:439)
at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:528)
at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:517)
at com.xiaolin.stream.RetractStreamApp.main(RetractStreamApp.java:64)
原因
Table select = table.groupBy($("visitTime"))
.select($("visitTime"),
$("userId").count().distinct().as("uv")
, $("userId").isEqual(1005).count().as("suv")
)
.as("visitTime","uv","suv")
ROW<`visitTime` STRING, `EXPR$0` BIGINT NOT NULL, `EXPR$1` BIGINT NOT NULL>
|