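Yes, with one correction: the class is called `TriggerDagRunOperator`; Airflow has no `TriggerDagRunOrder`. It triggers a run of another DAG rather than waiting on an earlier run, so if a task must wait for its own previous run to succeed, set `depends_on_past=True` as well. An example of the trigger setup: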
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
# Airflow 1.10.x import path. In Airflow 2.x the class lives in
# airflow.operators.trigger_dagrun and takes conf= instead of python_callable.
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.utils.dates import days_ago


def run_this_func(context, dag_run_obj):
    # Attach the payload that the triggered DAG will see as dag_run.conf.
    dag_run_obj.payload = {'message': 'Hello World'}
    # Returning the object tells the operator to go ahead and trigger.
    return dag_run_obj


default_args = {
    'owner': 'airflow',
    # Set to True if each task should wait for its own previous run to succeed.
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
}

dag = DAG(
    'example_trigger_controller_dag',
    default_args=default_args,
    schedule_interval=None,
    tags=['example'],
)

# [START howto_operator_trigger_dagrun]
run_this = TriggerDagRunOperator(
    task_id='run_this',
    trigger_dag_id="example_trigger_target_dag",  # ID of the DAG to trigger
    python_callable=run_this_func,
    dag=dag,
)

# Prints the conf of the current DAG run (this controller DAG).
print_payload = BashOperator(
    task_id='print_payload',
    bash_command='echo "{{ dag_run.conf }}"',
    dag=dag,
)

run_this >> print_payload
# [END howto_operator_trigger_dagrun]
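In the example above, the `run_this` task uses `TriggerDagRunOperator` (the correct name of the class) to trigger a run of the `example_trigger_target_dag` DAG, passing along the payload set in `run_this_func`. The `print_payload` task then runs and prints `dag_run.conf` for the current DAG run. Note that this is the conf of the controller DAG's own run, not the payload sent to the target; to print the message, the same `BashOperator` would need to be defined in `example_trigger_target_dag`.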
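To see what `run_this_func` hands back to the operator, it can be called directly, without a scheduler. The sketch below is only an illustration: `fake_run` is a hypothetical stand-in built with `SimpleNamespace`, assumed to mimic the `run_id` and `payload` attributes of the object that Airflow 1.10 passes as `dag_run_obj`. It is not an Airflow class.

```python
from types import SimpleNamespace


def run_this_func(context, dag_run_obj):
    # Same callable as in the example: attach a payload and return the object.
    dag_run_obj.payload = {'message': 'Hello World'}
    return dag_run_obj


# Hypothetical stand-in for the object Airflow 1.10 passes as dag_run_obj.
fake_run = SimpleNamespace(run_id='manual_check', payload=None)

# An empty dict stands in for the task context, which this callable ignores.
result = run_this_func({}, fake_run)
print(result.payload)
```

In the 1.10-style operator, returning the object is what causes the target DAG to be triggered; a callable that returns `None` makes the operator skip the trigger, which is how conditional triggering is usually written.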