Contents
Preface
I. Kafka data structure
II. Steps
1. Build the execution environment
2. Map the Kafka data to a table
3. Map the TiDB table
4. Process the data
Summary
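Preface
Flink SQL can process Kafka data directly, so the business logic is written as SQL statements and far less application code has to be developed.
I. Kafka data structure
A sample Kafka message is shown below; the real data is considerably more complex.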
{
  "fileInfo": [
    {
      "fileId": 4169200573588,
      "fileSize": 21.0,
      "fileType": 5,
      "md5": "B54FA0BB16D9D1180619FC5D4D653494"
    }
  ],
  "subjectMap": {
    "1234": 0,
    "4455": 4373
  },
  "serverIp": "10.101.0.1"
}
II. Steps
1. Build the execution environment
The code is as follows:
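// Imports for the Flink Scala API (package names may differ between Flink versions)
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

// Build the execution environment in streaming mode
val env = StreamExecutionEnvironment.getExecutionEnvironment
val settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.build()
env.setParallelism(1)
// Restart strategy: each call to setRestartStrategy replaces the previous one, so configure only one
// Option 1: fixed-delay restart (at most 3 attempts, 10 s apart)
// env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L))
// Option 2: failure-rate restart (at most 3 failures within 10 minutes, 1 minute between restarts)
env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(10), Time.minutes(1)))
val tableEnv = StreamTableEnvironment.create(env, settings)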
2. Map the Kafka data to a table
The code is as follows:
// Read the Kafka data by mapping the topic to a table
val sqlInfo =
"""create table data_table (
|serverIp STRING,
|fileInfo ARRAY<ROW<fileId BIGINT,fileSize STRING,fileType INT,md5 STRING>>,
|subjectMap MAP<STRING, INT>
|)
|WITH
|(
|'connector' = 'kafka',
|'topic' = 'middle_vac_to_convert',
|'properties.bootstrap.servers' = '10.101.1.101:9092,10.101.1.102:9092,10.101.1.103:9092',
|'properties.group.id' = 'consumerGroup',
|'format' = 'json',
|'json.ignore-parse-errors' = 'true',
|'scan.startup.mode' = 'latest-offset',
|'json.fail-on-missing-field' = 'false')""".stripMargin
tableEnv.executeSql(sqlInfo)
This creates the table data_table, which later SQL statements can query.
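The article only goes on to query the fileInfo array, so here is a rough sketch of how the MAP column could be read as well. It assumes the Flink table planner is on the classpath. The object name SubjectMapQuerySketch and the inline VALUES row are hypothetical; the VALUES row stands in for data_table so that the snippet does not need a Kafka broker.

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

// Hypothetical, self-contained sketch (not part of the original job).
object SubjectMapQuerySketch {
  // The inline VALUES row is an assumption that mimics one row of data_table.
  // subjectMap['1234'] looks up one key; CARDINALITY counts the map entries.
  val query: String =
    """SELECT serverIp,
      |       subjectMap['1234'] AS subject1234,
      |       CARDINALITY(subjectMap) AS subjectCount
      |FROM (VALUES ('10.101.0.1', MAP['1234', 0, '4455', 4373])) AS t (serverIp, subjectMap)
      |""".stripMargin

  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tEnv = TableEnvironment.create(settings)
    // Runs the query locally and prints the result to stdout
    tEnv.executeSql(query).print()
  }
}
```

Against the real table, the same select list should work with FROM data_table in place of the VALUES clause.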
3. Map the TiDB table
// TiDB speaks the MySQL protocol, so the table is mapped through the JDBC connector with a MySQL URL
tableEnv.executeSql("CREATE TABLE FILEINFO_TEMP (" +
"ID BIGINT ," +
"FileType INT, " +
"Content STRING, " +
"HashCode STRING, " +
"FileSize STRING, " +
"InsertTime DATE, " +
"UpdateTime DATE" +
") " +
"WITH (" +
"'connector' = 'jdbc'," +
"'url' = 'jdbc:mysql://10.1.1.101:4000/MYDB?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&useSSL=false'," +
"'table-name' = 'FILE_INFO'," +
" 'username' = 'root'," +
" 'password' = '123456'" +
" )")
4. Process the data
// Build the query that flattens the fileInfo array
val querySql =
"""
|select fileId1,fileSize2,fileType3,md54 from data_table CROSS JOIN UNNEST(fileInfo) AS t (fileId1,fileSize2,fileType3,md54)
|""".stripMargin
val fileInfoSql = tableEnv.sqlQuery(querySql)
// Register the result as an intermediate view
tableEnv.createTemporaryView("fileTableView", fileInfoSql)
// Build the insert statement; the select list must match the columns of FILEINFO_TEMP
// in order: ID, FileType, Content, HashCode, FileSize, InsertTime, UpdateTime
val insertsql = "insert into FILEINFO_TEMP select fileId1, fileType3, " +
"CAST(NULL AS STRING), md54, fileSize2, CAST(NULL AS DATE), CAST(NULL AS DATE)" +
" from fileTableView"
// Submit the insert as a separate job
tableEnv.executeSql(insertsql)
CROSS JOIN UNNEST expands the array-typed column so that each array element becomes its own row.
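To make the row expansion concrete, here is a minimal plain-Scala sketch of what CROSS JOIN UNNEST does conceptually. It does not use the Flink API. The case classes FileInfo and Message, the object UnnestSketch, and the second sample file and second message are all hypothetical and exist only for illustration.

```scala
// Hypothetical in-memory model of a Kafka message (illustration only, not Flink API)
final case class FileInfo(fileId: Long, fileSize: String, fileType: Int, md5: String)
final case class Message(serverIp: String, fileInfo: Seq[FileInfo])

object UnnestSketch {
  // Roughly what the SQL below expresses:
  //   SELECT serverIp, fileId1, md54
  //   FROM data_table CROSS JOIN UNNEST(fileInfo) AS t (fileId1, fileSize2, fileType3, md54)
  // Every array element is paired with the columns of its parent row.
  def unnest(messages: Seq[Message]): Seq[(String, Long, String)] =
    messages.flatMap(m => m.fileInfo.map(f => (m.serverIp, f.fileId, f.md5)))

  val sample: Seq[Message] = Seq(
    Message("10.101.0.1", Seq(
      FileInfo(4169200573588L, "21.0", 5, "B54FA0BB16D9D1180619FC5D4D653494"),
      FileInfo(1L, "3.5", 2, "hypothetical-md5"))),
    // A row whose array is empty produces no output rows under a cross join
    Message("10.101.0.2", Seq.empty)
  )

  def main(args: Array[String]): Unit =
    unnest(sample).foreach(println)
}
```

The flatMap mirrors the cross join: a message with two files yields two rows, and a message with an empty fileInfo array disappears from the result.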
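Summary
This approach is simple and convenient for anyone comfortable with SQL. It does not, however, support transactions that span multiple tables: it can only write to a single table, or to tables whose data is unrelated.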