Стриминговые микросервисы с ZIO и Kafka

от автора

В Scalac мы ежедневно разрабатываем и внедряем распределенные приложения с высокой степенью параллелизма. Распределенные системы в настоящее время активно развиваются и не собираются в этом останавливаться. В архитектуре подобных систем, помимо Kubernetes, важное место занимает Apache Kafka.

Мы используем Apache Kafka как основу для асинхронного взаимодействия микросервисов. Простота масштабирования, устойчивость к потере и повреждению данных, репликация и легко достижимый параллелизм через консьюмер-группы (consumer groups) — вот только некоторые из причин, почему Kafka является одним из основных инструментов построения распределенных систем.

Scala, ZIO и, наконец, Apache Kafka

Не секрет, что мы любим Scala и функциональное программирование. Данная статья предполагает, что у вас есть некоторое знакомство с библиотеками функциональных эффектов, такими как ZIO, и базовое понимание Apache Kafka. Тем не менее, хотя эта статья ориентирована на разработчиков уже знакомых с ZIO, которые хотят разобраться с тем, как интегрировать Kafka в ZIO-проекты, я все-таки расскажу об основах ZIO, ZIO Streams, а также о реализации сервисов потоковой обработки данных с использованием принципов функционального программирования.

Давайте начнем.

Основы ZIO

ZIO — это самостоятельная библиотека без дополнительных зависимостей для асинхронного и параллельного программирования. Благодаря неблокирующим файберам, не расходующим ресурсы впустую, ZIO позволяет создавать масштабируемые, устойчивые и реактивные приложения, отвечающие потребностям вашего бизнеса. Узнать больше о ZIO и его преимуществах можно в официальной документации или в нашей недавней статье

ZIO Streams

Функциональные потоки как абстракция, часто используются для элегантного решения проблем параллелизма и обработки неограниченного потока данных, ставя в приоритет безопасность ресурсов и использование процессора.

ZIO Streams очень похож на тип ZIO, о котором мы говорили в предыдущей статье. Он выглядит следующим образом:

ZStream[-R, +E, +O]

ZStream принимает окружение типа R, тип ошибки E, с которой может упасть, и тип O, возвращаемый в случае успешного выполнения. Вы также можете думать о ZStream как о значении ZIO, которое может выдавать несколько значений вместо одного. 

ZStream — это чисто функциональный pull-поток, то есть ленивый (lazy) и с обратным давлением (backpressure), что освобождает разработчика от необходимости вручную управлять и писать код обработки обратного давления.

Неудивительно, что ZIO Kafka основана на ZIO и ZIO Streams, обеспечивая удобный интерфейс для подписки, процессинга и коммита записей Kafka наряду с уже существующими операторами из ZIO Streams. Таким образом, после настройки входящего ZStream нет никакой разницы между тем, откуда поступают данные: из Kafka, файловой системы или любого другого источника. Операторы обработки будут полностью идентичны. Хотя при обработке записей из Kafka есть и некоторые специфические операции, такие как ручной коммит смещения (offset commit).

ZIO Kafka

Apache Kafka — это распространенное решение для хранения событий в событийно-ориентированных приложениях. Поскольку топики (topic) Kafka постоянно наполняются новыми событиями, то событийно-ориентированные приложения выглядят как бесконечные циклы, в которых принимаются события и каким-то образом обрабатываются. Это может быть преобразование и повторная отправка в Kafka или запись в какое-то хранилище данных для построения материализованного представления или какого-то агрегата.

Конечно, вполне допустимо использовать обычного клиента Kafka, но крайне важно, реализовать следующие возможности: буферизация, агрегирование пакетов записей до заданного тайм-аута и управление количеством отправляемых сообщений в единицу времени. Это довольно нетривиальные задачи, которые будут отвлекать разработчика и задерживать реализацию бизнес-требований. А теперь представьте, что вам нужна асинхронность и неблокирующая синхронизация. Звучит чрезвычайно сложно. Потому что так оно и есть. К счастью, вся эта функциональность доступна в ZIO Streams. И это, вероятно, основная причина, по которой вам следует отказаться от обычного клиента Kafka.

