The Implementation Flow of Flink's HiveStreamingSink
Preface
To improve data freshness we recently added a Flink pipeline that writes to Hive in real time. I had almost no prior experience with writing to Hive from Flink, so I started from the official documentation. Our goal is that data becomes visible within 1 to 10 minutes, but the volume arriving in that window is nowhere near the 128 MB rolling threshold, so the sink keeps producing small files of a dozen or so MB. To get those small files merged automatically I had to understand the write path and adapt it, and this post records the Flink streaming-to-Hive flow along the way.
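Before diving into the source, here is the kind of job setup this post assumes: a Hive table written by Flink in streaming mode, with rolling, compaction and partition-commit behavior driven entirely by table properties. The catalog name, paths, table name and columns below are made up for illustration, and the property names follow the Flink filesystem/Hive connector documentation; double-check them against the Flink version you run (the relevant ones are read back from the table options in the code walked through below).

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveStreamingSinkJob {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        // Catalog name, default database and hive-conf dir are placeholders.
        tEnv.registerCatalog("myhive", new HiveCatalog("myhive", "default", "/opt/hive-conf"));
        tEnv.useCatalog("myhive");
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        // Option names per the Flink docs; verify them against your Flink version.
        tEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS ods_log (line STRING) PARTITIONED BY (dt STRING) STORED AS parquet "
                        + "TBLPROPERTIES ("
                        + " 'sink.rolling-policy.file-size'='128MB',"
                        + " 'sink.rolling-policy.rollover-interval'='10 min',"
                        + " 'auto-compaction'='true',"
                        + " 'compaction.file-size'='128MB',"
                        + " 'sink.partition-commit.trigger'='process-time',"
                        + " 'sink.partition-commit.policy.kind'='metastore,success-file')");
    }
}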
1. The HiveTableSink initialization and validation flow
1.1 Creating the TableSink object
public HiveTableSink(
        ReadableConfig flinkConf,
        JobConf jobConf,
        ObjectIdentifier identifier,
        CatalogTable table,
        @Nullable Integer configuredParallelism) {
    this.flinkConf = flinkConf;
    this.jobConf = jobConf;
    this.identifier = identifier;
    this.catalogTable = table;
    hiveVersion =
            Preconditions.checkNotNull(
                    jobConf.get(HiveCatalogFactoryOptions.HIVE_VERSION.key()),
                    "Hive version is not defined");
    hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
    tableSchema = TableSchemaUtils.getPhysicalSchema(table.getSchema());
    this.configuredParallelism = configuredParallelism;
}
1.2 Returning the SinkRuntimeProvider
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
    DataStructureConverter converter =
            context.createDataStructureConverter(tableSchema.toRowDataType());
    return (DataStreamSinkProvider)
            dataStream -> consume(dataStream, context.isBounded(), converter);
}
private DataStreamSink<?> consume(
        DataStream<RowData> dataStream, boolean isBounded, DataStructureConverter converter) {
    checkAcidTable(catalogTable, identifier.toObjectPath());
    try (HiveMetastoreClientWrapper client =
            HiveMetastoreClientFactory.create(HiveConfUtils.create(jobConf), hiveVersion)) {
        // Look up the Hive table and its storage descriptor through the metastore client.
        Table table = client.getTable(identifier.getDatabaseName(), identifier.getObjectName());
        StorageDescriptor sd = table.getSd();
        Class hiveOutputFormatClz =
                hiveShim.getHiveOutputFormatClass(Class.forName(sd.getOutputFormat()));
        boolean isCompressed = jobConf.getBoolean(HiveConf.ConfVars.COMPRESSRESULT.varname, false);
        HiveWriterFactory writerFactory =
                new HiveWriterFactory(
                        jobConf,
                        hiveOutputFormatClz,
                        sd.getSerdeInfo(),
                        tableSchema,
                        getPartitionKeyArray(),
                        HiveReflectionUtils.getTableMetadata(hiveShim, table),
                        hiveShim,
                        isCompressed);
        String extension =
                Utilities.getFileExtension(
                        jobConf,
                        isCompressed,
                        (HiveOutputFormat<?, ?>) hiveOutputFormatClz.newInstance());
        OutputFileConfig.OutputFileConfigBuilder fileNamingBuilder =
                OutputFileConfig.builder()
                        .withPartPrefix("part-" + UUID.randomUUID().toString())
                        .withPartSuffix(extension == null ? "" : extension);
        final int parallelism =
                Optional.ofNullable(configuredParallelism).orElse(dataStream.getParallelism());
        // Bounded input goes to the batch sink, unbounded input to the streaming sink.
        if (isBounded) {
            OutputFileConfig fileNaming = fileNamingBuilder.build();
            return createBatchSink(
                    dataStream, converter, sd, writerFactory, fileNaming, parallelism);
        } else {
            if (overwrite) {
                throw new IllegalStateException("Streaming mode not support overwrite.");
            }
            Properties tableProps = HiveReflectionUtils.getTableMetadata(hiveShim, table);
            return createStreamSink(
                    dataStream, sd, tableProps, writerFactory, fileNamingBuilder, parallelism);
        }
    } catch (TException e) {
        throw new CatalogException("Failed to query Hive metaStore", e);
    } catch (IOException e) {
        throw new FlinkRuntimeException("Failed to create staging dir", e);
    } catch (ClassNotFoundException e) {
        throw new FlinkHiveException("Failed to get output format class", e);
    } catch (IllegalAccessException | InstantiationException e) {
        throw new FlinkHiveException("Failed to instantiate output format instance", e);
    }
}
With that, we have a reasonably complete picture of how Flink initializes and validates the HiveTableSink. Honestly, the coding conventions and exception handling in a top-tier framework are impressive. Studying a framework is mostly about absorbing its ideas, but looking at how its authors write code teaches you just as much and is well worth learning from.
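As an aside, the provider pattern in 1.2 -- returning a DataStreamSinkProvider lambda that receives the upstream DataStream<RowData> and attaches whatever operators it likes -- is the general extension point for any DataStream-backed table sink, not something Hive-specific. Below is a minimal sketch of the same pattern: a hypothetical PrintingTableSink, assuming the Flink 1.12/1.13-style DataStreamSinkProvider whose single method takes only the DataStream (newer versions add a ProviderContext parameter).

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.data.RowData;

public class PrintingTableSink implements DynamicTableSink {

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        return ChangelogMode.insertOnly();
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        // Same pattern as HiveTableSink: hand back a lambda that receives the
        // upstream DataStream<RowData> and decides how to attach the sink operators.
        return (DataStreamSinkProvider) dataStream -> print(dataStream);
    }

    private DataStreamSink<?> print(DataStream<RowData> dataStream) {
        return dataStream.addSink(new PrintSinkFunction<>()).setParallelism(1);
    }

    @Override
    public DynamicTableSink copy() {
        return new PrintingTableSink();
    }

    @Override
    public String asSummaryString() {
        return "printing-sink";
    }
}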
2. Creating the HiveTableStreamSink
2.1 Creating the StreamSink
private DataStreamSink<?> createStreamSink(
        DataStream<RowData> dataStream,
        StorageDescriptor sd,
        Properties tableProps,
        HiveWriterFactory recordWriterFactory,
        OutputFileConfig.OutputFileConfigBuilder fileNamingBuilder,
        final int parallelism) {
    org.apache.flink.configuration.Configuration conf =
            new org.apache.flink.configuration.Configuration();
    catalogTable.getOptions().forEach(conf::setString);
    HiveRowDataPartitionComputer partComputer =
            new HiveRowDataPartitionComputer(
                    hiveShim,
                    defaultPartName(),
                    tableSchema.getFieldNames(),
                    tableSchema.getFieldDataTypes(),
                    getPartitionKeyArray());
    TableBucketAssigner assigner = new TableBucketAssigner(partComputer);
    // Roll a new part file once it reaches the configured size or age.
    HiveRollingPolicy rollingPolicy =
            new HiveRollingPolicy(
                    conf.get(SINK_ROLLING_POLICY_FILE_SIZE).getBytes(),
                    conf.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis());
    boolean autoCompaction = conf.getBoolean(FileSystemOptions.AUTO_COMPACTION);
    if (autoCompaction) {
        // With auto compaction enabled, writers produce hidden ".uncompacted-" files that
        // only become visible after the compact operator has rewritten them.
        fileNamingBuilder.withPartPrefix(
                convertToUncompacted(fileNamingBuilder.build().getPartPrefix()));
    }
    OutputFileConfig outputFileConfig = fileNamingBuilder.build();
    org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(sd.getLocation());
    BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> builder;
    if (flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER)) {
        builder =
                bucketsBuilderForMRWriter(
                        recordWriterFactory, sd, assigner, rollingPolicy, outputFileConfig);
        LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");
    } else {
        Optional<BulkWriter.Factory<RowData>> bulkFactory =
                createBulkWriterFactory(getPartitionKeyArray(), sd);
        if (bulkFactory.isPresent()) {
            builder =
                    StreamingFileSink.forBulkFormat(
                                    path,
                                    new FileSystemTableSink.ProjectionBulkFactory(
                                            bulkFactory.get(), partComputer))
                            .withBucketAssigner(assigner)
                            .withRollingPolicy(rollingPolicy)
                            .withOutputFileConfig(outputFileConfig);
            LOG.info("Hive streaming sink: Use native parquet&orc writer.");
        } else {
            builder =
                    bucketsBuilderForMRWriter(
                            recordWriterFactory, sd, assigner, rollingPolicy, outputFileConfig);
            LOG.info(
                    "Hive streaming sink: Use MapReduce RecordWriter writer because BulkWriter Factory not available.");
        }
    }
    long bucketCheckInterval = conf.get(SINK_ROLLING_POLICY_CHECK_INTERVAL).toMillis();
    DataStream<PartitionCommitInfo> writerStream;
    if (autoCompaction) {
        long compactionSize =
                conf.getOptional(FileSystemOptions.COMPACTION_FILE_SIZE)
                        .orElse(conf.get(SINK_ROLLING_POLICY_FILE_SIZE))
                        .getBytes();
        // Compaction pipeline: CompactFileWriter -> CompactCoordinator -> CompactOperator.
        writerStream =
                StreamingSink.compactionWriter(
                        dataStream,
                        bucketCheckInterval,
                        builder,
                        fsFactory(),
                        path,
                        createCompactReaderFactory(sd, tableProps),
                        compactionSize,
                        parallelism);
    } else {
        writerStream =
                StreamingSink.writer(dataStream, bucketCheckInterval, builder, parallelism);
    }
    // Append the partition-commit operator that adds finished partitions to the metastore.
    return StreamingSink.sink(
            writerStream, path, identifier, getPartitionKeys(), msFactory(), fsFactory(), conf);
}
At this point the stream sink has been wired up, but we still have not seen the actual implementation. For the auto-compaction case the real work happens in StreamingSink.compactionWriter, so let's take a look at it:
public static <T> DataStream<PartitionCommitInfo> compactionWriter(
        DataStream<T> inputStream,
        long bucketCheckInterval,
        StreamingFileSink.BucketsBuilder<
                        T, String, ? extends StreamingFileSink.BucketsBuilder<T, String, ?>>
                bucketsBuilder,
        FileSystemFactory fsFactory,
        Path path,
        CompactReader.Factory<T> readFactory,
        long targetFileSize,
        int parallelism) {
    // Writes the records and reports every opened file plus an end-of-checkpoint marker.
    CompactFileWriter<T> writer = new CompactFileWriter<>(bucketCheckInterval, bucketsBuilder);
    ..........
    // Groups the files of a checkpoint into compaction units of roughly targetFileSize.
    CompactCoordinator coordinator = new CompactCoordinator(fsSupplier, targetFileSize);
    ..........
    // Rewrites each unit into one compacted file and emits the partitions to commit.
    CompactOperator<T> compacter = new CompactOperator<>(fsSupplier, readFactory, writerFactory);
}
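One practical consequence of this design: as section 3 will show, files are only handed to the coordinator and compacted after a checkpoint completes, so the checkpoint interval effectively sets the lower bound on how fresh the data in Hive can be. For the 1-10 minute visibility target from the preface, a checkpoint interval of around one minute is a reasonable starting point. A minimal sketch (values are illustrative, not a recommendation):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Part files are finalized and compaction is triggered on checkpoint completion,
        // so a 1-minute interval roughly means data becomes queryable within a few minutes.
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);
        // Avoid back-to-back checkpoints when one takes longer than the interval.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000L);
    }
}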
3. The HiveTableStreamSink compaction flow
3.1 CompactFileWriter
This class is what writes the records into files, but it does not implement the write path itself; the actual writing happens in its parent class. What it adds is reporting: whenever a part file is opened it forwards that file downstream, and once its parent has completed a checkpoint it emits an end-of-checkpoint record downstream.
package org.apache.flink.table.filesystem.stream.compact;

import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.filesystem.stream.AbstractStreamingWriter;
import org.apache.flink.table.filesystem.stream.compact.CompactMessages.EndCheckpoint;
import org.apache.flink.table.filesystem.stream.compact.CompactMessages.InputFile;

public class CompactFileWriter<T>
        extends AbstractStreamingWriter<T, CompactMessages.CoordinatorInput> {

    private static final long serialVersionUID = 1L;

    public CompactFileWriter(
            long bucketCheckInterval,
            StreamingFileSink.BucketsBuilder<
                            T, String, ? extends StreamingFileSink.BucketsBuilder<T, String, ?>>
                    bucketsBuilder) {
        super(bucketCheckInterval, bucketsBuilder);
    }

    @Override
    protected void partitionCreated(String partition) {}

    @Override
    protected void partitionInactive(String partition) {}

    @Override
    protected void onPartFileOpened(String partition, Path newPath) {
        // Tell the coordinator about every newly opened part file.
        output.collect(new StreamRecord<>(new InputFile(partition, newPath)));
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        // Signal the coordinator that this subtask has finished the checkpoint.
        output.collect(
                new StreamRecord<>(
                        new EndCheckpoint(
                                checkpointId,
                                getRuntimeContext().getIndexOfThisSubtask(),
                                getRuntimeContext().getNumberOfParallelSubtasks())));
    }
}
public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOperator<OUT>
        implements OneInputStreamOperator<IN, OUT>, BoundedOneInput {

    ......... (code omitted)

    protected abstract void partitionCreated(String partition);

    protected abstract void partitionInactive(String partition);

    protected abstract void onPartFileOpened(String partition, Path newPath);

    protected void commitUpToCheckpoint(long checkpointId) throws Exception {
        helper.commitUpToCheckpoint(checkpointId);
    }

    ......... (code omitted)

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        helper.onElement(
                element.getValue(),
                getProcessingTimeService().getCurrentProcessingTime(),
                element.hasTimestamp() ? element.getTimestamp() : null,
                currentWatermark);
    }

    ......... (code omitted)
}
3.2 CompactCoordinator
This operator receives the files opened in the current checkpoint along with the end-of-checkpoint messages, and stores the files opened during each checkpoint in operator state. Once every writer subtask has signalled the end of a checkpoint, it takes all files belonging to that checkpoint, groups them into compaction units, and sends those units downstream. The downstream compactor can therefore start whenever it likes, without having to worry about racing an unfinished checkpoint.
public class CompactCoordinator extends AbstractStreamOperator<CoordinatorOutput>
        implements OneInputStreamOperator<CoordinatorInput, CoordinatorOutput> {

    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(CompactCoordinator.class);

    private final SupplierWithException<FileSystem, IOException> fsFactory;
    private final long targetFileSize;

    private transient FileSystem fileSystem;
    private transient ListState<Map<Long, Map<String, List<Path>>>> inputFilesState;
    private transient TreeMap<Long, Map<String, List<Path>>> inputFiles;
    private transient Map<String, List<Path>> currentInputFiles;
    private transient TaskTracker inputTaskTracker;

    public CompactCoordinator(
            SupplierWithException<FileSystem, IOException> fsFactory, long targetFileSize) {
        this.fsFactory = fsFactory;
        this.targetFileSize = targetFileSize;
    }

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        fileSystem = fsFactory.get();
        // State layout: checkpoint id -> (partition -> files opened in that checkpoint).
        ListStateDescriptor<Map<Long, Map<String, List<Path>>>> filesDescriptor =
                new ListStateDescriptor<>(
                        "files-state",
                        new MapSerializer<>(
                                LongSerializer.INSTANCE,
                                new MapSerializer<>(
                                        StringSerializer.INSTANCE,
                                        new ListSerializer<>(
                                                new KryoSerializer<>(
                                                        Path.class, getExecutionConfig())))));
        inputFilesState = context.getOperatorStateStore().getListState(filesDescriptor);
        inputFiles = new TreeMap<>();
        currentInputFiles = new HashMap<>();
        if (context.isRestored()) {
            inputFiles.putAll(inputFilesState.get().iterator().next());
        }
    }

    @Override
    public void processElement(StreamRecord<CoordinatorInput> element) throws Exception {
        CoordinatorInput value = element.getValue();
        if (value instanceof InputFile) {
            // A writer subtask opened a new part file: remember it under its partition.
            InputFile file = (InputFile) value;
            currentInputFiles
                    .computeIfAbsent(file.getPartition(), k -> new ArrayList<>())
                    .add(file.getFile());
        } else if (value instanceof EndCheckpoint) {
            // Only coordinate once every writer subtask has reported this checkpoint.
            EndCheckpoint endCheckpoint = (EndCheckpoint) value;
            if (inputTaskTracker == null) {
                inputTaskTracker = new TaskTracker(endCheckpoint.getNumberOfTasks());
            }
            boolean triggerCommit =
                    inputTaskTracker.add(
                            endCheckpoint.getCheckpointId(), endCheckpoint.getTaskId());
            if (triggerCommit) {
                commitUpToCheckpoint(endCheckpoint.getCheckpointId());
            }
        } else {
            throw new UnsupportedOperationException("Unsupported input message: " + value);
        }
    }

    private void commitUpToCheckpoint(long checkpointId) {
        Map<Long, Map<String, List<Path>>> headMap = inputFiles.headMap(checkpointId, true);
        for (Map.Entry<Long, Map<String, List<Path>>> entry : headMap.entrySet()) {
            coordinate(entry.getKey(), entry.getValue());
        }
        headMap.clear();
    }

    private void coordinate(long checkpointId, Map<String, List<Path>> partFiles) {
        Function<Path, Long> sizeFunc =
                path -> {
                    try {
                        return fileSystem.getFileStatus(path).getLen();
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                };
        // Per partition, pack the files into units of roughly targetFileSize.
        Map<String, List<List<Path>>> compactUnits = new HashMap<>();
        partFiles.forEach(
                (p, files) -> {
                    files.sort(Comparator.comparing(Path::getPath));
                    compactUnits.put(p, BinPacking.pack(files, sizeFunc, targetFileSize));
                });
        int unitId = 0;
        for (Map.Entry<String, List<List<Path>>> unitsEntry : compactUnits.entrySet()) {
            String partition = unitsEntry.getKey();
            for (List<Path> unit : unitsEntry.getValue()) {
                output.collect(new StreamRecord<>(new CompactionUnit(unitId, partition, unit)));
                unitId++;
            }
        }
        LOG.debug("Coordinate checkpoint-{}, compaction units are: {}", checkpointId, compactUnits);
        output.collect(new StreamRecord<>(new EndCompaction(checkpointId)));
    }
}
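The grouping in coordinate() is delegated to BinPacking.pack, which splits each partition's files into units whose accumulated size reaches targetFileSize. The standalone sketch below illustrates the idea with a simple greedy pass; it is not the actual Flink implementation, just the concept it implements.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public final class GreedyBinPackingSketch {

    // Fills one unit after another: items are appended to the current unit until its
    // accumulated size reaches targetSize, then a new unit is started.
    public static <T> List<List<T>> pack(
            List<T> items, Function<T, Long> sizeFunc, long targetSize) {
        List<List<T>> units = new ArrayList<>();
        List<T> current = new ArrayList<>();
        long currentSize = 0L;
        for (T item : items) {
            current.add(item);
            currentSize += sizeFunc.apply(item);
            if (currentSize >= targetSize) {
                units.add(current);
                current = new ArrayList<>();
                currentSize = 0L;
            }
        }
        if (!current.isEmpty()) {
            units.add(current);
        }
        return units;
    }

    public static void main(String[] args) {
        // Files of 30, 40, 70 and 10 MB with a 128 MB target -> [[30, 40, 70], [10]].
        List<Long> fileSizesMb = Arrays.asList(30L, 40L, 70L, 10L);
        System.out.println(pack(fileSizesMb, size -> size, 128L));
    }
}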
3.3 CompactOperator
This class is the final operator of the Hive streaming sink. What it receives from upstream is a compaction unit: a unit id plus the files under the same path that can be compacted together. The compaction itself boils down to reading the small files, writing their contents into one new file, and then deleting the original small files.
public class CompactOperator<T> extends AbstractStreamOperator<PartitionCommitInfo>
        implements OneInputStreamOperator<CoordinatorOutput, PartitionCommitInfo>, BoundedOneInput {

    private static final long serialVersionUID = 1L;

    public static final String UNCOMPACTED_PREFIX = ".uncompacted-";
    public static final String COMPACTED_PREFIX = "compacted-";

    private final SupplierWithException<FileSystem, IOException> fsFactory;
    private final CompactReader.Factory<T> readerFactory;
    private final CompactWriter.Factory<T> writerFactory;

    private transient FileSystem fileSystem;
    private transient ListState<Map<Long, List<Path>>> expiredFilesState;
    private transient TreeMap<Long, List<Path>> expiredFiles;
    private transient List<Path> currentExpiredFiles;
    private transient Set<String> partitions;
    private transient Path path;

    public CompactOperator(
            SupplierWithException<FileSystem, IOException> fsFactory,
            CompactReader.Factory<T> readerFactory,
            Path path,
            CompactWriter.Factory<T> writerFactory) {
        this.fsFactory = fsFactory;
        this.path = path;
        this.readerFactory = readerFactory;
        this.writerFactory = writerFactory;
    }

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        this.partitions = new HashSet<>();
        this.fileSystem = fsFactory.get();
        ListStateDescriptor<Map<Long, List<Path>>> metaDescriptor =
                new ListStateDescriptor<>(
                        "expired-files",
                        new MapSerializer<>(
                                LongSerializer.INSTANCE,
                                new ListSerializer<>(
                                        new KryoSerializer<>(Path.class, getExecutionConfig()))));
        this.expiredFilesState = context.getOperatorStateStore().getListState(metaDescriptor);
        this.expiredFiles = new TreeMap<>();
        this.currentExpiredFiles = new ArrayList<>();
        if (context.isRestored()) {
            this.expiredFiles.putAll(this.expiredFilesState.get().iterator().next());
        }
    }

    @Override
    public void processElement(StreamRecord<CoordinatorOutput> element) throws Exception {
        CoordinatorOutput value = element.getValue();
        if (value instanceof CompactionUnit) {
            CompactionUnit unit = (CompactionUnit) value;
            // Only process the units assigned to this subtask (distributed by unit id).
            if (unit.isTaskMessage(
                    getRuntimeContext().getNumberOfParallelSubtasks(),
                    getRuntimeContext().getIndexOfThisSubtask())) {
                String partition = unit.getPartition();
                List<Path> paths = unit.getPaths();
                doCompact(partition, paths);
                this.partitions.add(partition);
                // Remember the inputs so they can be cleaned up later (see clearExpiredFiles).
                this.currentExpiredFiles.addAll(paths);
            }
        } else if (value instanceof EndCompaction) {
            LOG.info("Current checkpoint id: " + ((EndCompaction) value).getCheckpointId());
            endCompaction(((EndCompaction) value).getCheckpointId());
        }
    }

    private void endCompaction(long checkpoint) {
        this.output.collect(
                new StreamRecord<>(
                        new PartitionCommitInfo(
                                checkpoint,
                                getRuntimeContext().getIndexOfThisSubtask(),
                                getRuntimeContext().getNumberOfParallelSubtasks(),
                                new ArrayList<>(this.partitions))));
        this.partitions.clear();
    }

    ...........

    private void clearExpiredFiles(long checkpointId) throws IOException {
        NavigableMap<Long, List<Path>> outOfDateMetas = expiredFiles.headMap(checkpointId, true);
        for (List<Path> paths : outOfDateMetas.values()) {
            for (Path meta : paths) {
                fileSystem.delete(meta, true);
            }
        }
        outOfDateMetas.clear();
    }

    private void doCompact(String partition, List<Path> paths) throws IOException {
        if (paths.size() == 0) {
            return;
        }
        Path target = createCompactedFile(paths);
        if (fileSystem.exists(target)) {
            return;
        }
        checkExist(paths);
        long startMillis = System.currentTimeMillis();
        boolean success = false;
        if (paths.size() == 1) {
            // With a single input file, try the cheaper single-file move first.
            success = doSingleFileMove(paths.get(0), target);
        }
        if (!success) {
            doMultiFilesCompact(partition, paths, target);
        }
        double costSeconds = ((double) (System.currentTimeMillis() - startMillis)) / 1000;
        LOG.info(
                "Compaction time cost is '{}S', target file is '{}', input files are '{}'",
                costSeconds,
                target,
                paths);
    }

    ...... (what follows is the actual compaction code; nothing more to it than reading the inputs and writing them back out)
}
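To make "read the small files, write one new file, delete the old ones" concrete, here is a deliberately naive sketch using Flink's FileSystem API. It byte-concatenates the inputs, which is only valid for formats that tolerate concatenation (e.g. delimited text); the real doMultiFilesCompact goes through CompactReader/CompactWriter so that Parquet and ORC files are merged record by record. The class and method names are made up for illustration.

import java.io.IOException;
import java.util.List;

import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public final class NaiveFileConcatenation {

    // Concatenates the input files into `target`, then deletes the inputs.
    // Not format-aware: do not use this for Parquet/ORC files.
    public static void concatAndDelete(List<Path> inputs, Path target) throws IOException {
        FileSystem fs = target.getFileSystem();
        try (FSDataOutputStream out = fs.create(target, FileSystem.WriteMode.NO_OVERWRITE)) {
            byte[] buffer = new byte[4096];
            for (Path input : inputs) {
                try (FSDataInputStream in = fs.open(input)) {
                    int read;
                    while ((read = in.read(buffer)) != -1) {
                        out.write(buffer, 0, read);
                    }
                }
            }
        }
        for (Path input : inputs) {
            fs.delete(input, false);
        }
    }
}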
Summary
That more or less wraps up the Hive connector. To build small-file merging on top of it there are several options: merge the N+1 smallest files after every N checkpoints, merge files based on time, or run a separately scheduled offline job to merge them (see the sketch below). There are plenty of possible designs; the main considerations are performance and whether the approach fits your scenario.
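For the offline option, one common trick is simply to rewrite a finished partition onto itself with a scheduled batch job, which coalesces its files because the number of output files is bounded by the sink parallelism. A minimal sketch, reusing the hypothetical ods_log table from the beginning of the post (table and partition values are placeholders):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class OfflinePartitionCompaction {
    public static void main(String[] args) throws Exception {
        // Batch job, scheduled e.g. hourly for partitions that are already complete.
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
        tEnv.registerCatalog("myhive", new HiveCatalog("myhive", "default", "/opt/hive-conf"));
        tEnv.useCatalog("myhive");
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        // Rewriting the partition onto itself replaces its many small files.
        tEnv.executeSql(
                        "INSERT OVERWRITE TABLE ods_log PARTITION (dt='2021-01-01') "
                                + "SELECT line FROM ods_log WHERE dt='2021-01-01'")
                .await();
    }
}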