📜  Python – 使用 RabbitMQ 的异步任务(1)

📅  最后修改于: 2023-12-03 15:04:11.716000             🧑  作者: Mango

Python – 使用 RabbitMQ 的异步任务

介绍

在现代的分布式系统中,一些任务需要异步处理以提高性能和可伸缩性。RabbitMQ 是一个流行的消息代理,它可以将消息传递到异步工作队列中,这些工作队列可以在许多不同的工作者之间进行分散处理。

本文将介绍如何在 Python 中使用 RabbitMQ 来创建异步任务,并将任务发送到工作队列中进行处理。

准备工作

在开始本教程之前,需要安装以下软件:

我们还需要安装 Pika,这是一个 RabbitMQ Python 客户端库:

pip install pika
创建工作队列

要创建一个工作队列,我们需要向 RabbitMQ 发送一条消息。将消息发送到某个队列将创建该队列(如果尚不存在)。如果队列已经存在,则将重用该队列。

首先,我们需要连接到 RabbitMQ 服务器:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

现在可以创建一个队列:

channel.queue_declare(queue='my-queue')

这将在 RabbitMQ 中创建一个名为 'my-queue' 的队列。

发送消息

要将消息发送到工作队列中,我们需要将消息发布到特定的队列中。在 RabbitMQ 中,发布消息是一个不同于将消息发送到队列的过程。

我们需要指定消息要发布到的队列和消息体:

channel.basic_publish(exchange='', routing_key='my-queue', body='Hello, World!')

通过将消息发布到 'my-queue' 队列,我们告诉 RabbitMQ,我们希望工作者在该队列中接收异步任务。

接收消息

现在我们已经将任务发送到工作队列中,需要将消息传递给一个或多个工作者来异步处理。

为了接收消息,我们需要创建一个回调函数。该回调函数将在我们从队列中获取消息时执行:

def callback(ch, method, properties, body):
    print("Received message:", body.decode())

channel.basic_consume(queue='my-queue', on_message_callback=callback, auto_ack=True)

print('Waiting for messages...')
channel.start_consuming()

当工作者完成异步任务时,它们可以向 RabbitMQ 发送一个确认消息,告知 RabbitMQ 它们已经处理了消息。

完整代码
import pika

# Connect to RabbitMQ server
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Create a queue
channel.queue_declare(queue='my-queue')

# Publish a message
channel.basic_publish(exchange='', routing_key='my-queue', body='Hello, World!')

# Define a callback function for message consumption
def callback(ch, method, properties, body):
    print("Received message:", body.decode())

# Start consuming messages
channel.basic_consume(queue='my-queue', on_message_callback=callback, auto_ack=True)
print('Waiting for messages...')
channel.start_consuming()
结论

在本文中,我们学习了如何在 Python 中使用 RabbitMQ 创建异步任务,将任务发送到工作队列中,并由工作者异步处理。

RabbitMQ 可以大大提高分布式系统的性能和可伸缩性,现在使用这些技术创建分布式应用程序将变得更容易。