Итак, если вы хотите сосредоточиться на бизнес-логике и возложить на библиотеку все сложности, такие как безопасность ресурсов и уровень параллелизма вашего пайплайна обработки событий, то вам определенно стоит обратить внимание на ZIO Streams. А если ваша система построена на основе Apache Kafka, то вам, несомненно, понравится ZIO Kafka.

Микросервисы с ZIO и Kafka

Теперь, после того как мы разобрались с основами ZIO, ZIO Streams и ZIO Kafka, пришло время рассмотреть реализацию системы, использующей все эти технологии.

Наша система состоит из двух сервисов. Первый — это сервис producer, который генерирует события и отправляет их в Kafka, второй — processor, который получает события, отправленные producer, потом обращается к некоторому внешнему API за дополнительной информацией, вносит эту информацию в исходное событие и отправляет это дополненное сообщение обратно в Kafka. 

Полная реализация доступна в репозитории, а здесь мы продолжим разбирать ее по частям.

Проект реализован как многомодульный sbt-проект. Модуль kafka запускает встроенную Kafka, используемую для разработки. В модуле protocol описаны события в виде кейс-классов. Для сообщений Kafka будем использовать JSON, поэтому в protocol также реализован codec для  JSON.

Первый кейс-класс из модуля protocol — это TransactionRaw.

TransactionRaw

final case class TransactionRaw(userId: Long, country: String, amount: BigDecimal)  object TransactionRaw {     implicit val codec: Codec[TransactionRaw] = deriveCodec[TransactionRaw] } 

TransactionRaw моделирует события, которые публикуются сервисом producer

Следующим событием в нашей системе будет TransactionEnriched.

TransactionEnriched

final case class TransactionEnriched(userId: Long, country: Country, amount: BigDecimal)  object TransactionEnriched {   implicit val codec: Codec[TransactionEnriched] = deriveCodec[TransactionEnriched] }  final case class Country(     name: String,     capital: String,     region: String,         subregion: String,       population: Long )  object Country {   implicit val codec: Codec[Country] = deriveCodec[Country] } 

TransactionEnriched содержит те же поля, что и TransactionRaw, но, как вы уже догадались, дополнен информацией о стране (Country). За получение сведений о стране из внешнего API и создание TransactionEnriched отвечает сервис processor

Давайте рассмотрим код producer.

object ProducerApp extends App {   override def run(args: List[String]) =     program.provideSomeLayer[Any with Blocking](appLayer).exitCode    private lazy val program =     ZStream       .fromIterable(EventGenerator.transactions)       .map(toProducerRecord)       .mapM { producerRecord =>         log.info(s"Producing $producerRecord to Kafka...") *>           Producer.produce[Any, Long, String](producerRecord)       }       .runDrain    private lazy val appLayer = {     val producerSettings = ProducerSettings(List("localhost:9092"))     val producerLayer = Producer.make[Any, Long, String](         producerSettings, Serde.long, Serde.string     ).toLayer      val loggingLayer = Slf4jLogger.make { (context, message) =>       val correlationId = LogAnnotation.CorrelationId.render(                  context.get(LogAnnotation.CorrelationId))       "[correlation-id = %s] %s".format(correlationId, message)     }      loggingLayer ++ producerLayer   }    private def toProducerRecord(transaction: TransactionRaw): ProducerRecord[Long, String] =     new ProducerRecord("transactions.raw", transactionRaw.userId, transactionRaw.asJson.toString) } 

Основная логика представлена в program, где мы сначала создаем ZStream из некоторого жестко заданного списка транзакций TransactionRaw, а затем преобразуем элементы потока в ProducerRecord, чтобы их можно было отправить с помощью Kafka Producer. После создадим ZIO Layer для Kafka и логера. Если вы уже сталкивались с каким-либо потокоориентированным кодом, то здесь вам все должно быть знакомо. Единственное, могут вызвать недоумение такие операторы как mapM и runDrain, поэтому я их поясню. 

Как и стандартный оператор map, преобразующий элементы потока с помощью предоставленной функции, mapM делает то же самое, но в него передается эффект. Например, если map требует функцию f: A => B, то mapM — функцию f: A => F[B], где F может быть IO, Task или каким-либо другим эффектом. 

Поскольку наш основной метод в ZIO-приложении должен возвращать тип ZIO, нам необходимо преобразовать ZStream в ZIO. Именно для этого преобразования используется оператор runDrain, который запускает поток для выполнения только указанных эффектов. Смотрите исходный код.

Мы постепенно подошли к основному сервису — processor. Так как processor довольно большой, не будем рассматривать каждую строчку кода, а обратим внимание на наиболее важные фрагменты.

Как мы уже говорили, событийно-управляемые микросервисы проектируются как бесконечные циклы, которые постоянно опрашивают сообщения в Kafka и реагируют на них тем или иным образом. Главным компонентом нашего сервиса является Pipeline, который представляет собой ZLayer, содержащий метод run, выполняющийся бесконечно.

lazy val live: ZLayer[PipelineEnvironment, Nothing, Pipeline] =     ZLayer.fromFunction { env =>       new Service {         override def run(): IO[Throwable, Unit] =           (log.info("Starting processing pipeline") *>             Consumer               .subscribeAnd(Subscription.topics("transactions.raw"))               .plainStream(Serde.long, Serde.string)               .mapM { cr =>                 val parsed = decode[TransactionRaw](cr.value)                  parsed match {                   case Right(transactionRaw) =>                     Enrichment                       .enrich(transactionRaw)                       .map(toProducerRecord)                       .flatMap(Producer.produce[Any, Long, String](_))                       .as(cr)                   case Left(error) =>                     (log.info(s"Deserialization error $error")                      *> ZIO.succeed(cr))                 }               }               .map(_.offset)               .aggregateAsync(Consumer.offsetBatches)               .mapM(_.commit)               .runDrain)             .provide(env)       }     }

Вся логика инкапсулирована в этом сервисе. После того как Consumer подпишется на указанный топик, он вернет абстракцию на основе ZStream над Kafka Topic, в которой обрабатываются элементы потока. Когда событие получено, оно парсится из JSON-строки в кейс-класс TransactionRaw, после чего используется сервис Enrichment для связи с внешним API, чтобы получить сведения о стране и создать событие TransactionEnriched. Если сообщение не может быть получено из JSON, то логируется сообщение об ошибке и передается дальше в исходном виде.

Далее рассмотрим сервис Enrichment:

lazy val live: ZLayer[   Logging with CountryCache with SttpClient, Nothing, Enrichment ] = ZLayer.fromFunction { env =>     new Service {       override def enrich(         transactionRaw: TransactionRaw       ): IO[ProcessorError, TransactionEnriched] =         (for {           _       <- log.info("Getting country details from cache for")           country <- CountryCache.get(transactionRaw.country)           result  <- country.fold(             fetchAndCacheCountryDetails(transactionRaw.country)           )(ZIO.succeed(_))         } yield toTransactionEnriched(           transactionRaw, result         )).provide(env)       }   }

И несколько вспомогательных методов для этого сервиса:

private def fetchAndCacheCountryDetails(countryName: String): ZIO[Logging with CountryCache with SttpClient, ProcessorError, Country]  =  for {       _ <- log.info(s"Cache miss. Fetching details from external API.")       country <- fetchCountryDetails(countryName)       _ <- CountryCache.put(country)   } yield country   private def fetchCountryDetails(   countryName: String ): ZIO[SttpClient, ProcessorError, Country] =  for {   req <- ZIO.succeed(       basicRequest.get(urlOf(countryName)).response(asJson[List[Country]])   )   res <- SttpClient.send(req).orElseFail(CountryApiUnreachable)   country <- res.body.fold(         _ => ZIO.fail(ResponseExtractionError),         res => ZIO.succeed(res.head)   ) } yield country

Возможно, вы заметили, что сервис Enrichment использует CountryCache, который представляет собой слой, отвечающий за кэширование HTTP-ответов от внешнего API.

Полный исходный код доступен в репозитории.

Резюме

В этом посте мы рассмотрели, что такое ZIO и как его использовать в сочетании с ZIO Streams для разработки микросервисов в функциональном стиле с событийно-ориентированной архитектурой. Я надеюсь, что вас заинтересовали  описанные в статье преимущества функциональных стриминговых библиотек и вы начнете использовать их в своих проектах.


Статья переведена в преддверии старта курса «Scala-разработчик». Всех желающих приглашаем на бесплатный демоурок в рамках которого познакомимся с основными Scala коллекциями, рассмотрим что у них общего и в чем отличия. Разберем особенности.


ссылка на оригинал статьи https://habr.com/ru/articles/587190/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *