1、代码：
package flinkSql
import java.text.SimpleDateFormat
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.types.Row
//视频链接 https://www.bilibili.com/video/BV1Qp4y1Y7YN?p=88
/** One record parsed from a `name,price,timestamp` input line.
  *
  * @param name  first field of the line
  * @param price second field of the line, as a `Long`
  * @param ts    event time in epoch milliseconds; registered as the table's rowtime attribute
  */
case class FlinkSqlLession3EventTimeSum(
    name: String,
    price: Long,
    ts: Long
)
/** Flink SQL demo: reads `name,price,dd/MM/yyyy:HH:mm:ss` lines from a local socket, takes
  * event time from the third field, computes a running `sum(price)` per `name`, prints the
  * resulting retract stream and writes it to MySQL through the legacy JDBC connector.
  */
object FlinkSqlLession3_EventTime_sum {

  /** Format of the timestamp field, e.g. `17/05/2015:10:25:41`.
    *
    * `MM` is month-of-year and `yyyy` the four-digit year. Note that lowercase `mm` means
    * minute-of-hour; using it in the month position leaves the month unparsed (January).
    */
  private val EventTimePattern = "dd/MM/yyyy:HH:mm:ss"

  /** Parses one comma-separated input line into a record.
    *
    * @param line `name,price,timestamp`, with the timestamp formatted as [[EventTimePattern]]
    * @return the record with `ts` in epoch milliseconds (interpreted in the JVM default time zone)
    * @throws IllegalArgumentException if the line has fewer than three fields
    * @throws NumberFormatException if the price is not a valid `Long`
    * @throws java.text.ParseException if the timestamp does not match the pattern
    */
  def parseRecord(line: String): FlinkSqlLession3EventTimeSum = {
    val fields = line.split(",")
    require(fields.length >= 3, s"expected 'name,price,timestamp' but got: '$line'")
    // SimpleDateFormat is mutable and not thread-safe, so use a fresh instance per call.
    val format = new SimpleDateFormat(EventTimePattern)
    // Reject impossible dates (e.g. month 13) instead of silently rolling them over.
    format.setLenient(false)
    FlinkSqlLession3EventTimeSum(fields(0), fields(1).toLong, format.parse(fields(2)).getTime)
  }

  def main(args: Array[String]): Unit = {
    val executionEnvironment: StreamExecutionEnvironment =
      StreamExecutionEnvironment.getExecutionEnvironment
    // Use event time; watermarks are generated periodically (default interval 200 ms).
    executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    executionEnvironment.setParallelism(1)

    // DDL-defined tables require the Blink planner, here in streaming mode via settings.
    val blinkStreamSettings: EnvironmentSettings =
      EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnvironment: StreamTableEnvironment =
      StreamTableEnvironment.create(executionEnvironment, blinkStreamSettings)

    // Approach 1: define event time on the DataStream, then convert the stream to a table.
    val socketStream: DataStream[String] =
      executionEnvironment.socketTextStream("127.0.0.1", 1111)
    val recordStream: DataStream[FlinkSqlLession3EventTimeSum] = socketStream
      .map(line => parseRecord(line))
      // No out-of-orderness allowance (bound of 0 seconds).
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[FlinkSqlLession3EventTimeSum](Time.seconds(0)) {
          override def extractTimestamp(record: FlinkSqlLession3EventTimeSum): Long = record.ts
        })

    // Source table: register the stream as a view; `'ts.rowtime` marks ts as the event-time attribute.
    tableEnvironment.createTemporaryView(
      "FlinkSqlLession3EventTimeSumTable", recordStream, 'name, 'price, 'ts.rowtime)

    // Sink table backed by the legacy JDBC connector; it writes to the MySQL table
    // FlinkSqlLession3Sum_test5 and flushes after every row.
    // NOTE(review): credentials are hard-coded for this demo; load them from configuration
    // before running anywhere other than a local sandbox.
    val sinkDDL: String =
      """
        |create table FlinkSqlLession3Sum_test3 (
        | name string,
        | price bigint
        |) with (
        | 'connector.type' = 'jdbc',
        | 'connector.url' = 'jdbc:mysql://localhost:3306/mybatis?useSSL=false&allowPublicKeyRetrieval=true',
        | 'connector.table' = 'FlinkSqlLession3Sum_test5',
        | 'connector.driver' = 'com.mysql.jdbc.Driver',
        | 'connector.username' = 'root',
        | 'connector.password' = 'zha',
        | 'connector.write.flush.max-rows' = '1'
        |)
        |""".stripMargin
    tableEnvironment.sqlUpdate(sinkDDL)

    // Alternative (append-only) query, a plain filter:
    //   select name, price from FlinkSqlLession3EventTimeSumTable where name = 'Bush'

    // Non-windowed GROUP BY aggregate: every new row for a name retracts the previous sum and
    // emits the updated one, so the result must be converted with toRetractStream.
    val aggregateSql: String =
      """
        | select name,
        | sum(price) price_sum
        | from FlinkSqlLession3EventTimeSumTable
        | group by
        | name
        |""".stripMargin
    val sqlTable: Table = tableEnvironment.sqlQuery(aggregateSql)
    // Columns are matched to the sink by position: price_sum goes into the sink's price column.
    sqlTable.insertInto("FlinkSqlLession3Sum_test3")
    sqlTable.toRetractStream[Row].print("FlinkSqlLession3_EventTime_sum")
    executionEnvironment.execute("flink sql")
  }
}
2、数据
Bush,1000,17/05/2015:10:25:41
Carter,1600,17/05/2015:10:25:42
Bush,700,17/05/2015:10:25:43
Bush,300,17/05/2015:10:25:44
Adams,2000,17/05/2015:10:25:45
Carter,1600,17/05/2015:10:25:51
3、mysql建表
CREATE TABLE `FlinkSqlLession3Sum_test5` (
`name` varchar(10) NOT NULL,
`price` bigint DEFAULT NULL,
PRIMARY KEY (`name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci
4、过程记录
输入:Bush,1000,17/05/2015:10:25:41
输出:FlinkSqlLession3_EventTime_sum> (true,Bush,1000)
查询mysql:
+------+-------+
| name | price |
+------+-------+
| Bush |  1000 |
+------+-------+
输入:Bush,700,17/05/2015:10:25:43
输出:
FlinkSqlLession3_EventTime_sum> (false,Bush,1000)
FlinkSqlLession3_EventTime_sum> (true,Bush,1700)
查询mysql:
+------+-------+
| name | price |
+------+-------+
| Bush |  1700 |
+------+-------+
|