标题: flink-connector中kafka和upsertkafka的介绍 日期: 2021-08-03 16:46:43 标签: [flink, kafka, upsert kafka, 实时数仓] 分类: 数据仓库
今天来说下flink sql中常用到的connector:kafka,它承接了实时的消息数据,进行处理,当然,这些消息的特点有可能不一样,怎样处理,得到实时的结果,提供给分析、运营、营销等等。下面来看看具体有什么区别,怎么使用。
kafka中的实时消息,它也可以是关系型数据库的changelog(具有insert/update/delete属性),我们知道每一条消息的属性之后,就可以确定哪些数据是最新的,哪些数据是被删除的,那么在我们的存储系统中,应该被实时地更新,以便计算的及时,数据的延时性就会更低。
他们俩的依赖都是一样的 依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.12.3</version>
</dependency>
先来说下kafka connector:
1.创建一个kafka connector(sql)
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset', // 消费的offset位置,也可以是latest-offset
'format' = 'csv'
)
这种connector,kafka中的数据是什么样子,KafkaTable表中的数据就是什么样子的,可以执行select语句查看:
select * from KafkaTable
接着你就可以对KafkaTable表中的数据进行各种操作了。
再来看下upsertkafka connector
先介绍一下,upsert kafka连接器支持实时消息数据以upsert方式从一个kafka topic中插入到另一个kafka topic中,source为changelog类型kafka, sink为kafka
什么是changelog类型kafka,就是kafka topic中的消息是带有一个属性的,这个属性标记数据是Insert、update before、update after、delete的,再根据主键来进行对sink kafka topic中的数据进行具体的操作,是插入,还是更新,还是删除。
另外,value为空的消息,将被视为Delete消息。
1.来看看sql的例子
CREATE TABLE pageviews_per_region (
user_region STRING,
pv BIGINT,
uv BIGINT,
PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'pageviews_per_region',
'properties.bootstrap.servers' = '...',
'key.format' = 'avro',
'value.format' = 'avro'
);
CREATE TABLE pageviews (
user_id BIGINT,
page_id BIGINT,
viewtime TIMESTAMP,
user_region STRING,
WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'pageviews',
'properties.bootstrap.servers' = '...',
'format' = 'json'
);
-- 计算 pv、uv 并插入到 upsert-kafka sink
INSERT INTO pageviews_per_region
SELECT
user_region,
COUNT(*),
COUNT(DISTINCT user_id)
FROM pageviews
GROUP BY user_region;
记住DDL语句中,一定要设置主键Primary key。
上面的pageviews_per_region表中计算的pv,uv结果就是统计结果,当数据源端进行了增删改,对应的pv uv结果就会同步更新,这就是upsert kafka的魅力。
这是基于kafka的统计计算,前提条件是topic pageviews中的数据是changelog格式的。
changelog格式数据怎么来,怎么导入kafka,还有upsert到mysql、hbase呢,且看下回分解。
每天进步一点点,你会成为专家。
欢迎关注我的微信公众号,比较喜欢分享知识,也喜欢宠物,想知道狗狗怕蚊子不?微信扫描二维码查看详情
一起学习,一起进步。
|