Blink创建数据维表
概述
在维表DDL语法中增加1行PERIOD FOR SYSTEM_TIME的声明,定义维表的变化周期,即可使用标准的CREATE TABLE语法定义实时计算维表。
示例
CREATE TABLE white_list (
id varchar,
name varchar,
age int,
PRIMARY KEY (id),
PERIOD FOR SYSTEM_TIME
) with (
type = 'RDS',
...
);
- 维表必须指定主键。维表JOIN时,ON的条件必须包含所有主键的等值条件。
- 目前仅支持源表
INNER JOIN 或LEFT JOIN 维表。 - 维表的唯一键(UK)必须为数据库表中的唯一键。如果维表声明的唯一键不是数据库表的唯一键会产生以下影响:
- 维表的读取速度变慢。
- 在维表JOIN时,会从第一条数据进行JOIN,在加入Job的过程中,相同KEY的多条记录在数据库中按顺序发生变化,可能导致JOIN结果错误。
INDEX语法
维表定义要求声明PRIMARY KEY ,这种情况下只能实现一对一连接。为支持一对多连接的需求,引入了INDEX 语法。非Cache All 的维表JOIN通过INDEX LOOKUP 的方式实现一对多连接的需求。
CREATE TABLE Persons (
ID bigint,
LastName varchar,
FirstName varchar,
Nick varchar,
Age int,
[UNIQUE] INDEX(LastName,FirstName,Nick),
PERIOD FOR SYSTEM_TIME
) with (
type='RDS',
...
);
UNIQUE INDEX 表示一对一连接,而INDEX 表示一对多连接。
注意事项
- 实时计算2.2.7及以后版本支持
UNIQUE CONSTRAINT (UNIQUE KEY ),实时计算2.2.7以下版本可以使用PRIMARY KEY 的定义。 - 在生成执行计划时,引擎优先采用
UNIQUE INDEX 。即如果DDL中使用INDEX,但JOIN等值连接条件中同时包含UNIQUE 和NON-UNIQUE INDEX 时,优先使用UNIQUE INDEX 查找右表数据。 - 支持一对多连接的维表类型,例如RDS和MaxCompute。
- 您可以增加
maxJoinRows 参数,表示在一对多连接时,左表一条记录连接右表的最大记录数(默认值为1024)。在一对多连接的记录数过多时,可能会极大的影响流任务的性能,因此您需要增大Cache的内存(cacheSize 限制的是左表key的个数)。 - 表格存储Tablestore和Hologres维表不支持使用INDEX进行一对多JOIN。
维表、源表和结果表的区别
类别 | 源表 | 结果表 | 维表 |
---|
是否能驱动计算 | 是 | 否 | 否 | 是否能读取数据 | 是,直接读取。 | 否 | 是,仅通过源表和维表JOIN读取。 | 是否能写入数据 | 否 | 是 | 否 | 是否支持Cache | 否 | 否 | 是 |
创建交互式分析Hologres维表
什么是交互式分析Hologres
交互式分析Hologres是实时交互分析产品,兼容PostgreSQL协议,与大数据生态紧密连接,支持高并发、低延时实时分析与处理PB级数据,可以轻松使用现有BI(Business Intelligence)工具对数据进行多维分析和业务探索。
使用限制
创建Hologres维表时建议选择行存模式,列存模式对于点查场景性能开销较大。
选择行存模式创建维表时必须设置主键,并且将主键设置为clustering key才可以工作。示例语句如下:
begin;
create table test(a int primary key, b text, c text, d float8, e int8);
call set_table_property('test', 'orientation', 'row');
call set_table_property('test', 'clustering_key', 'a');
commit;
Hologres维表的主键必须是Blink Join On的字段,Blink Join On的字段也必须是维表完整的主键字段,两者必须完全匹配。
Hologres Blink Connector的维表功能不支持一对多的输出。
不支持读取Hologres分区表的数据。
语法示例
CREATE TABLE hologres_dim_table(
id INT,
len INT,
content VARCHAR,
PRIMARY KEY (id),
PERIOD FOR SYSTEM_TIME
) WITH (
type='hologres',
endpoint='...',
dbname='...',
tablename='...',
username='...',
password='...'
);
WITH参数
参数 | 描述 | 是否必选 | 备注 |
---|
type | 数据库类型。 | 是 | 固定值为hologres。 | endpoint | Hologres端点。 | 是 | 无。 | tablename | 表名称。 | 是 | 无。 | dbname | 数据库名称。 | 是 | 无。 | username | 用户名称。 | 是 | 无。 | password | 密码。 | 是 | 无。 |
备注 如果Schema不为Public时,则tableName需要填写为schema.tableName。
CACHE参数
参数 | 描述 | 是否必选 | 示例值 |
---|
cache | 缓存策略。 | 否 | 目前交互式分析Hologres维表支持两种缓存策略:None(默认值):无缓存。LRU:缓存维表里的部分数据。 | cacheSize | 缓存大小。 | 否 | 当缓存策略选择LRU时,可以设置缓存大小,默认为10000行。 | cacheTTLMs | 缓存失效时间,单位为毫秒。 | 否 | 当缓存策略选择LRU时,可以设置缓存失效时间,默认不过期。 | partitionedJoin | 是否根据JoinKey进行分区。 | 否 | 参数的取值:false(默认值):不根据JoinKey进行分区。true:根据JoinKey进行分区,将数据分发到各JOIN节点,提高缓存命中率。 | async | 是否异步读取数据。 | 否 | 默认值:false |
缓存注意事项
- 源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。
- 需要配置相关参数:缓存大小(cacheSize)和缓存更新时间间隔(cacheTTLMs)。
类型映射
Hologres字段类型 | 实时计算Flink版字段类型 |
---|
INT | INT | INT[] | ARRAY<INT> | BIGINT | BIGINT | BIGINT[] | ARRAY<BIGINT> | REAL | FLOAT | REAL[] | ARRAY<FLOAT> | DOUBLE PRECISION | DOUBLE | DOUBLE PRECISION[] | ARRAY<DOUBLE> | BOOLEAN | BOOLEAN | BOOLEAN[] | ARRAY<BOOLEAN> | TEXT | VARCHAR | TEXT[] | ARRAY<VARCHAR> | NUMERIC | DECIMAL | DATE | DATE | TIMESTAMP WITH TIMEZONE | TIMESTAMP |
代码示例
create table randomSource (a int, b VARCHAR, c VARCHAR) with (type = 'random');
create table test (
a int,
b VARCHAR,
c VARCHAR,
PRIMARY KEY (a, b), PERIOD FOR SYSTEM_TIME
) with (
type = 'hologres',
...
);
create table print_sink (
a int,
b VARCHAR
) with (
type = 'print',
`ignoreWrite` = 'false'
);
insert into print_sink
select randomSource.a, test.b from randomSource
LEFT JOIN test FOR SYSTEM_TIME AS OF PROCTIME()
on randomSource.a = test.a and randomSource.b = test.b;
创建表格存储Tablestore维表
什么是表格存储Tablestore
表格存储Tablestore是构建在阿里云飞天分布式系统之上的分布式NoSQL数据存储服务。表格存储通过数据分片和负载均衡技术,实现数据规模与访问并发上的无缝扩展,提供海量结构化数据的存储和实时访问服务。其功能和特性类似开源的HBase。
示例
CREATE TABLE ots_dim_table (
id int,
len int,
content VARCHAR,
PRIMARY KEY (id),
PERIOD FOR SYSTEM_TIME
) WITH (
type='ots',
endPoint='<yourEndpoint>',
instanceName='<yourInstanceName>',
tableName='<yourTableName>',
accessId='<yourAccessId>',
accessKey='<yourAccessKey>'
);
- 在声明维表时,必须要指名主键。
- 在维表JOIN时,ON条件必须包含所有主键的等值条件。
- Tablestore的主键即表的Rowkey。
WITH参数
参数 | 说明 | 备注 |
---|
type | 维表类型 | 固定值为ots 。 | endPoint | 表格存储的实例访问地址 | VPC网络环境需要选择实例的VPC Endpoint。 | instanceName | 表格存储的实例名称 | 无 | tableName | 表格存储的数据表名 | 无 | accessId | 表格存储读取的AccessKey ID | 无 | accessKey | 表格存储读取的密钥AccessKey Secret | 无 |
CACHE参数
参数 | 说明 | 备注 |
---|
cache | 缓存策略 | 表格存储维表支持两种缓存策略:None(默认值):无缓存。LRU:缓存维表里的部分数据。 | cacheSize | 缓存大小 | 当选择LRU缓存策略后,可以设置缓存大小,默认为10000行。 | cacheTTLMs | 缓存超时时间,单位为毫秒。 | 当选择LRU缓存策略后,可以设置缓存失效的超时时间。 |
代码示例
CREATE TABLE datahub_input1 (
id BIGINT,
name VARCHAR,
age BIGINT
) WITH (
type='datahub'
);
CREATE TABLE phoneNumber(
name VARCHAR,
phoneNumber bigint,
primary key(name),
PERIOD FOR SYSTEM_TIME
)with(
type='ots'
);
CREATE TABLE result_infor(
id bigint,
phoneNumber bigint,
name VARCHAR
)with(
type='rds'
);
INSERT INTO result_infor
SELECT
t.id,
w.phoneNumber,
t.name
FROM datahub_input1 as t
JOIN phoneNumber FOR SYSTEM_TIME AS OF PROCTIME() as w
ON t.name = w.name;
创建云数据库RDS MySQL版维表
语法示例
CREATE TABLE rds_dim_table(
id INT,
len INT,
content VARCHAR,
PRIMARY KEY (id),
PERIOD FOR SYSTEM_TIME
) with (
type='rds',
url='<yourDatabaseURL>',
tableName='<yourDatabaseTableName>',
userName='<yourDatabaseUserName>',
password='<yourDatabasePassword>'
);
声明维表时,必须要指名主键。维表JOIN时,ON条件必须包含所有主键的等值条件。RDS或DRDS的主键可以定义为表的主键或唯一索引列。
WITH参数
参数 | 说明 | 是否必填 | 备注 |
---|
type | 维表类型 | 是 | 固定值为rds 。 | url | JDBC(Java DataBase Connectivity)连接地址 | 是 | URL的格式为:jdbc:mysql://<内网地址>/<databaseName> ,其中databaseName为对应的数据库名称。 | tableName | 表名 | 是 | 无 | userName | 用户名 | 是 | 无 | password | 密码 | 是 | 无 | maxRetryTimes | 最大尝试连接次数 | 否 | 默认值为10。 |
CACHE参数
参数 | 说明 | 是否必填 | 备注 |
---|
cache | 缓存策略 | 否 | 云数据库RDS(DRDS)版维表支持三种缓存策略:None(默认值):无缓存。LRU:缓存维表里的部分数据。ALL:缓存维表里的所有数据。 | cacheSize | 缓存大小 | 否 | 当选择LRU 缓存策略后,可以设置缓存大小,默认10000行。 | cacheTTLMs | 缓存超时时间,单位为毫秒 | 否 | 当选择LRU 缓存策略后,可以设置缓存失效的超时时间,默认不过期。当选择ALL 策略,则为缓存加载的间隔时间,默认不重新加载。 | cacheReloadTimeBlackList | 缓存策略选择ALL 时启用。更新时间黑名单,防止在此时间内做cache更新(例如双11场景)。 | 否 | 默认为空。自定义输入格式为2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00 。用逗号(,)来分隔多个黑名单,用箭头(->)来分割黑名单的起始结束时间。 | maxJoinRows | 主表中每一条数据查询维表时,匹配后最多返回的结果数。 | 否 | 默认值为1024。如果您可以预估一条数据对应的维表数据最多为n条,则可以设置maxJoinRows=‘n’,以确保实时计算匹配处理效率。 |
- 源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。
- 需要配置相关参数:缓存大小(cacheSize)和缓存更新时间间隔(cacheTTLMs)。
- 在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。适用于远程表数据量小且MISS KEY(源表数据和维表JOIN时,ON条件无法关联)特别多的场景。需要配置相关参数:缓存更新时间间隔(cacheTTLMs),更新时间黑名单(cacheReloadTimeBlackList)。
- 因为系统会异步加载维表数据,所以在使用CACHE ALL时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的两倍。
- 进行Join时,主表输入一条数据,对应维表匹配后返回的结果总数受该参数限制。
代码示例
CREATE TABLE datahub_input1 (
id BIGINT,
name VARCHAR,
age BIGINT
) WITH (
type='datahub',
endPoint='http://dh-cn-hangzhou.aliyun-inc.com',
project='<yourProjectName>',
topic='<yourTopic>',
accessId='<yourAccessID>',
accessKey='<yourAccessSecret>',
startTime='2017-07-21 00:00:00'
);
create table phoneNumber(
name VARCHAR,
phoneNumber BIGINT,
primary key(name),
PERIOD FOR SYSTEM_TIME
)WITH(
type='rds',
url='<yourDatabaseURL>',
tableName='<yourDatabaseTableName>',
userName='<yourDatabaseUserName>',
password='<yourDatabasePassword>'
);
CREATE table result_infor(
id BIGINT,
phoneNumber BIGINT,
name VARCHAR
)WITH(
type='rds',
url='<yourDatabaseURL>',
tableName='<yourDatabaseTableName>',
userName='<yourDatabaseUserName>',
password='<yourDatabasePassword>'
);
INSERT INTO result_infor
SELECT
t.id,
w.phoneNumber,
t.name
FROM datahub_input1 as t
JOIN phoneNumber FOR SYSTEM_TIME AS OF PROCTIME() as w
ON t.name = w.name;
类型映射
RDS字段类型 | 实时计算Flink版字段类型 |
---|
BOOLEAN | BOOLEAN | TINYINT | TINYINT | SMALLINT | SMALLINT | INT | INT | BIGINT | BIGINT | FLOAT | FLOAT | DECIMAL | DECIMAL | DOUBLE | DOUBLE | DATE | DATE | TIME | TIME | TIMESTAMP | TIMESTAMP | VARCHAR | VARCHAR | VARBINARY | VARBINARY |
创建云数据库HBase版维表
注意事项
实时计算HBase维表不支持自建的开源HBase。
HBase维表仅支持一个PK(Primary Key)。
DDL定义
HBase企业标准版
CREATE TABLE hbase (
`key` varchar,
`name` varchar,
PRIMARY KEY (`key`),
PERIOD FOR SYSTEM_TIME
) with (
TYPE = 'cloudhbase',
zkQuorum = '<yourzkQuorum>',
columnFamily = '<yourColumnFamilyName>',
tableName = '<yourTableName>'
);
HBase性能增强版
CREATE TABLE hbase (
`key` varchar,
`name` varchar,
PRIMARY KEY (`key`),
PERIOD FOR SYSTEM_TIME
) with (
TYPE = 'cloudhbase',
endPoint = '<host:port>',
userName = 'root',
password = 'root',
columnFamily = '<yourColumnFamilyName>',
tableName = '<yourTableName>'
);
Blink-3.5.0以上HBase性能增强版
create table liuxd_user_behavior_test_front (
row_key varchar,
from_topic varchar,
origin_data varchar,
record_create_time varchar,
primary key (row_key)
) with (
type = 'cloudhbase',
zkQuorum = '<host:port>',
userName = 'root',
password = 'root',
columnFamily = '<yourColumnFamily>',
tableName = '<yourTableName>',
batchSize = '500'
);
Blink-3.5.0以上支持HBase写入主备切换
create table liuxd_user_behavior_test_front (
row_key varchar,
from_topic varchar,
origin_data varchar,
record_create_time varchar,
primary key (row_key)
) with (
type = 'cloudhbase',
zkQuorum = '<host:port>',
haClusterID = 'ha-xxx',
userName = 'root',
password = 'root',
columnFamily = '<yourColumnFamily>',
tableName = '<yourTableName>',
batchSize = '500'
);
备注
- 在声明维表时,必须要指名主键。
- 在维表进行JOIN时,ON的条件必须包含所有主键的等值条件。示例中HBase中的主键是row_key。
- HBase企业标准版和HBase性能增强版DDL的区别为连接参数不同:
- HBase企业标准版:
zkQuorum 。 - HBase性能增强版:
endPoint 。 - Blink 3.5.0以上标准版和增强版:
zkQuorum 。
WITH参数
参数 | 说明 | 是否必填 | 备注 |
---|
type | 维表类型 | 是 | 固定值为cloudhbase 。 | zkQuorum | HBase集群配置的zk地址,是以逗号(,)分隔的主机列表。 | 是 | 可以在hbase-site.xml文件中查看hbase.zookeeper.quorum相关配置。 仅在HBase企业标准版中生效。 | zkNodeParent | 集群配置在zk上的路径 | 否 | 可以在hbase-site.xml文件中查看hbase.zookeeper.quorum相关配置。仅在HBase企业标准版中生效。 | endPoint | HBase地域名称 | 是 | 可在购买的HBase实例控制台中获取。仅在HBase性能增强版中生效。 | userName | 用户名 | 否 | 仅在HBase性能增强版中生效。 | password | 密码 | 否 | 仅在HBase性能增强版中生效。 | tableName | HBase表名 | 是 | 无 | columnFamily | 列族名 | 是 | 仅支持插入同一列族。 | maxRetryTimes | 最大尝试次数 | 否 | 默认值为10次。 | partitionedJoin | 是否使用joinKey进行分区。 | 否 | 默认值为False。在设置partitionedJoin为True时,使用joinKey进行分区,将数据分发到各JOIN节点,提高缓存命中率。 | shuffleEmptyKey | 是否将上游EMPTY KEY随机发送到下游节点。 | 否 | 默认值为True。参数取值如下:True:如果上游有多个EMPTY KEY,会将所有EMPTY KEY随机发送到各个JOIN节点。False:如果上游有多个EMPTY KEY,会将所有EMPTY KEY发送至一个JOIN节点。 |
shuffleEmptyKey在partitionedJoin生效后才能使用。
CACHE参数
参数 | 说明 | 是否必填 | 备注 |
---|
cache | 缓存策略 | 否 | HBase维表支持以下三种缓存策略:None(默认值):无缓存。LRU:缓存维表里的部分数据。ALL:缓存维表里的所有数据。 | cacheSize | 缓存大小 | 否 | 当选择LRU 缓存策略后,可以设置缓存大小,默认为10000行。 | cacheTTLMs | 缓存超时时间,单位为毫秒。 | 否 | 当选择LRU 缓存策略后,可以设置缓存失效的超时时间,默认不过期。当选择ALL 策略,则为缓存加载的间隔时间,默认不重新加载。 | cacheReloadTimeBlackList | 缓存策略选择ALL 时启用。更新时间黑名单,防止在此时间内做Cache更新(例如双11场景)。 | 否 | 默认为空。自定义输入格式如下。2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00 使用逗号(,)分隔多个黑名单,使用箭头(->)分割黑名单的起始和结束时间。 | cacheScanLimit | 缓存策略选择ALL 时启用。读取全量HBase数据,服务端一次RPC返回给客户端的行数。 | 否 | 默认值为100条。 |
代码示例
create table source (
id TINYINT,
name BIGINT
) with (
type = 'random'
);
create table dim (
id TINYINT,
score BIGINT
primary key(id),
PERIOD FOR SYSTEM_TIME
)with(
type = 'cloudhbase',
zkQuorum = '<yourzkQuorum>',
columnFamily = '<yourColumnFamilyName>',
tableName = '<yourTableName>'
);
CREATE table result_infor(
id BIGINT,
score BIGINT
)with(
type='rds'
);
INSERT INTO result_infor
SELECT
t.id,
w.score
FROM source as t
JOIN dim FOR SYSTEM_TIME AS OF PROCTIME() as w
ON t.id = w.id;
创建MaxCompute维表
DDL定义
CREATE TABLE white_list (
id varchar,
name varchar,
age int,
PRIMARY KEY (id),
PERIOD FOR SYSTEM_TIME
) WITH (
type = 'odps',
endPoint = '<YourEndPoint>',
project = '<YourProjectName>',
tableName = '<YourtableName>',
accessId = '<yourAccessKeyId>',
accessKey = '<yourAccessKeySecret>',
`partition` = 'ds=2018****',
cache = 'ALL'
);
备注
- 声明维表时,必须要指名主键,MaxCompute维表主键必须具有唯一性,否则会被去重。
- 在维表进行JOIN时,ON条件必须包含所有主键的等值条件。
- partition是关键字,需要使用反引号(`)注释,例如
partition 。 - 如果是分区表,目前不支持将分区列写入到DDL定义中。
WITH参数
参数 | 说明 | 是否必填 | 备注 |
---|
type | 维表类型。 | 是 | 固定值为odps 。 | endPoint | MaxCompute服务地址。 | 是 | 请参见Endpoint。 | tunnelEndpoint | MaxCompute Tunnel服务的连接地址。 | 是 | 请参见Endpoint。 | project | MaxCompute项目名称。 | 是 | 无。 | tableName | 表名。 | 是 | 无。 | accessId | AccessKey ID。 | 是 | 无。 | accessKey | AccessKey Secret。 | 是 | 无。 | partition | 分区名。 | 否 | 详见备注 | maxRowCount | 可加载的最大表格数量。 | 否 | 默认值为100000。如果您的数据超过100000,需要设置maxRowCount参数。建议设定值比实际值大。 |
备注
固定分区
-
只存在一个分区MaxCompute表 例如,如果只存在1个分区列ds ,则``partition= 'ds=20180905' 表示读ds=20180905 分区的数据。 -
存在多个分区的MaxCompute表 例如,如果存在2个分区列ds 和hh ,则``partition='ds=20180905,hh=*' 表示读ds=20180905 分区的数据。 分区过滤时需要声明所有分区的值。例如,上述示例中,只声明``partition= 'ds=20180905' ,则不会读取任何分区。
非固定分区
- Blink 2.2.0及以上版本支持``partition
= ‘max_pt()’ `功能, 即每次加载所有分区列表中字典序最大的分区。 - Blink 3.2.2及以上版本支持``partition
= 'max_pt_with_done()' 功能,即每次加载所有分区列表中字典序最大且伴随有.done 的分区。
CACHE参数
参数 | 说明 | 备注 |
---|
cache | 缓存策略 | 详见备注。 | cacheSize | 缓存大小 | 可以设置缓存大小,MaxCompute默认缓存值为100000行。 | cacheTTLMs | 缓存超时时间 | 单位为毫秒,当cache选择为ALL 策略,则为缓存加载的间隔时间,默认为不重新加载。 | cacheReloadTimeBlackList | 更新时间黑名单。在缓存策略选择为ALL时,启用更新时间黑名单,防止在此时间内做Cache更新(例如双11场景)。 | 默认为空,格式为2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00 。分隔符的使用情况如下所示:用逗号, 来分隔多个黑名单。用箭头-> 来分割黑名单的起始结束时间。 | partitionedJoin | 当开启PartitionedJoin优化时,每个并发内存里只缓存维表的部分数据,即该并发上需要的缓存数据。 | 可选,默认值为false,表示每个并发内存里缓存全量维表数据。 |
备注
目前MaxCompute维表仅支持ALL 策略,必须显式声明。
ALL策略:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查询都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。
适用于远程表数据量小且MISS KEY(源表数据和维表JOIN时,ON条件无法关联)特别多的场景。需要配置缓存更新时间间隔(cacheTTLMs)和更新时间黑名单(cacheReloadTimeBlackList)参数。
- 因为系统会异步加载维表数据,所以在使用CACHE ALL时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的至少4倍,具体值与MaxCompute存储压缩算法有关。
- 在使用超大MaxCompute维表时,如果频繁GC(Allocation Failure)导致作业异常,且在增加维表JOIN节点的内存仍无改善的情况下,建议:
- Blink 3.6.0及以后版本,设置参数partitionedJoin = ‘true’ ,即打开PartitionedJoin优化。
- 改为支持LRU 缓存策略的KV型维表,例如云数据库Hbase版维表。
代码示例
CREATE TABLE datahub_input1 (
id BIGINT,
name VARCHAR,
age BIGINT
) with (
type='datahub'
);
CREATE TABLE odps_dim (
name VARCHAR,
phoneNumber BIGINT,
PRIMARY KEY (name),
PERIOD FOR SYSTEM_TIME
) with (
type = 'odps',
endPoint = '<yourEndpointName>',
project = '<yourProjectName>',
tableName = '<yourTableName>',
accessId = '<yourAccessId>',
accessKey = '<yourAccessPassword>',
`partition` = 'ds=20180905',
cache = 'ALL'
);
CREATE table result_infor(
id BIGINT,
phoneNumber BIGINT,
name VARCHAR
)with(
type='print'
);
INSERT INTO result_infor
SELECT
t.id,
w.phoneNumber,
t.name
FROM datahub_input1 as t
JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() as w
ON t.name = w.name;
类型映射
MaxCompute | BLINK |
---|
TINYINT | TINYINT | SMALLINT | SMALLINT | INT | INT | BIGINT | BIGINT | FLOAT | FLOAT | DOUBLE | DOUBLE | BOOLEAN | BOOLEAN | DATETIME | TIMESTAMP | TIMESTAMP | TIMESTAMP | VARCHAR | VARCHAR | DECIMAL | DECIMAL | BINARY | VARBINARY | STRING | VARCHAR |
实时计算Flink版MaxCompute维表仅支持上述MaxCompute字段类型。
创建云数据库Redis维表
注意
- 实时计算Flink版Redis维表仅支持引用Redis数据存储中STRING类型的数据。
- 实时计算Flink版Redis维表支持自建Redis服务。
语法示例
CREATE TABLE white_list (
id VARCHAR,
name VARCHAR,
PRIMARY KEY (id),
PERIOD FOR SYSTEM_TIME
) WITH (
type = 'redis',
host = '<yourHostName>',
port = '<yourPort>',
password = '<yourPassword>',
dbNum = '<yourDatabaseNumber>'
);
注意
- Redis维表必须声明且只能声明一个主键。
- 维表JOIN时,ON条件必须包含所有主键的等值条件。
- Redis维表仅支持声明两个字段,且字段类型必须为VARCHAR。
WITH参数
参数 | 说明 | 是否必填 | 备注 |
---|
type | 维表类型 | 是 | 固定值为redis 。 | host | Redis连接地址 | 是 | 无 | port | Redis连接端口 | 否 | 默认值为6379。 | dbNum | 选择操作的数据库 | 否 | 默认值为0。 | password | Redis密码 | 否 | 默认值为空,不进行权限验证。 | hashName | Hash模式下的Hash Key名称 | 否 | 默认值为空,实时计算Flink版从Redis中读取STRING类型的数据。 |
注意事项
通常,Redis维表中的数据类型为STRING类型,即key-value 对。如果设置hashName参数,则Redis维表中的数据类型为HASHMAP类型,即key-{field-value} 对,其中:
- key为hashName参数值。
- field为您在CREATE TABLE中指明的key参数值。
- value为key对应的赋值,和STRING类型
key-value 中value语义相同。
CACHE参数
参数 | 说明 | 是否必填 | 备注 |
---|
cache | 缓存策略 | 否 | 参考注意事项。 | cacheSize | 缓存大小 | 否 | 选择LRU 缓存策略后,可以设置缓存大小,默认为10000行。 | cacheTTLMs | 缓存超时时长 | 否 | 默认缓存不超时,单位为毫秒。可选LRU缓存策略,即设置缓存失效的超时时长。 | cacheEmpty | 是否缓存空结果 | 否 | 默认值为true。 |
注意事项
云数据库Redis维表支持以下两种缓存策略:
类型映射
Redis字段类型 | 实时计算Flink版字段类型 |
---|
STRING | VARCHAR |
代码示例
CREATE TABLE event (
id VARCHAR,
data VARCHAR) with (
type = 'random'
);
CREATE TABLE white_list (
id VARCHAR,
name VARCHAR,
PRIMARY KEY (id),
PERIOD FOR SYSTEM_TIME
) WITH (
type = 'redis',
host = '<yourRedisHost>',
password = '<yourRedisPassword>'
);
SELECT e.*, w.*
FROM event AS e
JOIN white_list FOR SYSTEM_TIME AS OF PROCTIME() AS w
ON e.id = w.id;
创建Elasticsearch维表
DDL定义
CREATE TABLE es_stream_sink(
field1 LONG,
field2 VARBINARY,
field3 VARCHAR,
PRIMARY KEY(field1),
PERIOD FOR SYSTEM_TIME
) WITH (
type ='elasticsearch',
endPoint = '<yourEndPoint>',
accessId = '<yourUsername>',
accessKey = '<yourPassword>',
index = '<yourIndex>',
typeName = '<yourTypeName>'
);
ES维表支持根据ES的PRIMARY KEY进行PRIMARY KEYUPDATE,且PRIMARY KEY只能为1个字段。
WITH参数
参数 | 说明 | 默认值 | 是否必选 |
---|
type | 维表类型 | elasticsearch | 是 | endPoint | Server地址,例如:http://127.0.0.1:9211。 | 无 | 是 | accessId | 创建ES时的登录名。 | 无 | 是 | accessKey | 创建ES时的登录密码 。 | 无 | 是 | index | 索引名称,类似于数据库Database的名称。 | 无 | 是 | typeName | Type名称,类似于数据库的Table名称。 | 无 | 是 | maxRetryTimes | 异常重试次数 | 30 | 否 | timeout | 读取超时时长,单位为毫秒。 | 600000 | 否 | discovery | 是否开启节点发现。如果开启,客户端每5分钟刷新一次Server List。 | false | 否 | compression | 是否使用GZIP压缩Request Bodies。 | true | 否 | multiThread | 是否开启JestClient多线程。 | true | 否 |
CACHE参数
参数 | 说明 | 备注 |
---|
cache | 缓存策略 | 参考注意事项。 | cacheSize | 缓存大小 | 选择LRU 缓存策略后,可以设置缓存大小,默认为10000行。 | cacheTTLMs | 缓存更新时间间隔 | 参考注意事项。 |
注意事项
缓存策略支持以下三种:
-
None(默认值):无缓存。 -
LRU:缓存维表里的部分数据。源表来一条数据,系统会先查找Cache,如果没有找到,则去物理维表中查询。 需要配置相关参数:缓存大小(cacheSize)和缓存更新时间间隔(cacheTTLMs)。 -
ALL:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查询都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。
默认缓存不超时,单位为毫秒。不同缓存更新时间策略下的功能如下:
- LRU:设置缓存失效的超时时长。
- ALL:设置缓存加载的间隔时长,默认不重新加载。
创建Phoenix5维表
语法示例
create table US_POPULATION_DIM (
`STATE` varchar,
CITY varchar,
POPULATION BIGINT,
PRIMARY KEY (`STATE`, CITY),
PERIOD FOR SYSTEM_TIME
) WITH (
type = 'PHOENIX5',
serverUrl = '<YourServerUrl>',
tableName = '<YourTableName>'
);
WITH参数
参数 | 说明 | 是否必填 | 备注 |
---|
type | 维表类型 | 是 | 固定值为PHOENIX5 。 | serverUrl | Phoenix5的Query Server地址。如果Phoenix5是在集群中创建的,则serverUrl是负载均衡服务的URL地址;如果Phoenix5是在单机中创建的,则serverUrl是单机的URL地址。 | 是 | serverUrl格式为http://host:port,其中:host为Phoenix5服务的域名。port为Phoenix5服务的端口号,固定值为8765。 | tableName | Phoenix5表名 | 是 | Phoenix5表名格式为SchemaName.TableName,其中:SchemaName为模式名,可以为空,即不写模式名,仅写表名,表示使用数据库的默认模式。TableName为表名。 |
CACHE参数
参数 | 说明 | 是否必填 | 备注 |
---|
cache | 缓存策略 | 否 | 详见注意事项。 | cacheSize | 缓存大小 | 否 | 选择LRU缓存策略后,可以设置缓存大小,默认为10000行。 | cacheTTLMs | 缓存超时时间,单位为毫秒。 | 否 | 当选择LRU缓存策略后,可以设置缓存失效的超时时间,默认不过期。当选择ALL策略,则为缓存加载的间隔时间,默认不重新加载。 | cacheReloadTimeBlackList | 更新时间黑名单。在缓存策略选择为ALL时,启用更新时间黑名单,防止在此时间内做Cache更新(例如双11场景)。 | 否 | 可选,默认空,格式为 ‘2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00’。用逗号(,)来分隔多个黑名单,用箭头(->)来分割黑名单的起始或结束时间。 |
注意事项
目前Phoenix5维表支持以下三种缓存策略:
-
None(默认值):无缓存。 -
LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。 需要配置相关参数:缓存大小(cacheSize)和缓存更新时间间隔(cacheTTLMs)。 -
ALL:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查询都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。 适用于远程表数据量小且MISS KEY(源表数据和维表JOIN时,ON条件无法关联)特别多的场景。 需要配置相关参数:缓存更新时间间隔(cacheTTLMs),更新时间黑名单(cacheReloadTimeBlackList)。 因为系统会异步加载维表数据,所以在使用CACHE ALL时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的两倍。
代码示例
CREATE TABLE datahub_input1 (
id BIGINT,
name VARCHAR,
age BIGINT
) WITH (
type='datahub'
);
create table phoneNumber(
name VARCHAR,
phoneNumber BIGINT,
primary key(name),
PERIOD FOR SYSTEM_TIME
)with(
type='PHOENIX5'
);
CREATE table result_infor(
id BIGINT,
phoneNumber BIGINT,
name VARCHAR
)with(
type='rds'
);
INSERT INTO result_infor
SELECT
t.id,
w.phoneNumber,
t.name
FROM datahub_input1 as t
JOIN phoneNumber FOR SYSTEM_TIME AS OF PROCTIME() as w
ON t.name = w.name;
创建云原生数据仓库AnalyticDB MySQL版3.0维表
语法示例
CREATE TABLE dim_ads(
`name` VARCHAR,
id VARCHAR,
PRIMARY KEY (`name`),
PERIOD FOR SYSTEM_TIME
)with(
type='ADB30',
url='jdbc:mysql://<内网地址>/<databaseName>',
tableName='xxx',
userName='xxx',
password='xxx'
);
注意事项
- 在声明一个维表时,必须指明主键。
- 在维表进行JOIN时,ON条件必须包含所有主键的等值条件。
- 云原生数据仓库AnalyticDB MySQL版的主键可以定义为表的主键或唯一索引列。
WITH参数
参数 | 说明 | 是否必选 | 备注 |
---|
type | 维表类型。 | 是 | 固定值为ADB30。 | url | 云原生数据仓库AnalyticDB MySQL版数据库地址。 | 是 | 云原生数据仓库AnalyticDB MySQL版数据库地址。示例:url='jdbc:mysql://databaseName****-cn-shenzhen-a.ads.aliyuncs.com:10014/databaseName' 。 | tableName | 表名。 | 是 | 无。 | userName | 用户名。 | 是 | 无。 | password | 密码。 | 是 | 无。 | maxRetryTimes | 写入重试次数。 | 否 | 默认值为3。 |
CACHE参数
参数 | 说明 | 是否必填 | 备注 |
---|
cache | 缓存策略 | 否 | 详见注意事项。 | cacheSize | 缓存大小 | 否 | 当选择LRU缓存策略后,可以设置缓存大小,默认为10000行。 | cacheTTLMs | 缓存更新时间间隔。系统会根据您设置的缓存更新时间间隔,重新加载一次维表中的最新数据,保证源表能JOIN到维表的最新数据。 | 否 | 单位为毫秒。默认不设置此参数,表示不重新加载维表中的新数据。 | cacheReloadTimeBlackList | 更新时间黑名单。在缓存策略选择为ALL时,启用更新时间黑名单,防止在此时间内做Cache更新(例如双11场景)。 | 否 | 可选,默认空,格式为 ‘2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00’ 。其中分割符使用情况如下:用逗号(,)来分隔多个黑名单。用箭头(->)来分割黑名单的起始结束时间。 | partitionedJoin | 是否开启partitionedJoin。在开启partitionedJoin优化时,主表会在关联维表前,先按照Join KEY进行Shuffle。 | 否 | 默认情况下为false,表示不开启partitionedJoin。 | maxJoinRows | 主表中每一条数据查询维表时,匹配后最多返回的结果数。 | 否 | 默认值为1024。如果您可以预估一条数据对应的维表数据最多为n条,则可以设置maxJoinRows=‘n’,以确保实时计算匹配处理效率。 |
注意事项
目前云原生数据仓库AnalyticDB MySQL版3.0支持以下三种缓存策略:
-
None(默认值):无缓存。 -
LRU:缓存维表里的部分数据。源表来一条数据,系统会先查找Cache,如果没有找到,则去物理维表中查询。 需要配置相关参数:缓存大小(cacheSize)和缓存更新时间间隔(cacheTTLMs)。 -
ALL:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查询都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。 适用于远程表数据量小且MISS KEY(源表数据和维表JOIN时,ON条件无法关联)特别多的场景。 需要配置相关参数:缓存更新时间间隔(cacheTTLMs),更新时间黑名单(cacheReloadTimeBlackList)。 因为系统会异步加载维表数据,所以在使用CACHE ALL时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的两倍。 对于数据量比较大的维表,选择CACHE ALL时,可能会出现OOM或者Full GC耗时很久的情况,针对这个问题,可以选择以下两种解决方式:
- 对于支持Cache All策略的维表,开启PartitionedJoin优化。3.6.0版本之前,每个并发默认加载维表全量数据。3.6.0版本之后,CACHE ALL策略支持PartitionedJoin优化。开启PartitionJoin优化后,每个并发只缓存自己并发所需要的数据。
- 使用HBase或者RDS等Key-Value类型的维表。
使用partitionedJoin优化前,需要您手动设置partitionedJoin = ‘true’。
进行Join时,主表输入一条数据,对应维表匹配后返回的结果总数受maxJoinRows参数限制。
开启partitionedJoin的优点:
- 在缓存策略为LRU时,可以提高缓存命中率。
- 在缓存策略为ALL时,节省内存资源,因为每个并发只缓存自己并发所需要的数据。
代码示例
CREATE TABLE datahub_input1 (
id BIGINT,
name VARCHAR,
age BIGINT
) WITH (
type='datahub'
);
create table phoneNumber (
name VARCHAR,
phoneNumber BIGINT,
primary key(name),
PERIOD FOR SYSTEM_TIME
) with (
type='ADB30'
);
CREATE table result_infor (
id BIGINT,
phoneNumber BIGINT,
name VARCHAR
) with (
type='rds'
);
INSERT INTO result_infor
SELECT
t.id,
w.phoneNumber,
t.name
FROM datahub_input1 as t
JOIN phoneNumber FOR SYSTEM_TIME AS OF PROCTIME() as w
ON t.name = w.name;
创建Oracle维表
DDL定义
CREATE TABLE oracle_dim(
employee_id BIGINT,
phone_number BIGINT,
dollar DOUBLE,
PRIMARY KEY (employee_id)
) WITH (
type = 'oracle_dim',
url = '<yourUrl>',
userName = '<yourUserName>',
password = '<yourPassword>',
tableName = '<yourTableName>',
cache = 'ALL'
);
WITH 参数
参数 | 描述 | 是否必选 | 示例值 |
---|
type | 维表类型 | 是 | 固定值为oracle_dim。 | url | JDBC的Oracle地址 | 是 | jdbc:oracle:thin:@ip:port:sid | userName | 数据库的用户名 | 是 | 无 | password | 数据库的密码 | 是 | 无 | tableName | 表名称 | 是 | 无 | maxRetryTimes | 读取维表异常重试的最大次数 | 否 | 默认值为10。 |
CACHE参数
参数 | 描述 | 是否必选 | 示例值 |
---|
cache | 缓存策略 | 否 | 详见注意事项。 | cacheSize | 缓存大小,即缓存多少行数据。 | 否 | 当缓存策略选择LRU时,可以设置缓存大小,默认值为10000行。 | cacheTTLMs | 缓存超时时间,单位为毫秒。 | 否 | 当缓存策略选择LRU时,可以设置缓存失效时间,默认不过期。当缓存策略选择ALL时,缓存失效时间为缓存重新加载的间隔时间,默认不重新加载。 | cacheReloadTimeBlackList | 更新时间黑名单。在缓存策略选择为ALL时,启用更新时间黑名单,防止在此时间内做Cache更新(例如双11场景)。 | 否 | 默认空,格式为 ‘2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00’ 。分割符如下:用逗号(,)来分隔多个黑名单。用箭头(->)来分割黑名单的起始结束时间。 | maxJoinRows | 一对多连接时,左表一条记录连接右表的最大记录数。 | 否 | 默认值为1024。一对多连接的记录数过多时,需要调cache的内存。因为cacheSize限制的是左表key个数,单条左表记录对应的右表记录较多时,可能会极大地影响流任务的性能。 |
注意事项
目前Oracle维表支持以下三种缓存策略:
-
None(默认值):无缓存。 -
LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。 需要配置相关参数:缓存大小(cacheSize)和缓存更新时间间隔(cacheTTLMs)。 -
ALL:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则关键健不存在,并在Cache过期后重新加载一遍全量Cache。 适用于远程表数据量小且MISS KEY(源表数据和维表JOIN时,ON条件无法关联)特别多的场景。 需要配置相关参数:缓存更新时间间隔(cacheTTLMs),更新时间黑名单(cacheReloadTimeBlackList)。 因为系统会异步加载维表数据,所以在使用CACHE ALL时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的两倍。
类型映射
Oracle字段类型 | 实时计算Flink字段类型 |
---|
CHAR、VARCHAR、VARCHAR2 | VARCHAR | FLOAT | DOUBLE | NUMBER | BIGINT | DECIMAL | DECIMAL |
代码示例
CREATE TABLE oracle_source (
employee_id BIGINT,
employee_name VARCHAR,
employee_age INT
) WITH (
type ='random'
);
CREATE TABLE oracle_dim (
employee_id BIGINT,
phone_number BIGINT,
dollar DOUBLE,
PRIMARY KEY (employee_id)
) WITH (
type = 'oracle_dim',
url = '<yourUrl>',
userName = '<yourUserName>',
password = '<yourPassword>',
tableName = '<yourTableName>',
cache = 'ALL'
);
CREATE TEMPORARY TABLE oracle_sink (
employee_id BIGINT,
phone_number BIGINT,
employee_name VARCHAR
) WITH (
type = 'oracle',
url = '<yourUrl>',
userName = '<yourUserName>',
password = '<yourPassword>',
tableName = '<yourTableName>'
);
INSERT INTO oracle_sink
SELECT t.employee_id, w.phone_number, t.employee_name
FROM oracle_source as t JOIN oracle_dim FOR SYSTEM_TIME AS OF PROCTIME() as w
ON t.employee_id = w.employee_id;
|