Массовая асинхронная обработка запросов с последующей синхронной CPU-bound логикой

от автора

В данной статье мы рассмотрим задачу массовой асинхронной обработки запросов с последующей синхронной и ресурсоёмкой (CPU-bound) логикой. Главная сложность в том, что асинхронный код отлично справляется с большим количеством запросов к внешним сервисам, но CPU-bound вычисления в той же среде могут существенно снизить пропускную способность. Решение — вынести тяжёлую обработку в отдельный пул процессов.

Суть задачи

  1. Имеется список кортежей (url, json_data), которых может быть до 1 000 000:

request_samples = [     ("https://api.example.com/products/segment_a", {"items": ["id1", "id2"]}),     ("https://api.example.com/products/segment_b", {"items": ["id3", "id4"]}),     # ... до 1 000 000 подобных кортежей ] 
  1. Для каждого такого кортежа необходимо:

  • Отправить запрос на указанный url с параметрами json=json_data и получить base_resp.

  • Из base_resp извлечь данные для трёх сервисных запросов (к url_1, url_2, url_3).

  • Получить ответы resp1, resp2, resp3 от этих сервисов.

  • Передать resp1, resp2, resp3 в функцию business_logic(resp1, resp2, resp3), которая выполняет синхронную и CPU-bound обработку.

  • Результат business_logic добавляется к итоговому списку.

  1. Цель — выполнить всю обработку максимально эффективно, не блокируя асинхронный event loop при выполнении тяжелой CPU-bound логики.

Почему ProcessPoolExecutor?

  • CPU-bound обработка: Если операция действительно нагружает CPU, то использование ThreadPoolExecutor может быть неэффективным из-за GIL (Global Interpreter Lock) в CPython.

  • ProcessPoolExecutor запускает обработку в отдельных процессах, что позволяет выполнять CPU-bound задачи параллельно на разных ядрах, минуя ограничения GIL.

Архитектура решения

  • Используем asyncio и aiohttp для асинхронной загрузки данных.

  • После получения трех ответов (resp1, resp2, resp3) для каждого запроса используем ProcessPoolExecutor и run_in_executor, чтобы выполнить business_logic в отдельном процессе.

  • Собираем результаты в единый список.

import asyncio import aiohttp from concurrent.futures import ProcessPoolExecutor  def business_logic(resp1, resp2, resp3):     # Синхронная, CPU-bound обработка данных.     # Здесь может быть сложная логика:     # обработка списков, вычисления, агрегации и т.д.     aggregated = {         "item_count": len(resp1["items"]) + len(resp2["items"]) + len(resp3["items"]),         "details": resp1["details"] + resp2["details"] + resp3["details"]     }     return aggregated  async def fetch(session, url, json_data):     async with session.post(url, json=json_data) as response:         return await response.json()  async def process_single_request(session, url, json_payload, url_1, url_2, url_3, executor):     # 1. Запрос к основному API     base_resp = await fetch(session, url, json_payload)      # Предположим, извлекаем из base_resp список товаров     items_for_services = base_resp.get("data", {}).get("items", [])     service_payload = {"items": items_for_services}      # 2. Запросы к трем сервисным API     resp1 = await fetch(session, url_1, service_payload)     resp2 = await fetch(session, url_2, service_payload)     resp3 = await fetch(session, url_3, service_payload)      # 3. Вызов синхронной CPU-bound логики в отдельном процессе     loop = asyncio.get_event_loop()     aggregated_result = await loop.run_in_executor(         executor, business_logic, resp1, resp2, resp3     )      return aggregated_result  async def main(request_samples, url_1, url_2, url_3, max_parallel=1000):     results = []     conn = aiohttp.TCPConnector(limit=max_parallel)      # Инициализируем пул процессов     # Число процессов можно подобрать на основе числа CPU ядер     executor = ProcessPoolExecutor(max_workers=4)      async with aiohttp.ClientSession(connector=conn) as session:         tasks = [             asyncio.create_task(                 process_single_request(session, url, payload, url_1, url_2, url_3, executor)             )             for url, payload in request_samples         ]          for fut in asyncio.as_completed(tasks):             result = await fut             results.append(result)      return results  if __name__ == "__main__":     # Пример исходных данных     request_samples = [         ("https://api.example.com/products/segment_a", {"items": ["id1", "id2"]}),         ("https://api.example.com/products/segment_b", {"items": ["id3", "id4"]}),         # ... и далее до 1 000 000     ]      url_1 = "https://service-a.example.com/details"     url_2 = "https://service-b.example.com/prices"     url_3 = "https://service-c.example.com/inventory"      final_results = asyncio.run(main(request_samples, url_1, url_2, url_3))     # final_results теперь будет содержать список агрегированных результатов, по одному на каждую запись из request_samples  

Итоги и преимущества

  • Асинхронный ввод-вывод обеспечивает высокую пропускную способность при работе с внешними API.

  • ProcessPoolExecutor решает проблему CPU-bound логики, позволяя эффективно параллелить тяжелые вычисления.

  • Подобный конвейер можно масштабировать, изменяя количество процессов, размер порций запросов и добавляя дополнительные оптимизации (кеширование, rate limiting и т.д.). Таким образом, предложенный подход позволяет эффективно совмещать массовую асинхронную загрузку данных с синхронными, ресурсоёмкими вычислительными задачами.


ссылка на оригинал статьи https://habr.com/ru/articles/869142/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *