In Apache Airflow, we can send HTTP requests either with HttpHook or directly with Python's requests library. HttpHook integrates the request into the DAG through Airflow's connection machinery, with no separate custom code to write; with the requests library, we have to write that custom request code in the DAG ourselves.
As for credential management, HttpHook automatically pulls the keys and passwords it needs from the connection stored in the Airflow metadata database (or in a configured secrets backend such as HashiCorp Vault), looked up by http_conn_id. With the requests library, we have to fetch those secrets manually and pass them to the request ourselves.
Below is example code for sending an HTTP request with HttpHook and with the requests library:
Using HttpHook:
from datetime import datetime

from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.http.hooks.http import HttpHook

dag = DAG(
    dag_id='httphook_example',
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
)

def send_http_request():
    # The hook resolves the host and credentials from the 'http_api'
    # connection stored in the Airflow metadata database.
    http_hook = HttpHook(method='GET', http_conn_id='http_api')
    response = http_hook.run(endpoint='/api/data')
    print(response.json())

http_task = PythonOperator(
    task_id='http_task',
    python_callable=send_http_request,
    dag=dag,
)
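For the HttpHook example to work, the 'http_api' connection it references must exist in the Airflow metadata database. A minimal sketch of registering it programmatically, assuming placeholder host and credentials (in practice the Airflow UI, the airflow connections add CLI, or an AIRFLOW_CONN_HTTP_API environment variable is more common):

from airflow import settings
from airflow.models import Connection

# Placeholder values; replace with the real API host and credentials.
conn = Connection(
    conn_id='http_api',
    conn_type='http',
    host='https://example.com',
    login='user',
    password='secret',
)
session = settings.Session()
session.add(conn)
session.commit()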
Using the requests library:
from datetime import datetime

import requests
from airflow.hooks.base import BaseHook
from airflow.models import DAG
from airflow.operators.python import PythonOperator

dag = DAG(
    dag_id='requests_example',
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
)

def send_http_request():
    # With plain requests, fetching the stored credentials is our job;
    # here they are read manually from the 'http_api' connection.
    conn = BaseHook.get_connection('http_api')
    http_endpoint = 'https://example.com/api/data'
    response = requests.get(http_endpoint, auth=(conn.login, conn.password))
    response.raise_for_status()
    print(response.json())

http_task = PythonOperator(
    task_id='http_task',
    python_callable=send_http_request,
    dag=dag,
)
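On Airflow 2.5 or later, either DAG can also be exercised locally without a running scheduler by calling dag.test(); a minimal sketch, assuming the DAG file is run directly:

# Run the DAG once in-process for quick local debugging (Airflow 2.5+).
if __name__ == '__main__':
    dag.test()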