📜  启用 rabbitmq_delayed_message_exchange (1)

📅  最后修改于: 2023-12-03 15:22:59.816000             🧑  作者: Mango

启用 rabbitmq_delayed_message_exchange

在 RabbitMQ 中,通常情况下我们只能通过指定消息的过期时间来实现延迟投递功能。但是在某些情况下,我们希望可以精确地指定消息的投递时间,这时候就需要使用 RabbitMQ 的一个插件: rabbitmq_delayed_message_exchange。

什么是 rabbitmq_delayed_message_exchange

rabbitmq_delayed_message_exchange 是 RabbitMQ 的一个插件,它相当于一个普通的 exchange,但是可以根据消息中的 header 来判断该消息是否需要延迟投递或者需要以精确时间投递。当某个消息需要延迟投递时,它不会直接被路由到对应的 queue 中,而是会被发送到一个内部的 exchange 中,然后在指定的延迟时间后再次路由到原本应该被路由到的 queue 中。

如何使用 rabbitmq_delayed_message_exchange

首先,需要在 RabbitMQ 服务器上启用 rabbitmq_delayed_message_exchange 插件。可以通过以下命令来启用该插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

启用插件之后,我们就可以使用该插件提供的新的 exchange 类型:x-delayed-message。在创建 exchange 时,需要指定该 exchange 的 type 为 x-delayed-message,并添加一个 x-delayed-type 参数,该参数指定了原本应该被使用的 exchange 的 type。

以下是一个创建一个 type 为 fanout 的 x-delayed-message 的例子:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='delayed_exchange', exchange_type='x-delayed-message', arguments={
    'x-delayed-type': 'fanout'  # 原本应该是 fanout 类型的 exchange
})
connection.close()

接下来,创建 queue 时,需要按照以下方式指定相关参数:

channel.queue_declare(queue='my_queue', arguments={
    'x-dead-letter-exchange': 'delayed_exchange',  # 消息过期后将被发送到的 exchange
    'x-message-ttl': 60000  # 消息过期时间,单位为毫秒
})

当需要延迟投递消息时,只需要在消息的 header 中添加一个 x-delay 参数,该参数指定了延迟的时间,单位为毫秒。该消息会被发送到 'delayed_exchange' exchange,然后在指定的时间后再次被路由到 'my_queue' queue。

下面是一个基本的 Publisher 和 Consumer 的示例。

Publisher:

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='delayed_exchange', exchange_type='x-delayed-message', arguments={
    'x-delayed-type': 'fanout'
})
channel.queue_declare(queue='my_queue', arguments={
    'x-dead-letter-exchange': 'delayed_exchange',
    'x-message-ttl': 60000
})

channels.basic_publish(
    exchange='delayed_exchange',
    routing_key='',
    body='test delayed message',
    properties=pika.BasicProperties(
        headers={
            'x-delay': 5000  # 5s 后投递消息
        }
    )
)

connection.close()

Consumer:

import pika

def callback(channel, method, properties, body):
    print(body)

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='delayed_exchange', exchange_type='x-delayed-message', arguments={
    'x-delayed-type': 'fanout'
})
channel.queue_declare(queue='my_queue', arguments={
    'x-dead-letter-exchange': 'delayed_exchange',
    'x-message-ttl': 60000
})
channel.queue_bind(exchange='delayed_exchange', queue='my_queue')

channel.basic_consume(queue='my_queue', on_message_callback=callback, auto_ack=True)
channel.start_consuming()

以上就是 rabbitmq_delayed_message_exchange 的一些简单介绍和使用方法。使用该插件可以非常方便地实现一些需要精确控制投递时间的功能。