Я занимаюсь разработкой и поддержкой сервиса уведомлений в Ostrovok.ru. Сервис написан на Python3 и Django. Помимо транзакционных писем, пушей и сообщений, сервис также берёт на себя задачи по массовым рассылкам коммерческих предложений (не спам! trust me, отписки у нас работают лучше подписок) пользователям, давшим на это согласие. Со временем база активных получателей разрослась до более миллиона адресов, к чему почтовый сервис не был готов. Я хочу рассказать о том, как новые возможности Python позволили ускорить массовые рассылки и сэкономить ресурсы и с какими проблемами нам пришлось столкнуться при работе с ними.
Исходная реализация
Изначально массовые рассылки были реализованы самым простым способом: на каждого получателя в очередь помещалась задача, которую забирал один из 60 массовых воркеров (особенность наших очередей заключается в том, что каждый воркер работает в отдельном процессе), подготавливал для нее контекст, рендерил шаблон, отправлял HTTP запрос в Mailgun для отправки письма и создавал в базе запись о том, что письмо отправлено. Вся рассылка занимала до 12 часов, отправляя около 0.3 писем в секунду с каждого воркера и блокируя рассылки маленьких кампаний.
Асинхронное решение
Быстрое профилирование показало, что большую часть времени воркеры тратят на установку соединений с Mailgun’ом, поэтому мы стали группировать задачи в чанки, по чанку на каждый воркер. Воркеры стали использовать одно соединение с Mailgun’ом, что позволило сократить время рассылок до 9 часов, отправляя каждым воркером в среднем 0,5 писем в секунду. Последующее профилирование снова показало, что работа с сетью по-прежнему занимает большую часть времени, что и подтолкнуло нас к идее использовать asyncio.
Перед тем как поместить всю обработку в asyncio цикл, нам нужно было продумать решение ряда проблем:
- Django ORM пока ещё не умеет работать с asyncio, однако во время выполнения запросов освобождает GIL. Это значит, что запросы к базе могут выполняться в отдельном потоке и не блокировать работу основного цикла.
- Актуальные версии aiohttp требуют Python версии 3.6 и выше, что в момент реализации потребовало обновить докер образ. Эксперименты на более старых версиях aiohttp и Python 3.5 показали, что скорость отправки на этих версиях гораздо ниже, чем на новых, и сопоставима с последовательной отправкой.
- Хранение большого количества asyncio корутин быстро ведёт к расходованию всей памяти. Это значит, что нельзя заготовить заранее все корутины для писем и вызвать цикл для их обработки, необходимо подготавливать данные по мере отправки уже сформированных писем.
Учитывая все особенности, создадим внутри каждого из воркеров свой asyncio цикл с подобием ThreadPool паттерна, состоящего из:
- Одного или более производителей (producer), работающих с базой данных через Django ORM в отдельном потоке через asyncio.ThreadPoolExecutor. Производитель старается агрегировать запросы получения данных в маленькие батчи, рендерит шаблоны для полученных данных через Jinja2 и складывает данные для отправок в очередь задач.
def get_campaign_send_data(ids: Iterable[int]) -> Iterable[Mapping[str, Any]]: """Формируем данные для отправки писем, здесь происходит работа с Django ORM и рендером шаблонов.""" return [{'id': id} for id in ids] async def mail_campaign_producer(ids: Iterable[int], task_queue: asyncio.Queue) -> None: """ Группируем получателей в подчанки и формируем для них данные для отправки, которые помещаем в очередь. Формирование данных требует работы с базой, поэтому выполняем его в ThreadPoolExecutor. """ loop = asyncio.get_event_loop() total = len(ids) for subchunk_start in range(0, total, PRODUCER_SUBCHUNK_SIZE): subchunk_ids = ids[subchunk_start : min(subchunk_start + PRODUCER_SUBCHUNK_SIZE, total)] send_tasks = await loop.run_in_executor(None, get_campaign_send_data, subchunk_ids) for task in send_tasks: await task_queue.put(task)
- Нескольких сотен отправщиков писем – asyncio корутины, которые в бесконечном цикле читают данные из очереди задач, отправляют сетевые запросы для каждой из них и складывают результат (ответ, или исключение) в очередь отчётов.
async def send_mail(data: Mapping[str, Any], session: aiohttp.ClientSession) -> Union[Mapping[str, Any], Exception]: """Отправляем запрос во внешний сервис.""" async with session.post(REQUEST_URL, data=data) as response: if response.status_code != 200: raise Exception return data async def mail_campaign_sender( task_queue: asyncio.Queue, result_queue: asyncio.Queue, session: aiohttp.ClientSession ) -> None: """ Забираем из очереди данные и отправляем сетевые запросы. Нужно не забывать вызывать task_done, чтобы вызывающий код понял, когда завершится отправка. """ while True: try: task_data = await task_queue.get() result = await send_mail(task_data, session) await result_queue.put(result) except asyncio.CancelledError: # Корректно обрабатываем остановку корутины raise except Exception as exception: # Обрабатываем ошибки отправки писем await result_queue.put(exception) finally: task_queue.task_done()
- Одного или нескольких воркеров, группирующих данные из очереди отчётов и помещающих в базу данных bulk запросом информацию о результате отправки письма.
def process_campaign_results(results: Iterable[Union[Mapping[str, Any], Exception]]) -> None: """Обрабатываем результаты отправок: исключения и успех и помещаем их в базу данных""" pass async def mail_campaign_reporter(task_queue: asyncio.Queue, result_queue: asyncio.Queue) -> None: """ Группируем отчёты в список и передаём на обработку в ThreadPoolExecutor, чтобы положить в базу данных информацию об отправках. """ loop = asyncio.get_event_loop() results_chunk = [] while True: try: results_chunk.append(await result_queue.get()) if len(results_chunk) >= REPORTER_BATCH_SIZE: await loop.run_in_executor(None, process_campaign_results, results_chunk) results_chunk.clear() except asyncio.CancelledError: await loop.run_in_executor(None, process_campaign_results, results_chunk) results_chunk.clear() raise finally: result_queue.task_done()
- Очереди задач, являющейся экземпляром asyncio.Queue, ограниченной по максимальному количеству элементов, чтобы производитель не переполнял её, расходуя всю память.
- Очереди отчётов, также являющуюся экземпляром asyncio.Queue с ограничением на максимальное количество элементов.
- Асинхронного метода, который создаёт очереди, воркеры, и завершает рассылку по их остановке.
async def send_mail_campaign( recipient_ids: Iterable[int], session: aiohttp.ClientSession, loop: asyncio.AbstractEventLoop = None ) -> None: """ Создаёт очереди и запускает воркеры для обработки. Дожидается завершения формирования получателей, после ждёт окончания отправки и сохранения отчётов. """ executor = ThreadPoolExecutor(max_workers=PRODUCERS_COUNT + 1) loop = loop or asyncio.get_event_loop() loop.set_default_executor(executor) task_queue = asyncio.Queue(maxsize=2 * SENDERS_COUNT, loop=loop) result_queue = asyncio.Queue(maxsize=2 * SENDERS_COUNT, loop=loop) producers = [ asyncio.ensure_future(mail_campaign_producer(recipient_ids, task_queue)) for _ in range(PRODUCERS_COUNT) ] consumers = [ asyncio.ensure_future(mail_campaign_sender(task_queue, result_queue, session)) for _ in range(SENDERS_COUNT) ] reporter = asyncio.ensure_future(mail_campaign_reporter(task_queue, result_queue)) # Дожидаемся, когда все письма будут подготовлены done, _ = await asyncio.wait(producers) # Когда завершатся все отправки, останавливаем воркеров await task_queue.join() while consumers: consumers.pop().cancel() # Когда завершится сохранение отчётов, также останавливаем соответствующий воркер await result_queue.join() reporter.cancel()
- Синхронного кода, который создаёт цикл и начинает рассылку.
async def close_session(future: asyncio.Future, session: aiohttp.ClientSession) -> None: """ Закрываем сессию, когда вся обработка завершена. Документация aiohttp рекомендует добавить задержку перед закрытием сессии. """ await asyncio.wait([future]) await asyncio.sleep(0.250) await session.close() def mail_campaign_send_chunk(recipient_ids: Iterable[int]) -> None: """ Точка входа для начала рассылки. Принимает идентификаторы получателей, создаёт asyncio цикл и запускает корутину отправки. """ loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) # Session connector = aiohttp.TCPConnector(limit_per_host=0, limit=0) session = aiohttp.ClientSession( connector=connector, auth=aiohttp.BasicAuth('api', API_KEY), loop=loop, read_timeout=60 ) send_future = asyncio.ensure_future(send_mail_campaign(recipient_ids, session, loop=loop)) cleanup_future = asyncio.ensure_future(close_session(send_future, session)) loop.run_until_complete(asyncio.wait([send_future, cleanup_future])) loop.close()
После реализации такого решения время отправки массовых рассылок сократилось до часа при таких же объёмах рассылок и 12 задействованных воркерах. То есть каждый воркер отправляет 20-25 писем в секунду, что в 50-80 раз производительнее исходного решения. Потребление памяти воркеров сохранилось на исходном уровне, загрузка процессора немного выросла, утилизация сети возросла многократно, что является ожидаемым эффектом. Также выросло количество соединений с базой данных, поскольку каждый из потоков воркеров-производителей и воркеров, сохраняющих отчёты, активно работают с базой. При этом освободившиеся воркеры могут рассылать небольшие рассылки в то время, как отправляется массовая кампания.
Несмотря на все преимущества, такая реализация имеет ряд сложностей, которые необходимо учитывать:
- Необходимо быть осторожными при обработке ошибок. Необработанное исключение может завершить выполнение воркера, из-за чего кампания «подвиснет».
- При завершении отправки необходимо не потерять отчёты по получателям, не заполнившие чанк до конца, и сохранить их в базу данных.
- Усложняется логика принудительной остановки возобновления кампаний, поскольку после остановки рассылающих воркеров, необходимо сопоставлять, каким получателям были отправлены письма, а каким – нет.
- Через какое-то время сотрудники поддержки Mailgun связались с нами и попросили снизить скорость отправки, потому что почтовые сервисы начинают временно отклонять письма, если частота их отправок превышает пороговое значение. Это легко сделать, уменьшив количество воркеров.
- Нельзя было бы использовать asyncio, если какой-то из этапов отправки писем выполнял бы требовательные к ресурсам процессора опрерации. Рендер шаблонов с использованием jinja2 оказался не очень ресурсоёмкой операцией и практически не оказывает влияния на скорость отправки.
- Использование asyncio для рассылок требует, чтобы обработчики очереди рассылок запускались отдельными процессами.
Надеюсь, наш опыт будет вам полезен! Если остались вопросы или появились идеи, пишите в комментариях!
ссылка на оригинал статьи https://habr.com/ru/company/ostrovok/blog/482114/
Добавить комментарий