IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink Table与其他类型的转换 -> 正文阅读

[大数据]Flink Table与其他类型的转换

1. 将DataStream转换成Table

//DataStream可直接转换为Table,方便调用Table api做各种转换、输出操作
val dataStream: DataStream[YourModelObject] = ...
val table: Table = tableEnv.fromDataStream(dataStream)

//转换后的Table schema 与DataStream中定义的属性一一对应, 亦可显式指定出来,如:
val dataStream: DataStream[YourModelObject] = ...
val table: Table = tableEnv.fromDataStream(dataStream, 'id, 'timestamp, 'temperature)

2. 将DataStream中数据类型, 与Table的schema的对应关系

有两种类型:

    1. 基于字段名称
    1. 基于字段位置
//1.基于名称(name-based)
val table: Table = tableEnv.fromDataStream(dataStream,  'temperature, 'id as 'myId, 'timestamp as 'ts)

//2.基于位置(position-based)
val table: Table = tableEnv.fromDataStream(dataStream, 'ts, 'myId)

3.为DataStream或Table数据创建一个临时视图

//1.基于DataStream创建临时视图
tableEnv.createTemporaryView("sensorDataStream", dataStream)

//2.基于Table创建临时视图
tableEnv.createTemporaryView("sensorTable", sensorTable)

4. Table表输出

表的输出是通过把数据写入到具体的TableSink来实现.
TableSink是一个通用java接口, 可以支持不同的数据落地存储方式: 文件、RDBMS、KV-DB、MQ等.

在api的表现上, 为: 使用 Table.insertinto()方法, 将一个Table写入到已经注册过的TableSink里.

tableEnv.connect(...).createTemporaryTable("outputTable")
val resultTable: Table = ...
resultTable.insertInto("outputTable")

//例如:
    //3.1 使用table api
    //做简单转换
    val simpleTramsformTable = sensorTable
      //scala里的表达式写法(Expression)
      .select( 'id, 'temper )
      .filter( 'id === "sensor1" )
      
    // 输出到文件中
    val outputPath = "D:\\IdeaProject-ws-3\\FlinkTutorialScala\\src\\main\\resources\\input\\sensor_output.txt"

    tableEnv.connect( new FileSystem().path( outputPath ) )
      .withFormat( new Csv() )
      .withSchema(
        new Schema()
          .field( "id", DataTypes.STRING() )
          .field( "temper", DataTypes.DOUBLE() )
      )
      .createTemporaryTable( "outputTable" )

    simpleTramsformTable.insertInto( "outputTable" )
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-03-30 18:32:03  更:2022-03-30 18:35:02 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 15:50:16-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码