Параллельные вычисления, конкурентность и асинхронное программирование в Python: обзор для начинающих

от автора

Однопоточные приложения на Python ограничены в производительности: они выполняют задачи последовательно и не используют преимущества многоядерных процессоров. Кроме того, такие программы не справляются с обработкой множества операций одновременно, особенно если речь идет о задачах, связанных с вводом-выводом, например сетевыми запросами или чтением файлов.

Производительность можно значительно улучшить, внедрив в код параллельные вычисления, конкурентность или асинхронное программирование. Для этого Python предлагает такие инструменты, как multiprocessing, threading и asyncio.

Multiprocessing, threading и asyncio: в чем разница?

Для начала вспомним, что такое потоки и процессы.

Процессы — это отдельные программы, каждая со своей памятью, которые могут работать параллельно на разных ядрах процессора.

Потоки — это части одного процесса, которые одновременно выполняют задачи и используют для этого общую память.

  • Модуль threading запускает потоки для конкурентной работы внутри одного процесса. 

  • Библиотека asyncio управляет задачами асинхронно, без дополнительных потоков или процессов. 

  • Модуль multiprocessing создает процессы для параллельного выполнения на нескольких ядрах.

Разберем, как эти инструменты работают и где используются.

Многопоточность и конкурентное выполнение задач

Модуль threading позволяет запускать несколько потоков внутри одного процесса для конкурентного выполнения задач. 

Конкурентность — это способность программы управлять несколькими задачами так, чтобы они выполнялись одновременно с точки зрения пользователя, даже если физически они чередуются. Например, пока одна задача ждет ответа от сервера, другая может обработать данные.

Потоки в threading работают схожим образом — они переключаются между задачами и разделяют общую память и ресурсы процесса.

Работа с threading

Основной инструмент модуля — класс Thread, который позволяет запускать функции в отдельных потоках. 

Базовый синтаксис:

import threading    def task(name):      print(f"Задача {name} выполняется в потоке {threading.current_thread().name}")    t = threading.Thread(target=task, args=("A",))  t.start()  t.join()

Здесь метод start() запускает поток, а join() заставляет основной код ждать его завершения.

GIL и его влияние на многопоточность

В стандартной реализации Python (CPython) есть глобальная блокировка интерпретатора (GIL), которая ограничивает многопоточность. GIL позволяет только одному потоку выполняться в интерпретаторе в любой момент времени, даже на многоядерных процессорах. 

Это означает, что потоки в threading не могут параллельно использовать несколько ядер для вычислений. Поэтому они неэффективны для задач CPU-bound, где скорость зависит от процессора (например, сложных математических расчетов или обработки больших массивов данных).

А вот в задачах I/O-bound, где программа много времени ждет внешних операций, таких как сетевые запросы или чтение файлов, GIL не мешает. Потоки переключаются между собой, пока одна операция ждет завершения.

Синхронизация потоков: Lock, Semaphore, Event

Поскольку потоки делят общую память, могут возникать конфликты, например если два потока одновременно изменяют одну переменную. Для их предотвращения используются инструменты синхронизации:

  • Lock блокирует доступ к ресурсу, разрешая только одному потоку работать с ним в данный момент. Это предотвращает «гонку данных», когда результат зависит от порядка выполнения потоков.

Пример: 

import threading  import time    shared_resource = 0  lock = threading.Lock()    def increment():      global shared_resource      thread_name = threading.current_thread().name      print(f"Поток {thread_name} пытается увеличить значение")      with lock:          current_value = shared_resource  # Имитация небольшой задержки          time.sleep(0.1)          shared_resource = current_value + 1          print(f"Поток {thread_name} увеличил значение до {shared_resource}")    threads = [threading.Thread(target=increment) for  in range(10)]  for t in threads:      t.start()  for t in threads:      t.join()  print(f"Итоговое значение: {sharedresource}")

Здесь Lock гарантирует, что shared_resource увеличивается ровно 10 раз, а не теряет значения из-за одновременных изменений. 

Вывод покажет, как разные потоки по очереди получают доступ к ресурсу, ожидая, пока предыдущий освободит блокировку. Без Lock результат мог бы быть меньше 10 из-за наложения операций. 

  • Semaphore ограничивает количество потоков, которые могут одновременно обращаться к ресурсу. Это полезно, когда доступ нужно дать не одному, а нескольким потокам, но не всем сразу.

Пример: 

