流式表处理的过程可以用下图描述:
1. 先把流转换成动态表:Stream => table
2. 表经过一定操作,转成结果表:table (CRUD) => table
3. 将表转换成流输出:table => Stream
/**
* @program: flink
* @description: Flink Table API and SQL example that filters a sensor stream by id
* @author: Mr.G
* @create: 2021-09-26 19:21
**/
package com.ct.day08
import com.ct.day01.SensorSource
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api._
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row
/**
 * Minimal Flink streaming-table example that expresses the same query
 * ("keep only the ids equal to sensor_1") twice: once with the Table API
 * and once with SQL.
 *
 * Both variants follow the same three steps:
 *   1. stream => table
 *   2. table => result table (filter / projection)
 *   3. result table => append stream
 *
 * Only the SQL variant is printed; the Table API variant is converted to a
 * stream but no sink is attached to it.
 *
 * Created: 2021/9/26
 *
 * @author Mr.G
 * @version 1.0
 */
object TableApiExample {
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(1)

    // Table runtime environment: Blink planner, streaming mode.
    val plannerSettings =
      EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(streamEnv, plannerSettings)

    val sensorStream = streamEnv.addSource(new SensorSource)

    // ---- Table API variant ----
    // Step 1: stream => table (the `timestamp` field is exposed as `ts`).
    val sensorTable: Table =
      tableEnv.fromDataStream(sensorStream, 'id, 'timestamp as 'ts, 'temperature)
    // Step 2: table => result table.
    val sensorOneRows: Table = sensorTable.filter("id ='sensor_1'")
    val sensorOneIds: Table = sensorOneRows.select('id)
    // Step 3: result table => stream.
    // NOTE: no sink is attached here; append `.print()` to see this stream's output.
    tableEnv.toAppendStream[Row](sensorOneIds)

    // ---- SQL variant ----
    // Step 1: register the stream as a temporary view named "sensor".
    tableEnv.createTemporaryView("sensor", sensorStream, 'id, 'timestamp as 'ts, 'temperature)
    // Step 2: table => result table.
    val sqlIds: Table = tableEnv.sqlQuery(" select id from sensor where id = 'sensor_1'")
    // Step 3: result table => append stream (`toRetractStream[Row]` is the
    // alternative conversion that was left disabled).
    val sqlIdStream = sqlIds.toAppendStream[Row]
    sqlIdStream.print()

    streamEnv.execute()
  }
}
|