This example is taken from 尚硅谷 (Atguigu)…
Overview
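Time-based operations (such as window operations in the Table API and SQL) need information about the time semantics and about where the time data comes from. For this reason, a Table can provide a logical time field that indicates time in a table program and gives access to the corresponding timestamps.
A time attribute can be part of any table schema. Once a time attribute is defined, it can be referenced as a field and used in time-based operations.
A time attribute behaves like a regular timestamp: it can be accessed and used in computations.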
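Windows are the typical time-based operation that consumes a time attribute. As a minimal plain-Scala sketch (no Flink dependency), assuming a 10-second tumbling window with zero offset, the code below shows how a timestamp is mapped to the start of its window. The formula mirrors the one in Flink's `TimeWindow.getWindowStartWithOffset`; the object and method names here are hypothetical, and the timestamps are the event times from the sample sensor.txt file, converted from seconds to milliseconds.

```scala
// Plain-Scala sketch of tumbling-window assignment (hypothetical names, no Flink dependency).
object TumblingWindowSketch {
  // Start of the tumbling window that contains `timestamp`; all values in milliseconds.
  def windowStart(timestamp: Long, offset: Long, windowSize: Long): Long =
    timestamp - (timestamp - offset + windowSize) % windowSize

  def main(args: Array[String]): Unit = {
    val size = 10000L // assumed window size: 10 seconds
    // Event timestamps from sensor.txt (seconds), converted to milliseconds
    val timestamps = Seq(1547718199L, 1547718201L, 1547718202L, 1547718205L,
      1547718207L, 1547718212L, 1547718215L).map(_ * 1000L)
    timestamps.foreach { ts =>
      val start = windowStart(ts, 0L, size)
      println(s"$ts -> [$start, ${start + size})")
    }
  }
}
```

With a 10-second window, the first record falls into one window, the next four share the following window, and the last two share a third one.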
sensor.txt (each line: sensor id, timestamp in seconds, temperature)
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718207,37.2
sensor_1,1547718212,33.5
sensor_1,1547718215,38.1
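The code below also assigns event-time timestamps and watermarks with a `BoundedOutOfOrdernessTimestampExtractor` and a 1-second bound. As a rough plain-Scala sketch of that idea (hypothetical names, no Flink dependency), the watermark is the highest timestamp seen so far minus the allowed out-of-orderness. This is a simplification: Flink emits watermarks periodically rather than after every element, so the sketch only shows the value that would be available after each record.

```scala
// Plain-Scala sketch of bounded-out-of-orderness watermarks (hypothetical names).
object WatermarkSketch {
  // For each element: (max timestamp seen so far) - maxOutOfOrderness, all in ms.
  def watermarks(timestampsMs: Seq[Long], maxOutOfOrdernessMs: Long): Seq[Long] =
    timestampsMs
      .scanLeft(Long.MinValue)((maxSoFar, ts) => math.max(maxSoFar, ts))
      .tail // drop the Long.MinValue seed
      .map(_ - maxOutOfOrdernessMs)

  def main(args: Array[String]): Unit = {
    // Event timestamps from sensor.txt, seconds -> milliseconds (as in extractTimestamp)
    val ts = Seq(1547718199L, 1547718201L, 1547718202L, 1547718205L,
      1547718207L, 1547718212L, 1547718215L).map(_ * 1000L)
    ts.zip(watermarks(ts, 1000L)).foreach { case (t, wm) =>
      println(s"event $t -> watermark $wm")
    }
  }
}
```

Because the watermark only tracks the maximum, a late element (one with a smaller timestamp than an earlier one) does not move it backwards.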
Code
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// Defined here so the example compiles on its own: one reading per line of sensor.txt
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object TimeAndWindowTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Event time for the DataStream; the table below adds a processing-time attribute
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val inputStream: DataStream[String] =
      env.readTextFile("D:\\20-Flink\\FlinkTutorial\\src\\main\\resources\\sensor.txt")

    // Parse each line, then assign timestamps and watermarks (1 s out-of-orderness bound)
    val dataStream: DataStream[SensorReading] = inputStream
      .map(data => {
        val dataArray = data.split(",")
        SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
      })
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
          // The file stores seconds; Flink timestamps are in milliseconds
          override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
        })

    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    // Rename timestamp to ts and append an extra field pt as the processing-time attribute
    val sensorTable: Table =
      tableEnv.fromDataStream(dataStream, 'id, 'timestamp as 'ts, 'temperature, 'pt.proctime)

    sensorTable.printSchema()
    sensorTable.toAppendStream[Row].print()

    env.execute("time and window test")
  }
}
Output
root
|-- id: STRING
|-- ts: BIGINT
|-- temperature: DOUBLE
|-- pt: TIMESTAMP(3) *PROCTIME*
sensor_1,1547718199,35.8,2021-07-13 08:11:09.802
sensor_6,1547718201,15.4,2021-07-13 08:11:09.816
sensor_7,1547718202,6.7,2021-07-13 08:11:09.816
sensor_10,1547718205,38.1,2021-07-13 08:11:09.818
sensor_1,1547718207,37.2,2021-07-13 08:11:09.818
sensor_1,1547718212,33.5,2021-07-13 08:11:09.818
sensor_1,1547718215,38.1,2021-07-13 08:11:09.819
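The table schema now contains an extra field, pt: TIMESTAMP(3) *PROCTIME* (a timestamp with millisecond precision), and every output row ends with an additional time column.
This column is the processing time, i.e. the local machine time at which each row was processed. Because the whole file is read in one burst, these values differ by only a few milliseconds, unlike the event timestamps in ts.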
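Conceptually, 'pt.proctime attaches the machine's wall-clock time to each row, independent of the data itself. The following plain-Scala sketch is only an analogy, not Flink's implementation; the object and method names are hypothetical. It pairs each record with `System.currentTimeMillis()` at the moment the record is handled, which is why the resulting values track when the program ran rather than the ts column.

```scala
import java.time.Instant

// Plain-Scala analogy for a processing-time column (hypothetical names, not Flink code).
object ProcTimeSketch {
  final case class SensorReading(id: String, timestamp: Long, temperature: Double)

  // Pairs each record with the local wall-clock time (ms) at which it is handled.
  def withProcTime(records: Seq[SensorReading]): Seq[(SensorReading, Long)] =
    records.map(r => (r, System.currentTimeMillis()))

  def main(args: Array[String]): Unit = {
    val records = Seq(
      SensorReading("sensor_1", 1547718199L, 35.8),
      SensorReading("sensor_6", 1547718201L, 15.4))
    withProcTime(records).foreach { case (r, pt) =>
      // r.timestamp comes from the data; pt comes from the machine's clock
      println(s"${r.id},${r.timestamp},${r.temperature},${Instant.ofEpochMilli(pt)}")
    }
  }
}
```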