В данной статье мы рассмотрим задачу массовой асинхронной обработки запросов с последующей синхронной и ресурсоёмкой (CPU-bound) логикой. Главная сложность в том, что асинхронный код отлично справляется с большим количеством запросов к внешним сервисам, но CPU-bound вычисления в той же среде могут существенно снизить пропускную способность. Решение — вынести тяжёлую обработку в отдельный пул процессов.
Суть задачи
-
Имеется список кортежей (url, json_data), которых может быть до 1 000 000:
request_samples = [ ("https://api.example.com/products/segment_a", {"items": ["id1", "id2"]}), ("https://api.example.com/products/segment_b", {"items": ["id3", "id4"]}), # ... до 1 000 000 подобных кортежей ]
-
Для каждого такого кортежа необходимо:
-
Отправить запрос на указанный url с параметрами json=json_data и получить base_resp.
-
Из base_resp извлечь данные для трёх сервисных запросов (к url_1, url_2, url_3).
-
Получить ответы resp1, resp2, resp3 от этих сервисов.
-
Передать resp1, resp2, resp3 в функцию business_logic(resp1, resp2, resp3), которая выполняет синхронную и CPU-bound обработку.
-
Результат business_logic добавляется к итоговому списку.
-
Цель — выполнить всю обработку максимально эффективно, не блокируя асинхронный event loop при выполнении тяжелой CPU-bound логики.
Почему ProcessPoolExecutor?
-
CPU-bound обработка: Если операция действительно нагружает CPU, то использование ThreadPoolExecutor может быть неэффективным из-за GIL (Global Interpreter Lock) в CPython.
-
ProcessPoolExecutor запускает обработку в отдельных процессах, что позволяет выполнять CPU-bound задачи параллельно на разных ядрах, минуя ограничения GIL.
Архитектура решения
-
Используем asyncio и aiohttp для асинхронной загрузки данных.
-
После получения трех ответов (resp1, resp2, resp3) для каждого запроса используем ProcessPoolExecutor и run_in_executor, чтобы выполнить business_logic в отдельном процессе.
-
Собираем результаты в единый список.
import asyncio import aiohttp from concurrent.futures import ProcessPoolExecutor def business_logic(resp1, resp2, resp3): # Синхронная, CPU-bound обработка данных. # Здесь может быть сложная логика: # обработка списков, вычисления, агрегации и т.д. aggregated = { "item_count": len(resp1["items"]) + len(resp2["items"]) + len(resp3["items"]), "details": resp1["details"] + resp2["details"] + resp3["details"] } return aggregated async def fetch(session, url, json_data): async with session.post(url, json=json_data) as response: return await response.json() async def process_single_request(session, url, json_payload, url_1, url_2, url_3, executor): # 1. Запрос к основному API base_resp = await fetch(session, url, json_payload) # Предположим, извлекаем из base_resp список товаров items_for_services = base_resp.get("data", {}).get("items", []) service_payload = {"items": items_for_services} # 2. Запросы к трем сервисным API resp1 = await fetch(session, url_1, service_payload) resp2 = await fetch(session, url_2, service_payload) resp3 = await fetch(session, url_3, service_payload) # 3. Вызов синхронной CPU-bound логики в отдельном процессе loop = asyncio.get_event_loop() aggregated_result = await loop.run_in_executor( executor, business_logic, resp1, resp2, resp3 ) return aggregated_result async def main(request_samples, url_1, url_2, url_3, max_parallel=1000): results = [] conn = aiohttp.TCPConnector(limit=max_parallel) # Инициализируем пул процессов # Число процессов можно подобрать на основе числа CPU ядер executor = ProcessPoolExecutor(max_workers=4) async with aiohttp.ClientSession(connector=conn) as session: tasks = [ asyncio.create_task( process_single_request(session, url, payload, url_1, url_2, url_3, executor) ) for url, payload in request_samples ] for fut in asyncio.as_completed(tasks): result = await fut results.append(result) return results if __name__ == "__main__": # Пример исходных данных request_samples = [ ("https://api.example.com/products/segment_a", {"items": ["id1", "id2"]}), ("https://api.example.com/products/segment_b", {"items": ["id3", "id4"]}), # ... и далее до 1 000 000 ] url_1 = "https://service-a.example.com/details" url_2 = "https://service-b.example.com/prices" url_3 = "https://service-c.example.com/inventory" final_results = asyncio.run(main(request_samples, url_1, url_2, url_3)) # final_results теперь будет содержать список агрегированных результатов, по одному на каждую запись из request_samples
Итоги и преимущества
-
Асинхронный ввод-вывод обеспечивает высокую пропускную способность при работе с внешними API.
-
ProcessPoolExecutor решает проблему CPU-bound логики, позволяя эффективно параллелить тяжелые вычисления.
-
Подобный конвейер можно масштабировать, изменяя количество процессов, размер порций запросов и добавляя дополнительные оптимизации (кеширование, rate limiting и т.д.). Таким образом, предложенный подход позволяет эффективно совмещать массовую асинхронную загрузку данных с синхронными, ресурсоёмкими вычислительными задачами.
ссылка на оригинал статьи https://habr.com/ru/articles/869142/
Добавить комментарий