源码解读系列,讲了很多关于Flink Kafka的源码,今天看到一段代码时,突然对Flink Kafka精确一次性有了更多的了解。代码如下:
protected void partitionConsumerRecordsHandler(
List<ConsumerRecord<byte[], byte[]>> partitionRecords,
KafkaTopicPartitionState<T, TopicPartition> partition) throws Exception {
for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
deserializer.deserialize(record, kafkaCollector);
// emit the actual records. this also updates offset state atomically and emits
// watermarks
emitRecordsWithTimestamps(
kafkaCollector.getRecords(),
partition,
record.offset(),
record.timestamp());
if (kafkaCollector.isEndOfStreamSignalled()) {
// end of stream signaled
running = false;
break;
}
}
}
protected void emitRecordsWithTimestamps(
Queue<T> records,
KafkaTopicPartitionState<T, KPH> partitionState,
long offset,
long kafkaEventTimestamp) {
// emit the records, u
|