Background
Flink "CDC" based on plain SELECT statements (no binlog involved) is suited to full-volume data synchronization. It can be combined with Azkaban or DolphinScheduler to run scheduled T+1 data syncs.
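The demo below copies flink_cdc_test.t_test into ods_flink_cdc_test.ods_t_test, so both tables must exist before the job runs. A minimal DDL sketch, assuming column types (the code only reveals the column names id, username, password):

-- assumed schema: column names come from the SELECT below, types are guesses
CREATE DATABASE IF NOT EXISTS flink_cdc_test;
CREATE TABLE IF NOT EXISTS flink_cdc_test.t_test (
    id INT PRIMARY KEY,
    username VARCHAR(64),
    password VARCHAR(64)
);
CREATE DATABASE IF NOT EXISTS ods_flink_cdc_test;
CREATE TABLE IF NOT EXISTS ods_flink_cdc_test.ods_t_test LIKE flink_cdc_test.t_test;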
1. Maven
<properties>
    <flink.version>1.13.6</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.4.0</version>
    </dependency>
    <!-- MySQL JDBC driver used by MysqlReader/MysqlWriter below.
         The 5.1.x line provides com.mysql.jdbc.Driver; the exact version is an assumption. -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.20</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.0</version>
            <configuration>
                <createDependencyReducedPom>false</createDependencyReducedPom>
            </configuration>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <!-- When packaging, set this to your own main class -->
                                <mainClass>com.flink.cdc.demo.MysqlCdcMysql</mainClass>
                            </transformer>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>reference.conf</resource>
                            </transformer>
                        </transformers>
                        <filters>
                            <filter>
                                <artifact>*:*:*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <!-- Flink 1.13 requires Java 8+; the original "6" would not compile -->
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
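With the shade plugin bound to the package phase above, a single command produces the runnable fat jar (the jar file name depends on your own artifactId and version):

mvn clean package

The shaded jar under target/ is the one to upload in step 6 below.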
2. MysqlReader
package com.flink.cdc.demo;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class MysqlReader extends RichSourceFunction<Tuple3<Integer, String, String>> {

    private Connection connection = null;
    private PreparedStatement ps = null;

    // Opens the database connection before the source starts emitting records.
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName("com.mysql.jdbc.Driver"); // load the JDBC driver
        connection = DriverManager.getConnection("jdbc:mysql://10.252.92.4:3306", "root", "root"); // obtain the connection
        ps = connection.prepareStatement("select id,username,password from flink_cdc_test.t_test");
    }

    // Runs the full-table SELECT once and emits each row as a Tuple3. The job
    // finishes when the result set is exhausted, which is exactly what a T+1 batch sync wants.
    @Override
    public void run(SourceContext<Tuple3<Integer, String, String>> sourceContext) throws Exception {
        ResultSet resultSet = ps.executeQuery();
        while (resultSet.next()) {
            Tuple3<Integer, String, String> tuple = new Tuple3<>();
            tuple.setFields(resultSet.getInt(1), resultSet.getString(2), resultSet.getString(3));
            sourceContext.collect(tuple);
        }
    }

    @Override
    public void cancel() {
        // Nothing to interrupt: run() ends on its own once the result set is drained.
    }

    // Release JDBC resources on shutdown. (The original closed them in cancel(),
    // which is never invoked when the source finishes normally.)
    @Override
    public void close() throws Exception {
        super.close();
        if (ps != null) {
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}
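The URL and credentials above are hardcoded. A sketch of a configurable variant that takes them as constructor arguments, so the same jar can point at different environments; the class name and fields are illustrative, not from the original post (a JDBC 4 driver like mysql-connector-java auto-registers, so Class.forName is omitted):

package com.flink.cdc.demo;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class ConfigurableMysqlReader extends RichSourceFunction<Tuple3<Integer, String, String>> {

    private final String jdbcUrl; // e.g. "jdbc:mysql://10.252.92.4:3306"
    private final String user;
    private final String password;
    private transient Connection connection;
    private transient PreparedStatement ps;

    public ConfigurableMysqlReader(String jdbcUrl, String user, String password) {
        this.jdbcUrl = jdbcUrl;
        this.user = user;
        this.password = password;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = DriverManager.getConnection(jdbcUrl, user, password);
        ps = connection.prepareStatement("select id,username,password from flink_cdc_test.t_test");
    }

    @Override
    public void run(SourceContext<Tuple3<Integer, String, String>> ctx) throws Exception {
        ResultSet rs = ps.executeQuery();
        while (rs.next()) {
            ctx.collect(Tuple3.of(rs.getInt(1), rs.getString(2), rs.getString(3)));
        }
    }

    @Override
    public void cancel() {
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (ps != null) ps.close();
        if (connection != null) connection.close();
    }
}

Usage in the main class would then be env.addSource(new ConfigurableMysqlReader(args[0], args[1], args[2])).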
3. MysqlWriter
package com.flink.cdc.demo;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class MysqlWriter extends RichSinkFunction<Tuple3<Integer, String, String>> {

    private Connection connection = null;
    private PreparedStatement ps = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        if (connection == null) {
            Class.forName("com.mysql.jdbc.Driver"); // load the JDBC driver
            connection = DriverManager.getConnection("jdbc:mysql://10.252.92.4:3306", "root", "root"); // obtain the connection
        }
        ps = connection.prepareStatement("insert into ods_flink_cdc_test.ods_t_test values (?,?,?)");
        System.out.println("sink connection opened");
    }

    // Receives each record emitted by MysqlReader and writes it to the target table.
    @Override
    public void invoke(Tuple3<Integer, String, String> value, Context context) throws Exception {
        try {
            ps.setInt(1, value.f0);
            ps.setString(2, value.f1);
            ps.setString(3, value.f2);
            ps.executeUpdate();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (ps != null) {
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}
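One caveat for a T+1 job: a plain INSERT duplicates rows (or fails on the primary key) when the job is re-run. If ods_t_test has a primary key on id, switching the SQL string in open() to a MySQL upsert keeps re-runs idempotent; a sketch, assuming that primary key:

ps = connection.prepareStatement(
        "insert into ods_flink_cdc_test.ods_t_test (id, username, password) values (?,?,?) "
        + "on duplicate key update username = values(username), password = values(password)");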
4. Main class: MysqlCdcMysql
package com.flink.cdc.demo;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MysqlCdcMysql {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // To submit straight from the IDE to a standalone cluster, use a remote environment instead:
        // final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", 8081, "D:\\flink-steven\\target\\flink-0.0.1-SNAPSHOT.jar");

        // Avoid hard-coding parallelism here: if you set 8, the cluster must offer at
        // least 8 task slots, otherwise the job fails with a resource error.
        // env.setParallelism(8);

        DataStreamSource<Tuple3<Integer, String, String>> dataStream = env.addSource(new MysqlReader());
        dataStream.print();
        dataStream.addSink(new MysqlWriter());
        env.execute("Flink copies MySQL data to MySQL");
    }
}
5. Run locally
Run the main method directly in the IDE: each row from t_test is printed to the console (via dataStream.print()) and written into ods_t_test.
6. Package as a jar and upload it
Note: the Flink version installed on the cluster must match the version in the pom, and the Scala binary version must match as well.
7. Run
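Submitting the uploaded jar with the Flink CLI looks like this; the jar path and name are assumptions, and -c is optional here because the shade plugin already wrote Main-Class into the manifest:

flink run -c com.flink.cdc.demo.MysqlCdcMysql /opt/jobs/flink-cdc-demo.jar

For the T+1 schedule mentioned in the background, wrap that command in a scheduler task, e.g. a minimal Azkaban job file (paths again are assumptions):

type=command
command=flink run -c com.flink.cdc.demo.MysqlCdcMysql /opt/jobs/flink-cdc-demo.jar

A DolphinScheduler shell task with a daily cron expression achieves the same.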