Асинхронность: синхронизируем с помощью Mojo::IOLoop::Delay
Mojo::IOLoop::Delay предоставляет механизм, обеспечивающий для асинхронно выполняющихся callback-ов:
- описание последовательно выполняющихся операций без «лапши» callback-ов
- передачу результатов из callback-а(ов) текущего шага на следующий
- общие данные для callback-ов, объединённых в одну задачу
- синхронизацию групп callback-ов
- перехват и обработку исключений в callback-ах
Используемые термины:
- (асинхронная) операция — обычно это вызов асинхронной функции вроде таймера или выкачивания url, которой необходимо передать callback
- шаг — callback, который анализирует данные полученные с предыдущего шага (если это не первый шаг), и запускает одну или несколько новых операций, либо возвращает финальный результат (если это последний шаг)
- задача — список шагов, которые должны выполняться последовательно (т.е. следующий шаг вызывается только после того, как все операции запущенные на предыдущем шаге завершаются)
Альтернатива Promises
Это альтернативный подход к проблеме, обычно решаемой с помощью Promise/Deferred или Future. Вот приблизительное сравнение со спецификацией Promises/A+:
- Вместо цепочки
->then(\&cb1)->then(\&cb2)->…
используется один вызов->steps(\&cb1, \&cb2, …)
. - Вместо передачи обработчика ошибки вторым параметром в
->then()
он устанавливается через->catch()
. Следствие: на все шаги этой задачи может быть только один обработчик ошибок. - Результат возвращается через
->pass()
, но в отличие от->resolve()
в большинстве случаев он вызывается неявно — асинхронной операции в качестве callback передаётся результат вызова генератора анонимных функций->begin
, и возвращённая им функция автоматически делает->pass()
, передавая срез своих параметров (т.е. результата работы асинхронной операции) на следующий шаг. Следствие: не нужно писать для каждой асинхронной функции callback, который будет возвращённый ею результат преобразовывать в->resolve()
и->reject()
. - Ошибки возвращаются только через исключения, аналога
->reject()
нет. - Есть дополнительный шаг выполняемый в самом конце
->on(finish=>\&cb)
, на который также можно перейти из обработчика ошибок. - Есть поддержка групп асинхронных операций: если на текущем шаге запустить несколько операций, то следующий шаг будет вызван когда все они завершатся.
- Есть хранилище пользовательский данных, доступное всем шагам текущей задачи.
По этим отличиям виден типичный для Mojo подход: всё что можно упрощено и предоставлены удобные «ленивчики» для типичных задач.
Что осталось за кадром
Я не буду описывать работу ->wait
, с ним всё просто и понятно из официальной документации.
Кроме того, есть синонимы/альтернативы:
Mojo::IOLoop->delay(@params) # это полный аналог более длинного: Mojo::IOLoop::Delay->new->steps(@params)
$delay->catch(\&cb) # это более удобный (т.к. возвращает $delay, а не \&cb, # что позволяет продолжить цепочку вызовов) аналог: $delay->on(error=>\&cb)
$delay→begin
Это ключевая функция, без неё использовать Mojo::IOLoop::Delay не получится. Каждый вызов ->begin
увеличивает счётчик запущенных (обычно асинхронных) операций и возвращает ссылку на новую анонимную функцию. Эту возвращённую функцию необходимо однократно вызвать по завершению операции — она уменьшит счётчик запущенных операций и позволит передать результаты операции на следующий шаг (который будет запущен когда счётчик дойдёт до нуля).
Есть два способа использования ->begin
: вручную и автоматически.
В первом варианте функция возвращённая ->begin
запоминается во временной переменной и по завершению операции вызывается вручную:
my $delay = Mojo::IOLoop->delay; for my $i (1 .. 10) { my $end = $delay->begin; Mojo::IOLoop->timer($i => sub { say 10 - $i; $end->(); }); }
Во втором варианте функция возвращённая ->begin
используется в качестве callback для операции:
my $delay = Mojo::IOLoop->delay; for my $i (1 .. 10) { Mojo::IOLoop->timer($i => $delay->begin); }
В обоих вариантах если определить для $delay
следующий (в данном случае он же первый и единственный) шаг, то он будет вызван после завершения всех 10-ти операций:
$delay->step(sub{ say "all timers done" });
В данном примере есть проблема: во втором варианте не выполняется say 10 - $i
т.к. таймер не передаёт никаких параметров своему callback, и мы не можем узнать значение $i
в callback если только не заклозурим его как в первом варианте. Но даже если бы таймер передавал $i
параметром в callback вам бы это всё-равно не сильно помогло, т.к. шанс выполнить все десять say 10 - $i
мы бы получили только на следующем шаге, а он запустится только после завершения всех таймеров — т.е. пропадёт эффект обратного отсчёта, когда say
выполнялся раз в секунду.
В таких, редких, ситуациях необходимо использовать первый «ручной» вариант работы с ->begin
. Но во всех остальных намного лучше использовать второй вариант: это избавит от временной переменной, «лапши» callback-ов, и даст возможность использовать (точнее, перехватывать) исключения в callback-ах (исключение в обычном callback-е — не «шаге» — попадёт не в $delay->catch
а в обработчик исключений event loop и, по умолчанию, будет проигнорировано).
Функции ->begin
можно передать параметры, и на первый взгляд (в официальную документацию) они могут выглядеть не очень понятно. Суть в том, что когда функция возвращаемая ->begin
используется не в ручном варианте (когда вы сами её вызываете и контролируете с какими параметрами она будет вызвана), а в качестве непосредственного callback для операции, то она будет вызвана с теми параметрами, с которыми её вызовет эта операция. И все эти параметры вы получите как результат этой операции в параметрах следующего шага.
Например, $ua->get($url,\&cb)
передаёт в callback два параметра: ($ua, $tx)
, и если на одном шаге запустить выкачку 3-х url, то следующий шаг получит 6 параметров (каждый шаг получает первым обязательным параметром объект $delay, а зачем в этом примере используется ->begin(0)
я скоро объясню):
Mojo::IOLoop->delay( sub { my ($delay) = @_; $ua->get($url1, $delay->begin(0)); $ua->get($url2, $delay->begin(0)); $ua->get($url3, $delay->begin(0)); }, sub { my ($delay, $ua1,$tx1, $ua2,$tx2, $ua3,$tx3) = @_; }, );
При этом все три $ua
полученные вторым шагом будут одинаковыми. Поскольку это типичная ситуация, ->begin
даёт вам возможность контролировать, какие именно из переданных операцией параметров он должен передать на следующий шаг. Для этого он принимает два параметра: индекс первого параметра и их количество — чтобы передать на следующий шаг срез. По умолчанию ->begin
работает как ->begin(1)
— т.е. передаёт на следующий шаг все параметры переданные операцией кроме первого:
Mojo::IOLoop->delay( sub { my ($delay) = @_; $ua->get($url1, $delay->begin); $ua->get($url2, $delay->begin); $ua->get($url3, $delay->begin); }, sub { my ($delay, $tx1, $tx2, $tx3) = @_; }, );
$delay→data
В принципе с ->data
всё банально: хеш, доступный всем шагам — альтернатива передаче данных с одного шага на другой через параметры.
Mojo::IOLoop->delay( sub { my ($delay) = @_; $delay->data->{key} = 'value'; ... }, sub { my ($delay) = @_; say $delay->data->{key}; }, );
Альтернативой является использование клозур, что выглядит более лениво, привычно и читабельно:
sub do_task { my $key; Mojo::IOLoop->delay( sub { $key = 'value'; ... }, sub { say $key; }, ); }
Но здесь вас поджидает неприятный сюрприз. Клозуры живут пока кто-то на них ссылается. А по мере выполнения шагов Mojo удаляет их из памяти. Таким образом, когда будет выполнен последний шаг, ссылавшийся на заклозуренную переменную — она тоже будет удалена. Что приводит к неприятному эффекту, если эта переменная была, например, объектом Mojo::UserAgent:
sub do_task { my $ua = Mojo::UserAgent->new->max_redirects(5); Mojo::IOLoop->delay( sub { my ($delay) = @_; $ua->get($url1, $delay->begin); $ua->get($url2, $delay->begin); $ua->get($url3, $delay->begin); }, sub { my ($delay, $tx1, $tx2, $tx3) = @_; # все $tx будут с ошибкой "соединение разорвано" }, ); }
Как только первый шаг запустит неблокирующие операции выкачки url, завершится, и будет удалён из памяти — вместе с ним будет удалена и переменная $ua
, т.к. больше нет шагов, которые на неё ссылаются. А как только будет удалена $ua
все открытые соединения, относящиеся к ней, будут разорваны и их callback-и будут вызваны с ошибкой в параметре $tx
.
Один из вариантов решения этой проблемы — использовать ->data
для гарантирования времени жизни клозур не меньше, чем время выполнения всей задачи:
sub do_task { my $ua = Mojo::UserAgent->new->max_redirects(5); Mojo::IOLoop->delay->data(ua=>$ua)->steps( sub { my ($delay) = @_; $ua->get($url1, $delay->begin); $ua->get($url2, $delay->begin); $ua->get($url3, $delay->begin); }, sub { my ($delay, $tx1, $tx2, $tx3) = @_; # все $tx будут с результатами }, ); }
finish
Устанавливать обработчик события «finish» не обязательно, но во многих случаях очень удобно последний шаг указать не после остальных шагов, а обработчиком события «finish». Это вам даст следующие возможности:
- Если используется обработчик исключений
->catch
, и бывают не фатальные ошибки, после которых всё-таки имеет смысл штатно завершить текущую задачу выполнив последний шаг — обработчик исключений сможет передать управление обработчику «finish» через->emit("finish",@results)
, но не сможет обычному шагу. - Если финальный результат получен на промежуточном шаге, то чтобы передать его на последний шаг нужно реализовать ручной механизм «прокидывания» готового результата через все шаги между ними — но если вместо последнего шага используется обработчик «finish», то можно сразу вызвать его через
->remaining([])->pass(@result)
.- Так же нужно учитывать, что если этот шаг успел запустить какие-то операции до передачи результатов в «finish», то обработчик «finish» будет запущен только после того, как эти операции завершатся, причём он получит параметрами не только вышеупомянутый
@result
, но и всё что вернут операции.
- Так же нужно учитывать, что если этот шаг успел запустить какие-то операции до передачи результатов в «finish», то обработчик «finish» будет запущен только после того, как эти операции завершатся, причём он получит параметрами не только вышеупомянутый
ВНИМАНИЕ! Делать ->emit("finish")
можно только внутри обработчика исключений, а в обычном шаге нельзя. При этом в обычном шаге это же делается через ->remaining([])->pass(@result)
, но в обработчике исключений это не сработает.
$delay→pass
Очень часто шаг запускает операции условно — внутри if
или в цикле, у которого может быть 0 итераций. В этом случае, как правило, необходимо чтобы этот шаг (обычно в самом начале или конце) вызвал:
$delay->pass;
Эта команда просимулирует запуск одной операции, которая тут же завершилась и вернула пустой список в качестве результата. Поскольку она вернула пустой список, то этот её «запуск» никак не скажется на параметрах, которые получит следующий шаг.
Дело в том, что если шаг не запустит ни одной операции вообще, то он будет считаться последним шагом (что логично — следующему шагу уже нечего «ожидать» так что в нём пропадает смысл). Иногда такой способ завершить выполнение задачи подходит, но если вы установили обработчик «finish», то он будет вызван после этого шага, причём получит параметрами параметры этого шага — что, как правило, не то, чего вы хотели.
Пример сложного парсера
Давайте рассмотрим пример, в котором используется почти всё вышеописанное. Предположим, что нам нужно скачать данные с сайта. Сначала нужно залогиниться ($url_login
), потом перейти на страницу со списком нужных записей ($url_list
), для некоторых записей может быть доступна ссылка на страницу с деталями, а на странице с деталями могут быть ссылки на несколько файлов «приаттаченных» к этой записи, которые необходимо скачать.
sub parse_site { my ($user, $pass) = @_; # сюда будем накапливать данные в процессе выкачки: # @records = ( # { # key1 => "value1", # … # attaches => [ "content of file1", … ], # }, # … # ); my @records; # каждой запущенной задаче нужен свой $ua, т.к. можно запустить # несколько одновременных выкачек с разными $user/$pass, и нужно # чтобы в $ua разных задач были разные куки my $ua = Mojo::UserAgent->new->max_redirects(5); # запускаем задачу, удерживая $ua до конца задачи Mojo::IOLoop->delay->data(ua=>$ua)->steps( sub { $ua->post($url_login, form=>{user=>$user,pass=>$pass}, shift->begin); }, sub { my ($delay, $tx) = @_; die $tx->error->{message} if $tx->error; # проверим ошибку аутентификации if (!$tx->res->dom->at('#logout')) { die 'failed to login: bad user/pass'; } # всё в порядке, качаем список записей $ua->get($url_list, $delay->begin); }, sub { my ($delay, $tx) = @_; die $tx->error->{message} if $tx->error; # если записей на странице не будет и никаких операций # на этом шаге не запустится - перейдём на следующий шаг $delay->pass; # считаем все записи for ($tx->res->dom('.record')->each) { # парсим обычные поля текущей записи my $record = { key1 => $_->at('.key1')->text, # … }; # добавляем эту запись к финальному результату push @records, $record; # если есть страница с деталями - качаем if (my $a = $_->at('.details a')) { # качаем страницу с деталями и приаттаченные к ней # файлы как отдельную задачу - это немного # усложнит, но зато ускорит процесс т.к. можно # будет одновременно качать и страницы с # деталями и файлы приаттаченные к уже скачанным # страницам (плюс при таком подходе мы лениво # клозурим $record и не нужно думать как привязать # конкретную страницу с деталями к конкретной # записи) - альтернативой было бы поставить на # выкачку только страницы с деталями, а на # следующем шаге основной задачи когда все # страницы с деталями скачаются ставить на выкачку # приаттаченные файлы Mojo::IOLoop->delay( sub { $ua->get($a->{href}, shift->begin); }, sub { my ($delay, $tx) = @_; die $tx->error->{message} if $tx->error; # если файлов не будет - идём на след.шаг $delay->pass; # качаем 0 или более приаттаченных файлов $tx->res->dom('.file a')->each(sub{ $ua->get($_->{href}, $delay->begin); }); }, sub { my ($delay, @tx) = @_; die $_->error->{message} for grep {$_->error} @tx; # добавляем файлы к нужной записи for my $tx (@tx) { push @{ $record->{attaches} }, $tx->body; } # нам необходимо чтобы finish вызвался без # параметров, а не с нашими @tx, поэтому: $delay->pass; }, )->catch( sub { my ($delay, $err) = @_; warn $err; # ошибка выкачки или парсинга $delay->emit(finish => 'failed to get details'); } )->on(finish => $delay->begin); } ### if .details } ### for .record }, )->catch( sub { my ($delay, $err) = @_; warn $err; # ошибка логина, выкачки или парсинга $delay->emit(finish => 'failed to get records'); } )->on(finish => sub { my ($delay, @err) = @_; if (!@err) { process_records(@records); } } ); }
Немного не очевидным моментом является способ обработки ошибок. Поскольку результаты работы передавать между шагами не требуется (они накапливаются в заклозуренном @records
), то при успехе на следующий шаг передаётся пустой список (через $delay->pass;
), а при ошибке передаётся текст ошибки. Таким образом, если последний шаг в обработчике finish получит какие-то параметры — значит где-то в процессе выкачки или парсинга была ошибка(и). Саму ошибку уже перехватили и обработали (через warn
) в обработчиках ->catch
— собственно это как раз они и обеспечили передачу ошибки параметром в обработчик finish.
Если кто-то знает, как можно проще и/или нагляднее решить такую задачу — пишите. Пример аналогичного решения на Promises тоже был бы кстати.
______________________
Текст конвертирован используя habrahabr backend для AsciiDoc.
ссылка на оригинал статьи http://habrahabr.ru/post/228141/
Добавить комментарий