
В Scalac мы ежедневно разрабатываем и внедряем распределенные приложения с высокой степенью параллелизма. Распределенные системы в настоящее время активно развиваются и не собираются в этом останавливаться. В архитектуре подобных систем, помимо Kubernetes, важное место занимает Apache Kafka.
Мы используем Apache Kafka как основу для асинхронного взаимодействия микросервисов. Простота масштабирования, устойчивость к потере и повреждению данных, репликация и легко достижимый параллелизм через консьюмер-группы (consumer groups) — вот только некоторые из причин, почему Kafka является одним из основных инструментов построения распределенных систем.
Scala, ZIO и, наконец, Apache Kafka
Не секрет, что мы любим Scala и функциональное программирование. Данная статья предполагает, что у вас есть некоторое знакомство с библиотеками функциональных эффектов, такими как ZIO, и базовое понимание Apache Kafka. Тем не менее, хотя эта статья ориентирована на разработчиков уже знакомых с ZIO, которые хотят разобраться с тем, как интегрировать Kafka в ZIO-проекты, я все-таки расскажу об основах ZIO, ZIO Streams, а также о реализации сервисов потоковой обработки данных с использованием принципов функционального программирования.
Давайте начнем.
Основы ZIO
ZIO — это самостоятельная библиотека без дополнительных зависимостей для асинхронного и параллельного программирования. Благодаря неблокирующим файберам, не расходующим ресурсы впустую, ZIO позволяет создавать масштабируемые, устойчивые и реактивные приложения, отвечающие потребностям вашего бизнеса. Узнать больше о ZIO и его преимуществах можно в официальной документации или в нашей недавней статье.
ZIO Streams
Функциональные потоки как абстракция, часто используются для элегантного решения проблем параллелизма и обработки неограниченного потока данных, ставя в приоритет безопасность ресурсов и использование процессора.
ZIO Streams очень похож на тип ZIO, о котором мы говорили в предыдущей статье. Он выглядит следующим образом:
ZStream[-R, +E, +O]
ZStream принимает окружение типа R, тип ошибки E, с которой может упасть, и тип O, возвращаемый в случае успешного выполнения. Вы также можете думать о ZStream как о значении ZIO, которое может выдавать несколько значений вместо одного.
ZStream — это чисто функциональный pull-поток, то есть ленивый (lazy) и с обратным давлением (backpressure), что освобождает разработчика от необходимости вручную управлять и писать код обработки обратного давления.
Неудивительно, что ZIO Kafka основана на ZIO и ZIO Streams, обеспечивая удобный интерфейс для подписки, процессинга и коммита записей Kafka наряду с уже существующими операторами из ZIO Streams. Таким образом, после настройки входящего ZStream нет никакой разницы между тем, откуда поступают данные: из Kafka, файловой системы или любого другого источника. Операторы обработки будут полностью идентичны. Хотя при обработке записей из Kafka есть и некоторые специфические операции, такие как ручной коммит смещения (offset commit).
ZIO Kafka
Apache Kafka — это распространенное решение для хранения событий в событийно-ориентированных приложениях. Поскольку топики (topic) Kafka постоянно наполняются новыми событиями, то событийно-ориентированные приложения выглядят как бесконечные циклы, в которых принимаются события и каким-то образом обрабатываются. Это может быть преобразование и повторная отправка в Kafka или запись в какое-то хранилище данных для построения материализованного представления или какого-то агрегата.
Конечно, вполне допустимо использовать обычного клиента Kafka, но крайне важно, реализовать следующие возможности: буферизация, агрегирование пакетов записей до заданного тайм-аута и управление количеством отправляемых сообщений в единицу времени. Это довольно нетривиальные задачи, которые будут отвлекать разработчика и задерживать реализацию бизнес-требований. А теперь представьте, что вам нужна асинхронность и неблокирующая синхронизация. Звучит чрезвычайно сложно. Потому что так оно и есть. К счастью, вся эта функциональность доступна в ZIO Streams. И это, вероятно, основная причина, по которой вам следует отказаться от обычного клиента Kafka.
Итак, если вы хотите сосредоточиться на бизнес-логике и возложить на библиотеку все сложности, такие как безопасность ресурсов и уровень параллелизма вашего пайплайна обработки событий, то вам определенно стоит обратить внимание на ZIO Streams. А если ваша система построена на основе Apache Kafka, то вам, несомненно, понравится ZIO Kafka.
Микросервисы с ZIO и Kafka
Теперь, после того как мы разобрались с основами ZIO, ZIO Streams и ZIO Kafka, пришло время рассмотреть реализацию системы, использующей все эти технологии.
Наша система состоит из двух сервисов. Первый — это сервис producer, который генерирует события и отправляет их в Kafka, второй — processor, который получает события, отправленные producer, потом обращается к некоторому внешнему API за дополнительной информацией, вносит эту информацию в исходное событие и отправляет это дополненное сообщение обратно в Kafka.
Полная реализация доступна в репозитории, а здесь мы продолжим разбирать ее по частям.

Проект реализован как многомодульный sbt-проект. Модуль kafka запускает встроенную Kafka, используемую для разработки. В модуле protocol описаны события в виде кейс-классов. Для сообщений Kafka будем использовать JSON, поэтому в protocol также реализован codec для JSON.
Первый кейс-класс из модуля protocol — это TransactionRaw.
TransactionRaw
final case class TransactionRaw(userId: Long, country: String, amount: BigDecimal) object TransactionRaw { implicit val codec: Codec[TransactionRaw] = deriveCodec[TransactionRaw] }
TransactionRaw моделирует события, которые публикуются сервисом producer.
Следующим событием в нашей системе будет TransactionEnriched.
TransactionEnriched
final case class TransactionEnriched(userId: Long, country: Country, amount: BigDecimal) object TransactionEnriched { implicit val codec: Codec[TransactionEnriched] = deriveCodec[TransactionEnriched] } final case class Country( name: String, capital: String, region: String, subregion: String, population: Long ) object Country { implicit val codec: Codec[Country] = deriveCodec[Country] }
TransactionEnriched содержит те же поля, что и TransactionRaw, но, как вы уже догадались, дополнен информацией о стране (Country). За получение сведений о стране из внешнего API и создание TransactionEnriched отвечает сервис processor.
Давайте рассмотрим код producer.
object ProducerApp extends App { override def run(args: List[String]) = program.provideSomeLayer[Any with Blocking](appLayer).exitCode private lazy val program = ZStream .fromIterable(EventGenerator.transactions) .map(toProducerRecord) .mapM { producerRecord => log.info(s"Producing $producerRecord to Kafka...") *> Producer.produce[Any, Long, String](producerRecord) } .runDrain private lazy val appLayer = { val producerSettings = ProducerSettings(List("localhost:9092")) val producerLayer = Producer.make[Any, Long, String]( producerSettings, Serde.long, Serde.string ).toLayer val loggingLayer = Slf4jLogger.make { (context, message) => val correlationId = LogAnnotation.CorrelationId.render( context.get(LogAnnotation.CorrelationId)) "[correlation-id = %s] %s".format(correlationId, message) } loggingLayer ++ producerLayer } private def toProducerRecord(transaction: TransactionRaw): ProducerRecord[Long, String] = new ProducerRecord("transactions.raw", transactionRaw.userId, transactionRaw.asJson.toString) }
Основная логика представлена в program, где мы сначала создаем ZStream из некоторого жестко заданного списка транзакций TransactionRaw, а затем преобразуем элементы потока в ProducerRecord, чтобы их можно было отправить с помощью Kafka Producer. После создадим ZIO Layer для Kafka и логера. Если вы уже сталкивались с каким-либо потокоориентированным кодом, то здесь вам все должно быть знакомо. Единственное, могут вызвать недоумение такие операторы как mapM и runDrain, поэтому я их поясню.
Как и стандартный оператор map, преобразующий элементы потока с помощью предоставленной функции, mapM делает то же самое, но в него передается эффект. Например, если map требует функцию f: A => B, то mapM — функцию f: A => F[B], где F может быть IO, Task или каким-либо другим эффектом.
Поскольку наш основной метод в ZIO-приложении должен возвращать тип ZIO, нам необходимо преобразовать ZStream в ZIO. Именно для этого преобразования используется оператор runDrain, который запускает поток для выполнения только указанных эффектов. Смотрите исходный код.
Мы постепенно подошли к основному сервису — processor. Так как processor довольно большой, не будем рассматривать каждую строчку кода, а обратим внимание на наиболее важные фрагменты.
Как мы уже говорили, событийно-управляемые микросервисы проектируются как бесконечные циклы, которые постоянно опрашивают сообщения в Kafka и реагируют на них тем или иным образом. Главным компонентом нашего сервиса является Pipeline, который представляет собой ZLayer, содержащий метод run, выполняющийся бесконечно.
lazy val live: ZLayer[PipelineEnvironment, Nothing, Pipeline] = ZLayer.fromFunction { env => new Service { override def run(): IO[Throwable, Unit] = (log.info("Starting processing pipeline") *> Consumer .subscribeAnd(Subscription.topics("transactions.raw")) .plainStream(Serde.long, Serde.string) .mapM { cr => val parsed = decode[TransactionRaw](cr.value) parsed match { case Right(transactionRaw) => Enrichment .enrich(transactionRaw) .map(toProducerRecord) .flatMap(Producer.produce[Any, Long, String](_)) .as(cr) case Left(error) => (log.info(s"Deserialization error $error") *> ZIO.succeed(cr)) } } .map(_.offset) .aggregateAsync(Consumer.offsetBatches) .mapM(_.commit) .runDrain) .provide(env) } }
Вся логика инкапсулирована в этом сервисе. После того как Consumer подпишется на указанный топик, он вернет абстракцию на основе ZStream над Kafka Topic, в которой обрабатываются элементы потока. Когда событие получено, оно парсится из JSON-строки в кейс-класс TransactionRaw, после чего используется сервис Enrichment для связи с внешним API, чтобы получить сведения о стране и создать событие TransactionEnriched. Если сообщение не может быть получено из JSON, то логируется сообщение об ошибке и передается дальше в исходном виде.
Далее рассмотрим сервис Enrichment:
lazy val live: ZLayer[ Logging with CountryCache with SttpClient, Nothing, Enrichment ] = ZLayer.fromFunction { env => new Service { override def enrich( transactionRaw: TransactionRaw ): IO[ProcessorError, TransactionEnriched] = (for { _ <- log.info("Getting country details from cache for") country <- CountryCache.get(transactionRaw.country) result <- country.fold( fetchAndCacheCountryDetails(transactionRaw.country) )(ZIO.succeed(_)) } yield toTransactionEnriched( transactionRaw, result )).provide(env) } }
И несколько вспомогательных методов для этого сервиса:
private def fetchAndCacheCountryDetails(countryName: String): ZIO[Logging with CountryCache with SttpClient, ProcessorError, Country] = for { _ <- log.info(s"Cache miss. Fetching details from external API.") country <- fetchCountryDetails(countryName) _ <- CountryCache.put(country) } yield country private def fetchCountryDetails( countryName: String ): ZIO[SttpClient, ProcessorError, Country] = for { req <- ZIO.succeed( basicRequest.get(urlOf(countryName)).response(asJson[List[Country]]) ) res <- SttpClient.send(req).orElseFail(CountryApiUnreachable) country <- res.body.fold( _ => ZIO.fail(ResponseExtractionError), res => ZIO.succeed(res.head) ) } yield country
Возможно, вы заметили, что сервис Enrichment использует CountryCache, который представляет собой слой, отвечающий за кэширование HTTP-ответов от внешнего API.
Полный исходный код доступен в репозитории.
Резюме
В этом посте мы рассмотрели, что такое ZIO и как его использовать в сочетании с ZIO Streams для разработки микросервисов в функциональном стиле с событийно-ориентированной архитектурой. Я надеюсь, что вас заинтересовали описанные в статье преимущества функциональных стриминговых библиотек и вы начнете использовать их в своих проектах.
Статья переведена в преддверии старта курса «Scala-разработчик». Всех желающих приглашаем на бесплатный демоурок в рамках которого познакомимся с основными Scala коллекциями, рассмотрим что у них общего и в чем отличия. Разберем особенности.
ссылка на оригинал статьи https://habr.com/ru/articles/587190/
Добавить комментарий