В прошлой статье я обещал, что опишу:
- запуск нескольких процессов для обработки соединений
- межпроцессное взаимодействие
- разделение процессов мастер-воркер
- проксирование вебсокетов с помощью nginx
- запуск из консоли
- интеграция с вашим фреймворком на примере yii
- демонстрация
И, как обычно, — получившийся код и ссылка на демонстрационный чат в конце статьи.
Запуск нескольких процессов для обработки соединений
Для работы простого сервера вебсокетов достаточно одного процесса, но чтобы увеличить количество одновременных соединений (и обойти ограничение 1024 одновременных соединения), а также для использования ресурсов всего процессора (а не только одного ядра), необходимо, чтобы сервер вебсокетов использовал несколько процессов (оптимально — количество процессов = количество ядер процессора).
Для запуска нескольких процессов мы будем использовать функцию pcntl_fork(). Она создаёт новый процесс (дочерний), который является практически полной копией процесса-родителя, выполняющего этот вызов.
После вызова pcntl_fork() алгоритм разветвляется: в случае успешного выполнения функции pcntl_fork() она возвращает PID дочернего процесса родительскому, а NULL дочернему. Если создание форка закончилось неудачей, функция pcntl_fork() возвращает значение −1).
$pid = pcntl_fork(); //делаем форк //далее весь код будет выполняться в обоих процессах if ($pid == -1) { // Не удалось создать дочерний процесс } elseif ($pid) { // Этот код выполнится родительским процессом } else { // А этот код выполнится дочерним процессом, его PID можно узнать с помощью функции getmypid() }
Про отличие родительского процесса от дочернего можно почитать на википедии.
Мы можем в цикле создавать столько дочерних процессов, сколько нам необходимо:
$childs = array(); for ($i=0; $i<5; $i++) { $pid = pcntl_fork(); //создаём форк if ($pid == -1) { die("error: pcntl_fork"); } elseif ($pid) { //родительский процесс $childs[] = $pid; //заполняем массив дочерними PID, они нам ещё пригодятся :) } else { //дочерний процесс break; //выходим из цикла, чтобы дочерние процессы создавались только из родителя } }
Межпроцессное взаимодействие
Для взаимодействия между родительским и дочерним процессом мы будем использовать сокеты, а именно связанные сокеты:
Функция stream_socket_pair() создаёт пару связанных неразличимых потоковых сокетов. Таким образом мы можем писать в один сокет, а считывать данные из второго.
$pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP); //получаем массив из связанных сокетов fwrite($pair[0], 'тест'); //пишем в первый сокет fread($pair[1], mb_strlen('тест')); //читаем из второго
Теперь совмещаем этот код с форками и получаем:
$pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP); //создаём связанные сокеты $pid = pcntl_fork(); //делаем форк //далее весь код будет выполняться в обоих процессах if ($pid == -1) { die("error: pcntl_fork"); } elseif ($pid) { //родительский процесс fclose($pair[0]); //закрываем один из сокетов в родителе $child = $pair[1]; //второй будем использовать для связи с потомком } else { //дочерний процесс fclose($pair[1]); //закрываем второй из сокетов в потомке $parent = $pair[0]; //первый будем использовать для связи с родителем }
Итоговый код для создания множества дочерних процессов:
$parent = null; $childs = array(); for ($i=0; $i<5; $i++) { $pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP); //создаём связанные сокеты $pid = pcntl_fork(); //создаём форк if ($pid == -1) { die("error: pcntl_fork"); } elseif ($pid) { //родительский процесс fclose($pair[0]); //закрываем один из сокетов в родителе $childs[] = $pair[1]; //второй будем использовать для связи с потомком } else { //дочерний процесс fclose($pair[1]); //закрываем второй из сокетов в потомке $parent = $pair[0]; //первый будем использовать для связи с родителем break; //выходим из цикла, чтобы дочерние процессы создавались только из родителя } }
В результате работы этого кода в родителе массив $childs будет содержать в себе все сокеты для связи с потомками, а потомки для связи с родителем будут использовать $parent.
Разделение процессов на мастера и воркеров
Так как дочерние процессы в нашей реализации не связаны друг с другом напрямую и могут взаимодействовать только через родителя, то целесообразно разделение обязанностей между родителем и потомками:
- родитель будет отвечать за взаимодействие между дочерними процессами (будет мастером)
- дочерние процессы будут выполнять всю работу (будут воркерами)
Также воркер у нас будет заниматься пересылкой сообщений из скриптов со страниц сайта или из крона. Для этого мы создадим дополнительный сокет, и добавим его в массив, прослушиваемых сокетов. Например, можно создать unix-сокет:
$service = stream_socket_server('unix:///tmp/websocket.sock', $errorNumber, $errorString);
Проксирование вебсокетов с помощью nginx
Nginx поддерживает проксирование вебсокетов начиная с версии 1.3.13. Благодаря nginx можно обрабатывать соединения к серверу вебсокетов на том же порту, что и сайт, а также ограничить количество открытых вебсокетов с одного ip и другие полюбившиеся вам плюшки.
Пример nginx-конфига, который это позволяет:
limit_conn_zone $binary_remote_addr zone=perip:10m; server { listen 5.135.163.218:80; server_name sharoid.ru; location / { limit_conn perip 5; #делаем ограничение 5 вебсокетов на 1 ip proxy_pass http://127.0.0.1:8000; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; proxy_read_timeout 3600s; #увеличиваем таймаут для вебсокетов } }
Запуск из консоли
Выполняем команду php websocket.php или ./websocket.php (предварительно дав права на выполнение)
Если использовать nohup, например, nohup ./websocket.php &, то скрипт продолжит работать после закрытия консоли.
По-умолчанию есть два ограничения количества соединений на один процесс.
- Первое — на уровне операционной системы: в одном процессе нельзя открыть более чем 1024 соединения. Чтобы обрабатывать больше одновременных соединений, выполните команду:
ulimit -n 65535, а если у пользователя недостаточно привилегий, тоsudo sh -c "ulimit -n 65535 && exec su $LOGNAME". Текущее значение можно посмотреть используя командуulimit -n - Второе — у функции
stream_select(): она не принимает больше чем 1024 соединения. Здесь всё сложнее — нужно перекомпилировать php c увеличеннымFD_SETSIZE
Как я уже писал, эти ограничения можно обойти, используя дочерние процессы (воркеры).
Интеграция с вашим фреймворком на примере yii
Так как наш мастер прослушивает дополнительный сокет для связи с нашими скриптами (в примере выше был unix:///tmp/websocket.sock), мы можем в любом месте нашего сайта или в кроне соединиться с этим сокетом и отправить сообщение, которое мастер разошлёт всем воркерам, а они, в свою очередь, все клиентам:
$service = stream_socket_client ('unix:///tmp/websocket.sock', $errno, $errstr); fwrite($service, 'всем привет');
С использованием компонента yii это будет выглядеть вот так:
Yii::app()->websocket->send('всем привет');
extensions/websocketВ папку
components кладём Websocket.php, WebsocketMasterHandler.php и WebsocketWorkerHandler.php из папки sample/yii.В папку
commands кладём из WebsocketCommand.php из папки sample/yii.В конфиги main.php и console.php вставляем в секцию components:
'websocket' => array( 'class' => 'Websocket', //'websocket' => 'tcp://127.0.0.1:8000', //'localsocket' => 'tcp://127.0.0.1:8001',// unix:///tmp/mysock //'workers' => 1 ),
В конфиг console.php также вставляем в секцию import:
'ext.websocket.*'
Демонстрация
Демонстрационный чат 2.0 (добавлен список пользователей, добавлено ограничение: 1 сообщение в секунду с одного IP)
В нём были использованы описанные выше функции, а также исправлены недостатки, выявленные после публикации предыдущей статьи.
Демонстрационный чат 1.0 (без списка пользователей, без ограничений)
Все исходники я оформил в виде библиотеки и выложил на github
ссылка на оригинал статьи http://habrahabr.ru/company/ifree/blog/210228/
Добавить комментарий