Пишем на Go простой балансировщик

в 9:46, , рубрики: Go, load balancing, Блог компании Mail.Ru Group, высокая производительность, никто не читает теги, Разработка веб-сайтов, Сетевые технологии
Пишем на Go простой балансировщик - 1

Балансировщики нагрузки играют в веб-архитектуре ключевую роль. Они позволяют распределять нагрузку по нескольким бэкендам, тем самым улучшая масштабируемость. А поскольку у нас сконфигурировано несколько бэкендов, сервис становится высокодоступным, потому что в случае сбоя на одном сервере балансировщик может выбирать другой работающий сервер.

Поигравшись с профессиональными балансировщиками наподобие NGINX, я попробовал ради веселья создать простенький балансировщик. Написал я его на Go, это современный язык, поддерживающий полноценный параллелизм. Стандартная библиотека в Go имеет широкие возможности и позволяет писать высокопроизводительные приложения с меньшим количеством кода. К тому же для простоты распространения она генерирует единственный статически скомпонованный бинарник.

Как работает наш балансировщик

Для распределения нагрузки по бэкендам используются разные алгоритмы. Например:

  • Round Robin — нагрузка распределяется равномерно, с учётом одинаковой вычислительной мощности серверов.
  • Weighted Round Robin — в зависимости от вычислительной мощности серверам могут присваиваться разные веса.
  • Least Connections — нагрузка распределяется по серверам с наименьшим количеством активных подключений.

В нашем балансировщике мы реализуем простейший алгоритм — Round Robin.

Пишем на Go простой балансировщик - 2

Выбор в Round Robin

Алгоритм Round Robin прост. Он даёт всем исполнителям одинаковые возможности по выполнению задач.

Пишем на Go простой балансировщик - 3
Выбор серверов в Round Robin для обработки входящих запросов.

Как показано на иллюстрации, алгоритм выбирает серверы по кругу, циклически. Но мы не можем выбирать их напрямую, верно?

А если сервер лежит? Вероятно, нам не нужно отправлять на него трафик. То есть сервер не может использоваться напрямую, пока мы не приведём его в нужное состояние. Нужно направлять трафик только на те серверы, которые запущены и работают.

Определим структуры

Нам нужно отслеживать все подробности, связанные с бэкендом. Необходимо знать, живой ли он, а также отслеживать URL. Для этого мы можем определить такую структуру:

type Backend struct {
  URL          *url.URL
  Alive        bool
  mux          sync.RWMutex
  ReverseProxy *httputil.ReverseProxy
}

Не волнуйтесь, я поясню значения полей в Backend.

Теперь в балансировщике нужно как-то отслеживать все бэкенды. Для этого можно воспользоваться Slice и счётчиком переменных. Определим его в ServerPool:

type ServerPool struct {
  backends []*Backend
  current  uint64
}

Использование ReverseProxy

Как мы уже определили, суть балансировщика в распределении трафика по разным серверам и возвращении результатов клиенту. Как сказано в документации Go:

ReverseProxy — это обработчик HTTP, который берёт входящие запросы и отправляет на другой сервер, проксируя ответы обратно клиенту.

Именно то, что нам нужно. Не надо изобретать колесо. Можно просто транслировать наши запросы через ReverseProxy.

u, _ := url.Parse("http://localhost:8080")
rp := httputil.NewSingleHostReverseProxy(u)
  
// initialize your server and add this as handler
http.HandlerFunc(rp.ServeHTTP)

C помощью httputil.NewSingleHostReverseProxy(url) можно инициализировать ReverseProxy, который будет транслировать запросы на переданный url. В приведённом выше примере все запросы переданы на localhost:8080, а результаты отосланы клиенту.

Если посмотреть на сигнатуру метода ServeHTTP, то в ней можно найти сигнатуру HTTP-обработчика. Поэтому можно передавать его HandlerFunc в http.

Другие примеры есть в документации.

Для нашего балансировщика можно инициировать ReverseProxy с ассоциированным URL в Backend, чтобы ReverseProxy маршрутизировал запросы в URL.

Процесс выбора серверов

В ходе очередного выбора сервера нам нужно пропускать лежащие серверы. Но необходимо организовать подсчёт.

Многочисленные клиенты будут подключаться к балансировщику, и когда каждый из них попросит следующий узел передать трафик, может возникнуть состояние гонки. Для предотвращения этого мы можем блокировать ServerPool с помощью mutex. Но это будет избыточно, к тому же мы вообще не хотим блокировать ServerPool. Нам лишь нужно увеличить счётчик на единицу.

Наилучшим решением, соблюдающим эти требования, будет атомарное инкрементирование. Go поддерживает его с помощью пакета atomic.

func (s *ServerPool) NextIndex() int {
  return int(atomic.AddUint64(&s.current, uint64(1)) % uint64(len(s.backends)))
}

Мы атомарно увеличиваем текущее значение на единицу и возвращаем индекс, изменяя длину массива. Это означает, что значение всегда должно лежать в диапазоне от 0 до длины массива. В конце нас будет интересовать конкретный индекс, а не весь счётчик.

Выбор живого сервера

Мы уже знаем, что наши запросы циклически ротируются по всем серверам. И нам нужно лишь пропускать неработающие.

GetNext() всегда возвращает значение, лежащее в диапазоне от 0 до длины массива. В любой момент мы можем получить следующий узел, и если он неактивен, нужно в рамках цикла искать дальше по массиву.

Пишем на Go простой балансировщик - 4
Циклически проходим по массиву.

Как показано на иллюстрации, мы хотим пройти от следующего узла до конца списка. Это можно сделать с помощью next + length. Но для выбора индекса нужно ограничить его рамками длины массива. Это легко можно сделать с помощью операции модифицирования.

После того, как мы в ходе поиска нашли работающий сервер, его нужно пометить как текущий:

// GetNextPeer returns next active peer to take a connection
func (s *ServerPool) GetNextPeer() *Backend {
  // loop entire backends to find out an Alive backend
  next := s.NextIndex()
  l := len(s.backends) + next // start from next and move a full cycle
  for i := next; i < l; i++ {
    idx := i % len(s.backends) // take an index by modding with length
    // if we have an alive backend, use it and store if its not the original one
    if s.backends[idx].IsAlive() {
      if i != next {
        atomic.StoreUint64(&s.current, uint64(idx)) // mark the current one
      }
      return s.backends[idx]
    }
  }
  return nil
}

Избегаем состояния гонки в структуре Backend

Здесь нужно помнить о важной проблеме. Структура Backend содержит переменную, которую могут изменять или запрашивать одновременно несколько горутин.

Мы знаем, что читать переменную будет больше горутин, чем записывать в неё. Поэтому для сериализации доступа к Alive мы выбрали RWMutex.

// SetAlive for this backend
func (b *Backend) SetAlive(alive bool) {
  b.mux.Lock()
  b.Alive = alive
  b.mux.Unlock()
}

// IsAlive returns true when backend is alive
func (b *Backend) IsAlive() (alive bool) {
  b.mux.RLock()
  alive = b.Alive
  b.mux.RUnlock()
  return
}

Балансируем запросы

Теперь можно сформулировать простой метод для балансировки наших запросов. Он будет сбоить лишь в том случае, если упадут все серверы.

// lb load balances the incoming request
func lb(w http.ResponseWriter, r *http.Request) {
  peer := serverPool.GetNextPeer()
  if peer != nil {
    peer.ReverseProxy.ServeHTTP(w, r)
    return
  }
  http.Error(w, "Service not available", http.StatusServiceUnavailable)
}

Этот метод можно передать HTTP-серверу просто в виде HandlerFunc.

server := http.Server{
  Addr:    fmt.Sprintf(":%d", port),
  Handler: http.HandlerFunc(lb),
}

Маршрутизируем трафик только на работающие серверы

У нашего балансировщика серьёзная проблема. Мы не знаем, работает ли сервер. Чтобы узнать это, нужно проверить сервер. Сделать это можно двумя способами:

  • Активный: выполняя текущий запрос, мы обнаруживаем, что выбранный сервер не отвечает, и помечаем его как нерабочий.
  • Пассивный: можно пинговать серверы с каким-то интервалом и проверять статус.

Активно проверяем работающие серверы

При любой ошибке ReverseProxy инициирует функцию обратного вызова ErrorHandler. Это можно применять для обнаружения сбоев:

proxy.ErrorHandler = func(writer http.ResponseWriter, request *http.Request, e error) {
  log.Printf("[%s] %sn", serverUrl.Host, e.Error())
  retries := GetRetryFromContext(request)
  if retries < 3 {
    select {
      case <-time.After(10 * time.Millisecond):
        ctx := context.WithValue(request.Context(), Retry, retries+1)
        proxy.ServeHTTP(writer, request.WithContext(ctx))
      }
      return
    }

  // after 3 retries, mark this backend as down
  serverPool.MarkBackendStatus(serverUrl, false)

  // if the same request routing for few attempts with different backends, increase the count
  attempts := GetAttemptsFromContext(request)
  log.Printf("%s(%s) Attempting retry %dn", request.RemoteAddr, request.URL.Path, attempts)
  ctx := context.WithValue(request.Context(), Attempts, attempts+1)
  lb(writer, request.WithContext(ctx))
}

При разработке этого обработчика ошибок мы использовали возможности замыканий. Это позволяет нам захватывать в наш метод такие внешние переменные, как серверный URL. Обработчик проверяет счётчик повторов, и если он меньше 3, то мы снова отправляем тот же запрос тому же серверу. Это делается потому, что из-за временных ошибок сервер может отбрасывать наши запросы, но вскоре он становится доступен (возможно, у сервера не было свободных сокетов для новых клиентов). Так что нужно настроить таймер задержки для новой попытки примерно через 10 мс. С каждым запросом мы увеличиваем счётчик попыток.

После сбоя каждой попытки мы помечаем сервер как нерабочий.

Теперь нужно назначить для того же запроса новый сервер. Делать мы это будем с помощью счётчика попыток, использующего пакет context. После увеличения счётчика попыток мы передаём его в lb для выбора нового сервера для обработки запроса.

Мы не можем делать это бесконечно, так что будем проверять в lb, не достигнуто ли максимальное количество попыток, прежде чем продолжить обработку запроса.

Можно просто получить счётчик попыток из запроса, если он достиг максимума, то мы прерываем запрос.

// lb load balances the incoming request
func lb(w http.ResponseWriter, r *http.Request) {
  attempts := GetAttemptsFromContext(r)
  if attempts > 3 {
    log.Printf("%s(%s) Max attempts reached, terminatingn", r.RemoteAddr, r.URL.Path)
    http.Error(w, "Service not available", http.StatusServiceUnavailable)
    return
  }

  peer := serverPool.GetNextPeer()
  if peer != nil {
    peer.ReverseProxy.ServeHTTP(w, r)
    return
  }
  http.Error(w, "Service not available", http.StatusServiceUnavailable)
}

Это рекурсивная реализация.

Использование пакета context

Пакет context позволяет сохранять полезные данные в HTTP-запросах. Мы будем активно это использовать для отслеживания данных, относящихся к запросам — счётчиков Attempt и Retry.

Во-первых, нужно задать ключи для контекста. Рекомендуется использовать не строковые, а уникальные числовые значения. В Go есть ключевое слова iota для инкрементальной реализации констант, каждая из которых содержит уникальное значение. Это прекрасное решение для определения числовых ключей.

const (
  Attempts int = iota
  Retry
)

Затем можно извлечь значение, как мы обычно делаем с помощью HashMap. Возвращаемое по умолчанию значение может зависеть от текущей ситуации.

// GetAttemptsFromContext returns the attempts for request
func GetRetryFromContext(r *http.Request) int {
  if retry, ok := r.Context().Value(Retry).(int); ok {
    return retry
  }
  return 0
}

Пассивная проверка серверов

Пассивные проверки позволяют идентифицировать и восстанавливать упавшие серверы. Мы пингуем их с определённым интервалом, чтобы определить их статус.

Для пингования попробуем установить TCP-соединение. Если сервер отвечает, мы помечаем его рабочим. Этот метод можно адаптировать для вызова специфических конечных точек наподобие /status. Удостоверьтесь, что закрыли подключение после его создания, чтобы уменьшить дополнительную нагрузку на сервер. Иначе он будет пытаться поддерживать это подключение и в конце концов исчерпает свои ресурсы.

// isAlive checks whether a backend is Alive by establishing a TCP connection
func isBackendAlive(u *url.URL) bool {
  timeout := 2 * time.Second
  conn, err := net.DialTimeout("tcp", u.Host, timeout)
  if err != nil {
    log.Println("Site unreachable, error: ", err)
    return false
  }
  _ = conn.Close() // close it, we dont need to maintain this connection
  return true
}

Теперь можно итерировать серверы и отмечать их статусы:

// HealthCheck pings the backends and update the status
func (s *ServerPool) HealthCheck() {
  for _, b := range s.backends {
    status := "up"
    alive := isBackendAlive(b.URL)
    b.SetAlive(alive)
    if !alive {
      status = "down"
    }
    log.Printf("%s [%s]n", b.URL, status)
  }
}

Для периодического запуска этого кода можно запустить в Go таймер. Он позволит слушать события в канале.

// healthCheck runs a routine for check status of the backends every 2 mins
func healthCheck() {
  t := time.NewTicker(time.Second * 20)
  for {
    select {
    case <-t.C:
      log.Println("Starting health check...")
      serverPool.HealthCheck()
      log.Println("Health check completed")
    }
  }
}

В этом коде канал <-t.C будет возвращать значение каждые 20 секунд. select позволяет определять это событие. При отсутствии ситуации default он ждёт, пока хотя бы один case может быть выполнен.

Теперь запускаем код в отдельной горутине:

go healthCheck()

Заключение

В этой статье мы рассмотрели много вопросов:

  • Алгоритм Round Robin
  • ReverseProxy из стандартной библиотеки
  • Мьютексы
  • Атомарные операции
  • Замыкания
  • Обратные вызовы
  • Операция выбора

Есть ещё много способов улучшить наш балансировщик. Например:

  • Использовать кучу для сортировки живых серверов, чтобы уменьшить область поиска.
  • Собирать статистику.
  • Реализовать алгоритм weighted round-robin c наименьшим количеством коннектов.
  • Добавить поддержку конфигурационных файлов.

И так далее.

Исходный код лежит здесь.

Автор: Макс

Источник


* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js