AWSGlueJobbookmarkvaluemodificationforJDBCsources
创始人
2024-09-25 15:31:10
0

AWS Glue Job JDBC数据源书签值修改

在AWS Glue Job中,我们可以在connection.getConnection()中使用SparkSession配置JDBC数据源来读取和写入数据。然而,由于某些限制,JDBC数据源可能无法处理从现有的位置开始读取的情况,因此需要在每个步骤之间保留书签值,以避免数据重复读取或丢失。在AWS Glue中,可以使用标准引擎的类glueetlbookmarks.jar和AWS内部库aws-glue-libs-for-scala来进行书签管理。

以下是在AWS Glue Job中针对JDBC数据源的书签值修改代码示例:

# 导入所需的库和类
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.utils import getResolvedOptions

import sys, traceback

from pyspark.context import SparkContext
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, BooleanType, ArrayType, TimestampType, DateType

# 从命令行中获取参数
args = getResolvedOptions(sys.argv, [
    'JOB_NAME',
    'DB_JDBC_URL',
    'DB_TABLE',
    'CONNECTION_OPTIONS',
    'BOOKMARK_BUCKET',
    'BOOKMARK_PREFIX'
])

# 初始化GlueContext和SparkContext
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session

# 从数据源读取数据
df = spark.read.format('jdbc').options(
        url=args['DB_JDBC_URL'],
        dbtable=args['DB_TABLE'],
        driver='com.mysql.jdbc.Driver',
        **args['CONNECTION_OPTIONS']).load()

# 在此处添加转换或处理数据的代码

# 创建DynamicFrame并将其写入目标
targetDf = DynamicFrame.fromDF(df, glueContext, 'targetDf')

glueContext.write_dynamic_frame.from_options(
    frame=targetDf,
    connection_type='s3',
    connection_options={
        'path': args['S3_TARGET_PATH']
    },
    format='parquet',
    transformation_ctx='transform'
)

# 在此处更新书签值
bookmark = df.agg({'id': 'max'}).collect()[0][0]
glue_bookmark = glueContext.extract_bookmark()
if glue_bookmark:
    if bookmark >= glue_bookmark:
        glueContext.update_bookmark({'id': bookmark})
else:
    glueContext.write_bookmark({'id': bookmark})

在此代码中,我们使用getResolvedOptions获取AWS Glue Job传入的命令行参数,并初始化了GlueContextSparkContext。接着,我们使用Spark JDBC连接器读取数据源,创建DynamicFrame,并使用write_dynamic_frame.from_options将DynamicFrame写入S3桶中。最后,我们使用df.agg({'id': 'max'}).collect()[0][0]获取数据源中最大的ID,并使用GlueContext API更新书签值。

这是针对AWS Glue Job中JDBC数据源书签管理的一种解决方案,我们可以使用extract_bookmarkwrite_bookmarkupdate_bookmark API管理书签值,并避免数据的重复读取或丢失。

相关内容

热门资讯

事发当天!wepoker透视脚... 事发当天!wepoker透视脚本是什么(透视)一直有挂(有挂解惑开挂辅助安装)-哔哩哔哩1、首先打开...
突发!impoker辅助,po... 突发!impoker辅助,poker world辅助,方案教程(有挂工具)-哔哩哔哩一、poker ...
有玩家发现!newpoker脚... 有玩家发现!newpoker脚本(透视)原来真的有挂(有挂解惑开挂辅助辅助器)-哔哩哔哩1、首先打开...
记者获悉!wepoker怎么看... 您好,wepoker怎么看底牌这款游戏可以开挂的,确实是有挂的,需要了解加去威信【485275054...
昨日!wepoker透视脚本(... 昨日!wepoker透视脚本(透视)其实确实有挂(真实有挂开挂辅助工具)-哔哩哔哩1、在wepoke...
为切实保障!wepoker透视... 为切实保障!wepoker透视有吗,pokerrrr2辅助,方针教程(有挂技术)-哔哩哔哩1、起透看...
为了进一步!hhpoker怎么... 为了进一步!hhpoker怎么破解(透视)原来是有挂(有挂方针开挂辅助下载)-哔哩哔哩1、完成hhp...
近日!智星菠萝透视,智星菠萝透... 近日!智星菠萝透视,智星菠萝透视,模板教程(有挂透明挂)-哔哩哔哩1、全新机制【智星菠萝透视软件透明...
截至目前!wepoker免费脚... 截至目前!wepoker免费脚本咨询(透视)原来有挂(有挂秘笈开挂辅助平台)-哔哩哔哩1、任何wep...
有了最新消息!xpoker辅助... 有了最新消息!xpoker辅助神器,wepoker怎么发冤家牌,策略教程(有挂教程)-哔哩哔哩1、在...