Еще раз о многопоточности в одну строку

от автора

Давеча понадобилось мне в моем проекте на Flask ускорить ответ сервера. Из-за того, что во view последовательно вызывается запрос к трём удаленным веб-сервисам, время загрузки страницы с данными не из кеша доходило до 10 сек. Да, возможно, Flask не тот фреймворк, который стоило использовать, но что имеем, то имеем.
Итак, приступим. Поскольку реальный код я публиковать не могу, рассмотрю на академических примерах.

Задача 1 Имеются три функции a, b ,c, которые необходимо вызвать в отдельных потоках, дождаться результата их выполнения и выдать ответ.
Для решения задачи 1 я воспользовался этим переводом, ибо был очарован простотой использования библиотеки.

import multiprocessing.dummy as multiprocessing import time  def a():     time.sleep(2)     return 'a' def b():     time.sleep(2)     return 'b' def c():     time.sleep(1)     return 'c'  p = multiprocessing.Pool()  results = p.map(lambda f: f(),[a,b,c]) print(results) p.close() p.join()  

Результат выполнения кода:

['a', 'b', 'c'] 

Замечательно, но есть существенный минус. Время выполнения кода не ограничено, он будет ждать результата выполнения всех процедур. Изменяем формулировку задачи.

Задача 2 Имеются три функции a, b ,c, которые необходимо вызвать в отдельных потоках, и спустя интервал времени проверить, завершились они или нет, выдать результат.

Для решения используем ту же библиотеку, но уже функцию map_async. Ее отличие в том, что она возвращает объект AsyncResult.

import multiprocessing.dummy as multiprocessing import time  def a():  time.sleep(2)  return 'a'  def b():  time.sleep(2)  return 'b'  def c():  time.sleep(1)  return 'c'  p = multiprocessing.Pool()  result = p.map_async(lambda f: f(),[a,b,c])  TIMEOUT  =3 print(results.get(TIMEOUT))  p.close() p.join()  

Результат выполнения при TIMEOUT>=3 такой же, как и в предыдущем случае, но если хоть одна из процедур не успевает завершится, выдается исключение TimeoutError. Однако и этот результат меня устроил не вполне. Дело в том, что в моем случае мне существенно было, чтобы успевала отработать одна функция, остальные могли и отсутствовать при выдаче.

Задача 3 Имеются три функции a, b ,c, которые необходимо вызвать в отдельных потоках, дождаться результата функции a.

import multiprocessing.dummy as multiprocessing import time   def a():  time.sleep(2)  print(1)  return 'a'   def b():  time.sleep(3)  print(2)  return 'b'  def c():  time.sleep(1)  print(3)  return 'c'  p = multiprocessing.Pool()  results=[] for r  in p.imap(lambda f: f(),[a,b,c]):  results.append(r)  break  print(results) p.close() p.join()   

Результат выполнения:

3 1 ['a'] 2  

Как видно, хотя отработали 2 функции из 3, результат мы получили только для приоритетной. Чтобы получить результат второй, следует использовать imap_unordered:

results=[] for r  in p.imap_unordered(lambda f: f(),[a,b,c]):  results.append(r) if  r =='a': 	break 

Результат:

3 1 ['c', 'a'] 2 

Что, если нам в основном потоке нужен результат только одного потока, наиболее быстрого? Достаточно убрать вызов p.join() из предыдущего примера и выйти из цикла по первому результату.

Теперь еще такой момент. При попытке использовать модуль multiprocessing, который работает с процессами, вместо multiprocessing.dummy, работающего с тредами будет выдана ошибка сериализации cPickle.PicklingError, поскольку при межпроцессном взаимодействии не удается сериализовать функцию. Для того, чтобы код работал, нужно ввести функцию-псевдоним, код будет не настолько красив, но:

import multiprocessing import time  def a():     time.sleep(2)     return 'a' def b():     time.sleep(2)     return 'b' def c():     time.sleep(1)     return 'c'  def func(param):  if param == 'a':     return a()  elif param == 'b':     return b()  elif param == 'c':     return c()  p = multiprocessing.Pool()  results = p.map(func,['a','b','c']) print(results) p.close() p.join() 

ссылка на оригинал статьи http://habrahabr.ru/post/260431/