Первое, что я решил, это сделать отдельный процесс(ы) «worker», который бы брал на себя всю синхронную работу. Задумал, чтобы «worker» был максимально прост, и делал задачи из очереди одну за другой. Скажем, выбрал из базы что-нибудь, ответил, взял на себя следующую задачу и так далее. Самих «worker»ов можно запустить много и тогда AMQP выступает уже в роли некоего подобия IPC.
Спустя некоторое время из этого вырос модуль, который берет на себя всю рутину связанную с AMQP и передачей сообщений туда и назад, а также сжимает их gzipом, если данных слишком много. Так родился crew. Собственно, используя его, мы с вами напишем простой API, который будет состоять из сервера на tornado и простых и незамысловатых «worker» процессов. Забегая вперед скажу, что весь код доступен на github, а то, о чем я буду рассказывать дальше, собрано в папке example.
Подготовка
Итак, давайте разберемся по порядку. Первое, что нам нужно будет сделать — это установить RabbitMQ. Как это делать я описывать не буду. Скажу лишь то, что на той-же убунте он ставится и работает из коробки. У меня на маке единственное, что пришлось сделать, это поставить LaunchRocket, который собрал все сервисы, что были установлены через homebrew и вывел в GUI:
Дальше создадим наш проект virtualenv и установим сам модуль через pip:
mkdir -p api cd api virtualenv env source env/bin/activate pip install crew tornado
В зависимостях модуля умышленно не указан tornado, так как на хосте с workerом его может и не быть. А на веб-части обычно создают requirements.txt, где указаны все остальные зависимости.
Код я буду писать частями, чтобы не нарушать порядок повествования. То, что у нас получится в итоге, можно посмотреть тут.
Пишем код
Сам tornado сервер состоит из двух частей. В первой части мы определяем обработчики запросов handlers, а во второй запускается event-loop. Давайте напишем сервер и создадим наш первый метод api.
Файл master.py:
# encoding: utf-8 import tornado.ioloop import tornado.gen import tornado.web import tornado.options class MainHandler(tornado.web.RequestHandler): @tornado.gen.coroutine def get(self): # Вызываем задачу test c приоритетом 100 resp = yield self.application.crew.call('test', priority=100) self.write("{0}: {1}".format(type(resp).__name__, str(resp))) application = tornado.web.Application( [ ('/', MainHandler), ], autoreload=True, debug=True, ) if __name__ == "__main__": tornado.options.parse_command_line() application.listen(8888) tornado.ioloop.IOLoop.instance().start()
Благодаря coroutine в торнадо, код выглядит просто. Можно написать тоже самое без coroutine.
Файл master.py:
class MainHandler(tornado.web.RequestHandler): def get(self): # Вызываем задачу test c приоритетом 100 self.application.crew.call('test', priority=100, callback=self._on_response) def _on_response(resp, headers): self.write("{0}: {1}".format(type(resp).__name__, str(resp)))
Наш сервер готов. Но если мы его запустим, и сходим на /, то не дождемся ответа, его некому обрабатывать.
Теперь напишем простой worker:
Файл worker.py:
# encoding: utf-8 from crew.worker import run, context, Task @Task('test') def long_task(req): context.settings.counter += 1 return 'Wake up Neo.\n' run( counter=0, # This is a part of this worker context )
Итак, как видно в коде, есть простая функция, обернутая декоратором Task(«test»), где test — это уникальный идентификатор задачи. В вашем worker не может быть двух задач с одинаковыми идентификаторами. Конечно, правильно было бы назвать задачу «crew.example.test» (так обычно и называю в продакшн среде), но для нашего примера достаточно просто «test».
Сразу бросается в глаза context.settings.counter. Это некий контекст, который инициализируется в worker процессе при вызове функции run. Также в контексте уже есть context.headers — это заголовки ответа для отделения метаданных от ответа. В примере с callback-функцией именно этот словарь передается в _on_response.
Заголовки сбрасываются после каждого ответа, а вот context.settings — нет. Я использую context.settings для передачи в функции worker(ы) соединения с базой данных и вообще любого другого объекта.
Также worker обрабатывает ключи запуска, их не много:
$ python worker.py --help Usage: worker.py [options] Options: -h, --help show this help message and exit -v, --verbose make lots of noise --logging=LOGGING Logging level -H HOST, --host=HOST RabbitMQ host -P PORT, --port=PORT RabbitMQ port
URL подключения к базе и прочие переменные можно брать из переменный окружения. Поэтому worker в параметрах ждет только как ему соединиться c AMQP (хост и порт) и уровень логирования.
Итак, запускаем все и проверяем:
$ python master.py & python worker.py
Работает, но что случилось за ширмой?
При запуске tornado-сервера tornado подключился к RabbitMQ, создал Exchange DLX и начал слушать очередь DLX. Это Dead-Letter-Exchange — специальная очередь, в которую попадают задачи, которые не взял ни один worker за определенный timeout. Также он создал очередь с уникальным идентификатором, куда будут поступать ответы от workerов.
После запуска worker создал по очереди на каждую обернутую декоратором Task очередь и подписался на них. При поступлении задачи воркер main-loop создает один поток, контролируя в главном потоке время исполнения задачи и выполняет обернутую функцию. После return из обернутой функции сериализует его и ставит в очередь ответов сервера.
После поступления запроса tornado-сервер cтавит задачу в соответствующую очередь, указывая при этом идентификатор своей уникальной очереди, в которую должен поступить ответ. Если ни один воркер не взял задачу, тогда RabbitMQ перенаправляет задачу в exchange DLX и tornado-сервер получает сообщение о том, что истек таймаут пребывания очереди, генерируя исключение.
Зависшая задача
Чтобы продемонстрировать, как работает механизм завершения задач, которые повисли в процессе выполнения, напишем еще один веб-метод и задачу в worker.
В файл master.py добавим:
class FastHandler(tornado.web.RequestHandler): @tornado.gen.coroutine def get(self): try: resp = yield self.application.crew.call( 'dead', persistent=False, priority=255, expiration=3, ) self.write("{0}: {1}".format(type(resp).__name__, str(resp))) except TimeoutError: self.write('Timeout') except ExpirationError: self.write('All workers are gone')
И добавим его в список хендлеров:
application = tornado.web.Application( [ (r"/", MainHandler), (r"/stat", StatHandler), ], autoreload=True, debug=True, )
А в worker.py:
@Task('dead') def infinite_loop_task(req): while True: pass
Как видно из приведенного выше примера, задача уйдет в бесконечный цикл. Однако, если задача не выполнится за 3 секунды (считая время получения из очереди), main-loop в воркере пошлет потоку исключение SystemExit. И да, вам придется обработать его.
Контекст
Как уже упоминалось выше, контекст — это такой специальный объект, который импортируется и имеет несколько встроенных переменных.
Давайте сделаем простую статистику по ответам нашего worker.
В файл master.py добавим следующий handler:
class StatHandler(tornado.web.RequestHandler): @tornado.gen.coroutine def get(self): resp = yield self.application.crew.call('stat', persistent=False, priority=0) self.write("{0}: {1}".format(type(resp).__name__, str(resp)))
Также зарегистрируем в списке обработчиков запросов:
application = tornado.web.Application( [ (r"/", MainHandler), (r"/fast", FastHandler), (r"/stat", StatHandler), ], autoreload=True, debug=True, )
Этот handler не очень отличается от предыдущих, просто возвращает значение, которое ему передал worker.
Теперь сама задача.
В файл worker.py добавим:
@Task('stat') def get_counter(req): context.settings.counter += 1 return 'I\'m worker "%s". And I serve %s tasks' % (context.settings.uuid, context.settings.counter)
Функция возвращает строку, с информацией о количестве задач, обработанных workerом.
PubSub и Long polling
Теперь реализуем пару обработчиков. Один при запросе будет просто висеть и ждать, а второй будет принимать POST данные. После передачи последних первый будет их отдавать.
master.py:
class LongPoolingHandler(tornado.web.RequestHandler): LISTENERS = [] @tornado.web.asynchronous def get(self): self.LISTENERS.append(self.response) def response(self, data): self.finish(str(data)) @classmethod def responder(cls, data): for cb in cls.LISTENERS: cb(data) cls.LISTENERS = [] class PublishHandler(tornado.web.RequestHandler): @tornado.gen.coroutine def post(self, *args, **kwargs): resp = yield self.application.crew.call('publish', self.request.body) self.finish(str(resp)) ... application = tornado.web.Application( [ (r"/", MainHandler), (r"/stat", StatHandler), (r"/fast", FastHandler), (r'/subscribe', LongPoolingHandler), (r'/publish', PublishHandler), ], autoreload=True, debug=True, ) application.crew = Client() application.crew.subscribe('test', LongPoolingHandler.responder) if __name__ == "__main__": application.crew.connect() tornado.options.parse_command_line() application.listen(8888) tornado.ioloop.IOLoop.instance().start()
Напишем задачу publish.
worker.py:
@Task('publish') def publish(req): context.pubsub.publish('test', req)
Если же вам не нужно передавать управление в worker, можно просто публиковать прямо из tornado-сервера
class PublishHandler2(tornado.web.RequestHandler): def post(self, *args, **kwargs): self.application.crew.publish('test', self.request.body)
Параллельное выполнение заданий
Часто бывает ситуация, когда мы можем выполнить несколько заданий параллельно. В crew есть для этого небольшой синтаксический сахар:
class Multitaskhandler(tornado.web.RequestHandler): @tornado.gen.coroutine def get(self, *args, **kwargs): with self.application.crew.parallel() as mc: # mc - multiple calls mc.call('test') mc.call('stat') test_result, stat_result = yield mc.result() self.set_header('Content-Type', 'text/plain') self.write("Test result: {0}\nStat result: {1}".format(test_result, stat_result))
В этом случае, задаче будут поставлены две задачи параллельно и выход из with будет произведен по окончании последней.
Но нужно быть осторожным, так как какая-то задача может вызвать исключение. Оно будет приравнено непосредственно переменной. Таким образом, вам нужно проверить, не является ли test_result и stat_result экземплярами класса Exception.
Планы на будущее
Когда eigrad предложил написать прослойку, которой можно запустить любое wsgi приложение с помощью crew, мне эта идея сразу понравилась. Только представьте, запросы хлынут не на ваше wsgi приложение, а будут равномерно поступать через очередь на wsgi-worker.
Я никогда не писал wsgi сервер и даже не знаю, с чего начать. Но вы можете мне помочь, pull-requestы я принимаю.
Также думаю дописать client для еще одного популярного асинхронного фреймворка, для twisted. Но пока разбираюсь с ним, да и свободного времени не хватает.
Благодарности
Спасибо разработчикам RabbitMQ и AMQP. Замечательные идеи.
Также спасибо вам, читатели. Надеюсь, что вы не зря потратили время.
ссылка на оригинал статьи http://habrahabr.ru/post/241740/
Добавить комментарий