AWSGlueJobbookmarkvaluemodificationforJDBCsources
创始人
2024-09-25 15:31:10
0

AWS Glue Job JDBC数据源书签值修改

在AWS Glue Job中,我们可以在connection.getConnection()中使用SparkSession配置JDBC数据源来读取和写入数据。然而,由于某些限制,JDBC数据源可能无法处理从现有的位置开始读取的情况,因此需要在每个步骤之间保留书签值,以避免数据重复读取或丢失。在AWS Glue中,可以使用标准引擎的类glueetlbookmarks.jar和AWS内部库aws-glue-libs-for-scala来进行书签管理。

以下是在AWS Glue Job中针对JDBC数据源的书签值修改代码示例:

# 导入所需的库和类
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.utils import getResolvedOptions

import sys, traceback

from pyspark.context import SparkContext
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, BooleanType, ArrayType, TimestampType, DateType

# 从命令行中获取参数
args = getResolvedOptions(sys.argv, [
    'JOB_NAME',
    'DB_JDBC_URL',
    'DB_TABLE',
    'CONNECTION_OPTIONS',
    'BOOKMARK_BUCKET',
    'BOOKMARK_PREFIX'
])

# 初始化GlueContext和SparkContext
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session

# 从数据源读取数据
df = spark.read.format('jdbc').options(
        url=args['DB_JDBC_URL'],
        dbtable=args['DB_TABLE'],
        driver='com.mysql.jdbc.Driver',
        **args['CONNECTION_OPTIONS']).load()

# 在此处添加转换或处理数据的代码

# 创建DynamicFrame并将其写入目标
targetDf = DynamicFrame.fromDF(df, glueContext, 'targetDf')

glueContext.write_dynamic_frame.from_options(
    frame=targetDf,
    connection_type='s3',
    connection_options={
        'path': args['S3_TARGET_PATH']
    },
    format='parquet',
    transformation_ctx='transform'
)

# 在此处更新书签值
bookmark = df.agg({'id': 'max'}).collect()[0][0]
glue_bookmark = glueContext.extract_bookmark()
if glue_bookmark:
    if bookmark >= glue_bookmark:
        glueContext.update_bookmark({'id': bookmark})
else:
    glueContext.write_bookmark({'id': bookmark})

在此代码中,我们使用getResolvedOptions获取AWS Glue Job传入的命令行参数,并初始化了GlueContextSparkContext。接着,我们使用Spark JDBC连接器读取数据源,创建DynamicFrame,并使用write_dynamic_frame.from_options将DynamicFrame写入S3桶中。最后,我们使用df.agg({'id': 'max'}).collect()[0][0]获取数据源中最大的ID,并使用GlueContext API更新书签值。

这是针对AWS Glue Job中JDBC数据源书签管理的一种解决方案,我们可以使用extract_bookmarkwrite_bookmarkupdate_bookmark API管理书签值,并避免数据的重复读取或丢失。

相关内容

热门资讯

五分钟辅助!哈糖大菠萝助手,h... 五分钟辅助!哈糖大菠萝助手,hhpoker软件安装包,教材教程(发现有挂)1、上手简单,内置详细流程...
第3分钟辅助!拱趴大菠萝作弊方... 第3分钟辅助!拱趴大菠萝作弊方法,xpoker辅助怎么用,手册教程(有挂透明挂)一、拱趴大菠萝作弊方...
两分钟辅助!poker辅助器免... 两分钟辅助!poker辅助器免费安装,hhpoker有没有外挂,大纲教程(有挂助手)1、很好的工具软...
第三分钟辅助!wpk透视怎么安... 第三分钟辅助!wpk透视怎么安装,hhpoker透视脚本视频,演示教程(有挂技巧)暗藏猫腻,小编详细...
第4分钟辅助!菠萝德普辅助器免... 第4分钟辅助!菠萝德普辅助器免费版在哪里,wepoker免费透视,教材教程(真实有挂)1、让任何用户...
五分钟辅助!德州透视hhpok... 五分钟辅助!德州透视hhpoker,wepoker透视器免费,绝活儿教程(揭秘有挂)1、wepoke...
第八分钟辅助!aa poker... 第八分钟辅助!aa poker辅助包,hhpoker脚本,模块教程(果真有挂)1、首先打开aa po...
3分钟辅助!epoker底牌透... 3分钟辅助!epoker底牌透视,德普之星透视,策略教程(有挂细节)1)德普之星透视辅助插件:进一步...
2分钟辅助!wepoker手机... 2分钟辅助!wepoker手机助手,pokermaster辅助器,法子教程(有挂工具)亲,关键说明,...
第五分钟辅助!pokemmo辅... 第五分钟辅助!pokemmo辅助器手机版下载,拱趴大菠萝怎么开挂,教材教程(有挂功能)1、玩家可以在...