这种错误通常发生在重新运行同一个DAG时,由于之前的运行ID仍存在于数据库中,导致Airflow无法重复使用。解决方法很简单,只需要在运行DAG时指定一个新的运行ID即可,例如:
from datetime import datetime
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
dag = DAG('my_dag',
description='My example DAG',
schedule_interval='0 0 * * *',
start_date=datetime(2019, 1, 1))
run_id = 'my_new_run_id' # 指定一个新的运行ID
task1 = BashOperator(
task_id='task1',
bash_command='echo "Hello Airflow"',
dag=dag,
run_id=run_id # 将新的运行ID传递给任务
)
task2 = BashOperator(
task_id='task2',
bash_command='echo "Goodbye Airflow"',
dag=dag,
run_id=run_id # 将新的运行ID传递给任务
)
task1 >> task2
使用此方法,您可以避免再次运行相同的DAG时遇到“run id already exists”的错误。