Пишем простой чат с консольным интерфейсом используя трубно-ориентированное программирование с котами

от автора

Если в процессе изучения gRPC хотите попрактиковаться с Bidirectional Streaming (двунаправленная потоковая передача данных), c запросами в рамках одного соединения, инициированием событий со стороны сервера, то создание простого чата может быть отличным способом.

Демонстрация работы консольного чата
Демонстрация работы консольного чата

Проект будем писать на языке Scala с использованием библиотеки fs2-grpc. Будем использовать клиент-серверную архитектуру, где клиенты могут отправлять сообщения на сервер, который будет ретранслировать их всем подключенным клиентам.

gRPC

Но прежде чем начать, давайте вспомним, что такое gRPC и как он связан с HTTP/2 не углубляясь в подробности (на эту тему и так достаточно статей).

gRPC — это RPC-фреймворк (Remote Procedure Call), который позволяет создавать клиент-серверные приложения для обмена данными. gRPC использует под капотом протокол HTTP/2, который позволяет ускорить передачу данных, уменьшить объем передаваемых данных и снизить задержку. Важно упомянуть о том, что gRPC использует Protobuf чтобы определить методы и структуру сообщений с помощью специального языка описания интерфейсов, а затем сгенерировать код для работы с этими сообщениями на различных языках программирования. Protobuf обеспечивает эффективную сериализацию/десериализацию данных в компактный бинарный формат.

Механизм работы чата

Ограничения

Во первых, чтобы не усложнять проект, я решил делать чат консольным и не хранить сообщения на стороне сервера.

Bidirectional Streaming

Поговорим про механизм работы чата с Bidirectional Streaming в gRPC. Процесс обмена сообщениями будет работать таким образом, что сервер и клиент обмениваются потоками сообщений в рамках одного соединения. Клиент отправляет событие, сервер его получает, обрабатывает, а затем отправляет ответное событие. Клиент и сервер обмениваются сообщениями асинхронно. Таким образом, данные передаются между клиентом и сервером в реальном времени и в обе стороны.

Мультикастинг событий внутри сервера

Когда клиенты подключаются к серверу, каждый из них может отправлять события на сервер. Однако, возникает проблема — как переслать сообщения от одного клиента остальным подключенным клиентам.

Для решения этой проблемы можно использовать механизм мультикастинга с помощью топика. Топик — это объект, который позволяет отправлять сообщения одновременно нескольким подписчикам. То есть, если один клиент отправляет событие, то полученное событие на стороне сервера будет направлено на этот топик, а оттуда автоматически пересылается всем клиентам, подписанным на этот топик.

Для реализации мультикастинга я использовал Topic из библиотеки fs2 (Functional Streams for Scala).

Таким образом, визуально механизм взаимодействия клиентов в сервером выглядит примерно так

Реализация

Для языка Scala есть несколько библиотек для работы с gRPC. Я использую fs2-grpc, который является оберткой над ScalaPB и сделана на основе функциональной библиотеки для работы со стримами — fs2.

fs2-grpc поддерживает все типы RPC-вызовов — Unary, Server Streaming, Client Streaming и Bidirectional Streaming. Она также предоставляет механизмы обработки ошибок и управления ресурсами, такие как Resource и Bracket. fs2-grpc интегрируется со стеком функциональных библиотек для работы с эффектами (cats-effect, zio, monix). В моем примере используется Cats Effect 3.

Proto

И так, приступим. В первую очередь нужно накидать прото-файл, в котором опишем контракт взаимодействия клиента и сервера.

Создадим некоторый ChatService с методом eventsStream, у которого на входе и на выходе потоковые данные с типом Events (то есть будем события через стримы туда-сюда делать).

service ChatService {        rpc eventsStream(stream Events) returns (stream Events) { }   }

Events содержит данные обернутые в тип события, которые могут быть инициированы как на стороне клиента, так и сервера (в нашем случае только на стороне клиентов).

message Events {        oneof event {       Login    client_login    = 1;       Logout   client_logout   = 2;       Message  client_message  = 3;       Shutdown server_shutdown = 4;     }

Реализация сервера

Ранее мы говорили, что сервер должен получать события от клиентов и транслировать их остальным клиентам.

После компиляции прото-файла будет сгенерирован базовый код для работы с gRPC, среди которого будет интерфейс ChatServiceFs2Grpc. Он должен быть имплементирован на стороне сервера. Моя реализация имеет следующий вид.

object ChatService {        def apply[F[_]: Concurrent: Console](         eventsTopic: Topic[F, Events]     ): ChatServiceFs2Grpc[F, Metadata] = new ChatServiceFs2Grpc[F, Metadata] {          val eventsToClients: Stream[F, Events] =         eventsTopic           .subscribeUnbounded           .evalTap(event => Console[F].println(s"From topic: $event"))        override def eventsStream(           eventsFromClient: fs2.Stream[F, Events],           ctx: Metadata       ): fs2.Stream[F, Events] = {         eventsToClients.concurrently(           eventsFromClient             .evalTap(event => Console[F].println(s"Event from client: $event"))             .evalMap(eventsTopic.publish1)         )       }     }   }

Мы видим метод eventsStream, который описывали в proto-файле. Из потока eventsFromClient получаем события от клиентов. На выходе отдаем некоторый поток событий eventsToClients. Если посмотреть выше, то видим, что eventsToClients это подписка на топик eventsTopic: Topic[F, Events], в который публикуются события от клиента для отправки остальным клиентам.

Сборка и запуск сервера

Собираем все компоненты, которые представляют собой основу серверного приложения.

object ChatServerApp extends IOApp {        private def runServer(service: ServerServiceDefinition): IO[Nothing] = {       NettyServerBuilder         .forPort(50053)         .keepAliveTime(5, TimeUnit.SECONDS)         .addService(service)         .resource[IO]         .evalMap(server => IO(server.start()))         .useForever     }        override def run(args: List[String]): IO[ExitCode] = for {       topic <- Topic[IO, Events]       serviceResource = ChatServiceFs2Grpc.bindServiceResource[IO](ChatService(topic))       _ <- serviceResource.use(runServer)     } yield ExitCode.Success   }

В функции runServer создается и запускается новый сервер с помощью NettyServerBuilder, который прослушивает порт 50053. NettyServerBuilder предоставляется библиотекой gRPC для создания серверов, использующих Netty в качестве транспорта и позволяет настроить параметры сервера (порт, keepAliveTime и т.д.)

В методе run создается топик, который будет использоваться для мультикастинга событий по клиентам. Создаем инстанс сервиса ChatService и биндим его к серверу. Затем запускаем наш сервер.

$ sbt "runMain org.github.ainr.chat.server.ChatServerApp"

В итоге, когда сервер запущен, клиенты смогут подключаться к нему, отправлять сообщения и получать их в режиме реального времени.

Реализация клиента

Что должен делать клиент? Клиент может показаться чутка сложнее, но на самом деле тут тоже все просто. Клиент делает несколько простых вещей:

  • Читает ввод в консоль

  • Отправляет события серверу

  • Получает события от сервера и обрабатывает их

  • Печатает полученные сообщения в консоль

Со стороны клиента тоже все сделано на стримах (Stream).

Чтение ввода из консоли

Для чтения ввода из консоли снова прибегаем к помощи стримов. Создаем класс InputStream с методом read, который возвращает поток сообщений напечатанных клиентом — Stream[F, String].

object InputStream {        def apply[F[_]: Async: Console](bufSize: Int): InputStream[F] = {          new InputStream[F] {            override def read: Stream[F, String] = {           fs2.io             .stdinUtf8(bufSize)             .through(fs2.text.lines)             .evalTap(erase)          // удалить из консоли ввод           .filter(_.nonEmpty)      // фильтруем пустые строки       }            private def erase: PartialFunction[String, F[Unit]] = {           _ => Console[F].print("\u001b[1A\u001b[0K")         }       }     }   } 

По коду видно, что он берет поток символов, преобразует их в строки и фильтрует пустые. Магическим может показаться только лишь метод erase, который печатает что-то непонятное в консоль.

private def erase: PartialFunction[String, F[Unit]] = {     _ => Console[F].print("\u001b[1A\u001b[0K")}  

На самом деле никакой магии нет. Все, что он делает — это удаляет то, что мы напечатали в консоль путем ввода спец-символов ANSI чтобы сообщения не дублировались.

Логика клиента

Далее введенный пользователем в консоль текст нужно преобразовать в тип события Event и отправить серверу.

В целом, логика клиента довольно простая и описана путем композиции стримов в методе start. Здесь снова фигурирует chatService: ChatServiceFs2Grpc[F, Metadata] с методом eventsStream сгенерированный библиотекой fs2-grpc на вход которого отправляем события из консоли (InputStream), генерируемые пользователем.

object ChatClient {        def apply[F[_]: Concurrent: Console](         clientName: String,         inputStream: InputStream[F],         chatService: ChatServiceFs2Grpc[F, Metadata]     ): ChatClient[F] = new ChatClient[F] {          private val grpcMetaData = new Metadata() // empty          override def start: F[Unit] = {         chatService           .eventsStream(             login(clientName) ++ inputStream.read.through(handleInput),             grpcMetaData           )           .through(processEvent)    // обрабатываем полученные события от сервера         .through(writeToConsole)  // пишем в консоль         .compile           .drain       }  private def login(clientName: String): fs2.Stream[F, Events] =     fs2.Stream(Events(ClientLogin(Login(clientName))))    // ...

Metadata в gRPC — это способ передачи дополнительных метаданных между клиентом и сервером, которые представляет собой пары ключ-значение и могут быть добавлены к любому запросу.

На выходе eventsStream ловим события с сервера, сгенерированные другими клиентами, обрабатываем их методом processEvent, который преобразовывает события в строки.

private def processEvent: Pipe[F, Events, String] =     _.map { data =>       data.event match {         case event: ClientLogin   => s"${Color.Green(event.value.name).overlay(Bold.On)} entered the chat."       case event: ClientLogout  => s"${Color.Blue(event.value.name).overlay(Bold.On)} left the chat."       case event: ClientMessage => s"${Color.LightGray(s"${event.value.name}:").overlay(Bold.On)} ${event.value.message}"         case _: ServerShutdown    => s"${Color.LightRed("Server shutdown")}"         case unknown              => s"${Color.Red("Unknown event:")} $unknown"       }     }

Для форматированного вывода текста в консоли используется библиотека fansi от lihaoyi, предназначенная для работы с цветами и стилями текста в консольном приложении. Она позволяет добавлять цветовые и стилевые эффекты к тексту, что делает консольный вывод более информативным и привлекательным. Далее сообщения будут напечатаны в консоль методом writeToConsole.

Сборка и запуск клиента

Собираем все компоненты, которые представляет собой основу клиентского приложения.

object ChatClientApp extends IOApp {        private def buildChatService(channel: Channel): Resource[IO, ChatServiceFs2Grpc[IO, Metadata]] =       ChatServiceFs2Grpc.stubResource[IO](channel)        private def resources: Resource[IO, ChatServiceFs2Grpc[IO, Metadata]] =       NettyChannelBuilder         .forAddress("127.0.0.1", 50053)         .usePlaintext()         .resource[IO]         .flatMap(buildChatService)        override def run(args: List[String]): IO[ExitCode] =       resources.use { chatServiceFs2Grpc =>         ChatClient(           args.headOption.getOrElse("Anonymous"),           InputStream[IO](bufSize = 1024),           chatServiceFs2Grpc         ).start       }.as(ExitCode.Success)   }

NettyChannelBuilder — это класс, предоставляемый библиотекой gRPC для создания клиентов, использующих Netty в качестве транспорта. Он позволяет настроить параметры клиента, такие как адрес сервера, используемый протокол, методы аутентификации и т.д.

В функции buildChatService создается ресурс, который представляет собой клиент для обращения к серверу чата. Для его создания используется метод stubResource из ChatServiceFs2Grpc.

Запускаем клиент через sbt, передав в аргументы имя клиента.

$ sbt "runMain org.github.ainr.chat.client.ChatClientApp Username"

И можем общаться 🙂

Вместо заключения

Создание небольших, простых проектов — это отличный способ попрактиковаться и углубить свои знания в технологиях. Это может быть что-то, что вы можете написать быстро и без особых усилий, но в то же время дает возможность изучить какой-то новый аспект технологии или языка программирования.

Простые проекты могут быть очень разнообразными. Например, вы можете написать небольшой веб-сервер, создать небольшую игру, написать скрипт для автоматического сбора данных, или же написать чат на базе gRPC, как мы обсуждали ранее.

Преимущество создания небольших проектов заключается в том, что вы можете более глубоко изучить технологию и применить знания на практике. Вы также можете быстро увидеть результат своей работы и получить удовлетворение от завершения проекта.

Не бойтесь начинать с чего-то простого и постепенно увеличивать сложность — это поможет вам стать более опытным и уверенным программистом.

Исходники

Код проекта можно посмотреть на гитхабе:


ссылка на оригинал статьи https://habr.com/ru/post/717846/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *