如何使用的RabbitMQ和Python的Puka将消息传递给多个消费者

先决条件

RabbitMQ

与RabbitMQ的工作来发送和接收消息可能只是安装和配置软件本身后如何安装和管理的RabbitMQ详细解释如何获得的RabbitMQ工作,对于使用此消息代理一个很好的起点。

Puka Python库

本文中的所有示例都使用Python语言提供,并使用处理AMQP消息传递协议的puka库进行了备份。 Python被选择为一个干净和易于理解的语言,为了简单的介绍,但由于AMQP是一个广泛采用的协议,任何其他编程语言可以自由使用来实现类似的目标。

Puka可使用快速安装pip一个Python包管理器- 。

pip install puka

pip并不总是与Linux发行版捆绑在一起。 在基于Debian的发行版(包括Ubuntu)上,可以很容易地安装:

apt-get install python-pip

在RHEL的基础上,像CentOS:

yum install python-setuptools
easy_install pip

RabbitMQ及其术语简介

消息[RabbitMQ的特别]介绍了描述消息代理及其力学的基本原理的一些术语。

  • 生产者发送邮件,从而创建一个消息产生一个聚会。

  • 消费者 接收信息,从而接收消息占用了聚会。

  • 队列是在其中发送的消息被存储并准备好被接收的缓冲区。 单个队列可以保存的消息数量没有限制。 对于有多少生产者可以向队列发送消息也没有限制,也没有对多少消费者尝试访问它的限制。 当消息命中现有队列时,它在那里等待直到被访问该特定队列的消费者消费。 当消息命中不存在的队列时,它被丢弃。

  • 交易所是驻留生产者和队列之间的实体。 生产者从不直接向队列发送消息。 它将消息发送到交换机,交换机依次将消息放置到一个或多个队列,这取决于所使用的交换。 为了使用现实生活中的隐喻,交换就像邮递员:它处理消息,以便它们被交付到适当的队列(邮箱),消费者可以从中收集它们。

  • 绑定是队列和交换机之间的连接。 绑定到某个交换的队列由交换服务。 究竟取决于交换本身。

所有五个术语将在本文中使用。 还有一个,严格来说与puka python库有关,它被选择作为它的清晰度的首选库。 它是一个承诺 ,其可以被理解为一个同步请求,保证所述请求的并在其上执行(成功与否)客户端等待,直到它完成AMQP服务器。

虽然puka可以异步工作,在我们的例子中puka将被用作同步库。 这意味着在每个请求(promise)之后puka将等待,直到它被执行,然后才能进入下一步。

用一个简单的例子测试RabbitMQ和Puka

为了测试是否Message Broker和Puka完美的作品,并获得在发送和接收消息在实践中是如何工作的一握,创建一个示例python脚本命名rabbit_test.py

vim rabbit_test.py

并粘贴脚本内容:

import puka

# declare send and receive clients, both connecting to the same server on local machine
producer = puka.Client("amqp://localhost/")
consumer = puka.Client("amqp://localhost/")

# connect sending party
send_promise = producer.connect()
producer.wait(send_promise)

# connect receiving party
receive_promise = consumer.connect()
consumer.wait(receive_promise)

# declare queue (queue must exist before it is being used - otherwise messages sent to that queue will be discarded)
send_promise = producer.queue_declare(queue='rabbit')
producer.wait(send_promise)

# send message to the queue named rabbit
send_promise = producer.basic_publish(exchange='', routing_key='rabbit', body='Droplet test!')
producer.wait(send_promise)

print "Message sent!"

# start waiting for messages, also those sent before (!), on the queue named rabbit
receive_promise = consumer.basic_consume(queue='rabbit', no_ack=True)

print "Starting receiving!"

while True:
    received_message = consumer.wait(receive_promise)
    print "GOT: %r" % (received_message['body'],)
    break

出版社:WQ保存文件并退出。

