Celery throttling — настраивам rate limit для очередей

от автора

​ В этой статье я покажу как решить одну из проблем, возникающих при использовании распределенных очередей задач — регулирование пропускной способности очереди, или же, более простым языком, настройка ее rate limit’a. В качестве примера я возьму python и свою любимую связку Celery+RabbitMQ, хотя алгоритм, который я использую, никак не зависит от этих инструментов и может быть реализован на любом другом стэке.

Celery+RabbitMQ

So what’s the problem?

​ Для начала пара слов о том, какую проблему я вообще пытаюсь решить. Дело в том, что 99.9% сервисов в интернете запрещают бесконтрольно закидывать их сотнями/тысячами запросов в секунду, угрожая дать в ответ какой-нибудь 403 или 500. Нет, ну правда, жалко им чтоле? Иногда таким сервисом может выступать даже своя собственная БД… Вобщем, доверять нынче нельзя никому, поэтому приходится себя как-то сдерживать.

​ Конечно, если вся работа ведется внутри 1го процесса, то никакой проблемы нет, но т.к мы работаем с Celery, то у нас может быть не только N процессов (далее воркеров), но и M машин, и задача все это дело синхронизировать уже не кажется столь тривиальной.

What’s in the box

​ Первое, на что натыкаешься, когда ищешь, как же настроить throttling в celery, это встроенный параметр rate_limit класса Task. Звучит как то, что надо, но, копнув чуть глубже, замечаем, что:

Нельзя задать rate limit на группу задач.
Это неудобно, т.к зачастую доступ к какому-то лимитированому ресурсу размазан между разными тасками.

   # представим что у нас лимит на вызовы API гитхаба 60 req/min    # придется поделить вызовы поровну    @app.task(rate_limit='30/m')    def get_github_api1():        ...     @app.task(rate_limit='30/m')    def get_github_api2():        ...

Этот лимит работает только внутри воркера, то есть он локальный и у каждого воркера свой.

​ Конечно, можно еще раз поделить лимит, теперь взяв в расчет еще и количество воркеров. Но все это начнет работать дико неэффективно, если таски будут прилетать неравномерно, например в какую-то минуту мы получим 60 вызовов get_github_api1() и 0 вызовов get_github_api2() — будут выполнены только 30 вызовов первого типа, хотя могли бы быть все 60. К тому же каждый раз, как появится новая таска, которой нужен доступ к этому ресурсу, придется снова везде пересчитывать все лимиты. Вобщем фича конечно полезная, но только для самых простых вариантов.

Bringing decision

Token Bucket

​ Решением проблемы для меня стал Token Bucket — алгоритм, использующийся для контроля полосы пропускания канала в компьютерных и телекомуникационных сетях. Опишу его в 2ух словах: пакет данных, чтобы пройти проверку канала на лимит, должен иметь при себе токен, который он взял из хранилища; в то же время в хранилище токены поступают с некоторой частотой. То есть пропускная способоность канала ограничивается скоростью выпуска токенов, которую нам и надо регулировать.
​ В нашем же случае вместо пакета данных мы имеем таску, а хранилищем токенов будут выступать очереди RabbitMQ.

Token Bucket

Wrting some code

Чтож, приступим к написанию кода. Создадим файл main.py и зададим базовые настройки:

from celery import Celery from kombu import Queue  app = Celery('Test app', broker='amqp://guest@localhost//')  # 1 очередь под сами таски и 1 очередь под токены для них app.conf.task_queues = [     Queue('github'),     # я ограничил длину очереди до 2ух, чтобы токены не скапливались     # иначе это может привести к пробою нашего rate limit'a     Queue('github_tokens', max_length=2) ]  # это таска будет играть роль нашего токена # она никогда не будет запущена, мы просто будем забирать ее как сообщение из очереди @app.task def token():     return 1  # настраиваем постоянный выпуск нашего токена @app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs):     # мы будем выпускать по 1му токену в секунду     # это значит что rate limit для очереди github - 60 задач в минуту     sender.add_periodic_task(1.0, token.signature(queue='github_tokens'))

Не забудьте развернуть Rabbit, я предпочитаю делать это 1ой строчкой докера:

docker run -d --rm --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management

Теперь запустим celery beat — это специальный воркер celery, запускаемый всегда в единственном экземпляре и отвечающий за запуск периодических задач.

celery -A main beat --loglevel=info

После этого в консоли раз в секунду начнут появляться сообщения:

[2020-03-22 22:49:00,992: INFO/MainProcess] Scheduler: Sending due task main.token() (main.token)

Отлично, мы наладили выпуск токенов для нашего ‘ведра’. Осталось только научить наших воркеров из него брать. Попробуем оптимизировать код, который мы написали ранее для запросов в github. Добавим эти строчки к main.py:

# Напишем функцию для взятия токена из очереди def rate_limit(task, task_group):     # берем соединение с брокером из пула     with task.app.connection_for_read() as conn:         # забираем токен         msg = conn.default_channel.basic_get(task_group+'_tokens', no_ack=True)         # получили None - очередь пуста, токенов нет         if msg is None:             # повторить таску через 1 сек             task.retry(countdown=1)  # Добавим print в таски для логирования # Здесь я поставил max_retries=None, так что таски будут # повторяться, пока не будут выполнены @app.task(bind=True) def get_github_api1(self, max_retries=None):     rate_limit(self, 'github')     print ('Called Api 1')  @app.task(bind=True) def get_github_api2(self, max_retries=None):     rate_limit(self, 'github')     print ('Called Api 2')

А теперь проверим, как это все работает. В дополнение к уже запущенному beat добавим 8 воркеров:

celery -A main worker -с 8 -Q github

И создадим отдельный маленький скрипт для запуска этих задач, назовем его producer.py:

from main import get_github_api1, get_github_api2  tasks = [get_github_api1, get_github_api2]  for  i in range(100):     # запускаю таски в перемешку     tasks[i % 2].apply_async(queue='github')

Запускаем — python producer.py, и смотрим в логи воркеров:

[2020-03-23 13:04:15,017: WARNING/ForkPoolWorker-3] Called Api 2 [2020-03-23 13:04:16,053: WARNING/ForkPoolWorker-8] Called Api 2 [2020-03-23 13:04:17,112: WARNING/ForkPoolWorker-1] Called Api 2 [2020-03-23 13:04:18,187: WARNING/ForkPoolWorker-1] Called Api 1 ... (96 more lines)

Несмотря на то, что у нас целых 8 рабочих процессов, таски выполняются примерно раз в секунду, отправляясь в конец очереди, если на момент их выполнения не оказалось токена. Также, я думаю, вы уже заметили, что на самом деле мы накладываем rate limit не совсем на очередь, а скорее на какую-то логически связанную группу задач, которые на самом деле могут находится как в разных очередях, так и в одной. Таким образом наш контроль становится даже более детальным и гранулированным.

Putting it all together

​ Конечно, количество таких групп задач не ограничено (разве что возможностями брокера). Соберем весь код в кучку, расширим и причешим его:

from celery import Celery from kombu import Queue from queue import Empty from functools import wraps  app = Celery('hello', broker='amqp://guest@localhost//')  task_queues = [     Queue('github'),     Queue('google') ]  # количество запусков в минуту rate_limits = {     'github': 60,     'google': 100 }  # автоматически сгенерируем очереди с токенами под все группы, на которые нужен лимит task_queues += [Queue(name+'_tokens', max_length=2) for name, limit in rate_limits.items()]  app.conf.task_queues = task_queues  @app.task def token():     return 1  @app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs):     # автоматически настроим выпуск токенов с нужной скоростью     for name, limit in rate_limits.items():         sender.add_periodic_task(60 / limit, token.signature(queue=name+'_tokens'))  # Как можно не любить декораторы? def rate_limit(task_group):     def decorator_func(func):         @wraps(func)         def function(self, *args, **kwargs):             with self.app.connection_for_read() as conn:                 # тут я для примера использовал другой более высокоуровневый подход:                 # в замен на получение полноценного интерфейса очереди                 # мы немного теряем в перфомансе, т.к под капотом происходит обмен                 # несколькими сообщениями с брокером                 with conn.SimpleQueue(task_group+'_tokens', no_ack=True, queue_opts={'max_length':2}) as queue:                     try:                         # из плюсов также - наличие вот такого блокирующего вызова                         # это может быть удобнее, чем постоянная ротация с retry()                         # впрочем, это нужно подбирать под кейс                         queue.get(block=True, timeout=5)                         return func(self, *args, **kwargs)                     except Empty:                         self.retry(countdown=1)         return function     return decorator_func  # с декораторами все-таки намного красивее и читабельнее, согласитесь? ;) @app.task(bind=True, max_retries=None) @rate_limit('github') def get_github_api1(self):     print ('Called github Api 1')  @app.task(bind=True, max_retries=None) @rate_limit('github') def get_github_api2(self):     print ('Called github Api 2')  @app.task(bind=True, max_retries=None) @rate_limit('google') def query_google_api1(self):     print ('Called Google Api 1')  @app.task(bind=True, max_retries=None) @rate_limit('google') def query_google_api1(self):     print ('Called Google Api 2')

Таким образом суммарные вызовы задач группы google не превысят 100/мин, а группы github — 60/мин. Заметьте, что для того, чтобы настроить такой throttling, понадобилось меньше 50 строк. Как по мне, достаточно просто.

Moving further

​ Ну, вот все и работает как надо, причем без каких-либо сторонних примочек, средствами только самого брокера. Но зачем останавливаться на достигнутом ;)? Грамотно используя данный алгоритм, можно пойти дальше и создать намного более сложные и гибкие стратегии. Например, некоторые таски могут брать не 1, а несколько токенов (возможно даже из разных очередей, если обращение идет к нескольким сервисам), таким образом у нас появится понятие ‘веса’ задачи, или же расширить размер нашего ‘ведра’ токенов, позволив им накапливаться, тем самым компенсируя периоды простоя. Вобщем, пространство для маневра просто огромное и ограничено только вашим воображением и инженерными навыками) Всем спасибо, всем удачи!

P.s. Поделитесь кто как решал подобную проблему, будет интересно услышать 😉

ссылка на оригинал статьи https://habr.com/ru/post/494090/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *