PubSub в браузере с помощью вебсокетов и протокола WAMP

от автора

Изучая методы реализации real-time обновления данных в браузере, я обнаружил "WAMP" — протокол прикладного уровня для обмена сообщениями, основанный на вебсокетах.
Протокол реализует два распространенных высокоуровневых шаблона для обмена данными: PubSub и RPC (Remote Procedure Call).

Эти шаблоны многим известны и широко применяются в различных областях программирования и межпроцессного взаимодействия:

  • RPC — удаленный вызов процедур. В процессе принимают участие клиент и сервер. Первый отправляет запросы на вызов процедуры на сервере, а второй их выполняет и отправляет результат клиенту. В типичном веб-приложении это может быть, например, запрос на создание комментария или на добавление поста в избранное
  • Publish/Subscribe (PubSub) — метод обмена сообщениями, в котором клиенты «подписываются» на интересующие их события и могут сами генерировать подобные события. Рассылкой информации подписчикам занимается третья сторона — «брокер». В WAMP шаблон PubSub реализован на основе «топиков», или каналов. Например, на сайте такими каналами могут быть «комментарии», «новости», «личные сообщения».

В контексте веб-разработки наиболее интересным вариантом применения протокола WAMP является использование шаблона PubSub. С его помощью можно легко решить задачу обновления информации на открытой у пользователя странице сайта: например, чтобы отобразить только что добавленный комментарий или показать уведомление о получении нового сообщения.
Реализация WAMP существует в виде библиотек под множество языков и платформ, включая, конечно, javascript в виде проекта autobahn.

В качестве примера использования протокола попробуем разработать абстрактное веб-приложение, в котором браузер будет подписываться на канал с новыми комментариями, а сервер — их рассылать. На сервере будет работать PHP с замечательной библиотекой Ratchet, которая помимо реализации собственно вебсокетов умеет работать с протоколом WAMP.

Планируя методы взаимодействия клиента и сервера на таком сайте следует помнить, что еще существуют браузеры, не поддерживающие вебсокеты. И хотя часть проблем с ними могут решить полифиллы, 100% работы в любой среде (например, на андроиде) с их помощью добиться не удастся. Поэтому разумно, на мой взгляд, ограничить использование шаблона PubSub на клиенте лишь подпиской на события. Генерироваться же события будут сервером, получающим «олдскульные» ajax-запросы на создание нового комментария, от имени его автора. Таким образом все клиенты смогут добавлять комментарии (или, в общем случае, генерировать события), а вот получать обновления в реальном времени — только те, кто поддерживает вебсокеты.

Клиентская часть сайта.

Библиотека autobahn экспортирует в глобальную область видимости объект ab, полный список методов которого можно прочитать в документации. Нас же интересует метод connect:

ab.connect(     //куда подключаемся     'ws://site.com:8080',     //коллбэк будет вызван после успешного подключения.     //внутрь будет передан объект session,     //содержащий информацию о соединении и методы для взаимодействия с сервером     function (session) {         //подпишемся на новые комментарии. вторым параметром передаем функцию-обраточик события,         //которая будет вызвана после получения комментария.         session.subscribe('comments', onNewComment);     },     //коллбэк будет вызван после потери соединения.     //библиотека сама попытается переподключиться, если указаны соответствующие опции,     //поэтому в обработчике события реализовывать эту логику не нужно.     function onClose() {         alert('Пропало соединение с сервером');     },     {         //опции для переподключения к серверу         'maxRetries': 100,         'retryDelay': 5000     } );  //обработчик новых сообщений на канале comments function onNewComment(topic, data) {     //topic - название канала, с которого пришло сообщение     //в data находятся данные, переданные сервером.     //в случае с комментариями это могут быть content и author.     console.log('новый комментарий', data.author, data.content); } 

Для простоты в качестве названия канала была выбрана строка ‘comments’, однако согласно спецификации протокола такое именование не является правильным. Каналы должны быть представлены в формате URI, то есть в нашем случае канал может называться http://site.com/comments. В свою очередь, URI каналов можно сокращать до «компактных URI» — CURIE. Более подробно эти детали описаны на странице спецификации.

Логично, что на реальном сайте пользователю не нужны сразу все новые комментарии, а нужны только те, которые появляются на текущей странице. В таком случае можно создать к примеру такой канал: http://site.com/comments/page/1. Разумеется, никаких ограничений на формирование URI нет: можно динамически создавать каналы с любыми параметрами, в зависимости от поставленных задач.

Серверная часть сайта.

В примере с PHP, за доставку сообщений от http-сервера до сервера, отвечающего за рассылку сообщений вебсокетам, отвечает ZMQ. При получении нового комментария сервер сохраняет его в базу данных и отправляет сообщение в очередь ZMQ, из которого он в свою очередь будет получен демоном при помощи упомянутой выше библиотеки Ratchet.
Вот как примерно выглядит реализация такой функции:

//комментарий от пользователя, полученный через ajax или обычной отправкой формы $comment=array('author'=>'Ваня', 'content'=>'Привет, хабрахабр!'); //обрабатываем и сохраняем его... $commentModel->save($comment);  //передаем комментарий для последующей рассылки подписанным пользователям $loop = React\EventLoop\Factory::create(); $context = new React\ZMQ\Context($loop); $push = $context->getSocket(\ZMQ::SOCKET_PUSH); //для передачи сообщений ZMQ используется другой порт, отличный от того, который используют клиенты $push->connect('tcp://127.0.0.1:8081'); //сообщение передается в виде строки json $push->send(json_encode($comment)); //tick выполняет первую операцию в очереди. //операция у нас только одна - отправка сообщения. $loop->tick(); 

Для обработки подключений клиентов и событий от сервера ZMQ нам понадобится процесс, который будет принимать сообщения и обрабатывать их. В документации к библиотеке Ratchet уже содержатся подробные примеры. В частности, нужно обратить внимание на класс Pusher (в нашем примере я назвал его WampProcessor, что кажется более релевантным) — именно он содержит бизнес-логику приложения и отправляет сообщения подписанным на соответствующие каналы клиентам.

Код для запуска такого процесса будет примерно таким:

//websocket-сервер из библиотеки React $loop   = React\EventLoop\Factory::create();   //processor - пользовательский обработчик подключений по WAMP, который должны написать мы сами. //он должен реализовывать интерфейс WampServerInterface. //в нашем примере помимо обработки клиентских подключений //этот класс будет также принимать сообщения от нашего http-сервера. $processor = new WampProcessor();  //будем слушать сообщения от http-сервера через ZMQ на порту 8081... $context = new React\ZMQ\Context($loop); $pull = $context->getSocket(ZMQ::SOCKET_PULL); $pull->bind('tcp://127.0.0.1:8081'); //при получении собщений они будут переданы в метод 'onComment' объекта 'processor' $pull->on('message', array($processor, 'onComment'));  //будем принимать подключения от клиентов - браузеров на порт 8080 с любого IP $app = new \components\SocketServer\App('site.com', 8080, '0.0.0.0', $loop); //при желании одним демоном можно обслуживать несколько приложений или сайтов сразу, //для этого запросы можно маршрутизировать (Ratchet использует роутер из Symfony). //в нашем примере будет один сайт, поэтому маршрутизация не понадобится. //в качестве "контроллера" будет выступать наш класс WampProcessor. $app->route('/', $processor, array('*'));  //... $app->run(); 

Все методы класса WampProcessor будут практически идентичны тем, что можно увидеть в документации Ratchet; из них стоит выделить только обработчик события — метод "onComment":

/**  * @param string строка в JSON, полученная от ZeroMQ  */ public function onComment($json) {     $comment = json_decode($json, true);      //ничего не делаем, если нет ни одного подписчика на новые комментарии     if (!array_key_exists('comments', $this->subscribedTopics)) {         return;     }      $topic = $this->subscribedTopics['comments'];      //иначе отправляем комментарий всем подписанным клиентам.     $topic->broadcast($comment); } 

Таким образом при создании нового комментария все подключенные браузеры будут получать объект с полями author и content, который и ожидает получить javascript-обработчик.

За процессом обмена сообщениями можно наблюдать в консоли chrome (фильтр «websocket» во вкладке «network») или другого браузера. Видно, что при подключении к серверу браузер отправляет приветственное сообщение, а потом — список каналов для подписки.

Заключение.

Вот так, применив технологию WebSockets и протокол WAMP, можно реализовать обновление информации на веб-странице в реальном времени методом PubSub.
Можно возразить, что используя nodejs и библиотеку socket.io сделать это было бы проще, но в нашей реальности, где PHP является доминирующей серверной платформой, описанный вариант вполне жизнеспособен и даже более удобен чем другие, более «костыльные» методы (как, например, периодический опрос сервера с помощью ajax). Также его относительно легко можно внедрить на существующий сайт: изменения потребуется внести только в те части, где происходит генерация каких-либо событий, а сам демон-обработчик от сайта может быть совершенно независим.

Ключевые ссылки:

  • Wamp — websocket application messaging protocol.
  • Authobahn — реализация WAMP на javascript.
  • Ratchet — реализация вебсокетов и WAMP на PHP.

ссылка на оригинал статьи http://habrahabr.ru/post/201658/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *