要从Google Cloud Storage接收pub/sub消息,您需要使用Google Cloud Pub/Sub I/O模块的ReadFromPubSub方法。以下是一个使用Apache Beam的Python代码示例,演示如何正确从Google Cloud Storage接收pub/sub消息:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# 定义自定义选项
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument('--input_topic', help='Input Pub/Sub topic')
parser.add_argument('--output_bucket', help='Output GCS bucket')
parser.add_argument('--output_prefix', help='Output GCS prefix')
# 定义处理函数
def process_message(message):
# 处理接收到的消息
# 这只是一个示例,您可以根据实际需求进行处理
# 在这个示例中,我们将消息写入Google Cloud Storage
output_bucket = options.output_bucket
output_prefix = options.output_prefix
output_file = f'{output_bucket}/{output_prefix}/output.txt'
with beam.io.gcp.gcsio.GcsIO().open(output_file, 'w') as file:
file.write(message.data)
# 解析命令行参数
options = MyOptions()
options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DirectRunner'
pipeline = beam.Pipeline(options=options)
# 从Pub/Sub读取消息
input_topic = options.input_topic
messages = (
pipeline
| 'Read from Pub/Sub' >> beam.io.ReadFromPubSub(topic=input_topic)
| 'Process messages' >> beam.Map(process_message)
)
# 运行管道
pipeline.run()
在上述代码中,您需要将--input_topic
参数设置为您的Pub/Sub主题,--output_bucket
参数设置为您的GCS存储桶,--output_prefix
参数设置为输出文件的前缀。process_message
函数是用于处理接收到的pub/sub消息的自定义函数。在这个示例中,它将消息写入了Google Cloud Storage。
请确保已安装apache_beam
和google-cloud-pubsub
Python包,并设置正确的Google Cloud凭据。