import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
 * Flink CDC job that reads change records from MySQL and splits them, by source table,
 * into side outputs that are each written to their own Kafka topic.
 *
 * <p>Routing is driven by the {@code tableName} field of each JSON record:
 * <ul>
 *   <li>{@code test1} is written to topic {@code ods_bigdata_test1}</li>
 *   <li>{@code test2} is written to topic {@code ods_dim_test2}</li>
 * </ul>
 * Records with any other {@code tableName} are not emitted anywhere.
 *
 * <p>NOTE(review): host, credentials, checkpoint location and topic names are hard-coded
 * (the checkpoint URI points at a local Windows desktop folder) - confirm whether these
 * should come from external configuration before running outside a dev machine.
 */
public class FlinkCDC {

    /** Checkpoint storage location passed to {@link FsStateBackend}. */
    private static final String CHECKPOINT_URI = "file:///C:\\Users\\linwe\\Desktop\\checkpoint\\3";
    /** Interval between checkpoint triggers, in milliseconds. */
    private static final long CHECKPOINT_INTERVAL_MS = 5000L;
    /** Checkpoint timeout, in milliseconds. */
    private static final long CHECKPOINT_TIMEOUT_MS = 10000L;
    /** Maximum number of concurrently running checkpoints. */
    private static final int MAX_CONCURRENT_CHECKPOINTS = 2;
    /** Minimum pause between checkpoints, in milliseconds. */
    private static final long MIN_PAUSE_BETWEEN_CHECKPOINTS_MS = 3000;

    private static final String MYSQL_HOST = "192.168.137.210";
    private static final int MYSQL_PORT = 3306;
    private static final String MYSQL_USER = "root";
    private static final String MYSQL_PASSWORD = "";

    /** Name of the JSON field that carries the source table of a record. */
    private static final String TABLE_NAME_FIELD = "tableName";
    /** Table name, also used as the id of the matching side-output tag. */
    private static final String TABLE_TEST1 = "test1";
    /** Table name, also used as the id of the matching side-output tag. */
    private static final String TABLE_TEST2 = "test2";

    private static final String SINK_TOPIC_TEST1 = "ods_bigdata_test1";
    private static final String SINK_TOPIC_TEST2 = "ods_dim_test2";

    /**
     * Builds the job graph (source, per-table split, print and Kafka sinks) and runs it.
     *
     * @param args command-line arguments; not used
     * @throws Exception if job construction or execution fails
     */
    public static void main(String[] args) throws Exception {
        // 1. Execution environment, checkpointing and state backend.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        configureCheckpointing(env);

        // 2. Read the MySQL change records as a stream of JSON strings.
        DataStreamSource<String> streamSource = env.addSource(buildMySqlSource());

        // 3. One side-output tag per table. The anonymous subclass ("{ }") is kept on purpose:
        //    it is how the original code constructs the tags.
        OutputTag<String> test1Tag = new OutputTag<String>(TABLE_TEST1) {
        };
        OutputTag<String> test2Tag = new OutputTag<String>(TABLE_TEST2) {
        };
        SingleOutputStreamOperator<String> tableNameDS = splitByTable(streamSource, test1Tag, test2Tag);
        DataStream<String> test1TagDS = tableNameDS.getSideOutput(test1Tag);
        DataStream<String> test2TagDS = tableNameDS.getSideOutput(test2Tag);

        // 4. Print each per-table stream and write it to its Kafka topic.
        test1TagDS.print("test1TagDS>>>>>>>>>>>");
        test2TagDS.print("test2TagDS>>>>>>>>>>>");
        test1TagDS.addSink(WSFKafkaUtil.getKafkaProducer(SINK_TOPIC_TEST1));
        test2TagDS.addSink(WSFKafkaUtil.getKafkaProducer(SINK_TOPIC_TEST2));

        // 5. Launch the job.
        env.execute("FlinkCDC");
    }

    /**
     * Applies the state backend and checkpoint settings to the environment.
     *
     * @param env the environment to configure
     */
    private static void configureCheckpointing(StreamExecutionEnvironment env) {
        env.setStateBackend(new FsStateBackend(CHECKPOINT_URI));
        env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS);
        // NOTE(review): Flink's checkpointing guide describes "max concurrent checkpoints" as not
        // usable together with a minimum pause between checkpoints - confirm that 2 is intended.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(MAX_CONCURRENT_CHECKPOINTS);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(MIN_PAUSE_BETWEEN_CHECKPOINTS_MS);
    }

    /**
     * Builds the MySQL CDC source over the {@code bigdata} and {@code dim_db} databases,
     * using {@code CustomerDeserialization} and {@code StartupOptions.initial()}.
     *
     * @return the configured source function emitting one string per change record
     */
    private static DebeziumSourceFunction<String> buildMySqlSource() {
        return MySQLSource.<String>builder()
                .hostname(MYSQL_HOST)
                .port(MYSQL_PORT)
                .username(MYSQL_USER)
                .password(MYSQL_PASSWORD)
                .databaseList("bigdata", "dim_db")
                .deserializer(new CustomerDeserialization())
                .startupOptions(StartupOptions.initial())
                .build();
    }

    /**
     * Parses each record as JSON and emits it to the side output matching its table name.
     *
     * <p>Nothing is collected on the main output of the returned operator; callers obtain
     * the per-table streams through {@code getSideOutput}.
     *
     * @param source   stream of JSON strings produced by the CDC source
     * @param test1Tag side output receiving records whose table name is {@code test1}
     * @param test2Tag side output receiving records whose table name is {@code test2}
     * @return the process operator that carries both side outputs
     */
    private static SingleOutputStreamOperator<String> splitByTable(
            DataStream<String> source,
            final OutputTag<String> test1Tag,
            final OutputTag<String> test2Tag) {
        return source
                .map(new MapFunction<String, JSONObject>() {
                    @Override
                    public JSONObject map(String value) throws Exception {
                        return JSON.parseObject(value);
                    }
                })
                .process(new ProcessFunction<JSONObject, String>() {
                    @Override
                    public void processElement(JSONObject value, Context ctx, Collector<String> out)
                            throws Exception {
                        String tableName = value.getString(TABLE_NAME_FIELD);
                        if (TABLE_TEST1.equals(tableName)) {
                            ctx.output(test1Tag, value.toJSONString());
                        } else if (TABLE_TEST2.equals(tableName)) {
                            ctx.output(test2Tag, value.toJSONString());
                        }
                        // Any other table name: the record is intentionally not forwarded.
                    }
                });
    }
}
|