基于路由键如何提供邮件使用的RabbitMQ和Puka Python库

入门

先决条件

这段文字的延续如何使用的RabbitMQ和Python的Puka邮件传递给多个消费者 ,并要求相同的软件捆绑起来并正常运行。 此外,在整篇文章中使用相同的定义,我们假设读者熟悉前文的主题。

交流

我们已经描述的fanout交换,该消息传递到绑定到的地方没有额外的规则,交易所每个队列。 这是一个非常有用的机制,但缺乏灵活性。 它经常是不希望接收所有生产者发出的交流。RabbitMQ的提供了可用于实现更复杂的情况两种不同交换类型。 其中之一就是direct交流。

直接交换

介绍

直接交流提供了RabbitMQ的一个简单的,基于密钥的路由机制。 它有点类似于在第一个例子中使用的无名交换,其中消息被递送到等于消息的路由密钥的名字队列。 然而,与无名交换没有必要定义显式队列绑定,在直接交换绑定是关键和强制性的。

使用时直接交流,产生了到交换每个消息必须指定一个路由的关键,这是一个任意名称字符串,如得克萨斯州 然后,该消息将被传递到已经被绑定到具有相同路由关键这种交换的所有队列(也被明确宣布为有意与得克萨斯州路由关键信息的所有队列)。

基本无名交换之间最大的区别direct交流的是,后者需要绑定,并没有队列听消息之前,这种交流。 这反过来导致三个巨大的优势。

  1. 一个队列可以绑定到听许多不同的路由键在同一交换

  2. 一个队列可以绑定一次在许多不同的交易所听

  3. 许多队列可以绑定,听取他们对交易所相同的路由关键

让我们想象一个大城市中心:铁路和公共汽车站在一个,有许多目的地可以通过两种交通工具。 让我们想象一下,该站要使用派遣出发的RabbitMQ通知。 我们的任务是告诉大家感兴趣的公共汽车或火车到西雅图图埃勒 ,或波士顿很快离开。

这样的程序将定义一个直接的departures ,这是所有有兴趣的客户可以订阅他们的队列交流。 然后,将包含出发时间的消息与包含目的地的路由密钥产生到该交换。 例如:

  1. 消息departures具有路由交换密钥Tooele和身体的2014-01-03 15:23

  2. 消息departures具有路由交换密钥Boston和身体的2014-01-03 15:41

  3. 消息departures具有路由密钥交换Seattle和身体的2014-01-03 15:55

由于一个队列可能同时绑定到许多路由密钥,并且许多队列可以绑定到同一个密钥,我们可以很容易地:

  1. 一个客户感兴趣的只有图埃勒

  2. 一个客户有兴趣在波士顿

  3. 另一位顾客的同时有意图埃勒波士顿

所有等待信息的同时。 他们将使用我们的直接交换得到适当的信息。

生产者

为了简化示例的任务,让我们编写一个将接受一个命令行参数的基本通知分派器。 它将指定目的地,应用程序将向所有感兴趣的消费者发送当前时间。

创建一个名为样本python脚本direct_notify.py

vim direct_notify.py

并粘贴脚本内容:

import puka
import datetime
import time
import sys

# declare and connect a producer
producer = puka.Client("amqp://localhost/")
connect_promise = producer.connect()
producer.wait(connect_promise)

# create a direct exchange named departures
exchange_promise = producer.exchange_declare(exchange='departures', type='direct')
producer.wait(exchange_promise)

# send current time to destination specified with command line argument
message = "%s" % datetime.datetime.now()

message_promise = producer.basic_publish(exchange='departures', routing_key=sys.argv[1], body=message)
producer.wait(message_promise)

print "Departure to %s at %s" % (sys.argv[1], message)

producer.close()

出版社:WQ保存文件并退出。

使用一个参数运行脚本应该打印当前时间和使用的目标。 输出应如下所示:

root@rabbitmq:~# python direct_notify.py Tooele
Departure to Tooele at 2014-02-18 15:57:29.035000
root@rabbitmq:~#

让我们一步一步地完成脚本:

  1. 生产者客户创建并连接到本地的RabbitMQ实例。 从现在开始,它可以自由地与RabbitMQ通信。

  2. 命名departures直接交流创建。 它不需要在创建时指定路由密钥,因为任何发布到该交换的消息可以分配不同的密钥。 在那个步骤之后,交换存在于RabbitMQ服务器上,并且可以用于将队列绑定到它并通过它发送消息。

  3. 包含当前时间的消息将使用命令行参数作为路由键发布到该交换机。 在样本运行中,Tooele用作参数,因此作为出发目的地 - 路由键。

注:为简单起见,该脚本不检查的强制命令行参数是否提供! 如果在没有参数的情况下执行,它将无法正常工作。

