Flink 1.11 报错:No operators defined in streaming topology. Cannot execute.
代码如下:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;
/**
 * Table API example: registers a CSV file as {@code inputTable}, derives a few
 * tables from it with both the Table API and SQL, and writes the filtered
 * {@code id, temp} rows to a second CSV file registered as {@code outputTable}.
 *
 * <p>The job is submitted with {@link Table#executeInsert(String)}. Submitting it
 * with {@code env.execute()} fails on Flink 1.11 with
 * "No operators defined in streaming topology. Cannot execute." because a pure
 * Table API pipeline registers no DataStream operators on the
 * {@code StreamExecutionEnvironment}.
 */
public class TableTest3_FileOutput {

    /**
     * Builds the tables and submits the insert job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if table registration or job submission fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Source: CSV file exposed as "inputTable" with columns id, timestamp, temp.
        String filePath = "F:\\project\\flink20210920\\src\\main\\resources\\sensor.txt";
        tableEnv.connect(new FileSystem().path(filePath))
                .withFormat(new Csv())
                .withSchema(new Schema()
                        .field("id", DataTypes.STRING())
                        .field("timestamp", DataTypes.BIGINT())
                        .field("temp", DataTypes.DOUBLE())
                )
                .createTemporaryTable("inputTable");
        Table inputTable = tableEnv.from("inputTable");

        // Table API: projection + filter. This is the table that gets written out.
        Table resultTable = inputTable.select("id, temp")
                .filter("id === 'sensor_6'");

        // Table API: per-id aggregation. Built for demonstration only; it is not written to a sink.
        Table aggTable = inputTable.groupBy("id")
                .select("id, id.count as count, temp.avg as avgTemp");

        // SQL equivalents of the two tables above. Also demonstration only.
        // The id literal previously read 'senosr_6' (typo) and the query result was discarded.
        Table sqlResultTable = tableEnv.sqlQuery("select id, temp from inputTable where id = 'sensor_6'");
        Table sqlAggTable = tableEnv.sqlQuery("select id, count(id) as cnt, avg(temp) as avgTemp from inputTable group by id");

        // Sink: CSV file exposed as "outputTable".
        // NOTE(review): the sink column is named "temperature" while the query column is "temp";
        // presumably columns are matched by position - confirm.
        String outputPath = "F:\\project\\flink20210920\\src\\main\\resources\\out.txt";
        tableEnv.connect(new FileSystem().path(outputPath))
                .withFormat(new Csv())
                .withSchema(new Schema()
                        .field("id", DataTypes.STRING())
                        .field("temperature", DataTypes.DOUBLE())
                )
                .createTemporaryTable("outputTable");

        // executeInsert translates and submits the Table API job itself, so no
        // env.execute() call follows: env has no DataStream operators to run.
        resultTable.executeInsert("outputTable");
    }
}
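原因:在 Flink 1.11 中,纯 Table API / SQL 的作业不会向 StreamExecutionEnvironment 注册任何算子,因此调用 env.execute() 时拓扑为空,抛出上述异常;此类作业需要通过 executeSql() / executeInsert() 来提交。
参考 https://blog.csdn.net/wtmdcnm/article/details/117821106 以及官网文档的说明,将测试代码修改如下: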
// Single-parallelism streaming environment with a table environment on top of it.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// DDL for the CSV-backed source table (id, timestamp, temp).
String sourceDdl = "CREATE TABLE MyTable(\n"
        + "`id` STRING,"
        + "`timestamp` BIGINT,"
        + "`temp` DOUBLE"
        + ") WITH (\n"
        + " 'connector' = 'filesystem',\n"
        + " 'path' = 'F:\\project\\flink20210920\\src\\main\\resources\\sensor.txt',\n"
        + " 'format' = 'csv'\n"
        + ")";

// DDL for the CSV-backed sink table (id, temp).
String sinkDdl = "CREATE TABLE MyTable1(\n"
        + "`id` STRING,"
        + "`temp` DOUBLE"
        + ") WITH (\n"
        + " 'connector' = 'filesystem',\n"
        + " 'path' = 'F:\\project\\flink20210920\\src\\main\\resources\\out',\n"
        + " 'format' = 'csv'\n"
        + ")";

// Register both tables, then select the sensor_1 rows and write them to the sink.
tableEnv.executeSql(sourceDdl);
tableEnv.executeSql(sinkDdl);
tableEnv.sqlQuery("select id, temp from MyTable where id = 'sensor_1'")
        .executeInsert("MyTable1");
执行成功
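另外,如果写入 sink 表的是聚合查询(会产生 update/delete 记录)的结果,则会报错:please declare primary key for sink table when query contains update/delete record. 此时需要在 sink 表的建表语句中为其声明主键(注意 Flink DDL 中主键约束通常需要写成 PRIMARY KEY NOT ENFORCED),例如: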
"id STRING primary key," + // 聚合类操作必须要指定一个主键
|