问题的原因是Celery默认使用pickle序列化任务函数,而Dash应用程序中的回调函数通常不能使用pickle序列化。这可能会导致Celery无法找到完整的函数对象并因此无法执行任务。
为了解决这个问题,可以指定Celery使用其他的序列化程序,比如json。以下是一个示例代码,展示了如何在Celery中使用json序列化来解决这个问题:
from celery import Celery
from celery.signals import worker_process_init, worker_process_shutdown
from dash.dependencies import Input, Output
import dash_html_components as html
import dash
app = dash.Dash(__name__)
server = app.server
@worker_process_init.connect
def init_worker(**kwargs):
global app
app = dash.Dash(__name__)
server = app.server
@worker_process_shutdown.connect
def shutdown_worker(**kwargs):
global app
app = None
app.layout = html.Div(
[
html.Button("Submit", id="submit-button"),
html.P(id="output")
]
)
@app.callback(Output("output", "children"), [Input("submit-button", "n_clicks")])
def update_output(n_clicks):
return f"Button clicked {n_clicks} times."
celery = Celery(__name__, broker="redis://localhost:6379/0", backend="redis://localhost:6379/0", task_serializer="json")
@celery.task
def run_dash_app():
app.run_server(debug=False, mode="external")
if __name__ == "__main__":
run_dash_app.delay()
在上面的例子中,我们使用 worker_process_init
和 worker_process_shutdown
信号来初始化和关闭Dash app,这样就可以确保所有Celery任务都在同一个进程中运行。然后,在Celery任务函数中,我们指定使用json序列化。
使用这种方法可以确保我们可以将Dash应用程序的背景回调添加到Celery任务队列中,并且可以顺利地运行。