要使用 Apache Beam Python 条件和中断,您可以使用 DoFn
类中的 process
方法并使用 yield
语句返回满足条件的元素。以下是一个示例:
import apache_beam as beam
class FilterGreaterThan(beam.DoFn):
def __init__(self, threshold):
self.threshold = threshold
def process(self, element):
if element > self.threshold:
yield element
with beam.Pipeline() as pipeline:
input_data = [1, 2, 3, 4, 5]
threshold = 3
filtered_data = (
pipeline
| beam.Create(input_data)
| beam.ParDo(FilterGreaterThan(threshold))
)
filtered_data | beam.Map(print)
在上述示例中,我们定义了一个名为 FilterGreaterThan
的自定义 DoFn
类,它接受一个阈值作为参数。process
方法接收每个输入元素,并检查是否大于阈值。如果是,则使用 yield
语句返回该元素。
在管道中,我们首先使用 beam.Create
创建了一个输入数据集,然后使用 beam.ParDo
将 FilterGreaterThan
应用于每个元素。最后,我们使用 beam.Map
将过滤后的数据打印出来。
请注意,yield
语句用于返回满足条件的元素,而不是使用 return
语句。这是因为 yield
语句允许您返回多个元素,而不仅限于单个元素。