前言
因为工作需要使用到Calcite的虚拟列,而Flink的计算列使用了Calcite虚拟列的部分实现,于是调研了一些Flink计算列的实现原理。
Create
Parse
直接使用CalciteParser将String的Sql解析为SqlNode,虚拟列对应的SqlNode的SqlComputedColumn
在Flink的设计文档中,虚拟列也被称为计算列。其实应该是刻意与Calcite原生的虚拟列区分开,因为Flink的计算列在Calcite的虚拟列上进行了一些封装。
Validate
Flink不对DDL做validate,所以直接return
Convert
根据create table 对应的SqlNode即SqlCreateTable,生成CatalogTableImpl。CatalogTableImpl的ComputedColumn中多出来type信息
type的信息可以根据表达式推断出来
Execute
执行的时候,由不同的catalog实现自己的creatTable 逻辑
- GenericInMemoryCatalog#createTable
从CatalogTableITCase#testInsertSourceTableExpressionFields 单测可以看到Flink直接将CatalogTableImpl封装到了ResolvedCatalogTable,注册到GenericInMemoryCatalog的tables中
ResolvedCatalogTable的父接口是CatalogBaseTable,tables的类型为Map<ObjectPath, CatalogBaseTable>,所以ResolvedCatalogTable可以注册到tables
并没有找到使用虚拟列创建HiveTable的单测HiveTableFactoryTest#testGenericTable ,使所以我手动改写了一下以便查看虚拟列的create schema的信息被组织成Map<String, String> 的结构,通过hive的Table#setParameters 接口持久化下来
注意这种情况下,所有的schema都维护在parameters当中,sd的columns为空
Select
Parse
parse依旧复用Calcite的,此时只认识SqlIdentifier并不会区分出ComputedColumn
Validate
首先会根据properties生成TableSchema 持有TableSchema被层层封装进RelOptTableImpl,注册到Calcite的TableNamespace
注意rowType中是包含计算列的
Convert
Flink自己处理了计算列的转换 而不是使用Calcite的虚拟列convert逻辑
|