IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 游戏开发 -> Blink SQL之创建数据维表 -> 正文阅读

[游戏开发]Blink SQL之创建数据维表

Blink创建数据维表

概述

在维表DDL语法中增加1行PERIOD FOR SYSTEM_TIME的声明,定义维表的变化周期,即可使用标准的CREATE TABLE语法定义实时计算维表。

示例

CREATE TABLE white_list (
  id varchar,
  name varchar,
  age int,
  PRIMARY KEY (id),
  PERIOD FOR SYSTEM_TIME  --定义维表的变化周期。实时计算3.x及以上版本,维表DDL中可以不声明该句,在维表JOIN时,声明FOR SYSTEM_TIME AS OF PROCTIME()即可。
) with (
  type = 'RDS',
  ...
);
  • 维表必须指定主键。维表JOIN时,ON的条件必须包含所有主键的等值条件。
  • 目前仅支持源表INNER JOINLEFT JOIN维表。
  • 维表的唯一键(UK)必须为数据库表中的唯一键。如果维表声明的唯一键不是数据库表的唯一键会产生以下影响:
    • 维表的读取速度变慢。
    • 在维表JOIN时,会从第一条数据进行JOIN,在加入Job的过程中,相同KEY的多条记录在数据库中按顺序发生变化,可能导致JOIN结果错误。

INDEX语法

维表定义要求声明PRIMARY KEY,这种情况下只能实现一对一连接。为支持一对多连接的需求,引入了INDEX语法。非Cache All的维表JOIN通过INDEX LOOKUP的方式实现一对多连接的需求。

CREATE TABLE Persons (
    ID bigint,
    LastName varchar,
    FirstName varchar,
    Nick varchar,
    Age int,
    [UNIQUE] INDEX(LastName,FirstName,Nick), --定义INDEX,不需要指定具体的类型,例如,fulltext或clustered等。
    PERIOD FOR SYSTEM_TIME
) with (
  type='RDS',
  ...
);

UNIQUE INDEX表示一对一连接,而INDEX表示一对多连接。

注意事项

  • 实时计算2.2.7及以后版本支持UNIQUE CONSTRAINTUNIQUE KEY),实时计算2.2.7以下版本可以使用PRIMARY KEY的定义。
  • 在生成执行计划时,引擎优先采用UNIQUE INDEX。即如果DDL中使用INDEX,但JOIN等值连接条件中同时包含UNIQUENON-UNIQUE INDEX时,优先使用UNIQUE INDEX查找右表数据。
  • 支持一对多连接的维表类型,例如RDS和MaxCompute。
  • 您可以增加maxJoinRows参数,表示在一对多连接时,左表一条记录连接右表的最大记录数(默认值为1024)。在一对多连接的记录数过多时,可能会极大的影响流任务的性能,因此您需要增大Cache的内存(cacheSize限制的是左表key的个数)。
  • 表格存储Tablestore和Hologres维表不支持使用INDEX进行一对多JOIN。

维表、源表和结果表的区别

类别源表结果表维表
是否能驱动计算
是否能读取数据是,直接读取。是,仅通过源表和维表JOIN读取。
是否能写入数据
是否支持Cache

创建交互式分析Hologres维表

什么是交互式分析Hologres

交互式分析Hologres是实时交互分析产品,兼容PostgreSQL协议,与大数据生态紧密连接,支持高并发、低延时实时分析与处理PB级数据,可以轻松使用现有BI(Business Intelligence)工具对数据进行多维分析和业务探索。

使用限制

创建Hologres维表时建议选择行存模式,列存模式对于点查场景性能开销较大。

选择行存模式创建维表时必须设置主键,并且将主键设置为clustering key才可以工作。示例语句如下:

begin;
create table test(a int primary key, b text, c text, d float8, e int8);
call set_table_property('test', 'orientation', 'row');
call set_table_property('test', 'clustering_key', 'a');
commit;

Hologres维表的主键必须是Blink Join On的字段,Blink Join On的字段也必须是维表完整的主键字段,两者必须完全匹配。

Hologres Blink Connector的维表功能不支持一对多的输出。

不支持读取Hologres分区表的数据。

语法示例

CREATE TABLE hologres_dim_table(
  id INT,
  len INT,
  content VARCHAR,
  PRIMARY KEY (id),
  PERIOD FOR SYSTEM_TIME  --定义维表的变化周期。
) WITH (
  type='hologres',
  endpoint='...',
  dbname='...',
  tablename='...',
  username='...',
  password='...'
);

WITH参数

参数描述是否必选备注
type数据库类型。固定值为hologres。
endpointHologres端点。无。
tablename表名称。无。
dbname数据库名称。无。
username用户名称。无。
password密码。无。

备注 如果Schema不为Public时,则tableName需要填写为schema.tableName

CACHE参数

参数描述是否必选示例值
cache缓存策略。目前交互式分析Hologres维表支持两种缓存策略:None(默认值):无缓存。LRU:缓存维表里的部分数据。
cacheSize缓存大小。缓存策略选择LRU时,可以设置缓存大小,默认为10000行。
cacheTTLMs缓存失效时间,单位为毫秒。缓存策略选择LRU时,可以设置缓存失效时间,默认不过期。
partitionedJoin是否根据JoinKey进行分区。参数的取值:false(默认值):不根据JoinKey进行分区。true:根据JoinKey进行分区,将数据分发到各JOIN节点,提高缓存命中率。
async是否异步读取数据。默认值:false

缓存注意事项

  • 源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。
  • 需要配置相关参数:缓存大小(cacheSize)和缓存更新时间间隔(cacheTTLMs)。

类型映射

Hologres字段类型实时计算Flink版字段类型
INTINT
INT[]ARRAY<INT>
BIGINTBIGINT
BIGINT[]ARRAY<BIGINT>
REALFLOAT
REAL[]ARRAY<FLOAT>
DOUBLE PRECISIONDOUBLE
DOUBLE PRECISION[]ARRAY<DOUBLE>
BOOLEANBOOLEAN
BOOLEAN[]ARRAY<BOOLEAN>
TEXTVARCHAR
TEXT[]ARRAY<VARCHAR>
NUMERICDECIMAL
DATEDATE
TIMESTAMP WITH TIMEZONETIMESTAMP

代码示例

create table randomSource (a int, b VARCHAR, c VARCHAR) with (type = 'random');

create table test (
  a int, 
  b VARCHAR, 
  c VARCHAR, 
  PRIMARY KEY (a, b), PERIOD FOR SYSTEM_TIME
) with (
  type = 'hologres',
  ...
);

create table print_sink (
  a int, 
  b VARCHAR
) with (
  type = 'print', 
  `ignoreWrite` = 'false'
);

insert into print_sink
select randomSource.a, test.b from randomSource 
LEFT JOIN test FOR SYSTEM_TIME AS OF PROCTIME() 
on randomSource.a = test.a and randomSource.b = test.b;

创建表格存储Tablestore维表

什么是表格存储Tablestore

表格存储Tablestore是构建在阿里云飞天分布式系统之上的分布式NoSQL数据存储服务。表格存储通过数据分片和负载均衡技术,实现数据规模与访问并发上的无缝扩展,提供海量结构化数据的存储和实时访问服务。其功能和特性类似开源的HBase

示例

CREATE TABLE ots_dim_table (
 id int,
 len int,
 content VARCHAR,
 PRIMARY KEY (id),
 PERIOD FOR SYSTEM_TIME--维表标识。
) WITH (
 type='ots',
 endPoint='<yourEndpoint>',
 instanceName='<yourInstanceName>',
 tableName='<yourTableName>',
 accessId='<yourAccessId>',
 accessKey='<yourAccessKey>'
);
  • 在声明维表时,必须要指名主键。
  • 在维表JOIN时,ON条件必须包含所有主键的等值条件。
  • Tablestore的主键即表的Rowkey。

WITH参数

参数说明备注
type维表类型固定值为ots
endPoint表格存储的实例访问地址VPC网络环境需要选择实例的VPC Endpoint。
instanceName表格存储的实例名称
tableName表格存储的数据表名
accessId表格存储读取的AccessKey ID
accessKey表格存储读取的密钥AccessKey Secret

CACHE参数

参数说明备注
cache缓存策略表格存储维表支持两种缓存策略:None(默认值):无缓存。LRU:缓存维表里的部分数据。
cacheSize缓存大小当选择LRU缓存策略后,可以设置缓存大小,默认为10000行。
cacheTTLMs缓存超时时间,单位为毫秒。当选择LRU缓存策略后,可以设置缓存失效的超时时间。

代码示例

CREATE TABLE datahub_input1 (
id            BIGINT,
name        VARCHAR,
age           BIGINT
) WITH (
type='datahub'
);

CREATE TABLE phoneNumber(
name VARCHAR,
phoneNumber bigint,
primary key(name),
PERIOD FOR SYSTEM_TIME--维表标识。
)with(
type='ots'
);

CREATE TABLE result_infor(
id bigint,
phoneNumber bigint,
name VARCHAR
)with(
type='rds'
);

INSERT INTO result_infor
SELECT
t.id,
w.phoneNumber,
t.name
FROM datahub_input1 as t
JOIN phoneNumber FOR SYSTEM_TIME AS OF PROCTIME() as w --维表JOIN时必须指定此声明。
ON t.name = w.name;     

创建云数据库RDS MySQL版维表

语法示例

CREATE TABLE rds_dim_table(
 id INT,
 len INT,
 content VARCHAR,
 PRIMARY KEY (id),
 PERIOD FOR SYSTEM_TIME --定义维表的变化周期。
) with (
 type='rds',
 url='<yourDatabaseURL>',
 tableName='<yourDatabaseTableName>',
 userName='<yourDatabaseUserName>',
 password='<yourDatabasePassword>'
);

声明维表时,必须要指名主键。维表JOIN时,ON条件必须包含所有主键的等值条件。RDS或DRDS的主键可以定义为表的主键或唯一索引列。

WITH参数

参数说明是否必填备注
type维表类型固定值为rds
urlJDBC(Java DataBase Connectivity)连接地址URL的格式为:jdbc:mysql://<内网地址>/<databaseName>,其中databaseName为对应的数据库名称。
tableName表名
userName用户名
password密码
maxRetryTimes最大尝试连接次数默认值为10。

CACHE参数

参数说明是否必填备注
cache缓存策略云数据库RDS(DRDS)版维表支持三种缓存策略:None(默认值):无缓存。LRU:缓存维表里的部分数据。ALL:缓存维表里的所有数据。
cacheSize缓存大小当选择LRU缓存策略后,可以设置缓存大小,默认10000行。
cacheTTLMs缓存超时时间,单位为毫秒当选择LRU缓存策略后,可以设置缓存失效的超时时间,默认不过期。当选择ALL策略,则为缓存加载的间隔时间,默认不重新加载。
cacheReloadTimeBlackList缓存策略选择ALL 时启用。更新时间黑名单,防止在此时间内做cache更新(例如双11场景)。默认为空。自定义输入格式为2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00。用逗号(,)来分隔多个黑名单,用箭头(->)来分割黑名单的起始结束时间。
maxJoinRows主表中每一条数据查询维表时,匹配后最多返回的结果数。默认值为1024。如果您可以预估一条数据对应的维表数据最多为n条,则可以设置maxJoinRows=‘n’,以确保实时计算匹配处理效率。
  • 源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。
  • 需要配置相关参数:缓存大小(cacheSize)和缓存更新时间间隔(cacheTTLMs)。
  • 在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。适用于远程表数据量小且MISS KEY(源表数据和维表JOIN时,ON条件无法关联)特别多的场景。需要配置相关参数:缓存更新时间间隔(cacheTTLMs),更新时间黑名单(cacheReloadTimeBlackList)。
  • 因为系统会异步加载维表数据,所以在使用CACHE ALL时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的两倍。
  • 进行Join时,主表输入一条数据,对应维表匹配后返回的结果总数受该参数限制。

代码示例

CREATE TABLE datahub_input1 (
id            BIGINT,
name        VARCHAR,
age           BIGINT
) WITH (
  type='datahub',
  endPoint='http://dh-cn-hangzhou.aliyun-inc.com',
  project='<yourProjectName>',
  topic='<yourTopic>',
  accessId='<yourAccessID>',
  accessKey='<yourAccessSecret>',
  startTime='2017-07-21 00:00:00'
);

create table phoneNumber(
name VARCHAR,
phoneNumber BIGINT,
primary key(name),
PERIOD FOR SYSTEM_TIME--定义维表的变化周期。
)WITH(
 type='rds',
 url='<yourDatabaseURL>',
 tableName='<yourDatabaseTableName>',
 userName='<yourDatabaseUserName>',
 password='<yourDatabasePassword>'
);

CREATE table result_infor(
id BIGINT,
phoneNumber BIGINT,
name VARCHAR
)WITH(
 type='rds',
 url='<yourDatabaseURL>',
 tableName='<yourDatabaseTableName>',
 userName='<yourDatabaseUserName>',
 password='<yourDatabasePassword>'
);

INSERT INTO result_infor
SELECT
t.id,
w.phoneNumber,
t.name
FROM datahub_input1 as t
JOIN phoneNumber FOR SYSTEM_TIME AS OF PROCTIME() as w --维表JOIN时必须指定此声明。
ON t.name = w.name;

类型映射

RDS字段类型实时计算Flink版字段类型
BOOLEANBOOLEAN
TINYINTTINYINT
SMALLINTSMALLINT
INTINT
BIGINTBIGINT
FLOATFLOAT
DECIMALDECIMAL
DOUBLEDOUBLE
DATEDATE
TIMETIME
TIMESTAMPTIMESTAMP
VARCHARVARCHAR
VARBINARYVARBINARY

创建云数据库HBase版维表

注意事项

实时计算HBase维表不支持自建的开源HBase。

HBase维表仅支持一个PK(Primary Key)。

DDL定义

HBase企业标准版

CREATE TABLE hbase (
  `key` varchar, 
  `name` varchar,
  PRIMARY KEY (`key`), --HBase中的rowkey字段。
  PERIOD FOR SYSTEM_TIME --维表标识。
) with (
  TYPE = 'cloudhbase',
  zkQuorum = '<yourzkQuorum>',
  columnFamily = '<yourColumnFamilyName>',
  tableName = '<yourTableName>'
);

HBase性能增强版

CREATE TABLE hbase (
  `key` varchar, 
  `name` varchar,
  PRIMARY KEY (`key`), --HBase中的rowkey字段。
  PERIOD FOR SYSTEM_TIME --维表标识。
) with (
  TYPE = 'cloudhbase',
  endPoint = '<host:port>',--HBase增强版的Java API访问地址。
  userName  = 'root', --HBase用户名。
  password = 'root', --HBase密码。
  columnFamily = '<yourColumnFamilyName>',
  tableName = '<yourTableName>'
);

Blink-3.5.0以上HBase性能增强版

create table liuxd_user_behavior_test_front (
  row_key varchar,
  from_topic varchar,
  origin_data varchar,
  record_create_time varchar,
  primary key (row_key)
) with (
  type = 'cloudhbase',
  zkQuorum = '<host:port>', --HBase增强版的Java API访问地址。
  userName  = 'root', --HBase用户名。
  password = 'root', --HBase密码。
  columnFamily = '<yourColumnFamily>',
  tableName = '<yourTableName>',
  batchSize = '500'
);

Blink-3.5.0以上支持HBase写入主备切换

create table liuxd_user_behavior_test_front (
  row_key varchar,
  from_topic varchar,
  origin_data varchar,
  record_create_time varchar,
  primary key (row_key)
) with (
  type = 'cloudhbase',
  zkQuorum = '<host:port>', --HBase高可用访问地址。
  haClusterID = 'ha-xxx', --HBase高可用实例ID。
  userName  = 'root', --HBase用户名。
  password = 'root', --HBase密码。
  columnFamily = '<yourColumnFamily>',
  tableName = '<yourTableName>',
  batchSize = '500' 
);

备注

  • 在声明维表时,必须要指名主键。
  • 在维表进行JOIN时,ON的条件必须包含所有主键的等值条件。示例中HBase中的主键是row_key
  • HBase企业标准版和HBase性能增强版DDL的区别为连接参数不同:
    • HBase企业标准版:zkQuorum
    • HBase性能增强版:endPoint
    • Blink 3.5.0以上标准版和增强版:zkQuorum

WITH参数

参数说明是否必填备注
type维表类型固定值为cloudhbase
zkQuorumHBase集群配置的zk地址,是以逗号(,)分隔的主机列表。可以在hbase-site.xml文件中查看hbase.zookeeper.quorum相关配置。 仅在HBase企业标准版中生效。
zkNodeParent集群配置在zk上的路径可以在hbase-site.xml文件中查看hbase.zookeeper.quorum相关配置。仅在HBase企业标准版中生效。
endPointHBase地域名称可在购买的HBase实例控制台中获取。仅在HBase性能增强版中生效。
userName用户名仅在HBase性能增强版中生效。
password密码仅在HBase性能增强版中生效。
tableNameHBase表名
columnFamily列族名仅支持插入同一列族。
maxRetryTimes最大尝试次数默认值为10次。
partitionedJoin是否使用joinKey进行分区。默认值为False。在设置partitionedJoin为True时,使用joinKey进行分区,将数据分发到各JOIN节点,提高缓存命中率。
shuffleEmptyKey是否将上游EMPTY KEY随机发送到下游节点。默认值为True。参数取值如下:True:如果上游有多个EMPTY KEY,会将所有EMPTY KEY随机发送到各个JOIN节点。False:如果上游有多个EMPTY KEY,会将所有EMPTY KEY发送至一个JOIN节点。

shuffleEmptyKeypartitionedJoin生效后才能使用。

CACHE参数

参数说明是否必填备注
cache缓存策略HBase维表支持以下三种缓存策略:None(默认值):无缓存。LRU:缓存维表里的部分数据。ALL:缓存维表里的所有数据。
cacheSize缓存大小当选择LRU缓存策略后,可以设置缓存大小,默认为10000行。
cacheTTLMs缓存超时时间,单位为毫秒。当选择LRU缓存策略后,可以设置缓存失效的超时时间,默认不过期。当选择ALL策略,则为缓存加载的间隔时间,默认不重新加载。
cacheReloadTimeBlackList缓存策略选择ALL时启用。更新时间黑名单,防止在此时间内做Cache更新(例如双11场景)。默认为空。自定义输入格式如下。2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00使用逗号(,)分隔多个黑名单,使用箭头(->)分割黑名单的起始和结束时间。
cacheScanLimit缓存策略选择ALL时启用。读取全量HBase数据,服务端一次RPC返回给客户端的行数。默认值为100条。

代码示例

create table source (
  id   TINYINT,
  name BIGINT
) with (
  type = 'random'
);

create table dim (
  id   TINYINT,
  score BIGINT
  primary key(id),
  PERIOD FOR SYSTEM_TIME
)with(
  type = 'cloudhbase',
  zkQuorum = '<yourzkQuorum>',
  columnFamily = '<yourColumnFamilyName>',
  tableName = '<yourTableName>'
);

CREATE table result_infor(
  id BIGINT,
  score BIGINT
)with(
  type='rds'
);

INSERT INTO result_infor
SELECT
  t.id,
  w.score
FROM source as t
JOIN dim FOR SYSTEM_TIME AS OF PROCTIME() as w
ON t.id = w.id;

创建MaxCompute维表

DDL定义

CREATE TABLE white_list (
  id varchar,
  name varchar,
  age int,
  PRIMARY KEY (id), 
  PERIOD FOR SYSTEM_TIME --维表的标识。
) WITH (
  type = 'odps',
  endPoint = '<YourEndPoint>',
  project = '<YourProjectName>',
  tableName = '<YourtableName>',
  accessId = '<yourAccessKeyId>',
  accessKey = '<yourAccessKeySecret>',
  `partition` = 'ds=2018****',
  cache = 'ALL'
);       

