Асинхронное выполнение PHP скрипта на подпроцессах

от автора

Добрый день, уважаемые хабровчане.

Сегодня я хотел бы поговорить о таких нетривиальных вещах, как асинхронные (параллельные) расчеты в языке PHP.
Сам по себе PHP — это скриптовый язык, который никогда и не претендовал на многопоточность. Но чем дальше в лес, тем более серьезные задачи стоят перед разработчиками, и тем больше приходится «извращаться» с пыхом, потому что мигрировать на более приспособленный под эти задачи язык программирования многие компании попросту боятся и не хотят. Следовательно, приходится работать с тем, что дают.
Подробности под катом…

Какое-то время назад передо мной стояла достаточно нетривиальная задача.
Если вкратце, то в проекте было реализовано примерно 20 очень тяжеловесных модулей по расчету стоимости товара.
Всё это висело на нескольких реляционных таблицах, каждый из модулей содержал свои собственные правила расчета и тп. Но выдавать на клиент всё это нужно было единым пакетом. И это должно было выполняться быстро. Очень быстро. Кеширование спасало, но в очень ограниченных объемах, совсем недостаточных для выполнения технических требований.

Алгоритм был довольно прост: на вход подавались необходимые аргументы, потом инстанцировались в массив все модули, и в цикле всё это дело просчитывалось. Ответ собирался в единый объект и выплёвывался на клиент для постобработки.

Так вот, в определенный момент мы с командой зашли в тупик, и поняли, что каждый новый модуль добавляет даже не линейное количество времени обработки, а с какой-то возрастающей прогрессией.

Как вы сами уже догадались, было предложено каким-либо образом распараллелить процесс. Но с PHP это непросто, потому что он этого не умеет делать из коробки.

Были опробованы разные решения:

К сожалению, так в итоге ни к чему и не пришли. Было решено свернуть проект.

Но для меня вопрос остался открыт, потому что решение быть должно. И ещё тогда мы задумывались о некоем подобии “подпроцессов”, которые порождает основной скрипт (аналог exec() функции).

С тех пор прошло довольно много времени, из проекта я давно ушел. Но вот буквально на прошлой неделе у меня появилась одна очень нетривиальная задача: написать скрипт, который определенным образом залогирует текущее состояние некоей entity и часть её тяжелых реляционных зависимостей. Для этого используется 2 класса, правильно подготавливающих данные и сохраняющих это в БД. Проблема в том, что таких объектов примерно 2800. Мой скрипт отваливается по

PHP Fatal error:  Allowed memory size of <over9000> bytes exhausted. 

На каждый пакет из 50 entities тратится, в среднем, 190мб памяти, с каждым новым пакетом кол-во использованной памяти росло. При полном отключении ограничений на использование оперативки, я получил такую же ошибку плюс Segmentation Fault.

Т.е. так или иначе, нужно было придумать как избежать переполнения оперативной памяти в скрипте, и постараться сделать его “чуточку” побыстрее. Сперва попытались разобраться, почему увеличивается потребление памяти из итерацию в итерацию. Оказалось, что ноги растут из особенностей работы симфового ServiceContainer и EventDispatcher. Там в event подпихивается весь контейнер, и потом это делается рекурсивно. Обходить нам это всё было, честно говоря, лень, и мой коллега предложил довольно изящное решение.

В наборе компонентов Symfony2 есть такая замечательная штука, как Symfony Process Component.
Эта вундервафля позволяет в ходе выполнения скрипта породить подпроцесс и запустить его в CLI-режиме (как обычную консольную команду).

Сперва мы просто попробовали “отпочковывать” по одному процессу для ограничения использования RAM. Но потом в доках вычитали, что эта штука умеет работать асинхронно.

Было решено опробовать это в деле. В итоге получилось нечто вроде этого(Ниже пример с Example-репозитория на GitHub. Логика самих подпроцессов очень простая, но утяжеленная):

MainCommand

<?php  namespace Example\Command;  use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; use Symfony\Component\Process\Process;  class MainCommand extends Command {    protected function configure()    {        $this->setName('example:main')            ->setDescription('Run example command with optional number of CPUs')            ->addArgument('CPUs', null, 'number of working CPUs', 2);    }     protected function execute(InputInterface $input, OutputInterface $output)    {        $channels    = [];        $maxChannels = $input->getArgument('CPUs');         $exampleArray = $this->getExampleArray();        $output->writeln('<fg=green>Start example process</>');        while (count($exampleArray) > 0 || count($channels) > 0) {            foreach ($channels as $key => $channel) {                if ($channel instanceof Process && $channel->isTerminated()) {                    unset($channels[$key]);                }            }            if (count($channels) >= $maxChannels) {                continue;            }             if (!$item = array_pop($exampleArray)) {                continue;            }            $process = new Process(sprintf('php index.php example:sub-process %s', $item), __DIR__ . '/../../../');            $process->start();            if (!$process->isStarted()) {                throw new \Exception($process->getErrorOutput());            }            $channels[] = $process;        }        $output->writeln('<bg=green;fg=black>Done.</>');    }     /**     * @return array     */    private function getExampleArray()    {        $array = [];        for ($i = 0; $i < 30; $i++) {            $name = 'No' . $i;            $x1   = rand(1, 10);            $y1   = rand(1, 10);            $x2   = rand(1, 10);            $y2   = rand(1, 10);             $array[] = $name . '.' . $x1 . '.' . $y1 . '.' . $x2 . '.' . $y2;        }         return $array;    } } 

SubProcessCommand

<?php  namespace Example\Command;  use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface;  class SubProcessCommand extends Command {    protected function configure()    {        $this->setName('example:sub-process')            ->setDescription('Run example sub-process command')            ->addArgument('item');    }     protected function execute(InputInterface $input, OutputInterface $output)    {        $items = explode('.', $input->getArgument('item'));        $pointName = $items[0];        $x1        = $items[1];        $y1        = $items[2];        $x2        = $items[3];        $y2        = $items[4];         // Used for mocking heavy execution.        $sum = 0;        for ($i = 1; $i <= 30000000; $i++){            $sum += $i;        }         $distance = bcsqrt(pow(($x2 - $x1),2) + pow(($y2 - $y1),2));        $data = sprintf('Point %s: %s', $pointName, (string)$distance);         file_put_contents(__DIR__.'/../../../output/Point'.$pointName , print_r($data, 1), FILE_APPEND);    } } 

index.php

<?php require __DIR__ . '/vendor/autoload.php';  use Symfony\Component\Console\Application;  $application = new Application(); $application->add(new \Example\Command\MainCommand()); $application->add(new \Example\Command\SubProcessCommand()); $application->run(); 

В итоге имеем примерно вот такую картину:

Скажу откровенно, я был очень впечатлён такими возможностями.
Надеюсь, некоторым эта статья будет в помощь. В качестве дополнительного материала оставлю тут ссылку на репозиторий, где был реализован пример, приведенный выше.

Репозиторий

Спасибо за внимание. Буду рад отзывам и комментариям.

ссылка на оригинал статьи http://habrahabr.ru/post/266615/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *