Summary
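The key English passages from the official documentation are kept as-is, so readers can compare them with my interpretation and translation. This article also covers how watermarks and event_time are propagated between a Table and a DataStream, so you need to understand watermarks in the DataStream API first. For that, see my article "Watermarks in DataStream". If watermarks in DataStream are still unclear to you, I suggest stopping here rather than getting hopelessly lost.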
What the official documentation says
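Before reading on, note that fromChangelogStream(), like fromDataStream(), is a basic method for converting a DataStream into a Table. The two differ in the kind of stream data they accept. In fromChangelogStream(), every record carries an operation flag (insert, update, or delete). Several consecutive records may therefore be the change history of a single row rather than independent rows, so you can think of the stream as a log of the operations applied to the data. In fromDataStream(), every record in the DataStream is independent. If this is hard to follow, look up CDC (change data capture) first. Without the CDC concept, this article will confuse more than it helps.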
1. Insert-only stream and table conversion
- fromDataStream(DataStream): Interprets a stream of insert-only changes and arbitrary type as a table. Event-time and watermarks are not propagated by default.
- fromDataStream(DataStream, Schema): Interprets a stream of insert-only changes and arbitrary type as a table. The optional schema allows to enrich column data types and add time attributes, watermarks strategies, other computed columns, or primary keys. (During the DataStream-to-Table conversion, the Schema can therefore add columns, declare time attributes, define a watermark strategy, and declare a primary key.)
- createTemporaryView(String, DataStream): Registers the stream under a name to access it in SQL. It is a shortcut for createTemporaryView(String, fromDataStream(DataStream)).
- createTemporaryView(String, DataStream, Schema): Registers the stream under a name to access it in SQL. It is a shortcut for createTemporaryView(String, fromDataStream(DataStream, Schema)).
- toDataStream(Table): Converts a table into a stream of insert-only changes. The default stream record type is org.apache.flink.types.Row. A single rowtime attribute column is written back into the DataStream API’s record. Watermarks are propagated as well. (Time and watermarks are therefore propagated by default. "Rowtime attribute" means the table's event-time attribute column, whatever it is named, and its value becomes the timestamp of each DataStream record.)
- toDataStream(Table, AbstractDataType): Converts a table into a stream of insert-only changes. This method accepts a data type to express the desired stream record type. The planner might insert implicit casts and reorders columns to map columns to fields of the (possibly nested) data type. (Each row of the table is received as the given AbstractDataType.)
- toDataStream(Table, Class): A shortcut for toDataStream(Table, DataTypes.of(Class)) to quickly create the desired data type reflectively. (Each row of the table is received as an instance of the given Class.)
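The Class variant fills each object's fields from the table's columns by name, so column order does not matter. The plain-Java sketch below illustrates that name-based matching and is not Flink code. It models a row as a Map from column name to value and copies each column into the public field with the same name. UserView and RowToPojo are hypothetical names. Flink's planner does this through its type system and may also insert casts, which the sketch ignores.

```java
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical target class, shaped like the User objects in the demo below.
final class UserView {
    public String name;
    public Integer score;

    public UserView() {}
}

// Sketch only: fills an object from a row by column name, not by position.
final class RowToPojo {
    static <T> T map(Map<String, Object> row, Class<T> type) {
        try {
            T target = type.getDeclaredConstructor().newInstance();
            for (Map.Entry<String, Object> column : row.entrySet()) {
                // look up the public field whose name equals the column name
                Field field = type.getField(column.getKey());
                field.set(target, column.getValue());
            }
            return target;
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot map row to " + type.getSimpleName(), e);
        }
    }

    // Example row whose columns arrive in a different order than the fields are declared.
    static Map<String, Object> sampleRow() {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("score", 12);
        row.put("name", "Alice");
        return row;
    }
}
```

Calling `RowToPojo.map(RowToPojo.sampleRow(), UserView.class)` yields an object with name "Alice" and score 12, even though score comes first in the row.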
1.1 Demo
import java.time.Instant;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Example POJO: the stream elements below are User objects
public class User {
    public String name;
    public Integer score;
    public Instant event_time;

    // default constructor, required by the DataStream API for POJOs
    public User() {}

    // constructor assigning all fields, used to create the sample elements
    public User(String name, Integer score, Instant event_time) {
        this.name = name;
        this.score = score;
        this.event_time = event_time;
    }

    @Override
    public String toString() {
        return "User{name='" + name + "', score=" + score + ", event_time=" + event_time + "}";
    }
}
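// Assumed setup: a streaming environment and a table environment on top of it
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

DataStream<User> dataStream =
    env.fromElements(
        new User("Alice", 4, Instant.ofEpochMilli(1000)),
        new User("Bob", 6, Instant.ofEpochMilli(1001)),
        new User("Alice", 10, Instant.ofEpochMilli(1002)));

// Example 1: by default Flink derives all physical columns automatically, as far as it can
// (here name, score and event_time from the User fields)
Table table1 = tableEnv.fromDataStream(dataStream);
table1.printSchema();

// Example 2: keep the derived columns and add a processing-time column named proc_time,
// whose value for every row is computed by PROCTIME()
Table table2 = tableEnv.fromDataStream(
    dataStream,
    Schema.newBuilder()
        .columnByExpression("proc_time", "PROCTIME()")
        .build());
table2.printSchema();

// Example 3: add a computed event-time column rowtime derived from event_time,
// with a watermark that lags 10 seconds behind it
Table table3 =
    tableEnv.fromDataStream(
        dataStream,
        Schema.newBuilder()
            .columnByExpression("rowtime", "CAST(event_time AS TIMESTAMP_LTZ(3))")
            .watermark("rowtime", "rowtime - INTERVAL '10' SECOND")
            .build());
table3.printSchema();

// Example 4: expose the DataStream record's own timestamp as metadata column rowtime
// and reuse the watermarks generated in the DataStream API
// (assumes a watermark strategy was assigned to dataStream beforehand, not shown here)
Table table4 =
    tableEnv.fromDataStream(
        dataStream,
        Schema.newBuilder()
            .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
            .watermark("rowtime", "SOURCE_WATERMARK()")
            .build());
table4.printSchema();

// Example 5: declare the physical columns manually, which puts event_time first and
// lowers its precision from the default 9 to 3; watermarks again come from the DataStream
Table table5 =
    tableEnv.fromDataStream(
        dataStream,
        Schema.newBuilder()
            .column("event_time", "TIMESTAMP_LTZ(3)")
            .column("name", "STRING")
            .column("score", "INT")
            .watermark("event_time", "SOURCE_WATERMARK()")
            .build());
table5.printSchema();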
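The third fromDataStream call above declares its watermark as rowtime - INTERVAL '10' SECOND. A rough mental model follows, under one simplifying assumption: a watermark is updated after every row, whereas Flink actually emits watermarks periodically. Each row proposes its rowtime minus 10 seconds as a candidate, and the watermark only ever moves forward. The class below is a hypothetical plain-Java sketch of that rule, not Flink's implementation.

```java
// Hypothetical sketch of a bounded-delay watermark such as rowtime - INTERVAL '10' SECOND.
// Simplification: the watermark is updated on every row instead of periodically.
final class BoundedDelayWatermark {
    private final long delayMillis;
    private long currentWatermark = Long.MIN_VALUE; // nothing seen yet

    BoundedDelayWatermark(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    // Feeds one row's rowtime (epoch millis) and returns the watermark afterwards.
    long onRow(long rowtimeMillis) {
        long candidate = rowtimeMillis - delayMillis; // the watermark expression for this row
        if (candidate > currentWatermark) {
            currentWatermark = candidate; // watermarks never move backwards
        }
        return currentWatermark;
    }

    // In this sketch a row is late if event time has already reached its rowtime.
    boolean isLate(long rowtimeMillis) {
        return rowtimeMillis <= currentWatermark;
    }
}
```

An out-of-order row (for example, rowtime 15 s arriving after 20 s) leaves the watermark unchanged. That 10-second allowance is the reason for subtracting an interval at all.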
2. Changelog stream and table conversion
Internally, Flink’s table runtime is a changelog processor. The concepts page describes how dynamic tables and streams relate to each other. A StreamTableEnvironment offers the following methods to expose these change data capture (CDC) functionalities:
- fromChangelogStream(DataStream): Interprets a stream of changelog entries as a table. The stream record type must be org.apache.flink.types.Row since its RowKind flag is evaluated during runtime. Event-time and watermarks are not propagated by default. This method expects a changelog containing all kinds of changes (enumerated in org.apache.flink.types.RowKind) as the default ChangelogMode. (Watermarks are not propagated by default. Every record must be wrapped in a Row, whose RowKind field states the operation type of that record.)
- fromChangelogStream(DataStream, Schema): Like the above, but the Schema can declare column information as well as event time and watermarks.
- fromChangelogStream(DataStream, Schema, ChangelogMode): Gives full control about how to interpret a stream as a changelog. The passed ChangelogMode helps the planner to distinguish between insert-only, upsert, or retract behavior.
- toChangelogStream(Table): Reverse operation of fromChangelogStream(DataStream). It produces a stream with instances of org.apache.flink.types.Row and sets the RowKind flag for every record at runtime. All kinds of updating tables are supported by this method. If the input table contains a single rowtime column, it will be propagated into a stream record’s timestamp (that is, the record's event time). Watermarks will be propagated as well.
- toChangelogStream(Table, Schema): Reverse operation of fromChangelogStream(DataStream, Schema). The method can enrich the produced column data types. The planner might insert implicit casts if necessary. It is possible to write out the rowtime as a metadata column.
- toChangelogStream(Table, Schema, ChangelogMode): Gives full control about how to convert a table to a changelog stream. The passed ChangelogMode helps the planner to distinguish between insert-only, upsert, or retract behavior.
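The three behaviors named above can be read as the set of row kinds a changelog is allowed to contain. The plain-Java sketch below models that idea with a local Kind enum that mirrors RowKind. The sets are meant to correspond to Flink's ChangelogMode.insertOnly(), ChangelogMode.upsert() and ChangelogMode.all(), where upsert omits UPDATE_BEFORE. The ModeSketch class itself is only an illustration and not the Flink API.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Local mirror of the four org.apache.flink.types.RowKind values (not the Flink enum).
enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

// Sketch: a changelog mode viewed as the set of row kinds a stream may contain.
final class ModeSketch {
    static final Set<Kind> INSERT_ONLY = EnumSet.of(Kind.INSERT);
    // upsert: updates arrive as UPDATE_AFTER only, so a primary key is needed to find the old row
    static final Set<Kind> UPSERT = EnumSet.of(Kind.INSERT, Kind.UPDATE_AFTER, Kind.DELETE);
    // all kinds: what retract behavior (and the default mode) needs
    static final Set<Kind> ALL = EnumSet.allOf(Kind.class);

    // True if every record in the stream uses a kind the mode allows.
    static boolean accepts(Set<Kind> mode, List<Kind> stream) {
        return mode.containsAll(stream);
    }
}
```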
2.1 Demo
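import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

// Assumed setup: a streaming environment and a table environment on top of it
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Example 1: a full (retract) changelog read with the default ChangelogMode, which accepts
// all RowKinds. Updating Alice from 12 to 100 takes two messages: UPDATE_BEFORE, then UPDATE_AFTER.
DataStream<Row> dataStream =
    env.fromElements(
        Row.ofKind(RowKind.INSERT, "Alice", 12),
        Row.ofKind(RowKind.INSERT, "Bob", 5),
        Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12),
        Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));

// the Row fields have no names, so the columns are called f0 and f1
Table table = tableEnv.fromChangelogStream(dataStream);
tableEnv.createTemporaryView("InputTable", table);
tableEnv
    .executeSql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0")
    .print();

// Example 2: an upsert changelog. With f0 declared as the primary key,
// the same update needs only the UPDATE_AFTER message.
DataStream<Row> upsertStream =
    env.fromElements(
        Row.ofKind(RowKind.INSERT, "Alice", 12),
        Row.ofKind(RowKind.INSERT, "Bob", 5),
        Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));

Table upsertTable =
    tableEnv.fromChangelogStream(
        upsertStream,
        Schema.newBuilder().primaryKey("f0").build(),
        ChangelogMode.upsert());
tableEnv.createTemporaryView("UpsertInputTable", upsertTable);
tableEnv
    .executeSql("SELECT f0 AS name, SUM(f1) AS score FROM UpsertInputTable GROUP BY f0")
    .print();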
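The default ChangelogMode shown in Example 1 should be sufficient for most use cases, because it accepts all kinds of changes.
Example 2 shows how to restrict the kinds of incoming changes for efficiency. Upsert mode cuts the number of update messages by 50%, because each update is sent as a single UPDATE_AFTER instead of an UPDATE_BEFORE/UPDATE_AFTER pair.
Likewise, the number of result messages can be reduced by defining a primary key and upsert changelog mode for toChangelogStream.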
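Upsert mode can drop UPDATE_BEFORE because, once a primary key identifies each row, the UPDATE_AFTER for a key already replaces the old row. The hypothetical sketch below, in plain Java rather than Flink's runtime, materializes both demo changelogs into a map keyed by name. Both reach the same final table, but the upsert version uses 3 messages instead of 4.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Local mirror of RowKind, for this sketch only.
enum Op { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

// One changelog message: an operation plus a (name, score) row; name acts as the primary key.
final class Change {
    final Op op;
    final String name;
    final int score;

    Change(Op op, String name, int score) {
        this.op = op;
        this.name = name;
        this.score = score;
    }
}

final class ChangelogReplay {
    // Materializes a changelog into a table keyed by name.
    static Map<String, Integer> apply(List<Change> log) {
        Map<String, Integer> table = new HashMap<>();
        for (Change c : log) {
            switch (c.op) {
                case INSERT:
                case UPDATE_AFTER:
                    table.put(c.name, c.score); // new or replacing row for this key
                    break;
                case UPDATE_BEFORE:
                case DELETE:
                    table.remove(c.name); // retract the current row for this key
                    break;
            }
        }
        return table;
    }
}
```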
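Below is the source code of RowKind. It defines the four kinds together with their short strings (+I, -U, +U, -D) and byte values: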
public enum RowKind {
INSERT("+I", (byte) 0),
UPDATE_BEFORE("-U", (byte) 1),
UPDATE_AFTER("+U", (byte) 2),
DELETE("-D", (byte) 3);
private final String shortString;
private final byte value;
RowKind(String shortString, byte value) {
this.shortString = shortString;
this.value = value;
}
public String shortString() {
return shortString;
}
public byte toByteValue() {
return value;
}
public static RowKind fromByteValue(byte value) {
switch (value) {
case 0:
return INSERT;
case 1:
return UPDATE_BEFORE;
case 2:
return UPDATE_AFTER;
case 3:
return DELETE;
default:
throw new UnsupportedOperationException(
"Unsupported byte value '" + value + "' for row kind.");
}
}
}
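The short strings carry a sign: +I and +U add a row, while -U and -D take one away. The sketch below shows, in plain Java, how a retracting aggregate such as the demo's SUM(f1) ... GROUP BY f0 could apply these signs. ChangeKind mirrors the enum above, and RetractingSum is a hypothetical helper, not Flink's aggregation code. It also does not drop a group once its last row is retracted.

```java
import java.util.HashMap;
import java.util.Map;

// Local mirror of the RowKind enum above (same short strings and byte values).
enum ChangeKind {
    INSERT("+I", (byte) 0),
    UPDATE_BEFORE("-U", (byte) 1),
    UPDATE_AFTER("+U", (byte) 2),
    DELETE("-D", (byte) 3);

    final String shortString;
    final byte value;

    ChangeKind(String shortString, byte value) {
        this.shortString = shortString;
        this.value = value;
    }

    // +1 for kinds that add a row ("+I", "+U"), -1 for kinds that retract one ("-U", "-D").
    int sign() {
        return shortString.charAt(0) == '+' ? 1 : -1;
    }
}

// Hypothetical helper: maintains SUM(value) GROUP BY key from a changelog.
final class RetractingSum {
    private final Map<String, Long> sums = new HashMap<>();

    void apply(ChangeKind kind, String key, long value) {
        sums.merge(key, kind.sign() * value, Long::sum);
    }

    long get(String key) {
        return sums.getOrDefault(key, 0L);
    }
}
```

Feeding it the four messages of Example 1 in section 2.1 leaves Alice at 12 - 12 + 100 = 100 and Bob at 5.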
3. Flink's built-in Row object
- toChangelogStream(Table): It produces a stream with instances of org.apache.flink.types.Row and sets the RowKind flag for every record at runtime. All kinds of updating tables are supported by this method. If the input table contains a single rowtime column, it will be propagated into a stream record’s timestamp. Watermarks will be propagated as well.
- toDataStream(Table): Converts a table into a stream of insert-only changes. The default stream record type is org.apache.flink.types.Row. A single rowtime attribute column is written back into the DataStream API’s record. Watermarks are propagated as well.
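The English is clear, and both methods were covered in the two sections above, so I won't translate it again. Both methods mention org.apache.flink.types.Row: by default, rows are converted into this type. For toChangelogStream, each Row also carries a RowKind.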
@PublicEvolving
public final class Row implements Serializable {
private static final long serialVersionUID = 3L;
private RowKind kind;
private final @Nullable Object[] fieldByPosition;
private final @Nullable Map<String, Object> fieldByName;
private final @Nullable LinkedHashMap<String, Integer> positionByName;
// ... (remaining fields and methods omitted)
}
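To show how a kind flag and positional fields fit together, here is a deliberately tiny stand-in for Row. MiniRow is hypothetical. Its method names (ofKind, getKind, setKind, getField, getArity) mirror methods of org.apache.flink.types.Row. Unlike the real class, it stores the kind as a plain short string and leaves out the named-field mode that the fieldByName and positionByName members above support.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

// Hypothetical, heavily simplified stand-in for org.apache.flink.types.Row:
// one kind flag plus fields addressed by position only.
final class MiniRow {
    private String kind; // a RowKind short string such as "+I" or "-U"
    private final Object[] fields;

    private MiniRow(String kind, Object[] fields) {
        this.kind = kind;
        this.fields = fields;
    }

    static MiniRow ofKind(String kind, Object... fields) {
        return new MiniRow(kind, fields.clone());
    }

    String getKind() { return kind; }

    void setKind(String kind) { this.kind = kind; } // the kind is mutable, as the non-final field above suggests

    Object getField(int pos) { return fields[pos]; }

    int getArity() { return fields.length; }

    // Prints the kind followed by the field values in brackets.
    @Override
    public String toString() {
        return kind + Arrays.stream(fields)
                .map(String::valueOf)
                .collect(Collectors.joining(", ", "[", "]"));
    }
}
```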
4. Schema
Let's go straight to the source. Its Javadoc explains what the Schema is for: This method allows to declare a Schema for the resulting table. The declaration is similar to a CREATE TABLE DDL in SQL and allows to:
1. enrich or overwrite automatically derived columns with a custom DataType
2. reorder columns
3. add computed or metadata columns next to the physical columns
4. access a stream record’s timestamp (the timestamp attached to each DataStream record, i.e. its event time)
5. declare a watermark strategy or propagate the DataStream watermarks
6. declare a primary key
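Points 1 and 2 can be pictured as follows. The columns derived from the stream type are the starting set. An explicit declaration, as in the last fromDataStream call of the demo in section 1.1, fixes both the order and the type of each physical column. The sketch below is a hypothetical plain-Java model of that behavior under this assumption, not Flink's schema resolver. The column types used in the usage line are assumed as well.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model, not Flink's schema resolution.
final class SchemaSketch {
    // derived: physical columns inferred from the stream type (name -> type), in derived order.
    // declared: physical columns declared by the user (name -> type), in declared order.
    // Returns "name TYPE" entries of the resulting table.
    static List<String> resolve(Map<String, String> derived, Map<String, String> declared) {
        Map<String, String> source = declared.isEmpty() ? derived : declared; // nothing declared: keep derived
        List<String> columns = new ArrayList<>();
        for (Map.Entry<String, String> col : source.entrySet()) {
            if (!derived.containsKey(col.getKey())) {
                // a declared physical column must exist in the stream type
                throw new IllegalArgumentException("Unknown physical column: " + col.getKey());
            }
            columns.add(col.getKey() + " " + col.getValue());
        }
        return columns;
    }

    // Ordered map helper: alternating name, type arguments.
    static Map<String, String> cols(String... nameTypePairs) {
        Map<String, String> m = new LinkedHashMap<>();
        for (int i = 0; i + 1 < nameTypePairs.length; i += 2) {
            m.put(nameTypePairs[i], nameTypePairs[i + 1]);
        }
        return m;
    }
}
```

For derived columns name STRING, score INT, event_time TIMESTAMP_LTZ(9) and a declaration of event_time TIMESTAMP_LTZ(3), name STRING, score INT, the sketch returns event_time first with precision 3.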
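A personal note: the further we go, the harder it gets. This is no longer an article you can understand on a first read, because it builds on a lot of earlier Flink knowledge. I've done my best.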
5. Column type inference rules for stream-to-table conversion
In Flink, every type is described by a TypeInformation, the common base class of all type information.
- All subclasses of TypeInformation are mapped to logical types, including nullability that is aligned with Flink’s built-in serializers.
- Subclasses of TupleTypeInfoBase are translated into a row (for Row) or structured type (for tuples, POJOs, and case classes).
- The order of PojoTypeInfo fields is determined by a constructor with all fields as its parameters. If that is not found during the conversion, the field order will be alphabetical.
- GenericTypeInfo and other TypeInformation that cannot be represented as one of the listed org.apache.flink.table.api.DataTypes will be treated as a black-box RAW type. The current session configuration is used to materialize the serializer of the raw type. Composite nested fields will not be accessible then. (A type that cannot be expressed with DataTypes is not decomposed. The value is wrapped as an opaque RAW type, and its nested fields cannot be queried in SQL.)
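The alphabetical fallback for POJO field order (the third rule above) can be illustrated with reflection. This is a hypothetical plain-Java sketch, not Flink's PojoTypeInfo logic. It only looks at public fields, whereas Flink's POJO analysis also accepts private fields that have getters and setters.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical POJO without a constructor that takes all fields.
final class Person {
    public String name;
    public int age;

    public Person() {}
}

final class PojoFieldOrder {
    // Sketch of the fallback rule: without an all-fields constructor,
    // the column order is the alphabetical order of the field names.
    static List<String> columnOrder(Class<?> pojo) {
        return Arrays.stream(pojo.getFields())                     // public fields; JVM order is unspecified
                .filter(f -> !Modifier.isStatic(f.getModifiers())) // static fields are not columns
                .map(Field::getName)
                .sorted()
                .collect(Collectors.toList());
    }
}
```

For Person, the declared order is name, age, but the sketch yields age, name.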
5.1 Field name mapping rules for stream-to-table conversion
5.1.1 Mapping by position
The following demo applies to tuples, rows, and case classes:
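import static org.apache.flink.table.api.Expressions.$;

DataStream<Tuple2<Long, Integer>> stream = ...

// position-based: field f0 becomes myLong, field f1 becomes myInt
Table table1 = tableEnv.fromDataStream(stream, $("myLong"), $("myInt"));

// name-based: reference the original fields f1 and f0, reorder them and rename them with as()
Table table2 = tableEnv.fromDataStream(stream, $("f1").as("myInt"), $("f0").as("myLong"));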
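The two calls above resolve columns differently: the first pairs names with fields by position, and the second picks fields by their original names f0 and f1. The hypothetical plain-Java model below, which is not Flink's expression resolver, shows how that changes the resulting column order for a Tuple2.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model for a Tuple2<Long, Integer> stream, whose fields are named f0 and f1.
final class TupleMapping {
    static final List<String> FIELDS = List.of("f0", "f1");

    // Position-based: the i-th new name labels the i-th field, as in $("myLong"), $("myInt").
    static List<String> byPosition(List<String> newNames) {
        if (newNames.size() > FIELDS.size()) {
            throw new IllegalArgumentException("More names than fields");
        }
        List<String> out = new ArrayList<>();
        for (int i = 0; i < newNames.size(); i++) {
            out.add(FIELDS.get(i) + " -> " + newNames.get(i));
        }
        return out;
    }

    // Name-based: each {original, alias} pair picks a field by name, as in $("f1").as("myInt"),
    // so the result follows the order of the references, not the tuple order.
    static List<String> byName(List<String[]> references) {
        List<String> out = new ArrayList<>();
        for (String[] ref : references) {
            if (!FIELDS.contains(ref[0])) {
                throw new IllegalArgumentException("Unknown field: " + ref[0]);
            }
            out.add(ref[0] + " -> " + ref[1]);
        }
        return out;
    }
}
```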
5.1.2 Atomic types
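Flink treats basic types (Integer, Double, String) and generic types (types that cannot be analyzed and decomposed) as atomic types. A DataStream of an atomic type is converted into a Table with a single column. The column type is inferred from the atomic type, and the column name can be specified.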
DataStream<Long> stream = ...

// a single column of type BIGINT, named myLong
Table table = tableEnv.fromDataStream(stream, $("myLong"));
5.1.3 Java objects (POJOs)
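When a POJO DataStream is converted into a Table without specifying field names, the names of the original POJO fields are used. Name mapping requires the original names and cannot be done by position. Fields can be renamed with aliases (the as keyword), reordered, and projected.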
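DataStream<Person> stream = ...

// reorder and rename: age -> myAge, name -> myName (fields referenced by their original names)
Table table1 = tableEnv.fromDataStream(stream, $("age").as("myAge"), $("name").as("myName"));

// project: keep only the name field
Table table2 = tableEnv.fromDataStream(stream, $("name"));

// project and rename: keep only name, exposed as myName
Table table3 = tableEnv.fromDataStream(stream, $("name").as("myName"));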
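The name-based matching for POJOs can be sketched with plain Java reflection. Each requested field is looked up by its original name and stored under its alias, in the order requested, which also covers projection. Person and PojoProjection are hypothetical, and unlike Flink the sketch only sees public fields.

```java
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical POJO standing in for the Person type above.
final class Person {
    public String name;
    public int age;

    public Person() {}

    public Person(String name, int age) {
        this.name = name;
        this.age = age;
    }
}

final class PojoProjection {
    // fieldToAlias maps an original field name to its output column name; iteration order
    // (use a LinkedHashMap) decides the column order. Lookup is by name, never by position.
    static Map<String, Object> project(Object pojo, Map<String, String> fieldToAlias) {
        Map<String, Object> row = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : fieldToAlias.entrySet()) {
            try {
                Field f = pojo.getClass().getField(e.getKey());
                row.put(e.getValue(), f.get(pojo));
            } catch (NoSuchFieldException | IllegalAccessException ex) {
                throw new IllegalArgumentException("No public field: " + e.getKey(), ex);
            }
        }
        return row;
    }
}
```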