Как Discord обрабатывает более 1 000 000 push-реквестов в минуту с помощью Elixir GenStage

от автора


Discord

Discord испытал небывалый рост. Чтобы справиться с ним, нашему отделу разработки досталась приятная проблема — искать способ масштабирования сервисов бэкенда.

В этом деле мы добились большого успеха с помощью одной технологии, которая называется Elixir GenStage.

Идеальный шторм: Overwatch и Pokémon GO

Этим летом наша система мобильных push-уведомлений стала скрипеть от нагрузки. Чат /r/Overwatch перевалил за 25 000 одновременных пользователей, а чат-группы Pokémon GO возникали повсеместно, так что внезапные всплески потока уведомлений стали серьёзной проблемой.

Всплески потока уведомлений тормозят всю систему push-уведомлений, а иногда кладут её. Push-уведомления или приходят поздно, или не приходят вовсе.

GenStage идёт на помощь

После небольшого расследования мы выяснили, что основным бутылочным горлышком была отправка push-уведомлений в сервис Google Firebase Cloud Messaging.

Мы поняли, что можем немедленно улучшить пропускную способность, если отправлять push-запросы к Firebase по XMPP, а не по HTTP.

Firebase XMPP слегка сложнее, чем HTTP. Firebase требует, чтобы у каждого XMPP-соединения в каждый момент времени было не более 100 запросов в очереди. Если от вас улетело 100 запросов, то следует подождать, пока Firebase подтвердит получение запроса, прежде чем отправить следующий.

Поскольку в очередь допускаются только 100 запросов в каждый момент времени, нам пришлось спроектировать новую систему, чтобы XMPP-соединения не переполнялись во время всплесков потока запросов.

На первый взгляд показалось, что GenStage будет идеальным решением проблемы.

GenStage

Что такое GenStage?

GenStage — это новый режим Elixir для обмена событиями под обратным давлением между процессами Elixir. [0]

Что это значит на самом деле? По существу, этот режим даёт вам необходимые инструменты, чтобы ни одна часть вашей системы не перегружалась.

На практике, система с режимами GenStage обычно имеет несколько этапов.

Этапы (stages) — это шаги вычислений, которые отправляют и/или получают данные от других этапов.

Когда этап отправляет данные, он выступает в качестве производителя. Когда получает данные, то в качестве потребителя. Этапы могут играть роли одновременно и производителя, и потребителя.

Кроме назначения ролей производителя и потребителя, этап можно назначить «источником» (source), если он только производит элементы, или назначить «стоком» (sink), если он их только потребляет. [1]

Подход

Мы разделили систему на два этапа GenStage. Один источник и один сток.

  • Этап 1 — Push Collector. Это производитель, который получает push-запросы. Сейчас у нас один процесс Erlang для Push Collector на одну машину.
  • Этап 2 — Pusher. Это потребитель, который требует push-запросы от Push Collector и отправлят их к Firebase. Он запрашивает только по 100 запросов за раз, чтобы не превысить лимит Firebase на количество одновременных запросов. Процессов типа Pusher (тоже на Erlang) много на каждой машине.

Обратное давление и сброс нагрузки с помощью GenStage

У GenStage есть две ключевые функции, которые помогают нам во время всплеска запросов: обратное давление (back-pressure) и сброс нагрузки (load-shedding).

Обратное давление

Pusher использует функциональность GenStage, чтобы запросить у Push Collector’а максимальное количество запросов, которые Pusher может обработать. Это гарантирует верхнюю границу по количеству push-запросов, которые находятся в ожидании. Когда Firebase подтверждает запрос, тогда Pusher требует ещё от Push Collector’а.

Pusher знает точное количество запросов, которое может выдержать соединение Firebase XMPP, и никогда не требует лишнего. А Push Collector никогда не высылает запрос в сторону Pusher, если тот не попросил.

Сброс нагрузки

Поскольку Pusher’ы оказывают обратное давление на Push Collector, то появляется потенциальное бутылочное горлышко в Push Collector. Супер-дупер мощные всплески могут его перегрузить.

В GenStage имеется другая встроенная функция для таких ситуаций: буферизованные события.

В Push Collector мы определяем, сколько push-запросов помещать в буфер. В нормальном состоянии буфер пустой, но один раз в месяц при наступлении катастрофических событий он приходится весьма кстати.

Если через систему проходит ну уж очень много событий и буфер заполняется, тогда Push Collector сбрасывает входящие push-запросы. Это происходит само собой просто за счёт указания опции buffer_size в функции init Push Collector’а.

С этими двумя функциями мы способны справляться со всплесками push-уведомлений.

Код (наконец, самая важная часть)

Ниже пример кода, как мы настроили этапы Pusher и Push Collector. Для простоты мы убрали много фрагментов, отвечающих за обработку отказов, когда теряется соединение, Firebase возвращает ошибки и т.д.

Вы можете пропустить код, если хотите посмотреть на результат.

Push Collector (производитель)

push_collector.ex

defmodule GCM.PushCollector do   use GenStage      # Client      def push(pid, push_requests) do     GenServer.cast(pid, {:push, push_requests})   end      # Server      def init(_args) do     # Run as producer and specify the max amount      # of push requests to buffer.     {:producer, :ok, buffer_size: @max_buffer_size}   end      def handle_cast({:push, push_requests}, state) do     # Dispatch the push_requests as events.     # These will be buffered if there are no consumers ready.     {:noreply, push_requests, state}   end      def handle_demand(_demand, state) do     # Do nothing. Events will be dispatched as-is.     {:noreply, [], state}   end end

Pusher (потребитель)

pusher.ex

defmodule GCM.Pusher do   use GenStage   # The maximum number of requests Firebase allows at once per XMPP connection   @max_demand 100       defstruct [     :producer,     :producer_from,     :fcm_conn_pid,     :pending_requests,   ]      def start_link(producer, fcm_conn_pid, opts \\ []) do     GenStage.start_link(__MODULE__, {producer, fcm_conn_pid}, opts)   end      def init({producer, fcm_conn_pid}) do     state = %__MODULE__{       next_id: 1,       pending_requests: Map.new,       producer: producer,       fcm_conn_pid: fcm_conn_pid,     }     send(self, :init)     # Run as consumer     {:consumer, state}   end      def handle_info(:init, %{producer: producer}=state) do     # Subscribe to the Push Collector     GenStage.async_subscribe(self, to: producer, cancel: :temporary)     {:noreply, [], state}   end      def handle_subscribe(:producer, _opts, from, state) do     # Start demanding requests now that we are subscribed     GenStage.ask(from, @max_demand)     {:manual, %{state | producer_from: from}}   end      def handle_events(push_requests, _from, state) do     # We got some push requests from the Push Collector.     # Let’s send them.     state = Enum.reduce(push_requests, state, &do_send/2)     {:noreply, [], state}   end      # Send the message to FCM, track as a pending request   defp do_send(%{fcm_conn_pid: fcm_conn_pid, pending_requests: pending_requests}=state, push_request) do     {message_id, state} = generate_id(state)     xml = PushRequest.to_xml(push_request, message_id)     :ok = FCM.Connection.send(fcm_conn_pid, xml)     pending_requests = Map.put(pending_requests, message_id, push_request)     %{state | pending_requests: pending_requests}   end      # FCM response handling   defp handle_response(%{message_id: message_id}=response, %{pending_requests: pending_requests, producer_from: producer_from}=state) do     {push_request, pending_requests} = Map.pop(pending_requests, message_id)          # Since we finished a request, ask the Push Collector for more.     GenStage.ask(producer_from, 1)          %{state | pending_requests: pending_requests}   end      defp generate_id(%{next_id: next_id}=state) do     {to_string(next_id), %{state | next_id: next_id + 1}}   end end

Пример инцидента
Ниже показан реальный инцидент, с которым столкнулась система. На верхнем графике показано количество push-запросов в секунду, проходящих через систему. На нижнем графике — количество push-запросов, помещённых в буфер Push Collector.

Хроника событий:

  • ~17:47:00  — Система работает в нормальном режиме.
  • ~17:47:30  —  К нам начинает поступать поток сообщений. Push Collector немного задействовал буфер, ожидая реакции Pusher. Вскоре буфер чуть освободился.
  • ~17:48:50  — Pusher’ы не могут отправлять сообщения в Firebase быстрее, чем они поступают, так что буфер Push Collector’а начинает заполняться.
  • ~17:50:00  — Буфер Pusher Collector достигает пика и начинает сбрасывать некоторые запросы.
  • ~17:50:50  — Буфер Pusher Collector начинает освобождаться и перестаёт сбрасывать запросы.
  • ~17:51:30  —  Наплыв запросов пошёл на спад.
  • ~17:52:30  — Система полностью вернулась в норму.

Успех Elixir

Мы в Discord очень довольны использованием Elixir и Erlang как ключевой технологии на наших сервисах бэкенда. Приятно видеть расширения вроде GenStage, которые опираются на нерушимые технологии Erlang/OTP.

Мы ищем смелых духом, чтобы помочь в решении таких проблем, поскольку Discord продолжает расти. Если вы любите игры и такого рода задачи заставляют ваше сердце биться чаще, посмотрите наши вакансии.
ссылка на оригинал статьи https://habrahabr.ru/post/317724/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *