Akka, акторы и реактивное программирование

от автора

Здравствуйте, уважаемые читатели.

Сегодня мы хотели поговорить с вами на тему «все новое — это хорошо забытое старое» и вспомнить об акторах, описанных Карлом Хьюиттом еще в начале 70-х. А все дело в том, что недавно вышла вот такая книга:

Она довольно объемная — в переводе должна получиться более 500 страниц.

Несмотря на подчеркнутую элитарность книги (Akka и Scala), ее автор Вон Вернон (крупнейший специалист по DDD) уверен, что архитектурные паттерны, описанные в этой работе, вполне реализуемы на .NET и C#, о чем рассказывает в приложении. Мы же размещаем под катом перевод статьи, автор которой допускает перенос акторной парадигмы на язык Java. Поскольку рейтинг книги на Amazon стабильно высок, а тема универсальна, просим поделиться вашими мнениями как о ней, так и об акторной архитектуре в принципе.

В первой статье из этой серии был сделан общий обзор Akka. Теперь мы как следует углубимся в сферу акторов Akka, вооружившись модулем akka-actor, который закладывает основы для всех остальных модулей Akka.

На наш взгляд, можно научиться программировать, даже не имея практики чтения/написания кода. Здесь мы пошагово разработаем маленькую акторную библиотеку: событийную шину PubSub, работающую по принципу «публикация-подписка». Разумеется, Akka поставляется с готовыми к работе локальным и глобальным решением такого рода, поэтому здесь мы просто повозимся с хорошо известным примером. Мы будем работать на языке Scala, просто потому, что на нем гораздо удобнее писать Akka-образный код, но ровно таких же результатов можно достичь и на Java.

Акторная модель

В акторной модели – которая была изобретена в 1973 году Карлом Хьюиттом и др. — акторы представляют собой «фундаментальные единицы вычислений, реализующие обработку, хранение и коммуникацию». Хорошо, давайте разберемся по порядку.

Понятие «фундаментальная единица вычислений» означает, что когда мы пишем программу в соответствии с акторной моделью, наша работа по проектированию и реализации строится вокруг акторов. В потрясающем интервью, данном Эрику Мейеру, Карл Хьюитт объясняет, что «акторы повсюду», а также что «одиночных акторов не бывает, они существуют в системах». Мы уже резюмировали эту мысль: при использовании акторной модели весь наш код будет состоять из акторов.

Как же выглядит актор? Что такое, наконец, «обработка», «хранение» и «коммуникация»? В сущности, коммуникация — это асинхронный обмен сообщениями, хранение означает, что акторы могут иметь состояние, а обработка заключается в том, что акторы могут иметь дело с сообщениями. Обработка также именуется «поведением». Не слишком сложно звучит, правда? Итак, давайте сделаем следующий шаг и рассмотрим акторы Akka.

Устройство актора Akka

Как понятно из следующей картинки, актор Akka состоит из нескольких взаимодействующих компонентов. ActorRef – это логический адрес актора, позволяющий асинхронно отправлять актору сообщения по принципу «послал и забыл». Диспетчер — в данном случае по умолчанию на каждую систему акторов приходится по одному диспетчеру — отвечает за постановку сообщений в очередь, ведущую в почтовый ящик актора, а также приказывает этому ящику изъять из очереди одно или несколько сообщений, но только по одному за раз — и передать их актору на обработку. Последнее, но немаловажное: актор — обычно это единственный API, который нам приходится реализовать — инкапсулирует состояние, и поведение.

Как будет показано ниже, Akka не позволяет получить непосредственный доступ к актору и поэтому гарантирует, что единственный способ взаимодействия с актором — это асинхронные сообщения. Невозможно вызвать метод в акторе.
Кроме того, необходимо отметить, что отправка сообщения актору и обработка этого сообщения актором — это две отдельных операции, которые, скорее всего, происходят в разных потоках. Разумеется, Akka обеспечивает необходимую синхронизацию, чтобы гарантировать, что любые изменения состояния будут видимы всем потокам.

Соответственно, Akka как бы разрешает нам запрограммировать иллюзию однопоточности, и мы можем не пользоваться в акторном коде никакими синхронизационными примитивами наподобие volatile или synchronized — более того, не следует этого делать.

Реализация актора

Довольно слов, переходим к коду! В Akka актор – это класс, к которому подмешивается типаж Actor:

class MyActor extends Actor { override def receive = ??? }

Метод receive возвращает так называемое исходное поведение актора. Это просто частично вычислимая функция, используемая Akka для обработки сообщений, отправляемых актору. Поскольку поведение равно PartialFunction[Any, Unit], в настоящее время невозможно определять такие акторы, которые принимают сообщения лишь заданного типа. В Akka уже есть экспериментальный модуль akka-typed, обеспечивающий на этой платформе безопасность типов, но он еще дорабатывается. Кстати, поведение актора может изменяться, и именно поэтому в исходном поведении вызывается возвращаемое значение метода receive.

Хорошо, давайте реализуем базовый актор для нашей библиотеки PubSub:

class PubSubMediator extends Actor { override def receive = Actor.emptyBehavior }

Пока нам не требуется, чтобы PubSubMediator обрабатывал какие-либо сообщения, поэтому мы используем обычную частично вычислимую функцию Actor.emptyBehavior, для которой не определено какое-либо значение.

Акторные системы и создание акторов

Как было указано выше, “одиночных акторов не бывает, они существуют в системах”. В Akka система акторов представляет собой взаимосвязанный ансамбль, члены которого организованы иерархически. Таким образом, у каждого актора есть свой родительский актор, как показано на следующей картинке.

При создании акторной системы, Akka — на внутреннем уровне использующая множество так называемых «системных акторов» — создает три актора: это «корневой страж» (root guardian), расположенный в корне акторной иерархии, а также системный и пользовательский стражи. Пользовательский страж — зачастую именуемый просто «страж» — является родительским элементом для всех создаваемых нами акторов верхнего уровня (в данном контексте имеется в виду «наивысший уровень, к которому мы имеем доступ»).

Допустим, но как создать систему акторов? Нужно просто вызвать фабричный метод, предоставляемый объектом-одиночкой ActorSystem:

val system = ActorSystem("pub-sub-mediator-spec-system")

А зачем мы вообще создаем ActorSystem? Почему бы просто не создавать акторы? Последнее невозможно, поскольку при непосредственном вызове конструктора актора система выбросит исключение. Вместо этого нам придется использовать фабричный метод, предоставляемый — вы угадали — ActorSystem для создания актора верхнего уровня:

system.actorOf(Props(new PubSubMediator), "pub-sub-mediator")

Разумеется, actorOf возвращает не экземпляр Actor, а ActorRef. Так Akka не позволяет нам получить доступ к экземпляру Actor, что, в свою очередь, гарантирует: обмен информацией с актором возможен только через асинхронные сообщения. Имя, указываемое нами, должно быть уникальным среди сиблингов данного актора, иначе будет выдано исключение. Если мы не укажем имени, Akka создаст его за нас, поскольку у каждого актора должно быть имя.

А что за такая штуковина Props? Это просто конфигурационный объект для актора. Он принимает конструктор как параметр, передаваемый по имени (то есть, лениво) и может содержать другую важную информацию – например, о маршрутизации или развертывании.

Когда заходит речь о дистанционной связи, важно учитывать, что Props можно сериализовать, поэтому уже сложилась практика добавлять Props-фабрику к сопутствующему объекту актора. Здесь также удобно ставить константу, соответствующую имени актора.

Зная все это, давайте допишем PubSubMediator, а также создадим для него тест при помощи ScalaTest и Akka Testkit — еще один модуль Akka, упрощающий тестирование акторов Akka:

object PubSubMediator { final val Name = "pub-sub-mediator" def props: Props = Props(new PubSubMediator) } class PubSubMediator extends Actor { override def receive = Actor.emptyBehavior } class PubSubMediatorSpec extends WordSpec with Matchers with BeforeAndAfterAll { implicit val system = ActorSystem("pub-sub-mediator-spec-system") "A PubSubMediator" should { "be suited for getting started" in { EventFilter.debug(occurrences = 1, pattern = s"started.*${classOf[PubSubMediator].getName}").intercept { system.actorOf(PubSubMediator.props) } } } override protected def afterAll() = { Await.ready(system.terminate(), Duration.Inf) super.afterAll() } }

Как видите, мы создаем ActorSystem и актор PubSubMediator в PubSubMediatorSpec. Сам тест немного надуманный, поскольку наш PubSubMediator пока довольно сырой. В нем используется отладка жизненного цикла и ожидается логирование отладочного сообщения вида “started … PubSubMediator …”. Полный код его актуальной версии находится по адресу GitHub под меткой step-01.

Коммуникация

Итак, научившись создавать акторы, давайте поговорим о коммуникации, которая — как было указано выше — основывается на асинхронных сообщениях и тесно связана с двумя другими свойствами актора: поведением (то есть, возможностью обрабатывать сообщения) и состоянием.

Чтобы отправить актору сообщение, нужно знать его адрес, то есть, ActorRef:

mediator! GetSubscribers(«topic»)

Как видите, в ActorRef есть оператор ! – так называемый “bang”, который отправляет заданное сообщение соответствующему актору. Как только сообщение доставлено, операция завершается, и код отправки продолжает работу. Подразумевается, что здесь нет возвращаемого значения (кроме Unit), следовательно, сообщения действительно уходят по принципу «послал и забыл».

Пусть это и просто, нам часто требуется отклик. Благодаря тому, что оператор ! неявно принимает отправителя как ActorRef, сделать это можно без труда:

override def receive = {
case Subscribe(topic) =>
// ИМЕННО ТУТ обрабатывается подписка
sender()! Subscribed
}

В данном примере поведение актора-получателя обрабатывает конкретное сообщение – команду Subscribe – и передает сообщение – событие Subscribed – обратно отправителю. Затем метод sender используется для доступа к отправителю того сообщения, которое сейчас обрабатывается.

Учитывая все это, давайте дополнительно усовершенствуем PubSubMediator и соответствующий тест.
Для начала добавим протокол сообщения – множество всех сообщений, относящихся к PubSubMediator – к сопутствующему объекту:

object PubSubMediator { case class Publish(topic: String, message: Any) case class Published(publish: Publish) case class Subscribe(topic: String, subscriber: ActorRef) case class Subscribed(subscribe: Subscribe) case class AlreadySubscribed(subscribe: Subscribe) case class Unsubscribe(topic: String, subscriber: ActorRef) case class Unsubscribed(unsubscribe: Unsubscribe) case class NotSubscribed(unsubscribe: Unsubscribe) case class GetSubscribers(topic: String) final val Name = "pub-sub-mediator" def props: Props = Props(new PubSubMediator) }

Далее давайте реализуем поведение, которое до сих пор оставалось незаполненным:

class PubSubMediator extends Actor { import PubSubMediator._ private var subscribers = Map.empty[String, Set[ActorRef]].withDefaultValue(Set.empty) override def receive = { case publish @ Publish(topic, message) => subscribers(topic).foreach(_ ! message) sender() ! Published(publish) case subscribe @ Subscribe(topic, subscriber) if subscribers(topic).contains(subscriber) => sender() ! AlreadySubscribed(subscribe) case subscribe @ Subscribe(topic, subscriber) => subscribers += topic -> (subscribers(topic) + subscriber) sender() ! Subscribed(subscribe) case unsubscribe @ Unsubscribe(topic, subscriber) if !subscribers(topic).contains(subscriber) => sender() ! NotSubscribed(unsubscribe) case unsubscribe @ Unsubscribe(topic, subscriber) => subscribers += topic -> (subscribers(topic) - subscriber) sender() ! Unsubscribed(unsubscribe) case GetSubscribers(topic) => sender() ! subscribers(topic) } }

Как видите, поведение обрабатывает все команды – например, Publish или Subscribe – и всегда посылает утвердительный или отрицательный отклик отправителю. Тот факт, валидна ли команда и выдает ли она положительный результат – напр., Subscribed – зависит как от команды, так и от состояния, представляемого в виде приватного изменяемого поля subscribers.

Как было указано выше, одновременно обрабатывается всего одно сообщение, и Akka гарантирует, что изменения состояния останутся видимы и при обработке следующего сообщения, поэтому не потребуется вручную синхронизировать весь доступ к подписчикам. Конкурентность без проблем!

Наконец, давайте рассмотрим фрагмент расширенного теста:

val subscribe01 = Subscribe(topic01, subscriber01.ref) mediator ! subscribe01 sender.expectMsg(Subscribed(subscribe01)) val subscribe02 = Subscribe(topic01, subscriber02.ref) mediator ! subscribe02 sender.expectMsg(Subscribed(subscribe02)) val subscribe03 = Subscribe(topic02, subscriber03.ref) mediator ! subscribe03 sender.expectMsg(Subscribed(subscribe03))

Как видите, мы отправляем сообщения Subscribe посреднику при помощи оператора ! и ожидаем получить соответствующие отклики. Как и выше, весь код проекта по состоянию на текущий момент находится по адресу GitHub под меткой step-02.

Жизненный цикл

До сих пор мы пренебрегали одним важным аспектом акторов: их существование конечно — то есть, они завершаются, и завершить актор можно в любой момент.

Имея доступ к ActorRef, мы не знаем, «жив» ли соответствующий актор. В частности, мы не получим исключения, если будем посылать сообщения к завершенному актору. В таком случае ActorRef остается валиден, но Akka выполняет переадресацию, и для сообщений, направляемых в мертвые почтовые ящики, действует негарантированная доставка. Таким образом, эти сообщения логируются, что полезно при тестировании, но этот способ отнюдь не подходит, чтобы реализовать нечто вроде повторной передачи или даже гарантированной доставки.

Но иногда нам действительно требуется знать, «жив» ли актор до сих пор, либо нет. В описываемом случае нам нужна возможность избавиться от завершенных подписчиков, так как в противном случае PubSubMediator отправляет ненужные сообщения и даже может рано или поздно израсходовать всю память.

По всем этим причинам Akka предоставляет возможность отслеживать жизненный цикл акторов. Поскольку мы можем наблюдать только завершение актора, этот механизм называется «мертвая вахта» (death watch). Для отслеживания актора мы просто вызываем метод watch, предоставляемый ActorContext, доступный в Actor посредством context:

context.watch(subscriber)

Затем Akka отправит наблюдающему актору сообщение Terminated после того, как наблюдающий актор завершится. Это сообщение гарантированно будет последним, полученным от актора, даже при удаленной связи.

Хорошо, доделываем PubSubMediator:

class PubSubMediator extends Actor { import PubSubMediator._ ... override def receive = { ... case subscribe @ Subscribe(topic, subscriber) => subscribers += topic -> (subscribers(topic) + subscriber) context.watch(subscriber) sender() ! Subscribed(subscribe) ... case Terminated(subscriber) => subscribers = subscribers.map { case (topic, ss) => topic -> (ss - subscriber) } } }

Как видите, мы отслеживаем всех подписчиков, обрабатывая валидную команду Subscribe и удаляя каждый завершенный подписчик при работе с соответствующим ему сообщением Terminated. Опять же, полный актуальный код этого примера находится на GitHub под меткой step-03.

Заключение

На этом заканчивается предварительное знакомство с акторами Akka. Итак, мы рассмотрели самые важные аспекты акторной модели — коммуникацию, поведение и состояние, а также поговорили о системах акторов. Также мы обсудили реализацию этих концепций при помощи Akka и поговорили о мертвой вахте.

Разумеется, пришлось опустить массу интересного и важного материала: создание дочерних акторов, слежение (supervision) и т.д. Отсылаем вас к интересным дополнительным ресурсам, например, к отличной документации Akka.

Мнение о книге

Никто ещё не голосовал. Воздержавшихся нет.

Только зарегистрированные пользователи могут участвовать в опросе. Войдите, пожалуйста.

ссылка на оригинал статьи http://habrahabr.ru/post/266103/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *