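This problem is usually caused by an exception raised in a worker or in the pipeline at runtime: when a bundle fails, the runner retries it, so "finish_bundle" can end up being called again. To deal with this, you can wrap the processing logic in a try/except block that catches these exceptions, and guard the work so that "finish_bundle" takes effect only once.
Here is some example code: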
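import logging
import threading

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class MyDoFn(beam.DoFn):
    def setup(self):
        # Create the lock here, not in __init__: a lock cannot be pickled
        # when the DoFn is serialized and shipped to the workers.
        self._lock = threading.Lock()
        self._should_finish_bundle = False

    def process(self, element):
        try:
            # do some processing here
            yield element
        except Exception:
            # handle exception here instead of letting it fail the bundle
            logging.exception("Failed to process element: %r", element)
        finally:
            # ensure that the finish step is only emitted once
            with self._lock:
                if self._should_finish_bundle:
                    self._should_finish_bundle = False
                    yield beam.pvalue.TaggedOutput('finished', None)

    def finish_bundle(self):
        with self._lock:
            self._should_finish_bundle = True


options = PipelineOptions()  # placeholder; pass your own pipeline options
input_file = "input.txt"  # placeholder path

pipeline = beam.Pipeline(options=options)
(pipeline
 | "Read input" >> beam.io.ReadFromText(input_file)
 | "Process elements" >> beam.ParDo(MyDoFn()).with_outputs('finished'))
result = pipeline.run()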
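In this example, we define a DoFn class named "MyDoFn" and use the attribute "self._should_finish_bundle" to track whether the finish step still needs to be emitted. If an exception occurs inside "process", it is caught and handled there instead of propagating, so it does not fail the bundle and trigger a retry that would run "finish_bundle" again. "finish_bundle" itself only sets the flag; the "finally" block in "process" checks and clears it under "self._lock", so the 'finished' output is emitted at most once each time the flag is set, even if several threads reach that point. Since the finish step can be expensive, making sure it runs only once also helps keep the Beam pipeline efficient.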
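The lock-guarded flag can also be looked at in isolation. The following is a minimal sketch that uses only the Python standard library, not Beam itself. The names "OnceFlag", "arm", "consume" and "count_winners" are hypothetical and introduced purely for illustration: "arm" plays the role of "finish_bundle" setting "_should_finish_bundle", and "consume" plays the role of the check-and-clear step in "process".

```python
import threading


class OnceFlag:
    """Flag that can be armed and then consumed by exactly one caller."""

    def __init__(self):
        self._lock = threading.Lock()
        self._armed = False

    def arm(self):
        # Analogous to finish_bundle setting _should_finish_bundle = True.
        with self._lock:
            self._armed = True

    def consume(self):
        # Test and clear under the lock, so only one caller ever sees True.
        with self._lock:
            was_armed = self._armed
            self._armed = False
            return was_armed


def count_winners(num_threads=8):
    """Arm the flag once, let several threads race, count who consumed it."""
    flag = OnceFlag()
    flag.arm()
    results = []
    results_lock = threading.Lock()

    def worker():
        won = flag.consume()
        with results_lock:
            results.append(won)

    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sum(results)
```

Because the check and the reset happen inside the same "with" block, no two threads can both observe the flag as set. Without the lock, two threads could read True before either of them cleared it, and the guarded step would run twice.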