Использование RabbitMQ в django проектах без Celery, и что нового в Celery 3.0

от автора

Думаю что большинство python программистов уже в какой-то степени знакомы с возможностями Celery. В 1-ой части я расскажу, как можно использовать RabbitMQ без celery, а во второй части — краткий обзор новых возможностей celery 3.0.
Об установке связки Django-Celery-RabbitMQ можно почитать тут.
Про использование RabbitMQ хорошо написано тут, и тут, ну и на сайте RabbitMQ.

Коротко напомню установку и настройку:
RabbitMQ:
sudo apt-get install rabbitmq-server
Добавим пользователя:

$ rabbitmqctl add_user myuser mypassword $ rabbitmqctl add_vhost '/' $ rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*" 

Коротко настройки Celery, RabbitMQ:
in settings.py

import djcelery  os.environ["CELERY_LOADER"] = "django" djcelery.setup_loader() AMQP_HOST = 'localhost' BROKER_HOST='localhost' BROKER_PORT = 5672 BROKER_VHOST = "/" BROKER_USER = "myuser" BROKER_PASSWORD = "mypassword"  INSTALLED_APPS+='djcelery' 

Утверждение: для того чтобы сделать одну небольшую задачу асинхронной вовсе не обязательно использовать celery. Вполне можно обойтись RabbitMQ.
Доказательство:
Начнём от противного:
Задача: проверить email на наличие письма от заданного отправителя, если письма нет, повторить проверку через минуту, если есть — пойти дальше ( распарсить его например…)
Используем poplib, email.
Напишем функцию, получающую email от наперёд заданного отправителя и обернём её декоратором task
Функция принимает email адрес, пароль и email адрес, от кого должно прийти письмо и возвращает статус (Ok, Error) и сообщение
in tasks.py

from celery.task import task, periodic_task from celery.task.schedules import crontab  import poplib import email  @task def mail_content(user_mail, mail_pass, mail_from):     mail_server = 'pop.'+user_mail.split('@')[1]     mail_login = user_mail.split('@')[0]     p = poplib.POP3(mail_server)     print p.getwelcome()     try:         p.user(mail_login)         p.pass_(mail_pass)     except poplib.error_proto:         return 'Error', 'Email is blocked'        try:         print p.list()     except:         return 'Error', 'dont receive list of mails'     numMessages = len(p.list()[1])     print numMessages     for i in range(numMessages):         m = email.message_from_string("\n".join(p.top(i+1, 1)[1]))         try:             this_email_from = m['From']             if this_email_from.find(mail_from) >= 0:                 print this_email_from                 m = email.message_from_string('\n'.join(p.retr(i+1)[1]))                 content = get_text_body(m)                 print content                 return 'Ok', content             else:                 pass         except Exception, e:             return 'Error', unicode(e, 'utf8')     raise mail_content.retry(exc=e, countdown=30) 

Последняя строчка кода описывает перезапуск таска через 30 секунд, если письмо не было найдено.
Теперь мы можем запустить таск например так:

>>>res = mail_content.delay('user@domen', 'password', 'email_from@domen.email.from') 

в этом случае выполнение начнётся немедленно, или так:

>>>res = mail_content.apply_async(('user@domen', 'password', 'email_from@domen.email.from'), countdown=30) 

В этом случае выполнение начнется через 30 секунд.
(Предварительно нужно нужно запустить сервер celery:
python manage.py celeryd
и в другом окне запустить shell:
python manage.py shell,
А уже из шела вызывать эти команды)
Результат мы можем получить выполнив

>>>res.get() (синхронно) >>>res.info 

(возвращает None, если нет ещё результата и результат, если он есть)
Но проверять есть ли результат не всегда удобно и всегда означает выполнение лишних действий.
Для вызова функции после выполнения задачи можно реализовать callback. Если у Вас установлена celery и вы можете функцию, принимающую результат сделать задачей (task), то можете перейти к следующему подразделу. Кто хочет обойтись без celery — способ организации callback на основе pika и rabbitMQ.
Для работы с AMQP установим пакет pika:

$ sudo pip install pika==0.9.5 

Подробно Hello world с использованием этой библиотеки и RabbitMQ описано тут
in decorators.py:

import pika import pickle import time  importr settings   def callback(function_to_decorate):     user = settings.BROKER_USER     broker_host = settings.BROKER_HOST     password = settings.BROKER_PASSWORD     credentials = pika.PlainCredentials(user, password)     parameters = pika.ConnectionParameters(host=broker_host, credentials=credentials)     def receiver(*args, **kw):         (backend_function, data) = function_to_decorate(*args, **kw)         pickled_obj = pickle.dumps(data)         queue_name = str(time.time())         print "call_backend", backend_function.__name__         connection = pika.BlockingConnection(parameters)         channel = connection.channel()         channel.queue_declare( queue = queue_name)         channel.basic_publish(exchange='', routing_key=queue_name, body=pickled_obj)         channel.basic_consume( backend_function, queue=queue_name, no_ack = True)         channel.queue_delete(queue=queue_name)         connection.close()     return receiver 

Это декоратор, которым мы обернем функцию mail_content перед(!) оборачиванием декоратором @task
Декоратор возвращает нашу функцию mail_content с добавленными инструкциями отправки сообщения в rabbitmq
Не буду переписывать всю функцию в tasks.py, только что поменялось
in tasks.py:

from decorators import * from tasks_backend import mail_analizer, mail_error  @task @callback def mail_content(...):     ...     if (...):         ...         return mail_analizer, (content,)     return mail_error, ('error',) 

Возвращаем первым аргументом функцию, вторым — список аргументов, которые хотим передать в функцию
in tasks_backend.py

import tasks def mail_analizer(ch, method, properties, body):     email_text = pickle.loads(body)     if emai_text.find(u'Hello'):         tasks.send_emails.delay(email_text)     else:         tasks.send_twitter_status.delay(email_text) 

Приняли email, распознали его и запустили новые задачи.
Заметим, что аргументы не очень удобные, исправим это:
in decorators.py

def backend(function_to_decorate):     def receive(ch, method, properties, body):         data=pickle.loads(body)         args = data         function_to_decorate(*args)     return receive 

теперь можем переписать функцию mail_analizer так:

@backend def mail_analizer(email_text):      if emai_text.find(u'Hello'):         tasks.send_emails.delay(email_text)     else:         tasks.send_twitter_status.delay(email_text) 

Для запуска следующих функций используем декоратор

@callback 

так-же как и в mail_content:

@backend @callback def mail_analizer(cont):     print cont     return send_twitter_status, (cont,) 

Простой пример, построения цепочки функций с данным интерфейсом:

@callback def first(*args):     print first.__name__     print args     return senders, args @backend @callback def senders(*args):     print args     return analizer, args @backend @callback def analizer( *args):     print args     return ended_fun, args @backend def ended_fun(*args):     print ended_fun.__name__     print args 

Первая функция обёрнута только декоратором

@callback 

, т.к. она ничего не принимает из кролика, а последняя — только

@backend 

— т.к. она ничего не передаёт.
Заметим, что функция может вызывать сама себя. Также заметим что функцию, которя обёрнута декоратором backend можно вызвать только из rabbitmq.
Для запуска используем функцию, которая обёрнута только callback.

@callback def runer(*args):     return test_func, (args) @backend @callback def test_func( *args):     print args     return test_func, args 

Окончательный вариант функций mail_content, email_analizer, run_email:

@backend @call_backend def mail_content(user_mail, mail_pass, mail_from):     mail_server = 'pop.'+user_mail.split('@')[1]     mail_login = user_mail.split('@')[0]     p = poplib.POP3(mail_server)     print p.getwelcome()     try:         p.user(mail_login)         p.pass_(mail_pass)     except poplib.error_proto:         return mail_error, 'Email is blocked'     try:         print p.list()     except:         return mail_error, 'dont receive list of mails'     numMessages = len(p.list()[1])     print numMessages     for i in range(numMessages):         m = email.message_from_string("\n".join(p.top(i+1, 1)[1]))         try:             this_from = m['From']             this_from = this_from.decode('cp1251').split('<'.decode('cp1251'))[1]             if this_from.find(mail_from) >= 0:                 print m['From']                 m = email.message_from_string('\n'.join(p.retr(i+1)[1]))                 content = get_text_body(m)                 print content                 return email_analizer, (content, email_from)             else:                 pass         except Exception, e:             return email_error, (unicode(e, 'utf8'),)     return mail_content, (user_mail, mail_pass, mail_from)  @backend @call_backend def email_analizer(content, email_from):     if content.find(u'Hello'):         email_to = email_from         text=u'Hello, my dear friend'         return send_mail, (email_to, text)     return send_twitter_status, (cont,)      @call_backend def run_email():     '''получаем из базы, например, email, password, email_from '''     return  mail_content, (email, password, email_from) 

Подитог:

я надеюсь, что не было ничего сложного. Можно использовать, вместо celery, если у Вас к примеру одна небольшая задача (task).

Как это можно сделать средствами celery 3.0

В celery 3.0 в задачу (task) можно передать имя задачи, которой нужно передать результат выполнения задачи
Пример из документации:

@celery.task def add(x, y):     return x + y add.apply_async((2, 2), link=add.s(16)) 

где add — наша задача (task), add.s — подзадача( subtask), которая запускается после выполнения add(2, 2), первым аргументом в подзадачу приходит результат выполнения add(2, 2), вторым аргументом приходит 16. Итого получается (2+2)+16=20. Что такое subtask тут
Применительно к нашей задаче делаем из функции mail_analizer task, оставляем один аргумент — content, убираем декоратор @call_backend и вызываем так:

>>>mail_content.apply_async(mail_addres, mail_password, email_from, link=mail_analizer.s())
Также предусмотрена переменная link_error для случая, когда задача «рэйзит» ошибку.
Подробнее об этом тут
Помимо этого в celery 3.0 появилось:

Group

группа, принимает список задач, которые должны быть применены параллельно:
пример из документации:

>>> from celery import group >>> res = group(add.s(i, i) for i in xrange(10))() >>> res.get(timeout=1) [0, 2, 4, 6, 8, 10, 12, 14, 16, 18] или так >>> g = group(add.s(i, i) for i in xrange(10)) >>> g.apply_async() 

chain:

Цепочки вызовов
Теперь задачи можно вызывать цепочками, например:

>>> from celery import chain @task def mul(x,y):     return x*y @task def div(x,y):     return x/y # (2 + 2) * 8 / 2 >>> res = chain(add.subtask((2, 2)),                 mul.subtask((8, )),                 div.subtask((2,))).apply_async() >>> res.get() == 16  >>> res.parent.get() == 32  >>> res.parent.parent.get() == 4 короткая запись >>> (add.s(2, 2) | add.s(4) | add.s(8))().get() 16 </source  <h5>immutable</h5> Подзадачу можно определить как неизменяемую,  тогда эта подзадача будет вызываться только с определёнными при инициализации аргументами <source lang="python"> >>> add.subtask((2, 2), immutable=True) или >>> add.si(2, 2) 

chord

Аккорд:
принимает список задач, которые должны быть выполнены параллельно и задачу, которая принимает список результатов выполнения списка задач. Вот это загнул.

@task def xsum(res_list):     return sum(res_list) >>> from celery import chord >>> res = chord((add.s(i, i) for i in xrange(10)), xsum.s())() >>> res.get() 90 

используя chain(group) получим chord:

>>> c3 = (group(add.s(i, i) for i in xrange(10) | xsum.s())) >>> res = c3() >>> res.get() 90 

map

Как map(fun, [1,2,3])

res=task.map([1,2]) выполнит res=[task(1), task(2)] 

starmap

res=add.starmap([(1,2), (2,4)]) выполнит res=[add(1,2), add(2,4)] 

chuncs

Разбивает длинные списки аргументов на разные таски,

>>> from proj.tasks import add  >>> res = add.chunks(zip(range(100), range(100)), 10)() >>> res.get() [[0, 2, 4, 6, 8, 10, 12, 14, 16, 18],  [20, 22, 24, 26, 28, 30, 32, 34, 36, 38],  [40, 42, 44, 46, 48, 50, 52, 54, 56, 58],  [60, 62, 64, 66, 68, 70, 72, 74, 76, 78],  [80, 82, 84, 86, 88, 90, 92, 94, 96, 98],  [100, 102, 104, 106, 108, 110, 112, 114, 116, 118],  [120, 122, 124, 126, 128, 130, 132, 134, 136, 138],  [140, 142, 144, 146, 148, 150, 152, 154, 156, 158],  [160, 162, 164, 166, 168, 170, 172, 174, 176, 178],  [180, 182, 184, 186, 188, 190, 192, 194, 196, 198]] 

Подитог:

Celery 3.0 дает много очень удобных плюшек, которые использовать одно удовольствие, если их использовать.

Итог:

Celery предоставляет много удобных инструментов, но для небольших задач, где 90% этих функций не надо, вполне можно обойтись очередью сообщений (rabbit), таким образом избавиться от необходимости настраивать celery, уменьшить нагрузку на сервер, избавится от дополнительных зависимостей проекта.
Всем спасибо за внимание.

ссылка на оригинал статьи http://habrahabr.ru/post/158961/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *