1. flink cdc官网地址(2个地址):
CDC Connectors for Apache Flink 文档: https://ververica.github.io/flink-cdc-connectors/
GitHub 仓库: https://github.com/ververica/flink-cdc-connectors
2. mysql增加以下配置(写在 [mysqld] 段下,每项单独一行),完整示例见下方:
log-bin=mysql-bin
server-id=1
binlog_format=ROW
# Settings for the MySQL server process.
[mysqld]
# Installation directory. The doubled backslash before "soft" is deliberate: MySQL option
# files interpret "\s" as an escape for a space, so that backslash must itself be escaped.
basedir=D:\\soft\mysql-5.7.19
datadir=D:\\soft\mysql-5.7.19\data
port=3306
# Maximum number of simultaneous client connections.
max_connections=200
# Character encoding used by the server.
character-set-server=utf8
default-storage-engine=INNODB
sql_mode=NO_ENGINE_SUBSTITUTION,STRICT_TRANS_TABLES
# Binary log settings added for Flink CDC: enable the binlog, give this server an id,
# and log row-level changes.
log-bin=mysql-bin
server-id=1
binlog_format=ROW
# Settings for the mysql command-line client.
[mysql]
# Character encoding used by the client.
default-character-set=utf8
查看配置结果:
SHOW VARIABLES LIKE '%log_bin%';
SHOW VARIABLES LIKE '%binlog_format%';
3. pom.xml文件
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.flink.cdc</groupId>
<artifactId>myflinkcdc</artifactId>
<version>1.0.0-RELEASE</version>
<properties>
    <!-- Versions of the frameworks this job is built against. -->
    <flink.version>1.13.5</flink.version>
    <debezium.version>1.5.4.Final</debezium.version>
    <geometry.version>2.2.0</geometry.version>
    <testcontainers.version>1.15.3</testcontainers.version>
    <!-- Java / Scala toolchain. -->
    <java.version>1.8</java.version>
    <scala.binary.version>2.11</scala.binary.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
    <!-- Test and logging libraries. -->
    <hamcrest.version>1.3</hamcrest.version>
    <version.awaitility>4.0.1</version.awaitility>
    <slf4j.version>1.7.15</slf4j.version>
    <log4j.version>2.17.1</log4j.version>
    <spotless.version>2.4.2</spotless.version>
    <flink.forkCount>1</flink.forkCount>
    <flink.reuseForks>true</flink.reuseForks>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!-- 1.2.83 is the fastjson 1.x release that closes the known autoType
         deserialization vulnerabilities present in earlier 1.2.x versions. -->
    <fastjson.version>1.2.83</fastjson.version>
</properties>
<dependencies>
    <!-- Flink runtime and APIs.
         NOTE(review): these are left at the default (compile) scope so the job can also be
         started from an IDE; confirm whether the cluster deployment should mark them
         as provided instead. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <!-- Use fixed version 18.0-13.0 of flink shaded guava -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-shaded-guava</artifactId>
        <version>18.0-13.0</version>
    </dependency>

    <!-- test dependencies -->
    <dependency>
        <groupId>org.hamcrest</groupId>
        <artifactId>hamcrest-all</artifactId>
        <version>${hamcrest.version}</version>
        <type>jar</type>
        <scope>test</scope>
    </dependency>
    <!-- tests will have log4j as the default logging framework available -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>${log4j.version}</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>${log4j.version}</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>${log4j.version}</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <!-- API bridge between log4j 1 and 2 -->
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-1.2-api</artifactId>
        <version>${log4j.version}</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <!-- Test-only helper: scoped to test so it is not bundled into the shaded job jar. -->
        <groupId>org.awaitility</groupId>
        <artifactId>awaitility</artifactId>
        <type>jar</type>
        <version>${version.awaitility}</version>
        <scope>test</scope>
    </dependency>

    <!-- Connectors and JSON handling used by the job itself. -->
    <dependency>
        <groupId>com.ververica</groupId>
        <!-- add the dependency matching your database -->
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <!-- the dependency is available only for stable releases. -->
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>${fastjson.version}</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <!-- Builds a single runnable job jar during the package phase. -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.0.0</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <!-- Artifacts that are kept out of the shaded jar. -->
                        <artifactSet>
                            <excludes>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Strip signature files from every bundled artifact;
                                     copying them into META-INF may cause SecurityExceptions
                                     when the resulting JAR is used. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <!-- Records the job entry point in the jar manifest. -->
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.flink.cdc.FlinkCdc</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
</project>
4.FlinkCdc.java
package com.flink.cdc;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Arrays;
import java.util.Properties;
/**
 * Flink job that captures MySQL change events with Flink CDC and writes each event, as a
 * Debezium JSON string, to Kafka.
 *
 * <p>Connection settings can be overridden with {@code --key value} program arguments. When an
 * argument is absent the sample value built into this class is used. Recognized keys:
 * {@code source.hostname}, {@code source.port}, {@code source.username},
 * {@code source.password}, {@code source.databaseList} (comma separated),
 * {@code source.tableList} (comma separated, entries written as {@code db.table}),
 * {@code serverId} and {@code kafka.bootstrap.servers}.
 */
