可以通过在Airflow DAG中设置operator实例的job_name参数来解决此问题。在job_name参数中,可以使用Jinja模板语言包含BigQuery Job的项目ID和区域。
以下是一些示例代码:
from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataflowTemplatedJobStartOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
'start_date': datetime(2020, 1, 1)
}
dag = DAG('my_dag', default_args=default_args, schedule_interval=None)
job_name = 'my_job_{{ ds_nodash }}'
project_id = 'my_project_id'
region = 'us-central1'
start_operator = DataflowTemplatedJobStartOperator(
task_id='start_operator',
job_name=job_name,
project_id=project_id,
region=region,
template='gs://my-bucket/my-template.json',
parameters={
'input': 'gs://my-input-bucket/my-input.txt',
'output': 'gs://my-output-bucket/my-output.txt',
},
dag=dag
)
在上面的代码中,job_name参数使用了Jinja模板语言,它将在每次运行DAG时设置为"my_job_{{ ds_nodash }}",其中ds_nodash是日期字符串,以实现唯一的作业ID。项目ID和区域在整个DAG中是常量,因此它们在Airflow DAG设置中直接设置。这样,DataflowTemplatedJobStartOperator将正确设置作业区域,并使用指定的项目ID启动作业。