Как часто вам доводилось оказываться в ситуации, когда инкапсуляция в чужой библиотеке или фреймворке скрывает от вас то, что вам необходимо, а авторам — показалось деталями реализации? Мне — постоянно.
Иногда проблему можно решить пулл реквестом, иногда — хаком (спасибо Матцу за доверие к разработчикам, в руби скрыть вообще ничего нельзя, но и в менее дружественных к подкостыливанию языках — есть всякие рефлекшены, аспекты, указатели на сырую память, наконец). Наконец, можно форкнуть библиотеку, экспортировать всё, что нужно — и окунуться в ад поддержки форка. Особенно это досадно в иммутабельных языках: ну зачем что-то там от меня скрывать, если я все равно ничего испортить в вашем коде не смогу?
Поэтому уже добрый десяток лет я во всех местах своих библиотек общего назначения, в которых происходит что-то мало-мальски важное, позволяю добавить необязательные пользовательские колбэки, которые будут вызваны, когда это «важное» произойдет. Классическая реализация такого типа поведения — осуществляется через необязательную инъекцию зависимости. Что-то типа такого:
def change_the_world(world, new_world, listener \\ nil) def change_the_world(world, new_world, nil), do: Worlds.replace(world, new_world) def change_the_world(world, new_world, listener), do: change_the_world(world, new_world, nil) |> tap(&listener.on_change/1)
Это псевдокод, но смысл ясен: если никаких колбэков нет, мы не делаем ничего, если есть — вызываем заранее оговоренную интерфейсом (behaviour) функцию. Мне это не сто́ит ни копейки, а пользователи библиотеки скажут спасибо, когда им захочется оповестить об этом клиента, или перерисовать картинку мира.
Этот код уже значительно лучше изменения мира втихомолочку (это же деталь реализации, так?), но в продакшн его допускать нельзя. Потому что этот код синхронный. И если уставший разработчик начнет перерисовывать весь мир прямо из колбэка — код моей библиотеки перестанет выполняться вплоть до полной перерисовки. В высококонкурентной среде — это сродни смертному приговору.
Надо сказать, что синхронный код — в принципе — признак недостаточной квалификации программиста примерно в 99% случаев. Не, когда надо получить значение — лучше получать его синхронно, во избежание рассинхронизации. Но во всех остальных случаях — синхронность исполнения приводит только к тормозам. Например, в эрланге (и, как следствие, эликсире и LFE) — сообщения не бывают синхронными в принципе. Они всегда асинхронны. Синхронный GenServer.call/3
— не более, чем абстракция поверх отсылки сообщения и ожидания ответа, но оба эти сообщения — по-прежнему асинхронны.
Как быть вне акторной модели?
Надеюсь, что мои заметки читают не только люди, которые 24/7 пишут код на эрланге, поэтому сначала расскажу, что придумали умные люди в малоприспособленных для нормальной конкурентности языках.
Люди придумали PubSub. В коде выше, вместо прямого синхронного вызова listener.on_change(world)
, мы отправляем сообщение «на деревню дедушке», в канал (event channel), на который подписчики (пользовательский код) могут подписаться в любой момент (и отписаться тоже, мы же не электронная рассылка фейсбука). Эта модель еще называется «fire and forget». Имеющий уши (и вовремя подписавшийся) — да услышит.
Еще люди придумали брокеры сообщений. Публикация в такой брокер — фактически мгновенна для вызывающего кода, а приёмник(и) могут разгребать очередь, как им удобно. Роутинг тут перекладывается на брокер, сами сообщения могут сохраняться, может гарантироваться доставка и всё такое. С брокером даже реализуема асинхронная модель с согласованием (наш библиотечный код продолжит выполнение, но он может ждать ответ из другого канала того же брокера и что-то предпринять по факту получения, или неполучения, оного). Паттерн Saga — это оно.
В языках с гринтредами (горутинами), можно еще запустить гринтред, который озаботится вызовом колбэков и, по желанию, собиранием ответов и какой-то реакцией на них).
Но если у вас под рукой акторная модель — всё гораздо проще и изящнее.
Как это сделать на акторах?
Акторы хороши тем, что они прямо из коробки — гринтреды. Они умеют посылать и принимать сообщения безо всяких костылей типа PubSub или брокеров сбоку (это не значит, что PubSub или брокеры вообще не нужны — нет, конечно, они прекрасно выполняют те задачи, ради которых были созданы; но колбэки — не такая задача).
Вместо прямого синхронного вызова listener.on_change/1
в коде выше — вы можете просто послать сообщение. Вашему процессу это ничего не сто́ит. А тот актор, который был заинъекчен в вызов — примет сообщение и разберется, что с ним делать. Так просто.
Пример из жизни
В OTP25 появилась адекватная реализация модуля :pg
, который, наконец, научился вышеизложенному. Раньше это была вещь в себе, которую можно было, конечно, спросить о её текущем состоянии, но для отслеживания изменений приходилось городить жуткие костыли, типа поллинга. У меня скулы сводит, когда приходится прибегать к поллингу в акторной модели.
Как только это произошло, я исполнил свою давнюю мечту: реализовал распределенный динамический супервизор (и реализацию распределенного Registry) для кластера поверх process groups. В прошлом остались хаки с ручной балансировкой процессов и удаленными вызовами инстансов супервизора для поиска процесса. Я знаю про Horde, но там слишком много левого запутанного кода. Мне нужен был просто распределенный реестр процессов со стандартным синтаксисом {:via, Reg, name}
и супервизор. Весь код доступен по ссылкам из документации библиотеки DistributedSupervisor.
Здесь же я хочу просто показать, как я облегчил жизнь пользователям. Когда какой-то процесс падает, супервизор его перезапускает с состоянием, которое у него было на момент последнего успешного старта. Такой механизм был выбран для того, чтобы обеспечить гарантию успешного рестарта (в прошлый раз же запустилось). И это работает в подавляющем большинстве простых случаев. Но что делать, если мы накопили какой-то стейт в процессе долгой жизни процесса? Ну, представьте себе FSM. Можно, конечно, сплёвывать состояние в базу на каждый чих, но это крайне неэлегантно (особенно, например, для случая горячего кэша, которому в базе вообще не место по определению).
Я даже написал отдельную библиотеку Peeper, экспортирующую имплементацию GenServer
, которая умеет восстанавливать кэш после падений, но у нее есть неизбежный оверхэд: вместо одного процесса стартуют три, и в высоконагруженной среде с сотнями тысячами процессов это может оказаться слишком накладно. Почему бы не помочь с восстановлением стейта прямо из моего нового динамического супервизора?
Для этого всего лишь надо разрешить колбэки когда процессы стартуют и когда они падают. Если кому-то надо восстанавливать стейт — они будут его сохранять в, например, ETS, а из колбэка «процесс запущен» — проверять, что до этого он падал — и вычитывать состояние из стороннего источника, если да. Или проводить еще какую инициализацию, которая нужна только при падении и рестарте.
Чтобы эта заметка не была чистой беллетристикой, покажу код, который занимается вызовом колбэков.
def handle_cast({:leave, listeners, name, id, pid}, state) when is_local(pid) do Enum.each(listeners, fn listener -> {listener, notify?} = parse_listener(listener, id) notify? = notify? and function_exported?(listener, :on_process_stop, 3) maybe_notify_leave(notify?, listener, name, id, pid) end) {:noreply, state} end defp maybe_notify_leave(true, listener, name, id, pid), do: listener.on_process_stop(name, id, pid) defp maybe_notify_leave(_, _, _, _, _), do: :ok
Для maybe_notify_join/5
он такой же. Вот полный код модуля Notifier. Я запускаю свой процесс для рассылки нотификаций и из основного кода шлю сообщения в него, а он уже дергает имплементацию интерфейса Listener.
Я не могу просто отослать сообщение в процесс, который зарегистрирован как listener
по той простой причине, что тогда мне не удастся определить behaviour
для имплементаций (и я не смогу предоставить инструменты для тестирования этого поведения).
Удачной прослушки!
ссылка на оригинал статьи https://habr.com/ru/articles/891294/
Добавить комментарий