如何使用Celery的RabbitMQ到队列的Ubuntu VPS任务

介绍

异步或非阻塞处理是将某些任务的执行与程序的主流分离的方法。 这为您提供了几个优点,包括允许面向用户的代码运行不中断。

消息传递是程序组件可以用来通信和交换信息的方法。 它可以同步或异步地实现,并且可以允许离散进程无问题地通信。 由于消息队列通常实现附加功能,提供更高的性能,并且可以完全驻留在内存中,因此消息传递通常实现为传统数据库的替代方式。

Celery是一个内置的异步消息传递系统的任务队列。 它可以用作可以转储编程任务的存储桶。 传递任务的程序可以继续执行并响应性地工作,然后稍后可以轮询Celery以查看计算是否完成并检索数据。

虽然Celery是用Python编写的,但它的协议可以用任何语言实现。 它甚至可以通过webhooks与其他语言。

通过在程序环境中实现作业队列,您可以轻松地卸载任务,并继续处理用户的交互。 这是一种提高应用程序响应速度,在执行长时间运行计算时不会被锁定的简单方法。

在本指南中,我们将使用RabbitMQ作为Ubuntu 12.04 VPS上的消息传递系统来安装和实现Celery作业队列。

安装组件

安装Celery

Celery是用Python编写的,因此,它很容易安装,就像我们处理普通的Python包一样。

我们将通过创建虚拟环境来安装我们的邮件系统,遵循建议的处理Python包的过程。 这有助于我们保持我们的环境稳定,不影响较大的系统。

从Ubuntu的默认存储库安装Python虚拟环境包:

sudo apt-get update
sudo apt-get install python-virtualenv

我们将创建一个消息目录,我们将实现我们的系统:

mkdir ~/messaging
cd ~/messaging

我们现在可以创建一个虚拟环境,我们可以使用以下命令安装celery:

virtualenv --no-site-packages venv

配置虚拟环境后,我们可以通过键入以下命令来激活它:

source venv/bin/activate

您的提示将更改,以反映您现在在我们上面所做的虚拟环境中操作。 这将确保我们的Python包是安装在本地而不是全局。

如果在任何时候我们需要停用环境(不是现在),您可以键入:

deactivate

现在我们已经激活了环境,我们可以用pip安装celery:

pip install celery

安装RabbitMQ

Celery需要一个消息代理,以便处理来自外部源的请求。 该代理称为“代理”。

对于可供选择的代理,有很多选项,包括关系数据库,NoSQL数据库,键值存储和实际的消息传递系统。

我们将配置Celery使用RabbitMQ消息传递系统,因为它提供强大,稳定的性能,并与Celery良好互动。 这是一个伟大的解决方案,因为它包括与我们的预期用途的网格良好的功能。

我们可以通过Ubuntu的仓库安装RabbitMQ:

sudo apt-get install rabbitmq-server

RabbitMQ服务在安装时在我们的服务器上自动启动。

创建Celery实例

为了使用Celery的任务排队功能,安装后的第一步必须是创建Celery实例。 这是一个简单的导入包的过程,创建一个“应用程序”,然后设置Celery将能够在后台执行的任务。

让我们来创建我们的消息目录里面名为Python脚本tasks.py在这里我们可以定义我们的工人可以执行的任务。

sudo nano ~/messaging/tasks.py

我们应该做的第一件事是从celery包导入Celery函数:

from celery import Celery

之后,我们可以创建一个连接到默认RabbitMQ服务的Celery应用程序实例:

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://')

第一个参数的Celery功能将被追加到任务,以确定他们的名字。

backend参数是一个可选参数,如果你想查询后台任务的状态,或检索其结果是必要的。

如果你的任务只是函数做一些工作然后退出,而不返回一个有用的值在程序中使用,你可以离开此参数。 如果只有一些任务需要此功能,请在此处启用它,我们可以根据具体情况进一步禁用它。

broker参数指定连接到我们的经纪人所需的URL。 在我们的例子中,这是在我们的服务器上运行的RabbitMQ服务。 RabbitMQ使用称为“amqp”的协议进行操作。 如果RabbitMQ的是根据其默认配置下运行,Celery能比其他任何其他信息连接amqp://方案。

构建Celery任务

仍然在这个文件中,我们现在需要添加我们的任务。

每个Celery任务必须与装饰推出@app.task 这允许Celery识别它可以添加其排队功能的功能。 在每个装饰器之后,我们只需创建一个我们的工作者可以运行的函数。

我们的第一个任务将是一个简单的函数,打印一个字符串到控制台。

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://')

@app.task
def print_hello():
    print 'hello there'

因为此函数不返回任何有用的信息(它反而将其打印到控制台),我们可以告诉Celery不使用后端存储有关此任务的状态信息。 这在发动机罩下不太复杂,并且需要较少的资源。

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://')

@app.task(ignore_result=True)
def print_hello():
    print 'hello there'

接下来,我们将增加一个功能,将生成素数(摘自RosettaCode )。 这可以是一个长时间运行的进程,因此这是一个很好的例子,说明当我们等待结果时我们如何处理异步工作进程。

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://')

@app.task(ignore_result=True)
def print_hello():
    print 'hello there'

@app.task
def gen_prime(x):
    multiples = []
    results = []
    for i in xrange(2, x+1):
        if i not in multiples:
            results.append(i)
            for j in xrange(i*i, x+1, i):
                multiples.append(j)
    return results

因为我们关心这个函数的返回值是什么,因为我们想知道什么时候已经完成(所以我们可以使用结果等),我们不添加ignore_result参数第二个任务。

保存并关闭文件。

启动Celery Worker进程

我们现在可以启动一个工作进程,它将能够接受来自应用程序的连接。 它将使用我们刚刚创建的文件来了解它可以执行的任务。

启动工作程序实例与使用celery命令调用应用程序名称一样简单。 我们将在我们的字符串的结尾处包含一个“&”字符,以便将我们的工作进程放在后台:

celery worker -A tasks &

这将启动一个应用程序,然后将其从终端分离,允许您继续使用它进行其他任务。

如果要启动多个工人,你可以通过命名每一个与这么做-n参数:

celery worker -A tasks -n one.%h &
celery worker -A tasks -n two.%h &

%h将由主机名代替当工人而得名。

要停止工作,可以使用kill命令。 我们可以查询进程id,然后根据这些信息消除worker。

ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill

这将允许工作者在退出之前完成其当前任务。

如果您希望关闭所有工作者而不等待他们完成任务,您可以执行:

ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill -9

使用队列处理工作

我们可以使用我们产生的工作进程在我们的程序的后台完成工作。

我们将探索Python解释器中的不同选项,而不是创建一个完整的程序来演示如何工作:

python

在提示符下,我们可以将我们的函数导入到环境中:

from tasks import print_hello
from tasks import gen_prime

如果测试这些函数,它们似乎没有任何特殊功能。 第一个函数按预期打印一行:

print_hello()
hello there

第二个函数返回素数列表:

primes = gen_prime(1000)
print primes

如果我们给第二个函数一个更大范围的数字来检查,执行挂起,同时它计算:

primes = gen_prime(50000)

通过键入“CTRL-C”停止执行。 这个过程显然不是在后台计算。

要访问后台工作,我们需要使用.delay方法。 Celery用我们的附加功能包装我们的功能。 此方法用于将该函数传递给worker执行。 它应该立即返回:

primes = gen_prime.delay(50000)

这个任务现在正由我们之前开始的工人执行。 因为我们配置了一个backend为我们的应用程序的参数,我们可以检查计算的状态,并可以访问结果。

要检查任务是否完成,我们可以使用.ready的方法:

primes.ready()
False

值为“False”表示任务仍在运行,并且结果尚不可用。 当我们得到一个值“True”,我们可以做一些与答案。

primes.ready()
True

我们可以通过使用获取的值.get方法。

如果我们已经验证了数值计算的.ready方法,那么我们可以用这样的方法是:

print primes.get()
[2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137, 139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, 197, 199, 211, 223, 227, 229, 233, 239, 241, 251, 257, 263, 269, 271, 277, 281, 283, 293, 307, 311, 313, 317, 331, 337, 347, 349, 353, 359, 367, 373, 379, 383, 389, 397, 401, 409, 419, 421, 431, 433, 439, 443, 449, 457, 461, 463, 467, 479, 487, 491, 499, 503, 509, 521, 523,
. . .

但是,如果你还没有使用了.ready方法之前调用.get ,您很可能希望增加一个“超时”选项,这样你的程序是不是被迫等待的结果,这会破坏的目的,我们的实现:

print primes.get(timeout=2)

如果超时,这将引发异常,这可以在程序中处理。

结论

虽然这是足够的信息,让您开始在程序中使用Celery,它只是擦伤表面上的这个库的完整功能。 Celery允许您将后台任务串在一起,分组任务,并以有趣的方式组合函数。

虽然Celery是用Python编写的,但它可以通过webhooks与其他语言一起使用。 这使得它非常灵活地将任务移动到后台,无论您选择的语言。

作者:Justin Ellingwood
赞(52) 打赏
未经允许不得转载:优客志 » 系统运维
分享到:

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