在Apache Airflow中,可以通过自定义Operator来传递数据。下面是一个示例代码,演示了如何使用自定义Operator在任务之间传递数据。
首先,您需要创建一个自定义Operator类,继承自BaseOperator
。在这个自定义Operator中,您可以定义您需要的参数和任务逻辑。以下是一个示例代码:
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
class CustomOperator(BaseOperator):
@apply_defaults
def __init__(self, my_data, *args, **kwargs):
super().__init__(*args, **kwargs)
self.my_data = my_data
def execute(self, context):
# 在这里执行您的任务逻辑
# 您可以使用 self.my_data 来访问传递进来的数据
# 例如,打印出传递进来的数据
print(self.my_data)
然后,您可以在DAG中使用这个自定义Operator。以下是一个示例代码:
from airflow import DAG
from datetime import datetime
from custom_operators import CustomOperator
default_args = {
'start_date': datetime(2021, 1, 1)
}
with DAG('my_dag', default_args=default_args, schedule_interval='@daily') as dag:
task1 = CustomOperator(
task_id='task1',
my_data='Hello, Airflow!'
)
task2 = CustomOperator(
task_id='task2',
my_data=task1.output
)
task1 >> task2
在这个例子中,我们创建了两个CustomOperator任务,task1和task2。在task1中,我们将字符串'Hello, Airflow!'传递给my_data参数。在task2中,我们将task1的输出作为my_data参数的值。
通过task1.output
,您可以将task1的输出传递给task2,以便在task2中访问task1的输出数据。
这是一个简单的示例,演示了如何在自定义Operator中传递数据。您可以根据您的需求扩展和修改这个示例。