问题描述:
在Airflow中使用ExternalTaskMarker时,可能会出现下游任务未清除的情况。
可以在task_instance_cleaned信号的处理程序中添加代码,以确保ExternalTaskMarker清除下游任务。
以下是一个示例:
def on_task_instance_cleaned(sender, **kwargs):
ti = kwargs.get('task_instance')
dag = ti.dag
downstream_tasks = dag.get_downstream_tasks(ti.task_id)
for downstream_task in downstream_tasks:
if isinstance(downstream_task, ExternalTaskMarker):
downstream_ti = ti.get_dagrun().get_task_instance(downstream_task.task_id)
downstream_ti.set_state(State.NONE, downstream_ti.end_date)
这将确保ExternalTaskMarker清除下游任务。