批处理预测作业非阻塞化
对于批处理预测作业,如果使用传统的同步方式进行提交和等待,则会阻塞程序运行,导致整个程序无法进行其他操作。为了解决这个问题,可以使用异步方式进行提交和等待。
以Python代码为例:
import boto3
def start_batch_job(job_name, s3_input_location, s3_output_location):
sagemaker_client = boto3.client('sagemaker')
response = sagemaker_client.create_transform_job(
TransformJobName=job_name,
ModelName='my-model',
TransformInput={
'DataSource': {
'S3DataSource': {
'S3DataType': 'S3Prefix',
'S3Uri': s3_input_location
}
},
'ContentType': 'text/csv',
'SplitType': 'Line',
'CompressionType': 'None'
},
TransformOutput={
'S3OutputPath': s3_output_location,
'AssembleWith': 'Line',
'ContentType': 'text/csv',
'SplitType': 'Line'
},
TransformResources={
'InstanceType': 'ml.m4.xlarge',
'InstanceCount': 1
}
)
return response['TransformJobArn']
job_arn = start_batch_job('my-job', 's3://my-bucket/my-input-path', 's3://my-bucket/my-output-path')
print("Started job arn = {}".format(job_arn))
使用异步方式进行提交,可以在任务开始后立刻返回任务的ARN,程序可以继续运行其他操作。等待任务完成时,可以使用AWS SDK中提供的waiter方法来等待作业完成:
import boto3
def wait_for_transform_job(job_name):
sagemaker_client = boto3.client('sagemaker')
waiter = sagemaker_client.get_waiter('transform_job_completed_or_stopped')
waiter.wait(TransformJobName=job_name)
response = sagemaker_client