以下是一个使用AWS数据管道将CSV文件从S3上传到DynamoDB的解决方案,包含代码示例:
import boto3
def lambda_handler(event, context):
    """Copy a CSV file from S3 into a DynamoDB table.

    Expected ``event`` keys:
        inputBucket: S3 bucket that holds the CSV object.
        inputKey:    key of the CSV object within that bucket.
        outputTable: name of the destination DynamoDB table.

    Returns:
        A short success message string.

    Raises:
        KeyError: if a required event key is missing.
        botocore.exceptions.ClientError: on S3/DynamoDB API failures.
    """
    # Local import keeps this handler self-contained.
    import csv

    input_bucket = event['inputBucket']
    input_key = event['inputKey']
    output_table = event['outputTable']

    # Download the CSV into Lambda's writable scratch space (/tmp).
    s3 = boto3.client('s3')
    local_path = '/tmp/input.csv'
    s3.download_file(input_bucket, input_key, local_path)

    # Parse with csv.DictReader: unlike a naive str.split(','), it handles
    # quoted fields with embedded commas, and it never raises IndexError
    # when a data row is shorter than the header row.
    with open(local_path, newline='') as csv_file:
        items = [dict(row) for row in csv.DictReader(csv_file)]

    # Batch-write the items. batch_writer() transparently chunks requests
    # into 25-item BatchWriteItem calls and retries unprocessed items.
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table(output_table)
    with table.batch_writer() as batch:
        for item in items:
            batch.put_item(Item=item)

    return 'Data pipeline completed successfully.'
{
"objects": [
{
"id": "S3Input",
"name": "S3Input",
"schedule": {
"ref": "DefaultSchedule"
},
"type": "S3DataNode",
"dataFormat": {
"ref": "CSVDataFormat"
},
"scheduleType": "ONDEMAND",
"directoryPath": "#{myInputS3Path}",
"errorThreshold": 0,
"errorHandlingConfig": {
"failOnError": true
}
},
{
"id": "DynamoDBOutput",
"name": "DynamoDBOutput",
"schedule": {
"ref": "DefaultSchedule"
},
"type": "DynamoDBDataNode",
"scheduleType": "ONDEMAND",
"tableName": "#{myOutputDynamoDBTable}",
"errorThreshold": 0,
"errorHandlingConfig": {
"failOnError": true
}
}
],
"parameters": [
{
"id": "myInputS3Path",
"description": "S3 input path",
"type": "String"
},
{
"id": "myOutputDynamoDBTable",
"description": "DynamoDB output table",
"type": "String"
}
]
}
import boto3
def _definition_object_to_api(obj):
    """Convert one object from the pipeline-definition JSON into the
    {'id', 'name', 'fields': [...]} shape that put_pipeline_definition
    expects. Nested {'ref': ...} dicts become refValue fields; everything
    else is stringified into a stringValue field.
    """
    fields = []
    for key, value in obj.items():
        if key in ('id', 'name'):
            continue
        if isinstance(value, dict) and 'ref' in value:
            fields.append({'key': key, 'refValue': value['ref']})
        else:
            # NOTE(review): non-ref nested dicts (e.g. errorHandlingConfig)
            # are stringified here — verify the definition template only
            # uses scalar or {'ref': ...} values.
            fields.append({'key': key, 'stringValue': str(value)})
    return {'id': obj['id'], 'name': obj['name'], 'fields': fields}


def create_data_pipeline(input_bucket, input_key, output_table):
    """Create, define, and activate an AWS Data Pipeline that loads a CSV
    object from S3 into a DynamoDB table.

    Args:
        input_bucket: S3 bucket holding the source CSV.
        input_key:    key of the CSV object.
        output_table: destination DynamoDB table name.

    Returns:
        The id of the newly created pipeline.

    Raises:
        botocore.exceptions.ClientError: on Data Pipeline API failures.
    """
    # Local import keeps this edit self-contained.
    import json

    client = boto3.client('datapipeline')
    input_s3_path = 's3://' + input_bucket + '/' + input_key

    # Load the reusable definition template (objects + parameters).
    with open('pipeline-definition.json', 'r') as file:
        definition = json.load(file)

    # Step 1: create the (empty) pipeline. boto3's create_pipeline accepts
    # only name/uniqueId/description/tags — passing pipelineObjects,
    # parameterObjects, parameterValues, or pipelineDefinition here raises
    # ParamValidationError; the definition is attached separately below.
    response = client.create_pipeline(
        name='csv-to-dynamodb-pipeline',
        uniqueId='csv-to-dynamodb-pipeline',
        description='Pipeline to upload CSV file to DynamoDB',
        tags=[
            {'key': 'environment', 'value': 'development'},
        ],
    )
    pipeline_id = response['pipelineId']

    # Step 2: attach the definition and bind the runtime parameter values.
    client.put_pipeline_definition(
        pipelineId=pipeline_id,
        pipelineObjects=[
            _definition_object_to_api(obj) for obj in definition['objects']
        ],
        parameterObjects=[
            {
                'id': param['id'],
                'attributes': [
                    {'key': 'description', 'stringValue': param['description']},
                    {'key': 'type', 'stringValue': param['type']},
                ],
            }
            for param in definition['parameters']
        ],
        parameterValues=[
            {'id': 'myInputS3Path', 'stringValue': input_s3_path},
            {'id': 'myOutputDynamoDBTable', 'stringValue': output_table},
        ],
    )

    # Step 3: activate the pipeline so the ON-DEMAND schedule actually runs.
    client.activate_pipeline(pipelineId=pipeline_id)

    return pipeline_id