消费者

该示例消费者应用将充当对从车站可达的一个或多个目的地感兴趣的公共交通客户。

创建一个名为样本python脚本direct_watch.py

vim direct_watch.py

并粘贴脚本内容:

import puka
import sys

# declare and connect a consumer
consumer = puka.Client("amqp://localhost/")
connect_promise = consumer.connect()
consumer.wait(connect_promise)

# create temporary queue
queue_promise = consumer.queue_declare(exclusive=True)
queue = consumer.wait(queue_promise)['queue']

# bind the queue to all routing keys specified by command line arguments
for destination in sys.argv[1:]:
    print "Watching departure times for %s" % destination
    bind_promise = consumer.queue_bind(exchange='departures', queue=queue, routing_key=destination)
    consumer.wait(bind_promise)

# start waiting for messages on the queue created beforehand and print them out
message_promise = consumer.basic_consume(queue=queue, no_ack=True)

while True:
    message = consumer.wait(message_promise)
    print "Departure for %s at %s" % (message['routing_key'], message['body'])

consumer.close()

出版社:WQ保存文件并退出。

运行带有一个参数图埃勒脚本应该宣布,脚本手表图埃勒发车时间,而使用多个参数运行它应该宣布看发车时间为许多目的地。

root@rabbitmq:~# python direct_watch.py Tooele
Watching departure times for Tooele
(...)
root@rabbitmq:~# python direct_watch.py Tooele Boston
Watching departure times for Tooele
Watching departure times for Boston
(...)
root@rabbitmq:~#

让我们一步一步地解释一下脚本:

  1. 消费者客户创建并连接到本地的RabbitMQ实例。 从现在开始,它可以自由地与RabbitMQ通信。

  2. 将创建此特定使用者的临时队列,由RabbitMQ自动生成名称。 队列将在脚本结束后被销毁。

  3. 队列绑定到的所有departures的所有路由项(目的地)交换使用命令行参数,指定打印屏幕上的每个目的地的信息。

  4. 脚本开始等待队列上的消息。 它将接收与绑定的路由密钥匹配的所有消息。 只有那些,既图埃勒波士顿运转时- -当图埃勒运行作为一个单一的参数上两者。 每个出发时间将打印在屏幕上。

测试

要检查两个脚本是否按预期工作,请打开服务器的三个终端窗口。 一个将用作公共交通站点发送通知。 另两个将作为客户等待出发。

在第一端子,运行direct_notify.py与任何参数脚本一次:

root@rabbitmq:~# python direct_notify.py Tooele
Departure to Tooele at 2014-02-18 15:57:29.035000
root@rabbitmq:~#

重要事项: direct_notify.py脚本必须在任何消费者至少一次被执行,作为交换必须结合队列之前被创建。 执行后,交换留在RabbitMQ服务器上,可以自由使用。

在的第二端子,运行direct_watch.py带有一个参数脚本- 图埃勒

root@rabbitmq:~# python direct_watch.py Tooele
Watching departure times for Tooele
(...)
root@rabbitmq:~#

在第三终端,运行direct_watch.py两个参数脚本- 图埃勒波士顿

root@rabbitmq:~# python direct_watch.py Tooele Boston
Watching departure times for Tooele
Watching departure times for Boston
(...)
root@rabbitmq:~#

然后,回到第一终端,发送三个离开通知。 一到图埃勒 ,一到波士顿 ,一个芝加哥的航班:

root@rabbitmq:~# python direct_notify.py Tooele
Departure to Tooele at 2014-02-18 15:57:29.035000
root@rabbitmq:~# python direct_notify.py Boston
Departure to Tooele at 2014-02-18 15:57:31.035000
root@rabbitmq:~# python direct_notify.py Chicago
Departure to Tooele at 2014-02-18 15:57:35.035000
root@rabbitmq:~#

第一次通知只应由等待离开Tooele的消费者收到。 第二个应该只得到消费者等待离开波士顿。 第三个不应该被任何这些消费者接收,因为他们没有一个等待离开芝加哥。

这是预期的行为。 这些简单的例子说明如何分发只有由路由密钥指定的某些消费者将接收的消息。

深入阅读

直接路由不提供在哪里的消息将交付完整的控制权,但来自迈出了一大步fanout以往的交流使用对攻,盲目地到处传递消息。 随着direct交换许多现实世界的消息传递方案可以送达和过程并不十分困难。

本文的主要目的是介绍使用简单,现实世界情况的基本直接路由。 许多其他用途涵盖官方详细RabbitMQ的文档,这是RabbitMQ的用户和管理员一个很好的资源。

文章提交者: Mateusz Papiernik
赞(52) 打赏
未经允许不得转载:优客志 » 系统运维
分享到:

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