Колбэки в акторной модели

от автора

Как часто вам доводилось оказываться в ситуации, когда инкапсуляция в чужой библиотеке или фреймворке скрывает от вас то, что вам необходимо, а авторам — показалось деталями реализации? Мне — постоянно.

Иногда проблему можно решить пулл реквестом, иногда — хаком (спасибо Матцу за доверие к разработчикам, в руби скрыть вообще ничего нельзя, но и в менее дружественных к подкостыливанию языках — есть всякие рефлекшены, аспекты, указатели на сырую память, наконец). Наконец, можно форкнуть библиотеку, экспортировать всё, что нужно — и окунуться в ад поддержки форка. Особенно это досадно в иммутабельных языках: ну зачем что-то там от меня скрывать, если я все равно ничего испортить в вашем коде не смогу?

Поэтому уже добрый десяток лет я во всех местах своих библиотек общего назначения, в которых происходит что-то мало-мальски важное, позволяю добавить необязательные пользовательские колбэки, которые будут вызваны, когда это «важное» произойдет. Классическая реализация такого типа поведения — осуществляется через необязательную инъекцию зависимости. Что-то типа такого:

def change_the_world(world, new_world, listener \\ nil) def change_the_world(world, new_world, nil),   do: Worlds.replace(world, new_world) def change_the_world(world, new_world, listener),   do: change_the_world(world, new_world, nil) |> tap(&listener.on_change/1)

Это псевдокод, но смысл ясен: если никаких колбэков нет, мы не делаем ничего, если есть — вызываем заранее оговоренную интерфейсом (behaviour) функцию. Мне это не сто́ит ни копейки, а пользователи библиотеки скажут спасибо, когда им захочется оповестить об этом клиента, или перерисовать картинку мира.

Этот код уже значительно лучше изменения мира втихомолочку (это же деталь реализации, так?), но в продакшн его допускать нельзя. Потому что этот код синхронный. И если уставший разработчик начнет перерисовывать весь мир прямо из колбэка — код моей библиотеки перестанет выполняться вплоть до полной перерисовки. В высококонкурентной среде — это сродни смертному приговору.

Надо сказать, что синхронный код — в принципе — признак недостаточной квалификации программиста примерно в 99% случаев. Не, когда надо получить значение — лучше получать его синхронно, во избежание рассинхронизации. Но во всех остальных случаях — синхронность исполнения приводит только к тормозам. Например, в эрланге (и, как следствие, эликсире и LFE) — сообщения не бывают синхронными в принципе. Они всегда асинхронны. Синхронный GenServer.call/3 — не более, чем абстракция поверх отсылки сообщения и ожидания ответа, но оба эти сообщения — по-прежнему асинхронны.

Как быть вне акторной модели?

Надеюсь, что мои заметки читают не только люди, которые 24/7 пишут код на эрланге, поэтому сначала расскажу, что придумали умные люди в малоприспособленных для нормальной конкурентности языках.

Люди придумали PubSub. В коде выше, вместо прямого синхронного вызова listener.on_change(world), мы отправляем сообщение «на деревню дедушке», в канал (event channel), на который подписчики (пользовательский код) могут подписаться в любой момент (и отписаться тоже, мы же не электронная рассылка фейсбука). Эта модель еще называется «fire and forget». Имеющий уши (и вовремя подписавшийся) — да услышит.

Еще люди придумали брокеры сообщений. Публикация в такой брокер — фактически мгновенна для вызывающего кода, а приёмник(и) могут разгребать очередь, как им удобно. Роутинг тут перекладывается на брокер, сами сообщения могут сохраняться, может гарантироваться доставка и всё такое. С брокером даже реализуема асинхронная модель с согласованием (наш библиотечный код продолжит выполнение, но он может ждать ответ из другого канала того же брокера и что-то предпринять по факту получения, или неполучения, оного). Паттерн Saga — это оно.

В языках с гринтредами (горутинами), можно еще запустить гринтред, который озаботится вызовом колбэков и, по желанию, собиранием ответов и какой-то реакцией на них).

Но если у вас под рукой акторная модель — всё гораздо проще и изящнее.

Как это сделать на акторах?

Акторы хороши тем, что они прямо из коробки — гринтреды. Они умеют посылать и принимать сообщения безо всяких костылей типа PubSub или брокеров сбоку (это не значит, что PubSub или брокеры вообще не нужны — нет, конечно, они прекрасно выполняют те задачи, ради которых были созданы; но колбэки — не такая задача).

Вместо прямого синхронного вызова listener.on_change/1 в коде выше — вы можете просто послать сообщение. Вашему процессу это ничего не сто́ит. А тот актор, который был заинъекчен в вызов — примет сообщение и разберется, что с ним делать. Так просто.

Пример из жизни

В OTP25 появилась адекватная реализация модуля :pg, который, наконец, научился вышеизложенному. Раньше это была вещь в себе, которую можно было, конечно, спросить о её текущем состоянии, но для отслеживания изменений приходилось городить жуткие костыли, типа поллинга. У меня скулы сводит, когда приходится прибегать к поллингу в акторной модели.

Как только это произошло, я исполнил свою давнюю мечту: реализовал распределенный динамический супервизор (и реализацию распределенного Registry) для кластера поверх process groups. В прошлом остались хаки с ручной балансировкой процессов и удаленными вызовами инстансов супервизора для поиска процесса. Я знаю про Horde, но там слишком много левого запутанного кода. Мне нужен был просто распределенный реестр процессов со стандартным синтаксисом {:via, Reg, name} и супервизор. Весь код доступен по ссылкам из документации библиотеки DistributedSupervisor.

Здесь же я хочу просто показать, как я облегчил жизнь пользователям. Когда какой-то процесс падает, супервизор его перезапускает с состоянием, которое у него было на момент последнего успешного старта. Такой механизм был выбран для того, чтобы обеспечить гарантию успешного рестарта (в прошлый раз же запустилось). И это работает в подавляющем большинстве простых случаев. Но что делать, если мы накопили какой-то стейт в процессе долгой жизни процесса? Ну, представьте себе FSM. Можно, конечно, сплёвывать состояние в базу на каждый чих, но это крайне неэлегантно (особенно, например, для случая горячего кэша, которому в базе вообще не место по определению).

Я даже написал отдельную библиотеку Peeper, экспортирующую имплементацию GenServer, которая умеет восстанавливать кэш после падений, но у нее есть неизбежный оверхэд: вместо одного процесса стартуют три, и в высоконагруженной среде с сотнями тысячами процессов это может оказаться слишком накладно. Почему бы не помочь с восстановлением стейта прямо из моего нового динамического супервизора?

Для этого всего лишь надо разрешить колбэки когда процессы стартуют и когда они падают. Если кому-то надо восстанавливать стейт — они будут его сохранять в, например, ETS, а из колбэка «процесс запущен» — проверять, что до этого он падал — и вычитывать состояние из стороннего источника, если да. Или проводить еще какую инициализацию, которая нужна только при падении и рестарте.

Чтобы эта заметка не была чистой беллетристикой, покажу код, который занимается вызовом колбэков.

  def handle_cast({:leave, listeners, name, id, pid}, state) when is_local(pid) do     Enum.each(listeners, fn listener ->       {listener, notify?} = parse_listener(listener, id)       notify? = notify? and function_exported?(listener, :on_process_stop, 3)        maybe_notify_leave(notify?, listener, name, id, pid)     end)      {:noreply, state}   end    defp maybe_notify_leave(true, listener, name, id, pid),     do: listener.on_process_stop(name, id, pid)    defp maybe_notify_leave(_, _, _, _, _), do: :ok

Для maybe_notify_join/5 он такой же. Вот полный код модуля Notifier. Я запускаю свой процесс для рассылки нотификаций и из основного кода шлю сообщения в него, а он уже дергает имплементацию интерфейса Listener.

Я не могу просто отослать сообщение в процесс, который зарегистрирован как listener по той простой причине, что тогда мне не удастся определить behaviour для имплементаций (и я не смогу предоставить инструменты для тестирования этого поведения).

Удачной прослушки!


ссылка на оригинал статьи https://habr.com/ru/articles/891294/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *