这可能是由于AWS DMS流式传输本身的限制所导致的。解决此问题的一种方法是在AWS DMS任务中启用On Failure Replication,并使用AWS Lambda函数进行转换。以下是一个示例Lambda函数,可用于将JSON转换为适合Kinesis流的格式:
import json
def lambda_handler(event, context):
records = []
for record in event['records']:
operation = record['op']
data = json.loads(record['data'])
if operation == 'd':
data = {
'id': data['id']
}
records.append({
'Data': json.dumps(data),
'PartitionKey': data['id']
})
return {
'records': records
}
在此示例中,如果操作类型为“d”(即DELETE),则仅包括记录的ID字段,并使用ID作为Kinesis的分区键。
然后,将Lambda函数与AWS DMS任务关联,在任务设置的处理阶段中启用On Failure Replication,以便在任务失败时触发Lambda函数执行。
请注意,此方法仅适用于使用JSON格式的AWS DMS任务。对于其他格式,可能需要进行适当的修改。