要使用pymssql和SQLAlchemy连接到MS SQL Server,您可以使用以下步骤:
pip install apache-airflow
pip install pymssql
pip install SQLAlchemy
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
from sqlalchemy import create_engine
import pymssql
def execute_sql():
# 创建MS SQL Server的连接字符串
conn_str = 'mssql+pymssql://username:password@host:port/database'
# 创建SQLAlchemy引擎
engine = create_engine(conn_str)
# 执行查询
with engine.connect() as conn:
result = conn.execute('SELECT * FROM table')
# 输出结果
for row in result:
print(row)
请注意,将username
,password
,host
,port
和database
替换为实际的MS SQL Server连接详细信息。
dag = DAG(
'sql_server_dag',
description='DAG to connect to MS SQL Server',
schedule_interval=None,
start_date=datetime(2022, 1, 1),
catchup=False
)
execute_sql_task = PythonOperator(
task_id='execute_sql',
python_callable=execute_sql,
dag=dag
)
在上面的代码中,我们创建了一个名为sql_server_dag
的DAG,并创建了一个名为execute_sql
的任务,该任务将运行execute_sql
函数。
airflow scheduler
这将启动Airflow的调度程序,它将定期检查并运行DAG。
airflow trigger_dag sql_server_dag
这将手动触发sql_server_dag
DAG的执行。
这样,您就可以使用pymssql和SQLAlchemy连接到MS SQL Server,并在Airflow中执行查询。