Как Discord обрабатывает более 1 000 000 push-запросов в минуту с помощью Elixir GenStage

в 19:21, , рубрики: Discord, Elixir, erlang, Erlang/OTP, firebase, GenStage, push-запросы, push-уведомления, xmpp, бэкенд, высокая производительность, обратное давление, Разработка систем передачи данных, сброс нагрузки, Системы обмена сообщениями, Тестирование веб-сервисов, чаты

Как Discord обрабатывает более 1 000 000 push-запросов в минуту с помощью Elixir GenStage - 1
Discord

Discord испытал небывалый рост. Чтобы справиться с ним, нашему отделу разработки досталась приятная проблема — искать способ масштабирования сервисов бэкенда.

В этом деле мы добились большого успеха с помощью одной технологии, которая называется Elixir GenStage.

Идеальный шторм: Overwatch и Pokémon GO

Этим летом наша система мобильных push-уведомлений стала скрипеть от нагрузки. Чат /r/Overwatch перевалил за 25 000 одновременных пользователей, а чат-группы Pokémon GO возникали повсеместно, так что внезапные всплески потока уведомлений стали серьёзной проблемой.

Всплески потока уведомлений тормозят всю систему push-уведомлений, а иногда кладут её. Push-уведомления или приходят поздно, или не приходят вовсе.

GenStage идёт на помощь

После небольшого расследования мы выяснили, что основным бутылочным горлышком была отправка push-уведомлений в сервис Google Firebase Cloud Messaging.

Мы поняли, что можем немедленно улучшить пропускную способность, если отправлять push-запросы к Firebase по XMPP, а не по HTTP.

Firebase XMPP слегка сложнее, чем HTTP. Firebase требует, чтобы у каждого XMPP-соединения в каждый момент времени было не более 100 запросов в очереди. Если от вас улетело 100 запросов, то следует подождать, пока Firebase подтвердит получение запроса, прежде чем отправить следующий.

Поскольку в очередь допускаются только 100 запросов в каждый момент времени, нам пришлось спроектировать новую систему, чтобы XMPP-соединения не переполнялись во время всплесков потока запросов.

На первый взгляд показалось, что GenStage будет идеальным решением проблемы.

GenStage

Что такое GenStage?

GenStage — это новый режим Elixir для обмена событиями под обратным давлением между процессами Elixir. [0]

Что это значит на самом деле? По существу, этот режим даёт вам необходимые инструменты, чтобы ни одна часть вашей системы не перегружалась.

На практике, система с режимами GenStage обычно имеет несколько этапов.

Этапы (stages) — это шаги вычислений, которые отправляют и/или получают данные от других этапов.

Когда этап отправляет данные, он выступает в качестве производителя. Когда получает данные, то в качестве потребителя. Этапы могут играть роли одновременно и производителя, и потребителя.

Кроме назначения ролей производителя и потребителя, этап можно назначить «источником» (source), если он только производит элементы, или назначить «стоком» (sink), если он их только потребляет. [1]

Подход

Как Discord обрабатывает более 1 000 000 push-запросов в минуту с помощью Elixir GenStage - 2

Мы разделили систему на два этапа GenStage. Один источник и один сток.

  • Этап 1 — Push Collector. Это производитель, который получает push-запросы. Сейчас у нас один процесс Erlang для Push Collector на одну машину.
  • Этап 2 — Pusher. Это потребитель, который требует push-запросы от Push Collector и отправлят их к Firebase. Он запрашивает только по 100 запросов за раз, чтобы не превысить лимит Firebase на количество одновременных запросов. Процессов типа Pusher (тоже на Erlang) много на каждой машине.

Обратное давление и сброс нагрузки с помощью GenStage

У GenStage есть две ключевые функции, которые помогают нам во время всплеска запросов: обратное давление (back-pressure) и сброс нагрузки (load-shedding).

Обратное давление

Pusher использует функциональность GenStage, чтобы запросить у Push Collector'а максимальное количество запросов, которые Pusher может обработать. Это гарантирует верхнюю границу по количеству push-запросов, которые находятся в ожидании. Когда Firebase подтверждает запрос, тогда Pusher требует ещё от Push Collector'а.

Pusher знает точное количество запросов, которое может выдержать соединение Firebase XMPP, и никогда не требует лишнего. А Push Collector никогда не высылает запрос в сторону Pusher, если тот не попросил.

Сброс нагрузки

Поскольку Pusher'ы оказывают обратное давление на Push Collector, то появляется потенциальное бутылочное горлышко в Push Collector. Супер-дупер мощные всплески могут его перегрузить.

В GenStage имеется другая встроенная функция для таких ситуаций: буферизованные события.

В Push Collector мы определяем, сколько push-запросов помещать в буфер. В нормальном состоянии буфер пустой, но один раз в месяц при наступлении катастрофических событий он приходится весьма кстати.

Если через систему проходит ну уж очень много событий и буфер заполняется, тогда Push Collector сбрасывает входящие push-запросы. Это происходит само собой просто за счёт указания опции buffer_size в функции init Push Collector'а.

С этими двумя функциями мы способны справляться со всплесками push-уведомлений.

Код (наконец, самая важная часть)

