订阅Hologres Binlog
要求
开启Binlog
Hologres默认关闭Binlog,可以通过binlog.level 和binlog.ttl 开启该功能。
列存表开启Binlog的成本大于行存表。
使用限制
- V0.10以下版本,已存在的表无法通过修改表属性的方式开启Binlog,如需开启必须重建表。
- 不支持消费分区表的Binlog。
- 对于更新频繁的场景,建议使用行存表开启Binlog。
使用示例
begin;
create table test_message_src(
id int primary key,
title text not null,
body text);
call set_table_property('test_message_src', 'orientation', 'row');
call set_table_property('test_message_src', 'clustering_key', 'id');
call set_table_property('test_message_src', 'binlog.level', 'replica');
call set_table_property('test_message_src', 'binlog.ttl', '86400');
commit;
参数说明:
binlog.level :是否开启Binlog,replica(开启) 、none(关闭)。binlog.ttl :Binlog的TTL,单位秒。默认30天,即2592000。
按需开启Binlog
HologresV1.1之后版本支持。
-
开启binlog
begin;
call set_table_property('test_message_src', 'binlog.level', 'replica');
commit;
begin;
call set_table_property('test_message_src', 'binlog.ttl', '2592000');
commit;
-
关闭binlog
begin;
call set_table_property('bin_demo', 'binlog.level', 'none');
commit;
-
修改binlog的TTL begin;
call set_table_property('bin_demo', 'binlog.ttl', '8640');
commit;
Binlog格式说明
Binlog字段由Binlog系统字段和用户Table字段组成。如下所示:
字段名称 | 字段类型 | 说明 |
---|
hg_binlog_lsn | BIGINT | Binlog的系统字段,表示Binlog序号。Shard内部单调递增不保证连续,不同Shard之间不保证唯一和有序。 | hg_binlog_event_type | BIGINT | Binlog的系统字段,表示当前Record所表示的修改类型。 | hg_binlog_timestamp_us | BIGINT | Binlog的系统字段,系统时间戳,单位为us。 | user_table_column_1 | 用户自定义 | 用户Table字段。 | … | … | … | user_table_column_n | 用户自定义 | 用户Table字段。 |
注意事项:
- hg_binlog_event_type有四种取值:
- DELETE=2,表示当前Binlog为删除记录。
- INSERT=5,表示当前Binlog为插入新记录。
- BEFORE_UPDATE=3,表示当前Binlog为一条更新前的记录。
- AFTER_UPDATE=7,表示当前Binlog为一条更新后的记录。
- UPDATE操作会产生两条Binlog记录,分别为更新前和更新后的记录。订阅Binlog功能会保证这两条记录是连续的且更新前的Binlog记录在前,更新后的Binlog记录在后。
- 用户字段顺序与DDL定义顺序一致。
Flink和Blink实时消费Hologres Binlog
Flink实时消费Binlog
Flink VVP-2.4及以上版本,支持Hologres Connector实时消费Binlog,如下:
使用DDL语句创建源表
源表 DDL(非CDC模式)
Source消费的Binlog数据作为普通的Flink数据传递给下游节点,所有数据都是作为Insert类型的数据,可以根据业务情况选择如何处理特定hg_binlog_event_type 类型的数据。
create table test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) with (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'binlogMaxRetryTimes' = '10',
'binlogRetryIntervalMs' = '500',
'binlogBatchReadSize' = '100'
);
源表 DDL(CDC模式)
Source消费的Binlog数据,将根据hg_binlog_event_type 自动为每行数据设置准确的Flink RowKind类型(INTERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER),这样就能完成表的数据的镜像同步,类似MySQL和Postgres的CDC功能。
create table test_message_src_binlog_table(
id INTEGER,
title VARCHAR,
body VARCHAR
) with (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'cdcMode' = 'true'
'binlogMaxRetryTimes' = '10',
'binlogRetryIntervalMs' = '500',
'binlogBatchReadSize' = '100'
);
参数说明
hg_binlog_xxx开头的三个字段表示Binlog的系统字段,命名和类型不支持修改。其余字段需要和用户字段一一对应,且必须为小写。
参数名称 | 是否必填 | 说明 |
---|
connector | 是 | 源表类型,值填写为hologres。 | dbname | 是 | 读取的Hologres DB名称。 | tablename | 是 | 读取的表名称。 | username | 是 | 当前阿里云账号的AccessKey ID。 | password | 是 | 当前阿里云账号的AccessKey Secret。 | endpoint | 是 | Hologres对应VPC的区域。 | binlog | 是 | 是否为Binlog source。如果需要消费,需要将binlog参数设置为true。 | cdcmode | 否 | 读取Binlog时是否采用CDC模式。如果是CDC模式,需要将cdcmode参数设置为true。 | binlogMaxRetryTimes | 否 | 读取Binlog出错重试次数,默认为60次。 | binlogRetryIntervalMs | 否 | 读取Binlog出错重试间隔,默认为2000ms。 | binlogBatchReadSize | 否 | 读取Binlog批量大小,默认为16个。 | startTime | 否 | 启动位点的时间。如果没有设置该参数,且作业没有从状态恢复,则从最早的Binlog开始消费Hologres数据。格式为yyyy-MM-dd hh:mm:ss。 |
配置Binlog并发
Binlog订阅的并发等于Hologres中Table的Shard个数,请执行如下语句查看Shard数。Binlog并发建议执行计划配置,将其并发数与Binlog对应的Hologres中Table的Shard数保持一致。
select tg.property_value from hologres.hg_table_properties tb join hologres.hg_table_group_properties tg on tb.property_value = tg.tablegroup_name where tb.property_key = 'table_group' and tg.property_key = 'shard_count' and table_name = 'Hologres表名';
Blink实时消费Binlog
Blink 3.7及以上版本,支持Hologres Connector实时消费Binlog。
使用DDL语句创建源表
create table test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) with (
type = 'hologres',
'endpoint' = 'ip:port',
'username' = 'xxxx',
'password' = 'xxxx',
'dbname' = 'xxxx',
'tablename' = 'xxxx',
'binlog' = 'true',
'binlogMaxRetryTimes' = '10',
'binlogRetryIntervalMs' = '500',
'binlogBatchReadSize' = '256'
);
参数说明及Binlog并发配置同Flink。
|