Не так давно я начал переписывать один интересный проект, своего рода чат, в котором до моего вмешательства использовался комет-сервер на вебсокеты. Рассказывать о том, что такое WebSockets я не стану, т.к я думаю, что это всем уже известно, да и пост не об этом. Поэтому под катом просто немного PhP-кода.
Да, я знаю, что готовых решений полным-полно, однако меня они по тем или иным причинам не устроили и я решил написать свой PhP WebSocket Server. Плюс ко всему, это интересно 🙂 Так как до этого я не сталкивался с сокетами, чтобы понять на деле, что это за блюдо и с чем его едят, пришлось покурить немного мануалов. Так же я разобрал и изучил принцип работы некоторых готовых решений, что позволило мне лучше узнать их. Сам код я выложу в конце статьи, а перед этим хотелось бы сказать пару слов об его использовании.
Сразу скажу, что это первая версия сервера (так сказать 1.0) и ее функционал еще немного сыроват.
Для того, чтобы запустить наш сервер необходимо сделать следущее:
// Инклудим наш сервер require_once "./WebSocketApi.php"; //.. $wsApi = new WebSocketApi(); // Запускаем ! $wsApi->startWServer("127.0.0.7", 8000);
Ничего сложного, не правда ли?
В сервере есть 3 event’a.
- onOpen — подключение клиента к серверу
- onMsg — получение сообщения от клиента
- onClose — закрытие соединения с клиентом
Эти event’ы позволяют нам вызывать пользовательские функции при определенных событиях.
Например:
// Инклудим наш сервер require_once "./WebSocketApi.php"; //.. $wsApi = new WebSocketApi(); // Вызываем наши функции при соединении с клиентом $wsApi->events['onOpen'] = array( "myFunction", "myFunction_2" ); // Запускаем ! $wsApi->startWServer("127.0.0.7", 8000);
Или так:
// Инклудим наш сервер require_once "./WebSocketApi.php"; //.. $wsApi = new WebSocketApi(); // Вызываем наши функции при соединении с клиентом $wsApi->events['onMsg'] = array( "function_1" => array("param_1", "param_2"), "function_2" => array("param_1") ); // Запускаем ! $wsApi->startWServer("127.0.0.7", 8000);
А вот и сам код:
<?php /** * User: byabuzyak * Date: 6/23/14 * Time: 9:12 PM */ class WebSocketApi { const MAX_FRAME_RECV = 100000; const MAX_TIMEOUT = 25; const TIMEOUT_PONG = 5; const TIMEOUT_RECV = 10; const OPCODE_CONTINUATION = 0; const OPCODE_TEXT = 1; const OPCODE_BINARY = 2; const OPCODE_CLOSE = 8; const OPCODE_PING = 9; const OPCODE_PONG = 10; const READY_STATE_CONNECTING = 0; const READY_STATE_OPEN = 1; const READY_STATE_CLOSING = 2; const READY_STATE_CLOSED = 3; const PAYLOAD_LENGTH_16 = 126; const PAYLOAD_LENGTH_63 = 127; const STATUS_PROTOCOL_ERROR = 1002; const STATUS_MESSAGE_TOO_BIG = 1004; const FIN = 128; const MASK = 128; /** * @var array */ public $clients = array(); /** * @var array */ public $ws = array(); /** * @var array */ public $events = array(); /** * @var int */ public $clientsCount = 0; /** * @var null */ public static $_instance = null; /** * @return WebSocketApi|null */ public static function getInstance(){ if(self::$_instance === null){ self::$_instance = new self(); } return self::$_instance; } /** * @param string $host * @param int $port * @return bool */ public function startWServer($host = '127.0.0.1', $port = 8000){ if(isset($this->ws[0])) return false; if(phpversion() >= 5.5){ cli_set_process_title("WebSockets Server 1.0"); } if (!$this->ws[0] = socket_create(AF_INET, SOCK_STREAM, SOL_TCP)) { return false; } if (!socket_set_option($this->ws[0], SOL_SOCKET, SO_REUSEADDR, 1)) { socket_close($this->ws[0]); return false; } if (!socket_bind($this->ws[0], $host, $port)) { socket_close($this->ws[0]); return false; } if (!socket_listen($this->ws[0], 10)) { socket_close($this->ws[0]); return false; } $write = null; $except = null; $nextPingCheck = time() + 1; while(isset($this->ws[0])){ $read = $this->ws; $count = socket_select($read, $write, $except, 1); if($count === false){ socket_close($this->ws[0]); return false; }elseif($count > 0){ foreach($read as $clientId => $socket){ if($clientId != 0){ $buffer = ''; $bytes = @socket_recv($socket, $buffer, 4096, 0); if($bytes === false){ $this->_closeClient($clientId); }elseif($bytes > 0){ if(!$this->_checkClient($clientId, $buffer, $bytes)){ $this->_closeClient($clientId); } }else{ $this->_removeClient($clientId); } }else{ $client = socket_accept($this->ws[0]); if($client !== false){ // TODO: Добавить ограничения по макс. количеству клиентов $ip = ''; $result = socket_getpeername($client, $ip); $ip = ip2long($ip); if($result !== false){ $this->_addClient($client, $ip); }else{ socket_close($client); } } } } } if(time() >= $nextPingCheck){ $nextPingCheck = time() + 1; $this->_checkActiveClients(); } } return true; } /** * */ private function _checkActiveClients(){ $time = time(); foreach($this->clients as $clientId => $socket){ if($socket->state != self::READY_STATE_CLOSED){ if($socket->ready_state !== false){ if($time >= $socket->ready_state + self::TIMEOUT_PONG){ $this->_closeClient($clientId); $this->_removeClient($clientId); } }elseif($time >= $socket->time + self::TIMEOUT_RECV){ if($socket->state != self::READY_STATE_CONNECTING){ $this->clients[$clientId]->ready_state = time(); $this->sendClientMsg($clientId, self::OPCODE_PING, ''); }else{ $this->_removeClient($clientId); } } } } } /** * @param $socket * @param string $clientIp */ private function _addClient($socket, $clientIp = ''){ $this->clientsCount += 1; $clientId = $this->_nextId(); $this->clients[$clientId] = (object) array( 'socket' => $socket, 'msg_buffer' => '', 'state' => self::READY_STATE_CONNECTING, 'time' => time(), 'ready_state' => false, 'close_status' => 0, 'client_ip' => $clientIp, 'header_length' => false, 'buffer_length' => 0, 'buffer' => '', 'msg_opcode' => 0, 'msg_data_len' => 0 ); $this->ws[$clientId] = $socket; } /** * @return int */ private function _nextId(){ $i = 1; while(isset($this->ws[$i])) $i++; return $i; } /** * @param $clientId */ private function _removeClient($clientId){ if(array_key_exists('onClose', $this->events)){ foreach($this->events['onClose'] as $function => $args){ if(is_array($args)){ call_user_func_array($function, $args); }else{ call_user_func($function, $args); } } } socket_close($this->clients[$clientId]->socket); unset($this->ws[$clientId], $this->clients[$clientId]); $this->clientsCount -= 1; } /** * @param $clientId * @return bool */ private function _closeClient($clientId){ if($this->clients[$clientId]->state == self::READY_STATE_CLOSING || $this->clients[$clientId]->state == self::READY_STATE_CLOSED){ return true; } $this->clients[$clientId]->close_status = self::STATUS_PROTOCOL_ERROR; $this->sendClientMsg($clientId, self::OPCODE_CLOSE, pack('n', self::STATUS_PROTOCOL_ERROR)); $this->clients[$clientId]->state = self::READY_STATE_CLOSING; } /** * @param $clientId * @param $opCode * @param $msg * @return bool */ public function sendClientMsg($clientId, $opCode, $msg){ if($this->clients[$clientId]->state == self::READY_STATE_CLOSING || $this->clients[$clientId]->state == self::READY_STATE_CLOSED){ return true; } $msgLength = strlen($msg); $buffSize = 4096; $frameCount = ceil($msgLength / $buffSize); if($frameCount == 0){ $frameCount = 1; } $maxFrame = $frameCount - 1; $lastFrameBuffLength = ($msgLength % $buffSize) != 0 ? $msgLength % $buffSize : ($msgLength != 0 ? $buffSize : 0); for($i=0; $i<$frameCount; $i++){ $final = $i != $maxFrame ? 0 : self::FIN; $opCode = $i != 0 ? self::OPCODE_CONTINUATION : $opCode; $buffLength = $i != $maxFrame ? $buffSize : $lastFrameBuffLength; if($buffLength <= 125){ $payloadLength = $buffLength; $payloadLengthExt = ''; $payloadLengthExtL = 0; }elseif($buffLength <= 65535){ $payloadLength = self::PAYLOAD_LENGTH_16; $payloadLengthExt = pack('n', $buffLength); $payloadLengthExtL = 2; }else{ $payloadLength = self::PAYLOAD_LENGTH_63; $payloadLengthExt = pack('xxxxN', $buffLength); $payloadLengthExtL = 8; } $buffer = pack('n', (($final | $opCode) << 8) | $payloadLength) . $payloadLengthExt . substr($msg, $i * $buffSize, $buffLength); $socket = $this->clients[$clientId]->socket; $left = 2 + $payloadLengthExtL + $buffLength; do{ $sent = @socket_send($socket, $buffer, $left, 0); if($sent === false){ return false; } $left -= $sent; if($sent > 0){ $buffer = substr($buffer, $sent); } } while($left > 0); } return true; } /** * @param $clientId * @param $buffer * @param $bLength * @return bool */ private function _checkClient($clientId, &$buffer, $bLength){ if($this->clients[$clientId]->state == self::READY_STATE_OPEN){ $result = $this->_buildClientFrame($clientId, $buffer, $bLength); }elseif($this->clients[$clientId]->state == self::READY_STATE_CONNECTING){ $result = $this->_makeHandShake($clientId, $buffer); if($result){ $this->clients[$clientId]->state = self::READY_STATE_OPEN; if(array_key_exists('onOpen', $this->events)){ foreach($this->events['onOpen'] as $function => $args){ if(is_array($args)){ call_user_func_array($function, $args); }else{ call_user_func($function, $args); } } } } }else{ $result = false; } return $result; } /** * @param $clientId * @param $buffer * @param $bufferLength * @return bool */ private function _buildClientFrame($clientId, &$buffer, $bufferLength){ $this->clients[$clientId]->buffer_length += $bufferLength; $this->clients[$clientId]->buffer .= $buffer; if($this->clients[$clientId]->header_length !== false || $this->_checkSizeClientFrame($clientId) == true){ $headerLength = ($this->clients[$clientId]->header_length <= 125 ? 0 : ($this->clients[$clientId]->header_length <= 65535 ? 2 : 8)) + 6; $frameLength = $this->clients[$clientId]->header_length + $headerLength; if($this->clients[$clientId]->buffer_length >= $frameLength){ $nextFrameLength = $this->clients[$clientId]->buffer_length - $frameLength; if($nextFrameLength > 0){ $this->clients[$clientId]->buffer_length -= $nextFrameLength; $nextFrameBytes = substr($this->clients[$clientId]->buffer, $frameLength); $this->clients[$clientId]->buffer = substr($this->clients[$clientId]->buffer, 0, $frameLength); } $result = $this->_processClientFrame($clientId); if(isset($this->clients[$clientId])){ $this->clients[$clientId]->header_length = false; $this->clients[$clientId]->buffer_length = 0; $this->clients[$clientId]->buffer = ''; } if($nextFrameLength <= 0 || !$result){ return $result; } return $this->_buildClientFrame($clientId, $nextFrameBytes, $nextFrameLength); } } return true; } /** * @param $clientId * @return bool */ private function _processClientFrame($clientId){ $this->clients[$clientId]->time = time(); $buffer = &$this->clients[$clientId]->buffer; if(substr($buffer, 5, 1) === false) return false; $a1 = ord(substr($buffer, 0, 1)); $a2 = ord(substr($buffer, 1, 1)); $f = $a1 & self::FIN; $opCode = $a1 & 15; $mask = $a2 & self::MASK; if(!$mask) return false; $seek = $this->clients[$clientId]->header_length <= 125 ? 2 : ($this->clients[$clientId]->header_length <= 65535 ? 4 : 10); $maskKey = substr($buffer, $seek, 4); $array = unpack('Na', $maskKey); $maskKey = $array['a']; $maskKey = array( $maskKey >> 24, ($maskKey >> 16) & 255, ($maskKey >> 8) & 255, $maskKey & 255 ); $seek += 4; if (substr($buffer, $seek, 1) !== false) { $data = str_split(substr($buffer, $seek)); foreach($data as $key => $byte){ $data[$key] = chr(ord($byte) ^ ($maskKey[$key % 4])); } $data = implode('', $data); }else{ $data = ''; } if ($opCode != self::OPCODE_CONTINUATION && $this->clients[$clientId]->msg_data_len > 0) { $this->clients[$clientId]->msg_data_len = 0; $this->clients[$clientId]->msg_buffer = ''; } if($f == self::FIN){ if($opCode != self::OPCODE_CONTINUATION){ return $this->_processClientMsg($clientId, $opCode, $data, $this->clients[$clientId]->header_length); }else{ $this->clients[$clientId]->msg_data_len += $this->clients[$clientId]->header_length; $this->clients[$clientId]->msg_buffer .= $data; $result = $this->_processClientMsg($clientId, $this->clients[$clientId]->msg_opcode, $this->clients[$clientId]->msg_buffer, $this->clients[$clientId]->msg_data_len); if(isset($this->clients[$clientId])){ $this->clients[$clientId]->msg_buffer = ''; $this->clients[$clientId]->msg_opcode = 0; $this->clients[$clientId]->msg_data_len = 0; } return $result; } }else{ if($opCode & 8) return false; $this->clients[$clientId]->msg_data_len += $this->clients[$clientId]->header_length; $this->clients[$clientId]->msg_buffer .= $data; if($opCode != self::OPCODE_CONTINUATION){ $this->clients[$clientId]->msg_opcode = $opCode; } } return true; } /** * @param $clientId * @param $opCode * @param $data * @param $dataLength * @return bool */ private function _processClientMsg($clientId, $opCode, &$data, $dataLength){ if($opCode == self::OPCODE_PING){ $this->sendClientMsg($clientId, self::OPCODE_PONG, $data); }elseif($opCode == self::OPCODE_PONG){ if($this->clients[$clientId]->ready_state !== false){ $this->clients[$clientId]->ready_state = false; } }elseif($opCode == self::OPCODE_CLOSE){ if($this->clients[$clientId]->state == self::READY_STATE_CLOSING){ $this->clients[$clientId]->state = self::READY_STATE_CLOSED; }else{ // TODO: добавить типы закрытия $this->_closeClient($clientId); } $this->_removeClient($clientId); }elseif($opCode == self::OPCODE_TEXT || $opCode == self::OPCODE_BINARY){ if(array_key_exists('onMsg', $this->events)){ foreach($this->events['onMsg'] as $function => $args){ if(is_array($args)){ call_user_func_array($function, $args); }else{ call_user_func($function, $args); } } } }else{ return false; } return true; } /** * @param $clientId * @return bool */ private function _checkSizeClientFrame($clientId){ if($this->clients[$clientId]->buffer_length > 1){ $payloadLength = ord(substr($this->clients[$clientId]->buffer, 1, 1)) & 127; if ($payloadLength <= 125) { $this->clients[$clientId]->header_length = $payloadLength; }elseif($payloadLength == 126){ if (substr($this->clients[$clientId]->buffer, 3, 1) !== false) { $payloadLengthExtended = substr($this->clients[$clientId]->buffer, 2, 2); $array = unpack('na', $payloadLengthExtended); $this->clients[$clientId]->header_length = $array['a']; } }else{ if (substr($this->clients[$clientId]->buffer, 9, 1) !== false) { $payloadLengthExtended = substr($this->clients[$clientId]->buffer, 2, 8); $payloadLengthExtended32_1 = substr($payloadLengthExtended, 0, 4); $array = unpack('Na', $payloadLengthExtended32_1); if ($array['a'] != 0 || ord(substr($payloadLengthExtended, 4, 1)) & 128) { $this->_closeClient($clientId, self::STATUS_MESSAGE_TOO_BIG); return false; } $payloadLengthExtended32_2 = substr($payloadLengthExtended, 4, 4); $array = unpack('Na', $payloadLengthExtended32_2); if ($array['a'] > 2147479538) { $this->_closeClient($clientId, self::STATUS_MESSAGE_TOO_BIG); return false; } $this->clients[$clientId]->header_length = $array['a']; } } if ($this->clients[$clientId]->header_length !== false) { if ($this->clients[$clientId]->header_length > self::MAX_FRAME_RECV) { $this->clients[$clientId]->header_length = false; $this->_closeClient($clientId, self::STATUS_MESSAGE_TOO_BIG); return false; } $controlFrame = (ord(substr($this->clients[$clientId]->buffer, 0, 1)) & 8) == 8; if (!$controlFrame) { $newMessagePayloadLength = $this->clients[$clientId]->msg_data_len + $this->clients[$clientId]->header_length; if ($newMessagePayloadLength > self::MAX_FRAME_RECV || $newMessagePayloadLength > 2147483647) { $this->_closeClient($clientId, self::STATUS_MESSAGE_TOO_BIG); return false; } } return true; } } return false; } /** * @param $clientId * @param $buffer * @return bool */ private function _makeHandShake($clientId, $buffer){ $sep = strpos($buffer, "\r\n\r\n"); if (!$sep) return false; $headers = explode("\r\n", substr($buffer, 0, $sep)); $headersCount = sizeof($headers); if ($headersCount < 1) return false; $request = &$headers[0]; $requestParts = explode(' ', $request); $requestPartsSize = sizeof($requestParts); if ($requestPartsSize < 3) return false; if (strtoupper($requestParts[0]) != 'GET') return false; $httpPart = &$requestParts[$requestPartsSize - 1]; $httpParts = explode('/', $httpPart); if (!isset($httpParts[1]) || (float) $httpParts[1] < 1.1) return false; $headersKeyed = array(); for ($i=1; $i<$headersCount; $i++) { $parts = explode(':', $headers[$i]); if (!isset($parts[1])) return false; $headersKeyed[trim($parts[0])] = trim($parts[1]); } if (!isset($headersKeyed['Host'])) return false; if (!isset($headersKeyed['Sec-WebSocket-Key'])) return false; $key = $headersKeyed['Sec-WebSocket-Key']; if (strlen(base64_decode($key)) != 16) return false; if (!isset($headersKeyed['Sec-WebSocket-Version']) || (int) $headersKeyed['Sec-WebSocket-Version'] < 7) return false; $hash = base64_encode(sha1($key.'258EAFA5-E914-47DA-95CA-C5AB0DC85B11', true)); $headers = array( 'HTTP/1.1 101 Switching Protocols', 'Upgrade: websocket', 'Connection: Upgrade', 'Sec-WebSocket-Accept: '.$hash ); $headers = implode("\r\n", $headers)."\r\n\r\n"; $socket = $this->clients[$clientId]->socket; $left = strlen($headers); do{ $sent = @socket_send($socket, $headers, $left, 0); if($sent === false) return false; $left -= $sent; if($sent > 0) $headers = substr($headers, $sent); } while($left > 0); return true; } }
Чуть позже создам репозиторий на гитхабе и выложу туда. В дальнейшем планирую поддерживать и развивать.
Хотелось бы услышать от Вас уважаемых советы, пожелания или замечания.
Спасибо за внимание!
P.S в планах реализация интересных плюшек. Просто пока что не хватает времени на них.
ссылка на оригинал статьи http://habrahabr.ru/post/227815/
Добавить комментарий