Discord
Discord испытал небывалый рост. Чтобы справиться с ним, нашему отделу разработки досталась приятная проблема — искать способ масштабирования сервисов бэкенда.
В этом деле мы добились большого успеха с помощью одной технологии, которая называется Elixir GenStage.
Идеальный шторм: Overwatch и Pokémon GO
Этим летом наша система мобильных push-уведомлений стала скрипеть от нагрузки. Чат /r/Overwatch перевалил за 25 000 одновременных пользователей, а чат-группы Pokémon GO возникали повсеместно, так что внезапные всплески потока уведомлений стали серьёзной проблемой.
Всплески потока уведомлений тормозят всю систему push-уведомлений, а иногда кладут её. Push-уведомления или приходят поздно, или не приходят вовсе.
GenStage идёт на помощь
После небольшого расследования мы выяснили, что основным бутылочным горлышком была отправка push-уведомлений в сервис Google Firebase Cloud Messaging.
Мы поняли, что можем немедленно улучшить пропускную способность, если отправлять push-запросы к Firebase по XMPP, а не по HTTP.
Firebase XMPP слегка сложнее, чем HTTP. Firebase требует, чтобы у каждого XMPP-соединения в каждый момент времени было не более 100 запросов в очереди. Если от вас улетело 100 запросов, то следует подождать, пока Firebase подтвердит получение запроса, прежде чем отправить следующий.
Поскольку в очередь допускаются только 100 запросов в каждый момент времени, нам пришлось спроектировать новую систему, чтобы XMPP-соединения не переполнялись во время всплесков потока запросов.
На первый взгляд показалось, что GenStage будет идеальным решением проблемы.
GenStage
Что такое GenStage?
GenStage — это новый режим Elixir для обмена событиями под обратным давлением между процессами Elixir. [0]
Что это значит на самом деле? По существу, этот режим даёт вам необходимые инструменты, чтобы ни одна часть вашей системы не перегружалась.
На практике, система с режимами GenStage обычно имеет несколько этапов.
Этапы (stages) — это шаги вычислений, которые отправляют и/или получают данные от других этапов.
Когда этап отправляет данные, он выступает в качестве производителя. Когда получает данные, то в качестве потребителя. Этапы могут играть роли одновременно и производителя, и потребителя.
Кроме назначения ролей производителя и потребителя, этап можно назначить «источником» (source), если он только производит элементы, или назначить «стоком» (sink), если он их только потребляет. [1]
Подход
Мы разделили систему на два этапа GenStage. Один источник и один сток.
- Этап 1 — Push Collector. Это производитель, который получает push-запросы. Сейчас у нас один процесс Erlang для Push Collector на одну машину.
- Этап 2 — Pusher. Это потребитель, который требует push-запросы от Push Collector и отправлят их к Firebase. Он запрашивает только по 100 запросов за раз, чтобы не превысить лимит Firebase на количество одновременных запросов. Процессов типа Pusher (тоже на Erlang) много на каждой машине.
Обратное давление и сброс нагрузки с помощью GenStage
У GenStage есть две ключевые функции, которые помогают нам во время всплеска запросов: обратное давление (back-pressure) и сброс нагрузки (load-shedding).
Обратное давление
Pusher использует функциональность GenStage, чтобы запросить у Push Collector’а максимальное количество запросов, которые Pusher может обработать. Это гарантирует верхнюю границу по количеству push-запросов, которые находятся в ожидании. Когда Firebase подтверждает запрос, тогда Pusher требует ещё от Push Collector’а.
Pusher знает точное количество запросов, которое может выдержать соединение Firebase XMPP, и никогда не требует лишнего. А Push Collector никогда не высылает запрос в сторону Pusher, если тот не попросил.
Сброс нагрузки
Поскольку Pusher’ы оказывают обратное давление на Push Collector, то появляется потенциальное бутылочное горлышко в Push Collector. Супер-дупер мощные всплески могут его перегрузить.
В GenStage имеется другая встроенная функция для таких ситуаций: буферизованные события.
В Push Collector мы определяем, сколько push-запросов помещать в буфер. В нормальном состоянии буфер пустой, но один раз в месяц при наступлении катастрофических событий он приходится весьма кстати.
Если через систему проходит ну уж очень много событий и буфер заполняется, тогда Push Collector сбрасывает входящие push-запросы. Это происходит само собой просто за счёт указания опции buffer_size
в функции init
Push Collector’а.
С этими двумя функциями мы способны справляться со всплесками push-уведомлений.
Код (наконец, самая важная часть)
Ниже пример кода, как мы настроили этапы Pusher и Push Collector. Для простоты мы убрали много фрагментов, отвечающих за обработку отказов, когда теряется соединение, Firebase возвращает ошибки и т.д.
Вы можете пропустить код, если хотите посмотреть на результат.
Push Collector (производитель)
defmodule GCM.PushCollector do use GenStage # Client def push(pid, push_requests) do GenServer.cast(pid, {:push, push_requests}) end # Server def init(_args) do # Run as producer and specify the max amount # of push requests to buffer. {:producer, :ok, buffer_size: @max_buffer_size} end def handle_cast({:push, push_requests}, state) do # Dispatch the push_requests as events. # These will be buffered if there are no consumers ready. {:noreply, push_requests, state} end def handle_demand(_demand, state) do # Do nothing. Events will be dispatched as-is. {:noreply, [], state} end end
Pusher (потребитель)
defmodule GCM.Pusher do use GenStage # The maximum number of requests Firebase allows at once per XMPP connection @max_demand 100 defstruct [ :producer, :producer_from, :fcm_conn_pid, :pending_requests, ] def start_link(producer, fcm_conn_pid, opts \\ []) do GenStage.start_link(__MODULE__, {producer, fcm_conn_pid}, opts) end def init({producer, fcm_conn_pid}) do state = %__MODULE__{ next_id: 1, pending_requests: Map.new, producer: producer, fcm_conn_pid: fcm_conn_pid, } send(self, :init) # Run as consumer {:consumer, state} end def handle_info(:init, %{producer: producer}=state) do # Subscribe to the Push Collector GenStage.async_subscribe(self, to: producer, cancel: :temporary) {:noreply, [], state} end def handle_subscribe(:producer, _opts, from, state) do # Start demanding requests now that we are subscribed GenStage.ask(from, @max_demand) {:manual, %{state | producer_from: from}} end def handle_events(push_requests, _from, state) do # We got some push requests from the Push Collector. # Let’s send them. state = Enum.reduce(push_requests, state, &do_send/2) {:noreply, [], state} end # Send the message to FCM, track as a pending request defp do_send(%{fcm_conn_pid: fcm_conn_pid, pending_requests: pending_requests}=state, push_request) do {message_id, state} = generate_id(state) xml = PushRequest.to_xml(push_request, message_id) :ok = FCM.Connection.send(fcm_conn_pid, xml) pending_requests = Map.put(pending_requests, message_id, push_request) %{state | pending_requests: pending_requests} end # FCM response handling defp handle_response(%{message_id: message_id}=response, %{pending_requests: pending_requests, producer_from: producer_from}=state) do {push_request, pending_requests} = Map.pop(pending_requests, message_id) # Since we finished a request, ask the Push Collector for more. GenStage.ask(producer_from, 1) %{state | pending_requests: pending_requests} end defp generate_id(%{next_id: next_id}=state) do {to_string(next_id), %{state | next_id: next_id + 1}} end end
Пример инцидента
Ниже показан реальный инцидент, с которым столкнулась система. На верхнем графике показано количество push-запросов в секунду, проходящих через систему. На нижнем графике — количество push-запросов, помещённых в буфер Push Collector.
Хроника событий:
- ~17:47:00 — Система работает в нормальном режиме.
- ~17:47:30 — К нам начинает поступать поток сообщений. Push Collector немного задействовал буфер, ожидая реакции Pusher. Вскоре буфер чуть освободился.
- ~17:48:50 — Pusher’ы не могут отправлять сообщения в Firebase быстрее, чем они поступают, так что буфер Push Collector’а начинает заполняться.
- ~17:50:00 — Буфер Pusher Collector достигает пика и начинает сбрасывать некоторые запросы.
- ~17:50:50 — Буфер Pusher Collector начинает освобождаться и перестаёт сбрасывать запросы.
- ~17:51:30 — Наплыв запросов пошёл на спад.
- ~17:52:30 — Система полностью вернулась в норму.
Успех Elixir
Мы в Discord очень довольны использованием Elixir и Erlang как ключевой технологии на наших сервисах бэкенда. Приятно видеть расширения вроде GenStage, которые опираются на нерушимые технологии Erlang/OTP.
Мы ищем смелых духом, чтобы помочь в решении таких проблем, поскольку Discord продолжает расти. Если вы любите игры и такого рода задачи заставляют ваше сердце биться чаще, посмотрите наши вакансии.
ссылка на оригинал статьи https://habrahabr.ru/post/317724/
Добавить комментарий