import threading  import time    # Ставим ограничение на максимум 2 одновременных потока  semaphore = threading.Semaphore(2)    def task(name):      with semaphore:          print(f"Поток {name} начал работу")          time.sleep(1)          print(f"Поток {name} завершил работу")    threads = [threading.Thread(target=task, args=(i,)) for i in range(5)]  for t in threads:      t.start()  for t in threads:      t.join()

В этом случае только два потока работают одновременно, остальные ждут, пока освободится «место». Вывод покажет, что задачи выполняются партиями по две. 

  • Event позволяет одному потоку ждать сигнала от другого, чтобы продолжить работу. Это удобно для координации действий между потоками.

Пример: 

import threading  import time    event = threading.Event()    def waiter():      print("Ожидаю сигнала...")      event.wait()      print("Сигнал получен, продолжаю работу")    def signaler():      time.sleep(2)      print("Отправляю сигнал")      event.set()    t1 = threading.Thread(target=waiter)  t2 = threading.Thread(target=signaler)  t1.start()  t2.start()  t1.join()  t2.join()

Здесь поток waiter ждет, пока signaler не вызовет event.set(), чтобы продолжить выполнение.

Примеры многопоточного программирования

Модуль threading особенно полезен в ситуациях, где задачи тратят много времени на ожидание внешних ресурсов (задачи типа I/O-bound). То есть скорость выполнения задач ограничена не вычислениями процессора, а операциями ввода-вывода (input/output), которые часто связаны с длительными задержками. 

Потоки позволяют переключаться между ними: пока одна задача ждет завершения операции, другая может начать работу. 

Рассмотрим два примера: загрузку веб-страниц и фоновую обработку задач.

Конкурентная загрузка URL

Загрузка данных из интернета — классический пример задачи ввода-вывода, где последовательное выполнение может быть неэффективным.

import threading  import urllib.request    urls = ["https://python.org", "https://example.com"]  def fetch_url(url):      response = urllib.request.urlopen(url)      print(f"Загружен {url}, длина: {len(response.read())}")    threads = [threading.Thread(target=fetch_url, args=(url,)) for url in urls]  for t in threads:      t.start()  for t in threads:      t.join()

Этот код создает отдельный поток для каждого URL, запускает их с помощью start() и ждет завершения через join(). В результате страницы загружаются конкурентно, что быстрее, чем последовательное выполнение, особенно если сетевые задержки значительны.

Обработка задач в фоновом режиме 

Потоки можно использовать для выполнения задач, таких как логирование или мониторинг, пока основная программа продолжает работу. 

Пример с фоновым логированием

import threading  import time    def background_logger(logs):      while True:          if logs:              print(f"Лог: {logs.pop(0)}")          time.sleep(1)    def main_task(logs):      for i in range(5):          logs.append(f"Событие {i} в основном потоке")          print(f"Основной поток работает: шаг {i}")          time.sleep(0.5)    logs = []  logger_thread = threading.Thread(target=background_logger, args=(logs,), daemon=True)  logger_thread.start()  main_task(logs)

В этом примере поток background_logger работает в фоновом режиме, периодически выводя сообщения из списка logs, пока основной поток выполняет свою задачу. Атрибут daemon=True означает, что фоновый поток завершится вместе с основной программой.

Асинхронное программирование с asyncio

Асинхронное программирование позволяет выполнять задачи без блокировки основного потока, что особенно полезно для I/O-bound-задач. В отличие от threading, где для конкурентности используются потоки, asyncio управляет задачами в одном потоке через цикл событий. 

Это снижает накладные расходы на создание потоков и подходит для приложений с большим количеством операций ввода-вывода, например веб-скрейперов или серверов.

В чем отличие синхронного кода от асинхронного?

Асинхронный код отличается от синхронного тем, что он не блокирует выполнение программы во время ожидания операций, а переключается между задачами в одном потоке через цикл событий.

  • Синхронный код: последовательное выполнение с блокировкой, сравнение с очередью. 

  • Асинхронный код: запуск задач без ожидания, переключение через цикл событий.

Основы asyncio: async и await

В основе библиотеки asyncio сопрограммы (корутины) с ключевыми словами async и await, введенные в Python 3.5. Они позволяют писать асинхронный код, похожий на синхронный по читаемости.

  • async def определяет корутину, которая может приостанавливаться, не блокируя выполнение программы.

  • await указывает, где корутина должна ждать завершения другой асинхронной операции, освобождая управление для других задач.

Пример:

