📜  如何让 celery 创建缺少的队列 - Python (1)

📅  最后修改于: 2023-12-03 15:38:54.645000             🧑  作者: Mango

如何让 Celery 创建缺失的队列

在使用 Celery 进行任务调度时,您可能会遇到 Celery 无法创建缺失的队列的情况。本指南将向您展示如何让 Celery 自动创建缺失的队列。

安装 RabbitMQ

为了让 Celery 能够自动创建缺失的队列,您需要在本地安装 RabbitMQ。可以根据您的操作系统,选择使用 package 安装或者源码编译安装。

Ubuntu / Debian:

sudo apt-get install rabbitmq

CentOS / Fedora:

sudo yum install rabbitmq-server
配置 Celery

在您的 Celery 任务调度代码中,需要进行如下配置:

# 配置 broker_url
broker_url = 'amqp://guest:guest@localhost:5672/'

# 配置 task_queues
task_queues = {
    'queue1': {
        'exchange': 'queue1_tasks',
        'routing_key': 'queue1',
    },
    'queue2': {
        'exchange': 'queue2_tasks',
        'routing_key': 'queue2',
    },
    'queue3': {
        'exchange': 'queue3_tasks',
        'routing_key': 'queue3',
    }
}

# 创建 Celery 应用程序
app = Celery('my_app',
             broker=broker_url,
             task_queues=task_queues,
             include=['module.tasks'])

以上代码中,我们定义了三个队列 'queue1'、'queue2' 和 'queue3',每个队列都分别定义了 exchange 和 routing_key。现在,我们需要让 Celery 任务调度代码自动创建这些队列。

创建队列

我们可以在 shell 中运行如下命令,创建队列:

sudo rabbitmqctl add_vhost my_vhost
sudo rabbitmqctl add_user my_user my_password
sudo rabbitmqctl set_permissions -p my_vhost my_user ".*" ".*" ".*"

以上命令中,我们创建了一个名称为 my_vhost 的虚拟主机,创建了一个名为 my_user 的用户,并为该用户分配了虚拟主机 my_vhost 的权限。接下来,我们需要为每个队列在 RabbitMQ 中创建一个 exchange,并将每个队列与其对应的 exchange 绑定:

# 创建 exchange
sudo rabbitmqctl -p my_vhost \
    exchange_declare \
    --exchange=queue1_tasks \
    --type=direct \
    --auto-delete=False

sudo rabbitmqctl -p my_vhost \
    exchange_declare \
    --exchange=queue2_tasks \
    --type=direct \
    --auto-delete=False

sudo rabbitmqctl -p my_vhost \
    exchange_declare \
    --exchange=queue3_tasks \
    --type=direct \
    --auto-delete=False

# 绑定 queue1 和 queue1_tasks
sudo rabbitmqctl -p my_vhost \
    queue_declare \
    --queue=queue1 \
    --auto-delete=False

sudo rabbitmqctl -p my_vhost \
    queue_bind \
    --queue=queue1 \
    --exchange=queue1_tasks \
    --routing-key=queue1

# 绑定 queue2 和 queue2_tasks
sudo rabbitmqctl -p my_vhost \
    queue_declare \
    --queue=queue2 \
    --auto-delete=False

sudo rabbitmqctl -p my_vhost \
    queue_bind \
    --queue=queue2 \
    --exchange=queue2_tasks \
    --routing-key=queue2

# 绑定 queue3 和 queue3_tasks
sudo rabbitmqctl -p my_vhost \
    queue_declare \
    --queue=queue3 \
    --auto-delete=False

sudo rabbitmqctl -p my_vhost \
    queue_bind \
    --queue=queue3 \
    --exchange=queue3_tasks \
    --routing-key=queue3

至此,我们已经成功地向 RabbitMQ 创建了三个队列和三个 exchange,并将它们绑定起来了。现在,我们可以重启 Celery 应用程序,在此之前,需要确保 RabbitMQ 正在运行。

sudo systemctl restart rabbitmq-server
celery -A my_app worker -l INFO

以上命令将启动 Celery Worker,如果所有配置正确的话,Celery 将自动创建缺失的队列。

结论

本指南向您演示了如何让 Celery 自动创建缺失的队列,在您的任务调度中,这是一个非常有用的技能。在正式环境中,您应该使用 Ansible、Docker、Kubernetes 等工具来维护您的服务器和 RabbitMQ 环境。