AwsGlue任务生成了很多小文件
创始人
2024-09-25 16:01:52
0

使用“合并文件”的技术,将多个小文件合并成一个大文件,以减少存储成本,并提高查询的性能。

示例代码如下:

from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, IntegerType
from pyspark.sql.types import StringType, DoubleType, BooleanType

# 定义UDF将小文件合并成一个大文件
def merge_files(partition):
    # 通过monotonically_increasing_id函数为每一个行添加一个唯一的ID号码
    index = int(partition.first()["id"])
    filename = "/tmp/big_file_" + str(index) + ".csv"
    # 将小文件读取进来,合并成一个大文件
    with open(filename, "w") as f:
        for row in partition:
            f.write(",".join([str(x) for x in row]))
            f.write("\n")

# 定义结构和创建DF
schema = StructType([
  StructField("column1", IntegerType(), True),
  StructField("column2", StringType(), True),
  StructField("column3", DoubleType(), True),
  StructField("column4", BooleanType(), True)
])

input_df = spark.read.csv("s3://my_bucket/small_files/", header=True, schema=schema)

# 在DF上面调用UDF
output_df = input_df.repartition(10).sort("id").foreachPartition(merge_files)

这里我们使用了pyspark内置的函数 monotonically_increasing_id 给每一行数据添加了唯一的ID号码,并设定每个分区有10个文件。在执行 foreachPartition 操作之后,将调用我们自定义的UDF,将每个分区中的数据合并成一个大文件。最后,使用 spark.read.csv 函数从S3中读取这些大文件进行后续的数据分析处理。

相关内容

热门资讯

十分钟辅助!wepoker-h... 十分钟辅助!wepoker-h5下载,pokernow辅助控制,绝活教程(有挂实锤)1、点击下载安装...
5分钟辅助!aapoker辅助... 5分钟辅助!aapoker辅助器怎么用,wepoker模拟器哪个好用,模板教程(有挂辅助)wepok...
八分钟辅助!wpk官网下载链接... 八分钟辅助!wpk官网下载链接,wpk透视怎么安装,教程书教程(有挂分享)该软件可以轻松地帮助玩家将...
9分钟辅助!wepoker怎么... 9分钟辅助!wepoker怎么买辅助,wepoker安装教程,课程教程(有挂方略)1、wepoker...
第七分钟辅助!hh poker... 第七分钟辅助!hh poker插件下载,wepoker科技辅助器,教程书教程(有挂教程)wepoke...
三分钟辅助!hhpoker脚本... 三分钟辅助!hhpoker脚本下载,hhpoker辅助器,举措教程(有挂教程)三分钟辅助!hhpok...
第8分钟辅助!hhpoker的... 第8分钟辅助!hhpoker的辅助是真的吗,wepoker祈福有用吗,讲义教程(有挂解密)1、第8分...
6分钟辅助!hhpoker是真... 6分钟辅助!hhpoker是真的假的,hhpoker真的有透视吗,演示教程(真是有挂)1、每一步都需...
第七分钟辅助!we poker... 第七分钟辅助!we poker插件,we poker免费辅助器,手筋教程(有挂方式)暗藏猫腻,小编详...
七分钟辅助!aa poker辅... 七分钟辅助!aa poker辅助包,pokemmo脚本辅助器下载,讲义教程(有挂秘籍)1、pokem...