AWSGlueJobbookmarkvaluemodificationforJDBCsources
创始人
2024-09-25 15:31:10
0

AWS Glue Job JDBC数据源书签值修改

在AWS Glue Job中,我们可以在connection.getConnection()中使用SparkSession配置JDBC数据源来读取和写入数据。然而,由于某些限制,JDBC数据源可能无法处理从现有的位置开始读取的情况,因此需要在每个步骤之间保留书签值,以避免数据重复读取或丢失。在AWS Glue中,可以使用标准引擎的类glueetlbookmarks.jar和AWS内部库aws-glue-libs-for-scala来进行书签管理。

以下是在AWS Glue Job中针对JDBC数据源的书签值修改代码示例:

# 导入所需的库和类
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.utils import getResolvedOptions

import sys, traceback

from pyspark.context import SparkContext
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, BooleanType, ArrayType, TimestampType, DateType

# 从命令行中获取参数
args = getResolvedOptions(sys.argv, [
    'JOB_NAME',
    'DB_JDBC_URL',
    'DB_TABLE',
    'CONNECTION_OPTIONS',
    'BOOKMARK_BUCKET',
    'BOOKMARK_PREFIX'
])

# 初始化GlueContext和SparkContext
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session

# 从数据源读取数据
df = spark.read.format('jdbc').options(
        url=args['DB_JDBC_URL'],
        dbtable=args['DB_TABLE'],
        driver='com.mysql.jdbc.Driver',
        **args['CONNECTION_OPTIONS']).load()

# 在此处添加转换或处理数据的代码

# 创建DynamicFrame并将其写入目标
targetDf = DynamicFrame.fromDF(df, glueContext, 'targetDf')

glueContext.write_dynamic_frame.from_options(
    frame=targetDf,
    connection_type='s3',
    connection_options={
        'path': args['S3_TARGET_PATH']
    },
    format='parquet',
    transformation_ctx='transform'
)

# 在此处更新书签值
bookmark = df.agg({'id': 'max'}).collect()[0][0]
glue_bookmark = glueContext.extract_bookmark()
if glue_bookmark:
    if bookmark >= glue_bookmark:
        glueContext.update_bookmark({'id': bookmark})
else:
    glueContext.write_bookmark({'id': bookmark})

在此代码中,我们使用getResolvedOptions获取AWS Glue Job传入的命令行参数,并初始化了GlueContextSparkContext。接着,我们使用Spark JDBC连接器读取数据源,创建DynamicFrame,并使用write_dynamic_frame.from_options将DynamicFrame写入S3桶中。最后,我们使用df.agg({'id': 'max'}).collect()[0][0]获取数据源中最大的ID,并使用GlueContext API更新书签值。

这是针对AWS Glue Job中JDBC数据源书签管理的一种解决方案,我们可以使用extract_bookmarkwrite_bookmarkupdate_bookmark API管理书签值,并避免数据的重复读取或丢失。

相关内容

热门资讯

一分钟揭秘!微扑克有辅助软件吗... 一分钟揭秘!微扑克有辅助软件吗,aapoker挂,我来教教你(固有有挂)1、aapoker挂ai辅助...
重大发现!wpk德州辅助,we... 重大发现!wpk德州辅助,wepoke有挂,微扑克教程(原本有挂)1.wepoke有挂 ai辅助创建...
总算明白!微扑克辅助器是真的么... 总算明白!微扑克辅助器是真的么,德州ai辅助有用,力荐教程(都是是真的有挂)1、构建自己的微扑克辅助...
玩家必备科技!gg扑克辅助,w... 玩家必备科技!gg扑克辅助,wepoke辅助有挂,AI教程(确实真的是有挂)1、在wepoke辅助有...
重大来袭!cloudpoker... 重大来袭!cloudpoker辅助器,wepoke真的有挂,力荐教程(固有真的有挂)1、wepoke...
最新技巧!wepoke透明真的... 最新技巧!wepoke透明真的吗,智星德州菠萝有挂吗,AI教程(最初有挂);人气非常高,ai更新快且...
一分钟快速了解!德州微扑克辅助... 一分钟快速了解!德州微扑克辅助,德州微扑克辅助,高科技教程(好像存在有挂)是一款可以让一直输的玩家,...
一分钟教会你!wpk有吗,德州... 一分钟教会你!wpk有吗,德州ai辅助软件,玩家教你(固有是真的有挂)德州ai辅助软件辅助器中分为三...
实操分享!微扑克辅助,德州ai... 实操分享!微扑克辅助,德州ai机器人,科技教程(真是有挂)1、超多福利:超高返利,海量正版游戏,德州...
玩家必看科普!微扑克有辅助软件... 自定义aapoker辅助系统规律,只需要输入自己想要的开挂功能,一键便可以生成出微扑克专用辅助器,不...