Работаем с Nats в PHP

от автора

Сегодня затронем такую неожиданную тему как работу с брокером сообщений Nats и PHP. Как оказалось, есть очень мало статей на эту тему, что странно, ведь PHP — это лучший язык программирования. Не знаю, почему так вышло, но напишите в комменты 🙂

Немного про Nats

Nats – это написанный на Go высокопроизводительный брокер сообщений, работающий по принципу, схожему с Kafka (если рассматривать JetStream, а не Nats Core). Он использует PUB/SUB модель и топики для разделения, кто, что и куда отправляет и кто, что и откуда получает.

Проткол Nats

Протокол Nats – это просто текстовый протокол на базе TCP/IP, сообщения разделяются новой строкой. Можно даже работать с ним с помощью telnet.

Попробуем подключиться к nats с помощью telnet и поздороваться.

Запускаем nats в Docker

docker run --rm -it -p 4222:4222 nats

и теперь в другом окне терминала можно подключиться:

telnet 127.0.0.1 4222

При подключении мы сразу же получим приветственное сообщение сервера (INFO):

Trying 127.0.0.1... Connected to localhost. Escape character is '^]'. INFO {"server_id":"ND6UPRIAIQ4K4EOMCQFI7NEGEEN7NW7CQCELNB3F2IVK3RGKHCA3UC36","server_name":"ND6UPRIAIQ4K4EOMCQFI7NEGEEN7NW7CQCELNB3F2IVK3RGKHCA3UC36","version":"2.11.1-binary","proto":1,"git_commit":"6cebef9","go":"go1.24.1","host":"0.0.0.0","port":4222,"headers":true,"max_payload":1048576,"client_id":5,"client_ip":"192.168.215.1","cluster":"my_cluster","xkey":"XCTTMXJ3T65TODSZEAHRFDMDUIMH2BP4SARGLKSFYRYOLH3MIQTUODC2"}

Нужно ему ответить сообщением:

CONNECT {}

Он ответит:

+OK

После этого можно отправлять команды. Например, PING

PING

Сервер ответит

PONG

Если у кого-нибудь возникнет желание разобраться поподробнее с тем, как это работает, есть неплохая документацию с демо протокола https://docs.nats.io/reference/reference-protocols/nats-protocol-demo

Нужно отметить, что Nats Core – это обычная почти что синхронная очередь. То, что публикуется в топик, должен забрать consumer. Если консьюмеров нет, сообщение уничтожается.

Для реализации очереди в нашем обычном понимании есть Nats JetStream, который реализует хранилище сообщений на сервере, возможность консьюмерам отключаться и подключаться, а также гарантии доставки.

Протокол JetStream уже сильно сложнее, работать с ним через telnet — больная боль, так что лучше брать библиотеки или хотя бы nats-box

Библиотеки на PHP

Начнем с того, что библиотек, реализующих работу с брокером, не так много. Я нашел вообще всего две:

и только первая поддерживается, так что в рамках данной статьи рассмотрим внедрение с помощью библиотеки nats.php.

Работать с этой библиотекой достаточно удобно, но не хватает более подробной документации и логирования, помимо debug логов. Но ничего, прорвемся 🙂

Для дальнейшей работы примем, что мы установили библиотеку

composer require basis-company/nats

А также подключили composer autoload. Например, можно в начале файла указать:

<?php require_once 'vendor/autoload.php';

Пробуем работать с библиотекой

Для простоты рассказа будем использовать обычный function style PHP с разделением кода на функции для хорошего понимания.
Реализуем подключение и отключение

function connect(): \Basis\Nats\Client   {       $configuration = new \Basis\Nats\Configuration(           host: '127.0.0.1',           port: 4222,       );          return new \Basis\Nats\Client($configuration);   }      function disconnect(\Basis\Nats\Client $client): void   {       $client->disconnect();   }

Библиотека поддерживает много разных способов аутентификации. Но в нашем примере — никакой безопасности 🙂

Для создания потока и обращения к нему будем использовать следующую функцию

