在使用Apache_beam[gcp]进行GroupBy操作时,我们可以通过以下代码将结果保存到不同的文件夹中:
import apache_beam as beam
class WriteToFile(beam.DoFn):
def __init__(self, output_path):
self.output_path = output_path
def process(self, element):
key, value = element
filename = self.output_path + '/' + str(key) + '.txt'
with open(filename, 'a') as f:
for item in value:
f.write("%s\n" % item)
output_path = 'gs://my-bucket/output-folder'
with beam.Pipeline(options=options) as pipeline:
result = (
pipeline
| 'ReadFromSource' >> beam.io.ReadFromText('gs://my-bucket/input-file')
| 'ParseLine' >> beam.Map(lambda line: (line.split(',')[0], line.split(',')[1]))
| 'GroupByKey' >> beam.GroupByKey()
| 'WriteToFile' >> beam.ParDo(WriteToFile(output_path))
)
以上代码将输入文件按照第一列分组后,将每组的值写入到以组名命名的文件中,并保存在指定的输出路径下。我们可以根据需要修改文件的命名规则和输出路径。注意,此示例使用Google Cloud Storage作为输入和输出源,如果使用其他云服务或本地文件系统,需要相应地更改代码。