IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink SQL实现对Kafka的数据处理,写入TiDB -> 正文阅读

[大数据]Flink SQL实现对Kafka的数据处理,写入TiDB

目录

前言

一、Kafka的数据 结构

二、步骤

1.构建系统环境

2.将Kafka数据映射成表

3、映射TiDB表数据

4、数据处理

总结



前言

使用Flink SQL的功能实现对Kafka的数据进行处理,减少业务代码的开发工作量,业务处理逻辑使用SQL语句实现

一、Kafka的数据 结构

kafka数据示例,实际比这个要复杂很多

{
  "fileInfo": [
    {
      "fileId": 4169200573588,
      "fileSize": 21.0,
      "fileType": 5,
      "md5": "B54FA0BB16D9D1180619FC5D4D653494"
    },
    "subjectMap": {
        "1234": 0,
        "4455": 4373
      }
  ],
  "serverIp": "10.101.0.1"
}

二、步骤

1.构建系统环境

代码如下:

//构建系统环境,使用流处理模式
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .build()

    env.setParallelism(1);
    // 重启策略配置
    // 固定延迟重启(最多尝试3次,每次间隔10s)
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L));
    // 失败率重启(在10分钟内最多尝试3次,每次至少间隔1分钟)
    env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(10), Time.minutes(1)));
    val tableEnv = StreamTableEnvironment.create(env,settings)

2.将Kafka数据映射成表

代码如下:

//读取kafka数据,将topic 映射成表
    var sqlInfo =
      """create table data_table (
        |serverIp STRING,
        |fileInfo ARRAY<ROW<fileId BIGINT,fileSize STRING,fileType INT,md5 STRING>,
        |subjectMap MAP<STRING, INT>>
        |)
        |WITH
        |(
        |'connector' = 'kafka',
        |'topic' = 'middle_vac_to_convert',
        |'properties.bootstrap.servers' = '10.101.1.101:9092,10.101.1.102:9092,10.101.1.103:9092',
        |'properties.group.id' = 'consumerGroup',
        |'format' = 'json',
        |'json.ignore-parse-errors' = 'true',
        |'scan.startup.mode' = 'latest-offset',
        |'json.fail-on-missing-field' = 'false')""".stripMargin
    tableEnv.executeSql(sqlInfo)

此处将创建Table? (data_table),后续可以对data_table进行SQL操作

3、映射TiDB表数据

tableEnv.executeSql("CREATE TABLE  FILEINFO_TEMP (" +
      "ID BIGINT ," +
      "FileType INT, " +
      "Content STRING, " +
      "HashCode STRING, " +
      "FileSize STRING, " +
      "InsertTime DATE, " +
      "UpdateTime DATE" +
            ") " +
      "WITH (" +
      "'connector' = 'jdbc'," +
      "'url' = 'jdbc:mysql://10.1.1.101:4000/MYDB?useUnicode=true&characterEncoding=utf-8&&useOldAliasMetadataBehavior=true&useSSL=false'," +
      "'table-name' = 'FILE_INFO'," +
      " 'username' = 'root'," +
      " 'password' = '123456'" +
      " )");

4、数据处理

//创建对应数据的查询语句
    var querySql =
      """
        |select  fileId1,fileSize2,fileType3,md54 from data_table CROSS JOIN UNNEST(fileInfo) AS t (fileId1,fileSize2,fileType3,md54)
        |""".stripMargin
    var fileInfoSql = tableEnv.sqlQuery(querySql)
//注册成中间表
    tableEnv.createTemporaryView("fileTableView", fileInfoSql)
    //构建新增sql语句
    var insertsql = "insert into FILEINFO_TEMP select fileId1, CAST(releaseTime AS DATE), fileType3, " +
      "CAST(NULL AS STRING), md54, fileSize2, CAST(NULL AS DATE),CAST(NULL AS DATE)," + System.nanoTime() +
      " from fileTableView"
    //单独提交任务
//    tableEnv.executeSql(insertsql .stripMargin)

使用?CROSS JOIN UNNEST 解析数组类型的数据


总结

该处理方式方便熟悉SQL语言的进行操作,方便简单,但是如果涉及多表事务处理,此方法就不支持了,此方法只能写入单表或者无数据关联的表

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-07-20 18:56:33  更:2022-07-20 18:57:01 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 2:00:51-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码