ApacheBeamPython的Dataflow在使用GCPPub/Sub计数时进行了过多的计数。
创始人
2024-09-05 11:30:37
0

可以通过将窗口持续时间设置为 Pub/Sub 保留的最长时间来解决该问题,比如1小时。具体实现代码示例如下:

# 定义窗口持续时间为1小时
window_duration_in_seconds = 60 * 60

# 定义 Pub/Sub 的话题
input_topic = "projects/{project}/topics/{topic}".format(
   project=project, topic=topic)

# 定义数据流管道
pipeline = beam.Pipeline(options=options)

# 从 Pub/Sub 话题中读取数据流
messages = (
    pipeline
    | "Read Pub/Sub Messages" >> beam.io.gcp.pubsub.ReadFromPubSub(topic=input_topic)
    | "Decode Messages from JSON" >> beam.Map(lambda x: json.loads(x)))

# 固定窗口持续时间为1小时
fixed_windows = (
    messages
    | "add timestamp to messages" >> beam.ParDo(AddTimestampFn())
    | "Create Fixed Windows with Hourly duration" >> beam.WindowInto(
        beam.window.FixedWindows(window_duration_in_seconds))
)

# 进行计数操作
message_count = (
    fixed_windows
    | "Extract Pub/Sub message ids" >> beam.Map(lambda message: message.get("message_id"))
    | "Count messages" >> beam.combiners.Count.PerElement())

# 将计数结果写入云存储
message_count | "Write results to GCS" >> WriteToText(output_path)

在以上示例代码中,“AddTimestampFn”是一个自定义的 DoFn 函数,它的作用是为每条消息添加时间戳信息,以便进行窗口操作中的时间约束。最后计数结果将被写入云存储中。

相关内容

热门资讯

在玩家背景下!新珊瑚大厅辅助,... 在玩家背景下!新珊瑚大厅辅助,新海贝辅助器-好像确实有辅助技巧(哔哩哔哩)1、每一步都需要思考,不同...
透视存在!微信小程序辅助app... 透视存在!微信小程序辅助app下载,广西友乐免费辅助-真是确实有辅助挂(哔哩哔哩)亲,关键说明,微信...
透视好友!指尖四川辅助破解版,... 透视好友!指尖四川辅助破解版,红茶馆app辅助-真是真的有辅助教程(哔哩哔哩)1、操作简单,无需指尖...
透视脚本!越局吧可以看到别人底... 透视脚本!越局吧可以看到别人底牌,金华佛手在线辅助软件-原来存在有辅助方法(哔哩哔哩)1、让任何用户...
近期!wepoker辅助器是真... 近期!wepoker辅助器是真的吗,闲逸辅助软件下载-确实是有辅助挂(哔哩哔哩)该软件可以轻松地帮助...
据统计!创思维激k看底牌辅助开... 据统计!创思维激k看底牌辅助开发商,约战竞技场辅助脚本-总是是真的有辅助软件(哔哩哔哩)1、许多玩家...
黑科技辅助挂!乐乐休闲游戏辅助... 黑科技辅助挂!乐乐休闲游戏辅助,微信边锋辅助软件-其实真的有辅助器(哔哩哔哩)1、超多福利:超高返利...
据目击者称!新玉海楼游戏茶苑,... 据目击者称!新玉海楼游戏茶苑,丫丫衡阳字牌3辅助-一贯存在有辅助攻略(哔哩哔哩)丫丫衡阳字牌3辅助脚...
复盘辅助挂!牵手游戏辅助,开心... 复盘辅助挂!牵手游戏辅助,开心泉州辅助-果然真的是有辅助教程(哔哩哔哩)1、每一步都需要思考,不同水...
据相关数据显示!雀友会广东潮汕... 据相关数据显示!雀友会广东潮汕麻雀有挂么,衢州都莱辅助软件-总是是有辅助app(哔哩哔哩)亲,关键说...