Apache Beam中的嵌套Top N
创始人
2024-09-03 15:02:03
0

在Apache Beam中实现嵌套Top N操作可以使用Combine和GroupByKey转换。下面是一个示例代码,演示如何使用Apache Beam在数据集上进行嵌套Top N操作。

import apache_beam as beam

class NestedTopN(beam.CombineFn):
    def __init__(self, n):
        self.n = n
    
    def create_accumulator(self):
        return []
    
    def add_input(self, accumulator, element):
        accumulator.append(element)
        return accumulator
    
    def merge_accumulators(self, accumulators):
        merged = []
        for acc in accumulators:
            merged.extend(acc)
        merged.sort(key=lambda x: x[1], reverse=True)
        return merged[:self.n]
    
    def extract_output(self, accumulator):
        return accumulator

def nested_top_n(elements, n):
    return (
        elements
        | "Add keys" >> beam.Map(lambda x: (x[0], x))
        | "Group by key" >> beam.GroupByKey()
        | "Top N" >> beam.CombinePerKey(NestedTopN(n))
    )

def run_pipeline():
    with beam.Pipeline() as pipeline:
        elements = [
            ("A", 10),
            ("B", 5),
            ("A", 15),
            ("B", 8),
            ("C", 12),
            ("A", 20),
            ("B", 3),
            ("C", 9)
        ]
        
        result = (
            pipeline
            | "Create elements" >> beam.Create(elements)
            | "Nested Top N" >> beam.ParDo(nested_top_n, n=2)
        )
        
        beam.pvalue.Print(result)

if __name__ == "__main__":
    run_pipeline()

在这个示例中,元素是一个键值对的列表,其中键是要进行嵌套Top N操作的键,值是要进行排序的值。在NestedTopN类中,我们使用add_input方法将输入元素添加到累加器中,merge_accumulators方法将所有累加器合并为一个,并根据值对其进行排序,extract_output方法返回结果。

nested_top_n函数中,我们首先通过Map转换将元素转换为键值对的形式,然后使用GroupByKey转换将具有相同键的元素分组到一起。最后,我们使用CombinePerKey转换和NestedTopN组合函数对每个键的元素进行嵌套Top N操作。

run_pipeline函数中,我们创建一个Beam管道,并使用Create转换创建输入元素的PCollection。然后,我们使用ParDo转换应用nested_top_n函数进行嵌套Top N操作。最后,我们使用Print转换打印结果。

执行上述代码,将输出嵌套Top N结果:

INFO:apache_beam.runners.portability.fn_api_runner:Running pipeline with DirectRunner.
('A', [('A', 20), ('A', 15)])
('B', [('B', 8), ('B', 5)])
('C', [('C', 12), ('C', 9)])

相关内容

热门资讯

最新技巧!潮汕掌上娱透视怎么买... 最新技巧!潮汕掌上娱透视怎么买,科技新星游牛牛辅助,存在挂教程(通报有开挂辅助下载);无需打开直接搜...
分享个大家!中至赣牌圈手机辅助... 您好:中至赣牌圈手机辅助这款游戏可以开挂的,确实是有挂的,很多玩家在这款游戏中打牌都会发现很多用户的...
避坑细节!约战辅助,九九山城辅... 避坑细节!约战辅助,九九山城辅助免费,必胜教程(发现有开挂辅助器)1、下载安装好九九山城辅助免费,进...
推荐一款!四川熊猫辅助软件免费... 推荐一款!四川熊猫辅助软件免费,新鸿狐辅助软件是真的吗,wepoke教程(推荐有开挂辅助下载);无需...
如何分辨真伪!方片十三张透视脚... 如何分辨真伪!方片十三张透视脚本,三哥玩辅助器免费下载,微扑克教程(了解有开挂辅助平台);无需打开直...
一分钟了解!钱塘十三水透视卦,... 一分钟了解!钱塘十三水透视卦,决战卡五星游戏辅助器,可靠技巧(推荐有开挂辅助平台);无需打开直接搜索...
玩家必备教程!心悦怎么开挂,微... 玩家必备教程!心悦怎么开挂,微信随意玩辅助器,透视教程(详细有开挂辅助器)1、下载安装好微信随意玩辅...
一分钟揭秘!湖南牵手胡子跑脚本... 一分钟揭秘!湖南牵手胡子跑脚本,微信小程序游戏辅助器,教你攻略(分析有开挂辅助脚本);无需打开直接搜...
记者发布!雀姬辅助脚本,拱趴游... 记者发布!雀姬辅助脚本,拱趴游戏诀窍,大神讲解(实测有开挂辅助插件);无需打开直接搜索加(薇:136...
玩家攻略!福建天天开心辅助器是... 玩家攻略!福建天天开心辅助器是真的码,微信小程序游戏破解微乐游戏,必备教程(原来有有开挂辅助挂);无...