Тестирование параллельных процессов

от автора

image

Вы встречались с ошибками, которые возникают время от времени в продакшне, но никак не воспроизводятся локально? Бывает, изучаешь такой баг и вдруг понимаешь, что он проявляется только при одновременном параллельном выполнении скриптов. Изучив код, понимаешь как это исправить, чтобы такого больше не повторялось. Но на такое исправление хорошо бы написать тест…

В статье я расскажу о своем подходе к тестированию таких ситуаций. А также приведу несколько наглядных (и наверное даже классических) примеров багов, которые удобно протестировать с помощью этого подхода. Все примеры багов живые — то, что встречается в работе.

Забегая вперед сразу скажу, что в конце статьи будет ссылка на github, куда я выложил готовое решение, позволяющее тестировать параллельные консольные процессы легко и просто.

Пример номер один. Параллельное добавление одного и того же

Задача. У нас есть приложение с базой данных (PostgreSQL) и нам надо наладить импорт данных из сторонней системы. Допустим, есть таблица account (id, name) и связи идентификаторов с внешней системой в таблице account_import (id, external_id). Давайте набросаем простой механизм приема сообщений.

При приеме сообщения будем сперва проверять — есть ли такие записи у нас в базе. Если есть, то будем обновлять имеющиеся. Если нет, то будем добавлять в базу.

$data = json_decode($jsonInput, true); // '{"id":1,"name":"account1"}'  try {     $connection->beginTransaction();      // Проверим, есть ли такая запись в базе     $stmt = $connection->prepare("SELECT id FROM account_import          WHERE external_id = :external_id");     $stmt->execute([         ':external_id' => $data['id'],     ]);     $row = $stmt->fetch();      usleep(100000); // 0.1 sec      // Если импортируемая запись в базе есть, то обновим ее     if ($row) {         $stmt = $connection->prepare("UPDATE account SET name = :name WHERE id = (             SELECT id FROM account_import WHERE external_id = :external_id         )");         $stmt->execute([             ':name' => $data['name'],             ':external_id' => $data['id'],         ]);         $accountId = $row['id'];     }     // Иначе создадим новую запись     else {         $stmt = $connection->prepare("INSERT INTO account (name) VALUES (:name)");         $stmt->execute([             ':name' => $data['name'],         ]);         $accountId = $connection->lastInsertId();          $stmt = $connection->prepare("INSERT INTO account_import (id, external_id)              VALUES (:id, :external_id)");         $stmt->execute([             ':id' => $accountId,             ':external_id' => $data['id'],         ]);     }      $connection->commit(); } catch (\Throwable $e) {     $connection->rollBack();     throw $e; }

С первого взгляда выглядит хорошо. Но если данные в нашу систему могут передаваться не строго последовательно, тут можем столкнуться с проблемой. Задержка 0.1 секунды в этом примере нам нужна, чтобы гарантированно воспроизвести проблему. Что будет, если выполнить импорт одних и тех же данных параллельно? Вероятно, вместо того, чтобы данные были добавлены, а потом обновлены, будет попытка повторной вставки данных и, как следствие, ошибка нарушения первичного ключа в account_import.

Чтобы исправить ошибку, ее хорошо бы сперва воспроизвести. А лучше всего — написать тест, который воспроизводит ошибку. Я решил для этого запускать команды асинхронно с помощью bash и написал простой скрипт для этого, который можно использовать не только в связке с PHP.

Идея проста — запускаем в фоне несколько экземпляров команд, потом ждем когда все они завершатся, и проверяем коды выполнения. Если среди кодов выполнения есть отличные от нуля, значит мы нашли баг. В упрощенном виде скрипт будет выглядеть так:

# Команда, которую будем проверять COMMAND=”echo -e '{\"id\":1,\"name\":\"account1\"}' | ./cli app:import”  # PID-ы запущенных фоновых процессов pids=()  # Результаты выполнения фоновых процессов results=()  # Ожидаемые результаты выполнения фоновых процессов (нули) expects=()  # Запустим процессы в фоне и перенаправим вывод в stderr for i in $(seq 2) do   eval $COMMAND 1>&2 & pids+=($!) ; echo -e '>>>' Process ${pids[i-1]} started 1>&2 done  # Ожидаем завершения каждого процесса и сохраняем результаты в $results for pid in "${pids[@]}" do   wait $pid   results+=($?)   expects+=(0)   echo -e '<<<' Process $pid finished 1>&2 done  # Сравним полученные результаты с ожидаемыми result=`( IFS=$', '; echo "${results[*]}" )` expect=`( IFS=$', '; echo "${expects[*]}" )` if [ "$result" != "$expect" ] then   exit 1 fi

Полную версию скрипта выложил на github.

На основе этой команды мы можем дописать к PHPUnit новые assert-ы. Тут уже все проще и я не буду подробно останавливаться на этом. Скажу только, что в вышеупомянутом проекте они реализованы. Чтобы их использовать достаточно подключить трейт AsyncTrait к вашему тесту.

Напишем такой тест.

