Делаем вебсокеты на PHP с нуля. Часть 2. IPC

от автора

После написания моей предыдущей статьи Делаем вебсокеты на PHP с нуля я понял, что у сообщества есть некоторый интерес к поднятой мною теме.

В прошлой статье я обещал, что опишу:

  • запуск нескольких процессов для обработки соединений
  • межпроцессное взаимодействие
  • разделение процессов мастер-воркер
  • проксирование вебсокетов с помощью nginx
  • запуск из консоли
  • интеграция с вашим фреймворком на примере yii
  • демонстрация

И, как обычно, — получившийся код и ссылка на демонстрационный чат в конце статьи.

Запуск нескольких процессов для обработки соединений

Для работы простого сервера вебсокетов достаточно одного процесса, но чтобы увеличить количество одновременных соединений (и обойти ограничение 1024 одновременных соединения), а также для использования ресурсов всего процессора (а не только одного ядра), необходимо, чтобы сервер вебсокетов использовал несколько процессов (оптимально — количество процессов = количество ядер процессора).

Для запуска нескольких процессов мы будем использовать функцию pcntl_fork(). Она создаёт новый процесс (дочерний), который является практически полной копией процесса-родителя, выполняющего этот вызов.
После вызова pcntl_fork() алгоритм разветвляется: в случае успешного выполнения функции pcntl_fork() она возвращает PID дочернего процесса родительскому, а NULL дочернему. Если создание форка закончилось неудачей, функция pcntl_fork() возвращает значение −1).

$pid = pcntl_fork(); //делаем форк  //далее весь код будет выполняться в обоих процессах  if ($pid == -1) {     // Не удалось создать дочерний процесс } elseif ($pid) {     // Этот код выполнится родительским процессом } else {     // А этот код выполнится дочерним процессом, его PID можно узнать с помощью функции getmypid() } 

Про отличие родительского процесса от дочернего можно почитать на википедии.

Мы можем в цикле создавать столько дочерних процессов, сколько нам необходимо:

$childs = array();  for ($i=0; $i<5; $i++) {     $pid = pcntl_fork(); //создаём форк      if ($pid == -1) {         die("error: pcntl_fork");     } elseif ($pid) { //родительский процесс         $childs[] = $pid; //заполняем массив дочерними PID, они нам ещё пригодятся :)     } else { //дочерний процесс         break; //выходим из цикла, чтобы дочерние процессы создавались только из родителя     } } 

Межпроцессное взаимодействие

Для взаимодействия между родительским и дочерним процессом мы будем использовать сокеты, а именно связанные сокеты:
Функция stream_socket_pair() создаёт пару связанных неразличимых потоковых сокетов. Таким образом мы можем писать в один сокет, а считывать данные из второго.

$pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP); //получаем массив из связанных сокетов  fwrite($pair[0], 'тест'); //пишем в первый сокет fread($pair[1], mb_strlen('тест')); //читаем из второго 

Теперь совмещаем этот код с форками и получаем:

$pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP); //создаём связанные сокеты  $pid = pcntl_fork(); //делаем форк  //далее весь код будет выполняться в обоих процессах  if ($pid == -1) {     die("error: pcntl_fork"); } elseif ($pid) { //родительский процесс     fclose($pair[0]); //закрываем один из сокетов в родителе     $child = $pair[1]; //второй будем использовать для связи с потомком } else { //дочерний процесс     fclose($pair[1]); //закрываем второй из сокетов в потомке     $parent = $pair[0]; //первый будем использовать для связи с родителем } 

Итоговый код для создания множества дочерних процессов:

$parent = null; $childs = array();  for ($i=0; $i<5; $i++) {     $pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP); //создаём связанные сокеты      $pid = pcntl_fork(); //создаём форк      if ($pid == -1) {         die("error: pcntl_fork");     } elseif ($pid) { //родительский процесс         fclose($pair[0]); //закрываем один из сокетов в родителе         $childs[] = $pair[1]; //второй будем использовать для связи с потомком     } else { //дочерний процесс         fclose($pair[1]); //закрываем второй из сокетов в потомке         $parent = $pair[0]; //первый будем использовать для связи с родителем         break; //выходим из цикла, чтобы дочерние процессы создавались только из родителя     } } 

В результате работы этого кода в родителе массив $childs будет содержать в себе все сокеты для связи с потомками, а потомки для связи с родителем будут использовать $parent.

Разделение процессов на мастера и воркеров

Так как дочерние процессы в нашей реализации не связаны друг с другом напрямую и могут взаимодействовать только через родителя, то целесообразно разделение обязанностей между родителем и потомками:

  • родитель будет отвечать за взаимодействие между дочерними процессами (будет мастером)
  • дочерние процессы будут выполнять всю работу (будут воркерами)

Также воркер у нас будет заниматься пересылкой сообщений из скриптов со страниц сайта или из крона. Для этого мы создадим дополнительный сокет, и добавим его в массив, прослушиваемых сокетов. Например, можно создать unix-сокет:

$service = stream_socket_server('unix:///tmp/websocket.sock', $errorNumber, $errorString); 

Проксирование вебсокетов с помощью nginx

Nginx поддерживает проксирование вебсокетов начиная с версии 1.3.13. Благодаря nginx можно обрабатывать соединения к серверу вебсокетов на том же порту, что и сайт, а также ограничить количество открытых вебсокетов с одного ip и другие полюбившиеся вам плюшки.

Пример nginx-конфига, который это позволяет:

limit_conn_zone $binary_remote_addr zone=perip:10m;  server {     listen 5.135.163.218:80;     server_name sharoid.ru;      location / {         limit_conn perip 5; #делаем ограничение 5 вебсокетов на 1 ip         proxy_pass http://127.0.0.1:8000;         proxy_http_version 1.1;         proxy_set_header Upgrade $http_upgrade;         proxy_set_header Connection "upgrade";         proxy_read_timeout 3600s; #увеличиваем таймаут для вебсокетов     } } 

Запуск из консоли

Выполняем команду php websocket.php или ./websocket.php (предварительно дав права на выполнение)
Если использовать nohup, например, nohup ./websocket.php &, то скрипт продолжит работать после закрытия консоли.

По-умолчанию есть два ограничения количества соединений на один процесс.

  • Первое — на уровне операционной системы: в одном процессе нельзя открыть более чем 1024 соединения. Чтобы обрабатывать больше одновременных соединений, выполните команду: ulimit -n 65535, а если у пользователя недостаточно привилегий, то sudo sh -c "ulimit -n 65535 && exec su $LOGNAME". Текущее значение можно посмотреть используя команду ulimit -n
  • Второе — у функции stream_select(): она не принимает больше чем 1024 соединения. Здесь всё сложнее — нужно перекомпилировать php c увеличенным FD_SETSIZE

Как я уже писал, эти ограничения можно обойти, используя дочерние процессы (воркеры).

Интеграция с вашим фреймворком на примере yii

Так как наш мастер прослушивает дополнительный сокет для связи с нашими скриптами (в примере выше был unix:///tmp/websocket.sock), мы можем в любом месте нашего сайта или в кроне соединиться с этим сокетом и отправить сообщение, которое мастер разошлёт всем воркерам, а они, в свою очередь, все клиентам:

$service = stream_socket_client ('unix:///tmp/websocket.sock', $errno, $errstr); fwrite($service, 'всем привет'); 

С использованием компонента yii это будет выглядеть вот так:

Yii::app()->websocket->send('всем привет'); 

Подробнее для yii

Скачиваем экстеншн, кладём его в папку extensions/websocket
В папку components кладём Websocket.php, WebsocketMasterHandler.php и WebsocketWorkerHandler.php из папки sample/yii.
В папку commands кладём из WebsocketCommand.php из папки sample/yii.
В конфиги main.php и console.php вставляем в секцию components:

'websocket' => array(     'class' => 'Websocket',     //'websocket' => 'tcp://127.0.0.1:8000',     //'localsocket' => 'tcp://127.0.0.1:8001',// unix:///tmp/mysock     //'workers' => 1 ), 

В конфиг console.php также вставляем в секцию import:

'ext.websocket.*' 

Демонстрация

Демонстрационный чат 2.0 (добавлен список пользователей, добавлено ограничение: 1 сообщение в секунду с одного IP)
В нём были использованы описанные выше функции, а также исправлены недостатки, выявленные после публикации предыдущей статьи.
Демонстрационный чат 1.0 (без списка пользователей, без ограничений)

Все исходники я оформил в виде библиотеки и выложил на github

ссылка на оригинал статьи http://habrahabr.ru/company/ifree/blog/210228/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *