Apache Beam无法正确从Google Cloud Storage接收pub/sub消息。
创始人
2024-09-03 15:01:03
0

要从Google Cloud Storage接收pub/sub消息,您需要使用Google Cloud Pub/Sub I/O模块的ReadFromPubSub方法。以下是一个使用Apache Beam的Python代码示例,演示如何正确从Google Cloud Storage接收pub/sub消息:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# 定义自定义选项
class MyOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument('--input_topic', help='Input Pub/Sub topic')
        parser.add_argument('--output_bucket', help='Output GCS bucket')
        parser.add_argument('--output_prefix', help='Output GCS prefix')

# 定义处理函数
def process_message(message):
    # 处理接收到的消息
    # 这只是一个示例,您可以根据实际需求进行处理
    # 在这个示例中,我们将消息写入Google Cloud Storage
    output_bucket = options.output_bucket
    output_prefix = options.output_prefix
    output_file = f'{output_bucket}/{output_prefix}/output.txt'

    with beam.io.gcp.gcsio.GcsIO().open(output_file, 'w') as file:
        file.write(message.data)

# 解析命令行参数
options = MyOptions()
options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DirectRunner'
pipeline = beam.Pipeline(options=options)

# 从Pub/Sub读取消息
input_topic = options.input_topic
messages = (
    pipeline
    | 'Read from Pub/Sub' >> beam.io.ReadFromPubSub(topic=input_topic)
    | 'Process messages' >> beam.Map(process_message)
)

# 运行管道
pipeline.run()

在上述代码中,您需要将--input_topic参数设置为您的Pub/Sub主题,--output_bucket参数设置为您的GCS存储桶,--output_prefix参数设置为输出文件的前缀。process_message函数是用于处理接收到的pub/sub消息的自定义函数。在这个示例中,它将消息写入了Google Cloud Storage。

请确保已安装apache_beamgoogle-cloud-pubsub Python包,并设置正确的Google Cloud凭据。

相关内容

热门资讯

玩家必看科普“789大菠萝可以... 玩家必看科普“789大菠萝可以控制”原先有开挂辅助助手(今日头条);详细789大菠萝可以控制攻略(7...
透视软件!wepoker有插件... 透视软件!wepoker有插件-科普开挂透视辅助工具(有挂秘笈)1、玩家可以在wepoker有插件软...
技术分享“多多科技手游辅助”从... 技术分享“多多科技手游辅助”从来有开挂辅助下载(竟然有挂);1、完成多多科技手游辅助的残局,帮助玩家...
透视了解!wepoker透视脚... 您好,wepoker透视脚本免费这款游戏可以开挂的,确实是有挂的,需要了解加去威信【13670430...
一起来讨论“微乐家乡小程序辅助... 一起来讨论“微乐家乡小程序辅助”确实有开挂辅助下载(有挂技术);1、完成微乐家乡小程序辅助的残局,帮...
透视ai!xpoker辅助器-... 透视ai!xpoker辅助器-揭露开挂透视辅助app(有挂教程)1、全新机制【xpoker辅助器软件...
9分钟了解“麻友圈安庆版插件”... 9分钟了解“麻友圈安庆版插件”其实有开挂辅助软件(有挂存在)是一款可以让一直输的玩家,快速成为一个“...
透视科技!pokemmo手机版... 透视科技!pokemmo手机版透视脚本-解密开挂透视辅助方法(有人有挂)1、用户打开应用后不用登录就...
总算了解“闲逸透视app安装步... 总算了解“闲逸透视app安装步骤详解”本来有开挂辅助助手(今日头条);闲逸透视app安装步骤详解软件...
透视规律!aapoker如何设... 透视规律!aapoker如何设置胜率-科普开挂透视辅助软件(有挂存在)1、让任何用户在无需aapok...