01 引言
最近在做实时采集Kafka 发布的内容到MySQL ,本文记录一下关键的点,细节不再描述,希望能帮助到大家。
02 实现
2.1 添加依赖
在工程,除了添加基础的Flink 环境依赖,还需要添加flink-connector-kafka 的依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.13.6</version>
</dependency>
除此,因为Flink 把Kafka 作为了Source ,所以读取的字符串是有解析方式的,本文主要使用的是“json ”的方式,因此还需要引入序列化包的,但是flink-connector-kafka 已经自带了,所以没必要再引入。
ok,到这里如果我们写好FlinkSQL 去启动,直接就会一闪而退了,为什么呢?因为我们缺少了’ kafka-clients-2.1.0.jar '这个包,但是也无需引入,因为在flink-connector-kafka 里面已经自带了。
为什么要在这里特别提示 “序列化包”和“kafka-clients包呢”?因为如果我们采用Flink On Yarn的方式部署时,这两个包是需要放到HDFS的,如下:
2.2 Flink SQL
好了,到了关键的FlinkSQL 了,该如何写呢?
首先看看Source ,也就是我们的Kafka ,如下:
CREATE TABLE t_student (
id INT,
name STRING
) WITH (
'connector' = 'kafka',
'topic' = 'cdc_user',
'properties.bootstrap.servers' = '10.194.166.92:9092',
'properties.group.id' = 'flink-cdc-mysql-kafka',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
)
然后Sink 输出,我这里需要输出到MySQL :
CREATE TABLE t_student_copy (
id INT,
name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://127.0.0.1:3306/big_data',
'username' = 'root',
'password' = '123456',
'table-name' = 't_student_copy'
)
最后,使用INSERT INTO 声明如何写入:
INSERT INTO t_student_copy(id,name) SELECT id,name FROM t_student
2.3 配置Kafka域名
还有一点需要注意的是,当我们跑Flink的程序的时候,会出现类似如下错误: unable to connect broker…
这个时候,我们要在跑Flink 的程序的服务器配置Kafka 的域名,具体在hosts 文件里配置:
vi /etc/hosts
ok,到这里,只要我们只要使用Kafka 工具发送json 格式的数据,Flink 程序就能实时收到,并写入MySQL 数据库。
03 文末
本文主要是记录Kafka 如何实时写入到MySQL 的一些坑点,完整源码就不贴出来了,希望能给大家一点启示并帮助到大家,谢谢大家的阅读,本文完!
附:KafkaTool的使用教程:
|