有时候我们需要在运行某些任务后,在不运行下游任务的情况下标记该任务成功。这在一些复杂的调度场景中比较常见,比如当下游任务依赖于一个外部事件的时候。如果下游任务运行失败,我们也不希望它们重新启动,因为它们依赖的外部事件可能还没有满足。因此,我们需要一种机制来标记成功但不运行下游任务。这可以通过在 DAG 中使用 XCom 变量来实现。
假设我们有两个任务 task_1 和 task_2,其中 task_2 是 task_1 的下游任务,我们想要在运行 task_1 后标记它成功但不运行 task_2。可以按照以下步骤实现:
from airflow.models import XCom
dag_id =
task_id =
value = {'result': 'success'}
XCom.set(key='my_key', value=value, dag_id=dag_id, task_id=task_id)
from airflow.models import XCom
dag_id =
task_id =
value = XCom.get(key='my_key', dag_id=dag_id, task_id=task_id)
if value.get('result') == 'success':
print('Task 1 succeeded. Terminating Task 2...')
raise AirflowSkipException('Skipping downstream task as upstream task succeeded.')
上述代码在 task_1 中设置了一个 XCom 变量,当 task_1 成功运行时,它将存储一个包含 “result:succeed” 的字典。在 task_2 中,我们检查该变量并基于其值决定是否继续运行此
下一篇:编程方式创建堆栈视图