Пишем на Go простой балансировщик

от автора

Балансировщики нагрузки играют в веб-архитектуре ключевую роль. Они позволяют распределять нагрузку по нескольким бэкендам, тем самым улучшая масштабируемость. А поскольку у нас сконфигурировано несколько бэкендов, сервис становится высокодоступным, потому что в случае сбоя на одном сервере балансировщик может выбирать другой работающий сервер.

Поигравшись с профессиональными балансировщиками наподобие NGINX, я попробовал ради веселья создать простенький балансировщик. Написал я его на Go, это современный язык, поддерживающий полноценный параллелизм. Стандартная библиотека в Go имеет широкие возможности и позволяет писать высокопроизводительные приложения с меньшим количеством кода. К тому же для простоты распространения она генерирует единственный статически скомпонованный бинарник.

Как работает наш балансировщик

Для распределения нагрузки по бэкендам используются разные алгоритмы. Например:

  • Round Robin — нагрузка распределяется равномерно, с учётом одинаковой вычислительной мощности серверов.
  • Weighted Round Robin — в зависимости от вычислительной мощности серверам могут присваиваться разные веса.
  • Least Connections — нагрузка распределяется по серверам с наименьшим количеством активных подключений.

В нашем балансировщике мы реализуем простейший алгоритм — Round Robin.

Выбор в Round Robin

Алгоритм Round Robin прост. Он даёт всем исполнителям одинаковые возможности по выполнению задач.

Выбор серверов в Round Robin для обработки входящих запросов.

Как показано на иллюстрации, алгоритм выбирает серверы по кругу, циклически. Но мы не можем выбирать их напрямую, верно?

А если сервер лежит? Вероятно, нам не нужно отправлять на него трафик. То есть сервер не может использоваться напрямую, пока мы не приведём его в нужное состояние. Нужно направлять трафик только на те серверы, которые запущены и работают.

Определим структуры

Нам нужно отслеживать все подробности, связанные с бэкендом. Необходимо знать, живой ли он, а также отслеживать URL. Для этого мы можем определить такую структуру:

type Backend struct {   URL          *url.URL   Alive        bool   mux          sync.RWMutex   ReverseProxy *httputil.ReverseProxy }

Не волнуйтесь, я поясню значения полей в Backend.

Теперь в балансировщике нужно как-то отслеживать все бэкенды. Для этого можно воспользоваться Slice и счётчиком переменных. Определим его в ServerPool:

type ServerPool struct {   backends []*Backend   current  uint64 }

Использование ReverseProxy

Как мы уже определили, суть балансировщика в распределении трафика по разным серверам и возвращении результатов клиенту. Как сказано в документации Go:

ReverseProxy — это обработчик HTTP, который берёт входящие запросы и отправляет на другой сервер, проксируя ответы обратно клиенту.

Именно то, что нам нужно. Не надо изобретать колесо. Можно просто транслировать наши запросы через ReverseProxy.

u, _ := url.Parse("http://localhost:8080") rp := httputil.NewSingleHostReverseProxy(u)    // initialize your server and add this as handler http.HandlerFunc(rp.ServeHTTP)

C помощью httputil.NewSingleHostReverseProxy(url) можно инициализировать ReverseProxy, который будет транслировать запросы на переданный url. В приведённом выше примере все запросы переданы на localhost:8080, а результаты отосланы клиенту.

Если посмотреть на сигнатуру метода ServeHTTP, то в ней можно найти сигнатуру HTTP-обработчика. Поэтому можно передавать его HandlerFunc в http.

Другие примеры есть в документации.

Для нашего балансировщика можно инициировать ReverseProxy с ассоциированным URL в Backend, чтобы ReverseProxy маршрутизировал запросы в URL.

Процесс выбора серверов

В ходе очередного выбора сервера нам нужно пропускать лежащие серверы. Но необходимо организовать подсчёт.

Многочисленные клиенты будут подключаться к балансировщику, и когда каждый из них попросит следующий узел передать трафик, может возникнуть состояние гонки. Для предотвращения этого мы можем блокировать ServerPool с помощью mutex. Но это будет избыточно, к тому же мы вообще не хотим блокировать ServerPool. Нам лишь нужно увеличить счётчик на единицу.

Наилучшим решением, соблюдающим эти требования, будет атомарное инкрементирование. Go поддерживает его с помощью пакета atomic.

func (s *ServerPool) NextIndex() int {   return int(atomic.AddUint64(&s.current, uint64(1)) % uint64(len(s.backends))) }

Мы атомарно увеличиваем текущее значение на единицу и возвращаем индекс, изменяя длину массива. Это означает, что значение всегда должно лежать в диапазоне от 0 до длины массива. В конце нас будет интересовать конкретный индекс, а не весь счётчик.

Выбор живого сервера

Мы уже знаем, что наши запросы циклически ротируются по всем серверам. И нам нужно лишь пропускать неработающие.

GetNext() всегда возвращает значение, лежащее в диапазоне от 0 до длины массива. В любой момент мы можем получить следующий узел, и если он неактивен, нужно в рамках цикла искать дальше по массиву.

Циклически проходим по массиву.

Как показано на иллюстрации, мы хотим пройти от следующего узла до конца списка. Это можно сделать с помощью next + length. Но для выбора индекса нужно ограничить его рамками длины массива. Это легко можно сделать с помощью операции модифицирования.

После того, как мы в ходе поиска нашли работающий сервер, его нужно пометить как текущий:

// GetNextPeer returns next active peer to take a connection func (s *ServerPool) GetNextPeer() *Backend {   // loop entire backends to find out an Alive backend   next := s.NextIndex()   l := len(s.backends) + next // start from next and move a full cycle   for i := next; i < l; i++ {     idx := i % len(s.backends) // take an index by modding with length     // if we have an alive backend, use it and store if its not the original one     if s.backends[idx].IsAlive() {       if i != next {         atomic.StoreUint64(&s.current, uint64(idx)) // mark the current one       }       return s.backends[idx]     }   }   return nil } 

Избегаем состояния гонки в структуре Backend

Здесь нужно помнить о важной проблеме. Структура Backend содержит переменную, которую могут изменять или запрашивать одновременно несколько горутин.

Мы знаем, что читать переменную будет больше горутин, чем записывать в неё. Поэтому для сериализации доступа к Alive мы выбрали RWMutex.

// SetAlive for this backend func (b *Backend) SetAlive(alive bool) {   b.mux.Lock()   b.Alive = alive   b.mux.Unlock() }  // IsAlive returns true when backend is alive func (b *Backend) IsAlive() (alive bool) {   b.mux.RLock()   alive = b.Alive   b.mux.RUnlock()   return } 

Балансируем запросы

Теперь можно сформулировать простой метод для балансировки наших запросов. Он будет сбоить лишь в том случае, если упадут все серверы.

// lb load balances the incoming request func lb(w http.ResponseWriter, r *http.Request) {   peer := serverPool.GetNextPeer()   if peer != nil {     peer.ReverseProxy.ServeHTTP(w, r)     return   }   http.Error(w, "Service not available", http.StatusServiceUnavailable) }

Этот метод можно передать HTTP-серверу просто в виде HandlerFunc.

server := http.Server{   Addr:    fmt.Sprintf(":%d", port),   Handler: http.HandlerFunc(lb), } 

Маршрутизируем трафик только на работающие серверы

У нашего балансировщика серьёзная проблема. Мы не знаем, работает ли сервер. Чтобы узнать это, нужно проверить сервер. Сделать это можно двумя способами:

  • Активный: выполняя текущий запрос, мы обнаруживаем, что выбранный сервер не отвечает, и помечаем его как нерабочий.
  • Пассивный: можно пинговать серверы с каким-то интервалом и проверять статус.

Активно проверяем работающие серверы

При любой ошибке ReverseProxy инициирует функцию обратного вызова ErrorHandler. Это можно применять для обнаружения сбоев:

proxy.ErrorHandler = func(writer http.ResponseWriter, request *http.Request, e error) {   log.Printf("[%s] %s\n", serverUrl.Host, e.Error())   retries := GetRetryFromContext(request)   if retries < 3 {     select {       case <-time.After(10 * time.Millisecond):         ctx := context.WithValue(request.Context(), Retry, retries+1)         proxy.ServeHTTP(writer, request.WithContext(ctx))       }       return     }    // after 3 retries, mark this backend as down   serverPool.MarkBackendStatus(serverUrl, false)    // if the same request routing for few attempts with different backends, increase the count   attempts := GetAttemptsFromContext(request)   log.Printf("%s(%s) Attempting retry %d\n", request.RemoteAddr, request.URL.Path, attempts)   ctx := context.WithValue(request.Context(), Attempts, attempts+1)   lb(writer, request.WithContext(ctx)) }

При разработке этого обработчика ошибок мы использовали возможности замыканий. Это позволяет нам захватывать в наш метод такие внешние переменные, как серверный URL. Обработчик проверяет счётчик повторов, и если он меньше 3, то мы снова отправляем тот же запрос тому же серверу. Это делается потому, что из-за временных ошибок сервер может отбрасывать наши запросы, но вскоре он становится доступен (возможно, у сервера не было свободных сокетов для новых клиентов). Так что нужно настроить таймер задержки для новой попытки примерно через 10 мс. С каждым запросом мы увеличиваем счётчик попыток.

После сбоя каждой попытки мы помечаем сервер как нерабочий.

Теперь нужно назначить для того же запроса новый сервер. Делать мы это будем с помощью счётчика попыток, использующего пакет context. После увеличения счётчика попыток мы передаём его в lb для выбора нового сервера для обработки запроса.

Мы не можем делать это бесконечно, так что будем проверять в lb, не достигнуто ли максимальное количество попыток, прежде чем продолжить обработку запроса.

Можно просто получить счётчик попыток из запроса, если он достиг максимума, то мы прерываем запрос.

// lb load balances the incoming request func lb(w http.ResponseWriter, r *http.Request) {   attempts := GetAttemptsFromContext(r)   if attempts > 3 {     log.Printf("%s(%s) Max attempts reached, terminating\n", r.RemoteAddr, r.URL.Path)     http.Error(w, "Service not available", http.StatusServiceUnavailable)     return   }    peer := serverPool.GetNextPeer()   if peer != nil {     peer.ReverseProxy.ServeHTTP(w, r)     return   }   http.Error(w, "Service not available", http.StatusServiceUnavailable) }

Это рекурсивная реализация.

Использование пакета context

Пакет context позволяет сохранять полезные данные в HTTP-запросах. Мы будем активно это использовать для отслеживания данных, относящихся к запросам — счётчиков Attempt и Retry.

Во-первых, нужно задать ключи для контекста. Рекомендуется использовать не строковые, а уникальные числовые значения. В Go есть ключевое слова iota для инкрементальной реализации констант, каждая из которых содержит уникальное значение. Это прекрасное решение для определения числовых ключей.

const (   Attempts int = iota   Retry )

Затем можно извлечь значение, как мы обычно делаем с помощью HashMap. Возвращаемое по умолчанию значение может зависеть от текущей ситуации.

// GetAttemptsFromContext returns the attempts for request func GetRetryFromContext(r *http.Request) int {   if retry, ok := r.Context().Value(Retry).(int); ok {     return retry   }   return 0 } 

Пассивная проверка серверов

Пассивные проверки позволяют идентифицировать и восстанавливать упавшие серверы. Мы пингуем их с определённым интервалом, чтобы определить их статус.

Для пингования попробуем установить TCP-соединение. Если сервер отвечает, мы помечаем его рабочим. Этот метод можно адаптировать для вызова специфических конечных точек наподобие /status. Удостоверьтесь, что закрыли подключение после его создания, чтобы уменьшить дополнительную нагрузку на сервер. Иначе он будет пытаться поддерживать это подключение и в конце концов исчерпает свои ресурсы.

// isAlive checks whether a backend is Alive by establishing a TCP connection func isBackendAlive(u *url.URL) bool {   timeout := 2 * time.Second   conn, err := net.DialTimeout("tcp", u.Host, timeout)   if err != nil {     log.Println("Site unreachable, error: ", err)     return false   }   _ = conn.Close() // close it, we dont need to maintain this connection   return true }

Теперь можно итерировать серверы и отмечать их статусы:

// HealthCheck pings the backends and update the status func (s *ServerPool) HealthCheck() {   for _, b := range s.backends {     status := "up"     alive := isBackendAlive(b.URL)     b.SetAlive(alive)     if !alive {       status = "down"     }     log.Printf("%s [%s]\n", b.URL, status)   } }

Для периодического запуска этого кода можно запустить в Go таймер. Он позволит слушать события в канале.

// healthCheck runs a routine for check status of the backends every 2 mins func healthCheck() {   t := time.NewTicker(time.Second * 20)   for {     select {     case <-t.C:       log.Println("Starting health check...")       serverPool.HealthCheck()       log.Println("Health check completed")     }   } }

В этом коде канал <-t.C будет возвращать значение каждые 20 секунд. select позволяет определять это событие. При отсутствии ситуации default он ждёт, пока хотя бы один case может быть выполнен.

Теперь запускаем код в отдельной горутине:

go healthCheck() 

Заключение

В этой статье мы рассмотрели много вопросов:

  • Алгоритм Round Robin
  • ReverseProxy из стандартной библиотеки
  • Мьютексы
  • Атомарные операции
  • Замыкания
  • Обратные вызовы
  • Операция выбора

Есть ещё много способов улучшить наш балансировщик. Например:

  • Использовать кучу для сортировки живых серверов, чтобы уменьшить область поиска.
  • Собирать статистику.
  • Реализовать алгоритм weighted round-robin c наименьшим количеством коннектов.
  • Добавить поддержку конфигурационных файлов.

И так далее.

Исходный код лежит здесь.


ссылка на оригинал статьи https://habr.com/ru/company/mailru/blog/476276/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *