IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 游戏开发 -> Flink订阅及消费Hologres Binlog -> 正文阅读

[游戏开发]Flink订阅及消费Hologres Binlog

订阅Hologres Binlog

要求

  • Hologres V0.9及以上版本。

开启Binlog

Hologres默认关闭Binlog,可以通过binlog.levelbinlog.ttl开启该功能。

列存表开启Binlog的成本大于行存表。

使用限制

  • V0.10以下版本,已存在的表无法通过修改表属性的方式开启Binlog,如需开启必须重建表。
  • 不支持消费分区表的Binlog。
  • 对于更新频繁的场景,建议使用行存表开启Binlog。

使用示例

begin;
create table test_message_src(
  id int primary key, 
  title text not null, 
  body text);
call set_table_property('test_message_src', 'orientation', 'row');--创建行存表test_message_src
call set_table_property('test_message_src', 'clustering_key', 'id');--在id列建立聚簇索引
call set_table_property('test_message_src', 'binlog.level', 'replica');--设置表属性开启Binlog功能
call set_table_property('test_message_src', 'binlog.ttl', '86400');--binlog.ttl,Binlog的TTL,单位为秒
commit;

参数说明:

  • binlog.level:是否开启Binlog,replica(开启) 、none(关闭)。
  • binlog.ttl:Binlog的TTL,单位秒。默认30天,即2592000。

按需开启Binlog

HologresV1.1之后版本支持。

  • 开启binlog

    -- 设置表属性开启Binlog功能
    begin;
    call set_table_property('test_message_src', 'binlog.level', 'replica');
    commit;
    
    -- 设置表属性,配置Binlog TTL时间,单位秒
    begin;
    call set_table_property('test_message_src', 'binlog.ttl', '2592000');
    commit;
    
  • 关闭binlog

    -- 设置表属性关闭Binlog功能
    begin; 
    call set_table_property('bin_demo', 'binlog.level', 'none'); 
    commit; 
    
  • 修改binlog的TTL

    begin; 
    call set_table_property('bin_demo', 'binlog.ttl', '8640'); --单位秒 
    commit;
    

Binlog格式说明

Binlog字段由Binlog系统字段和用户Table字段组成。如下所示:

字段名称字段类型说明
hg_binlog_lsnBIGINTBinlog的系统字段,表示Binlog序号。Shard内部单调递增不保证连续,不同Shard之间不保证唯一和有序。
hg_binlog_event_typeBIGINTBinlog的系统字段,表示当前Record所表示的修改类型。
hg_binlog_timestamp_usBIGINTBinlog的系统字段,系统时间戳,单位为us。
user_table_column_1用户自定义用户Table字段。
user_table_column_n用户自定义用户Table字段。

注意事项:

  • hg_binlog_event_type有四种取值:
    • DELETE=2,表示当前Binlog为删除记录。
    • INSERT=5,表示当前Binlog为插入新记录。
    • BEFORE_UPDATE=3,表示当前Binlog为一条更新前的记录。
    • AFTER_UPDATE=7,表示当前Binlog为一条更新后的记录。
  • UPDATE操作会产生两条Binlog记录,分别为更新前和更新后的记录。订阅Binlog功能会保证这两条记录是连续的且更新前的Binlog记录在前,更新后的Binlog记录在后。
  • 用户字段顺序与DDL定义顺序一致。

Flink和Blink实时消费Hologres Binlog

Flink实时消费Binlog

Flink VVP-2.4及以上版本,支持Hologres Connector实时消费Binlog,如下:

使用DDL语句创建源表

源表 DDL(非CDC模式)

Source消费的Binlog数据作为普通的Flink数据传递给下游节点,所有数据都是作为Insert类型的数据,可以根据业务情况选择如何处理特定hg_binlog_event_type类型的数据。

create table test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) with (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'binlogMaxRetryTimes' = '10',
  'binlogRetryIntervalMs' = '500',
  'binlogBatchReadSize' = '100'
);
源表 DDL(CDC模式)

Source消费的Binlog数据,将根据hg_binlog_event_type自动为每行数据设置准确的Flink RowKind类型(INTERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER),这样就能完成表的数据的镜像同步,类似MySQL和Postgres的CDC功能。

create table test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) with (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'cdcMode' = 'true'
  'binlogMaxRetryTimes' = '10',
  'binlogRetryIntervalMs' = '500',
  'binlogBatchReadSize' = '100'
);
参数说明

hg_binlog_xxx开头的三个字段表示Binlog的系统字段,命名和类型不支持修改。其余字段需要和用户字段一一对应,且必须为小写。

参数名称是否必填说明
connector源表类型,值填写为hologres。
dbname读取的Hologres DB名称。
tablename读取的表名称。
username当前阿里云账号的AccessKey ID。
password当前阿里云账号的AccessKey Secret。
endpointHologres对应VPC的区域。
binlog是否为Binlog source。如果需要消费,需要将binlog参数设置为true
cdcmode读取Binlog时是否采用CDC模式。如果是CDC模式,需要将cdcmode参数设置为true
binlogMaxRetryTimes读取Binlog出错重试次数,默认为60次。
binlogRetryIntervalMs读取Binlog出错重试间隔,默认为2000ms。
binlogBatchReadSize读取Binlog批量大小,默认为16个。
startTime启动位点的时间。如果没有设置该参数,且作业没有从状态恢复,则从最早的Binlog开始消费Hologres数据。格式为yyyy-MM-dd hh:mm:ss。

配置Binlog并发

Binlog订阅的并发等于Hologres中Table的Shard个数,请执行如下语句查看Shard数。Binlog并发建议执行计划配置,将其并发数与Binlog对应的Hologres中Table的Shard数保持一致。

select tg.property_value from hologres.hg_table_properties tb join hologres.hg_table_group_properties tg on tb.property_value = tg.tablegroup_name where tb.property_key = 'table_group' and tg.property_key = 'shard_count' and table_name = 'Hologres表名';

Blink实时消费Binlog

Blink 3.7及以上版本,支持Hologres Connector实时消费Binlog。

使用DDL语句创建源表

create table test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) with (
  type = 'hologres',
  'endpoint' = 'ip:port',--Hologres的vpc网络地址
  'username' = 'xxxx',--当前账号的AccessKey ID
  'password' = 'xxxx',--当前账号的AccessKey Secret
  'dbname' = 'xxxx',--Hologres的DB名
  'tablename' = 'xxxx',--Hologres的表名
  'binlog' = 'true',
  'binlogMaxRetryTimes' = '10',
  'binlogRetryIntervalMs' = '500',
  'binlogBatchReadSize' = '256'
);

参数说明及Binlog并发配置同Flink。

  游戏开发 最新文章
6、英飞凌-AURIX-TC3XX: PWM实验之使用 GT
泛型自动装箱
CubeMax添加Rtthread操作系统 组件STM32F10
python多线程编程:如何优雅地关闭线程
数据类型隐式转换导致的阻塞
WebAPi实现多文件上传,并附带参数
from origin ‘null‘ has been blocked by
UE4 蓝图调用C++函数(附带项目工程)
Unity学习笔记(一)结构体的简单理解与应用
【Memory As a Programming Concept in C a
上一篇文章      下一篇文章      查看所有文章
加:2022-03-24 00:54:10  更:2022-03-24 00:55:07 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/29 2:37:02-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码