📅  最后修改于: 2023-12-03 15:22:59.816000             🧑  作者: Mango
在 RabbitMQ 中,通常情况下我们只能通过指定消息的过期时间来实现延迟投递功能。但是在某些情况下,我们希望可以精确地指定消息的投递时间,这时候就需要使用 RabbitMQ 的一个插件: rabbitmq_delayed_message_exchange。
rabbitmq_delayed_message_exchange 是 RabbitMQ 的一个插件,它相当于一个普通的 exchange,但是可以根据消息中的 header 来判断该消息是否需要延迟投递或者需要以精确时间投递。当某个消息需要延迟投递时,它不会直接被路由到对应的 queue 中,而是会被发送到一个内部的 exchange 中,然后在指定的延迟时间后再次路由到原本应该被路由到的 queue 中。
首先,需要在 RabbitMQ 服务器上启用 rabbitmq_delayed_message_exchange 插件。可以通过以下命令来启用该插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
启用插件之后,我们就可以使用该插件提供的新的 exchange 类型:x-delayed-message。在创建 exchange 时,需要指定该 exchange 的 type 为 x-delayed-message,并添加一个 x-delayed-type 参数,该参数指定了原本应该被使用的 exchange 的 type。
以下是一个创建一个 type 为 fanout 的 x-delayed-message 的例子:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='delayed_exchange', exchange_type='x-delayed-message', arguments={
'x-delayed-type': 'fanout' # 原本应该是 fanout 类型的 exchange
})
connection.close()
接下来,创建 queue 时,需要按照以下方式指定相关参数:
channel.queue_declare(queue='my_queue', arguments={
'x-dead-letter-exchange': 'delayed_exchange', # 消息过期后将被发送到的 exchange
'x-message-ttl': 60000 # 消息过期时间,单位为毫秒
})
当需要延迟投递消息时,只需要在消息的 header 中添加一个 x-delay 参数,该参数指定了延迟的时间,单位为毫秒。该消息会被发送到 'delayed_exchange' exchange,然后在指定的时间后再次被路由到 'my_queue' queue。
下面是一个基本的 Publisher 和 Consumer 的示例。
Publisher:
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='delayed_exchange', exchange_type='x-delayed-message', arguments={
'x-delayed-type': 'fanout'
})
channel.queue_declare(queue='my_queue', arguments={
'x-dead-letter-exchange': 'delayed_exchange',
'x-message-ttl': 60000
})
channels.basic_publish(
exchange='delayed_exchange',
routing_key='',
body='test delayed message',
properties=pika.BasicProperties(
headers={
'x-delay': 5000 # 5s 后投递消息
}
)
)
connection.close()
Consumer:
import pika
def callback(channel, method, properties, body):
print(body)
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='delayed_exchange', exchange_type='x-delayed-message', arguments={
'x-delayed-type': 'fanout'
})
channel.queue_declare(queue='my_queue', arguments={
'x-dead-letter-exchange': 'delayed_exchange',
'x-message-ttl': 60000
})
channel.queue_bind(exchange='delayed_exchange', queue='my_queue')
channel.basic_consume(queue='my_queue', on_message_callback=callback, auto_ack=True)
channel.start_consuming()
以上就是 rabbitmq_delayed_message_exchange 的一些简单介绍和使用方法。使用该插件可以非常方便地实现一些需要精确控制投递时间的功能。