Development environment setup
pom.xml
<properties>
<java.version>1.8</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<flink.version>1.13.0</flink.version>
<scala.version>2.12</scala.version>
<hadoop.version>3.1.3</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
<!-- Required only if checkpoints are saved to HDFS -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
</dependency>
<!-- Flink logs through SLF4J by default, which is only a logging facade; here Log4j is used as the concrete logging implementation -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
<exclude>org.apache.hadoop:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder. Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers combine.children="append">
<!-- The ServicesResourceTransformer merges META-INF/services files; without it, the factory classes of connector and format dependencies would overwrite each other during shading -->
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
log4j.properties
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %10p (%c:%M) - %m%n
log4j.rootLogger=error,stdout
hbase-site.xml
Configuration file for the HBase namespace / Phoenix schema mapping when using Phoenix
<configuration>
<!-- Note: to enable the mapping between HBase namespaces and Phoenix schemas, this configuration file must be on the program's classpath. On the servers, the same two properties must also be added to the hbase-site.xml of both HBase and Phoenix and then synchronized with xsync -->
<property>
<name>phoenix.schema.isNamespaceMappingEnabled</name>
<value>true</value>
</property>
<property>
<name>phoenix.schema.mapSystemTablesToNamespace</name>
<value>true</value>
</property>
</configuration>
Phoenix client dependency
<!-- Phoenix client -->
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-spark</artifactId>
<version>5.0.0-HBase-2.0</version>
<exclusions>
<exclusion>
<groupId>org.glassfish</groupId>
<artifactId>javax.el</artifactId>
</exclusion>
</exclusions>
</dependency>
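To verify that the Phoenix client dependency and the namespace-mapping configuration above are picked up, a quick JDBC connectivity check can be run. This is only a sketch; it assumes the ZooKeeper quorum master,node1,node2:2181 used later in this project.
import java.sql.Connection;
import java.sql.DriverManager;
public class PhoenixConnectionCheck {
    public static void main(String[] args) throws Exception {
        // Register the Phoenix thick-client JDBC driver
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
        // Connect through the ZooKeeper quorum of the HBase cluster
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:master,node1,node2:2181")) {
            System.out.println("Connected to: " + conn.getMetaData().getDatabaseProductName()
                    + " " + conn.getMetaData().getDatabaseProductVersion());
        }
    }
}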
Data collection
Flume data collection
Collection configuration files
Template configuration: collecting log files into Kafka
#Name the components
a1.sources = r1
a1.channels = c1
#Describe the source
a1.sources.r1.type = TAILDIR
#Files to tail
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /home/bigdata/mock/applog/app.*
#File that stores the read offsets of the tailed log files
a1.sources.r1.positionFile = /home/bigdata/mock/applog/taildir_position.json
#Custom ETL interceptor written by ourselves (a sketch follows this config)
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.flume.inter.ETLInterceptor$Builder
#Describe the channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = master:9092,node1:9092,node2:9092
a1.channels.c1.kafka.topic = topic_logg
#Only keep the Event body when writing to Kafka
a1.channels.c1.parseAsFlumeEvent = false
#Bind the source to the channel (no sink is needed with a Kafka channel)
a1.sources.r1.channels = c1
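The ETL interceptor referenced above (com.flume.inter.ETLInterceptor) is custom code that is not shown in this post. Below is a minimal sketch of what such an interceptor typically does, assuming the collected log lines are JSON and malformed lines should be dropped; the package and class names simply mirror the fully-qualified name used in the config.
package com.flume.inter;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
public class ETLInterceptor implements Interceptor {
    @Override
    public void initialize() { }
    // Keep the event only if its body parses as JSON
    @Override
    public Event intercept(Event event) {
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        try {
            JSON.parseObject(body);
            return event;
        } catch (JSONException e) {
            return null;
        }
    }
    @Override
    public List<Event> intercept(List<Event> events) {
        Iterator<Event> it = events.iterator();
        while (it.hasNext()) {
            if (intercept(it.next()) == null) {
                it.remove();
            }
        }
        return events;
    }
    @Override
    public void close() { }
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new ETLInterceptor();
        }
        @Override
        public void configure(Context context) { }
    }
}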
Template configuration: Kafka to HDFS
## Components
a1.sources=r1
a1.channels=c1
a1.sinks=k1
## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = master:9092,node1:9092,node2:9092
a1.sources.r1.kafka.topics=topic_log
#Custom interceptor that fixes the midnight-drift problem (a sketch follows this config)
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.flume.inter.TimeStampInterceptor$Builder
## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/bigdata/flume/flume-1.9.0/data/behavior1
a1.channels.c1.dataDirs = /home/bigdata/flume/flume-1.9.0/data/dataDirs/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6
## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
## Write compressed output files (gzip)
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip
## Wiring
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
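The TimeStampInterceptor above is also custom. It exists to fix the midnight-drift problem: the HDFS sink resolves %Y-%m-%d from the timestamp event header, which by default reflects arrival time, so events generated just before midnight can land in the next day's directory. A minimal sketch, assuming each log record is JSON carrying an epoch-millisecond ts field (the package name again mirrors the config):
package com.flume.inter;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
public class TimeStampInterceptor implements Interceptor {
    @Override
    public void initialize() { }
    // Copy the event-time "ts" field from the JSON body into the "timestamp" header,
    // so the HDFS sink partitions by event time instead of arrival time
    @Override
    public Event intercept(Event event) {
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        JSONObject json = JSON.parseObject(body);
        String ts = json.getString("ts"); // assumed to be epoch milliseconds
        if (ts != null) {
            Map<String, String> headers = event.getHeaders();
            headers.put("timestamp", ts);
        }
        return event;
    }
    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }
    @Override
    public void close() { }
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimeStampInterceptor();
        }
        @Override
        public void configure(Context context) { }
    }
}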
Startup example
nohup /home/bigdata/flume/flume-1.9.0/bin/flume-ng agent --conf /home/bigdata/flume/flume-1.9.0/conf --conf-file /home/bigdata/flume/flume-1.9.0/myagentconf/shucan/file-to-kafka.conf --name a1 -Dflume.root.logger=INFO,console >/dev/null 2>&1 &
Cluster start/stop scripts
Logs to Kafka
#!/bin/bash
case $1 in
"start"){
for i in node1 node2
do
echo " --------启动 $i 采集flume-------"
ssh $i "nohup /home/bigdata/flume/flume-1.9.0/bin/flume-ng agent --conf /home/bigdata/flume/flume-1.9.0/conf --conf-file /home/bigdata/flume/flume-1.9.0/myagentconf/shucan/file-to-kafka.conf --name a1 -Dflume.root.logger=INFO,console >/dev/null 2>&1 &"
done
};;
"stop"){
for i in node1 node2
do
echo " --------停止 $i 采集flume-------"
ssh $i " ps -ef | grep file-to-kafka.conf | grep -v grep |awk '{print \$2}' | xargs -n1 kill -9 "
done
};;
esac
Kafka to HDFS
#!/bin/bash
case $1 in
"start"){
for i in node2
do
echo " --------启动 $i 采集flume-------"
ssh $i "nohup /home/bigdata/flume/flume-1.9.0/bin/flume-ng agent --conf /home/bigdata/flume/flume-1.9.0/conf --conf-file /home/bigdata/flume/flume-1.9.0/myagentconf/shucan/kafka-to-hdfs.conf --name a1 -Dflume.root.logger=INFO,console >/dev/null 2>&1 &"
done
};;
"stop"){
for i in node2
do
echo " --------停止 $i 采集flume-------"
ssh $i " ps -ef | grep kafka-to-hdfs.conf | grep -v grep |awk '{print \$2}' | xargs -n1 kill -9 "
done
};;
esac
Consumption test
./kafka-console-consumer.sh --bootstrap-server master:9092 --topic topic_logg
Maxwell collects business data
Startup
#!/bin/bash
/home/bigdata/maxwell/maxwell-1.29.2/bin/maxwell --config /home/bigdata/maxwell/maxwell-1.29.2/config.properties --daemon
The Maxwell configuration file is available in my tools column. It writes all database changes captured by Maxwell into a single topic, topic_db. Downstream, Flink consumes topic_db and routes the records of different tables to different topics. Maxwell itself can also route binlog records to per-table topics by table name, but doing the routing in Flink gives full control over which topic each table goes to.
Reference configuration for Maxwell routing binlog records to per-table topics
producer=kafka
kafka.bootstrap.servers=master:9092,node1:9092,node2:9092
kafka_topic=%{table}
#Table filter: only sync these 13 tables
filter= include:gmall.cart_info,include:gmall.comment_info,include:gmall.coupon_use,include:gmall.favor_info,include:gmall.order_detail,include:gmall.order_detail_activity,include:gmall.order_detail_coupon,include:gmall.order_info,include:gmall.order_refund_info,include:gmall.order_status_log,include:gmall.payment_info,include:gmall.refund_payment,include:gmall.user_info
Writing everything to a single topic
# tl;dr config
log_level=info
producer=kafka
kafka.bootstrap.servers=master:9092,node1:9092,node2:9092
#must be added
kafka_topic=topic_db
# mysql login info
host=master
user=root
password=root
schema_database=shishimaxwell
#must be added; client_id is used later by maxwell-bootstrap
client_id=maxwell_1
producer_partition_by=random
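Once Maxwell runs with this configuration, topic_db can be sanity-checked with the console consumer shown earlier, or with a small standalone Java consumer. The sketch below assumes kafka-clients is on the classpath; the group id maxwell_check is arbitrary.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class TopicDbCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "master:9092,node1:9092,node2:9092");
        props.put("group.id", "maxwell_check");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("auto.offset.reset", "earliest");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("topic_db"));
            // Poll a few batches and print whatever Maxwell has written so far
            for (int i = 0; i < 10; i++) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }
}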
Collection result (ODS-layer data collection complete)
DIM layer implementation
Preliminary template code
Consuming Kafka
KafkaUtil (Kafka utility class)
public class KafkaUtil {
static String BOOTSTRAP_SERVERS = "master:9092, node1:9092, node2:9092";
public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic, String groupId) {
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topic,
//The default string schema cannot handle null record values (e.g. Kafka tombstones), so a custom deserialization schema is defined that returns an empty string instead
new KafkaDeserializationSchema<String>() {
@Override
public boolean isEndOfStream(String nextElement) {
return false;
}
@Override
public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
if(record == null || record.value() == null) {
return "";
}
return new String(record.value());
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}, prop);
return consumer;
}
}
Consuming Kafka in the application
DimSinkApp (consumes Kafka)
public class DimSinkApp {
public static void main(String[] args) throws Exception {
// TODO 1. 环境准备
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//这里和kafka的分区保持一致
env.setParallelism(4);
// TODO 2. 状态后端设置
env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
//检查点超时时间
env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
//两次检查点最小间隔时间,就是第一次检查点完成以后,最少经过3s钟开始检查点
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
env.getCheckpointConfig().enableExternalizedCheckpoints(
// ExternalizedCheckpointCleanup用于指定当job canceled的时候externalized checkpoint该如何清理
// DELETE_ON_CANCELLATION的话,在job canceled的时候会自动删除externalized state
// RETAIN_ON_CANCELLATION则在job canceled的时候会保留externalized checkpoint state
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
env.setRestartStrategy(RestartStrategies.failureRateRestart(
// 设置任务失败重启 允许任务失败最大次数 10次
10,
// 任务失败的时间启动的间隔
Time.of(1L, TimeUnit.MINUTES),
// 允许任务延迟时间
Time.of(3L, TimeUnit.MINUTES)
));
//设置状态后端
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("hdfs://master:8020/gmall/ck");
System.setProperty("HADOOP_USER_NAME", "bigdata");
// TODO 3. 读取业务主流
String topic = "topic_db";
String groupId = "dim_sink_app";
//提供主题和消费者组得到kafkaDataStream
DataStreamSource<String> gmallDS = env.addSource(KafkaUtil.getKafkaConsumer(topic, groupId));
// TODO 4. 主流数据结构转换
SingleOutputStreamOperator<JSONObject> jsonDS = gmallDS.map(JSON::parseObject);
// TODO 5. 主流 ETL
SingleOutputStreamOperator<JSONObject> filterDS = jsonDS.filter(
jsonObj ->
{
try {
jsonObj.getJSONObject("data");
//Filter out the bootstrap-start/bootstrap-complete marker records produced by maxwell-bootstrap, as well as deletes
if (jsonObj.getString("type").equals("bootstrap-start")
|| jsonObj.getString("type").equals("bootstrap-complete") || jsonObj.getString("type").equals("delete")) {
return false;
}
return true;
} catch (JSONException jsonException) {
return false;
}
});
// 打印测试
filterDS.print("filterDS >>> ");
env.execute();
}
}
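To make the ETL filter above concrete, here is a small offline check of the same predicate on hand-written records shaped like Maxwell output; the sample records are illustrative, not captured data.
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import java.util.Arrays;
import java.util.List;
public class EtlFilterCheck {
    // Same predicate as in DimSinkApp: keep only parseable records that are
    // neither maxwell-bootstrap markers nor deletes
    static boolean keep(String line) {
        try {
            JSONObject jsonObj = JSON.parseObject(line);
            jsonObj.getJSONObject("data");
            String type = jsonObj.getString("type");
            return !"bootstrap-start".equals(type)
                    && !"bootstrap-complete".equals(type)
                    && !"delete".equals(type);
        } catch (JSONException e) {
            return false;
        }
    }
    public static void main(String[] args) {
        List<String> samples = Arrays.asList(
                "{\"database\":\"gmall\",\"table\":\"user_info\",\"type\":\"bootstrap-start\",\"data\":{}}",
                "{\"database\":\"gmall\",\"table\":\"user_info\",\"type\":\"insert\",\"data\":{\"id\":1}}",
                "not-json");
        // Expected: false, true, false
        samples.forEach(s -> System.out.println(keep(s) + " <- " + s));
    }
}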
Using Flink CDC
Create a dedicated MySQL database to avoid interfering with other data
I use Docker here; create a MySQL instance in whatever way suits your environment
sudo docker run -itd --name flinkcdc -p 3307:3306 -e MYSQL_ROOT_PASSWORD=root --restart=always mysql:5.7
Configure the table that Flink CDC will monitor
create database gmall_config;
use gmall_config;
CREATE TABLE `table_process` (
`source_table` varchar(200) NOT NULL COMMENT '来源表',
`sink_table` varchar(200) DEFAULT NULL COMMENT '输出表',
`sink_columns` varchar(2000) DEFAULT NULL COMMENT '输出字段',
`sink_pk` varchar(200) DEFAULT NULL COMMENT '主键字段',
`sink_extend` varchar(200) DEFAULT NULL COMMENT '建表扩展',
PRIMARY KEY (`source_table`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Example
/*Data for the table `table_process` */
INSERT INTO `table_process`(`source_table`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('activity_info', 'dim_activity_info', 'id,activity_name,activity_type,activity_desc,start_time,end_time,create_time', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('activity_rule', 'dim_activity_rule', 'id,activity_id,activity_type,condition_amount,condition_num,benefit_amount,benefit_discount,benefit_level', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('activity_sku', 'dim_activity_sku', 'id,activity_id,sku_id,create_time', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_category1', 'dim_base_category1', 'id,name', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_category2', 'dim_base_category2', 'id,name,category1_id', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_category3', 'dim_base_category3', 'id,name,category2_id', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_province', 'dim_base_province', 'id,name,region_id,area_code,iso_code,iso_3166_2', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_region', 'dim_base_region', 'id,region_name', NULL, NULL);
INSERT INTO `table_process`(`source_table`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_trademark', 'dim_base_trademark', 'id,tm_name', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('coupon_info', 'dim_coupon_info', 'id,coupon_name,coupon_type,condition_amount,condition_num,activity_id,benefit_amount,benefit_discount,create_time,range_type,limit_num,taken_count,start_time,end_time,operate_time,expire_time,range_desc', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('coupon_range', 'dim_coupon_range', 'id,coupon_id,range_type,range_id', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('financial_sku_cost', 'dim_financial_sku_cost', 'id,sku_id,sku_name,busi_date,is_lastest,sku_cost,create_time', 'id', NULL);
INSERT INTO `table_process`(`source_table`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('sku_info', 'dim_sku_info', 'id,spu_id,price,sku_name,sku_desc,weight,tm_id,category3_id,sku_default_img,is_sale,create_time', 'id', ' SALT_BUCKETS = 4');
INSERT INTO `table_process`(`source_table`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('spu_info', 'dim_spu_info', 'id,spu_name,description,category3_id,tm_id','id', 'SALT_BUCKETS = 3');
INSERT INTO `table_process`(`source_table`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('user_info', 'dim_user_info', 'id,login_name,name,user_level,birthday,gender,create_time,operate_time', 'id', ' SALT_BUCKETS = 3');
Result
Enter the container and enable the MySQL binlog
sudo docker exec -it 1b955 /bin/bash
vi /etc/mysql/my.cnf
[mysqld]
server-id = 1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=gmall_config
Restart MySQL
sudo docker restart 1b9550d
Check that the binlog is enabled
show master status;
Import dependencies
<!-- Flink CDC related dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.1.0</version>
</dependency>
<!-- Without a flink-table related dependency the job fails with:
Caused by: java.lang.ClassNotFoundException:
org.apache.flink.connector.base.source.reader.RecordEmitter
Adding the dependency below fixes it (certain other flink-table dependencies work as well).
-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>1.13.0</version>
</dependency>
Flink CDC usage example
public class FlinkCDCceshi {
public static void main(String[] args) throws Exception {
// TODO 1. 准备流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// TODO 2. 开启检查点 Flink-CDC将读取binlog的位置信息以状态的方式保存在CK,如果想要做到断点续传,
// 需要从Checkpoint或者Savepoint启动程序
// 2.1 Enable checkpointing every 3 seconds with exactly-once semantics
env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
// 2.2 设置超时时间为 1 分钟
env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
// 2.3 设置两次重启的最小时间间隔
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
// 2.4 设置任务关闭的时候保留最后一次 CK 数据
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 2.5 指定从 CK 自动重启策略
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, Time.days(1L), Time.minutes(1L)
));
// 2.6 设置状态后端
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(
"hdfs://master:8020/flinkCDC"
);
// 2.7 设置访问HDFS的用户名
System.setProperty("HADOOP_USER_NAME", "bigdata");
// TODO 3. 创建 Flink-MySQL-CDC 的 Source
// initial:Performs an initial snapshot on the monitored database tables upon first startup, and continue to read the latest binlog.
// earliest:Never to perform snapshot on the monitored database tables upon first startup, just read from the beginning of the binlog. This should be used with care, as it is only valid when the binlog is guaranteed to contain the entire history of the database.
// latest:Never to perform snapshot on the monitored database tables upon first startup, just read from the end of the binlog which means only have the changes since the connector was started.
// specificOffset:Never to perform snapshot on the monitored database tables upon first startup, and directly read binlog from the specified offset.
// timestamp:Never to perform snapshot on the monitored database tables upon first startup, and directly read binlog from the specified timestamp.The consumer will traverse the binlog from the beginning and ignore change events whose timestamp is smaller than the specified timestamp.
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("master")
.port(3307)
.databaseList("gmall_config") // set captured database
.tableList("gmall_config.table_process") // set captured table
.username("root")
.password("root")
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.startupOptions(StartupOptions.initial())
.build();
// TODO 4.使用CDC Source从MySQL读取数据
DataStreamSource<String> mysqlDS =
env.fromSource(
mySqlSource,
WatermarkStrategy.noWatermarks(),
"MysqlSource");
// TODO 5.打印输出
mysqlDS.print();
// TODO 6.执行任务
env.execute();
}
}
Console output
1> {"before":null,"after":{"source_table":"base_trademark","sink_table":"dim_base_trademark","sink_columns":"id,tm_name","sink_pk":"id","sink_extend":null},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1658128357407,"snapshot":"false","db":"gmall_config","sequence":null,"table":"table_process","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1658128357407,"transaction":null}
1> {"before":null,"after":{"source_table":"spu_info","sink_table":"dim_spu_info","sink_columns":"id,spu_name,description,category3_id,tm_id","sink_pk":"id","sink_extend":"SALT_BUCKETS = 3"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1658128357408,"snapshot":"false","db":"gmall_config","sequence":null,"table":"table_process","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1658128357408,"transaction":null}
1> {"before":null,"after":{"source_table":"base_province","sink_table":"dim_base_province","sink_columns":"id,name,region_id,area_code,iso_code,iso_3166_2","sink_pk":null,"sink_extend":null},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1658128357407,"snapshot":"false","db":"gmall_config","sequence":null,"table":"table_process","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1658128357407,"transaction":null}
1> {"before":null,"after":{"source_table":"coupon_range","sink_table":"dim_coupon_range","sink_columns":"id,coupon_id,range_type,range_id","sink_pk":"id","sink_extend":null},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1658128357407,"snapshot":"false","db":"gmall_config","sequence":null,"table":"table_process","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1658128357407,"transaction":null}
1> {"before":null,"after":{"source_table":"activity_info","sink_table":"dim_activity_info","sink_columns":"id,activity_name,activity_type,activity_desc,start_time,end_time,create_time","sink_pk":"id","sink_extend":null},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1658128357403,"snapshot":"false","db":"gmall_config","sequence":null,"table":"table_process","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1658128357406,"transaction":null}
1> {"before":null,"after":{"source_table":"sku_info","sink_table":"dim_sku_info","sink_columns":"id,spu_id,price,sku_name,sku_desc,weight,tm_id,category3_id,sku_default_img,is_sale,create_time","sink_pk":"id","sink_extend":" SALT_BUCKETS = 4"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1658128357408,"snapshot":"false","db":"gmall_config","sequence":null,"table":"table_process","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1658128357408,"transaction":null}
1> {"before":null,"after":{"source_table":"base_category1","sink_table":"dim_base_category1","sink_columns":"id,name","sink_pk":"id","sink_extend":null},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1658128357406,"snapshot":"false","db":"gmall_config","sequence":null,"table":"table_process","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1658128357406,"transaction":null}
1> {"before":null,"after":{"source_table":"base_category2","sink_table":"dim_base_category2","sink_columns":"id,name,category1_id","sink_pk":"id","sink_extend":null},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1658128357407,"snapshot":"false","db":"gmall_config","sequence":null,"table":"table_process","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1658128357407,"transaction":null}
1> {"before":null,"after":{"source_table":"financial_sku_cost","sink_table":"dim_financial_sku_cost","sink_columns":"id,sku_id,sku_name,busi_date,is_lastest,sku_cost,create_time","sink_pk":"id","sink_extend":null},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1658128357407,"snapshot":"false","db":"gmall_config","sequence":null,"table":"table_process","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1658128357407,"transaction":null}
1> {"before":null,"after":{"source_table":"base_category3","sink_table":"dim_base_category3","sink_columns":"id,name,category2_id","sink_pk":"id","sink_extend":null},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1658128357407,"snapshot":"false","db":"gmall_config","sequence":null,"table":"table_process","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1658128357407,"transaction":null}
1> {"before":null,"after":{"source_table":"coupon_info","sink_table":"dim_coupon_info","sink_columns":"id,coupon_name,coupon_type,condition_amount,condition_num,activity_id,benefit_amount,benefit_discount,create_time,range_type,limit_num,taken_count,start_time,end_time,operate_time,expire_time,range_desc","sink_pk":"id","sink_extend":null},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1658128357407,"snapshot":"false","db":"gmall_config","sequence":null,"table":"table_process","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1658128357407,"transaction":null}
1> {"before":null,"after":{"source_table":"base_region","sink_table":"dim_base_region","sink_columns":"id,region_name","sink_pk":null,"sink_extend":null},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1658128357407,"snapshot":"false","db":"gmall_config","sequence":null,"table":"table_process","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1658128357407,"transaction":null}
1> {"before":null,"after":{"source_table":"activity_sku","sink_table":"dim_activity_sku","sink_columns":"id,activity_id,sku_id,create_time","sink_pk":"id","sink_extend":null},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1658128357406,"snapshot":"false","db":"gmall_config","sequence":null,"table":"table_process","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1658128357406,"transaction":null}
1> {"before":null,"after":{"source_table":"activity_rule","sink_table":"dim_activity_rule","sink_columns":"id,activity_id,activity_type,condition_amount,condition_num,benefit_amount,benefit_discount,benefit_level","sink_pk":"id","sink_extend":null},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1658128357406,"snapshot":"false","db":"gmall_config","sequence":null,"table":"table_process","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1658128357406,"transaction":null}
1> {"before":null,"after":{"source_table":"user_info","sink_table":"dim_user_info","sink_columns":"id,login_name,name,user_level,birthday,gender,create_time,operate_time","sink_pk":"id","sink_extend":" SALT_BUCKETS = 3"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1658128357408,"snapshot":"false","db":"gmall_config","sequence":null,"table":"table_process","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1658128357408,"transaction":null}
Console output after modifying one row (changing the primary-key column source_table shows up in the binlog as a delete followed by a create):
{"before":{"source_table":"activity_info","sink_table":"dim_activity_info","sink_columns":"id,activity_name,activity_type,activity_desc,start_time,end_time,create_time","sink_pk":"id","sink_extend":null},"after":null,"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1658130300000,"snapshot":"false","db":"gmall_config","sequence":null,"table":"table_process","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":2011,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1658130301002,"transaction":null}
{"before":null,"after":{"source_table":"activity_infoo","sink_table":"dim_activity_info","sink_columns":"id,activity_name,activity_type,activity_desc,start_time,end_time,create_time","sink_pk":"id","sink_extend":null},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1658130300000,"snapshot":"false","db":"gmall_config","sequence":null,"table":"table_process","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":2011,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1658130301002,"transaction":null}
Building Phoenix CREATE TABLE statements from the binlog records
Create a JavaBean matching the configuration table
@Data
public class TableProcess {
    // source table
    String sourceTable;
    // sink table
    String sinkTable;
    // sink columns
    String sinkColumns;
    // sink primary key
    String sinkPk;
    // table-creation extension clause (e.g. SALT_BUCKETS)
    String sinkExtend;
}
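Note that the after field of the CDC record uses snake_case keys (source_table, sink_table, ...) while this bean uses camelCase fields; fastjson's lenient field matching, which ignores case and underscores, is what makes jsonObj.getObject("after", TableProcess.class) work in the code below. A small check of that assumption:
import com.alibaba.fastjson.JSON;
public class TableProcessMappingCheck {
    public static void main(String[] args) {
        // Snake_case keys, as they appear in the "after" field of the CDC record
        String after = "{\"source_table\":\"base_trademark\",\"sink_table\":\"dim_base_trademark\","
                + "\"sink_columns\":\"id,tm_name\",\"sink_pk\":\"id\",\"sink_extend\":null}";
        TableProcess config = JSON.parseObject(after, TableProcess.class);
        // Expected: base_trademark -> dim_base_trademark [id,tm_name]
        System.out.println(config.getSourceTable() + " -> " + config.getSinkTable()
                + " [" + config.getSinkColumns() + "]");
    }
}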
The following code builds the DDL statement from the binlog records
public class FlinkCDCceshi {
public static void main(String[] args) throws Exception {
// TODO 1. 准备流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// TODO 2. 开启检查点 Flink-CDC将读取binlog的位置信息以状态的方式保存在CK,如果想要做到断点续传,
// 需要从Checkpoint或者Savepoint启动程序
// 2.1 Enable checkpointing every 3 seconds with exactly-once semantics
env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
// 2.2 设置超时时间为 1 分钟
env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
// 2.3 设置两次重启的最小时间间隔
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
// 2.4 设置任务关闭的时候保留最后一次 CK 数据
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 2.5 指定从 CK 自动重启策略
env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.days(1L), Time.minutes(1L)));
// 2.6 设置状态后端
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("hdfs://master:8020/flinkCDC");
// 2.7 设置访问HDFS的用户名
System.setProperty("HADOOP_USER_NAME", "bigdata");
// TODO 3. 创建 Flink-MySQL-CDC 的 Source
// initial:Performs an initial snapshot on the monitored database tables upon first startup, and continue to read the latest binlog.
// earliest:Never to perform snapshot on the monitored database tables upon first startup, just read from the beginning of the binlog. This should be used with care, as it is only valid when the binlog is guaranteed to contain the entire history of the database.
// latest:Never to perform snapshot on the monitored database tables upon first startup, just read from the end of the binlog which means only have the changes since the connector was started.
// specificOffset:Never to perform snapshot on the monitored database tables upon first startup, and directly read binlog from the specified offset.
// timestamp:Never to perform snapshot on the monitored database tables upon first startup, and directly read binlog from the specified timestamp.The consumer will traverse the binlog from the beginning and ignore change events whose timestamp is smaller than the specified timestamp.
MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("master").port(3307).databaseList("gmall_config") // set captured database
.tableList("gmall_config.table_process") // set captured table
.username("root").password("root").deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.startupOptions(StartupOptions.initial()).build();
// TODO 4.使用CDC Source从MySQL读取数据
DataStreamSource<String> mysqlDS = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MysqlSource");
// TODO 5.打印输出
mysqlDS.process(new ProcessFunction<String, String>() {
// 定义Phoenix的连接
// private Connection conn;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// Class.forName(GmallConfig.PHOENIX_DRIVER);
// conn = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
}
@Override
public void processElement(String binlogData, ProcessFunction<String, String>.Context ctx, Collector<String> out) throws Exception {
JSONObject jsonObj = JSON.parseObject(binlogData);
TableProcess config = jsonObj.getObject("after", TableProcess.class);
String sinkTable = config.getSinkTable();
String sinkColumns = config.getSinkColumns();
String sinkPk = config.getSinkPk();
String sinkExtend = config.getSinkExtend();
checkTable(sinkTable, sinkColumns, sinkPk, sinkExtend);
}
private void checkTable(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) {
// 封装建表 SQL
StringBuilder sql = new StringBuilder();
sql.append("create table if not exists " + GmallConfig.HBASE_SCHEMA + "." + sinkTable + "(\n");
String[] columnArr = sinkColumns.split(",");
// 为主键及扩展字段赋默认值
if (sinkPk == null) {
sinkPk = "id";
}
if (sinkExtend == null) {
sinkExtend = "";
}
// 遍历添加字段信息
for (int i = 0; i < columnArr.length; i++) {
sql.append(columnArr[i] + " varchar");
// 判断当前字段是否为主键
if (sinkPk.equals(columnArr[i])) {
sql.append(" primary key");
}
// 如果当前字段不是最后一个字段,则追加","
if (i < columnArr.length - 1) {
sql.append(",\n");
}
}
sql.append(")");
sql.append(sinkExtend);
String createStatement = sql.toString();
System.out.println(sql);
// 为数据库操作对象赋默认值,执行建表 SQL
PreparedStatement preparedSt = null;
// try {
// preparedSt = conn.prepareStatement(createStatement);
// preparedSt.execute();
// } catch (SQLException sqlException) {
// sqlException.printStackTrace();
// System.out.println("建表语句\n" + createStatement + "\n执行异常");
// } finally {
// if (preparedSt != null) {
// try {
// preparedSt.close();
// } catch (SQLException sqlException) {
// sqlException.printStackTrace();
// throw new RuntimeException("数据库操作对象释放异常");
// }
// }
// }
}
}).print();
// TODO 6.执行任务
env.execute();
}
}
Resulting output
create table if not exists GMALL_REALTIME.dim_spu_info(
id varchar primary key,
spu_name varchar,
description varchar,
category3_id varchar,
tm_id varchar)SALT_BUCKETS = 3
Broadcast stream prerequisites
public class BroadCastTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
//主流的数据
DataStreamSource<Integer> initData = env.fromElements(1, 2, 3, 4, 5, 6,7,8,9,10);
//TODO 定义广播状态描述器,它用于获取广播变量,或者可以说可以用来中间存储的过程
MapStateDescriptor<String, String> stateDescriptor = new MapStateDescriptor<>("my-mapstate", String.class, String.class);
DataStreamSource<String> broadCast = env.fromElements("a", "b", "c");
//得到广播流,把状态描述器传入广播流参数,后面才能在主流和广播流connect起到存储的作用
BroadcastStream<String> broadcast = broadCast.broadcast(stateDescriptor);
//这里用主流和广播流合流
BroadcastConnectedStream<Integer, String> connect = initData.connect(broadcast);
connect.process(new BroadcastProcessFunction<Integer, String, String>() {
@Override
public void processElement(Integer value, BroadcastProcessFunction<Integer, String, String>.ReadOnlyContext ctx, Collector<String> out) throws Exception {
ReadOnlyBroadcastState<String, String> broadcastState = ctx.getBroadcastState(stateDescriptor);
//TODO 得到广播变量的数据,下面在状态里面保存了值,这里就可以拿出来
String a = broadcastState.get("a");
String b = broadcastState.get("b");
String c = broadcastState.get("c");
System.out.println("我是initData: "+value+"得到的广播变量: "+a+b+c);
}
@Override
public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
//TODO 合流以后就能得到上面声明的广播状态描述器拿到广播状态
BroadcastState<String, String> broadcastState = ctx.getBroadcastState(stateDescriptor);
//把数据添加到广播变量里面
//a a
//b b
//c c
broadcastState.put(value,value);
}
}).print();
env.execute();
}
}
The output is:
我是initData: 2得到的广播变量: nullnullnull
我是initData: 1得到的广播变量: nullnullnull
我是initData: 4得到的广播变量: anullnull
我是initData: 3得到的广播变量: anullnull
我是initData: 6得到的广播变量: abnull
我是initData: 5得到的广播变量: abnull
我是initData: 8得到的广播变量: abc
我是initData: 7得到的广播变量: abc
我是initData: 10得到的广播变量: abc
我是initData: 9得到的广播变量: abc
Conclusion:
When using broadcast state, the main stream can only read values from the state descriptor after the broadcast stream has started and its data has been written into broadcast state; main-stream elements processed before that see null. A sketch of a common workaround follows.
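In the DIM job below, this ordering issue means dimension records that arrive before the CDC snapshot has been broadcast would simply be dropped. A common workaround is to pre-load the configuration table once in open() via plain JDBC and fall back to that map until the broadcast state is populated. The sketch below shows one way to do this; the MySQL URL, credentials and query are assumptions mirroring the gmall_config table above, a MySQL JDBC driver must be on the classpath, and the broadcast-side handling stays exactly as in DimSinkApp.
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public abstract class PreloadedDimProcessFunction
        extends BroadcastProcessFunction<JSONObject, String, JSONObject> {
    private final MapStateDescriptor<String, TableProcess> descriptor;
    private transient Map<String, TableProcess> preloadedConfig;
    protected PreloadedDimProcessFunction(MapStateDescriptor<String, TableProcess> descriptor) {
        this.descriptor = descriptor;
    }
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        preloadedConfig = new HashMap<>();
        // Load the config table once per subtask so early main-stream records are not lost
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://master:3307/gmall_config", "root", "root");
             PreparedStatement st = conn.prepareStatement(
                     "select source_table, sink_table, sink_columns, sink_pk, sink_extend from table_process");
             ResultSet rs = st.executeQuery()) {
            while (rs.next()) {
                TableProcess tp = new TableProcess();
                tp.setSourceTable(rs.getString("source_table"));
                tp.setSinkTable(rs.getString("sink_table"));
                tp.setSinkColumns(rs.getString("sink_columns"));
                tp.setSinkPk(rs.getString("sink_pk"));
                tp.setSinkExtend(rs.getString("sink_extend"));
                preloadedConfig.put(tp.getSourceTable(), tp);
            }
        }
    }
    @Override
    public void processElement(JSONObject jsonObj, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
        ReadOnlyBroadcastState<String, TableProcess> state = ctx.getBroadcastState(descriptor);
        String sourceTable = jsonObj.getString("table");
        TableProcess config = state.get(sourceTable);
        if (config == null) {
            // Broadcast snapshot not seen yet on this subtask: fall back to the preloaded map
            config = preloadedConfig.get(sourceTable);
        }
        if (config == null) {
            return; // not a dimension table
        }
        JSONObject data = jsonObj.getJSONObject("data");
        // Keep only the configured columns, then tag the record with its target table
        List<String> keep = Arrays.asList(config.getSinkColumns().split(","));
        data.entrySet().removeIf(e -> !keep.contains(e.getKey()));
        data.put("sinkTable", config.getSinkTable());
        out.collect(data);
    }
}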
DIM layer implementation code
First create the corresponding schema in Phoenix
create schema ZHANGGMALL;
Define a constants class
public class GmallConfig {
    // Phoenix schema name
    public static final String HBASE_SCHEMA = "ZHANGGMALL";
    // Phoenix JDBC driver
    public static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
    // Phoenix JDBC URL (ZooKeeper quorum)
    public static final String PHOENIX_SERVER = "jdbc:phoenix:master,node1,node2:2181";
}
Generate Phoenix tables from the MySQL configuration table
public class DimSinkApp {
public static void main(String[] args) throws Exception {
// TODO 1. 环境准备
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//这里和kafka的分区保持一致
env.setParallelism(4);
// TODO 2. 状态后端设置
env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
//检查点超时时间
env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
//两次检查点最小间隔时间,就是第一次检查点完成以后,最少经过3s钟开始检查点
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
env.getCheckpointConfig().enableExternalizedCheckpoints(
// ExternalizedCheckpointCleanup用于指定当job canceled的时候externalized checkpoint该如何清理
// DELETE_ON_CANCELLATION的话,在job canceled的时候会自动删除externalized state
// RETAIN_ON_CANCELLATION则在job canceled的时候会保留externalized checkpoint state
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
env.setRestartStrategy(RestartStrategies.failureRateRestart(
// 设置任务失败重启 允许任务失败最大次数 10次
10,
// 任务失败的时间启动的间隔
Time.of(1L, TimeUnit.MINUTES),
// 允许任务延迟时间
Time.of(3L, TimeUnit.MINUTES)
));
//设置状态后端
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("hdfs://master:8020/gmall/ck");
System.setProperty("HADOOP_USER_NAME", "bigdata");
// TODO 3. 读取业务主流
String topic = "topic_db";
String groupId = "dim_sink_app";
//提供主题和消费者组得到kafkaDataStream
DataStreamSource<String> gmallDS = env.addSource(KafkaUtil.getKafkaConsumer(topic, groupId));
// TODO 4. 主流数据结构转换
SingleOutputStreamOperator<JSONObject> jsonDS = gmallDS.map(JSON::parseObject);
// TODO 5. 主流 ETL
SingleOutputStreamOperator<JSONObject> filterDS = jsonDS.filter(
jsonObj ->
{
try {
jsonObj.getJSONObject("data");
//这里是由于maxwell使用bootstrap的时候开始的数据和结束的数据过滤掉
if (jsonObj.getString("type").equals("bootstrap-start")
|| jsonObj.getString("type").equals("bootstrap-complete") || jsonObj.getString("type").equals("delete")) {
return false;
}
return true;
} catch (JSONException jsonException) {
return false;
}
});
// 打印测试
// filterDS.print("filterDS >>> ");
// ==================FlinkCDC========================
// TODO 6 得到FlinkCDC的数据
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("master")
.port(3307)
.databaseList("gmall_config") // set captured database
.tableList("gmall_config.table_process") // set captured table
.username("root")
.password("root")
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.startupOptions(StartupOptions.initial())
.build();
// TODO 7.使用CDC Source从MySQL读取数据
DataStreamSource<String> mysqlDS =
env.fromSource(
mySqlSource,
WatermarkStrategy.noWatermarks(),
"MysqlSource");
// mysqlDS.print();
//TODO 8 用配置流作为广播变量到主流
MapStateDescriptor<String, TableProcess> tableConfigDescriptor = new MapStateDescriptor<String, TableProcess>("table-process-state", String.class, TableProcess.class);
BroadcastStream<String> mysqlDSBroadcast = mysqlDS.broadcast(tableConfigDescriptor);
//BroadcastProcessFunction<JSONObject, String, JSONObject>
//第一个参数是主流filterDS的数据,
//第二个参数是mysqlDS广播流的数据
SingleOutputStreamOperator<JSONObject> hbaseDStream = filterDS.connect(mysqlDSBroadcast).process(new BroadcastProcessFunction<JSONObject, String, JSONObject>() {
// 定义Phoenix的连接
private Connection conn;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
Class.forName(GmallConfig.PHOENIX_DRIVER);
conn = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
}
@Override
public void processElement(JSONObject jsonObj, BroadcastProcessFunction<JSONObject, String, JSONObject>.ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
// 这里就可以根据TODO 8的广播状态信息得到下面保存的key为sourceTable,value就是mysql里面对应的所有信息
ReadOnlyBroadcastState<String, TableProcess> tableConfigState = ctx.getBroadcastState(tableConfigDescriptor);
// 获取配置信息
//{"database":"gmall","xid":67037,"data":{"create_time":"2022-06-28 19:48:48","user_id":36,"appraise":"1201","comment_txt":"评论内容:12915481127889697134233463622659829844396746782776","sku_id":25,"id":1548998216402403330,"spu_id":8,"order_id":48},"commit":true,"type":"insert","table":"comment_info","ts":1658144928}
String sourceTable = jsonObj.getString("table");
TableProcess tableConfig = tableConfigState.get(sourceTable);
if (tableConfig != null) {
// TODO 11 如果不为空说明是Mysql保存的维度描述信息key为sourceTable,value就是mysql里面对应的所有信息
JSONObject data = jsonObj.getJSONObject("data");
String sinkTable = tableConfig.getSinkTable();
// 根据 sinkColumns 过滤数据
String sinkColumns = tableConfig.getSinkColumns();
//TODO 12这里就是根据mysql里面保存sink_columns的信息过滤出来不是这里面描述字段的信息
filterColumns(data, sinkColumns);
// 将目标表名加入到主流数据中
data.put("sinkTable", sinkTable);
out.collect(data);
}
}
//用于过滤kafka里面得到的maxwell的信息,和mysql里面定义的sink_columns保存到phoenix的字段保持一致
private void filterColumns(JSONObject data, String sinkColumns) {
Set<Map.Entry<String, Object>> dataEntries = data.entrySet();
String[] columsList = sinkColumns.split(",");
List<String> strings = Arrays.asList(columsList);
dataEntries.removeIf(r -> !strings.contains(r.getKey()));
}
@Override
public void processBroadcastElement(String jsonStr, BroadcastProcessFunction<JSONObject, String, JSONObject>.Context ctx, Collector<JSONObject> out) throws Exception {
System.out.println(jsonStr);
JSONObject jsonObj = JSON.parseObject(jsonStr);
System.out.println("过来了");
System.out.println(jsonObj.getString("after"));
System.out.println(jsonObj.getString("after")==null);
// {"before":null,"after":{"source_table":"activity_sku","sink_table":"dim_activity_sku","sink_columns":"id,activity_id,sku_id,create_time","sink_pk":"id","sink_extend":null},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1658144231613,"snapshot":"false","db":"gmall_config","sequence":null,"table":"table_process","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1658144231613,"transaction":null}
//上面是FlinkCDC得到的数据,根据得到的binlog封装成对应mysql里面存储的对象
//下面是删除数据的情况
// {"before":{"source_table":"test_mid","sink_table":"test_mid","sink_columns":"test_mid,pk","sink_pk":"pk","sink_extend":null},"after":null,"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1658148392000,"snapshot":"false","db":"gmall_config","sequence":null,"table":"table_process","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":7154,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1658148393030,"transaction":null}
if(jsonObj.getString("after")!=null){
//如果能够进来说明不是删除的情况
TableProcess config = jsonObj.getObject("after", TableProcess.class);
String sourceTable = config.getSourceTable();
String sinkTable = config.getSinkTable();
String sinkColumns = config.getSinkColumns();
String sinkPk = config.getSinkPk();
String sinkExtend = config.getSinkExtend();
BroadcastState<String, TableProcess> tableConfigState = ctx.getBroadcastState(tableConfigDescriptor);
//保存到广播变量里面,key为sourceTable,value就是mysql里面对应的所有信息
// TODO 10把配置信息保存到广播状态变量里面
tableConfigState.put(sourceTable, config);
// TODO 9根据binlog创建Phoenix的表
checkTable(sinkTable, sinkColumns, sinkPk, sinkExtend);
}else{
System.out.println("这里是删除的情况");
}
}
private void checkTable(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) {
// 封装建表 SQL
StringBuilder sql = new StringBuilder();
sql.append("create table if not exists " + GmallConfig.HBASE_SCHEMA + "." + sinkTable + "(\n");
String[] columnArr = sinkColumns.split(",");
// 为主键及扩展字段赋默认值
if (sinkPk == null) {
sinkPk = "id";
}
if (sinkExtend == null) {
sinkExtend = "";
}
// 遍历添加字段信息
for (int i = 0; i < columnArr.length; i++) {
sql.append(columnArr[i] + " varchar");
// 判断当前字段是否为主键
if (sinkPk.equals(columnArr[i])) {
sql.append(" primary key");
}
// 如果当前字段不是最后一个字段,则追加","
if (i < columnArr.length - 1) {
sql.append(",\n");
}
}
sql.append(")");
sql.append(sinkExtend);
String createStatement = sql.toString();
System.out.println(sql);
// 为数据库操作对象赋默认值,执行建表 SQL
PreparedStatement preparedSt = null;
try {
preparedSt = conn.prepareStatement(createStatement);
preparedSt.execute();
} catch (SQLException sqlException) {
sqlException.printStackTrace();
System.out.println("建表语句\n" + createStatement + "\n执行异常");
} finally {
if (preparedSt != null) {
try {
preparedSt.close();
} catch (SQLException sqlException) {
sqlException.printStackTrace();
throw new RuntimeException("数据库操作对象释放异常");
}
}
}
}
});
hbaseDStream.print("hbasesavedata >>");
env.execute();
}
}
Result
Insert one row
Result
Use Maxwell to bootstrap the dimension-related tables
bin/maxwell-bootstrap --user root --password root --host master --database gmall --table user_info --client_id maxwell_1
Console output
Test with a non-dimension table, order_detail
bin/maxwell-bootstrap --user root --password root --host master --database gmall --table order_detail --client_id maxwell_1
Nothing is printed to the console, which shows that the code above achieves the following:
- Dimension tables are configured dynamically in MySQL, and the corresponding dimension tables are created in Phoenix in real time
- Dimension-related records are filtered out of the business data, and each record carries a field indicating which table it should be written to
Custom sink writing to HBase
Utility class PhoenixUtil
public class PhoenixUtil {
// 定义数据库连接对象
private static Connection conn;
/**
* 初始化 SQL 执行环境
*/
public static void initializeConnection() {
try {
// 1. 注册驱动
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
// 2. 获取连接对象
conn = DriverManager.getConnection("jdbc:phoenix:master,node1,node2:2181");
// 3. 设置 Phoenix SQL 执行使用的 schema(对应 mysql 的 database)
conn.setSchema(GmallConfig.HBASE_SCHEMA);
} catch (ClassNotFoundException classNotFoundException) {
System.out.println("注册驱动异常");
classNotFoundException.printStackTrace();
} catch (SQLException sqlException) {
System.out.println("获取连接对象异常");
sqlException.printStackTrace();
}
}
/**
* Phoenix 表数据导入方法
* @param sinkTable 写入数据的 Phoenix 目标表名
* @param data 待写入的数据
*/
public static void insertValues(String sinkTable, JSONObject data) {
System.out.println(data.toJSONString());
// 双重校验锁初始化连接对象
if(conn == null) {
synchronized (PhoenixUtil.class) {
if(conn == null) {
initializeConnection();
}
}
}
// 获取字段名
Set<String> columns = data.keySet();
// 获取字段对应的值
Collection<Object> values = data.values();
// 拼接字段名
String columnStr = StringUtils.join(columns, ",");
// 拼接字段值
String valueStr = StringUtils.join(values, "','");
// 拼接插入语句
String sql = "upsert into " + GmallConfig.HBASE_SCHEMA
+ "." + sinkTable + "(" +
columnStr + ") values ('" + valueStr + "')";
// 为数据库操作对象赋默认值
PreparedStatement preparedSt = null;
// 执行 SQL
try {
System.out.println("插入语句为:" + sql);
preparedSt = conn.prepareStatement(sql);
preparedSt.execute();
conn.commit();
} catch (SQLException sqlException) {
sqlException.printStackTrace();
throw new RuntimeException("数据库操作对象获取或执行异常");
} finally {
if(preparedSt != null) {
try {
System.out.println("关闭了");
preparedSt.close();
} catch (SQLException sqlException) {
sqlException.printStackTrace();
throw new RuntimeException("数据库操作对象释放异常");
}
}
}
}
}
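A quick standalone check of insertValues; this sketch assumes the DIM_BASE_TRADEMARK table has already been created by the job above and that Phoenix is reachable, and the row values are made up for the test.
import com.alibaba.fastjson.JSONObject;
public class PhoenixUtilCheck {
    public static void main(String[] args) {
        // Build one dimension record by hand and upsert it
        JSONObject row = new JSONObject();
        row.put("id", "9999");
        row.put("tm_name", "test_tm");
        PhoenixUtil.insertValues("dim_base_trademark", row);
    }
}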
Custom sink: MyPhoenixSink (final code)
public class MyPhoenixSink implements SinkFunction<JSONObject> {
@Override
public void invoke(JSONObject jsonObj, Context context) throws Exception {
// 获取目标表表名
String sinkTable = jsonObj.getString("sinkTable");
// 清除 JSON 对象中的 sinkTable 字段,以便可将该对象直接用于 HBase 表的数据写入
jsonObj.remove("sinkTable");
PhoenixUtil.insertValues(sinkTable, jsonObj);
}
}
Final DimSinkApp code
public class DimSinkApp {
public static void main(String[] args) throws Exception {
// TODO 1. 环境准备
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//这里和kafka的分区保持一致
env.setParallelism(4);
// TODO 2. 状态后端设置
env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
//检查点超时时间
env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
//两次检查点最小间隔时间,就是第一次检查点完成以后,最少经过3s钟开始检查点
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
env.getCheckpointConfig().enableExternalizedCheckpoints(
// ExternalizedCheckpointCleanup用于指定当job canceled的时候externalized checkpoint该如何清理
// DELETE_ON_CANCELLATION的话,在job canceled的时候会自动删除externalized state
// RETAIN_ON_CANCELLATION则在job canceled的时候会保留externalized checkpoint state
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
env.setRestartStrategy(RestartStrategies.failureRateRestart(
// 设置任务失败重启 允许任务失败最大次数 10次
10,
// 任务失败的时间启动的间隔
Time.of(1L, TimeUnit.MINUTES),
// 允许任务延迟时间
Time.of(3L, TimeUnit.MINUTES)
));
//设置状态后端
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("hdfs://master:8020/gmall/ck");
System.setProperty("HADOOP_USER_NAME", "bigdata");
// TODO 3. 读取业务主流
String topic = "topic_db";
String groupId = "dim_sink_app";
//提供主题和消费者组得到kafkaDataStream
DataStreamSource<String> gmallDS = env.addSource(KafkaUtil.getKafkaConsumer(topic, groupId));
// TODO 4. 主流数据结构转换
SingleOutputStreamOperator<JSONObject> jsonDS = gmallDS.map(JSON::parseObject);
// TODO 5. 主流 ETL
SingleOutputStreamOperator<JSONObject> filterDS = jsonDS.filter(
jsonObj ->
{
try {
jsonObj.getJSONObject("data");
//这里是由于maxwell使用bootstrap的时候开始的数据和结束的数据过滤掉
if (jsonObj.getString("type").equals("bootstrap-start")
|| jsonObj.getString("type").equals("bootstrap-complete") || jsonObj.getString("type").equals("delete")) {
return false;
}
return true;
} catch (JSONException jsonException) {
return false;
}
});
// 打印测试
// filterDS.print("filterDS >>> ");
// ==================FlinkCDC========================
// TODO 6 得到FlinkCDC的数据
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("master")
.port(3307)
.databaseList("gmall_config") // set captured database
.tableList("gmall_config.table_process") // set captured table
.username("root")
.password("root")
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.startupOptions(StartupOptions.initial())
.build();
// TODO 7.使用CDC Source从MySQL读取数据
DataStreamSource<String> mysqlDS =
env.fromSource(
mySqlSource,
WatermarkStrategy.noWatermarks(),
"MysqlSource");
// mysqlDS.print();
//TODO 8 用配置流作为广播变量到主流
MapStateDescriptor<String, TableProcess> tableConfigDescriptor = new MapStateDescriptor<String, TableProcess>("table-process-state", String.class, TableProcess.class);
BroadcastStream<String> mysqlDSBroadcast = mysqlDS.broadcast(tableConfigDescriptor);
//BroadcastProcessFunction<JSONObject, String, JSONObject>
//第一个参数是主流filterDS的数据,
//第二个参数是mysqlDS广播流的数据
SingleOutputStreamOperator<JSONObject> hbaseDStream = filterDS.connect(mysqlDSBroadcast).process(new BroadcastProcessFunction<JSONObject, String, JSONObject>() {
// 定义Phoenix的连接
private Connection conn;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
Class.forName(GmallConfig.PHOENIX_DRIVER);
conn = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
}
@Override
public void processElement(JSONObject jsonObj, BroadcastProcessFunction<JSONObject, String, JSONObject>.ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
// 这里就可以根据TODO 8的广播状态信息得到下面保存的key为sourceTable,value就是mysql里面对应的所有信息
ReadOnlyBroadcastState<String, TableProcess> tableConfigState = ctx.getBroadcastState(tableConfigDescriptor);
// 获取配置信息
//{"database":"gmall","xid":67037,"data":{"create_time":"2022-06-28 19:48:48","user_id":36,"appraise":"1201","comment_txt":"评论内容:12915481127889697134233463622659829844396746782776","sku_id":25,"id":1548998216402403330,"spu_id":8,"order_id":48},"commit":true,"type":"insert","table":"comment_info","ts":1658144928}
String sourceTable = jsonObj.getString("table");
TableProcess tableConfig = tableConfigState.get(sourceTable);
if (tableConfig != null) {
// TODO 11 如果不为空说明是Mysql保存的维度描述信息key为sourceTable,value就是mysql里面对应的所有信息
JSONObject data = jsonObj.getJSONObject("data");
String sinkTable = tableConfig.getSinkTable();
// 根据 sinkColumns 过滤数据
String sinkColumns = tableConfig.getSinkColumns();
//TODO 12这里就是根据mysql里面保存sink_columns的信息过滤出来不是这里面描述字段的信息
filterColumns(data, sinkColumns);
// 将目标表名加入到主流数据中
data.put("sinkTable", sinkTable);
//TODO 13把维度的数据传下去保存到Phoenix
out.collect(data);
}
}
//用于过滤kafka里面得到的maxwell的信息,和mysql里面定义的sink_columns保存到phoenix的字段保持一致
private void filterColumns(JSONObject data, String sinkColumns) {
Set<Map.Entry<String, Object>> dataEntries = data.entrySet();
String[] columsList = sinkColumns.split(",");
List<String> strings = Arrays.asList(columsList);
dataEntries.removeIf(r -> !strings.contains(r.getKey()));
}
@Override
public void processBroadcastElement(String jsonStr, BroadcastProcessFunction<JSONObject, String, JSONObject>.Context ctx, Collector<JSONObject> out) throws Exception {
System.out.println(jsonStr);
JSONObject jsonObj = JSON.parseObject(jsonStr);
System.out.println("过来了");
System.out.println(jsonObj.getString("after"));
System.out.println(jsonObj.getString("after")==null);
// {"before":null,"after":{"source_table":"activity_sku","sink_table":"dim_activity_sku","sink_columns":"id,activity_id,sku_id,create_time","sink_pk":"id","sink_extend":null},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1658144231613,"snapshot":"false","db":"gmall_config","sequence":null,"table":"table_process","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1658144231613,"transaction":null}
//上面是FlinkCDC得到的数据,根据得到的binlog封装成对应mysql里面存储的对象
//下面是删除数据的情况
// {"before":{"source_table":"test_mid","sink_table":"test_mid","sink_columns":"test_mid,pk","sink_pk":"pk","sink_extend":null},"after":null,"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1658148392000,"snapshot":"false","db":"gmall_config","sequence":null,"table":"table_process","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":7154,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1658148393030,"transaction":null}
if(jsonObj.getString("after")!=null){
//如果能够进来说明不是删除的情况
TableProcess config = jsonObj.getObject("after", TableProcess.class);
String sourceTable = config.getSourceTable();
String sinkTable = config.getSinkTable();
String sinkColumns = config.getSinkColumns();
String sinkPk = config.getSinkPk();
String sinkExtend = config.getSinkExtend();
BroadcastState<String, TableProcess> tableConfigState = ctx.getBroadcastState(tableConfigDescriptor);
//保存到广播变量里面,key为sourceTable,value就是mysql里面对应的所有信息
// TODO 10把配置信息保存到广播状态变量里面
tableConfigState.put(sourceTable, config);
// TODO 9根据binlog创建Phoenix的表
checkTable(sinkTable, sinkColumns, sinkPk, sinkExtend);
}else{
System.out.println("这里是删除的情况");
}
}
private void checkTable(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) {
// 封装建表 SQL
StringBuilder sql = new StringBuilder();
sql.append("create table if not exists " + GmallConfig.HBASE_SCHEMA + "." + sinkTable + "(\n");
String[] columnArr = sinkColumns.split(",");
// 为主键及扩展字段赋默认值
if (sinkPk == null) {
sinkPk = "id";
}
if (sinkExtend == null) {
sinkExtend = "";
}
// 遍历添加字段信息
for (int i = 0; i < columnArr.length; i++) {
sql.append(columnArr[i] + " varchar");
// 判断当前字段是否为主键
if (sinkPk.equals(columnArr[i])) {
sql.append(" primary key");
}
// 如果当前字段不是最后一个字段,则追加","
if (i < columnArr.length - 1) {
sql.append(",\n");
}
}
sql.append(")");
sql.append(sinkExtend);
String createStatement = sql.toString();
System.out.println(sql);
// 为数据库操作对象赋默认值,执行建表 SQL
PreparedStatement preparedSt = null;
try {
preparedSt = conn.prepareStatement(createStatement);
preparedSt.execute();
} catch (SQLException sqlException) {
sqlException.printStackTrace();
System.out.println("建表语句\n" + createStatement + "\n执行异常");
} finally {
if (preparedSt != null) {
try {
preparedSt.close();
} catch (SQLException sqlException) {
sqlException.printStackTrace();
throw new RuntimeException("数据库操作对象释放异常");
}
}
}
}
});
//TODO 14 保存到Phoenix
// hbaseDStream.print();
// TODO !! With large data volumes there is a problem here of excessive connection creation stalling the job (a pooled-connection sketch is given at the end of this section)
hbaseDStream.addSink(new MyPhoenixSink());
env.execute();
}
}
Result
Summary:
- After bootstrapping the dimension-related tables with Maxwell, the data reaches Phoenix
- When data in the source database changes, the change is propagated to Phoenix dynamically
bin/maxwell-bootstrap --user root --password root --host master --database gmall --table activity_info --client_id maxwell_1
bin/maxwell-bootstrap --user root --password root --host master --database gmall --table activity_rule --client_id maxwell_1
bin/maxwell-bootstrap --user root --password root --host master --database gmall --table activity_sku --client_id maxwell_1
bin/maxwell-bootstrap --user root --password root --host master --database gmall --table base_category1 --client_id maxwell_1
bin/maxwell-bootstrap --user root --password root --host master --database gmall --table base_category2 --client_id maxwell_1
bin/maxwell-bootstrap --user root --password root --host master --database gmall --table base_category3 --client_id maxwell_1
bin/maxwell-bootstrap --user root --password root --host master --database gmall --table base_province --client_id maxwell_1
bin/maxwell-bootstrap --user root --password root --host master --database gmall --table base_region --client_id maxwell_1
bin/maxwell-bootstrap --user root --password root --host master --database gmall --table base_trademark --client_id maxwell_1
bin/maxwell-bootstrap --user root --password root --host master --database gmall --table coupon_info --client_id maxwell_1
bin/maxwell-bootstrap --user root --password root --host master --database gmall --table coupon_range --client_id maxwell_1
bin/maxwell-bootstrap --user root --password root --host master --database gmall --table financial_sku_cost --client_id maxwell_1
bin/maxwell-bootstrap --user root --password root --host master --database gmall --table sku_info --client_id maxwell_1
bin/maxwell-bootstrap --user root --password root --host master --database gmall --table spu_info --client_id maxwell_1
bin/maxwell-bootstrap --user root --password root --host master --database gmall --table user_info --client_id maxwell_1
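One way to address the TODO noted in DimSinkApp about excessive connection creation is to draw Phoenix connections from a pool instead of sharing a single static connection. The sketch below uses Druid; com.alibaba:druid is not in the pom shown at the top of this post and would have to be added, and the pool sizes are arbitrary. PhoenixUtil.insertValues could then borrow a connection per call and close it afterwards to return it to the pool.
import com.alibaba.druid.pool.DruidDataSource;
import java.sql.Connection;
public class DruidPhoenixPool {
    private static volatile DruidDataSource dataSource;
    // Build one pool per TaskManager JVM, lazily
    private static DruidDataSource create() {
        DruidDataSource ds = new DruidDataSource();
        ds.setDriverClassName(GmallConfig.PHOENIX_DRIVER);
        ds.setUrl(GmallConfig.PHOENIX_SERVER);
        ds.setInitialSize(5);
        ds.setMaxActive(20);
        ds.setMinIdle(5);
        ds.setMaxWait(60000);
        return ds;
    }
    public static Connection getConnection() throws Exception {
        if (dataSource == null) {
            synchronized (DruidPhoenixPool.class) {
                if (dataSource == null) {
                    dataSource = create();
                }
            }
        }
        // Callers should close() the connection, which returns it to the pool
        return dataSource.getConnection();
    }
}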
DIM layer logic diagram