在Apache Airflow中,如果要防止DAG重复执行,可以使用以下方式:
from airflow import DAG
from datetime import datetime
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2021, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('example_dag', default_args=default_args)
在默认参数中,设置了开始日期(start_date),结束日期之前(end_date),重试次数(retries)和重试延迟(retry_delay)等参数。这些参数可以帮助我们控制DAG的执行频率,并确保它不会重复执行。
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
dag = DAG('example_dag', start_date=datetime(2021, 1, 1))
t1 = BashOperator(
task_id='task_1',
bash_command='echo {{ ds }}',
provide_context=True,
dag=dag,
)
在BashOperator中,我们设置了provide_context参数为True,这意味着我们可以在Bash命令中使用context中的变量。其中,{{ ds }}是日期字符串,表示任务执行的日期。这可以帮助我们控制DAG的执行时间并避免重复执行。
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.sensors import ExternalTaskSensor
from datetime import datetime, timedelta
source_dag_id = 'example_dag_source'
target_dag_id = 'example_dag_target'
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2021, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries