Call the with_output_types() method to explicitly specify the output type of BatchElements(), as in the following example:
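```python
from typing import List, Tuple

import apache_beam as beam


class MyDoFn(beam.DoFn):
    def process(self, element):
        # element is one batch (a list of tuples); emit a (tuple, 1) pair per item
        return [(i, 1) for i in element]


with beam.Pipeline() as p:
    data = p | "Create data" >> beam.Create([(1, 2), (3, 4), (5, 6), (7, 8)])

    batched_data = (
        data
        # Batch sizes are chosen adaptively between these bounds.
        | "Batch data" >> beam.BatchElements(min_batch_size=2, max_batch_size=3)
            # Declare the output type explicitly: each output element is a
            # list of (int, int) tuples.
            .with_output_types(List[Tuple[int, int]]))

    reshuffled_data = (
        batched_data
        | "Reshuffle data" >> beam.Reshuffle())

    processed_data = (
        reshuffled_data
        | "Process data" >> beam.ParDo(MyDoFn())
        | "Grouping data" >> beam.GroupByKey())

    processed_data | "Print result" >> beam.Map(print)
```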
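To see why List[Tuple[int, int]] describes what leaves the batching step, the sketch below traces the element shapes through the same steps in plain Python, with no Beam involved. It is only an illustration under stated assumptions: the helper names (batch_elements, my_do_fn, group_by_key) are hypothetical stand-ins, and a fixed batch size of 2 is assumed, whereas the real BatchElements picks sizes adaptively between min_batch_size and max_batch_size.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

Pair = Tuple[int, int]


def batch_elements(items: List[Pair], batch_size: int) -> List[List[Pair]]:
    """Hypothetical stand-in for BatchElements: split items into fixed-size
    lists (the real transform chooses batch sizes adaptively)."""
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]


def my_do_fn(batch: List[Pair]) -> List[Tuple[Pair, int]]:
    """Same logic as MyDoFn.process: emit (item, 1) for every item in a batch."""
    return [(item, 1) for item in batch]


def group_by_key(pairs: Iterable[Tuple[Pair, int]]) -> Dict[Pair, List[int]]:
    """Collect the values for each key, like GroupByKey on a bounded input."""
    grouped: Dict[Pair, List[int]] = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)


data = [(1, 2), (3, 4), (5, 6), (7, 8)]

# Each element after batching is a list of (int, int) tuples,
# i.e. List[Tuple[int, int]], which is the hint declared in the pipeline.
batches = batch_elements(data, batch_size=2)
print(batches)  # → [[(1, 2), (3, 4)], [(5, 6), (7, 8)]]

# The DoFn flattens each batch back into ((int, int), int) key/value pairs.
processed = [out for batch in batches for out in my_do_fn(batch)]

result = group_by_key(processed)
print(result)  # → {(1, 2): [1], (3, 4): [1], (5, 6): [1], (7, 8): [1]}
```

In the actual pipeline, GroupByKey emits (key, iterable-of-values) pairs rather than a dict, but the keys and grouped values correspond to the ones shown here.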
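Note: without an explicit hint, the output type Beam associates with beam.BatchElements() may not be what downstream steps expect (here, Tuple[Any, Union[Timestamp, int]]), and such a mismatch may be one reason the pipeline does not work correctly on DataflowRunner.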