介绍
异步或非阻塞处理是将某些任务的执行与程序的主流分离的方法。 这为您提供了几个优点,包括允许面向用户的代码运行不中断。
消息传递是程序组件可以用来通信和交换信息的方法。 它可以同步或异步地实现,并且可以允许离散进程无问题地通信。 由于消息队列通常实现附加功能,提供更高的性能,并且可以完全驻留在内存中,因此消息传递通常实现为传统数据库的替代方式。
Celery是一个内置的异步消息传递系统的任务队列。 它可以用作可以转储编程任务的存储桶。 传递任务的程序可以继续执行并响应性地工作,然后稍后可以轮询Celery以查看计算是否完成并检索数据。
虽然Celery是用Python编写的,但它的协议可以用任何语言实现。 它甚至可以通过webhooks与其他语言。
通过在程序环境中实现作业队列,您可以轻松地卸载任务,并继续处理用户的交互。 这是一种提高应用程序响应速度,在执行长时间运行计算时不会被锁定的简单方法。
在本指南中,我们将使用RabbitMQ作为Ubuntu 12.04 VPS上的消息传递系统来安装和实现Celery作业队列。
安装组件
安装Celery
Celery是用Python编写的,因此,它很容易安装,就像我们处理普通的Python包一样。
我们将通过创建虚拟环境来安装我们的邮件系统,遵循建议的处理Python包的过程。 这有助于我们保持我们的环境稳定,不影响较大的系统。
从Ubuntu的默认存储库安装Python虚拟环境包:
sudo apt-get update
sudo apt-get install python-virtualenv
我们将创建一个消息目录,我们将实现我们的系统:
mkdir ~/messaging
cd ~/messaging
我们现在可以创建一个虚拟环境,我们可以使用以下命令安装celery:
virtualenv --no-site-packages venv
配置虚拟环境后,我们可以通过键入以下命令来激活它:
source venv/bin/activate
您的提示将更改,以反映您现在在我们上面所做的虚拟环境中操作。 这将确保我们的Python包是安装在本地而不是全局。
如果在任何时候我们需要停用环境(不是现在),您可以键入:
deactivate
现在我们已经激活了环境,我们可以用pip安装celery:
pip install celery
安装RabbitMQ
Celery需要一个消息代理,以便处理来自外部源的请求。 该代理称为“代理”。
对于可供选择的代理,有很多选项,包括关系数据库,NoSQL数据库,键值存储和实际的消息传递系统。
我们将配置Celery使用RabbitMQ消息传递系统,因为它提供强大,稳定的性能,并与Celery良好互动。 这是一个伟大的解决方案,因为它包括与我们的预期用途的网格良好的功能。
我们可以通过Ubuntu的仓库安装RabbitMQ:
sudo apt-get install rabbitmq-server
RabbitMQ服务在安装时在我们的服务器上自动启动。
创建Celery实例
为了使用Celery的任务排队功能,安装后的第一步必须是创建Celery实例。 这是一个简单的导入包的过程,创建一个“应用程序”,然后设置Celery将能够在后台执行的任务。
让我们来创建我们的消息目录里面名为Python脚本tasks.py
在这里我们可以定义我们的工人可以执行的任务。
sudo nano ~/messaging/tasks.py
我们应该做的第一件事是从celery包导入Celery函数:
from celery import Celery
之后,我们可以创建一个连接到默认RabbitMQ服务的Celery应用程序实例:
from celery import Celery
app = Celery('tasks', backend='amqp', broker='amqp://')
第一个参数的Celery
功能将被追加到任务,以确定他们的名字。
在backend
参数是一个可选参数,如果你想查询后台任务的状态,或检索其结果是必要的。
如果你的任务只是函数做一些工作然后退出,而不返回一个有用的值在程序中使用,你可以离开此参数。 如果只有一些任务需要此功能,请在此处启用它,我们可以根据具体情况进一步禁用它。
该broker
参数指定连接到我们的经纪人所需的URL。 在我们的例子中,这是在我们的服务器上运行的RabbitMQ服务。 RabbitMQ使用称为“amqp”的协议进行操作。 如果RabbitMQ的是根据其默认配置下运行,Celery能比其他任何其他信息连接amqp://
方案。
构建Celery任务
仍然在这个文件中,我们现在需要添加我们的任务。
每个Celery任务必须与装饰推出@app.task
。 这允许Celery识别它可以添加其排队功能的功能。 在每个装饰器之后,我们只需创建一个我们的工作者可以运行的函数。
我们的第一个任务将是一个简单的函数,打印一个字符串到控制台。
from celery import Celery
app = Celery('tasks', backend='amqp', broker='amqp://')
@app.task
def print_hello():
print 'hello there'
因为此函数不返回任何有用的信息(它反而将其打印到控制台),我们可以告诉Celery不使用后端存储有关此任务的状态信息。 这在发动机罩下不太复杂,并且需要较少的资源。
from celery import Celery app = Celery('tasks', backend='amqp', broker='amqp://') @app.task(ignore_result=True) def print_hello(): print 'hello there'
接下来,我们将增加一个功能,将生成素数(摘自RosettaCode )。 这可以是一个长时间运行的进程,因此这是一个很好的例子,说明当我们等待结果时我们如何处理异步工作进程。
from celery import Celery
app = Celery('tasks', backend='amqp', broker='amqp://')
@app.task(ignore_result=True)
def print_hello():
print 'hello there'
@app.task
def gen_prime(x):
multiples = []
results = []
for i in xrange(2, x+1):
if i not in multiples:
results.append(i)
for j in xrange(i*i, x+1, i):
multiples.append(j)
return results
因为我们关心这个函数的返回值是什么,因为我们想知道什么时候已经完成(所以我们可以使用结果等),我们不添加ignore_result
参数第二个任务。
保存并关闭文件。
启动Celery Worker进程
我们现在可以启动一个工作进程,它将能够接受来自应用程序的连接。 它将使用我们刚刚创建的文件来了解它可以执行的任务。
启动工作程序实例与使用celery命令调用应用程序名称一样简单。 我们将在我们的字符串的结尾处包含一个“&”字符,以便将我们的工作进程放在后台:
celery worker -A tasks &
这将启动一个应用程序,然后将其从终端分离,允许您继续使用它进行其他任务。
如果要启动多个工人,你可以通过命名每一个与这么做-n
参数:
celery worker -A tasks -n one.%h &
celery worker -A tasks -n two.%h &
在%h
将由主机名代替当工人而得名。
要停止工作,可以使用kill命令。 我们可以查询进程id,然后根据这些信息消除worker。
ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill
这将允许工作者在退出之前完成其当前任务。
如果您希望关闭所有工作者而不等待他们完成任务,您可以执行:
ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill -9
使用队列处理工作
我们可以使用我们产生的工作进程在我们的程序的后台完成工作。
我们将探索Python解释器中的不同选项,而不是创建一个完整的程序来演示如何工作:
python
在提示符下,我们可以将我们的函数导入到环境中:
from tasks import print_hello
from tasks import gen_prime
如果测试这些函数,它们似乎没有任何特殊功能。 第一个函数按预期打印一行:
print_hello()
hello there
第二个函数返回素数列表:
primes = gen_prime(1000)
print primes
如果我们给第二个函数一个更大范围的数字来检查,执行挂起,同时它计算:
primes = gen_prime(50000)
通过键入“CTRL-C”停止执行。 这个过程显然不是在后台计算。
要访问后台工作,我们需要使用.delay
方法。 Celery用我们的附加功能包装我们的功能。 此方法用于将该函数传递给worker执行。 它应该立即返回:
primes = gen_prime.delay(50000)
这个任务现在正由我们之前开始的工人执行。 因为我们配置了一个backend
为我们的应用程序的参数,我们可以检查计算的状态,并可以访问结果。
要检查任务是否完成,我们可以使用.ready
的方法:
primes.ready()
False
值为“False”表示任务仍在运行,并且结果尚不可用。 当我们得到一个值“True”,我们可以做一些与答案。
primes.ready()
True
我们可以通过使用获取的值.get
方法。
如果我们已经验证了数值计算的.ready
方法,那么我们可以用这样的方法是:
print primes.get()
[2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137, 139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, 197, 199, 211, 223, 227, 229, 233, 239, 241, 251, 257, 263, 269, 271, 277, 281, 283, 293, 307, 311, 313, 317, 331, 337, 347, 349, 353, 359, 367, 373, 379, 383, 389, 397, 401, 409, 419, 421, 431, 433, 439, 443, 449, 457, 461, 463, 467, 479, 487, 491, 499, 503, 509, 521, 523,
. . .
但是,如果你还没有使用了.ready
方法之前调用.get
,您很可能希望增加一个“超时”选项,这样你的程序是不是被迫等待的结果,这会破坏的目的,我们的实现:
print primes.get(timeout=2)
如果超时,这将引发异常,这可以在程序中处理。
结论
虽然这是足够的信息,让您开始在程序中使用Celery,它只是擦伤表面上的这个库的完整功能。 Celery允许您将后台任务串在一起,分组任务,并以有趣的方式组合函数。
虽然Celery是用Python编写的,但它可以通过webhooks与其他语言一起使用。 这使得它非常灵活地将任务移动到后台,无论您选择的语言。