110.1 演示环境介绍
- CM版本:5.13.1
- CDH版本:5.13.1
- MariaDB版本:5.5.56
- StreamSets版本:3.1.2.0
110.2 操作演示
1.环境布置
- 把MariaDB的Binlog日志开启
- 修改/etc/my.conf文件,在配置文件mysqld下增加如下配置:
server-id=1
log-bin=mysql-bin
binlog_format=ROW
[root@ip-168-31-16-68 ~]# systemctl restart mariadb
[root@ip-168-31-16-68 ~]# systemctl status mariadb
GRANT ALL on maxwell.* to 'maxwell'@'%' identified by '123456';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';
FLUSH PRIVILEGES;
- 安装MySQL驱动在StreamSets中
- 把MySQL的JDBC驱动拷贝至/opt/cloudera/parcels/STREAMSETS_DATACOLLECTOR/streamsets-libs/streamsets-datacollector-mysql-binlog-lib/lib目录
- 创建测试表
create database test;
create table cdc_test (
id int,
name varchar(32)
);
create table cdc_test (
id int,
name String,
primary key(id)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU;
2.创建Pipline
- 创建一个新的Pipline
- 选择Origins类别,搜索MySQL Binary Log
- 高级配置,根据需要进行配置
- 添加表过滤的Stream Selector
- 添加插入类型分流的Stream Selector
- 添加处理Delete类型日志的JavaScript Evaluator
- JavaScript Evaluator主要用于解析DELETE类型的Binary Log 日志
- 配置JavaScript脚本
for(var i = 0; i < records.length; i++) {
try {
var newRecord = sdcFunctions.createRecord(true);
newRecord.value = records[i].value['OldData'];
newRecord.value.Type = records[i].value['Type'];
newRecord.value.Database = records[i].value['Database'];
newRecord.value.Table = records[i].value['Table'];
log.info(records[i].value['Type'])
output.write(newRecord);
} catch (e) {
// Send record to error
error.write(records[i], e);
}
}
- 添加处理INSRET和UPDATE类型日志的JavaScript Evaluator
- JavaScript Evaluator主要用于解析INSERT和UPDATE类型的日志
- 配置JavaScript脚本
for(var i = 0; i < records.length; i++) {
try {
var newRecord = sdcFunctions.createRecord(true);
newRecord.value = records[i].value['Data'];
newRecord.value.Type = records[i].value['Type'];
newRecord.value.Database = records[i].value['Database'];
newRecord.value.Table = records[i].value['Table'];
log.info(records[i].value['Type'])
output.write(newRecord);
} catch (e) {
// Send record to error
error.write(records[i], e);
}
}
- 在JavaScript Evaluator-DELETE添加Kudu
- 在JavaScript Evaluator-UPSERT添加Kudu
- 启动Pipelines
3.Pipeline测试
insert into cdc_test values(1, 'fayson');
- 查看Pipeline实时状态
- 查看Kudu表数据
- 数据成功的插入到Kudu的cdc_test表中
- 修改cdc_test表中数据
update cdc_test set name='fayson-update' where id=1;
- 查看Pipeline实时状态
- Kudu-Upsert成功处理了两条数据,这两条数据分别是INSERT和UPDATE
- 查看Kudu的cdc_test表
- 删除cdc_test表中数据
delete from cdc_test where id=1;
- 查看Pipeline实时状态
- Kudu-Delete成功处理一条日志
- 查看Kudu的cdc_test表,id为1的数据已不存在
4.总结 1.在Kudu插入数据时指定Kudu表名需要注意,如使用Impala创建的表,则需要加上impala的前缀格式impala::
2.实现MySQL CDC的前提是需要开启MySQL的Binary Log日志,并且需要创建复制账号,SreamSets中MySQL-Binary Log实际充当的为MySQL的一个Slave
3.向Kudu实时写入数据的前提是Kudu的表已存在,否则无法正常写入数据
4.需要去确保组装的Map数据中Key与Kudu表中的column字段一致
大数据视频推荐: CSDN 大数据语音推荐: 企业级大数据技术应用 大数据机器学习案例之推荐系统 自然语言处理 大数据基础 人工智能:深度学习入门到精通
|