1. Converting a DataStream to a Table
// Default: the Table's fields are derived from the DataStream's data type
val dataStream: DataStream[YourModelObject] = ...
val table: Table = tableEnv.fromDataStream(dataStream)
// Alternative: list the fields explicitly when converting the same dataStream
val table: Table = tableEnv.fromDataStream(dataStream, 'id, 'timestamp, 'temperature)
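2. Mapping DataStream data types to the Table schema
There are two kinds of mapping: name-based and position-based.
// Name-based: fields are referenced by their original names, so they can be reordered and renamed with `as`
val table: Table = tableEnv.fromDataStream(dataStream, 'temperature, 'id as 'myId, 'timestamp as 'ts)
// Position-based: the new names are assigned to the fields in their declared order
val table: Table = tableEnv.fromDataStream(dataStream, 'ts, 'myId)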
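To compare the two mapping modes, the sketch below prints the schema each one produces. It assumes Flink 1.10 with the Scala Table API bridge on the classpath (the import paths differ in later versions). `SensorReading` is a hypothetical case class standing in for `YourModelObject`, and `FieldMappingSketch` and the sample values are likewise made up for illustration.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._

// Hypothetical model class standing in for YourModelObject
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object FieldMappingSketch {

  // Name-based: every expression refers to an existing field name,
  // so fields can be reordered and renamed with `as`.
  def nameBased(tableEnv: StreamTableEnvironment,
                stream: DataStream[SensorReading]): Table =
    tableEnv.fromDataStream(stream, 'temperature, 'id as 'myId, 'timestamp as 'ts)

  // Position-based: none of the names exist in the input type, so they are
  // assigned in declaration order (id -> myId, timestamp -> ts);
  // the trailing temperature field is left out.
  def positionBased(tableEnv: StreamTableEnvironment,
                    stream: DataStream[SensorReading]): Table =
    tableEnv.fromDataStream(stream, 'myId, 'ts)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // Made-up sample readings
    val stream: DataStream[SensorReading] = env.fromElements(
      SensorReading("sensor1", 1L, 35.8),
      SensorReading("sensor2", 2L, 15.4)
    )

    // Print the resulting schemas to compare the two modes
    nameBased(tableEnv, stream).printSchema()
    positionBased(tableEnv, stream).printSchema()
  }
}
```

Name-based mapping is usually the safer choice when the model class may gain or reorder fields, because each column is tied to a field name rather than to its position.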
3. Creating a temporary view from a DataStream or a Table
tableEnv.createTemporaryView("sensorDataStream", dataStream)
tableEnv.createTemporaryView("sensorTable", sensorTable)
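Once a view is registered, it can be referenced by name in SQL. The following is a minimal sketch of that, again assuming Flink 1.10 and its Scala bridge imports. `SensorReading`, `TemporaryViewSketch`, the sample data, and the query itself are hypothetical illustrations.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._

// Hypothetical model class standing in for YourModelObject
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object TemporaryViewSketch {

  // Register the stream under a name, then query that name with SQL
  def queryView(tableEnv: StreamTableEnvironment,
                stream: DataStream[SensorReading]): Table = {
    tableEnv.createTemporaryView("sensorDataStream", stream)
    tableEnv.sqlQuery(
      "SELECT id, temperature FROM sensorDataStream WHERE id = 'sensor1'")
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // Made-up sample readings
    val stream: DataStream[SensorReading] = env.fromElements(
      SensorReading("sensor1", 1L, 35.8),
      SensorReading("sensor2", 2L, 15.4)
    )

    // The query only filters and projects, so an append stream is sufficient
    queryView(tableEnv, stream).toAppendStream[(String, Double)].print()
    env.execute("temporary-view-sketch")
  }
}
```

A temporary view exists only for the lifetime of the `TableEnvironment` that created it, so it suits intermediate results that do not need to be stored in a catalog.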
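4. Writing a Table to an output
A table is output by writing its data to a concrete TableSink. TableSink is a generic Java interface that supports different storage targets: files, relational databases, key-value stores, message queues, and so on.
In the API, this means calling Table.insertInto() to write a Table into a TableSink that has already been registered.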
tableEnv.connect(...).createTemporaryTable("outputTable")
val resultTable: Table = ...
resultTable.insertInto("outputTable")
// Select two fields and keep only the rows for sensor1
val simpleTransformTable = sensorTable
  .select( 'id, 'temper )
  .filter( 'id === "sensor1" )
// Register a CSV file sink whose schema matches the query result
val outputPath = "D:\\IdeaProject-ws-3\\FlinkTutorialScala\\src\\main\\resources\\input\\sensor_output.txt"
tableEnv.connect( new FileSystem().path( outputPath ) )
  .withFormat( new Csv() )
  .withSchema(
    new Schema()
      .field( "id", DataTypes.STRING() )
      .field( "temper", DataTypes.DOUBLE() )
  )
  .createTemporaryTable( "outputTable" )
// Write the result into the registered sink
simpleTransformTable.insertInto( "outputTable" )