在Apache Airflow中,您可以使用XComs来在操作器之间传递数据。对于下载的文件,您可以将文件路径作为XCom值传递。
以下是一个示例代码,演示如何在操作器之间存储下载的文件位置:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
default_args = {
'start_date': datetime(2021, 1, 1)
}
def download_file(**kwargs):
# 下载文件的代码
file_path = '/path/to/downloaded/file.txt'
# 将文件路径存储为XCom值
kwargs['ti'].xcom_push(key='file_path', value=file_path)
def process_file(**kwargs):
# 从上一个任务中获取文件路径
file_path = kwargs['ti'].xcom_pull(key='file_path')
# 处理文件的代码
print(f"Processing file: {file_path}")
dag = DAG(
'file_processing_dag',
default_args=default_args,
schedule_interval=None
)
download_task = PythonOperator(
task_id='download_file',
python_callable=download_file,
provide_context=True,
dag=dag
)
process_task = PythonOperator(
task_id='process_file',
python_callable=process_file,
provide_context=True,
dag=dag
)
download_task >> process_task
在上面的代码中,download_file
任务下载文件并将文件路径存储为XCom值。process_file
任务从上一个任务中获取文件路径,并处理该文件。
通过在操作器之间使用XCom传递文件路径,您可以在操作器之间轻松地共享下载的文件位置。