运行脚本应打印由脚本发送到的RabbitMQ队列的消息时,因为测试程序接收到消息之后立即。
输出应如下所示:

root@rabbitmq:~# python rabbit_test.py
Message sent!
Starting receiving!
GOT: 'Droplet test!'
root@rabbitmq:~#

为了解释这段代码中发生了什么,让我们一步一步:

  1. 消费者和生产者被创建并连接到相同的RabbitMQ服务器,驻留在localhost

  2. 生产者声明一个队列,以确保在生成消息时它存在。 如果不是这个步骤,队列可能不存在,因此消息可能立即被丢弃。

  3. 生产者将消息发送到同一个路由密钥的指定预先创建的队列nameless_交换 (更多的交流是后)。 之后,消息命中交换,这反过来将其放在“兔子”队列。 然后消息就坐在那里,直到有人消费它。

  4. 消费者访问“兔子”队列并开始接收存储在那里的消息。 因为有一个消息等待,它将立即交付。 它被消耗,这意味着它将不再停留在队列中。

  5. 消耗的消息将打印在屏幕上。

扇出交换

在前面的例子中,使用无名交换将消息传递到名为“rabbit”的特定队列。 无名交换需要一个队列名来工作,这意味着它只能将消息传递到单个队列。

也有其他类型的交流在RabbitMQ的 ,其中之一是扇出 ,我们在这个文本主要关注的问题。 扇出交换是一个简单的,盲目的工具,并将消息传递到所有队列是知道的。 对于扇出交换,没有必要(实际上是不可能的)提供特定的队列名称。 发出该类交换的消息将在消息生成之前传递到绑定到交换的所有队列。 有多少队列可以连接到交换机没有限制。

发布/订阅模式

随着扇出交流,我们可以很容易地创建一个发布/订阅模式,工作像一个开放给所有的通讯。 制作人,通讯广播员,向其可能甚至不知道(产生消息并将其发送到通讯扇出交换)的观众发送定期消息。 新订阅者申请通讯(将自己的队列绑定到同一个通讯扇出)。 从那时起,通讯扇出交换将向所有注册订户(队列)递送消息。

虽然人们对一消息非常简单,开发人员经常使用其他通信手段,一到多(其中,“多”是不明确的,可以批次之间的任何东西)是一个非常受欢迎的场景,其中一个消息代理可以是巨大的帮助。

写作生产者应用程序

生产者应用程序的唯一作用是创建一个命名的扇出交换并且产生到该交换的周期性消息(每几秒钟)。 在现实生活中,由于某种原因会产生消息。 为了简化示例,将自动生成消息。 此应用程序将作为通讯发布商。

创建一个python脚本命名newsletter_produce.py

vim newsletter_produce.py

并粘贴脚本内容:

import puka
import datetime
import time

# declare and connect a producer
producer = puka.Client("amqp://localhost/")
connect_promise = producer.connect()
producer.wait(connect_promise)

# create a fanout exchange
exchange_promise = producer.exchange_declare(exchange='newsletter', type='fanout')
producer.wait(exchange_promise)

# send current time in a loop
while True:
    message = "%s" % datetime.datetime.now()

    message_promise = producer.basic_publish(exchange='newsletter', routing_key='', body=message)
    producer.wait(message_promise)

    print "SENT: %s" % message

    time.sleep(1)

producer.close()

让我们一步一步与示例来解释代码中发生了什么。

  1. 生成器客户端已创建并连接到本地RabbitMQ实例。 从现在开始,它可以自由地与RabbitMQ通信。

  2. 命名newsletter扇出交换创建。 在那个步骤之后,交换存在于RabbitMQ服务器上,并且可以用于将队列绑定到它并通过它发送消息。

  3. 在一个无限循环,与当前时间的消息产生的newsletter交流。 需要注意的是routing_key是空的,这意味着没有指定特定的队列。 这是交换,将传递消息到适当的队列进一步。

应用程序在运行时会将当前时间通知所有电子报订阅者。

撰写消费者应用程序

消费者应用程序将创建一个临时队列并将其绑定到一个命名的扇出交换。 之后,它将开始等待消息。 在将队列绑定到交换之后,由以前创建的生成器发送的每个消息将由该消费者接收。 此应用程序将充当通讯订阅者 - 可以一次运行应用程序多次,并且所有实例仍将接收广播消息。

创建一个python脚本命名newsletter_consume.py

vim newsletter_consume.py

并粘贴脚本内容:

import puka

# declare and connect a consumer
consumer = puka.Client("amqp://localhost/")
connect_promise = consumer.connect()
consumer.wait(connect_promise)

# create temporary queue
queue_promise = consumer.queue_declare(exclusive=True)
queue = consumer.wait(queue_promise)['queue']

# bind the queue to newsletter exchange
bind_promise = consumer.queue_bind(exchange='newsletter', queue=queue)
consumer.wait(bind_promise)

# start waiting for messages on the queue created beforehand and print them out
message_promise = consumer.basic_consume(queue=queue, no_ack=True)

while True:
    message = consumer.wait(message_promise)
    print "GOT: %r" % message['body']

consumer.close()

消费者代码比生产者更复杂。 让我们一步一步看看:

  1. 创建客户端客户端并将其连接到本地RabbitMQ实例。

  2. 创建临时队列。 临时意味着未提供任何名称-队列名称将自动生成由RabbitMQ的。 此外,这样的队列将在客户端断开连接后被销毁。 它是创建队列的常见方式,只存在于绑定到其中一个交换,没有其他特殊用途。 因为有必要创建一个队列来接收任何东西,所以避免考虑队列名称是一个方便的方法。

  3. 创建的队列绑定到newsletter交流。 从那一刻起,扇出交换机将每个消息传递到该队列。

  4. 在无限循环中,消费者在队列上等待,接收击中队列并将其打印在屏幕上的每个消息。

该应用程序在运行时从通讯发布商接收时间通知。 它可以一次执行多次,并且此应用程序的每个单个实例将获取当前时间。

测试这两个应用程序

要测试通讯发布商及其消费者,请打开到虚拟服务器的多个SSH会话(或打开多个终端窗口,如果在本地计算机上工作)。
在其中一个窗口中运行生产者应用程序。

root@rabbitmq:~# python newsletter_produce.py

它将开始显示当前时间的每秒:

SENT: 2014-02-11 17:24:47.309000
SENT: 2014-02-11 17:24:48.310000
SENT: 2014-02-11 17:24:49.312000
SENT: 2014-02-11 17:24:50.316000
...

在每个其他窗口中运行消费者应用程序:

root@rabbitmq:~# python newsletter_consume.py

此应用程序的每个实例都将接收制片人广播的时间通知:

GOT: 2014-02-11 17:24:47.309000
GOT: 2014-02-11 17:24:48.310000
GOT: 2014-02-11 17:24:49.312000
GOT: 2014-02-11 17:24:50.316000
...

这意味着RabbitMQ正确注册了扇出交换,将用户队列绑定到此交换,并将发送的消息传递到适当的队列。 换句话说,RabbitMQ按预期工作。

深入阅读

发布/订阅是一种简单的(在概念上和实现上的)消息模式,通常可以派上用场; 它是没有地方接近RabbitMQ限制。 有无数的方法来使用RabbitMQ来解决消息问题,包括高级消息路由,消息确认,安全性或持久性。

本文的主要目的是使用简单的例子介绍基本的消息传递概念。 许多其他用途涵盖官方详细RabbitMQ的文档,这是RabbitMQ的用户和管理员一个很好的资源。

文章提交者: Mateusz Papiernik
赞(52) 打赏
未经允许不得转载:优客志 » 系统运维
分享到:

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