备注

  • 声明维表时,必须要指名主键,MaxCompute维表主键必须具有唯一性,否则会被去重。
  • 在维表进行JOIN时,ON条件必须包含所有主键的等值条件。
  • partition是关键字,需要使用反引号(`)注释,例如partition
  • 如果是分区表,目前不支持将分区列写入到DDL定义中。

WITH参数

参数说明是否必填备注
type维表类型。固定值为odps
endPointMaxCompute服务地址。请参见Endpoint
tunnelEndpointMaxCompute Tunnel服务的连接地址。请参见Endpoint
projectMaxCompute项目名称。无。
tableName表名。无。
accessIdAccessKey ID。无。
accessKeyAccessKey Secret。无。
partition分区名。详见备注
maxRowCount可加载的最大表格数量。默认值为100000。如果您的数据超过100000,需要设置maxRowCount参数。建议设定值比实际值大。

备注

固定分区

  • 只存在一个分区MaxCompute表

    例如,如果只存在1个分区列ds,则``partition= 'ds=20180905'表示读ds=20180905分区的数据。

  • 存在多个分区的MaxCompute表

    例如,如果存在2个分区列dshh,则``partition='ds=20180905,hh=*'表示读ds=20180905分区的数据。

    分区过滤时需要声明所有分区的值。例如,上述示例中,只声明``partition= 'ds=20180905',则不会读取任何分区。

非固定分区

  • Blink 2.2.0及以上版本支持``partition= ‘max_pt()’ `功能, 即每次加载所有分区列表中字典序最大的分区。
  • Blink 3.2.2及以上版本支持``partition= 'max_pt_with_done()'功能,即每次加载所有分区列表中字典序最大且伴随有.done的分区。

CACHE参数

参数说明备注
cache缓存策略详见备注。
cacheSize缓存大小可以设置缓存大小,MaxCompute默认缓存值为100000行。
cacheTTLMs缓存超时时间单位为毫秒,当cache选择为ALL策略,则为缓存加载的间隔时间,默认为不重新加载。
cacheReloadTimeBlackList更新时间黑名单。在缓存策略选择为ALL时,启用更新时间黑名单,防止在此时间内做Cache更新(例如双11场景)。默认为空,格式为2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00。分隔符的使用情况如下所示:用逗号,来分隔多个黑名单。用箭头->来分割黑名单的起始结束时间。
partitionedJoin当开启PartitionedJoin优化时,每个并发内存里只缓存维表的部分数据,即该并发上需要的缓存数据。可选,默认值为false,表示每个并发内存里缓存全量维表数据。

备注

目前MaxCompute维表仅支持ALL策略,必须显式声明。

ALL策略:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查询都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。

适用于远程表数据量小且MISS KEY(源表数据和维表JOIN时,ON条件无法关联)特别多的场景。需要配置缓存更新时间间隔(cacheTTLMs)和更新时间黑名单(cacheReloadTimeBlackList)参数。

  • 因为系统会异步加载维表数据,所以在使用CACHE ALL时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的至少4倍,具体值与MaxCompute存储压缩算法有关。
  • 在使用超大MaxCompute维表时,如果频繁GC(Allocation Failure)导致作业异常,且在增加维表JOIN节点的内存仍无改善的情况下,建议:
    • Blink 3.6.0及以后版本,设置参数partitionedJoin = ‘true’ ,即打开PartitionedJoin优化。
    • 改为支持LRU 缓存策略的KV型维表,例如云数据库Hbase版维表。

代码示例

CREATE TABLE datahub_input1 (
  id    BIGINT,
  name  VARCHAR,
  age   BIGINT
) with (
  type='datahub'
);


CREATE TABLE odps_dim (
   name VARCHAR,
   phoneNumber BIGINT, 
   PRIMARY KEY (name), 
   PERIOD FOR SYSTEM_TIME --维表的标识。
) with (
   type = 'odps',
   endPoint = '<yourEndpointName>', 
   project = '<yourProjectName>',
   tableName = '<yourTableName>',
   accessId = '<yourAccessId>',
   accessKey = '<yourAccessPassword>',
   `partition` = 'ds=20180905', --动态分区、固定分区请参见WITH参数说明。
   cache = 'ALL'
);

CREATE table result_infor(
  id BIGINT,
  phoneNumber BIGINT,
  name VARCHAR
)with(
  type='print'
);

INSERT INTO result_infor
SELECT
  t.id,
  w.phoneNumber,
  t.name
FROM datahub_input1 as t
JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() as w --维表JOIN时必须指定此声明。
ON t.name = w.name;

类型映射

MaxComputeBLINK
TINYINTTINYINT
SMALLINTSMALLINT
INTINT
BIGINTBIGINT
FLOATFLOAT
DOUBLEDOUBLE
BOOLEANBOOLEAN
DATETIMETIMESTAMP
TIMESTAMPTIMESTAMP
VARCHARVARCHAR
DECIMALDECIMAL
BINARYVARBINARY
STRINGVARCHAR

实时计算Flink版MaxCompute维表仅支持上述MaxCompute字段类型。

创建云数据库Redis维表

注意

  • 实时计算Flink版Redis维表仅支持引用Redis数据存储中STRING类型的数据。
  • 实时计算Flink版Redis维表支持自建Redis服务。

语法示例

CREATE TABLE white_list (
  id VARCHAR,
  name VARCHAR,
  PRIMARY KEY (id),  --Redis中的Row Key字段。
  PERIOD FOR SYSTEM_TIME --维表标识。
) WITH (
  type = 'redis',
  host = '<yourHostName>',
  port = '<yourPort>',
  password = '<yourPassword>',
  dbNum = '<yourDatabaseNumber>'
);

注意

  • Redis维表必须声明且只能声明一个主键。
  • 维表JOIN时,ON条件必须包含所有主键的等值条件。
  • Redis维表仅支持声明两个字段,且字段类型必须为VARCHAR。

WITH参数

参数说明是否必填备注
type维表类型固定值为redis
hostRedis连接地址
portRedis连接端口默认值为6379。
dbNum选择操作的数据库默认值为0。
passwordRedis密码默认值为空,不进行权限验证。
hashNameHash模式下的Hash Key名称默认值为空,实时计算Flink版从Redis中读取STRING类型的数据。

注意事项

通常,Redis维表中的数据类型为STRING类型,即key-value对。如果设置hashName参数,则Redis维表中的数据类型为HASHMAP类型,即key-{field-value}对,其中:

  • keyhashName参数值。
  • field为您在CREATE TABLE中指明的key参数值。
  • valuekey对应的赋值,和STRING类型key-valuevalue语义相同。

CACHE参数

参数说明是否必填备注
cache缓存策略参考注意事项。
cacheSize缓存大小选择LRU缓存策略后,可以设置缓存大小,默认为10000行。
cacheTTLMs缓存超时时长默认缓存不超时,单位为毫秒。可选LRU缓存策略,即设置缓存失效的超时时长。
cacheEmpty是否缓存空结果默认值为true。

注意事项

云数据库Redis维表支持以下两种缓存策略:

  • None(默认值):无缓存。

  • LRU:缓存维表里的部分数据。源表来一条数据,系统会先查找Cache,如果没有找到,则去物理维表中查询。

    需要配置相关参数:缓存大小(cacheSize)和缓存更新时间间隔(cacheTTLMs)。

类型映射

Redis字段类型实时计算Flink版字段类型
STRINGVARCHAR

代码示例

CREATE TABLE event (
  id VARCHAR, 
  data VARCHAR) with (
  type = 'random'
);

CREATE TABLE white_list (
  id VARCHAR,
  name VARCHAR,
  PRIMARY KEY (id),  --Redis中的Row Key字段。
  PERIOD FOR SYSTEM_TIME --维表的标识。
) WITH (
  type = 'redis',
  host = '<yourRedisHost>',
  password = '<yourRedisPassword>'
);

SELECT e.*, w.*
FROM event AS e
JOIN white_list FOR SYSTEM_TIME AS OF PROCTIME() AS w
ON e.id = w.id;

创建Elasticsearch维表

DDL定义

 CREATE TABLE es_stream_sink(
  field1 LONG, 
  field2 VARBINARY, 
  field3 VARCHAR,
  PRIMARY KEY(field1),
  PERIOD FOR SYSTEM_TIME
) WITH (
  type ='elasticsearch',
  endPoint = '<yourEndPoint>',
  accessId = '<yourUsername>',
  accessKey = '<yourPassword>',
  index = '<yourIndex>',
  typeName = '<yourTypeName>'
);

ES维表支持根据ES的PRIMARY KEY进行PRIMARY KEYUPDATE,且PRIMARY KEY只能为1个字段。

WITH参数

参数说明默认值是否必选
type维表类型elasticsearch
endPointServer地址,例如:http://127.0.0.1:9211。
accessId创建ES时的登录名。
accessKey创建ES时的登录密码 。
index索引名称,类似于数据库Database的名称。
typeNameType名称,类似于数据库的Table名称。
maxRetryTimes异常重试次数30
timeout读取超时时长,单位为毫秒。600000
discovery是否开启节点发现。如果开启,客户端每5分钟刷新一次Server List。false
compression是否使用GZIP压缩Request Bodies。true
multiThread是否开启JestClient多线程。true

CACHE参数

参数说明备注
cache缓存策略参考注意事项。
cacheSize缓存大小选择LRU缓存策略后,可以设置缓存大小,默认为10000行。
cacheTTLMs缓存更新时间间隔参考注意事项。

注意事项

缓存策略支持以下三种:

  • None(默认值):无缓存。

  • LRU:缓存维表里的部分数据。源表来一条数据,系统会先查找Cache,如果没有找到,则去物理维表中查询。

    需要配置相关参数:缓存大小(cacheSize)和缓存更新时间间隔(cacheTTLMs)。

  • ALL:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查询都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。

默认缓存不超时,单位为毫秒。不同缓存更新时间策略下的功能如下:

  • LRU:设置缓存失效的超时时长。
  • ALL:设置缓存加载的间隔时长,默认不重新加载。

创建Phoenix5维表

语法示例

create table US_POPULATION_DIM (
  `STATE` varchar,
  CITY varchar,
  POPULATION BIGINT,
  PRIMARY KEY (`STATE`, CITY),
  PERIOD FOR SYSTEM_TIME
) WITH (
  type = 'PHOENIX5',
  serverUrl = '<YourServerUrl>',
  tableName = '<YourTableName>'
);           

WITH参数

参数说明是否必填备注
type维表类型固定值为PHOENIX5
serverUrlPhoenix5的Query Server地址。如果Phoenix5是在集群中创建的,则serverUrl是负载均衡服务的URL地址;如果Phoenix5是在单机中创建的,则serverUrl是单机的URL地址。serverUrl格式为http://host:port,其中:host为Phoenix5服务的域名。port为Phoenix5服务的端口号,固定值为8765。
tableNamePhoenix5表名Phoenix5表名格式为SchemaName.TableName,其中:SchemaName为模式名,可以为空,即不写模式名,仅写表名,表示使用数据库的默认模式。TableName为表名。

CACHE参数

参数说明是否必填备注
cache缓存策略详见注意事项。
cacheSize缓存大小选择LRU缓存策略后,可以设置缓存大小,默认为10000行。
cacheTTLMs缓存超时时间,单位为毫秒。当选择LRU缓存策略后,可以设置缓存失效的超时时间,默认不过期。当选择ALL策略,则为缓存加载的间隔时间,默认不重新加载。
cacheReloadTimeBlackList更新时间黑名单。在缓存策略选择为ALL时,启用更新时间黑名单,防止在此时间内做Cache更新(例如双11场景)。可选,默认空,格式为 ‘2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00’。用逗号(,)来分隔多个黑名单,用箭头(->)来分割黑名单的起始或结束时间。

注意事项

目前Phoenix5维表支持以下三种缓存策略:

  • None(默认值):无缓存。

  • LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。

    需要配置相关参数:缓存大小(cacheSize)和缓存更新时间间隔(cacheTTLMs)。

  • ALL:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查询都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。

    适用于远程表数据量小且MISS KEY(源表数据和维表JOIN时,ON条件无法关联)特别多的场景。

    需要配置相关参数:缓存更新时间间隔(cacheTTLMs),更新时间黑名单(cacheReloadTimeBlackList)。

    因为系统会异步加载维表数据,所以在使用CACHE ALL时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的两倍。

代码示例

CREATE TABLE datahub_input1 (
  id  BIGINT,
  name  VARCHAR,
  age   BIGINT
) WITH (
  type='datahub'
);

create table phoneNumber(
  name VARCHAR,
  phoneNumber BIGINT,
  primary key(name),
  PERIOD FOR SYSTEM_TIME--定义维表的变化周期。
)with(
  type='PHOENIX5'
);

CREATE table result_infor(
  id BIGINT,
  phoneNumber BIGINT,
  name VARCHAR
)with(
  type='rds'
);

INSERT INTO result_infor
SELECT
  t.id,
  w.phoneNumber,
  t.name
FROM datahub_input1 as t
JOIN phoneNumber FOR SYSTEM_TIME AS OF PROCTIME() as w --维表JOIN时必须指定该声明。
ON t.name = w.name;

创建云原生数据仓库AnalyticDB MySQL版3.0维表

语法示例

CREATE TABLE dim_ads(
   `name` VARCHAR,
    id VARCHAR,
    PRIMARY KEY (`name`),
    PERIOD FOR SYSTEM_TIME
)with(
    type='ADB30',
    url='jdbc:mysql://<内网地址>/<databaseName>',
    tableName='xxx',
    userName='xxx',
    password='xxx'
);

注意事项

  • 在声明一个维表时,必须指明主键。
  • 在维表进行JOIN时,ON条件必须包含所有主键的等值条件。
  • 云原生数据仓库AnalyticDB MySQL版的主键可以定义为表的主键或唯一索引列。

WITH参数

参数说明是否必选备注
type维表类型。固定值为ADB30。
url云原生数据仓库AnalyticDB MySQL版数据库地址。云原生数据仓库AnalyticDB MySQL版数据库地址。示例:url='jdbc:mysql://databaseName****-cn-shenzhen-a.ads.aliyuncs.com:10014/databaseName'
tableName表名。无。
userName用户名。无。
password密码。无。
maxRetryTimes写入重试次数。默认值为3。

CACHE参数

参数说明是否必填备注
cache缓存策略详见注意事项。
cacheSize缓存大小当选择LRU缓存策略后,可以设置缓存大小,默认为10000行。
cacheTTLMs缓存更新时间间隔。系统会根据您设置的缓存更新时间间隔,重新加载一次维表中的最新数据,保证源表能JOIN到维表的最新数据。单位为毫秒。默认不设置此参数,表示不重新加载维表中的新数据。
cacheReloadTimeBlackList更新时间黑名单。在缓存策略选择为ALL时,启用更新时间黑名单,防止在此时间内做Cache更新(例如双11场景)。可选,默认空,格式为 ‘2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00’ 。其中分割符使用情况如下:用逗号(,)来分隔多个黑名单。用箭头(->)来分割黑名单的起始结束时间。
partitionedJoin是否开启partitionedJoin。在开启partitionedJoin优化时,主表会在关联维表前,先按照Join KEY进行Shuffle。默认情况下为false,表示不开启partitionedJoin。
maxJoinRows主表中每一条数据查询维表时,匹配后最多返回的结果数。默认值为1024。如果您可以预估一条数据对应的维表数据最多为n条,则可以设置maxJoinRows=‘n’,以确保实时计算匹配处理效率。

注意事项

目前云原生数据仓库AnalyticDB MySQL版3.0支持以下三种缓存策略:

  • None(默认值):无缓存。

  • LRU:缓存维表里的部分数据。源表来一条数据,系统会先查找Cache,如果没有找到,则去物理维表中查询。

    需要配置相关参数:缓存大小(cacheSize)和缓存更新时间间隔(cacheTTLMs)。

  • ALL:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查询都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。

    适用于远程表数据量小且MISS KEY(源表数据和维表JOIN时,ON条件无法关联)特别多的场景。

    需要配置相关参数:缓存更新时间间隔(cacheTTLMs),更新时间黑名单(cacheReloadTimeBlackList)。

    因为系统会异步加载维表数据,所以在使用CACHE ALL时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的两倍。

    对于数据量比较大的维表,选择CACHE ALL时,可能会出现OOM或者Full GC耗时很久的情况,针对这个问题,可以选择以下两种解决方式:

    • 对于支持Cache All策略的维表,开启PartitionedJoin优化。3.6.0版本之前,每个并发默认加载维表全量数据。3.6.0版本之后,CACHE ALL策略支持PartitionedJoin优化。开启PartitionJoin优化后,每个并发只缓存自己并发所需要的数据。
    • 使用HBase或者RDS等Key-Value类型的维表。

使用partitionedJoin优化前,需要您手动设置partitionedJoin = ‘true’

进行Join时,主表输入一条数据,对应维表匹配后返回的结果总数受maxJoinRows参数限制。

开启partitionedJoin的优点:

  • 在缓存策略为LRU时,可以提高缓存命中率。
  • 在缓存策略为ALL时,节省内存资源,因为每个并发只缓存自己并发所需要的数据。

代码示例

CREATE TABLE datahub_input1 (
  id      BIGINT,
  name    VARCHAR,
  age     BIGINT
) WITH (
  type='datahub'
);

create table phoneNumber (
  name VARCHAR,
  phoneNumber BIGINT,
  primary key(name),
  PERIOD FOR SYSTEM_TIME--维表标识。
) with (
  type='ADB30'
);

CREATE table result_infor (
  id BIGINT,
  phoneNumber BIGINT,
  name VARCHAR
) with (
  type='rds'
);

INSERT INTO result_infor
SELECT
  t.id,
  w.phoneNumber,
  t.name
FROM datahub_input1 as t
JOIN phoneNumber FOR SYSTEM_TIME AS OF PROCTIME() as w --维表JOIN时必须指定该声明。
ON t.name = w.name;

创建Oracle维表

DDL定义

CREATE TABLE oracle_dim(
  employee_id BIGINT,
  phone_number BIGINT,
  dollar DOUBLE,
  PRIMARY KEY (employee_id)
) WITH (
  type = 'oracle_dim',
  url = '<yourUrl>',
  userName = '<yourUserName>',
  password = '<yourPassword>',
  tableName = '<yourTableName>',
  cache = 'ALL'
);

WITH 参数

参数描述是否必选示例值
type维表类型固定值为oracle_dim。
urlJDBC的Oracle地址jdbc:oracle:thin:@ip:port:sid
userName数据库的用户名
password数据库的密码
tableName表名称
maxRetryTimes读取维表异常重试的最大次数默认值为10。

CACHE参数

参数描述是否必选示例值
cache缓存策略详见注意事项。
cacheSize缓存大小,即缓存多少行数据。当缓存策略选择LRU时,可以设置缓存大小,默认值为10000行。
cacheTTLMs缓存超时时间,单位为毫秒。缓存策略选择LRU时,可以设置缓存失效时间,默认不过期。当缓存策略选择ALL时,缓存失效时间为缓存重新加载的间隔时间,默认不重新加载。
cacheReloadTimeBlackList更新时间黑名单。在缓存策略选择为ALL时,启用更新时间黑名单,防止在此时间内做Cache更新(例如双11场景)。默认空,格式为 ‘2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00’ 。分割符如下:用逗号(,)来分隔多个黑名单。用箭头(->)来分割黑名单的起始结束时间。
maxJoinRows一对多连接时,左表一条记录连接右表的最大记录数。默认值为1024。一对多连接的记录数过多时,需要调cache的内存。因为cacheSize限制的是左表key个数,单条左表记录对应的右表记录较多时,可能会极大地影响流任务的性能。

注意事项

目前Oracle维表支持以下三种缓存策略:

  • None(默认值):无缓存。

  • LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。

    需要配置相关参数:缓存大小(cacheSize)和缓存更新时间间隔(cacheTTLMs)。

  • ALL:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则关键健不存在,并在Cache过期后重新加载一遍全量Cache。

    适用于远程表数据量小且MISS KEY(源表数据和维表JOIN时,ON条件无法关联)特别多的场景。

    需要配置相关参数:缓存更新时间间隔(cacheTTLMs),更新时间黑名单(cacheReloadTimeBlackList)。

    因为系统会异步加载维表数据,所以在使用CACHE ALL时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的两倍。

类型映射

Oracle字段类型实时计算Flink字段类型
CHAR、VARCHAR、VARCHAR2VARCHAR
FLOATDOUBLE
NUMBERBIGINT
DECIMALDECIMAL

代码示例

CREATE TABLE oracle_source (
  employee_id BIGINT,
  employee_name VARCHAR,
  employee_age INT
) WITH (
  type ='random'
);


CREATE TABLE oracle_dim (
  employee_id BIGINT,
  phone_number BIGINT,
  dollar DOUBLE,
  PRIMARY KEY (employee_id)
) WITH (
  type = 'oracle_dim',
  url = '<yourUrl>',
  userName = '<yourUserName>',
  password = '<yourPassword>',
  tableName = '<yourTableName>',
  cache = 'ALL'
);

CREATE TEMPORARY TABLE oracle_sink (
  employee_id BIGINT,
  phone_number BIGINT,
  employee_name VARCHAR
) WITH (
  type = 'oracle',
  url = '<yourUrl>',
  userName = '<yourUserName>',
  password = '<yourPassword>',
  tableName = '<yourTableName>'
);

INSERT INTO oracle_sink
SELECT t.employee_id, w.phone_number, t.employee_name
FROM oracle_source as t JOIN oracle_dim FOR SYSTEM_TIME AS OF PROCTIME() as w
ON t.employee_id = w.employee_id;
  游戏开发 最新文章
6、英飞凌-AURIX-TC3XX: PWM实验之使用 GT
泛型自动装箱
CubeMax添加Rtthread操作系统 组件STM32F10
python多线程编程:如何优雅地关闭线程
数据类型隐式转换导致的阻塞
WebAPi实现多文件上传,并附带参数
from origin ‘null‘ has been blocked by
UE4 蓝图调用C++函数(附带项目工程)
Unity学习笔记(一)结构体的简单理解与应用
【Memory As a Programming Concept in C a
上一篇文章      下一篇文章      查看所有文章
加:2022-04-04 12:42:53  更:2022-04-04 12:45:34 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 18:47:24-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码