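This is usually expected. When incremental checkpointing is enabled (a feature of the RocksDB state backend), Apache Flink reduces checkpoint overhead by persisting only the state that changed since the previous checkpoint and referencing the files it has already uploaded. A task whose state did not change therefore contributes no new state data to that checkpoint, which is why empty data files can appear.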
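To make the incremental idea concrete, here is a minimal Java sketch of the file bookkeeping. It is a simplified model, not Flink's API: the class and method names are hypothetical, and it only tracks RocksDB SST files. Files other than SST files, such as RocksDB's small metadata files, are handled separately by Flink and are not modeled here. A checkpoint uploads only the files that earlier checkpoints have not already uploaded, so when a task's state is unchanged the upload set is empty.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of incremental checkpointing: a new checkpoint uploads
// only files not uploaded before and merely references the rest.
public class IncrementalUploadSketch {

    // Returns the files a new checkpoint would have to upload (sorted).
    static Set<String> filesToUpload(Set<String> alreadyUploaded, Set<String> currentFiles) {
        Set<String> newFiles = new TreeSet<>(currentFiles);
        newFiles.removeAll(alreadyUploaded);
        return newFiles;
    }

    public static void main(String[] args) {
        Set<String> chk1 = Set.of("000001.sst", "000002.sst");
        // State changed: RocksDB flushed a new SST file before chk2.
        Set<String> chk2 = Set.of("000001.sst", "000002.sst", "000003.sst");
        // No state change between chk2 and chk3: same set of files.
        Set<String> chk3 = chk2;

        System.out.println(filesToUpload(chk1, chk2)); // [000003.sst]
        System.out.println(filesToUpload(chk2, chk3)); // []
    }
}
```

The second call shows the case from the answer: with no state change, the checkpoint has nothing new to write for that task.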
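If you want to detect empty data files, you can add a check to your job: keep the paths of the data files in operator state and, when the job is restored from a checkpoint or savepoint, iterate over them and report any that are empty. For example: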
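```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

// Records file paths in operator state and, after a restore from a
// checkpoint/savepoint, reports any of those files that are empty.
// Assumption: each incoming record is the path of a file to verify, because
// SinkFunction.Context does not expose the locations of checkpoint files.
public class EmptyFileVerifier implements SinkFunction<String>, CheckpointedFunction {
    private transient ListState<String> filenamesState;
    private transient List<String> filenames;

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Replace the stored list with all paths seen so far.
        this.filenamesState.update(this.filenames);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        this.filenamesState = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("filenames", String.class));
        this.filenames = new ArrayList<>();
        if (context.isRestored()) {
            for (String filename : this.filenamesState.get()) {
                // Keep restored paths so the next snapshot does not drop them.
                this.filenames.add(filename);
                checkEmptyFile(filename);
            }
        }
    }

    private void checkEmptyFile(String filename) {
        Path filePath = new Path(filename);
        try {
            // Flink picks the file system (local, HDFS, S3, ...) from the URI scheme.
            FileSystem fs = FileSystem.get(filePath.toUri());
            if (fs.getFileStatus(filePath).getLen() == 0) {
                System.err.printf("Empty file %s detected%n", filename);
            }
        } catch (IOException e) {
            System.err.printf("Failed to access file %s: %s%n", filename, e);
        }
    }

    @Override
    public void invoke(String filename, Context context) throws Exception {
        this.filenames.add(filename);
    }
}
```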
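If changing the job is not an option, a similar check can also run outside Flink. The sketch below uses a different technique from the operator above: a standalone Java program (class and method names are hypothetical) that walks a checkpoint directory on a locally accessible filesystem, for example a `chk-<n>` directory under the configured checkpoint directory, and prints every zero-length file. It relies only on `java.nio.file`, so a checkpoint directory on HDFS or S3 would have to be mounted or copied locally first.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Standalone scan of a checkpoint directory on a local filesystem.
// Assumption: the directory is given on the command line; Flink is not used.
public class CheckpointDirScanner {

    // Returns every regular file under 'root' whose size is 0 bytes, sorted.
    static List<Path> findEmptyFiles(Path root) throws IOException {
        try (Stream<Path> paths = Files.walk(root)) {
            return paths
                    .filter(Files::isRegularFile)
                    .filter(p -> {
                        try {
                            return Files.size(p) == 0;
                        } catch (IOException e) {
                            // Lambdas cannot throw checked exceptions; rethrow unchecked.
                            throw new UncheckedIOException(e);
                        }
                    })
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        Path root = Paths.get(args.length > 0 ? args[0] : ".");
        for (Path p : findEmptyFiles(root)) {
            System.err.printf("Empty file %s detected%n", p);
        }
    }
}
```

Running it against an individual `chk-<n>` directory limits the report to that checkpoint; pointing it at the job's checkpoint directory also covers shared files.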
In your job, you can create this `EmptyFileVerifier