function init_stream(\Basis\Nats\Client $client): \Basis\Nats\Stream\Stream   {       $api = $client->getApi();       $stream = $api->getStream('php-nats-test');       // По умолчанию используется:       // хранилище - файловое,         // сабжекты, относящиеся к этому потоку - только имя потока.         $stream->getConfiguration()           ->setRetentionPolicy(\Basis\Nats\Stream\RetentionPolicy::WORK_QUEUE);       $stream->create();          return $stream;   }

Функция init_stream создает поток, который работает как классическая очередь (WORK_QUEUE). Если задача взята и ack-нута, сообщение удаляется. При этом, накладывается ограничение: на один топик — одна группа консьюмеров (консьюмеры с одним name), так реализуется гарантия доставки exactly once.

Более подробно можно посмотреть в документации Nats по JetStream Streams: https://docs.nats.io/nats-concepts/jetstream/streams

В качестве задач будем рассматривать обычные строки. В перспективе можно в эти строки записывать любые значения, например, JSON закодированные структуры данных.
Код добавления задач в поток:

function push_message(\Basis\Nats\Client $client, string $message): string   {       $id = uniqid('', true);       $client->publish(           'php-nats-test',           new \Basis\Nats\Message\Payload($message, [               // Это поле также используется для дедупликации, но также полезно и для отображение логов в системе               'Nats-Msg-ID' => $id,               // Номер попытки обработки сообщения               'x-attempt' => 1,           ]),       );       return $id;   }

При отправке задачи генерируется её ID. При получении сообщения этот ID также придет в заголовках, что может быть полезно для дальнейшей обработки. Смысл заголовка x-attempt разберем немного позже.

На текущий момент, если собрать весь код, мы сможем отправлять задачи в очередь.
Для начала запустим Nats в локальном докере с включенным jetstream:

docker run --rm -it -p 4222:4222 nats -js

Код запуска:

$client = connect(); init_stream($client);    $id = push_message($client, 'Hello World!');   echo 'pushed message id: ' . $id . PHP_EOL;   $id = push_message($client, 'Hello World, again!');   echo 'pushed message id: ' . $id . PHP_EOL;   $id = push_message($client, 'Hello World, once more!');   echo 'pushed message id: ' . $id . PHP_EOL;      disconnect($client);

Пробуем запустить и получаем логи:

% php producer.php pushed message id: 68028fce39f239.23978889 pushed message id: 68028fce3b55a9.67959494 pushed message id: 68028fce3b56d1.07523175
Итоговый код файла producer.php
<?php  require_once 'vendor/autoload.php';  function connect(): \Basis\Nats\Client {     $configuration = new \Basis\Nats\Configuration(         host: '127.0.0.1',         port: 4222,     );      return new \Basis\Nats\Client($configuration); }  function disconnect(\Basis\Nats\Client $client): void {     $client->disconnect(); }  function init_stream(\Basis\Nats\Client $client): \Basis\Nats\Stream\Stream {     $api = $client->getApi();     $stream = $api->getStream('php-nats-test');     // По умолчанию используется:     // хранилище - файловое,     // сабжекты, относящиеся к этому потоку - только имя потока.     $stream->getConfiguration()         ->setRetentionPolicy(\Basis\Nats\Stream\RetentionPolicy::WORK_QUEUE);     $stream->create();      return $stream; }  function push_message(\Basis\Nats\Client $client, string $message): string {     $id = uniqid('', true);     $client->publish(         'php-nats-test',         new \Basis\Nats\Message\Payload($message, [             // Это поле также используется для дедупликации, но также полезно и для отображение логов в системе             'Nats-Msg-ID' => $id,             // Номер попытки обработки сообщения             'x-attempt' => 1,         ]),     );     return $id; }   $client = connect();  init_stream($client); $id = push_message($client, 'Hello World!'); echo 'pushed message id: ' . $id . PHP_EOL; $id = push_message($client, 'Hello World, again!'); echo 'pushed message id: ' . $id . PHP_EOL; $id = push_message($client, 'Hello World, once more!'); echo 'pushed message id: ' . $id . PHP_EOL;  disconnect($client); 

Далее переходим к следующему этапу. Нам ведь нужно эти сообщения забрать!

Потребление сообщений из очереди

Вернемся к вопросу реализации ретраев. В целом, для её реализации есть два варианта:

  • Штатный ограничитель в настройках консьюмера «доставлять мне одну задачу не более X раз, если она nack-нута», при этом при исчерпании попыток, задача из очереди удалена не будет

  • Кастомная логика на основе заголовков и обратной отправки задачи в конец очереди (спойлер: использовать будем именно его)

    Самый очевидный способ следующий:

    • Берем задачу

    • Пытаемся обработать

    • Получаем ошибку

    • Отправляем задачу в очередь с attempt = attempt + 1

      • Проверяем, если attempt === MAX_ATTEMPTS, не отправляем задачу в очередь

Код функции переотправки задачи в очередь:

function redeliver(\Basis\Nats\Client $client, \Basis\Nats\Message\Msg $msg): void   {       $id = uniqid('', true);       $client->publish(           $msg->subject,           new \Basis\Nats\Message\Payload($msg->payload->body, [               'Nats-Msg-ID' => $id,               'x-attempt' => $msg->payload->getHeader('x-attempt') + 1,           ])       );   }

Ну и самое интересное — обработка сообщений. Начнем с одного сообщения, а далее перейдем к прослушиванию очереди и обработке бесконечного количества сообщений:

function handle_message(string $id, string $payload, int $attempt): bool   {       echo "Обработка задачи ID {$id}, номер попытки: {$attempt}, тело: {$payload}\n";          if ($payload === 'Hello World!' && $attempt === 1) {           // Hello world - задача тяжелая, с первого раза никогда не получается           return false;       }          return true;   }

И код обработки сообщения из очереди:

const MAX_ATTEMPTS = 3;   function process_message(\Basis\Nats\Client $client, \Basis\Nats\Message\Msg $message): void   {       $payload = $message->payload;          $id = $payload->getHeader('Nats-Msg-ID');       $attempt = (int)$payload->getHeader('x-attempt');       try {           if (handle_message($id, $payload->body, $attempt)) {               $message->ack();           } else {               if ($attempt < MAX_ATTEMPTS) {                   redeliver($client, $message);               }               $message->ack();           }       } catch (Throwable $e) {           if ($attempt < MAX_ATTEMPTS) {               redeliver($client, $message);           }           $message->ack();           throw $e;       }   }

Код обработки достаточно простой и понятный:

  • Обрабатываем сообщение

  • В случае ошибки проверяем, нужно ли ретраить

    • Если нужно, делаем ретрай

    • Если не нужно, заканчиваем на этом обработку сообщения

    • Если вылетело исключение, пусть его обрабатывает внешний слой

А теперь самое трудное – работа с прослушиванием очереди. Для этого нам понадобится расширение ext-pcntl, чтобы скрипт можно было завершить по CTRL+C:

// Для завершения работы при нажатии CTRL+C   $canContinue = true;   pcntl_signal(SIGINT, function () use (&$canContinue) {       $canContinue = false;   });      function listen(\Basis\Nats\Client $client, \Basis\Nats\Stream\Stream $stream): void   {       global $canContinue;          $consumer = $stream->getConsumer('my-wonderful-consumer');       $consumer->getConfiguration()           // Берем сообщения только из указанных топиков           ->setSubjectFilter('php-nats-test');       $queue = $consumer->create()->getQueue();          while ($canContinue) {           $message = $queue->fetch();           if ($message && !$message->payload->isEmpty()) {               // Иногда в $message может прийти служебное сообщение о том,               // что в fetch не удалось получить сообщения (очередь пуста). Тогда payload будет пустым                         try {                   process_message($client, $message);               } catch (Throwable $e) {                   // Максимально простая обработка исключения                   echo $e->getMessage() . PHP_EOL;               }           }   // Проверяем нужно ли запустить обработчики сигнала выхода         pcntl_signal_dispatch();       }          $client->unsubscribe($queue);   }

Строка my-wonderful-consumer — это имя консьюмера. Если запустить несколько скриптов параллельно, они все будут в одной группе и только один из них будет получать одно сообщение. Таким образом достигается горизонтальное масштабирование без дублирования обработки сообщений.

Пробуем запустить 🙂

$client = connect();   $stream = init_stream($client);      listen($client, $stream));      disconnect($client);

В выводе получим:

% php consumer.php Обработка задачи ID 6802976d4347b5.08190035, номер попытки: 1, тело: Hello World! Обработка задачи ID 6802976d434999.77061748, номер попытки: 1, тело: Hello World, again! Обработка задачи ID 6802976d434a18.95887182, номер попытки: 1, тело: Hello World, once more! Обработка задачи ID 6802976d43e417.25517113, номер попытки: 2, тело: Hello World!

Можно заметить. что задача Hello world! обработалась дважды, первой и последней. Это связано с переотправкой. У неё даже поменялся ID, как и должно быть 🙂

Итоговый код файла consumer.php
<?php  require_once 'vendor/autoload.php';  function connect(): \Basis\Nats\Client {     $configuration = new \Basis\Nats\Configuration(         host: '127.0.0.1',         port: 4222,     );      return new \Basis\Nats\Client($configuration); }  function disconnect(\Basis\Nats\Client $client): void {     $client->disconnect(); }  function init_stream(\Basis\Nats\Client $client): \Basis\Nats\Stream\Stream {     $api = $client->getApi();     $stream = $api->getStream('php-nats-test');     // По умолчанию используется:     // хранилище - файловое,     // сабжекты, относящиеся к этому потоку - только имя потока.     $stream->getConfiguration()         ->setRetentionPolicy(\Basis\Nats\Stream\RetentionPolicy::WORK_QUEUE);     $stream->create();      return $stream; }  function redeliver(\Basis\Nats\Client $client, \Basis\Nats\Message\Msg $msg): void {     $id = uniqid('', true);     $client->publish(         $msg->subject,         new \Basis\Nats\Message\Payload($msg->payload->body, [             'Nats-Msg-ID' => $id,             'x-attempt' => $msg->payload->getHeader('x-attempt') + 1,         ])     ); }  function handle_message(string $id, string $payload, int $attempt): bool {     echo "Обработка задачи ID {$id}, номер попытки: {$attempt}, тело: {$payload}\n";      if ($payload === 'Hello World!' && $attempt === 1) {         // Hello world - задача тяжелая, с первого раза никогда не получается         return false;     }      return true; }  const MAX_ATTEMPTS = 3; function process_message(\Basis\Nats\Client $client, \Basis\Nats\Message\Msg $message): void {     $payload = $message->payload;      $id = $payload->getHeader('Nats-Msg-ID');     $attempt = (int)$payload->getHeader('x-attempt');     try {         if (handle_message($id, $payload->body, $attempt)) {             $message->ack();         } else {             if ($attempt < MAX_ATTEMPTS) {                 redeliver($client, $message);             }             $message->ack();         }     } catch (Throwable $e) {         if ($attempt < MAX_ATTEMPTS) {             redeliver($client, $message);         }         $message->ack();         throw $e;     } }   // Для завершения работы при нажатии CTRL+C $canContinue = true; pcntl_signal(SIGINT, function () use (&$canContinue) {     $canContinue = false; });  function listen(\Basis\Nats\Client $client, \Basis\Nats\Stream\Stream $stream): void {     global $canContinue;      $consumer = $stream->getConsumer('my-wonderful-consumer');     $consumer->getConfiguration()         // Берем сообщения только из указанных топиков         ->setSubjectFilter('php-nats-test');     $queue = $consumer->create()->getQueue();      while ($canContinue) {         $message = $queue->fetch();         if ($message && !$message->payload->isEmpty()) {             // Иногда в $message может прийти служебное сообщение о том,             // что в fetch не удалось получить сообщения (очередь пуста). Тогда payload будет пустым             try {                 process_message($client, $message);             } catch (Throwable $e) {                 // Максимально простая обработка исключения                 echo $e->getMessage() . PHP_EOL;             }         }          pcntl_signal_dispatch();     }      $client->unsubscribe($queue); }   $client = connect(); $stream = init_stream($client);  listen($client, $stream);  disconnect($client); 

Выводы

Мы попробовали поработать с Nats в PHP, реализовали базовую очередь и поняли как работают базовые сущности JetStream. Дальше только больше и круче. Например, можно внедрить отложенную обработку, приоритеты (которых в nats к сожалению нет).

На данный момент внедрение Nats является нетривиальной задачей, по которой крайне мало информации. Я надеюсь, в будущем сообщество заинтересуется этим мощным и одновременно простым в обслуживании брокером очередей, а также будет появляться больше библиотек и адаптеров, чтобы его внедрение на PHP стало проще и надежнее.

Удачи вам, коллеги 🙂


ссылка на оригинал статьи https://habr.com/ru/articles/904638/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *