01 Introduction
In the previous article, 《Flink自定义Connector》 (Flink Custom Connector), we walked through the basic workflow of building a custom Flink connector, together with its flow diagram. At the code level, the steps for a developer are:
- define the Flink SQL DDL;
- create the factories that parse and validate the options (DynamicTableSourceFactory, DeserializationFormatFactory), and make sure both factories are registered in the META-INF/services directory;
- implement ScanTableSource;
- put the concrete business logic in the getScanRuntimeProvider implementation.
To really master custom Flink connectors, this article goes straight to the source code and studies how Flink's Kafka connector is implemented.
Appendix: Flink source code download address
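To make the first step (the DDL) concrete, here is a hedged sketch of registering a Kafka table through the Table API; the table name, topic and connection settings are made up for illustration. The value of the 'connector' option is what later selects the factory that the rest of this article studies.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KafkaDdlDemo {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // 'connector' = 'kafka' is matched against KafkaDynamicTableFactory#factoryIdentifier()
        tableEnv.executeSql(
                "CREATE TABLE user_behavior (\n"
                        + "  user_id BIGINT,\n"
                        + "  item_id BIGINT,\n"
                        + "  ts TIMESTAMP(3)\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'user_behavior',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'properties.group.id' = 'demo',\n"
                        + "  'scan.startup.mode' = 'earliest-offset',\n"
                        + "  'format' = 'json'\n"
                        + ")");

        tableEnv.executeSql("SELECT * FROM user_behavior").print();
    }
}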
02 Kafka Connector Source Code Analysis
2.1 Project Structure
After importing the Flink source code, you can browse the source layout of the Kafka connector module. Let's start with its pom file; most of the details can be skipped, what matters are the dependencies. The version below has been trimmed and annotated:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>flink-connectors</artifactId>
<groupId>org.apache.flink</groupId>
<version>1.13.6</version>
<relativePath>..</relativePath>
</parent>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<name>Flink : Connectors : Kafka</name>
<packaging>jar</packaging>
<properties>
<kafka.version>2.4.1</kafka.version>
</properties>
<dependencies>
<!-- Flink dependencies (all the artifacts prefixed with flink) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<!-- Other Flink-related dependencies omitted here -->
<!-- Kafka dependencies -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
</dependencies>
<build>
<!-- Packaging (shade) plugin configuration omitted -->
</build>
</project>
2.2 Factories (the entry point for reading the source)
In the src/main/resources/META-INF/services directory there are two files. Recalling the workflow from the introduction, once the DDL is defined the factory becomes the entry point of the whole flow, so let's look at what the two files contain.
org.apache.flink.table.factories.Factory (Table SQL mode):
org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory
org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaDynamicTableFactory
org.apache.flink.table.factories.TableFactory (Table API mode):
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
These registration files tell us which factory classes to study next (a short sketch of how Flink discovers them follows the list):
- KafkaDynamicTableFactory
- UpsertKafkaDynamicTableFactory
- KafkaTableSourceSinkFactory
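How does Flink get from these text files to a factory instance? Roughly speaking, FactoryUtil loads every registered Factory through the JDK ServiceLoader and matches the DDL's 'connector' value against factoryIdentifier(). The snippet below is a simplified sketch of that lookup, not the actual FactoryUtil code:

import java.util.ServiceLoader;

import org.apache.flink.table.factories.Factory;

public class FactoryDiscoveryDemo {
    public static void main(String[] args) {
        String connector = "kafka"; // the value of the 'connector' option in the DDL
        for (Factory factory : ServiceLoader.load(Factory.class)) {
            if (connector.equals(factory.factoryIdentifier())) {
                System.out.println("matched factory: " + factory.getClass().getName());
            }
        }
    }
}

With flink-connector-kafka on the classpath, the match is KafkaDynamicTableFactory, whose identifier is the constant "kafka" shown in the next section.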
2.2.1 KafkaDynamicTableFactory
The full code of KafkaDynamicTableFactory is shown below:
@Internal
public class KafkaDynamicTableFactory
implements DynamicTableSourceFactory, DynamicTableSinkFactory {
public static final String IDENTIFIER = "kafka";
@Override
public String factoryIdentifier() {
return IDENTIFIER;
}
@Override
public Set<ConfigOption<?>> requiredOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
options.add(PROPS_BOOTSTRAP_SERVERS);
return options;
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
options.add(FactoryUtil.FORMAT);
options.add(KEY_FORMAT);
options.add(KEY_FIELDS);
options.add(KEY_FIELDS_PREFIX);
options.add(VALUE_FORMAT);
options.add(VALUE_FIELDS_INCLUDE);
options.add(TOPIC);
options.add(TOPIC_PATTERN);
options.add(PROPS_GROUP_ID);
options.add(SCAN_STARTUP_MODE);
options.add(SCAN_STARTUP_SPECIFIC_OFFSETS);
options.add(SCAN_TOPIC_PARTITION_DISCOVERY);
options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
options.add(SINK_PARTITIONER);
options.add(SINK_SEMANTIC);
options.add(SINK_PARALLELISM);
return options;
}
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
final ReadableConfig tableOptions = helper.getOptions();
final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat =
getKeyDecodingFormat(helper);
final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =
getValueDecodingFormat(helper);
helper.validateExcept(PROPERTIES_PREFIX);
validateTableSourceOptions(tableOptions);
validatePKConstraints(
context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat);
final StartupOptions startupOptions = getStartupOptions(tableOptions);
final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions());
properties.setProperty(
FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
String.valueOf(
tableOptions
.getOptional(SCAN_TOPIC_PARTITION_DISCOVERY)
.map(Duration::toMillis)
.orElse(FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED)));
final DataType physicalDataType =
context.getCatalogTable().getSchema().toPhysicalRowDataType();
final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);
final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType);
final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
return createKafkaTableSource(
physicalDataType,
keyDecodingFormat.orElse(null),
valueDecodingFormat,
keyProjection,
valueProjection,
keyPrefix,
KafkaOptions.getSourceTopics(tableOptions),
KafkaOptions.getSourceTopicPattern(tableOptions),
properties,
startupOptions.startupMode,
startupOptions.specificOffsets,
startupOptions.startupTimestampMillis);
}
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
final TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(
this, autoCompleteSchemaRegistrySubject(context));
final ReadableConfig tableOptions = helper.getOptions();
final Optional<EncodingFormat<SerializationSchema<RowData>>> keyEncodingFormat =
getKeyEncodingFormat(helper);
final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat =
getValueEncodingFormat(helper);
helper.validateExcept(PROPERTIES_PREFIX);
validateTableSinkOptions(tableOptions);
validatePKConstraints(
context.getObjectIdentifier(), context.getCatalogTable(), valueEncodingFormat);
final DataType physicalDataType =
context.getCatalogTable().getSchema().toPhysicalRowDataType();
final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);
final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType);
final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
final Integer parallelism = tableOptions.getOptional(SINK_PARALLELISM).orElse(null);
return createKafkaTableSink(
physicalDataType,
keyEncodingFormat.orElse(null),
valueEncodingFormat,
keyProjection,
valueProjection,
keyPrefix,
tableOptions.get(TOPIC).get(0),
getKafkaProperties(context.getCatalogTable().getOptions()),
getFlinkKafkaPartitioner(tableOptions, context.getClassLoader()).orElse(null),
getSinkSemantic(tableOptions),
parallelism);
}
private static Optional<DecodingFormat<DeserializationSchema<RowData>>> getKeyDecodingFormat(
TableFactoryHelper helper) {
final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat =
helper.discoverOptionalDecodingFormat(
DeserializationFormatFactory.class, KEY_FORMAT);
keyDecodingFormat.ifPresent(
format -> {
if (!format.getChangelogMode().containsOnly(RowKind.INSERT)) {
throw new ValidationException(
String.format(
"A key format should only deal with INSERT-only records. "
+ "But %s has a changelog mode of %s.",
helper.getOptions().get(KEY_FORMAT),
format.getChangelogMode()));
}
});
return keyDecodingFormat;
}
private static Optional<EncodingFormat<SerializationSchema<RowData>>> getKeyEncodingFormat(
TableFactoryHelper helper) {
final Optional<EncodingFormat<SerializationSchema<RowData>>> keyEncodingFormat =
helper.discoverOptionalEncodingFormat(SerializationFormatFactory.class, KEY_FORMAT);
keyEncodingFormat.ifPresent(
format -> {
if (!format.getChangelogMode().containsOnly(RowKind.INSERT)) {
throw new ValidationException(
String.format(
"A key format should only deal with INSERT-only records. "
+ "But %s has a changelog mode of %s.",
helper.getOptions().get(KEY_FORMAT),
format.getChangelogMode()));
}
});
return keyEncodingFormat;
}
private static DecodingFormat<DeserializationSchema<RowData>> getValueDecodingFormat(
TableFactoryHelper helper) {
return helper.discoverOptionalDecodingFormat(
DeserializationFormatFactory.class, FactoryUtil.FORMAT)
.orElseGet(
() ->
helper.discoverDecodingFormat(
DeserializationFormatFactory.class, VALUE_FORMAT));
}
private static EncodingFormat<SerializationSchema<RowData>> getValueEncodingFormat(
TableFactoryHelper helper) {
return helper.discoverOptionalEncodingFormat(
SerializationFormatFactory.class, FactoryUtil.FORMAT)
.orElseGet(
() ->
helper.discoverEncodingFormat(
SerializationFormatFactory.class, VALUE_FORMAT));
}
private static void validatePKConstraints(
ObjectIdentifier tableName, CatalogTable catalogTable, Format format) {
if (catalogTable.getSchema().getPrimaryKey().isPresent()
&& format.getChangelogMode().containsOnly(RowKind.INSERT)) {
Configuration options = Configuration.fromMap(catalogTable.getOptions());
String formatName =
options.getOptional(FactoryUtil.FORMAT).orElse(options.get(VALUE_FORMAT));
throw new ValidationException(
String.format(
"The Kafka table '%s' with '%s' format doesn't support defining PRIMARY KEY constraint"
+ " on the table, because it can't guarantee the semantic of primary key.",
tableName.asSummaryString(),
formatName));
}
}
protected KafkaDynamicSource createKafkaTableSource(
DataType physicalDataType,
@Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
int[] keyProjection,
int[] valueProjection,
@Nullable String keyPrefix,
@Nullable List<String> topics,
@Nullable Pattern topicPattern,
Properties properties,
StartupMode startupMode,
Map<KafkaTopicPartition, Long> specificStartupOffsets,
long startupTimestampMillis) {
return new KafkaDynamicSource(
physicalDataType,
keyDecodingFormat,
valueDecodingFormat,
keyProjection,
valueProjection,
keyPrefix,
topics,
topicPattern,
properties,
startupMode,
specificStartupOffsets,
startupTimestampMillis,
false);
}
protected KafkaDynamicSink createKafkaTableSink(
DataType physicalDataType,
@Nullable EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat,
EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat,
int[] keyProjection,
int[] valueProjection,
@Nullable String keyPrefix,
String topic,
Properties properties,
FlinkKafkaPartitioner<RowData> partitioner,
KafkaSinkSemantic semantic,
Integer parallelism) {
return new KafkaDynamicSink(
physicalDataType,
physicalDataType,
keyEncodingFormat,
valueEncodingFormat,
keyProjection,
valueProjection,
keyPrefix,
topic,
properties,
partitioner,
semantic,
false,
SinkBufferFlushMode.DISABLED,
parallelism);
}
}
From the source code we can see that KafkaDynamicTableFactory mainly does three things:
- defines the serialization and deserialization format factories for the key and the value;
- validates the connector options;
- injects the connector options into the DynamicTableSource and DynamicTableSink instances.
Later sections look at how SerializationFormatFactory and DeserializationFormatFactory are implemented, and then at the DynamicTableSource and DynamicTableSink instances. A minimal factory skeleton that follows the same pattern is sketched right below.
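To tie those responsibilities together, here is a minimal sketch of the same factory pattern for a hypothetical "demo" connector. The class names and the 'hostname' option are made up for illustration, and the nested placeholder source deliberately leaves getScanRuntimeProvider unimplemented; a real connector would return a provider there, as the Kafka source does.

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

public class DemoDynamicTableFactory implements DynamicTableSourceFactory {

    // Plays the same role as PROPS_BOOTSTRAP_SERVERS, TOPIC, ... in KafkaOptions.
    public static final ConfigOption<String> HOSTNAME =
            ConfigOptions.key("hostname").stringType().noDefaultValue();

    @Override
    public String factoryIdentifier() {
        return "demo"; // matched against 'connector' = 'demo' in the DDL
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(HOSTNAME);
        return options;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet();
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        // Same steps as KafkaDynamicTableFactory: create the helper, validate the
        // options, then inject them into the source instance.
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        return new DemoSource(helper.getOptions().get(HOSTNAME));
    }

    /** Placeholder source used only to make the sketch compile. */
    private static final class DemoSource implements ScanTableSource {
        private final String hostname;

        private DemoSource(String hostname) {
            this.hostname = hostname;
        }

        @Override
        public ChangelogMode getChangelogMode() {
            return ChangelogMode.insertOnly();
        }

        @Override
        public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
            throw new UnsupportedOperationException("runtime provider omitted in this sketch");
        }

        @Override
        public DynamicTableSource copy() {
            return new DemoSource(hostname);
        }

        @Override
        public String asSummaryString() {
            return "demo source (" + hostname + ")";
        }
    }
}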
2.2.2 UpsertKafkaDynamicTableFactory
This factory was not found when browsing the source tree; its code flow is presumably consistent with KafkaDynamicTableFactory.
2.2.3 KafkaTableSourceSinkFactory
The Table API mode and the Table SQL (declarative) mode work on almost the same principle: both serve as the input and output of queries and differ mainly in how they are expressed. It is therefore enough to study the factory code for the Table SQL mode, so KafkaTableSourceSinkFactory is not covered here; skipping it does not affect the overall understanding.
2.3 Serialization and Deserialization Factories
2.3.1 DeserializationFormatFactory
From section 2.2.1 we know that the factory wires up SerializationFormatFactory, DeserializationFormatFactory, and the DynamicTableSource and DynamicTableSink instances. Let's go through them in order, starting with DeserializationFormatFactory.
Pressing Ctrl+T (in IntelliJ IDEA) on DeserializationFormatFactory lists its implementations. Let's take the typical JsonFormatFactory as an example; its full code is shown below:
public class JsonFormatFactory implements DeserializationFormatFactory, SerializationFormatFactory {
public static final String IDENTIFIER = "json";
@Override
public String factoryIdentifier() {
return IDENTIFIER;
}
@Override
public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
DynamicTableFactory.Context context, ReadableConfig formatOptions) {
FactoryUtil.validateFactoryOptions(this, formatOptions);
validateDecodingFormatOptions(formatOptions);
final boolean failOnMissingField = formatOptions.get(FAIL_ON_MISSING_FIELD);
final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
TimestampFormat timestampOption = JsonOptions.getTimestampFormat(formatOptions);
return new DecodingFormat<DeserializationSchema<RowData>>() {
@Override
public DeserializationSchema<RowData> createRuntimeDecoder(
DynamicTableSource.Context context, DataType producedDataType) {
final RowType rowType = (RowType) producedDataType.getLogicalType();
final TypeInformation<RowData> rowDataTypeInfo =
context.createTypeInformation(producedDataType);
return new JsonRowDataDeserializationSchema(
rowType,
rowDataTypeInfo,
failOnMissingField,
ignoreParseErrors,
timestampOption);
}
@Override
public ChangelogMode getChangelogMode() {
return ChangelogMode.insertOnly();
}
};
}
@Override
public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
DynamicTableFactory.Context context, ReadableConfig formatOptions) {
FactoryUtil.validateFactoryOptions(this, formatOptions);
validateEncodingFormatOptions(formatOptions);
TimestampFormat timestampOption = JsonOptions.getTimestampFormat(formatOptions);
JsonOptions.MapNullKeyMode mapNullKeyMode = JsonOptions.getMapNullKeyMode(formatOptions);
String mapNullKeyLiteral = formatOptions.get(MAP_NULL_KEY_LITERAL);
final boolean encodeDecimalAsPlainNumber =
formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
return new EncodingFormat<SerializationSchema<RowData>>() {
@Override
public SerializationSchema<RowData> createRuntimeEncoder(
DynamicTableSink.Context context, DataType consumedDataType) {
final RowType rowType = (RowType) consumedDataType.getLogicalType();
return new JsonRowDataSerializationSchema(
rowType,
timestampOption,
mapNullKeyMode,
mapNullKeyLiteral,
encodeDecimalAsPlainNumber);
}
@Override
public ChangelogMode getChangelogMode() {
return ChangelogMode.insertOnly();
}
};
}
@Override
public Set<ConfigOption<?>> requiredOptions() {
return Collections.emptySet();
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(FAIL_ON_MISSING_FIELD);
options.add(IGNORE_PARSE_ERRORS);
options.add(TIMESTAMP_FORMAT);
options.add(MAP_NULL_KEY_MODE);
options.add(MAP_NULL_KEY_LITERAL);
options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
return options;
}
}
As noted earlier, the idea of a factory is to define the rules (the options) and inject them into a newly created instance that is handed back to the caller.
From the code above we can see that JsonFormatFactory implements both DeserializationFormatFactory and SerializationFormatFactory, providing the methods createDecodingFormat and createEncodingFormat, whose core purpose is to produce a DeserializationSchema and a SerializationSchema respectively.
Put simply, the concrete serialization and deserialization logic behind JsonFormatFactory lives in the implementations of the DeserializationSchema and SerializationSchema interfaces, namely JsonRowDataDeserializationSchema and JsonRowDataSerializationSchema.
Let's look at JsonRowDataDeserializationSchema first.
2.3.1.1 JsonRowDataDeserializationSchema
The main job of JsonRowDataDeserializationSchema is to deserialize JSON into Flink Table/SQL's internal data structures. Its code is as follows:
@Internal
public class JsonRowDataDeserializationSchema implements DeserializationSchema<RowData> {
private static final long serialVersionUID = 1L;
private final boolean failOnMissingField;
private final boolean ignoreParseErrors;
private final TypeInformation<RowData> resultTypeInfo;
private final JsonToRowDataConverters.JsonToRowDataConverter runtimeConverter;
private final ObjectMapper objectMapper = new ObjectMapper();
private final TimestampFormat timestampFormat;
public JsonRowDataDeserializationSchema(
RowType rowType,
TypeInformation<RowData> resultTypeInfo,
boolean failOnMissingField,
boolean ignoreParseErrors,
TimestampFormat timestampFormat) {
if (ignoreParseErrors && failOnMissingField) {
throw new IllegalArgumentException(
"JSON format doesn't support failOnMissingField and ignoreParseErrors are both enabled.");
}
this.resultTypeInfo = checkNotNull(resultTypeInfo);
this.failOnMissingField = failOnMissingField;
this.ignoreParseErrors = ignoreParseErrors;
this.runtimeConverter =
new JsonToRowDataConverters(failOnMissingField, ignoreParseErrors, timestampFormat)
.createConverter(checkNotNull(rowType));
this.timestampFormat = timestampFormat;
boolean hasDecimalType =
LogicalTypeChecks.hasNested(rowType, t -> t instanceof DecimalType);
if (hasDecimalType) {
objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
}
objectMapper.configure(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(), true);
}
@Override
public RowData deserialize(@Nullable byte[] message) throws IOException {
if (message == null) {
return null;
}
try {
return convertToRowData(deserializeToJsonNode(message));
} catch (Throwable t) {
if (ignoreParseErrors) {
return null;
}
throw new IOException(
format("Failed to deserialize JSON '%s'.", new String(message)), t);
}
}
public JsonNode deserializeToJsonNode(byte[] message) throws IOException {
return objectMapper.readTree(message);
}
public RowData convertToRowData(JsonNode message) {
return (RowData) runtimeConverter.convert(message);
}
@Override
public boolean isEndOfStream(RowData nextElement) {
return false;
}
@Override
public TypeInformation<RowData> getProducedType() {
return resultTypeInfo;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
JsonRowDataDeserializationSchema that = (JsonRowDataDeserializationSchema) o;
return failOnMissingField == that.failOnMissingField
&& ignoreParseErrors == that.ignoreParseErrors
&& resultTypeInfo.equals(that.resultTypeInfo)
&& timestampFormat.equals(that.timestampFormat);
}
@Override
public int hashCode() {
return Objects.hash(failOnMissingField, ignoreParseErrors, resultTypeInfo, timestampFormat);
}
}
Internally it uses the helper class JsonToRowDataConverters, whose main job is to convert a JsonNode into Flink Table/SQL's internal data structures; for reasons of space its code is not reproduced here.
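To see what the runtime decoder produced by JsonFormatFactory actually does, here is a hedged standalone sketch that builds a JsonRowDataDeserializationSchema by hand. The schema and the JSON payload are made up, and depending on the Flink version the TimestampFormat class may live in org.apache.flink.formats.common rather than org.apache.flink.formats.json.

import java.nio.charset.StandardCharsets;

import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;

public class JsonDeserializationDemo {
    public static void main(String[] args) throws Exception {
        RowType rowType = (RowType) DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.BIGINT()),
                        DataTypes.FIELD("name", DataTypes.STRING()))
                .getLogicalType();

        JsonRowDataDeserializationSchema schema = new JsonRowDataDeserializationSchema(
                rowType,
                InternalTypeInfo.of(rowType),
                false,  // failOnMissingField
                true,   // ignoreParseErrors
                TimestampFormat.ISO_8601);

        // Produces Flink's internal row representation (a GenericRowData).
        RowData row = schema.deserialize(
                "{\"id\": 1, \"name\": \"flink\"}".getBytes(StandardCharsets.UTF_8));
        System.out.println(row);
    }
}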
2.3.2 SerializationFormatFactory
Its principle is much the same as DeserializationFormatFactory, so it is not covered in detail here.
2.3.2.1 JsonRowDataSerializationSchema
JsonRowDataSerializationSchema mirrors JsonRowDataDeserializationSchema: its main job is to serialize Flink Table/SQL's internal data structures into JSON, so it is not covered in detail here. A short usage sketch follows.
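As a hedged counterpart to the decoding sketch above, the snippet below constructs the serializer with the same argument order shown in JsonFormatFactory#createEncodingFormat; the row content is made up, and the package of TimestampFormat again depends on the Flink version.

import org.apache.flink.formats.json.JsonOptions;
import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.RowType;

public class JsonSerializationDemo {
    public static void main(String[] args) {
        RowType rowType = (RowType) DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.BIGINT()),
                        DataTypes.FIELD("name", DataTypes.STRING()))
                .getLogicalType();

        JsonRowDataSerializationSchema schema = new JsonRowDataSerializationSchema(
                rowType,
                TimestampFormat.ISO_8601,
                JsonOptions.MapNullKeyMode.FAIL,
                "null",  // map-null-key literal (unused with FAIL mode)
                false);  // encode-decimal-as-plain-number

        byte[] json = schema.serialize(GenericRowData.of(1L, StringData.fromString("flink")));
        System.out.println(new String(json)); // {"id":1,"name":"flink"}
    }
}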
2.4 Dynamic Tables
From section 2.2.1 we know that besides defining the SerializationFormatFactory and DeserializationFormatFactory, the factory also creates the DynamicTableSource and DynamicTableSink instances.
Next up is DynamicTableSource, whose implementation here is KafkaDynamicSource; let's look at its code.
2.4.1 KafkaDynamicSource
KafkaDynamicSource handles the source side of the connector; among other things, this is where the Kafka consumer is created. The code, with comments, is as follows:
/** KafkaDynamicSource. */
@Internal
public class KafkaDynamicSource
implements ScanTableSource, SupportsReadingMetadata, SupportsWatermarkPushDown {
// --------------------------------------------------------------------------------------------
// Mutable attributes
// --------------------------------------------------------------------------------------------
/** Data type that describes the final output of the source. */
protected DataType producedDataType;
/** Metadata that is appended at the end of a physical source row. */
protected List<String> metadataKeys;
/** Watermark strategy that is used to generate per-partition watermarks. */
protected @Nullable
WatermarkStrategy<RowData> watermarkStrategy;
// --------------------------------------------------------------------------------------------
// Format attributes
// --------------------------------------------------------------------------------------------
private static final String VALUE_METADATA_PREFIX = "value.";
/** Data type to configure the formats. */
protected final DataType physicalDataType;
/** Optional format for decoding keys from Kafka. */
protected final @Nullable
DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat;
/** Format for decoding values from Kafka. */
protected final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat;
/** Indices that determine the key fields and the target position in the produced row. */
protected final int[] keyProjection;
/** Indices that determine the value fields and the target position in the produced row. */
protected final int[] valueProjection;
/** Prefix that needs to be removed from fields when constructing the physical data type. */
protected final @Nullable
String keyPrefix;
// --------------------------------------------------------------------------------------------
// Kafka-specific attributes
// --------------------------------------------------------------------------------------------
/** The Kafka topics to consume. */
protected final List<String> topics;
/** The Kafka topic pattern to consume. */
protected final Pattern topicPattern;
/** Properties for the Kafka consumer. */
protected final Properties properties;
/** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */
protected final StartupMode startupMode;
/** Specific startup offsets; only relevant when the startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */
protected final Map<KafkaTopicPartition, Long> specificStartupOffsets;
/** The start timestamp to locate partition offsets; only relevant when the startup mode is {@link StartupMode#TIMESTAMP}. */
protected final long startupTimestampMillis;
/** Flag to determine the source mode; in upsert mode, it will keep the tombstone messages. */
protected final boolean upsertMode;
/** Constructor that initializes the fields above. */
public KafkaDynamicSource(
DataType physicalDataType,
@Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
int[] keyProjection,
int[] valueProjection,
@Nullable String keyPrefix,
@Nullable List<String> topics,
@Nullable Pattern topicPattern,
Properties properties,
StartupMode startupMode,
Map<KafkaTopicPartition, Long> specificStartupOffsets,
long startupTimestampMillis,
boolean upsertMode) {
// Format attributes
this.physicalDataType =
Preconditions.checkNotNull(
physicalDataType, "Physical data type must not be null.");
this.keyDecodingFormat = keyDecodingFormat;
this.valueDecodingFormat =
Preconditions.checkNotNull(
valueDecodingFormat, "Value decoding format must not be null.");
this.keyProjection =
Preconditions.checkNotNull(keyProjection, "Key projection must not be null.");
this.valueProjection =
Preconditions.checkNotNull(valueProjection, "Value projection must not be null.");
this.keyPrefix = keyPrefix;
// Mutable attributes
this.producedDataType = physicalDataType;
this.metadataKeys = Collections.emptyList();
this.watermarkStrategy = null;
// Kafka-specific attributes
Preconditions.checkArgument(
(topics != null && topicPattern == null)
|| (topics == null && topicPattern != null),
"Either Topic or Topic Pattern must be set for source.");
this.topics = topics;
this.topicPattern = topicPattern;
this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
this.startupMode =
Preconditions.checkNotNull(startupMode, "Startup mode must not be null.");
this.specificStartupOffsets =
Preconditions.checkNotNull(
specificStartupOffsets, "Specific offsets must not be null.");
this.startupTimestampMillis = startupTimestampMillis;
this.upsertMode = upsertMode;
}
@Override
public ChangelogMode getChangelogMode() {
return valueDecodingFormat.getChangelogMode();
}
/** Returns a provider for the runtime implementation that reads the data; this is where the main business logic sits. */
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
final DeserializationSchema<RowData> keyDeserialization =
createDeserialization(context, keyDecodingFormat, keyProjection, keyPrefix);
final DeserializationSchema<RowData> valueDeserialization =
createDeserialization(context, valueDecodingFormat, valueProjection, null);
final TypeInformation<RowData> producedTypeInfo =
context.createTypeInformation(producedDataType);
final FlinkKafkaConsumer<RowData> kafkaConsumer =
createKafkaConsumer(keyDeserialization, valueDeserialization, producedTypeInfo);
return SourceFunctionProvider.of(kafkaConsumer, false);
}
/** Returns a map of metadata keys and the corresponding data types that can be produced. */
@Override
public Map<String, DataType> listReadableMetadata() {
final Map<String, DataType> metadataMap = new LinkedHashMap<>();
// according to convention, the order of the final row must be
// PHYSICAL + FORMAT METADATA + CONNECTOR METADATA
// where the format metadata has highest precedence
// add value format metadata with prefix
valueDecodingFormat
.listReadableMetadata()
.forEach((key, value) -> metadataMap.put(VALUE_METADATA_PREFIX + key, value));
// add connector metadata
Stream.of(ReadableMetadata.values())
.forEachOrdered(m -> metadataMap.putIfAbsent(m.key, m.dataType));
return metadataMap;
}
@Override
public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
// separate connector and format metadata
final List<String> formatMetadataKeys =
metadataKeys.stream()
.filter(k -> k.startsWith(VALUE_METADATA_PREFIX))
.collect(Collectors.toList());
final List<String> connectorMetadataKeys = new ArrayList<>(metadataKeys);
connectorMetadataKeys.removeAll(formatMetadataKeys);
// push down format metadata
final Map<String, DataType> formatMetadata = valueDecodingFormat.listReadableMetadata();
if (formatMetadata.size() > 0) {
final List<String> requestedFormatMetadataKeys =
formatMetadataKeys.stream()
.map(k -> k.substring(VALUE_METADATA_PREFIX.length()))
.collect(Collectors.toList());
valueDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
}
this.metadataKeys = connectorMetadataKeys;
this.producedDataType = producedDataType;
}
@Override
public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
this.watermarkStrategy = watermarkStrategy;
}
@Override
public DynamicTableSource copy() {
final KafkaDynamicSource copy =
new KafkaDynamicSource(
physicalDataType,
keyDecodingFormat,
valueDecodingFormat,
keyProjection,
valueProjection,
keyPrefix,
topics,
topicPattern,
properties,
startupMode,
specificStartupOffsets,
startupTimestampMillis,
upsertMode);
copy.producedDataType = producedDataType;
copy.metadataKeys = metadataKeys;
copy.watermarkStrategy = watermarkStrategy;
return copy;
}
@Override
public String asSummaryString() {
return "Kafka table source";
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final KafkaDynamicSource that = (KafkaDynamicSource) o;
return Objects.equals(producedDataType, that.producedDataType)
&& Objects.equals(metadataKeys, that.metadataKeys)
&& Objects.equals(physicalDataType, that.physicalDataType)
&& Objects.equals(keyDecodingFormat, that.keyDecodingFormat)
&& Objects.equals(valueDecodingFormat, that.valueDecodingFormat)
&& Arrays.equals(keyProjection, that.keyProjection)
&& Arrays.equals(valueProjection, that.valueProjection)
&& Objects.equals(keyPrefix, that.keyPrefix)
&& Objects.equals(topics, that.topics)
&& Objects.equals(String.valueOf(topicPattern), String.valueOf(that.topicPattern))
&& Objects.equals(properties, that.properties)
&& startupMode == that.startupMode
&& Objects.equals(specificStartupOffsets, that.specificStartupOffsets)
&& startupTimestampMillis == that.startupTimestampMillis
&& Objects.equals(upsertMode, that.upsertMode)
&& Objects.equals(watermarkStrategy, that.watermarkStrategy);
}
@Override
public int hashCode() {
return Objects.hash(
producedDataType,
metadataKeys,
physicalDataType,
keyDecodingFormat,
valueDecodingFormat,
keyProjection,
valueProjection,
keyPrefix,
topics,
topicPattern,
properties,
startupMode,
specificStartupOffsets,
startupTimestampMillis,
upsertMode,
watermarkStrategy);
}
// --------- Concrete business logic ----------------------------------------------------------
/** Creates the Kafka consumer. */
protected FlinkKafkaConsumer<RowData> createKafkaConsumer(
DeserializationSchema<RowData> keyDeserialization,
DeserializationSchema<RowData> valueDeserialization,
TypeInformation<RowData> producedTypeInfo) {
final MetadataConverter[] metadataConverters =
metadataKeys.stream()
.map(
k ->
Stream.of(ReadableMetadata.values())
.filter(rm -> rm.key.equals(k))
.findFirst()
.orElseThrow(IllegalStateException::new))
.map(m -> m.converter)
.toArray(MetadataConverter[]::new);
// check if connector metadata is used at all
final boolean hasMetadata = metadataKeys.size() > 0;
// adjust physical arity with value format's metadata
final int adjustedPhysicalArity =
producedDataType.getChildren().size() - metadataKeys.size();
// adjust value format projection to include value format's metadata columns at the end
final int[] adjustedValueProjection =
IntStream.concat(
IntStream.of(valueProjection),
IntStream.range(
keyProjection.length + valueProjection.length,
adjustedPhysicalArity))
.toArray();
final KafkaDeserializationSchema<RowData> kafkaDeserializer =
new DynamicKafkaDeserializationSchema(
adjustedPhysicalArity,
keyDeserialization,
keyProjection,
valueDeserialization,
adjustedValueProjection,
hasMetadata,
metadataConverters,
producedTypeInfo,
upsertMode);
final FlinkKafkaConsumer<RowData> kafkaConsumer;
if (topics != null) {
kafkaConsumer = new FlinkKafkaConsumer<>(topics, kafkaDeserializer, properties);
} else {
kafkaConsumer = new FlinkKafkaConsumer<>(topicPattern, kafkaDeserializer, properties);
}
switch (startupMode) {
case EARLIEST:
kafkaConsumer.setStartFromEarliest();
break;
case LATEST:
kafkaConsumer.setStartFromLatest();
break;
case GROUP_OFFSETS:
kafkaConsumer.setStartFromGroupOffsets();
break;
case SPECIFIC_OFFSETS:
kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets);
break;
case TIMESTAMP:
kafkaConsumer.setStartFromTimestamp(startupTimestampMillis);
break;
}
kafkaConsumer.setCommitOffsetsOnCheckpoints(properties.getProperty("group.id") != null);
if (watermarkStrategy != null) {
kafkaConsumer.assignTimestampsAndWatermarks(watermarkStrategy);
}
return kafkaConsumer;
}
/** Creates the deserialization schema. */
private @Nullable
DeserializationSchema<RowData> createDeserialization(
DynamicTableSource.Context context,
@Nullable DecodingFormat<DeserializationSchema<RowData>> format,
int[] projection,
@Nullable String prefix) {
if (format == null) {
return null;
}
DataType physicalFormatDataType =
DataTypeUtils.projectRow(this.physicalDataType, projection);
if (prefix != null) {
physicalFormatDataType = DataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix);
}
return format.createRuntimeDecoder(context, physicalFormatDataType);
}
// --------------------------------------------------------------------------------------------
// Metadata handling
// --------------------------------------------------------------------------------------------
enum ReadableMetadata {
TOPIC(
"topic",
DataTypes.STRING().notNull(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object read(ConsumerRecord<?, ?> record) {
return StringData.fromString(record.topic());
}
}),
PARTITION(
"partition",
DataTypes.INT().notNull(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object read(ConsumerRecord<?, ?> record) {
return record.partition();
}
}),
HEADERS(
"headers",
// key and value of the map are nullable to make handling easier in queries
DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.BYTES().nullable())
.notNull(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object read(ConsumerRecord<?, ?> record) {
final Map<StringData, byte[]> map = new HashMap<>();
for (Header header : record.headers()) {
map.put(StringData.fromString(header.key()), header.value());
}
return new GenericMapData(map);
}
}),
LEADER_EPOCH(
"leader-epoch",
DataTypes.INT().nullable(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object read(ConsumerRecord<?, ?> record) {
return record.leaderEpoch().orElse(null);
}
}),
OFFSET(
"offset",
DataTypes.BIGINT().notNull(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object read(ConsumerRecord<?, ?> record) {
return record.offset();
}
}),
TIMESTAMP(
"timestamp",
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object read(ConsumerRecord<?, ?> record) {
return TimestampData.fromEpochMillis(record.timestamp());
}
}),
TIMESTAMP_TYPE(
"timestamp-type",
DataTypes.STRING().notNull(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object read(ConsumerRecord<?, ?> record) {
return StringData.fromString(record.timestampType().toString());
}
});
final String key;
final DataType dataType;
final MetadataConverter converter;
ReadableMetadata(String key, DataType dataType, MetadataConverter converter) {
this.key = key;
this.dataType = dataType;
this.converter = converter;
}
}
}
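The ReadableMetadata enum above is what backs metadata columns on the SQL side. As a hedged illustration (table name, topic and connection settings are made up), its keys surface in a DDL like this:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KafkaMetadataColumnsDemo {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        tableEnv.executeSql(
                "CREATE TABLE kafka_events (\n"
                        + "  user_id BIGINT,\n"
                        + "  message STRING,\n"
                        + "  `partition` BIGINT METADATA VIRTUAL,\n"                  // ReadableMetadata.PARTITION
                        + "  `offset` BIGINT METADATA VIRTUAL,\n"                     // ReadableMetadata.OFFSET
                        + "  event_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'\n" // ReadableMetadata.TIMESTAMP
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'events',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'properties.group.id' = 'demo',\n"
                        + "  'scan.startup.mode' = 'earliest-offset',\n"
                        + "  'format' = 'json'\n"
                        + ")");
    }
}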
The most important class involved here is, of course, the FlinkKafkaConsumer consumer; its internals are not described in this article. It is built on RichParallelSourceFunction, which you can read through on your own, since it rarely needs to be changed.
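For reference, here is a hedged DataStream-level sketch of a FlinkKafkaConsumer similar to the one getScanRuntimeProvider assembles; the topic and connection settings are made up, and the table source uses a DynamicKafkaDeserializationSchema rather than the SimpleStringSchema shown here.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class FlinkKafkaConsumerDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "demo");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props);
        consumer.setStartFromEarliest(); // mirrors 'scan.startup.mode' = 'earliest-offset'

        env.addSource(consumer).print();
        env.execute("kafka-consumer-demo");
    }
}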
2.4.2 KafkaDynamicSink
The principle is much the same as KafkaDynamicSource, so it is not covered in detail here; a short sink-side sketch follows.
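As a rough counterpart on the sink side (topic and settings are made up), KafkaDynamicSink ultimately writes records through a FlinkKafkaProducer, which at the DataStream level looks roughly like this:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class FlinkKafkaProducerDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");

        FlinkKafkaProducer<String> producer =
                new FlinkKafkaProducer<>("events_out", new SimpleStringSchema(), props);

        env.fromElements("a", "b", "c").addSink(producer);
        env.execute("kafka-producer-demo");
    }
}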
03 Summary
This article walks through a lot of code and takes some patience to read; a summary diagram (not reproduced here) ties the individual classes into the overall flow.
That concludes this walkthrough of the Flink Kafka Connector source code. I hope it helps, and thanks for reading!