use App\Command\Initializer; use Mnvx\PProcess\AsyncTrait; use Mnvx\PProcess\Command\Command; use PHPUnit\Framework\TestCase; use Symfony\Component\Console\Tester\CommandTester;  class ImportCommandTest extends TestCase {     use AsyncTrait;      public function testImport()     {         $cli = Initializer::create();         $command = $cli->find('app:delete');          // Удаляем запись c external_id = 1,          // чтобы проверить случай параллельного добавления одной и той же записи         $commandTester = new CommandTester($command);         $commandTester->execute([             'externalId' => 1,         ]);          $asnycCommand = new Command(             'echo -e \'{"id":1,"name":"account1"}\' | ./cli app:import', // Тестируемая команда             dirname(__DIR__), // Каталог, из которого будет запускаться команда             2 // Количество запускаемых экземпляров команд         );         // Запуск проверки         $this->assertAsyncCommand($asnycCommand);     } }

В результате запуска теста получим такой вывод.

$ ./vendor/bin/phpunit PHPUnit 6.1.1 by Sebastian Bergmann and contributors.  F                                                                   1 / 1 (100%)  Time: 230 ms, Memory: 6.00MB  There was 1 failure:  1) ImportCommandTest::testImport Failed asserting that command  	echo -e '{"id":1,"name":"account1"}' | ./cli app:import (path: /var/www/pprocess-playground, count: 2) executed in parallel. Output:   >>> Process 18143 started >>> Process 18144 started Account 25 imported correctly                                                                                    [Doctrine\DBAL\Exception\UniqueConstraintViolationException]                    An exception occurred while executing 'INSERT INTO account_import (id, exte     rnal_id) VALUES (:id, :external_id)' with params ["26", 1]:                     SQLSTATE[23505]: Unique violation: 7 ОШИБКА:  повторяющееся значение ключа      нарушает ограничение уникальности "account_import_pkey"                         DETAIL:  Ключ "(external_id)=(1)" уже существует.                                                                                                             -------  app:import  <<< Process 18143 finished  <<< Process 18144 finished   .  /var/www/pprocess-playground/vendor/mnvx/pprocess/src/AsyncTrait.php:19 /var/www/pprocess-playground/tests/ImportCommandTest.php:30  FAILURES! Tests: 1, Assertions: 1, Failures: 1.

Причину мы уже обсудили. Теперь попробуем добавить принудительную блокировку параллельного выполнения фрагмента нашего скрипта (тут используется malkusch/lock).

$mutex = new FlockMutex(fopen(__FILE__, 'r')); $mutex->synchronized(function () use ($connection, $data) {     // наш код из блока try }); 

Тест пройден:

$ ./vendor/bin/phpunit PHPUnit 6.1.1 by Sebastian Bergmann and contributors.  .                                                                   1 / 1 (100%)  Time: 361 ms, Memory: 6.00MB  OK (1 test, 1 assertion)

Этот и другие примеры я выложил на github, если вдруг кому-то понадобится.

Пример номер два. Подготовка данных в таблице

Этот пример будет немного интереснее. Допустим, у нас есть таблица пользователей users (id, name) и мы желаем хранить в таблице users_active (id) список активных в настоящий момент пользователей.

У нас будет команда, которая каждый раз будет удалять все записи из таблицы users_acitve и добавлять туда данные заново.

try {     $connection->beginTransaction();      $connection->prepare("DELETE FROM users_active")->execute();      usleep(100000); // 0.1 sec      $connection->prepare("INSERT INTO users_active (id) VALUES (3), (5), (6), (10)")->execute();      $connection->commit();     $output->writeln('<info>users_active refreshed</info>'); } catch (\Throwable $e) {     $connection->rollBack();     throw $e; }

Тут только с первого взгляда все хорошо. На самом же деле при параллельном запуске снова получим ошибку.

Напишем тест, чтобы ее воспроизвести.

use Mnvx\PProcess\AsyncTrait; use Mnvx\PProcess\Command\Command; use PHPUnit\Framework\TestCase;  class DetectActiveUsersCommandTest extends TestCase {     use AsyncTrait;      public function testImport()     {         $asnycCommand = new Command(             './cli app:detect-active-users', // Тестируемая команда             dirname(__DIR__), // Каталог, из которого будет запускаться команда             2 // Количество запускаемых экземпляров команд         );         // Запуск проверки         $this->assertAsyncCommand($asnycCommand);     } }

Запускаем тест и видим текст ошибки:

$ ./vendor/bin/phpunit tests/DetectActiveUsersCommandTest.php PHPUnit 6.1.1 by Sebastian Bergmann and contributors.  F                                                                   1 / 1 (100%)  Time: 287 ms, Memory: 4.00MB  There was 1 failure:  1) DetectActiveUsersCommandTest::testImport Failed asserting that command  	./cli app:detect-active-users (path: /var/www/pprocess-playground, count: 2) executed in parallel. Output:   >>> Process 24717 started >>> Process 24718 started users_active refreshed <<< Process 24717 finished                                                                                     [Doctrine\DBAL\Exception\UniqueConstraintViolationException]                    An exception occurred while executing 'INSERT INTO users_active (id) VALUES      (3), (5), (6), (10)':                                                          SQLSTATE[23505]: Unique violation: 7 ОШИБКА:  повторяющееся значение ключа      нарушает ограничение уникальности "users_active_pkey"                           DETAIL:  Ключ "(id)=(3)" уже существует.                                       -------  app:detect-active-users  <<< Process 24718 finished   .  /var/www/pprocess-playground/vendor/mnvx/pprocess/src/AsyncTrait.php:19 /var/www/pprocess-playground/tests/DetectActiveUsersCommandTest.php:19  FAILURES! Tests: 1, Assertions: 1, Failures: 1.

По тексту ошибки понятно, что снова INSERT выполняется параллельно и это приводит к нежелательным последствиям. Попробуем сделать блокировку на уровне записей — добавим строчку после старта транзакции:

$connection->prepare("SELECT id FROM users_active FOR UPDATE")->execute(); 

Запускаем тест — ошибка ушла. Наш тест запускает два экземпляра процесса. Давайте увеличим в нашем тесте количество экземпляров до 3-х и посмотрим, что будет.

$asnycCommand = new Command(     './cli app:detect-active-users', // Тестируемая команда     dirname(__DIR__), // Каталог, из которого будет запускаться команда     3 // Количество запускаемых экземпляров команд ); 

И снова имеем ту же ошибку. В чем дело, мы же добавили блокировку?! Немного подумав, можно догадаться, что такая блокировка поможет только если в таблице users_active есть записи. В случае же, когда работают 3 процесса одновременно, получается картина такая — первый процесс получает блокировку. Второй и третий процесс ждут завершения транзакции первого процесса. Как только транзакция будет завершена, продолжат выполняться параллельно и второй и третий процесс, что приведет к нежелательным последствиям.

Чтобы починить, сделаем блокировку более общую. Например,

$connection->prepare("SELECT id FROM users WHERE id IN (3, 5, 6, 10) FOR UPDATE")->execute();

Либо вместо DELETE мы могли просто воспользоваться TRUNCATE, которая блокирует всю таблицу.

Пример номер три. Deadlock

Бывает, что сама по себе команда не приводит к проблемам, но одновременный вызов двух разных команд, работающих с одними и теми же ресурсами приводит к проблемам. Найти причины таких багов бывает нелегко. Но если причина найдена, то тест написать точно стоит, чтобы избежать возвращения проблемы в будущем при внесении изменений в код.

Напишем пару таких команд. Это классический случай, когда возникает взаимная блокировка.

Первая команда сперва обновляет запись с id=1, потом с id=2.

try {     $connection->beginTransaction();      $connection->prepare("UPDATE deadlock SET value = value + 1 WHERE id = 1")->execute();      usleep(100000); // 0.1 sec      $connection->prepare("UPDATE deadlock SET value = value + 1 WHERE id = 2")->execute();      $connection->commit();     $output->writeln('<info>Completed without deadlocks</info>'); } catch (\Throwable $e) {     $connection->rollBack();     throw $e; }

Вторая команда сперва обновляет запись с id=2, потом с id=1.

try {     $connection->beginTransaction();      $connection->prepare("UPDATE deadlock SET value = value + 1 WHERE id = 2")->execute();      usleep(100000); // 0.1 sec      $connection->prepare("UPDATE deadlock SET value = value + 1 WHERE id = 1")->execute();      $connection->commit();     $output->writeln('<info>Completed without deadlocks</info>'); } catch (\Throwable $e) {     $connection->rollBack();     throw $e; }

Тест будет выглядеть так.

use Mnvx\PProcess\AsyncTrait; use Mnvx\PProcess\Command\CommandSet; use PHPUnit\Framework\TestCase;  class DeadlockCommandTest extends TestCase {     use AsyncTrait;      public function testImport()     {         $asnycCommand = new CommandSet(             [   // Тестируемые команды                 './cli app:deadlock-one',                 './cli app:deadlock-two'             ],             dirname(__DIR__), // Каталог, из которого будет запускаться команда             1 // Количество запускаемых экземпляров команд         );         // Запуск проверки         $this->assertAsyncCommands($asnycCommand);     } }

В результате запуска теста увидим причину ошибки:

$ ./vendor/bin/phpunit tests/DeadlockCommandTest.php PHPUnit 6.1.1 by Sebastian Bergmann and contributors.  F                                                                   1 / 1 (100%)  Time: 1.19 seconds, Memory: 4.00MB  There was 1 failure:  1) DeadlockCommandTest::testImport Failed asserting that commands  	./cli app:deadlock-one, ./cli app:deadlock-two (path: /var/www/pprocess-playground, count: 1) executed in parallel. Output:   >>> Process 5481 started: ./cli app:deadlock-one >>> Process 5481 started: ./cli app:deadlock-two                                                                                    [Doctrine\DBAL\Exception\DriverException]                                       An exception occurred while executing 'UPDATE deadlock SET value = value +      1 WHERE id = 1':                                                                SQLSTATE[40P01]: Deadlock detected: 7 ОШИБКА:  обнаружена взаимоблокировка      DETAIL:  Процесс 5498 ожидает в режиме ShareLock блокировку "транзакция 294     738"; заблокирован процессом 5499.                                              Процесс 5499 ожидает в режиме ShareLock блокировку "транзакция 294737"; заб     локирован процессом 5498.                                                       HINT:  Подробности запроса смотрите в протоколе сервера.                        CONTEXT:  при изменении кортежа (0,48) в отношении "deadlock"                  -------  app:deadlock-two  Completed without deadlocks <<< Process 5481 finished  <<< Process 5484 finished   .  /var/www/pprocess-playground/vendor/mnvx/pprocess/src/AsyncTrait.php:39 /var/www/pprocess-playground/tests/DeadlockCommandTest.php:22  FAILURES! Tests: 1, Assertions: 1, Failures: 1.

Проблема лечится добавлением блокировки по аналогии с первым примером. Либо пересмотром структуры базы или алгоритма работы с данными.

Резюмируем

При параллельном исполнении кода могут возникать неожиданные ситуации, при исправлении которых полезно написать тесты. Мы рассмотрели несколько таких ситуаций и написали тесты, воспользовавшись pprocess.
ссылка на оригинал статьи https://habrahabr.ru/post/327292/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *