Об установке связки Django-Celery-RabbitMQ можно почитать тут.
Про использование RabbitMQ хорошо написано тут, и тут, ну и на сайте RabbitMQ.
Коротко напомню установку и настройку:
RabbitMQ:
sudo apt-get install rabbitmq-server
Добавим пользователя:
$ rabbitmqctl add_user myuser mypassword $ rabbitmqctl add_vhost '/' $ rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"
Коротко настройки Celery, RabbitMQ:
in settings.py
import djcelery os.environ["CELERY_LOADER"] = "django" djcelery.setup_loader() AMQP_HOST = 'localhost' BROKER_HOST='localhost' BROKER_PORT = 5672 BROKER_VHOST = "/" BROKER_USER = "myuser" BROKER_PASSWORD = "mypassword" INSTALLED_APPS+='djcelery'
Утверждение: для того чтобы сделать одну небольшую задачу асинхронной вовсе не обязательно использовать celery. Вполне можно обойтись RabbitMQ.
Доказательство:
Начнём от противного:
Задача: проверить email на наличие письма от заданного отправителя, если письма нет, повторить проверку через минуту, если есть — пойти дальше ( распарсить его например…)
Используем poplib, email.
Напишем функцию, получающую email от наперёд заданного отправителя и обернём её декоратором task
Функция принимает email адрес, пароль и email адрес, от кого должно прийти письмо и возвращает статус (Ok, Error) и сообщение
in tasks.py
from celery.task import task, periodic_task from celery.task.schedules import crontab import poplib import email @task def mail_content(user_mail, mail_pass, mail_from): mail_server = 'pop.'+user_mail.split('@')[1] mail_login = user_mail.split('@')[0] p = poplib.POP3(mail_server) print p.getwelcome() try: p.user(mail_login) p.pass_(mail_pass) except poplib.error_proto: return 'Error', 'Email is blocked' try: print p.list() except: return 'Error', 'dont receive list of mails' numMessages = len(p.list()[1]) print numMessages for i in range(numMessages): m = email.message_from_string("\n".join(p.top(i+1, 1)[1])) try: this_email_from = m['From'] if this_email_from.find(mail_from) >= 0: print this_email_from m = email.message_from_string('\n'.join(p.retr(i+1)[1])) content = get_text_body(m) print content return 'Ok', content else: pass except Exception, e: return 'Error', unicode(e, 'utf8') raise mail_content.retry(exc=e, countdown=30)
Последняя строчка кода описывает перезапуск таска через 30 секунд, если письмо не было найдено.
Теперь мы можем запустить таск например так:
>>>res = mail_content.delay('user@domen', 'password', 'email_from@domen.email.from')
в этом случае выполнение начнётся немедленно, или так:
>>>res = mail_content.apply_async(('user@domen', 'password', 'email_from@domen.email.from'), countdown=30)
В этом случае выполнение начнется через 30 секунд.
(Предварительно нужно нужно запустить сервер celery:
python manage.py celeryd
и в другом окне запустить shell:
python manage.py shell,
А уже из шела вызывать эти команды)
Результат мы можем получить выполнив
>>>res.get() (синхронно) >>>res.info
(возвращает None, если нет ещё результата и результат, если он есть)
Но проверять есть ли результат не всегда удобно и всегда означает выполнение лишних действий.
Для вызова функции после выполнения задачи можно реализовать callback. Если у Вас установлена celery и вы можете функцию, принимающую результат сделать задачей (task), то можете перейти к следующему подразделу. Кто хочет обойтись без celery — способ организации callback на основе pika и rabbitMQ.
Для работы с AMQP установим пакет pika:
$ sudo pip install pika==0.9.5
Подробно Hello world с использованием этой библиотеки и RabbitMQ описано тут
in decorators.py:
import pika import pickle import time importr settings def callback(function_to_decorate): user = settings.BROKER_USER broker_host = settings.BROKER_HOST password = settings.BROKER_PASSWORD credentials = pika.PlainCredentials(user, password) parameters = pika.ConnectionParameters(host=broker_host, credentials=credentials) def receiver(*args, **kw): (backend_function, data) = function_to_decorate(*args, **kw) pickled_obj = pickle.dumps(data) queue_name = str(time.time()) print "call_backend", backend_function.__name__ connection = pika.BlockingConnection(parameters) channel = connection.channel() channel.queue_declare( queue = queue_name) channel.basic_publish(exchange='', routing_key=queue_name, body=pickled_obj) channel.basic_consume( backend_function, queue=queue_name, no_ack = True) channel.queue_delete(queue=queue_name) connection.close() return receiver
Это декоратор, которым мы обернем функцию mail_content перед(!) оборачиванием декоратором @task
Декоратор возвращает нашу функцию mail_content с добавленными инструкциями отправки сообщения в rabbitmq
Не буду переписывать всю функцию в tasks.py, только что поменялось
in tasks.py:
from decorators import * from tasks_backend import mail_analizer, mail_error @task @callback def mail_content(...): ... if (...): ... return mail_analizer, (content,) return mail_error, ('error',)
Возвращаем первым аргументом функцию, вторым — список аргументов, которые хотим передать в функцию
in tasks_backend.py
import tasks def mail_analizer(ch, method, properties, body): email_text = pickle.loads(body) if emai_text.find(u'Hello'): tasks.send_emails.delay(email_text) else: tasks.send_twitter_status.delay(email_text)
Приняли email, распознали его и запустили новые задачи.
Заметим, что аргументы не очень удобные, исправим это:
in decorators.py
def backend(function_to_decorate): def receive(ch, method, properties, body): data=pickle.loads(body) args = data function_to_decorate(*args) return receive
теперь можем переписать функцию mail_analizer так:
@backend def mail_analizer(email_text): if emai_text.find(u'Hello'): tasks.send_emails.delay(email_text) else: tasks.send_twitter_status.delay(email_text)
Для запуска следующих функций используем декоратор
@callback
так-же как и в mail_content:
@backend @callback def mail_analizer(cont): print cont return send_twitter_status, (cont,)
Простой пример, построения цепочки функций с данным интерфейсом:
@callback def first(*args): print first.__name__ print args return senders, args @backend @callback def senders(*args): print args return analizer, args @backend @callback def analizer( *args): print args return ended_fun, args @backend def ended_fun(*args): print ended_fun.__name__ print args
Первая функция обёрнута только декоратором
@callback
, т.к. она ничего не принимает из кролика, а последняя — только
@backend
— т.к. она ничего не передаёт.
Заметим, что функция может вызывать сама себя. Также заметим что функцию, которя обёрнута декоратором backend можно вызвать только из rabbitmq.
Для запуска используем функцию, которая обёрнута только callback.
@callback def runer(*args): return test_func, (args) @backend @callback def test_func( *args): print args return test_func, args
Окончательный вариант функций mail_content, email_analizer, run_email:
@backend @call_backend def mail_content(user_mail, mail_pass, mail_from): mail_server = 'pop.'+user_mail.split('@')[1] mail_login = user_mail.split('@')[0] p = poplib.POP3(mail_server) print p.getwelcome() try: p.user(mail_login) p.pass_(mail_pass) except poplib.error_proto: return mail_error, 'Email is blocked' try: print p.list() except: return mail_error, 'dont receive list of mails' numMessages = len(p.list()[1]) print numMessages for i in range(numMessages): m = email.message_from_string("\n".join(p.top(i+1, 1)[1])) try: this_from = m['From'] this_from = this_from.decode('cp1251').split('<'.decode('cp1251'))[1] if this_from.find(mail_from) >= 0: print m['From'] m = email.message_from_string('\n'.join(p.retr(i+1)[1])) content = get_text_body(m) print content return email_analizer, (content, email_from) else: pass except Exception, e: return email_error, (unicode(e, 'utf8'),) return mail_content, (user_mail, mail_pass, mail_from) @backend @call_backend def email_analizer(content, email_from): if content.find(u'Hello'): email_to = email_from text=u'Hello, my dear friend' return send_mail, (email_to, text) return send_twitter_status, (cont,) @call_backend def run_email(): '''получаем из базы, например, email, password, email_from ''' return mail_content, (email, password, email_from)
Подитог:
я надеюсь, что не было ничего сложного. Можно использовать, вместо celery, если у Вас к примеру одна небольшая задача (task).
Как это можно сделать средствами celery 3.0
В celery 3.0 в задачу (task) можно передать имя задачи, которой нужно передать результат выполнения задачи
Пример из документации:
@celery.task def add(x, y): return x + y add.apply_async((2, 2), link=add.s(16))
где add — наша задача (task), add.s — подзадача( subtask), которая запускается после выполнения add(2, 2), первым аргументом в подзадачу приходит результат выполнения add(2, 2), вторым аргументом приходит 16. Итого получается (2+2)+16=20. Что такое subtask тут
Применительно к нашей задаче делаем из функции mail_analizer task, оставляем один аргумент — content, убираем декоратор @call_backend и вызываем так:
>>>mail_content.apply_async(mail_addres, mail_password, email_from, link=mail_analizer.s())
Также предусмотрена переменная link_error для случая, когда задача «рэйзит» ошибку.
Подробнее об этом тут
Помимо этого в celery 3.0 появилось:
Group
группа, принимает список задач, которые должны быть применены параллельно:
пример из документации:
>>> from celery import group >>> res = group(add.s(i, i) for i in xrange(10))() >>> res.get(timeout=1) [0, 2, 4, 6, 8, 10, 12, 14, 16, 18] или так >>> g = group(add.s(i, i) for i in xrange(10)) >>> g.apply_async()
chain:
Цепочки вызовов
Теперь задачи можно вызывать цепочками, например:
>>> from celery import chain @task def mul(x,y): return x*y @task def div(x,y): return x/y # (2 + 2) * 8 / 2 >>> res = chain(add.subtask((2, 2)), mul.subtask((8, )), div.subtask((2,))).apply_async() >>> res.get() == 16 >>> res.parent.get() == 32 >>> res.parent.parent.get() == 4 короткая запись >>> (add.s(2, 2) | add.s(4) | add.s(8))().get() 16 </source <h5>immutable</h5> Подзадачу можно определить как неизменяемую, тогда эта подзадача будет вызываться только с определёнными при инициализации аргументами <source lang="python"> >>> add.subtask((2, 2), immutable=True) или >>> add.si(2, 2)
chord
Аккорд:
принимает список задач, которые должны быть выполнены параллельно и задачу, которая принимает список результатов выполнения списка задач. Вот это загнул.
@task def xsum(res_list): return sum(res_list) >>> from celery import chord >>> res = chord((add.s(i, i) for i in xrange(10)), xsum.s())() >>> res.get() 90
используя chain(group) получим chord:
>>> c3 = (group(add.s(i, i) for i in xrange(10) | xsum.s())) >>> res = c3() >>> res.get() 90
map
Как map(fun, [1,2,3])
res=task.map([1,2]) выполнит res=[task(1), task(2)]
starmap
res=add.starmap([(1,2), (2,4)]) выполнит res=[add(1,2), add(2,4)]
chuncs
Разбивает длинные списки аргументов на разные таски,
>>> from proj.tasks import add >>> res = add.chunks(zip(range(100), range(100)), 10)() >>> res.get() [[0, 2, 4, 6, 8, 10, 12, 14, 16, 18], [20, 22, 24, 26, 28, 30, 32, 34, 36, 38], [40, 42, 44, 46, 48, 50, 52, 54, 56, 58], [60, 62, 64, 66, 68, 70, 72, 74, 76, 78], [80, 82, 84, 86, 88, 90, 92, 94, 96, 98], [100, 102, 104, 106, 108, 110, 112, 114, 116, 118], [120, 122, 124, 126, 128, 130, 132, 134, 136, 138], [140, 142, 144, 146, 148, 150, 152, 154, 156, 158], [160, 162, 164, 166, 168, 170, 172, 174, 176, 178], [180, 182, 184, 186, 188, 190, 192, 194, 196, 198]]
Подитог:
Celery 3.0 дает много очень удобных плюшек, которые использовать одно удовольствие, если их использовать.
Итог:
Celery предоставляет много удобных инструментов, но для небольших задач, где 90% этих функций не надо, вполне можно обойтись очередью сообщений (rabbit), таким образом избавиться от необходимости настраивать celery, уменьшить нагрузку на сервер, избавится от дополнительных зависимостей проекта.
Всем спасибо за внимание.
ссылка на оригинал статьи http://habrahabr.ru/post/158961/
Добавить комментарий