Ниже пример кода, как мы настроили этапы Pusher и Push Collector. Для простоты мы убрали много фрагментов, отвечающих за обработку отказов, когда теряется соединение, Firebase возвращает ошибки и т.д.

Вы можете пропустить код, если хотите посмотреть на результат.

Push Collector (производитель)

push_collector.ex

defmodule GCM.PushCollector do
  use GenStage
  
  # Client
  
  def push(pid, push_requests) do
    GenServer.cast(pid, {:push, push_requests})
  end
  
  # Server
  
  def init(_args) do
    # Run as producer and specify the max amount 
    # of push requests to buffer.
    {:producer, :ok, buffer_size: @max_buffer_size}
  end
  
  def handle_cast({:push, push_requests}, state) do
    # Dispatch the push_requests as events.
    # These will be buffered if there are no consumers ready.
    {:noreply, push_requests, state}
  end
  
  def handle_demand(_demand, state) do
    # Do nothing. Events will be dispatched as-is.
    {:noreply, [], state}
  end
end

Pusher (потребитель)

pusher.ex

defmodule GCM.Pusher do
  use GenStage
  # The maximum number of requests Firebase allows at once per XMPP connection
  @max_demand 100 
  
  defstruct [
    :producer,
    :producer_from,
    :fcm_conn_pid,
    :pending_requests,
  ]
  
  def start_link(producer, fcm_conn_pid, opts \ []) do
    GenStage.start_link(__MODULE__, {producer, fcm_conn_pid}, opts)
  end
  
  def init({producer, fcm_conn_pid}) do
    state = %__MODULE__{
      next_id: 1,
      pending_requests: Map.new,
      producer: producer,
      fcm_conn_pid: fcm_conn_pid,
    }
    send(self, :init)
    # Run as consumer
    {:consumer, state}
  end
  
  def handle_info(:init, %{producer: producer}=state) do
    # Subscribe to the Push Collector
    GenStage.async_subscribe(self, to: producer, cancel: :temporary)
    {:noreply, [], state}
  end
  
  def handle_subscribe(:producer, _opts, from, state) do
    # Start demanding requests now that we are subscribed
    GenStage.ask(from, @max_demand)
    {:manual, %{state | producer_from: from}}
  end
  
  def handle_events(push_requests, _from, state) do
    # We got some push requests from the Push Collector.
    # Let’s send them.
    state = Enum.reduce(push_requests, state, &do_send/2)
    {:noreply, [], state}
  end
  
  # Send the message to FCM, track as a pending request
  defp do_send(%{fcm_conn_pid: fcm_conn_pid, pending_requests: pending_requests}=state, push_request) do
    {message_id, state} = generate_id(state)
    xml = PushRequest.to_xml(push_request, message_id)
    :ok = FCM.Connection.send(fcm_conn_pid, xml)
    pending_requests = Map.put(pending_requests, message_id, push_request)
    %{state | pending_requests: pending_requests}
  end
  
  # FCM response handling
  defp handle_response(%{message_id: message_id}=response, %{pending_requests: pending_requests, producer_from: producer_from}=state) do
    {push_request, pending_requests} = Map.pop(pending_requests, message_id)
    
    # Since we finished a request, ask the Push Collector for more.
    GenStage.ask(producer_from, 1)
    
    %{state | pending_requests: pending_requests}
  end
  
  defp generate_id(%{next_id: next_id}=state) do
    {to_string(next_id), %{state | next_id: next_id + 1}}
  end
end

Пример инцидента
Ниже показан реальный инцидент, с которым столкнулась система. На верхнем графике показано количество push-запросов в секунду, проходящих через систему. На нижнем графике — количество push-запросов, помещённых в буфер Push Collector.

Как Discord обрабатывает более 1 000 000 push-запросов в минуту с помощью Elixir GenStage - 3

Как Discord обрабатывает более 1 000 000 push-запросов в минуту с помощью Elixir GenStage - 4

Хроника событий:

  • ~17:47:00  — Система работает в нормальном режиме.
  • ~17:47:30  —  К нам начинает поступать поток сообщений. Push Collector немного задействовал буфер, ожидая реакции Pusher. Вскоре буфер чуть освободился.
  • ~17:48:50  — Pusher'ы не могут отправлять сообщения в Firebase быстрее, чем они поступают, так что буфер Push Collector'а начинает заполняться.
  • ~17:50:00  — Буфер Pusher Collector достигает пика и начинает сбрасывать некоторые запросы.
  • ~17:50:50  — Буфер Pusher Collector начинает освобождаться и перестаёт сбрасывать запросы.
  • ~17:51:30  —  Наплыв запросов пошёл на спад.
  • ~17:52:30  — Система полностью вернулась в норму.

Успех Elixir

Мы в Discord очень довольны использованием Elixir и Erlang как ключевой технологии на наших сервисах бэкенда. Приятно видеть расширения вроде GenStage, которые опираются на нерушимые технологии Erlang/OTP.

Мы ищем смелых духом, чтобы помочь в решении таких проблем, поскольку Discord продолжает расти. Если вы любите игры и такого рода задачи заставляют ваше сердце биться чаще, посмотрите наши вакансии.

Автор: m1rko

Источник

Поделиться

* - обязательные к заполнению поля