AWS Glue Spark ETL 写入 S3 不会触发 S3 事件的解决方法是使用 AWS Glue 的 Job Bookmarks 功能。以下是一个示例代码,演示了如何在 Glue Spark ETL 作业中使用 Job Bookmarks:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
# 获取命令行参数
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
# 创建 Spark Context 和 Glue Context
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# 创建 Glue Job 对象
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# 读取源数据
datasource = glueContext.create_dynamic_frame.from_catalog(database="your-database-name", table_name="your-table-name")
# 进行数据转换和处理
# ...
# 写入目标数据
glueContext.write_dynamic_frame.from_options(
frame = transformed_data,
connection_type = "s3",
connection_options = {"path": "s3://your-bucket-name/your-output-path"},
format = "parquet"
)
# 完成 Glue Job
job.commit()
在上述代码中,我们使用 glueContext.write_dynamic_frame.from_options()
函数将 Spark Dataframe 或 Glue DynamicFrame 写入到 S3 中。通过将 connection_type
设置为 "s3" 并提供 S3 路径作为 connection_options
参数的一部分,我们可以将数据写入到指定的 S3 路径。
使用 Job Bookmarks 功能时,AWS Glue 会跟踪作业的状态并为每个作业运行维护一个标记。这样,如果作业在中断后重新启动,它将能够从上次中断的位置继续读取和处理数据,而不是从头开始。这样就可以避免重复处理和丢失数据。
要启用 Job Bookmarks 功能,只需在 AWS Glue 控制台或使用 AWS CLI 创建作业时设置 --job-bookmark-option enable
。