Пишем чат на Vert.x 3

от автора

На Хабре не так уж много статей, посвященных Vert.x, раз, два и обчёлся. Поэтому решил внести свой вклад и опубликовать небольшой урок, в котором рассказано, как написать простой чат с помощью Vert.x 3.


Содержание

  1. Что такое Vert.x?
  2. О чате
  3. Структура проекта
  4. Сервер
  5. Клиент
  6. Тестирование
  7. Сборка и запуск исполняемого модуля
  8. Полный исходный код
  9. Полезные ресурсы

Что такое Vert.x?

Vert.x это событийно-ориентированный фреймворк работающий на JVM. На данный момент последняя версия этого фреймворка 3.2. Vert.x 3 предоставляет следующие возможности:

  • Мультиязычность. Компоненты приложения могут быть разработаны на Java, JavaScript, Scala, Python, Ruby, Groovy, а также Clojure;
  • Параллелизм. Довольно-таки простая модель параллелизма, освобождающая от хлопот многопоточного программирования;
  • Асинхронность. Простая модель асинхронного взаимодействия без блокировки;
  • Распределенная шина событий. Включающая как клиентскую, так и серверную стороны. Играет непосредственно главную роль в нашем чате;
  • Java 8. Vert.x 3 требует версии Java не ниже 8.

Более подробно о чате

Приложение запускается на сервере, после развертывания публикуется адрес анонимного чата, к которому можно присоединиться, через любой браузер. По этому адресу приложение транслирует в реальном времени сообщения от всех пользователей.

Поехали!

Разработку будем вести в IntelliJ IDEA 15, достаточно Community-версии.

Структура проекта

Создаем maven-проект. К сожалению готового архетипа, для vert.x 3 нет (хотя для 2 существует), поэтому генерим обычный maven-проект. Его конечная структура будет иметь следующий вид:

структура

src +---main |   +---java |   |   |   Server.java |   |   |   VerticleLoader.java |   |   | |   |   \---webroot |   |           date-format.js |   |           index.html |   |           vertx-eventbus.js |   | |   \---resources \---test     \---java             ChatTest.java 

В pom.xml задаем следующие зависимости. Где vertx-core библиотека поддержки Verticles (более подробно, что это такое, немного дальше), vertx-web – позволяет использовать обработчик событий (и не только) и vertx-unit – для модульного тестирования.

pom.xml

<dependencies>     <dependency>         <groupId>io.vertx</groupId>         <artifactId>vertx-core</artifactId>         <version>3.0.0</version>     </dependency>      <dependency>         <groupId>io.vertx</groupId>         <artifactId>vertx-web</artifactId>         <version>3.0.0</version>     </dependency>      <dependency>         <groupId>io.vertx</groupId>         <artifactId>vertx-unit</artifactId>         <version>3.0.0</version>         <scope>test</scope>     </dependency> </dependencies> 

Сервер

Особенностью данного фреймворка является то, что все компоненты должны быть представлены в виде Verticle.

Verticle – это некоторый аналог сервлета, является атомарной единицей развёртывания. Сами разработчики описывают Verticle, как нечто похожее на актера в модели акторов. Собственно, эта конструкция позволяет организовать высокую степень параллелизма и асинхронности, чем и славится Vert.x. В реализации нашего сервера, мы наследуем абстрактный класс AbstractVerticle.

Переопределяемый нами метод start() является точкой входа в программу. Сначала выполняется развертывание приложения – функция deploy(), затем вешается обработчик – метод handle().

Server.java

public class Server extends AbstractVerticle {     private Logger log = LoggerFactory.getLogger(Server.class);     private SockJSHandler handler = null;     private AtomicInteger online = new AtomicInteger(0);      //точка входа.     @Override     public void start() throws Exception {          if (!deploy()) {             log.error("Failed to deploy the server.");             return;         }          handle();     }      //... } 

Для развертывания приложения необходимо получить свободный порт, в случае, если не удалось его получить, в hostPort будет отрицательное значение. Далее создаем роутер, указываем для него адрес получателя и вешаем обработчик. И наконец, запускаем HTTP-Server на доступном порту.

Server.java

//развертывание приложения. private boolean deploy() {     int hostPort = getFreePort();      if (hostPort < 0)         return false;      Router router = Router.router(vertx);      //обработчик событий.     handler = SockJSHandler.create(vertx);      router.route("/eventbus/*").handler(handler);     router.route().handler(StaticHandler.create());      //запуск веб-сервера.     vertx.createHttpServer().requestHandler(router::accept).listen(hostPort);      try {         String addr = InetAddress.getLocalHost().getHostAddress();         log.info("Access to \"CHAT\" at the following address: \nhttp://" + addr + ":" + hostPort);     } catch (UnknownHostException e) {         log.error("Failed to get the local address: [" + e.toString() + "]");         return false;     }      return true; } 

Процесс получения свободного порта представлен во фрагменте кода ниже. Сначала проверяется static-поле PROCESS_ARGS на наличие аргументов запуска приложения, одним из которых может быть порт развёртывания приложения, заданный пользователем. В случае, если порт не был задан, используется порт по умолчанию: 8080.

Server.java

//получение свободного порта для развертывания приложения. private int getFreePort() {     int hostPort = 8080;      //если порт задан в качестве аргумента,     // при запуске приложения.     if (Starter.PROCESS_ARGS != null             && Starter.PROCESS_ARGS.size() > 0) {         try {             hostPort = Integer.valueOf(Starter.PROCESS_ARGS.get(0));         } catch (NumberFormatException e) {             log.warn("Invalid port: [" + Starter.PROCESS_ARGS.get(0) + "]");         }     }      //если некорректно указан порт.     if (hostPort < 0 || hostPort > 65535)         hostPort = 8080;      return getFreePort(hostPort); } 

Если в качестве аргумента конструктора создания сокета, указан параметр со значением 0, то в таком случае будет выдан случайный свободный порт.

Когда порт уже занят (например, порт 8080 уже используется другим приложением, но при этом, он указан в качестве аргумента запуска текущего приложения), выбрасывается исключение BindException, в таком случае выполняется повторная попытка получения свободного порта.

Server.java

private int getFreePort(int hostPort) {     try {         ServerSocket socket = new ServerSocket(hostPort);         int port = socket.getLocalPort();         socket.close();          return port;     } catch (BindException e) {         //срабатывает, когда указанный порт уже занят.         if (hostPort != 0)             return getFreePort(0);          log.error("Failed to get the free port: [" + e.toString() + "]");         return -1;     } catch (IOException e) {         log.error("Failed to get the free port: [" + e.toString() + "]");         return -1;     } } 

В случае успешного развертывания, начинается прослушивание шины событий по адресам: chat.to.server (входящие события) и chat.to.client (исходящие события).

После обработки очередного события на шине, необходимо чекнуть это событие.

Server.java

private void handle() {     BridgeOptions opts = new BridgeOptions()             .addInboundPermitted(new PermittedOptions().setAddress("chat.to.server"))             .addOutboundPermitted(new PermittedOptions().setAddress("chat.to.client"));      //обработка приходящих событий.     handler.bridge(opts, event -> {         if (event.type() == PUBLISH)             publishEvent(event);          if (event.type() == REGISTER)             registerEvent(event);          if (event.type() == SOCKET_CLOSED)             closeEvent(event);          //обратите внимание, после обработки события         // должен вызываться говорящий сам за себя метод.         event.complete(true);     }); } 

Любые события, которые происходят на шине, могут быть представлены 7 следующими типами:

Тип Событие
SOCKET_CREATED возникает при создании сокета
SOCKET_CLOSED при закрытии сокета
SEND попытка отправки сообщения от клиента к серверу
PUBLISH публикация сообщения клиентом для сервера
RECEIVE уведомление от сервера, о доставленном сообщении
REGISTER попытка зарегистрировать обработчик
UNREGISTER попытка отменить зарегистрированный обработчик

В нашем приложении нам достаточно лишь обрабатывать события с типом PUBLISH, REGISTER и SOCKET_CLOSED.

Событие с типом PUBLISH срабатывает, когда кто-то из пользователей отправляет сообщение в чат.
REGISTER – срабатывает тогда, когда пользователь регистрирует обработчик. Почему не SOCKET_CREATED? Потому что, событие с типом SOCKET_CREATED предшествует – REGISTER, и, естественно, пока клиент не зарегистрирует обработчик, он не сможет получать события.
SOCKET_CLOSED – возникает, всегда когда пользователь покидает чат или когда возникает непредвиденная ситуация.

При публикации сообщения, срабатывает обработчик и вызывает метод publishEvent. Проверяется адрес назначения, в случае, если он корректен, сообщение извлекается, затем проверяется и публикуется на шине событий для всех клиентов (в т.ч. и отправителя).

Server.java

private boolean publishEvent(BridgeEvent event) {     if (event.rawMessage() != null             && event.rawMessage().getString("address").equals("chat.to.server")) {         String message = event.rawMessage().getString("body");         if (!verifyMessage(message))             return false;          String host = event.socket().remoteAddress().host();         int port = event.socket().remoteAddress().port();          Map<String, Object> publicNotice = createPublicNotice(host, port, message);         vertx.eventBus().publish("chat.to.client", new Gson().toJson(publicNotice));         return true;     } else         return false; } 

Генерация уведомления для публикации сообщения выглядит следующим образом:

Server.java

//создание уведомления о публикации сообщения. private Map<String, Object> createPublicNotice(String host, int port, String message) {     Date time = Calendar.getInstance().getTime();      Map<String, Object> notice = new TreeMap<>();     notice.put("type", "publish");     notice.put("time", time.toString());     notice.put("host", host);     notice.put("port", port);     notice.put("message", message);     return notice; } 

Вход и выход пользователей в чат обрабатываются следующим способом:

Server.java

//тип события - регистрация обработчика. private void registerEvent(BridgeEvent event) {     if (event.rawMessage() != null             && event.rawMessage().getString("address").equals("chat.to.client"))         new Thread(() ->         {             Map<String, Object> registerNotice = createRegisterNotice();             vertx.eventBus().publish("chat.to.client", new Gson().toJson(registerNotice));         }).start(); }  //создание уведомления о регистрации пользователя. private Map<String, Object> createRegisterNotice() {     Map<String, Object> notice = new TreeMap<>();     notice.put("type", "register");     notice.put("online", online.incrementAndGet());     return notice; }  //тип события - закрытие сокета. private void closeEvent(BridgeEvent event) {     new Thread(() ->     {         Map<String, Object> closeNotice = createCloseNotice();         vertx.eventBus().publish("chat.to.client", new Gson().toJson(closeNotice));     }).start(); }  //создание уведомления о выходе пользвателя из чата. private Map<String, Object> createCloseNotice() {     Map<String, Object> notice = new TreeMap<>();     notice.put("type", "close");     notice.put("online", online.decrementAndGet());     return notice; } 

Проверка публикуемого сообщения достаточно примитивная, но для примера и этого достаточно, т.е. вы её можете сами усложнить проверяя, например, на передачу скриптов в виде сообщения и прочих хаков.

Server.java

private boolean verifyMessage(String msg) {     return msg.length() > 0             && msg.length() <= 140; } 

Для обмена данными используется формат JSON, поэтому файл pom.xml необходимо обновить, добавив следующую зависимость:

pom.xml

<dependency>     <groupId>com.google.code.gson</groupId>     <artifactId>gson</artifactId>     <version>2.3.1</version> </dependency> 

Также, в нашем чате будет отображаться счетчик числа онлайн-пользователей, т.к. наше приложение многопоточное, оно гарантировано должно быть thread-safety, поэтому наиболее простой способ объявить наш счётчик как AtomicInteger.

Клиент

Создаем index.html в разделе webroot, как это представленной на структуре в начале статьи. Для общения с сервером, а точнее, с шиной событий используется библиотека vertx-eventbus.js.

Для форматирования даты, будем использовать библиотеку date-format.js, довольно-таки простая и удобная. Помимо этого, в качестве html оформления будем использовать bootstrap версии 3.3.5, sockjs.js версии 0.3.4, необходимый для библиотеки vertx-eventbus.js и jquery версии 1.11.3.

Обработчик шины событий на стороне клиента выглядит следующим образом:

index.html

var online = 0; //счетчик онлайн-пользователей. var eb = new EventBus("/eventbus/"); //шина событий.  eb.onopen = function() {     //обработчик событий в чате.     eb.registerHandler("chat.to.client", eventChatProcessing); };  //обработчик событий в чате. function eventChatProcessing(err, msg) {     var event = jQuery.parseJSON(msg.body);      if (event.type == 'publish') { //сообщение.         var time = Date.parse(event.time);         var formattedTime = dateFormat(time, "dd.mm.yy HH:MM:ss");          //добавить сообщение.         appendMsg(event.host, event.port, event.message, formattedTime);     } else { //изменение числа пользователей.         //type: register или close.         online = event.online;         $('#online').text(online);     } }; 

В случае, если тип события publish (т.е. публикация сообщения), то данные из события (event) формируются в кортеж и присоединяются к таблице сообщений. Иначе, когда тип события соответствует новому или ушедшему пользователю, просто обновляется счетчик онлайн пользователей. Функция добавления сообщения довольно-таки проста.

index.html

//добавление нового сообщения. function appendMsg(host, port, message, formattedTime) {     var $msg = $('<tr bgcolor="#dff0d8"><td align="left">' + formattedTime + '</td><td align="left">' + host + ' [' + port + ']' + '</td><td>' + message + '</td></tr>');      var countMsg = $('#messages tr').length;     if (countMsg == 0)         $('#messages').append($msg);     else         $('#messages > tbody > tr:first').before($msg); } 

Во время отправки сообщения, оно сначала публикуется по адресу “chat.to.server”, где его обрабатывает сервер, в случае, если сообщение проходит верификацию, оно рассылается всем клиентам, в т.ч. и отправителю.

index.html

$(document).ready(function() {     //событие отправления сообщения.     $('#chatForm').submit(function(evt) {         evt.preventDefault();         var message = $('#message').val();         if (message.length > 0) {             //отправление сообщения на шину событий.             eb.publish("chat.to.server", message);             $('#message').val("").focus();             countChar();         }     }); }); 

Ну и наконец последний метод, который обрабатывает количество введенных символов, по условию, пользователь не может ввести сообщение длиной более 140 символов.

index.html

//счетчик введенных символов. function countChar() {     var len = $('#message').val().length;     if (len > 140) {         var msg = $('#message').val().substring(0, 140);         $('#message').val(msg);     } else {         $('#charNum').text(140 - len);         var per = 100 / 140 * len;         $('#charNumProgressBar').css('width', per + '%').attr('aria-valuenow', per);     } }; 

Полная версия index.html, включая разметку, размещена в конце статьи.

После того, как мы написали серверную и клиентскую части, наступила очередь запуска приложения. Для запуска и удобной отладки, я рекомендую написать собственный загрузчик Verticle, хотя и есть более простая альтернатива, которую я приведу немного позже.

Единственно, значение, которое инициализирует переменную dir должно быть актуально, т.е. в действительности должен существовать такой путь. А также, переменная verticleID должна инициализироваться именем запускаемого verticle-класса, весь остальной код не подлежит изменению.

VerticleLoader.java

public class VerticleLoader {     private static Vertx vertx;      public static Vertx getVertx() {         return vertx;     }      public static void load() {         load(null);     }      public static void load(Handler<AsyncResult<String>> completionHandler) {         VertxOptions options = new VertxOptions().setClustered(false);         //путь до verticle-класса.         String dir = "chat/src/main/java/";          try {             File current = new File(".").getCanonicalFile();             if (dir.startsWith(current.getName()) && !dir.equals(current.getName())) {                 dir = dir.substring(current.getName().length() + 1);             }         } catch (IOException e) {         }          System.setProperty("vertx.cwd", dir);         String verticleID = Server.class.getName();          Consumer<Vertx> runner = vertx ->         {             try {                 if (completionHandler == null)                     vertx.deployVerticle(verticleID);                 else                     vertx.deployVerticle(verticleID, completionHandler);             } catch (Throwable t) {                 t.printStackTrace();             }         };          if (options.isClustered()) {             Vertx.clusteredVertx(options, res ->             {                 if (res.succeeded()) {                     vertx = res.result();                     runner.accept(vertx);                 } else {                     res.cause().printStackTrace();                 }             });         } else {             vertx = Vertx.vertx(options);             runner.accept(vertx);         }     }      public static void main(String[] args) {         load();     } } 

Теперь, когда загрузчик готов, создадим конфигурацию запуска: Run – Edit Configuration… – Add New Configuration (Alt + Insert) – Application. Указываем Main Class как VerticleLoader, сохраняем конфигурацию и запускаем.

Изображение конфигурации

PROFIT!

Обещанная альтернатива.

Альтернативная конфигурация

Необходимо создать конфигурацию запуска, как это представлено на картинке. На самом деле, класс Starter является главным классом, он содержит метод main, который-то и является точкой входа приложения.

Тестирование

Давайте протестируем разработанное нами приложение. Делать мы это будем с использованием JUnit, поэтому необходимо снова открыть pom.xml и добавить следующую зависимость:

pom.xml

<dependency>     <groupId>junit</groupId>     <artifactId>junit</artifactId>     <version>4.12</version>     <scope>test</scope> </dependency> 

В setUp мы создаем экземпляр Vertx и развертываем на него нашу Verticle. В отличии от традиционного JUnit методов, все текущие методы получают еще TestContext. Задача этого объекта соблюдать асинхронность наших тестов.

В методе tearDown() для объекта TestContext вызывается asyncAssertSuccess(), он терпит неудачу, если при завершении работы Verticle возникли проблемы.

ChatTest.java

@RunWith(VertxUnitRunner.class) public class ChatTest {     private Vertx vertx;     private int port = 8080;     private Logger log = LoggerFactory.getLogger(ChatTest.class);      //@Ignore     @Before     public void setUp(TestContext context) throws IOException {         VerticleLoader.load(context.asyncAssertSuccess());         vertx = VerticleLoader.getVertx();     }      //@Ignore     @After     public void tearDown(TestContext context) {         vertx.close(context.asyncAssertSuccess());     }      //... } 

В методе loadVerticleTest мы проверяем загрузку нашего приложения. Создаем клиента и пытаемся удостовериться, что приложение, развернутое по указанному нами адресу доступно. В случае успеха, мы получаем код состояния 200.

Затем, пытаемся получить содержимое страницы, заголовок которой должен содержать текст “Chat”.

Так как запрос и ответ являются асинхронными операциями, поэтому необходимо это как-то контролировать и получать уведомления, когда тест завершился для этого используется объект Async, вызывающий всегда метод complete() по завершению теста.

ChatTest.java

@Test public void loadVerticleTest(TestContext context) {     log.info("*** loadVerticleTest ***");      Async async = context.async();     vertx.createHttpClient().getNow(port, "localhost", "/", response ->     {         context.assertEquals(response.statusCode(), 200);         context.assertEquals(response.headers().get("content-type"), "text/html");         response.bodyHandler(body ->         {             context.assertTrue(body.toString().contains("<title>Chat</title>"));             async.complete();         });     }); } 

В методе eventBusTest создается клиент шины событий и вешается обработчик. В то время, пока клиент ждет какие-либо события на шине, публикуется сообщение. Обработчик реагирует на это и проверяет тело входящего события на эквивалентность, в случае успешной проверки тест завершается вызовом async.complete().

ChatTest.java

@Test public void eventBusTest(TestContext context) {     log.info("*** eventBusTest ***");      Async async = context.async();     EventBus eb = vertx.eventBus();     eb.consumer("chat.to.server").handler(message ->     {         String getMsg = message.body().toString();         context.assertEquals(getMsg, "hello");         async.complete();     });      eb.publish("chat.to.server", "hello"); } 

Запускаем тесты.

Посмотреть как…

Вкладка Maven Projects – Lifecycle – test – Run [test].

Сборка и запуск исполняемого модуля

Для этого необходимо в pom.xml добавить плагин maven-shade-plugin. Где Main-Verticle в нашем случае должен указывать на класс Server.

pom.xml

<plugin>     <groupId>org.apache.maven.plugins</groupId>     <artifactId>maven-shade-plugin</artifactId>     <version>2.3</version>     <executions>         <execution>             <phase>package</phase>             <goals>                 <goal>shade</goal>             </goals>             <configuration>                 <transformers>                     <transformer                             implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">                         <manifestEntries>                             <Main-Class>io.vertx.core.Starter</Main-Class>                             <Main-Verticle>Server</Main-Verticle>                         </manifestEntries>                     </transformer>                 </transformers>                 <artifactSet/>                 <outputFile>${project.build.directory}/${project.artifactId}-${project.version}-fat.jar</outputFile>             </configuration>         </execution>     </executions> </plugin> 

Выполняем команду Run Maven Build, после чего в каталоге target появится chat-1.0-fat.jar. Для запуска приложения исполняемый модуль и папка webroot должны находиться в одном каталоге. Чтобы развернуть наше приложение на порту 12345 необходимо выполнить команду:
java -jar chat-1.0-fat.jar 12345

На этом всё. Успехов!

Полный исходный код

Server.java

import com.google.gson.Gson; import io.vertx.core.AbstractVerticle; import io.vertx.core.Starter; import io.vertx.core.logging.Logger; import io.vertx.core.logging.LoggerFactory; import io.vertx.ext.web.Router; import io.vertx.ext.web.handler.StaticHandler; import io.vertx.ext.web.handler.sockjs.BridgeEvent; import io.vertx.ext.web.handler.sockjs.BridgeOptions; import io.vertx.ext.web.handler.sockjs.PermittedOptions; import io.vertx.ext.web.handler.sockjs.SockJSHandler;  import java.io.IOException; import java.net.BindException; import java.net.InetAddress; import java.net.ServerSocket; import java.net.UnknownHostException; import java.util.*; import java.util.concurrent.atomic.AtomicInteger;  import static io.vertx.ext.web.handler.sockjs.BridgeEvent.Type.*;  public class Server extends AbstractVerticle {     private Logger log = LoggerFactory.getLogger(Server.class);     private SockJSHandler handler = null;     private AtomicInteger online = new AtomicInteger(0);      //точка входа.     @Override     public void start() throws Exception {          if (!deploy()) {             log.error("Failed to deploy the server.");             return;         }          handle();     }      //развертывание приложения.     private boolean deploy() {         int hostPort = getFreePort();          if (hostPort < 0)             return false;          Router router = Router.router(vertx);          //обработчик событий.         handler = SockJSHandler.create(vertx);          router.route("/eventbus/*").handler(handler);         router.route().handler(StaticHandler.create());          //запуск веб-сервера.         vertx.createHttpServer().requestHandler(router::accept).listen(hostPort);          try {             String addr = InetAddress.getLocalHost().getHostAddress();             log.info("Access to \"CHAT\" at the following address: \nhttp://" + addr + ":" + hostPort);         } catch (UnknownHostException e) {             log.error("Failed to get the local address: [" + e.toString() + "]");             return false;         }          return true;     }      //получение свободного порта для развертывания приложения.     private int getFreePort() {         int hostPort = 8080;          //если порт задан в качестве аргумента,         // при запуске приложения.         if (Starter.PROCESS_ARGS != null                 && Starter.PROCESS_ARGS.size() > 0) {             try {                 hostPort = Integer.valueOf(Starter.PROCESS_ARGS.get(0));             } catch (NumberFormatException e) {                 log.warn("Invalid port: [" + Starter.PROCESS_ARGS.get(0) + "]");             }         }          //если некорректно указан порт.         if (hostPort < 0 || hostPort > 65535)             hostPort = 8080;          return getFreePort(hostPort);     }      //если в качестве порта указано значение 0,     // то выдается случайный свободный порт.     private int getFreePort(int hostPort) {         try {             ServerSocket socket = new ServerSocket(hostPort);             int port = socket.getLocalPort();             socket.close();              return port;         } catch (BindException e) {             //срабатывает, когда указанный порт уже занят.             if (hostPort != 0)                 return getFreePort(0);              log.error("Failed to get the free port: [" + e.toString() + "]");             return -1;         } catch (IOException e) {             log.error("Failed to get the free port: [" + e.toString() + "]");             return -1;         }     }      private void handle() {         BridgeOptions opts = new BridgeOptions()                 .addInboundPermitted(new PermittedOptions().setAddress("chat.to.server"))                 .addOutboundPermitted(new PermittedOptions().setAddress("chat.to.client"));          //обработка приходящих событий.         handler.bridge(opts, event -> {             if (event.type() == PUBLISH)                 publishEvent(event);              if (event.type() == REGISTER)                 registerEvent(event);              if (event.type() == SOCKET_CLOSED)                 closeEvent(event);              //обратите внимание, после обработки события             // должен вызываться говорящий сам за себя метод.             event.complete(true);         });     }      //тип события - публикация сообщения.     private boolean publishEvent(BridgeEvent event) {         if (event.rawMessage() != null                 && event.rawMessage().getString("address").equals("chat.to.server")) {             String message = event.rawMessage().getString("body");             if (!verifyMessage(message))                 return false;              String host = event.socket().remoteAddress().host();             int port = event.socket().remoteAddress().port();              Map<String, Object> publicNotice = createPublicNotice(host, port, message);             vertx.eventBus().publish("chat.to.client", new Gson().toJson(publicNotice));             return true;         } else             return false;     }      //создание уведомления о публикации сообщения.     private Map<String, Object> createPublicNotice(String host, int port, String message) {         Date time = Calendar.getInstance().getTime();          Map<String, Object> notice = new TreeMap<>();         notice.put("type", "publish");         notice.put("time", time.toString());         notice.put("host", host);         notice.put("port", port);         notice.put("message", message);         return notice;     }      //тип события - регистрация обработчика.     private void registerEvent(BridgeEvent event) {         if (event.rawMessage() != null                 && event.rawMessage().getString("address").equals("chat.to.client"))             new Thread(() ->             {                 Map<String, Object> registerNotice = createRegisterNotice();                 vertx.eventBus().publish("chat.to.client", new Gson().toJson(registerNotice));             }).start();     }      //создание уведомления о регистрации пользователя.     private Map<String, Object> createRegisterNotice() {         Map<String, Object> notice = new TreeMap<>();         notice.put("type", "register");         notice.put("online", online.incrementAndGet());         return notice;     }      //тип события - закрытие сокета.     private void closeEvent(BridgeEvent event) {         new Thread(() ->         {             Map<String, Object> closeNotice = createCloseNotice();             vertx.eventBus().publish("chat.to.client", new Gson().toJson(closeNotice));         }).start();     }      //создание уведомления о выходе пользвателя из чата.     private Map<String, Object> createCloseNotice() {         Map<String, Object> notice = new TreeMap<>();         notice.put("type", "close");         notice.put("online", online.decrementAndGet());         return notice;     }      //довольно простая проверка сообщения,     // конечно её можно усложнить,     // но для пример и этого достаточно ;)     private boolean verifyMessage(String msg) {         return msg.length() > 0                 && msg.length() <= 140;     } } 

VerticleLoader.java

import io.vertx.core.AsyncResult; import io.vertx.core.Handler; import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; import io.vertx.core.impl.StringEscapeUtils;  import java.io.File; import java.io.IOException; import java.util.function.Consumer;  public class VerticleLoader {     private static Vertx vertx;      public static Vertx getVertx() {         return vertx;     }      public static void load() {         load(null);     }      public static void load(Handler<AsyncResult<String>> completionHandler) {         VertxOptions options = new VertxOptions().setClustered(false);         //путь до verticle-класса.         String dir = "chat/src/main/java/";          try {             File current = new File(".").getCanonicalFile();             if (dir.startsWith(current.getName()) && !dir.equals(current.getName())) {                 dir = dir.substring(current.getName().length() + 1);             }         } catch (IOException e) {         }          System.setProperty("vertx.cwd", dir);         String verticleID = Server.class.getName();          Consumer<Vertx> runner = vertx ->         {             try {                 if (completionHandler == null)                     vertx.deployVerticle(verticleID);                 else                     vertx.deployVerticle(verticleID, completionHandler);             } catch (Throwable t) {                 t.printStackTrace();             }         };          if (options.isClustered()) {             Vertx.clusteredVertx(options, res ->             {                 if (res.succeeded()) {                     vertx = res.result();                     runner.accept(vertx);                 } else {                     res.cause().printStackTrace();                 }             });         } else {             vertx = Vertx.vertx(options);             runner.accept(vertx);         }     }      public static void main(String[] args) {         load();     } } 

ChatTest.java

import io.vertx.core.Vertx; import io.vertx.core.eventbus.EventBus; import io.vertx.core.logging.Logger; import io.vertx.core.logging.LoggerFactory; import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; import io.vertx.ext.unit.junit.VertxUnitRunner; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith;  import java.io.IOException;  @RunWith(VertxUnitRunner.class) public class ChatTest {     private Vertx vertx;     private int port = 8080;     private Logger log = LoggerFactory.getLogger(ChatTest.class);      //@Ignore     @Before     public void setUp(TestContext context) throws IOException {         //развертывание нашей Verticle.         VerticleLoader.load(context.asyncAssertSuccess());         vertx = VerticleLoader.getVertx();     }      //@Ignore     @After     public void tearDown(TestContext context) {         vertx.close(context.asyncAssertSuccess());     }      //@Ignore     @Test     public void loadVerticleTest(TestContext context) {         log.info("*** loadVerticleTest ***");          Async async = context.async();         vertx.createHttpClient().getNow(port, "localhost", "/", response ->         {             //проверка доступности развернутого нами приложения.             context.assertEquals(response.statusCode(), 200);             context.assertEquals(response.headers().get("content-type"), "text/html");              //проверка содержимого страницы.             response.bodyHandler(body ->             {                 context.assertTrue(body.toString().contains("<title>Chat</title>"));                 async.complete();             });         });     }      //@Ignore     @Test     public void eventBusTest(TestContext context) {         log.info("*** eventBusTest ***");          Async async = context.async();         EventBus eb = vertx.eventBus();         //ожидание события на шине.         eb.consumer("chat.to.server").handler(message ->         {             String getMsg = message.body().toString();             context.assertEquals(getMsg, "hello");             async.complete();         });          //отправка сообщения на шину.         eb.publish("chat.to.server", "hello");     } } 

index.html

<!DOCTYPE html> <html lang="en"> <head>     <meta charset="UTF-8">     <title>Chat</title>     <meta charset="windows-1251">     <meta name="viewport" content="width=device-width, initial-scale=1.0">     <script src="//cdn.jsdelivr.net/sockjs/0.3.4/sockjs.min.js"></script>     <link rel="stylesheet" href="http://maxcdn.bootstrapcdn.com/bootstrap/3.3.5/css/bootstrap.min.css">     <script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.3/jquery.min.js"></script>     <script src="http://maxcdn.bootstrapcdn.com/bootstrap/3.3.5/js/bootstrap.min.js"></script>     <script src="date-format.js"></script>     <script src="vertx-eventbus.js"></script>      <style type="text/css">         body {             padding-top: 40px;             padding-bottom: 40px;             background-color: #f5f5f5;         }          .received{             width: 160px;             font-size: 10px;         }          input[type=text]:focus, textarea:focus{             box-shadow: 0 0 5px #4cae4c;             border: 1px solid #4cae4c;         }          .tab-content{             padding:5px         }     </style>      <script>         var online = 0; //счетчик онлайн-пользователей.         var eb = new EventBus("/eventbus/"); //шина событий.          eb.onopen = function() {             //обработчик событий в чате.             eb.registerHandler("chat.to.client", eventChatProcessing);         };          //обработчик событий в чате.         function eventChatProcessing(err, msg) {             var event = jQuery.parseJSON(msg.body);  			if (event.type == 'publish') {//сообщение. 				var time = Date.parse(event.time); 				var formattedTime = dateFormat(time, "dd.mm.yy HH:MM:ss");  				//добавить сообщение. 				appendMsg(event.host, event.port, event.message, formattedTime); 			} else { //изменение числа пользователей. 			    //type: register или close. 			    online = event.online; 				$('#online').text(online); 			}         };          //добавление нового сообщения. 		function appendMsg(host, port, message, formattedTime){ 			var $msg = $('<tr bgcolor="#dff0d8"><td align="left">' + formattedTime 					+ '</td><td align="left">' + host + ' [' + port + ']' 					+ '</td><td>' + message 					+ '</td></tr>');              var countMsg = $('#messages tr').length; 			if (countMsg == 0) 				$('#messages').append($msg); 			else 			    $('#messages > tbody > tr:first').before($msg); 		}          $(document).ready(function() {             //событие отправления сообщения.             $('#chatForm').submit(function(evt) {                 evt.preventDefault();                 var message = $('#message').val();                 if (message.length > 0) {                     //отправление сообщения на шину событий.                     eb.publish("chat.to.server", message);                     $('#message').val("").focus();                     countChar();                 }             });         });          //счетчик введенных символов.         function countChar() {             var len = $('#message').val().length;             if (len > 140) {                 var msg = $('#message').val().substring(0, 140);                 $('#message').val(msg);             } else {                 $('#charNum').text(140 - len);                 var per = 100 / 140 * len;                 $('#charNumProgressBar').css('width', per+'%').attr('aria-valuenow', per);             }         };     </script> </head> <body>     <div class="container chat-wrapper">         <form id="chatForm">             <h2 align="center" class="alert alert-success">CHAT ROOM</h2>             <fieldset>                 <div class="input-group input-group-lg">                     <span class="input-group-addon" id="onlineIco">                         <span class="glyphicon glyphicon-eye-open"></span>                     </span>                     <span class="input-group-addon" id="online">                         <span class="glyphicon glyphicon-option-horizontal"></span>                     </span>                     <input type="text" maxlength="141" autocomplete="off" class="form-control"                         placeholder="What's new?" id="message" aria-describedby="sizing-addon1"                         onkeyup="countChar()"/>                     <span class="input-group-btn">                         <button class="btn btn-success" type="submit">                             <span class="glyphicon glyphicon-send"></span>                         </button>                     </span>                 </div>             </fieldset>              <h3 id="charNum">140</h3>              <div class="progress">                 <div id="charNumProgressBar" class="progress-bar progress-bar-success active" role="progressbar"                      aria-valuenow="0" aria-valuemin="0" aria-valuemax="100" style="width: 0%">                     <span class="sr-only">100% Complete</span>                 </div>             </div>              <div class="panel panel-success">                 <div class="panel-heading"><h3>New messages</h3></div>                     <table id="messages" class="table table-hover" width="100%">                         <colgroup>                             <col style="width:10%">                             <col style="width:10%">                             <col style="width:10%">                         </colgroup>                     </table>             </div>         </form>     </div> </body> </html> 

Полезные ресурсы

ссылка на оригинал статьи https://habrahabr.ru/post/276771/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *