PhP-WebSocket Server

от автора

Всем привет!

Не так давно я начал переписывать один интересный проект, своего рода чат, в котором до моего вмешательства использовался комет-сервер на вебсокеты. Рассказывать о том, что такое WebSockets я не стану, т.к я думаю, что это всем уже известно, да и пост не об этом. Поэтому под катом просто немного PhP-кода.

Да, я знаю, что готовых решений полным-полно, однако меня они по тем или иным причинам не устроили и я решил написать свой PhP WebSocket Server. Плюс ко всему, это интересно 🙂 Так как до этого я не сталкивался с сокетами, чтобы понять на деле, что это за блюдо и с чем его едят, пришлось покурить немного мануалов. Так же я разобрал и изучил принцип работы некоторых готовых решений, что позволило мне лучше узнать их. Сам код я выложу в конце статьи, а перед этим хотелось бы сказать пару слов об его использовании.

Сразу скажу, что это первая версия сервера (так сказать 1.0) и ее функционал еще немного сыроват.

Для того, чтобы запустить наш сервер необходимо сделать следущее:

// Инклудим наш сервер require_once "./WebSocketApi.php"; //.. $wsApi = new WebSocketApi(); // Запускаем ! $wsApi->startWServer("127.0.0.7", 8000); 

Ничего сложного, не правда ли?
В сервере есть 3 event’a.

  • onOpen — подключение клиента к серверу
  • onMsg — получение сообщения от клиента
  • onClose — закрытие соединения с клиентом

Эти event’ы позволяют нам вызывать пользовательские функции при определенных событиях.

Например:

// Инклудим наш сервер require_once "./WebSocketApi.php"; //.. $wsApi = new WebSocketApi(); // Вызываем наши функции при соединении с клиентом $wsApi->events['onOpen'] = array(     "myFunction", "myFunction_2" ); // Запускаем ! $wsApi->startWServer("127.0.0.7", 8000); 

Или так:

// Инклудим наш сервер require_once "./WebSocketApi.php"; //.. $wsApi = new WebSocketApi(); // Вызываем наши функции при соединении с клиентом $wsApi->events['onMsg'] = array(     "function_1" => array("param_1", "param_2"),     "function_2" => array("param_1") ); // Запускаем ! $wsApi->startWServer("127.0.0.7", 8000); 

А вот и сам код:

Тынц

