IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 27.flink table api 流表转换 -> 正文阅读

[大数据]27.flink table api 流表转换

摘要

核心的英文并未删除,方便读者甄别我的理解和翻译。另外因为涉及到Table中如何传播 水位线和event_time所以读者需要知道DataStream api中的水位线知识,此知识请参考:DataStream 中的水位线,如果DataStream 中的水位线没读懂,建议不要往下看了,以防走火入魔。

官网的说法

在阅读下面之前,读者需要知道:fromChangelogStream()fromDataStream()方法一样是将DataStream 转化为Table 的基础接口。只不过二者DataStream的流数据不同,fromChangelogStreamfrom中的DataStream数据携带操作符(增删改),也就是说数据流中连续的几条数据其实是对一条数据的操作记录,而不是真实的数据,你可以理解为这是用户对数据的操作日志,而fromDataStream中的DataStream中每条数据都是独立的。如果很难理解我在说什么请去查下什么是cdc,不理解cdc的概念请不要阅读此文章,以免走火入魔。

1.Insert-only 类型的流表转化

  1. fromDataStream(DataStream): Interprets a stream of insert-only changes and arbitrary type as a table. Event-time and watermarks are not propagated by default. 默认DataStream中的事件时间和水位线不会传播到Table中。

  2. fromDataStream(DataStream, Schema): Interprets a stream of insert-only changes and arbitrary type as a table. The optional schema allows to enrich column data types and add time attributes, watermarks strategies, other computed columns, or primary keys.
    Schema可以在DataStream->Table的过程中为Table添加新的字段,或者为Table添加时间属性,再或者为table提供水位线,以及为 table提供唯一主键。

  3. createTemporaryView(String, DataStream): Registers the stream under a name to access it in SQL. It is a shortcut for createTemporaryView(String, fromDataStream(DataStream)). 基于fromDataStream(DataStream)创建了视图。

  4. createTemporaryView(String, DataStream, Schema): Registers the stream under a name to access it in SQL. It is a shortcut for createTemporaryView(String, fromDataStream(DataStream, Schema)). 基于fromDataStream(DataStream, Schema)创建了视图

  5. toDataStream(DataStream): Converts a table into a stream of insert-only changes. The default stream record type is org.apache.flink.types.Row. A single rowtime attribute column is written back into the DataStream API’s record. Watermarks are propagated as well. 默认会传播时间和水位线:时间属性字段名用rowtime ,此时间属性是event_time还是processing_time和table定义有关。

  6. toDataStream(DataStream, AbstractDataType): Converts a table into a stream of insert-only changes. This method accepts a data type to express the desired stream record type. The planner might insert implicit casts and reorders columns to map columns to fields of the (possibly nested) data type. table到Stream的时候,table中的行数据用指定的AbstractDataType接收

  7. toDataStream(DataStream, Class): A shortcut for toDataStream(DataStream, DataTypes.of(Class)) to quickly create the desired data type reflectively. table到Stream的时候,table中的行数据用指定的Class接收

1.1 demo小例子

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import java.time.Instant;


public class Student {
    public int id;
    public String name;
    public String sex;

    public Student() {
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getSex() {
        return sex;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }

    @Override
    public String toString() {
        return "Student{" +
                "id=" + this.id +
                ", name='" + this.name + '\'' +
                ", sex='" + this.sex + '\'' +
                '}';
    }
}

// create a DataStream
DataStream<User> dataStream =
    env.fromElements(
        new User("Alice", 4, Instant.ofEpochMilli(1000)),
        new User("Bob", 6, Instant.ofEpochMilli(1001)),
        new User("Alice", 10, Instant.ofEpochMilli(1002)));


// === EXAMPLE 1 ===

 **默认flink会尽可能做动态转换**

Table table = tableEnv.fromDataStream(dataStream);
table.printSchema();
// prints:
// (
//  `name` STRING,
//  `score` INT,
//  `event_time` TIMESTAMP_LTZ(9)
// )


// === EXAMPLE 2 ===

// derive all physical columns automatically
// but add computed columns (in this case for creating a proctime attribute column)

Table table = tableEnv.fromDataStream(
    dataStream,
    Schema.newBuilder()
    	**为每一行数据添加一个处理时间,该字段名称proc_time,值是PROCTIME()计算出来的**
        .columnByExpression("proc_time", "PROCTIME()")
        .build());
table.printSchema();
// prints:
// (
//  `name` STRING,
//  `score` INT NOT NULL,
//  `event_time` TIMESTAMP_LTZ(9),
//  `proc_time` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME* AS PROCTIME()
//)


// === EXAMPLE 3 ===

// derive all physical columns automatically
// but add computed columns (in this case for creating a rowtime attribute column)
// and a custom watermark strategy

Table table =
    tableEnv.fromDataStream(
        dataStream,
        //对象中的event_time 是字符串格式,在这里调用cast方法将其转换为flink支持的时间格式,并用字段rowtime存储
        //此时每行数据都会多一个rowtime字段
        //watermark("rowtime", "rowtime - INTERVAL '10' SECOND")意思是用rowtime作为事件时间,
        //同时指定了十秒周期产生水位线
        Schema.newBuilder()
            .columnByExpression("rowtime", "CAST(event_time AS TIMESTAMP_LTZ(3))")
            .watermark("rowtime", "rowtime - INTERVAL '10' SECOND")
            .build());
table.printSchema();//打印出表结构
// prints:
// (
//  `name` STRING,
//  `score` INT,
//  `event_time` TIMESTAMP_LTZ(9),
//  `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* AS CAST(event_time AS TIMESTAMP_LTZ(3)),
//  WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS rowtime - INTERVAL '10' SECOND
// )


// === EXAMPLE 4   请多读取一下 ===

// derive all physical columns automatically->动态载入物理列
// but access the stream record's timestamp for creating a rowtime attribute column-》获取流数据中时间,创建rowtime列
// also rely on the watermarks generated in the DataStream API
// we assume that a watermark strategy has been defined for `dataStream` before


// columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")意思是获取DataStream中的事件时间
//前提是DataStream中已经提取了某个字段作为事件时间了
//watermark("rowtime", "SOURCE_WATERMARK()") 意思是水位线不依赖table api生成,而是用DataStream中生成的水位线
//前提是DataStream中已经设置了水位线策略。
Table table =
    tableEnv.fromDataStream(
        dataStream,
        Schema.newBuilder()
            .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
            .watermark("rowtime", "SOURCE_WATERMARK()")
            .build());
table.printSchema();
// prints:
// (
//  `name` STRING,
//  `score` INT,
//  `event_time` TIMESTAMP_LTZ(9),
//  `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* METADATA,
//  WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS SOURCE_WATERMARK()
// )


// === EXAMPLE 5 ===

// define physical columns manually
// in this example,
//   - we can reduce the default precision of timestamps from 9 to 3
//   - we also project the columns and put `event_time` to the beginning

Table table =
    tableEnv.fromDataStream(
        dataStream,
        Schema.newBuilder()
            .column("event_time", "TIMESTAMP_LTZ(3)")
            .column("name", "STRING")
            .column("score", "INT")
            .watermark("event_time", "SOURCE_WATERMARK()")
            .build());
table.printSchema();
// prints:
// (
//  `event_time` TIMESTAMP_LTZ(3) *ROWTIME*,
//  `name` VARCHAR(200),
//  `score` INT
// )
// note: the watermark strategy is not shown due to the inserted column reordering projection

2.change log 表流转换

Internally, Flink’s table runtime is a changelog processor. The concepts page describes how dynamic tables and streams relate to each other.
A StreamTableEnvironment offers the following methods to expose these change data capture (CDC) functionalities:
9. fromChangelogStream(DataStream):
Interprets a stream of changelog entries as a table. The stream record type must be org.apache.flink.types.Row since its RowKind flag is evaluated during runtime. Event-time and watermarks are not propagated by default. This method expects a changelog containing all kinds of changes (enumerated in org.apache.flink.types.RowKind) as the default ChangelogMode.
(默认不传播水位线,流数据中的每条数据都要用Row对象分装,且Row对象中有个属性RowKind ,表明的数据的操作类型)
10. fromChangelogStream(DataStream, Schema)
Schema中可以定义列的信息,可以定义水位线和event_time的信息
11. fromChangelogStream(DataStream, Schema, ChangelogMode): Gives full control about how to interpret a stream as a changelog. The passed ChangelogMode helps the planner to distinguish between **
insert-only, upsert, or retract behavior.**
4. toChangelogStream(Table): Reverse operation of fromChangelogStream(DataStream). It produces a stream with instances of org.apache.flink.types.Row and sets the RowKind flag for every record at runtime. All kinds of updating tables are supported by this method. If the input table contains a single rowtime column, it will be propagated into a stream record’s timestamp即事件的时间event_time). Watermarks will be propagated as well.(水位线也会被传播,rowtime列会自动转换为event_time)
5. toChangelogStream(Table, Schema): Reverse operation of fromChangelogStream(DataStream, Schema). The method can enrich the produced column data types. The planner might insert implicit(隐士的) casts if necessary. It is possible to write out the rowtime as a metadata column.
6. toChangelogStream(Table, Schema, ChangelogMode): Gives full control about how to convert a table to a changelog stream. The passed ChangelogMode helps the planner to distinguish(辨别) between insert-only, upsert, or retract behavior.

2.1 demo例子

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

// === EXAMPLE 1 ===

// interpret the stream as a retract stream

// create a changelog DataStream
DataStream<Row> dataStream =
    env.fromElements(
        Row.ofKind(RowKind.INSERT, "Alice", 12),
        Row.ofKind(RowKind.INSERT, "Bob", 5),
        Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12),
        Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));

// DataStream 转化为表
Table table = tableEnv.fromChangelogStream(dataStream);

// 注册表并执行聚合
tableEnv.createTemporaryView("InputTable", table);
tableEnv
    .executeSql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0")
    .print();

// prints:
// +----+--------------------------------+-------------+
// | op |                           name |       score |
// +----+--------------------------------+-------------+
// | +I |                            Bob |           5 |
// | +I |                          Alice |          12 |
// | -D |                          Alice |          12 |
// | +I |                          Alice |         100 |
// +----+--------------------------------+-------------+


// === EXAMPLE 2 ===

// interpret the stream as an upsert stream (without a need for UPDATE_BEFORE)

// create a changelog DataStream
DataStream<Row> dataStream =
    env.fromElements(
        Row.ofKind(RowKind.INSERT, "Alice", 12),
        Row.ofKind(RowKind.INSERT, "Bob", 5),
        Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));

// interpret the DataStream as a Table
Table table =
    tableEnv.fromChangelogStream(
        dataStream,
        Schema.newBuilder().primaryKey("f0").build(),
        ChangelogMode.upsert());

// register the table under a name and perform an aggregation
tableEnv.createTemporaryView("InputTable", table);
tableEnv
    .executeSql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0")
    .print();

// prints:
// +----+--------------------------------+-------------+
// | op |                           name |       score |
// +----+--------------------------------+-------------+
// | +I |                            Bob |           5 |
// | +I |                          Alice |          12 |
// | -U |                          Alice |          12 |
// | +U |                          Alice |         100 |
// +----+--------------------------------+-------------+
示例1中显示的默认ChangelogMode对于大多数用例应该足够了,因为它接受所有类型的更改。
示例2展示了如何使用upsert模式将更新消息的数量减少50%,从而限制传入更改的类型以提高效率。
可以通过为toChangelogStream定义一个主键和upsert更改日志管理模式来减少结果消息的数量。
下面是RowKind的源码:
public enum RowKind {

    // Note: Enums have no stable hash code across different JVMs, use toByteValue() for
    // this purpose.

    /** Insertion operation. */
    INSERT("+I", (byte) 0),

    /**
     * Update operation with the previous content of the updated row.
     *
     * <p>This kind SHOULD occur together with {@link #UPDATE_AFTER} for modelling an update that
     * needs to retract the previous row first. It is useful in cases of a non-idempotent update,
     * i.e., an update of a row that is not uniquely identifiable by a key.
     */
    UPDATE_BEFORE("-U", (byte) 1),

    /**
     * Update operation with new content of the updated row.
     *
     * <p>This kind CAN occur together with {@link #UPDATE_BEFORE} for modelling an update that
     * needs to retract the previous row first. OR it describes an idempotent update, i.e., an
     * update of a row that is uniquely identifiable by a key.
     */
    UPDATE_AFTER("+U", (byte) 2),

    /** Deletion operation. */
    DELETE("-D", (byte) 3);

    private final String shortString;

    private final byte value;

    /**
     * Creates a {@link RowKind} enum with the given short string and byte value representation of
     * the {@link RowKind}.
     */
    RowKind(String shortString, byte value) {
        this.shortString = shortString;
        this.value = value;
    }

    /**
     * Returns a short string representation of this {@link RowKind}.
     *
     * <p>
     *
     * <ul>
     *   <li>"+I" represents {@link #INSERT}.
     *   <li>"-U" represents {@link #UPDATE_BEFORE}.
     *   <li>"+U" represents {@link #UPDATE_AFTER}.
     *   <li>"-D" represents {@link #DELETE}.
     * </ul>
     */
    public String shortString() {
        return shortString;
    }

    /**
     * Returns the byte value representation of this {@link RowKind}. The byte value is used for
     * serialization and deserialization.
     *
     * <p>
     *
     * <ul>
     *   <li>"0" represents {@link #INSERT}.
     *   <li>"1" represents {@link #UPDATE_BEFORE}.
     *   <li>"2" represents {@link #UPDATE_AFTER}.
     *   <li>"3" represents {@link #DELETE}.
     * </ul>
     */
    public byte toByteValue() {
        return value;
    }

    /**
     * Creates a {@link RowKind} from the given byte value. Each {@link RowKind} has a byte value
     * representation.
     *
     * @see #toByteValue() for mapping of byte value and {@link RowKind}.
     */
    public static RowKind fromByteValue(byte value) {
        switch (value) {
            case 0:
                return INSERT;
            case 1:
                return UPDATE_BEFORE;
            case 2:
                return UPDATE_AFTER;
            case 3:
                return DELETE;
            default:
                throw new UnsupportedOperationException(
                        "Unsupported byte value '" + value + "' for row kind.");
        }
    }
}

3. flink内置的row对象

  1. toChangelogStream(Table): It produces a stream with instances of org.apache.flink.types.Row and sets the RowKind flag for every record at runtime. All kinds of updating tables are supported by this method. If the input table contains a single rowtime column, it will be propagated into a stream record’s timestamp. Watermarks will be propagated as well.
  2. toDatalogStream(Table):Converts a table into a stream of insert-only changes. The default stream record type is org.apache.flink.types.Row. A single rowtime attribute column is written back into the DataStream API’s record. Watermarks are propagated as well.
    英文表达的很好,而且上面两小节也有介绍,这里就不翻译了,这两个方法都有提到:org.apache.flink.types.Row,默认会转化为这个对象,对toChangelogStream 来说,Row中还包含了一个RowKind对象。
@PublicEvolving
public final class Row implements Serializable {

    private static final long serialVersionUID = 3L;

    /** The kind of change a row describes in a changelog. */
    private RowKind kind;

    /** Fields organized by position. Either this or {@link #fieldByName} is set. */
    private final @Nullable Object[] fieldByPosition;

    /** Fields organized by name. Either this or {@link #fieldByPosition} is set. */
    private final @Nullable Map<String, Object> fieldByName;

    /** Mapping from field names to positions. Requires {@link #fieldByPosition} semantics. */
    private final @Nullable LinkedHashMap<String, Integer> positionByName;
   ...........
   ...........
   ...........
   ...........
   ...........
}

4. Schema

直接来看源码,源码中的注释说明了Schema是用来干嘛的:
This method allows to declare a Schema for the resulting table. The declaration is similar to a CREATE TABLE DDL in SQL and allows to:
1. enrich or overwrite automatically derived columns with a custom DataType reorder columns
使用自定义数据类型重新排序列来丰富或覆盖自动派生的列
3. add computed or metadata columns next to the physical columns
在物理列旁边添加计算列或元数据列
4. access a stream record’s timestamp
声明流数据的时间戳,事件时间表示event_time,处理时间就表示processing_time.
5. declare a watermark strategy or propagate the DataStream watermarks
声明水位线
6. declare a primary key
声明主键

感言:越讲到后面越难,已经不是上来就能读懂的文章了,因为涉及到flink前面很多的知识,没办法,尽力了。

5. 流到表转换的列类型推断规则如下

flink所有的类型的顶级接口为TypeInformation

  1. All subclasses of TypeInformation are mapped to logical types, including nullability that is aligned with Flink’s built-in serializers.

  2. Subclasses of TupleTypeInfoBase are translated into a row (for Row) or structured type (for tuples, POJOs, and case classes).
    TupleTypeInfoBase的子类被转化为row,或者是结构化为tuples,pojo对象,和scala中的case class
    子类

  3. The order of PojoTypeInfo fields is determined by a constructor with all fields as its parameters. If that is not found during the conversion, the field order will be alphabetical.

  4. GenericTypeInfo and other TypeInformation that cannot be represented as one of the listed org.apache.flink.table.api.DataTypes will be treated as a black-box RAW type. The current session configuration is used to materialize the serializer of the raw type. Composite nested fields will not be accessible then.
    不能用DataTypes 表示的类型,则无法转化,该数据会被封装为黑盒类型RAW, RAW中的字段什么的是无法用sql查询的。

5.1 流到表 字段名称映射规则

5.11.通过位置映射

以下demo适用于:tuples, rows, and case classes

DataStream<Tuple2<Long, Integer>> stream = ...


// 从左到右根据位置顺序依次给Tuple中的数据构造列名
Table table = tableEnv.fromDataStream(stream, $("myLong"), $("myInt"));
//其实就算我们不构造列名,flink内部也帮我们构造了列明,默认:f0,f1  .....fn
//因此可以基于以下方式重命名列名
Table table = tableEnv.fromDataStream(stream, $("f1").as("myInt"), $("f0").as("myLong"));

5.12 原子类型

Flink将基本类型(Integer、Double、String)或泛型类型(不能分析和分解的类型)视为原子类型。原子类型的DataStream被转换为具有单列的Table。列的类型是从原子类型推断出来的。可以指定列的名称。


DataStream<Long> stream = ...

// Convert DataStream into Table with field name "myLong"
Table table = tableEnv.fromDataStream(stream, $("myLong"));

5.13 java 对象(POJO)

在不指定字段名的情况下将POJO DataStream转换为表时,将使用原始POJO字段的名称。名称映射需要原始名称,不能通过位置来完成。字段可以使用别名(带有as关键字)重命名、重新排序和投影。

// Person is a POJO with fields "name" and "age"
DataStream<Person> stream = ...

// convert DataStream into Table with renamed fields "myAge", "myName" (name-based)
Table table = tableEnv.fromDataStream(stream, $("age").as("myAge"), $("name").as("myName"));

// convert DataStream into Table with projected field "name" (name-based)
Table table = tableEnv.fromDataStream(stream, $("name"));

// convert DataStream into Table with projected and renamed field "myName" (name-based)
Table table = tableEnv.fromDataStream(stream, $("name").as("myName"));
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-09-24 21:03:44  更:2022-09-24 21:06:22 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/15 20:32:31-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码