IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> flinkSql的group by写入mysql -> 正文阅读

[大数据]flinkSql的group by写入mysql

1、代码?

package flinkSql

import java.text.SimpleDateFormat

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.types.Row

//视频链接 https://www.bilibili.com/video/BV1Qp4y1Y7YN?p=88
case class FlinkSqlLession3EventTimeSum(name: String, price: Long, ts: Long)

object FlinkSqlLession3_EventTime_sum {
  def main(args: Array[String]): Unit = {
    val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //watermark周期性生成,默认是200ms
    executionEnvironment.setParallelism(1)
    //ddl形式必须使用blink planer ,2.1 blink版本planer的流处理,有setting的情况
    val blinkStreamSettings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnvironment: StreamTableEnvironment = StreamTableEnvironment.create(executionEnvironment, blinkStreamSettings)

    //    第一种从流数据中定义和获取event_time
    val stream2: DataStream[String] = executionEnvironment.socketTextStream("127.0.0.1", 1111)
    val transforStream: DataStream[FlinkSqlLession3EventTimeSum] = stream2.map(data => {
      val simpleDateFormat = new SimpleDateFormat("dd/mm/yy:HH:mm:ss")
      val tmpList: Array[String] = data.split(",")
      val ts = simpleDateFormat.parse(tmpList(2)).getTime
      FlinkSqlLession3EventTimeSum(tmpList(0), tmpList(1).toLong, ts)
    }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[FlinkSqlLession3EventTimeSum](Time.seconds(0)) {
      override def extractTimestamp(t: FlinkSqlLession3EventTimeSum) = t.ts
    })
    //sourceTable  FlinkSql,从流里面定义eventTime,转为table,执行sql
    tableEnvironment.createTemporaryView("FlinkSqlLession3EventTimeSumTable", transforStream, 'name, 'price, 'ts.rowtime)

    //sinkTable
    val sinkDDL: String =
      """
        |create table FlinkSqlLession3Sum_test3 (
        | name string,
        | price bigint
        |) with (
        | 'connector.type' = 'jdbc',
        | 'connector.url' = 'jdbc:mysql://localhost:3306/mybatis?useSSL=false&allowPublicKeyRetrieval=true',
        | 'connector.table' = 'FlinkSqlLession3Sum_test5',
        | 'connector.driver' = 'com.mysql.jdbc.Driver',
        | 'connector.username' = 'root',
        | 'connector.password' = 'zha',
        | 'connector.write.flush.max-rows' = '1'
        |)
          """.stripMargin
    tableEnvironment.sqlUpdate(sinkDDL)


    //    // where 语句
    //    val tumbleSql: String =
    //      """
    //        | select name,
    //        |   price
    //        | from FlinkSqlLession3EventTimeSumTable where name='Bush'
    //        |""".stripMargin

//    group aggregate,会有回撤策略,只能用toRetractStream进行转换,
        val tumbleSql: String =
          """
            | select name,
            |   sum(price) price_sum
            | from FlinkSqlLession3EventTimeSumTable
            | group by
            |   name
            |""".stripMargin




    val sqlTable: Table = tableEnvironment.sqlQuery(tumbleSql)
    sqlTable.insertInto("FlinkSqlLession3Sum_test3")

    sqlTable.toRetractStream[Row].print("FlinkSqlLession3_EventTime_sum")

    executionEnvironment.execute("flink sql")
  }
}

2、数据

Bush,1000,17/05/2015:10:25:41
Carter,1600,17/05/2015:10:25:42
Bush,700,17/05/2015:10:25:43
Bush,300,17/05/2015:10:25:44
Adams,2000,17/05/2015:10:25:45
Carter,1600,17/05/2015:10:25:51

3、mysql建表

CREATE TABLE `FlinkSqlLession3Sum_test5` (
  `name` varchar(10) NOT NULL,
  `price` bigint DEFAULT NULL,
  PRIMARY KEY (`name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci

4、过程记录

输入:Bush,1000,17/05/2015:10:25:41

输出:FlinkSqlLession3_EventTime_sum> (true,Bush,1000)

查询mysql:

+------+-------+

| name | price |

+------+-------+

| Bush |? 1000 |

+------+-------+

输入:Bush,700,17/05/2015:10:25:43

输出:

FlinkSqlLession3_EventTime_sum> (false,Bush,1000)
FlinkSqlLession3_EventTime_sum> (true,Bush,1700)

查询mysql:

+------+-------+

| name | price |

+------+-------+

| Bush |? 1700 |

+------+-------+

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-11-11 12:46:40  更:2021-11-11 12:48:20 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 0:37:35-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码