public class FlinkCdc {
    private static final String DEFAULT_SOURCE_PARALLELISM = "2";
    private static final String DEFAULT_SINK_PARALLELISM = "1";
    private static final String DEFAULT_FLINK_CHECK_POINTING = "3000";
    private static final String NULL_CONFIG = "null";

    /**
     * Default first server id used by the CDC readers. The connector joins MySQL as a
     * replication client, so its id must differ from the id of the MySQL server itself
     * and from every other replication client of that server.
     */
    private static final String DEFAULT_SERVER_ID = "5400";

    /**
     * Builds the MySQL CDC source and the Kafka sink, wires them together and runs the job.
     *
     * @param args optional {@code --key value} overrides, see the class documentation
     * @throws Exception if the arguments are invalid or the job fails
     */
    public static void main(String[] args) throws Exception {
        ParameterTool parameter = ParameterTool.fromArgs(args);

        // MySQL source settings.
        String sourceHostname = parameter.get("source.hostname", "10.11.11.xx");
        int sourcePort = parameter.getInt("source.port", 3306);
        String sourceUsername = parameter.get("source.username", "xxxx");
        String sourcePassword = parameter.get("source.password", "xxxxx");

        // Captured databases and tables; table names are given as db.table.
        String[] databaseList = parameter.get("source.databaseList", "testdb").split(",");
        String[] tableList = parameter.get("source.tableList", "testdb.test_table").split(",");

        // Flink settings.
        long checkPointing = Long.parseLong(DEFAULT_FLINK_CHECK_POINTING);
        int sourceParallelism = Integer.parseInt(DEFAULT_SOURCE_PARALLELISM);
        int sinkParallelism = Integer.parseInt(DEFAULT_SINK_PARALLELISM);

        // Server id (or id range) the CDC readers use when connecting to MySQL.
        String serverId = parameter.get("serverId", DEFAULT_SERVER_ID);
        if (NULL_CONFIG.equals(serverId)) {
            throw new RuntimeException("serverId 必须要设置");
        }
        serverId = resolveServerIdRange(serverId, sourceParallelism);

        // How Debezium converts DECIMAL and unsigned BIGINT columns.
        Properties debeziumProperties = new Properties();
        debeziumProperties.setProperty("decimal.handling.mode", "double");
        debeziumProperties.setProperty("bigint.unsigned.handling.mode", "long");

        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname(sourceHostname)
                .port(sourcePort)
                // set captured database
                .databaseList(databaseList)
                // set captured table
                .tableList(tableList)
                .username(sourceUsername)
                .password(sourcePassword)
                .serverId(serverId)
                .debeziumProperties(debeziumProperties)
                .deserializer(new JsonDebeziumDeserializationSchema())
                // Startup modes:
                //   1. initial        - copy the full tables first, then follow the changes
                //   2. earliest       - no initial copy
                //   3. latest         - start from the latest position
                //   4. specificOffset - start from a given offset
                //   5. timestamp      - start from events newer than a given time
                .startupOptions(StartupOptions.latest())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // enable checkpoint
        env.enableCheckpointing(checkPointing);
        env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().setCheckpointTimeout(7200000);
        // NOTE(review): this is a local Windows path; when the job runs on a cluster, point it
        // at storage every node can reach, such as the HDFS URI in the commented line below.
        //env.getCheckpointConfig().setCheckpointStorage("hdfs://192.168.1.100:8020/tmp/flinkcdc/test1");
        env.getCheckpointConfig().setCheckpointStorage("file:/D:/tmp/ckp");

        // Kafka settings.
        String kafkaHost = parameter.get("kafka.bootstrap.servers", "xxxxxx:7776");
        String defaultTopic = "flink_cdc_kakfa_test";

        // Kafka producer settings.
        Properties propertiesProducer = new Properties();
        propertiesProducer.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost);
        propertiesProducer.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "2097152");

        FlinkKafkaProducer<String> flinkKafkaProducer = new FlinkKafkaProducer<>(
                defaultTopic,
                new SimpleStringSerializationSchemaWrapper(false),
                propertiesProducer,
                FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);

        String jobName = "flinkCdcTest";
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQLSource")
                .setParallelism(sourceParallelism)
                .addSink(flinkKafkaProducer)
                .name(jobName)
                .setParallelism(sinkParallelism);
        env.execute(jobName);
    }

    /**
     * Expands a single server id into the range {@code first-last} holding one id per source
     * subtask. A value that is already a range, or a parallelism of one, is returned unchanged.
     */
    private static String resolveServerIdRange(String serverId, int sourceParallelism) {
        if (sourceParallelism <= 1 || serverId.contains("-")) {
            return serverId;
        }
        int firstId = Integer.parseInt(serverId.trim());
        return firstId + "-" + (firstId + sourceParallelism - 1);
    }
}
5. SimpleStringSerializationSchemaWrapper.java
package com.flink.cdc;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.nio.charset.StandardCharsets;
import javax.annotation.Nullable;
/**
 * Kafka serialization schema for Debezium JSON strings.
 *
 * <p>The record value is the JSON string serialized by {@link SimpleStringSchema}. The target
 * topic is {@code <db>.<table>}, read from the {@code source} object of the JSON payload, and
 * the same text is used as the record key.
 */
public class SimpleStringSerializationSchemaWrapper
        implements KafkaSerializationSchema<String>, KafkaContextAware<String> {

    private final SerializationSchema<String> serializationSchema = new SimpleStringSchema();

    /** When false, records are produced without an explicit timestamp. */
    private boolean writeTimestamp;

    /**
     * @param writeTimestamp whether the timestamp handed to {@link #serialize} is written to
     *     the produced record
     */
    public SimpleStringSerializationSchemaWrapper(boolean writeTimestamp) {
        this.writeTimestamp = writeTimestamp;
    }

    /** Forwards initialization to the wrapped string schema. */
    @Override
    public void open(SerializationSchema.InitializationContext context) throws Exception {
        serializationSchema.open(context);
    }

    /**
     * Builds the Kafka record for one change event.
     *
     * @param element Debezium JSON string
     * @param timestamp timestamp offered by the producer, may be null
     * @return record addressed to the {@code db.table} topic, with no explicit partition
     */
    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
        byte[] value = serializationSchema.serialize(element);
        final Long recordTimestamp = writeTimestamp ? timestamp : null;
        String topic = getTargetTopic(element);
        // The topic name (db.table) doubles as the key, so partitioning follows the table.
        byte[] recordKey = topic.getBytes(StandardCharsets.UTF_8);
        return new ProducerRecord<>(topic, null, recordTimestamp, recordKey, value);
    }

    /**
     * Derives the topic name from the {@code source.db} and {@code source.table} fields of
     * the JSON payload.
     */
    @Override
    public String getTargetTopic(String element) {
        JSONObject sourceInfo = JSON.parseObject(element).getJSONObject("source");
        String database = sourceInfo.getString("db");
        String tableName = sourceInfo.getString("table");
        return database + "." + tableName;
    }

    /** Changes whether record timestamps are written. */
    public void setWriteTimestamp(boolean writeTimestamp) {
        this.writeTimestamp = writeTimestamp;
    }
}
6. FlinkCdc2.java实现类
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;
public class FlinkCdc2 {
public static void main(String[] args) throws Exception {
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("localhost")
.port(3306)
.databaseList("mytest") // set captured database, If you need to synchronize the whole database, Please set tableList to ".*".
.tableList("mytest.t_user_info") // set captured table
.username("root")
.password("123456")
//监控的方式:
// 1. initial 初始化全表拷贝,然后再比较
// 2. earliest 不做初始化,只从当前的
// 3. latest 指定最新的
// 4. specificOffset 指定offset
// 3. timestamp 比指定的时间大的
.startupOptions(StartupOptions.latest())
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.build();
// kafka 配置
String kafkaHost = "192.168.1.100:7776,192.168.1.101:7776,192.168.1.102:7776";
String topicId = "flink_cdc_test";
// kafka producer 相关配置
Properties propertiesProducer = new Properties();
propertiesProducer.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost);
propertiesProducer.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "2097152");
FlinkKafkaProducer<String> flinkKafkaProducer = new FlinkKafkaProducer<>(
kafkaHost,
topicId,
new SimpleStringSchema());
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enable checkpoint
env.enableCheckpointing(3000);
env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQLSource")
.setParallelism(4)
.addSink(flinkKafkaProducer)
.setParallelism(1);
env.execute("flinkCDC");
}
7. FlinkCdc1.java
package com.flink.cdc;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Smallest Flink CDC example: reads changes of {@code mytest.t_user_info} from a local MySQL
 * instance and prints every change event as a JSON string.
 */
public class FlinkCdc1 {
    /**
     * Builds the MySQL CDC source, attaches a print sink and runs the job.
     *
     * @param args unused
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        // Connection and capture settings.
        final MySqlSource<String> cdcSource =
                MySqlSource.<String>builder()
                        .hostname("localhost")
                        .port(3306)
                        .username("root")
                        .password("xxxxx")
                        // set captured database, If you need to synchronize the whole database, Please set tableList to ".*".
                        .databaseList("mytest")
                        // set captured table
                        .tableList("mytest.t_user_info")
                        // converts SourceRecord to JSON String
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .startupOptions(StartupOptions.latest())
                        .build();

        final StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 3000 ms.
        environment.enableCheckpointing(3000);

        environment
                .fromSource(cdcSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .setParallelism(4)
                .print()
                // use parallelism 1 for sink to keep message ordering
                .setParallelism(1);

        environment.execute("Print MySQL Snapshot + Binlog");
    }
}
8. 发布yarn
# Start a detached (-d) Flink session on YARN: 1024m for the JobManager process, 8192m and
# 8 task slots per TaskManager, submitted to the "uat" queue under the name "flinkCdcTest".
/opt/flink/bin/yarn-session.sh -d \
-Djobmanager.memory.process.size=1024m \
-Dtaskmanager.memory.process.size=8192m \
-Dtaskmanager.numberOfTaskSlots=8 \
-Dyarn.application.queue="uat" \
-Dyarn.application.name="flinkCdcTest"
# Submit the job jar, detached, to an existing YARN session, running the main class
# com.flink.cdc.FlinkCdc. The application id is environment specific; replace it with the id
# of the session the job should run in.
/opt/flink/bin/flink run -t yarn-session -d \
-Dyarn.application.id=application_1655802018813_9105 \
-c com.flink.cdc.FlinkCdc \
/tmp/myflinkcdc-1.0.0-RELEASE.jar
|