Flink 的CDC 自定义实现数据转换
public class MysqlCdc {
public static void main(String[] args) throws Exception {
SourceFunction<JSONObject> sourceFunction = MySQLSource.<JSONObject>builder()
.hostname("192.168.71.28")
.databaseList("jydb")
.username("jydb")
.password("jydb")
.port(33061)
.deserializer(new CustomDebeziumDeserializationSchema())
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(sourceFunction).print()
.setParallelism(1);
env.execute("flink-mysql-cdc");
}
}
package com.quant.flowcalculation.flinkapi.cdc.customdebezium;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
public class CustomDebeziumDeserializationSchema implements DebeziumDeserializationSchema<JSONObject> {
private static final Logger LOGGER = LoggerFactory.getLogger(CustomDebeziumDeserializationSchema.class);
private static final long serialVersionUID = 7906905121308228264L;
public CustomDebeziumDeserializationSchema() {
}
@Override
public void deserialize(SourceRecord sourceRecord, Collector<JSONObject> collector) {
JSONObject resJson = new JSONObject();
try {
Struct valueStruct = (Struct) sourceRecord.value();
Struct afterStruct = valueStruct.getStruct("after");
Struct beforeStruct = valueStruct.getStruct("before");
if (afterStruct != null && beforeStruct != null) {
System.out.println("Updating >>>>>>>");
LOGGER.info("Updated, ignored ...");
} else if (afterStruct != null) {
System.out.println("Inserting >>>>>>>");
List<Field> fields = afterStruct.schema().fields();
String name;
Object value;
for (Field field : fields) {
name = field.name();
value = afterStruct.get(name);
resJson.put(name, value);
}
} else if (beforeStruct != null) {
System.out.println("Deleting >>>>>>>");
LOGGER.info("Deleted, ignored ...");
} else {
System.out.println("No this operation ...");
LOGGER.warn("No this operation ...");
}
} catch (Exception e) {
System.out.println("Deserialize throws exception:");
LOGGER.error("Deserialize throws exception:", e);
}
collector.collect(resJson);
}
@Override
public TypeInformation<JSONObject> getProducedType() {
return BasicTypeInfo.of(JSONObject.class);
}
}
参考:https://ververica.github.io/flink-cdc-connectors/release-2.0/content/about.html
|