要使用AWS Athena查询CSV文件,首先需要将文件上传到AWS S3存储桶中,并创建一个Athena数据库和一个数据表来引用该文件。
下面是一个示例代码,演示如何在AWS Athena中创建数据库、数据表并查询CSV文件:
import boto3
# 创建Athena客户端
client = boto3.client('athena')
# 创建数据库
database_name = 'my_database'
create_database_query = f"CREATE DATABASE IF NOT EXISTS {database_name}"
response = client.start_query_execution(
QueryString=create_database_query,
ResultConfiguration={
'OutputLocation': 's3://my-query-results-bucket/'
}
)
execution_id = response['QueryExecutionId']
response = client.get_query_execution(QueryExecutionId=execution_id)
# 创建数据表
table_name = 'my_table'
create_table_query = f"""
CREATE EXTERNAL TABLE IF NOT EXISTS {database_name}.{table_name} (
column1 datatype1,
column2 datatype2,
...
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION 's3://my-bucket/path/to/csv/'
"""
response = client.start_query_execution(
QueryString=create_table_query,
ResultConfiguration={
'OutputLocation': 's3://my-query-results-bucket/'
}
)
execution_id = response['QueryExecutionId']
response = client.get_query_execution(QueryExecutionId=execution_id)
# 查询CSV文件
query = f"SELECT * FROM {database_name}.{table_name} LIMIT 10"
response = client.start_query_execution(
QueryString=query,
ResultConfiguration={
'OutputLocation': 's3://my-query-results-bucket/'
}
)
execution_id = response['QueryExecutionId']
response = client.get_query_execution(QueryExecutionId=execution_id)
# 获取查询结果
results = client.get_query_results(QueryExecutionId=execution_id)
for row in results['ResultSet']['Rows']:
print(row['Data'])
在上述代码中,需要替换以下变量:
my_database
:要创建的数据库名称。my_table
:要创建的数据表名称。my-bucket/path/to/csv/
:CSV文件的S3存储桶路径。my-query-results-bucket/
:查询结果的S3存储桶路径。确保在运行代码之前,已经安装了AWS SDK for Python(Boto3)并配置了正确的AWS凭证。