📅  最后修改于: 2023-12-03 15:38:54.645000             🧑  作者: Mango
在使用 Celery 进行任务调度时,您可能会遇到 Celery 无法创建缺失的队列的情况。本指南将向您展示如何让 Celery 自动创建缺失的队列。
为了让 Celery 能够自动创建缺失的队列,您需要在本地安装 RabbitMQ。可以根据您的操作系统,选择使用 package 安装或者源码编译安装。
Ubuntu / Debian:
sudo apt-get install rabbitmq
CentOS / Fedora:
sudo yum install rabbitmq-server
在您的 Celery 任务调度代码中,需要进行如下配置:
# 配置 broker_url
broker_url = 'amqp://guest:guest@localhost:5672/'
# 配置 task_queues
task_queues = {
'queue1': {
'exchange': 'queue1_tasks',
'routing_key': 'queue1',
},
'queue2': {
'exchange': 'queue2_tasks',
'routing_key': 'queue2',
},
'queue3': {
'exchange': 'queue3_tasks',
'routing_key': 'queue3',
}
}
# 创建 Celery 应用程序
app = Celery('my_app',
broker=broker_url,
task_queues=task_queues,
include=['module.tasks'])
以上代码中,我们定义了三个队列 'queue1'、'queue2' 和 'queue3',每个队列都分别定义了 exchange 和 routing_key。现在,我们需要让 Celery 任务调度代码自动创建这些队列。
我们可以在 shell 中运行如下命令,创建队列:
sudo rabbitmqctl add_vhost my_vhost
sudo rabbitmqctl add_user my_user my_password
sudo rabbitmqctl set_permissions -p my_vhost my_user ".*" ".*" ".*"
以上命令中,我们创建了一个名称为 my_vhost 的虚拟主机,创建了一个名为 my_user 的用户,并为该用户分配了虚拟主机 my_vhost 的权限。接下来,我们需要为每个队列在 RabbitMQ 中创建一个 exchange,并将每个队列与其对应的 exchange 绑定:
# 创建 exchange
sudo rabbitmqctl -p my_vhost \
exchange_declare \
--exchange=queue1_tasks \
--type=direct \
--auto-delete=False
sudo rabbitmqctl -p my_vhost \
exchange_declare \
--exchange=queue2_tasks \
--type=direct \
--auto-delete=False
sudo rabbitmqctl -p my_vhost \
exchange_declare \
--exchange=queue3_tasks \
--type=direct \
--auto-delete=False
# 绑定 queue1 和 queue1_tasks
sudo rabbitmqctl -p my_vhost \
queue_declare \
--queue=queue1 \
--auto-delete=False
sudo rabbitmqctl -p my_vhost \
queue_bind \
--queue=queue1 \
--exchange=queue1_tasks \
--routing-key=queue1
# 绑定 queue2 和 queue2_tasks
sudo rabbitmqctl -p my_vhost \
queue_declare \
--queue=queue2 \
--auto-delete=False
sudo rabbitmqctl -p my_vhost \
queue_bind \
--queue=queue2 \
--exchange=queue2_tasks \
--routing-key=queue2
# 绑定 queue3 和 queue3_tasks
sudo rabbitmqctl -p my_vhost \
queue_declare \
--queue=queue3 \
--auto-delete=False
sudo rabbitmqctl -p my_vhost \
queue_bind \
--queue=queue3 \
--exchange=queue3_tasks \
--routing-key=queue3
至此,我们已经成功地向 RabbitMQ 创建了三个队列和三个 exchange,并将它们绑定起来了。现在,我们可以重启 Celery 应用程序,在此之前,需要确保 RabbitMQ 正在运行。
sudo systemctl restart rabbitmq-server
celery -A my_app worker -l INFO
以上命令将启动 Celery Worker,如果所有配置正确的话,Celery 将自动创建缺失的队列。
本指南向您演示了如何让 Celery 自动创建缺失的队列,在您的任务调度中,这是一个非常有用的技能。在正式环境中,您应该使用 Ansible、Docker、Kubernetes 等工具来维护您的服务器和 RabbitMQ 环境。