import asyncio    async def say_hello():      print("Начинаем...")      # Ждем 1 секунду, не блокируя      await asyncio.sleep(1)      print("Привет!")    async def main():      # Запускаем корутину      await say_hello()    # Запускаем программу  asyncio.run(main())

Здесь asyncio.sleep(1) имитирует асинхронную операцию (например, запрос к серверу), а await позволяет программе продолжать работу, пока ожидание не завершится. Функция asyncio.run() — стандартный способ запускать асинхронный код.

Event Loop и управление задачами

Асинхронность в asyncio работает благодаря циклу событий (Event Loop) — механизму, который управляет выполнением корутин, переключаясь между ними, когда они ждут операций. 

Пока одна корутина ждет ответа от сети, другая может обработать свои данные.

Для планирования нескольких задач используется asyncio.create_task():

async def task(name):      print(f"Задача {name} началась")      await asyncio.sleep(1)      print(f"Задача {name} завершилась")    async def main():      task1 = asyncio.create_task(task("A"))      task2 = asyncio.create_task(task("B"))      await task1  # Ждем первую задачу      await task2  # Ждем вторую    asyncio.run(main())

В этом примере две задачи запускаются конкурентно, и цикл событий переключается между ними. Метод create_task() превращает корутину в задачу, которую asyncio выполняет независимо.

Асинхронные HTTP-запросы 

Asyncio работает с другими библиотеками, например aiohttp для быстрой, конкурентной загрузки страниц. 

В отличие от синхронной загрузки страниц с помощью urllib.request, где каждая страница загружается последовательно, aiohttp отправляет запросы к нескольким URL одновременно, не блокируя основной поток.

Пример:

import asyncio  import aiohttp    async def fetch_url(url):      async with aiohttp.ClientSession() as session:          async with session.get(url) as response:              content = await response.text()              print(f"Загружен {url}, длина: {len(content)}")    async def main():      urls = ["https://python.org", "https://example.com"]      tasks = [asyncio.create_task(fetch_url(url)) for url in urls]      await asyncio.wait(tasks)    asyncio.run(main())

В итоге если одна страница отвечает медленно, то другая ее не ждет. Это значительно сокращает общее время по сравнению с синхронным кодом, где задержки суммируются.

Работа с сокетами

Asyncio позволяет создавать асинхронные серверы и клиенты для обработки соединений. 

Рассмотрим пример, где сервер принимает сообщения от клиентов и отправляет их обратно, преобразовав текст в заглавные буквы: если клиент отправляет «привет», сервер отвечает «ПРИВЕТ».

import asyncio    async def handle_client(reader, writer):      data = await reader.read(100)      writer.write(data.upper())      await writer.drain()      writer.close()    async def main():      server = await asyncio.start_server(handle_client, "127.0.0.1", 8888)      async with server:          await server.serve_forever()    asyncio.run(main())

Этот сервер принимает сообщения и отвечает заглавными буквами, обрабатывая множество клиентов одновременно.

Многопроцессность и параллельные вычисления

Параллельные вычисления — это подход, при котором несколько вычислительных задач выполняются одновременно, распределяясь по доступным ресурсам, таким как ядра процессора. Этот метод позволяет ускорить выполнение программ. 

Одна из форм параллельных вычислений — многопроцессность, при которой задачи распределяются между несколькими независимыми процессами. В Python она реализована с помощью модуля multiprocessing.

Работа с задачами CPU-bound

Многопроцессность реализуется с собственным интерпретатором Python в обход GIL. Это позволяет параллельно задействовать все ядра процессора и обеспечивать настоящий параллелизм, а не только конкурентность, как в threading или asyncio. 

Поэтому модуль multiprocessing — лучший выбор для задач CPU-bound, в отличие от I/O-bound сценариев, где лучше работают потоки или асинхронность.

Как работает модуль multiprocessing

Модуль multiprocessing создает отдельные процессы, каждый из которых работает со своим собственным интерпретатором Python. 

Пример:

from multiprocessing import Process    def square(number):      print(f"Квадрат числа {number}: {number * number}")    if name == "__main__":      processes = []      for i in range(5):          p = Process(target=square, args=(i,))          processes.append(p)          p.start()      for p in processes:          p.join()

В этом примере создается пять процессов, каждый из которых вычисляет квадрат числа. Метод start() запускает процесс, а join() ожидает его завершения.

В multiprocessing есть инструменты:

  • Process класс для создания и управления отдельным процессом. 

Нужно указать целевую функцию (target) и аргументы (args), после чего процесс запускается с помощью метода start().

Пример:

from multiprocessing import Process    def square(number):      result = number ** 2      print(f"Квадрат числа {number}: {result}")    if name == "__main__":      processes = []      for i in range(5):          p = Process(target=square, args=(i,))          processes.append(p)          p.start()      for p in processes:          p.join()

Здесь создаем пять процессов, каждый из которых вычисляет квадрат числа. Проверка if name == «__main__» обязательна, чтобы избежать рекурсивного импорта при запуске процессов. 

Это особенно критично на Windows, где процессы создаются через spawn и импортируют модуль заново, но также важно для переносимости кода на другие платформы, такие как Linux или macOS, где поведение зависит от метода запуска.

  • Pool класс для создания пула процессов, который автоматически распределяет задачи между несколькими процессами. Метод map() позволяет применить функцию к списку данных и разделяет работу между процессами в пуле.

Пример:

from multiprocessing import Pool    def cube(number):      return number ** 3    if name == "__main__":  # Определяем пул из 3 процессов      with Pool(3) as pool:            results = pool.map(cube, [1, 2, 3, 4, 5])      print(results)

В этом случае Pool распределяет вычисление кубов чисел между тремя процессами. Каждый процесс берет часть списка [1, 2, 3, 4, 5], выполняет задачу и возвращает результат, который собирается в итоговый список.

Сравнение подходов

Давайте разберем, чем отличаются друг от друга все три подхода: конкурентность, асинхронность и параллелизм.

multiprocessing

threading

asyncio

Использует процессы, обходит GIL, подходит для CPU-bound-задач (вычисления, обработка данных). 

Требует больше ресурсов, чем потоки, из-за изоляции процессов.

Работает с потоками в одном процессе, ограничен GIL, эффективен для I/O-bound-задач (сеть, файлы), где ожидание преобладает над вычислениями. 

Легче и быстрее в создании, чем процессы.

Асинхронность в одном потоке через цикл событий, идеален для I/O-bound-задач с большим количеством операций (например, веб-запросы). 

Не подходит для CPU-bound-задач, но минимизирует накладные расходы.

Оптимизация производительности кода

Чтобы оптимизировать код, нужно сперва найти узкие места производительности с помощью профилирования. 

Профилирование — это анализ кода для выявления узких мест, то есть участков, где программа тратит больше всего времени или ресурсов.

В Python есть несколько инструментов для этого:

  • cProfile встроенный модуль, который измеряет время выполнения каждой функции и количество их вызовов.

Пример:

import cProfile  def slow_function():      return sum(i * i for i in range(1000000))  cProfile.run("slow_function()")

Вывод покажет таблицу с колонками: ncalls (число вызовов), tottime (общее время в функции), cumtime (время с учетом подфункций). Например, если цикл занимает 0,5 секунды, вы увидите, что именно он — узкое место.

  • time простой инструмент для замера общего времени выполнения, подойдет для сравнения подходов (например, последовательного и параллельного).

Пример: 

import time  from multiprocessing import Pool    def square(n):      return n * n    start = time.time()  with Pool(4) as pool:      pool.map(square, range(1000))  print(f"Время: {time.time() - start:.2f} сек")

Это позволяет быстро проверить, ускоряет ли multiprocessing код (например, 0,1 сек вместо 0,4 сек в однопоточном варианте).

  • Другие инструменты . Модуль line_profiler показывает время выполнения каждой строки кода (требует установки через pip install line_profiler), а Py-Spy визуальный профайлер, который строит графики работы программы в реальном времени.

После профилирования используйте результаты для оптимизации:

  1. Определите тип задачи. Если узкое место — вычисления (CPU-bound), переходите на multiprocessing. Для ожидания ввода-вывода (I/O-bound) — asyncio или threading.

  2. Параллелизуйте тяжелые операции. Если cProfile показывает, что функция занимает много времени, разбейте данные и используйте Pool для распределения по ядрам.

  3. Минимизируйте накладные расходы. Не создавайте процессы или потоки для мелких задач — запуск Process дороже, чем выгода от параллелизма на коротких операциях.

  4. Тестируйте изменения. Замеряйте время с time до и после оптимизации, чтобы убедиться, что многопроцессность или асинхронность действительно ускоряют код.

Обучиться с нуля на программиста и растить навыки до техлида или архитектора ПО можно на магистратуре МИФИ и Skillfactory по направлению «Программная инженерия». Оттачивайте навыки на реальных задачах от бизнеса и набирайтесь опыта.


ссылка на оригинал статьи https://habr.com/ru/articles/896160/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *