В эрланге (и эликсире) мне всегда недоставало способа организовать «потоковый» обмен сообщениями, наподобие того, который обеспечивает какой-нибудь Message Broker. Нормальные разработчики смиряются с ограничениями, которые им задают их фреймворки: в Финиксе есть PubSub, в OTP — :gen_event, в эликсире — депрекейтнутый еще до рождения GenEvent.
Ни один из них меня не устраивал. PubSub — штука мощная, но асинхронность прибита там гвоздями, а иногда все-таки надо вызывать подписчиков синхронно (я понимаю, что можно высылать вместе с сообщением свой pid и дожидаться ответов, но этот ad-hoc, к сожалению, не поможет в ситуации, когда количество подписчиков неизвестно). :gen_event — почти то, что нужно, но из его дизайна прямо торчат уши сайд-эффектных обработчиков, из-за чего удобная фильтрация входящего потока сообщений превращается в спагетти. В общем, всё как всегда: сам не сделаешь — никто не сделает.
Так родилась библиотека Antenna, которая предоставляет все те возможности, которые обычно обеспечиваются посредством вкрячивания дополнительной зависимости от брокера сообщений. Требования, которые я к ней предъявлял, были следующими:
-
готовность к back pressure из коробки (миллион ивентов, отосланных одномоментно, не должны повалить обработчики, а добавление новых нод в кластер должно их прозрачно разгружать)
-
каналы (топики, теги), куда можно отослать ивент, который будет доставлен всем зарегистрированным обработчикам
-
обработчик (помимо подписок на каналы) должен иметь возможность прозрачно фильтровать входящий поток сообщений, используя всю мощь паттерн-матчинга эрланга
-
как обработчики, так и подписки, — должны успешно восстанавливаться после краша
-
отправитель сообщения может затребовать синхронное выполнение обработчиков, блокируя вызывающий процесс до получения всех ответов (аналог rpc)
-
библиотека должна предоставлять удобные примитивы для тестирования её использования
Те, кому доводилось использовать RabbitMQ, без труда заметят, откуда я воровал идеи.
Псевдокод
С давних пор я использую некое подобие TDD с запахом DDD, или наоборот, я не силен в аббревиатурной баззвордистике. В общем, я начинаю проектирование любого куска кода с вызова его (пока несуществующего) API. Первым делом добиваюсь того, чтобы вызывать мой код было легко и приятно, а с реализацией я уж как-нибудь разберусь потом. Поэтому я набросал что-то вот такое:
{:ok, pid} = match({:tag_answer, _}, self()) subscribe(:chan, pid) event(:chan, {:tag_answer, 42}) receive do {:event, :chan, {:tag_answer, 42}} -> :ok after 1_000 -> :error end
Первые две строки — регистрация обработчика и подписка. Следующая — отсылка сообщения. Заключительный блок — проверка, что сообщение получено. Эти три куска обычно будут находиться в разных местах кода, никак не связанных между собой.
Первое, что я заметил, — первые две строки почти всегда ходят парой, поэтому имеет смысл принимать список каналов для подписки прямо в определении «матчера» (не отменяя, разумеется, функцию subscribe/2, если кому-то захочется присосаться к каналу позже).
Еще я заметил, что обработчиков может быть тоже много, почему нет.
Таким образом, определяющим уникальным идентификатором для матчера — остаётся собственно матч. Он может быть довольно заковыристым, поэтому в качестве id я выбрал его текстовое представление (строка "{:tag_answer, _}" для примера выше). Это решение мне не очень нравится, но лучшего у меня (пока) нет. Как минимум, инспектировать матчеры так гораздо проще, чем, например, если использовать что-то хешеподобное.
Итак, у нас получается отношение много ко многим: много обработчиков на один матч, которые могут получать сообщения из многих каналов. Например, мы можем подписаться на все сообщения вида {:error, _}, прилетевшие из всех каналов, и приспособить два обработчика: логгер сплюнет в консоль, телеметрия отошлет что-нибудь в графану, или куда там принято всё отсылать.
Основная архитектура
Back pressure в эликсире подразумевает использование библиотеки GenStage. Я уже реализовывал на ней свой Throttler, теперь настал черед брокера. Мы помним про горизонтальное масштабирование — а значит, — несколько консьюмеров, хотя бы по одному на ноде. Каждый консьюмер будет высылать сообщения в указанные каналы, матчеры — проверять, надо ли вообще суетиться (матчится ли сообщение), и если да — вызывать обработчики. Вроде звучит адекватно.
Консьюмеры я размазал по нодам при помощи безымянных процессов под управлением DistributedSupervisor, броадкастер — тоже (только этот процесс именован, а значит — один на кластер, и будет перезапущен на другой ноде, если текущая скроется в тумане).
Каждый матчер — тоже процесс, который в своём стейте хранит список обработчиков и, собственно, матч. Тут меня поджидала первая архитектурная дилемма: как хранить матч? {:foo, _} — просто так не сохранишь, такой код допустим только как LHO в собственно прямых вызовах паттерн-матчинга, а хранить его AST — не вариант, потому что его тогда не вставить в матч. В общем, я в результате решил генерировать функцию-матчер (всё равно реализация match/4 возможна только в виде макроса):
quote generated: true, location: :keep do matcher = fn unquote(match) -> true _ -> false end … end
Ну, отлично. Теперь в самом процессе матчера появляется колбэк наподобие вот такого:
@impl GenServer @doc false def handle_cast({:handle_event, channel, event}, state) do if state.matcher.(event) do Enum.each(state.handlers, fn handler when is_function(handler, 1) -> handler.(event) handler when is_function(handler, 2) -> handler.(channel, event) process -> send(process, {:antenna_event, channel, event}) end) end {:noreply, state} end
И мы готовы слать ему сообщения из консьюмеров (матчеры тоже находятся под управлением DistributedSupervisor, то есть, равномерно размазаны по кластеру.
В этот момент я сказал git init, потому что MVP уже вырисовывался.
Синхронный вызов
Да этого момента всё было довольно-таки тривиально. Но как организовать синхронный вызов, когда все сообщения проходят через семь кругов ада (броадкаст, консьюмеры, матчеры — и все на хрен пойми каких нодах в кластере)?
Вы когда-нибудь задумывались, почему сигнатура асинхронного колбэка GenServer’a — арности 2, а синхронного — 3? Кстати, это один из моих любимых вопросов на собеседовании: сразу становится понятно, мечтательный формошлёп перед тобой, или низкоуровневый фрик-социопат.
Второй аргумент в колбэке handle_call/3 — это идентификатор процесса (без потери общности), дожидающегося синхронного ответа. И вместо туплы {:reply, result, state} — из этого колбэка можно вернуть {:noreply, new_state}, а когда-нибудь потом уже выслать синхронный ответ напрямую вызывавшему процессу при помощи GenServer.reply/2. Если вы этого не знали, выпейте за меня ковшичек виски: это тот молоток, который всё вокруг вас сделает гвоздями.
GenStage, в свою очередь, тоже экспортирует reply/2. Поэтому теперь мне просто надо пробросить from через все консьюмеры и матчеры, а потом глубоко внутри написать что-то вроде:
Enum.each(results, fn {nil, _} -> :ok {from, results} -> GenStage.reply(from, results) end)
И если это был асинхронный вызов — никакого from нет, и мы ничего не делаем. А если он есть — мы высылаем ему назад аккумулированные результаты вызова всех хендлеров (и пущай он, гад, подавится).
Вот, кажется, и всё, что я хотел рассказать сегодня. Ссылка на библиотеку, её исходный код и тесты — выше.
Удачного брокеринга!
ссылка на оригинал статьи https://habr.com/ru/articles/903302/
Добавить комментарий