当一个任务执行时间较长,而下游任务需要等待其watermark,此时可以适当调整并发度,增加任务数量,提升整体处理能力。例如:
pipeline.apply("MyOperation", …) .setParallelism(2) // 设置并发度为2 .apply("MyDownstreamOperation", …) .setParallelism(4) // 设置下游并发度更高,提高流水线吞吐量
Beam提供了多种时间特性选择,在高负载下可以适当调整,但需要权衡使用场景,确保时间戳逻辑正确,例如:
pipeline.apply("ReadMySource", …) .withTimestampFn(new MyTimestampFn()) // 针对特定场景定制时间戳 .apply("MyOperation", …) .apply("WriteMySink", …);
Flink提供了丰富的度量监控功能,可以通过度量监控接口对pipeline进行监控,在发现问题时可以及时调整处理流程。例如:
public class MyStep extends DoFn<> { ... private final Counter myCounter = Metrics.counter(MyStep.class, "myCounter"); private final Distribution myDistribution = Metrics.distribution(MyStep.class, "myDistribution"); ...
@ProcessElement
public void processElement(ProcessContext ctx) {
...
myCounter.inc();
myDistribution.update(System.currentTimeMillis() - ctx.timestamp().getMillis());
...
}
}
通过以上解决方法可以解决Beam on Flink runner在高负载下无法推进watermarks的问题。