<?php /**  * User: byabuzyak  * Date: 6/23/14  * Time: 9:12 PM  */ class WebSocketApi {      const MAX_FRAME_RECV            = 100000;     const MAX_TIMEOUT               = 25;      const TIMEOUT_PONG              = 5;     const TIMEOUT_RECV              = 10;      const OPCODE_CONTINUATION       = 0;     const OPCODE_TEXT               = 1;     const OPCODE_BINARY             = 2;     const OPCODE_CLOSE              = 8;     const OPCODE_PING               = 9;     const OPCODE_PONG               = 10;      const READY_STATE_CONNECTING    = 0;     const READY_STATE_OPEN          = 1;     const READY_STATE_CLOSING       = 2;     const READY_STATE_CLOSED        = 3;      const PAYLOAD_LENGTH_16         = 126;     const PAYLOAD_LENGTH_63         = 127;      const STATUS_PROTOCOL_ERROR     = 1002;     const STATUS_MESSAGE_TOO_BIG    = 1004;      const FIN                       = 128;     const MASK                      = 128;       /**      * @var array      */     public $clients                 = array();     /**      * @var array      */     public $ws                      = array();      /**      * @var array      */     public $events                  = array();     /**      * @var int      */     public $clientsCount            = 0;      /**      * @var null      */     public static $_instance        = null;      /**      * @return WebSocketApi|null      */     public static function getInstance(){         if(self::$_instance === null){             self::$_instance = new self();         }          return self::$_instance;     }      /**      * @param string $host      * @param int $port      * @return bool      */     public function startWServer($host = '127.0.0.1', $port = 8000){         if(isset($this->ws[0]))             return false;          if(phpversion() >= 5.5){             cli_set_process_title("WebSockets Server 1.0");         }          if (!$this->ws[0] = socket_create(AF_INET, SOCK_STREAM, SOL_TCP)) {             return false;         }         if (!socket_set_option($this->ws[0], SOL_SOCKET, SO_REUSEADDR, 1)) {             socket_close($this->ws[0]);             return false;         }         if (!socket_bind($this->ws[0], $host, $port)) {             socket_close($this->ws[0]);             return false;         }         if (!socket_listen($this->ws[0], 10)) {             socket_close($this->ws[0]);             return false;         }          $write  = null;         $except = null;          $nextPingCheck = time() + 1;         while(isset($this->ws[0])){             $read   = $this->ws;             $count  = socket_select($read, $write, $except, 1);             if($count === false){                 socket_close($this->ws[0]);                 return false;             }elseif($count > 0){                 foreach($read as $clientId => $socket){                     if($clientId != 0){                         $buffer = '';                         $bytes  = @socket_recv($socket, $buffer, 4096, 0);                          if($bytes === false){                             $this->_closeClient($clientId);                         }elseif($bytes > 0){                             if(!$this->_checkClient($clientId, $buffer, $bytes)){                                 $this->_closeClient($clientId);                             }                         }else{                             $this->_removeClient($clientId);                         }                     }else{                         $client = socket_accept($this->ws[0]);                         if($client !== false){ //                            TODO: Добавить ограничения по макс. количеству клиентов                             $ip = '';                             $result = socket_getpeername($client, $ip);                             $ip     = ip2long($ip);                             if($result !== false){                                 $this->_addClient($client, $ip);                             }else{                                 socket_close($client);                             }                         }                     }                 }             }             if(time() >= $nextPingCheck){                 $nextPingCheck = time() + 1;                 $this->_checkActiveClients();             }         }         return true;     }      /**      *      */     private function _checkActiveClients(){         $time = time();         foreach($this->clients as $clientId => $socket){             if($socket->state != self::READY_STATE_CLOSED){                 if($socket->ready_state !== false){                     if($time >= $socket->ready_state + self::TIMEOUT_PONG){                         $this->_closeClient($clientId);                         $this->_removeClient($clientId);                     }                 }elseif($time >= $socket->time + self::TIMEOUT_RECV){                     if($socket->state != self::READY_STATE_CONNECTING){                         $this->clients[$clientId]->ready_state = time();                         $this->sendClientMsg($clientId, self::OPCODE_PING, '');                     }else{                         $this->_removeClient($clientId);                     }                 }             }         }     }      /**      * @param $socket      * @param string $clientIp      */     private function _addClient($socket, $clientIp = ''){         $this->clientsCount += 1;         $clientId = $this->_nextId();          $this->clients[$clientId]   = (object) array(             'socket'        => $socket,             'msg_buffer'    => '',             'state'         => self::READY_STATE_CONNECTING,             'time'          => time(),             'ready_state'   => false,             'close_status'  => 0,             'client_ip'     => $clientIp,             'header_length' => false,             'buffer_length' => 0,             'buffer'        => '',             'msg_opcode'    => 0,             'msg_data_len'  => 0         );         $this->ws[$clientId]        = $socket;     }      /**      * @return int      */     private function _nextId(){         $i = 1;         while(isset($this->ws[$i]))             $i++;         return $i;     }      /**      * @param $clientId      */     private function _removeClient($clientId){         if(array_key_exists('onClose', $this->events)){             foreach($this->events['onClose'] as $function => $args){                 if(is_array($args)){                     call_user_func_array($function, $args);                 }else{                     call_user_func($function, $args);                 }             }         }          socket_close($this->clients[$clientId]->socket);         unset($this->ws[$clientId], $this->clients[$clientId]);         $this->clientsCount -= 1;     }      /**      * @param $clientId      * @return bool      */     private function _closeClient($clientId){         if($this->clients[$clientId]->state == self::READY_STATE_CLOSING || $this->clients[$clientId]->state == self::READY_STATE_CLOSED){             return true;         }          $this->clients[$clientId]->close_status = self::STATUS_PROTOCOL_ERROR;         $this->sendClientMsg($clientId, self::OPCODE_CLOSE, pack('n', self::STATUS_PROTOCOL_ERROR));         $this->clients[$clientId]->state = self::READY_STATE_CLOSING;     }      /**      * @param $clientId      * @param $opCode      * @param $msg      * @return bool      */     public function sendClientMsg($clientId, $opCode, $msg){         if($this->clients[$clientId]->state == self::READY_STATE_CLOSING || $this->clients[$clientId]->state == self::READY_STATE_CLOSED){             return true;         }          $msgLength  = strlen($msg);         $buffSize   = 4096;          $frameCount = ceil($msgLength / $buffSize);         if($frameCount == 0){             $frameCount = 1;         }          $maxFrame = $frameCount - 1;         $lastFrameBuffLength = ($msgLength % $buffSize) != 0 ? $msgLength % $buffSize : ($msgLength != 0 ? $buffSize : 0);          for($i=0; $i<$frameCount; $i++){             $final      = $i != $maxFrame ? 0 : self::FIN;             $opCode     = $i != 0 ? self::OPCODE_CONTINUATION : $opCode;             $buffLength = $i != $maxFrame ? $buffSize : $lastFrameBuffLength;              if($buffLength <= 125){                 $payloadLength      =   $buffLength;                 $payloadLengthExt   =   '';                 $payloadLengthExtL  =   0;             }elseif($buffLength <= 65535){                 $payloadLength      =   self::PAYLOAD_LENGTH_16;                 $payloadLengthExt   =   pack('n', $buffLength);                 $payloadLengthExtL  =   2;             }else{                 $payloadLength      =   self::PAYLOAD_LENGTH_63;                 $payloadLengthExt   =   pack('xxxxN', $buffLength);                 $payloadLengthExtL  =   8;             }              $buffer = pack('n', (($final | $opCode) << 8) | $payloadLength) . $payloadLengthExt . substr($msg, $i * $buffSize, $buffLength);             $socket = $this->clients[$clientId]->socket;             $left   = 2 + $payloadLengthExtL + $buffLength;              do{                 $sent = @socket_send($socket, $buffer, $left, 0);                 if($sent === false){                     return false;                 }                 $left -= $sent;                 if($sent > 0){                     $buffer = substr($buffer, $sent);                 }             }             while($left > 0);         }          return true;     }      /**      * @param $clientId      * @param $buffer      * @param $bLength      * @return bool      */     private function _checkClient($clientId, &$buffer, $bLength){         if($this->clients[$clientId]->state == self::READY_STATE_OPEN){             $result = $this->_buildClientFrame($clientId, $buffer, $bLength);         }elseif($this->clients[$clientId]->state == self::READY_STATE_CONNECTING){             $result = $this->_makeHandShake($clientId, $buffer);             if($result){                 $this->clients[$clientId]->state = self::READY_STATE_OPEN;                 if(array_key_exists('onOpen', $this->events)){                     foreach($this->events['onOpen'] as $function => $args){                         if(is_array($args)){                             call_user_func_array($function, $args);                         }else{                             call_user_func($function, $args);                         }                     }                 }             }         }else{             $result = false;         }          return $result;     }      /**      * @param $clientId      * @param $buffer      * @param $bufferLength      * @return bool      */     private function _buildClientFrame($clientId, &$buffer, $bufferLength){         $this->clients[$clientId]->buffer_length += $bufferLength;         $this->clients[$clientId]->buffer .= $buffer;          if($this->clients[$clientId]->header_length !== false || $this->_checkSizeClientFrame($clientId) == true){             $headerLength   = ($this->clients[$clientId]->header_length <= 125 ? 0 : ($this->clients[$clientId]->header_length <= 65535 ? 2 : 8)) + 6;             $frameLength    = $this->clients[$clientId]->header_length + $headerLength;             if($this->clients[$clientId]->buffer_length >= $frameLength){                 $nextFrameLength = $this->clients[$clientId]->buffer_length - $frameLength;                 if($nextFrameLength > 0){                     $this->clients[$clientId]->buffer_length -= $nextFrameLength;                     $nextFrameBytes         = substr($this->clients[$clientId]->buffer, $frameLength);                     $this->clients[$clientId]->buffer = substr($this->clients[$clientId]->buffer, 0, $frameLength);                 }                  $result = $this->_processClientFrame($clientId);                  if(isset($this->clients[$clientId])){                     $this->clients[$clientId]->header_length = false;                     $this->clients[$clientId]->buffer_length = 0;                     $this->clients[$clientId]->buffer = '';                 }                  if($nextFrameLength <= 0 || !$result){                     return $result;                 }                  return $this->_buildClientFrame($clientId, $nextFrameBytes, $nextFrameLength);             }         }          return true;     }      /**      * @param $clientId      * @return bool      */     private function _processClientFrame($clientId){         $this->clients[$clientId]->time = time();          $buffer = &$this->clients[$clientId]->buffer;         if(substr($buffer, 5, 1) === false)             return false;          $a1 = ord(substr($buffer, 0, 1));         $a2 = ord(substr($buffer, 1, 1));          $f      = $a1 & self::FIN;         $opCode = $a1 & 15;         $mask   = $a2 & self::MASK;          if(!$mask)             return false;          $seek       = $this->clients[$clientId]->header_length <= 125 ? 2 : ($this->clients[$clientId]->header_length <= 65535 ? 4 : 10);         $maskKey    = substr($buffer, $seek, 4);          $array = unpack('Na', $maskKey);         $maskKey = $array['a'];         $maskKey = array(             $maskKey >> 24,             ($maskKey >> 16) & 255,             ($maskKey >> 8) & 255,             $maskKey & 255         );         $seek += 4;          if (substr($buffer, $seek, 1) !== false) {             $data = str_split(substr($buffer, $seek));             foreach($data as $key => $byte){                 $data[$key] = chr(ord($byte) ^ ($maskKey[$key % 4]));             }             $data = implode('', $data);         }else{             $data = '';         }          if ($opCode != self::OPCODE_CONTINUATION && $this->clients[$clientId]->msg_data_len > 0) {             $this->clients[$clientId]->msg_data_len    = 0;             $this->clients[$clientId]->msg_buffer     = '';         }          if($f == self::FIN){             if($opCode != self::OPCODE_CONTINUATION){                 return                     $this->_processClientMsg($clientId, $opCode, $data, $this->clients[$clientId]->header_length);             }else{                 $this->clients[$clientId]->msg_data_len += $this->clients[$clientId]->header_length;                 $this->clients[$clientId]->msg_buffer  .= $data;                  $result = $this->_processClientMsg($clientId, $this->clients[$clientId]->msg_opcode, $this->clients[$clientId]->msg_buffer, $this->clients[$clientId]->msg_data_len);                  if(isset($this->clients[$clientId])){                     $this->clients[$clientId]->msg_buffer     = '';                     $this->clients[$clientId]->msg_opcode    = 0;                     $this->clients[$clientId]->msg_data_len    = 0;                 }                  return $result;             }         }else{             if($opCode & 8)                 return false;             $this->clients[$clientId]->msg_data_len += $this->clients[$clientId]->header_length;             $this->clients[$clientId]->msg_buffer  .= $data;              if($opCode != self::OPCODE_CONTINUATION){                 $this->clients[$clientId]->msg_opcode = $opCode;             }         }         return true;     }      /**      * @param $clientId      * @param $opCode      * @param $data      * @param $dataLength      * @return bool      */     private function _processClientMsg($clientId, $opCode, &$data, $dataLength){         if($opCode == self::OPCODE_PING){             $this->sendClientMsg($clientId, self::OPCODE_PONG, $data);         }elseif($opCode == self::OPCODE_PONG){             if($this->clients[$clientId]->ready_state !== false){                 $this->clients[$clientId]->ready_state = false;             }         }elseif($opCode == self::OPCODE_CLOSE){             if($this->clients[$clientId]->state == self::READY_STATE_CLOSING){                 $this->clients[$clientId]->state = self::READY_STATE_CLOSED;             }else{ //                TODO: добавить типы закрытия                 $this->_closeClient($clientId);             }             $this->_removeClient($clientId);         }elseif($opCode == self::OPCODE_TEXT || $opCode == self::OPCODE_BINARY){             if(array_key_exists('onMsg', $this->events)){                 foreach($this->events['onMsg'] as $function => $args){                     if(is_array($args)){                         call_user_func_array($function, $args);                     }else{                         call_user_func($function, $args);                     }                 }             }         }else{             return false;         }          return true;     }      /**      * @param $clientId      * @return bool      */     private function _checkSizeClientFrame($clientId){         if($this->clients[$clientId]->buffer_length > 1){             $payloadLength = ord(substr($this->clients[$clientId]->buffer, 1, 1)) & 127;             if ($payloadLength <= 125) {                 $this->clients[$clientId]->header_length = $payloadLength;             }elseif($payloadLength == 126){                 if (substr($this->clients[$clientId]->buffer, 3, 1) !== false) {                     $payloadLengthExtended  = substr($this->clients[$clientId]->buffer, 2, 2);                     $array                  = unpack('na', $payloadLengthExtended);                     $this->clients[$clientId]->header_length = $array['a'];                 }             }else{                 if (substr($this->clients[$clientId]->buffer, 9, 1) !== false) {                     $payloadLengthExtended      = substr($this->clients[$clientId]->buffer, 2, 8);                     $payloadLengthExtended32_1  = substr($payloadLengthExtended, 0, 4);                     $array                      = unpack('Na', $payloadLengthExtended32_1);                      if ($array['a'] != 0 || ord(substr($payloadLengthExtended, 4, 1)) & 128) {                         $this->_closeClient($clientId, self::STATUS_MESSAGE_TOO_BIG);                         return false;                     }                      $payloadLengthExtended32_2  = substr($payloadLengthExtended, 4, 4);                     $array                      = unpack('Na', $payloadLengthExtended32_2);                      if ($array['a'] > 2147479538) {                         $this->_closeClient($clientId, self::STATUS_MESSAGE_TOO_BIG);                         return false;                     }                      $this->clients[$clientId]->header_length = $array['a'];                 }             }              if ($this->clients[$clientId]->header_length !== false) {                 if ($this->clients[$clientId]->header_length > self::MAX_FRAME_RECV) {                     $this->clients[$clientId]->header_length = false;                     $this->_closeClient($clientId, self::STATUS_MESSAGE_TOO_BIG);                     return false;                 }                  $controlFrame = (ord(substr($this->clients[$clientId]->buffer, 0, 1)) & 8) == 8;                 if (!$controlFrame) {                     $newMessagePayloadLength = $this->clients[$clientId]->msg_data_len + $this->clients[$clientId]->header_length;                     if ($newMessagePayloadLength > self::MAX_FRAME_RECV || $newMessagePayloadLength > 2147483647) {                         $this->_closeClient($clientId, self::STATUS_MESSAGE_TOO_BIG);                         return false;                     }                 }                  return true;             }         }         return false;     }      /**      * @param $clientId      * @param $buffer      * @return bool      */     private function _makeHandShake($clientId, $buffer){         $sep = strpos($buffer, "\r\n\r\n");         if (!$sep) return false;          $headers = explode("\r\n", substr($buffer, 0, $sep));         $headersCount = sizeof($headers);         if ($headersCount < 1)             return false;           $request = &$headers[0];         $requestParts = explode(' ', $request);         $requestPartsSize = sizeof($requestParts);         if ($requestPartsSize < 3)             return false;          if (strtoupper($requestParts[0]) != 'GET')             return false;          $httpPart = &$requestParts[$requestPartsSize - 1];         $httpParts = explode('/', $httpPart);         if (!isset($httpParts[1]) || (float) $httpParts[1] < 1.1)             return false;          $headersKeyed = array();         for ($i=1; $i<$headersCount; $i++) {             $parts = explode(':', $headers[$i]);             if (!isset($parts[1]))                 return false;              $headersKeyed[trim($parts[0])] = trim($parts[1]);         }          if (!isset($headersKeyed['Host']))             return false;          if (!isset($headersKeyed['Sec-WebSocket-Key']))             return false;          $key = $headersKeyed['Sec-WebSocket-Key'];         if (strlen(base64_decode($key)) != 16)             return false;          if (!isset($headersKeyed['Sec-WebSocket-Version']) || (int) $headersKeyed['Sec-WebSocket-Version'] < 7)             return false;          $hash = base64_encode(sha1($key.'258EAFA5-E914-47DA-95CA-C5AB0DC85B11', true));          $headers = array(             'HTTP/1.1 101 Switching Protocols',             'Upgrade: websocket',             'Connection: Upgrade',             'Sec-WebSocket-Accept: '.$hash         );         $headers    = implode("\r\n", $headers)."\r\n\r\n";         $socket     = $this->clients[$clientId]->socket;          $left = strlen($headers);         do{             $sent = @socket_send($socket, $headers, $left, 0);             if($sent === false)                 return false;              $left -= $sent;             if($sent > 0)                 $headers = substr($headers, $sent);         }         while($left > 0);          return true;     } }  

Чуть позже создам репозиторий на гитхабе и выложу туда. В дальнейшем планирую поддерживать и развивать.
Хотелось бы услышать от Вас уважаемых советы, пожелания или замечания.

Спасибо за внимание!

P.S в планах реализация интересных плюшек. Просто пока что не хватает времени на них.

ссылка на оригинал статьи http://habrahabr.ru/post/227815/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *