Создание кастомного балансировщика нагрузки на Go для gRPC с приоритизацией адресов

от автора

В процессе разработки микросервисных приложений часто необходимо наладить эффективную и быструю коммуникацию между сервисами. Разработанный Google gRPC предоставляет высокопроизводительный фреймворк для организации такого взаимодействия. Однако стандартные балансировщики нагрузки в gRPC не всегда удовлетворяют специфическим требованиям, особенно когда требуется приоритизация адресов для минимизации сетевых задержек и обеспечения отказоустойчивости.

В этой статье я поделюсь опытом создания кастомного балансировщика нагрузки на Go для gRPC, который использует приоритеты адресов для выбора наилучшего соединения. Это решение позволяет гибко управлять распределением клиентских запросов между серверами с разными уровнями доступности и обеспечивает подключение к оптимальному ЦОД с минимальными задержками.

Постановка задачи

При разработке одного из проектов VK Tech мне потребовалось реализовать балансировщик, который выбирает первый доступный адрес из приоритетного списка. Приоритеты адресов определяются порядком в конфигурационном файле: чем выше адрес в списке, тем выше его приоритет. В случае недоступности адреса с наивысшим приоритетом балансировщик должен автоматически переключаться на следующий доступный адрес по приоритету.

Требования к балансировщику:

  • Приоритизация адресов: выбор адреса с наивысшим приоритетом из списка.

  • Отказоустойчивость: автоматическое переключение на следующий адрес при недоступности текущего.

  • Минимизация задержек: подключение к ближайшему или наиболее оптимальному ЦОД.

Почему стандартные балансировщики не подходят

Стандартные балансировщики в gRPC, такие как round-robin (циклический) и pick-first («первый доступный»), не учитывают приоритизацию адресов в списке.

Round-robin равномерно распределяет запросы между всеми доступными серверами, что может привести к увеличению сетевых задержек, если некоторые серверы географически удалены или менее производительны.

Pick-first всегда выбирает первый доступный адрес, но не переключается на адреса с более высоким приоритетом, если они становятся доступными после первоначального подключения.

Таким образом, для решения задачи минимизации задержек и обеспечения гибкости подключения к различным ЦОДам стандартные балансировщики не подходят.

Основная идея кастомного балансировщика

Наш кастомный балансировщик использует приоритизацию адресов, заданную в конфигурационном файле, для выбора наилучшего соединения.

  • Порядок адресов: адреса упорядочены по приоритету; индекс 0 — наивысший приоритет.

  • Выбор соединения: всегда выбирается первое доступное соединение с наивысшим приоритетом.

  • Автоматическое переключение: при недоступности текущего соединения балансировщик переключается на следующий по приоритету.

Преимущества такого подхода:

  • Минимизация сетевых задержек.

  • Повышенная отказоустойчивость.

  • Гибкость настройки.

Обзор архитектуры решения

Перед тем как перейти к реализации, рассмотрим основные компоненты нашего балансировщика и их взаимодействие.

BalancerBuilder

Балансировщик в gRPC создаётся с помощью билдера. Наш BalancerBuilder регистрирует балансировщик с определённым именем и схемой, чтобы gRPC-клиент мог его использовать.

type BalancerBuilder struct{}   func (b BalancerBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {  return &Balancer{      cc:   cc,      subConns: resolver.NewAddressMap(),      scStates: make(map[balancer.SubConn]connectivity.State),      csEvltr:  &balancer.ConnectivityStateEvaluator{},      state:connectivity.Connecting,  }  }   func (b BalancerBuilder) Name() string { return balancerName }   func init() {  balancer.Register(&BalancerBuilder{})  }

Основные задачи билдера:

  • Создание и инициализация балансировщика.

  • Настройка взаимодействия с ClientConn.

  • Регистрация балансировщика для использования клиентом.

Resolver

Резолвер предоставляет балансировщику список адресов с их приоритетами. Он преобразует адреса из конфигурационного файла в resolver.Address, присваивая каждому адресу атрибут index, соответствующий его приоритету.

type resolverBuilder struct {  addresses []resolver.Address  }   func (b *resolverBuilder) Build(  target resolver.Target,  clientConn resolver.ClientConn,  _ resolver.BuildOptions,  ) (resolver.Resolver, error) {  ctx, cancel := context.WithCancel(context.Background())   res := &fiResolver{      ctx:        ctx,      cancel:     cancel,      target:     target,      cc:         clientConn,      addressesStore: b.addresses,  }   if len(b.addresses) > 1 {      res.serviceConfig = clientConn.ParseServiceConfig(defaultConfig)  }   go res.start()   return res, nil  }   func (*resolverBuilder) Scheme() string {  return scheme  }   func initResolver(addresses []string) {  addressesStore := make([]resolver.Address, len(addresses))  for i, addr := range addresses {      addressesStore[i] = resolver.Address{          Addr:   addr,          Attributes: attributes.New("index", i),      }  }       resolver.Register(&resolverBuilder{addresses: addressesStore})  }

Функции резолвера:

  • Динамическое обновление адресов.

  • Предоставление адресов с приоритетами балансировщику.

  • Сообщение об ошибках в случае недоступности адресов.

Picker

Picker выбирает соединение с наименьшим индексом (наивысшим приоритетом) из доступных. Если соединение с более высоким приоритетом становится доступным, балансировщик автоматически переключается на него.

type firstIdxPicker struct {  result balancer.PickResult  err    error  }   func (p *firstIdxPicker) Pick(_ balancer.PickInfo) (balancer.PickResult, error) {  return p.result, p.err  }   func NewFIPicker(info base.PickerBuildInfo) balancer.Picker {  if len(info.ReadySCs) == 0 {      return &firstIdxPicker{err: balancer.ErrNoSubConnAvailable}  }   minIdx := math.MaxInt  var selectedConn balancer.SubConn   for sc, scInfo := range info.ReadySCs {      idx, ok := scInfo.Address.Attributes.Value("index").(int) // <- наш простенький алгоритм определения оптимального соединения      if ok && idx < minIdx {          minIdx = idx          selectedConn = sc      }  }   if selectedConn != nil {      return &firstIdxPicker{result: balancer.PickResult{SubConn: selectedConn}}  }   return &firstIdxPicker{err: balancer.ErrNoSubConnAvailable}  }

Алгоритм выбора:

  1. Проходит по всем готовым соединениям.

  2. Выбирает соединение с наименьшим index.

  3. Возвращает выбранное соединение для обработки запроса.

Balancer

Балансировщик отслеживает состояния соединений и регенерирует Picker при их изменении.

type Balancer struct {  cc   balancer.ClientConn  csEvltr  *balancer.ConnectivityStateEvaluator  stateconnectivity.State   subConns *resolver.AddressMap  scStates map[balancer.SubConn]connectivity.State  picker   balancer.Picker   resolverErr error  connErr error  }   func (b *Balancer) UpdateClientConnState(ccs balancer.ClientConnState) error {  b.resolverErr = nil   addressMap := b.createNewSubConnections(ccs)   for _, addr := range b.subConns.Keys() {      if _, ok := addressMap.Get(addr); !ok {          sci, _ := b.subConns.Get(addr)          sc := sci.(balancer.SubConn)          sc.Shutdown()          b.subConns.Delete(addr)      }  }   if len(ccs.ResolverState.Addresses) == 0 {          b.ResolverError(errZeroAddresses)      return balancer.ErrBadResolverState  }   b.regeneratePicker()      b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})   return nil  }   func (b *Balancer) UpdateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) {      oldState, ok := b.scStates[subConn]  if !ok {      return  }   b.scStates[subConn] = state.ConnectivityState   switch state.ConnectivityState {  case connectivity.Idle:      subConn.Connect()  case connectivity.Shutdown:      delete(b.scStates, subConn)  case connectivity.TransientFailure:      b.connErr = state.ConnectionError  }   b.state = b.csEvltr.RecordTransition(oldState, state.ConnectivityState)   if (state.ConnectivityState == connectivity.Ready) != (oldState == connectivity.Ready) || b.state == connectivity.TransientFailure {      b.regeneratePicker()  }       b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})  }   func (b *Balancer) regeneratePicker() {  if b.state == connectivity.TransientFailure {      b.picker = &firstIdxPicker{err: errors.Join(b.resolverErr, b.connErr)}      return  }   readySCs := make(map[balancer.SubConn]base.SubConnInfo)   for _, addr := range b.subConns.Keys() {      sci, _ := b.subConns.Get(addr)      sc := sci.(balancer.SubConn)      if state, ok := b.scStates[sc]; ok && state == connectivity.Ready {          readySCs[sc] = base.SubConnInfo{Address: addr}      }  }   b.picker = NewFIPicker(base.PickerBuildInfo{ReadySCs: readySCs})  }

Отслеживание состояний соединений:

  • UpdateClientConnState: создание новых и удаление неактуальных соединений.

  • UpdateSubConnState: обновление состояний существующих соединений.

  • regeneratePicker: обновление пикера при изменении состояний для выбора оптимального соединения.

Настройка и конфигурация

Для использования кастомного балансировщика необходимо определить его имя и схему, а также настроить подключение.

const (  scheme    = "scheme-name"  balancerName  = "pick_idx_first"  defaultConfig = `{"loadBalancingConfig": [{"pick_idx_first": {}}]}`  retryTimeout  = time.Millisecond * 100  maxRetries= 10  )   type ConnOptions struct {  Addrs []string  Opts  []grpc.DialOption  }   func NewConn(ctx context.Context, connOptions ConnOptions) (*grpc.ClientConn, error) {  conn, err := dialContext(ctx, connOptions.Addrs, connOptions.Opts...)  if err != nil {      return nil, fmt.Errorf("unable to initialize conn: %w", err)  }   return conn, nil  }   func dialContext(ctx context.Context, addresses []string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {   ...  opts = append(opts,          grpc.WithDefaultServiceConfig(defaultConfig),      grpc.WithStreamInterceptor(          retry.StreamClientInterceptor(retry.WithPerRetryTimeout(retryTimeout), retry.WithMax(maxRetries)),      ),      grpc.WithUnaryInterceptor(              retry.UnaryClientInterceptor(retry.WithPerRetryTimeout(retryTimeout), retry.WithMax(maxRetries)),      ),  )  ...   initResolver(addresses)   return grpc.DialContext(ctx, fmt.Sprintf("%s:///", scheme), opts...)

Параметры подключения:

  • scheme и balancerName: определяют кастомный балансировщик.

  • defaultConfig: задаёт конфигурацию балансировки.

  • Интерсепторы: добавлены для повторных подключений при кратковременных сбоях. Я использовал пакет github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry.

Тестирование и результаты

В процессе тестирования балансировщик показал стабильную работу при переключении между адресами в случае недоступности сервера с более высоким приоритетом. Задержки были минимизированы благодаря приоритетному подключению к ближайшему ЦОДу.

Заключение

Создание кастомного gRPC-балансировщика с приоритизацией адресов позволяет более точно контролировать распределение клиентских запросов и улучшить производительность приложения. Такое решение обеспечивает гибкость настройки, минимизацию сетевых задержек и повышенную отказоустойчивость, что особенно важно в современных микросервисных архитектурах.

Преимущества кастомного решения:

  • Гибкость: настройка приоритетов адресов.

  • Эффективность: минимизация задержек за счёт выбора оптимального соединения.

  • Отказоустойчивость: автоматическое переключение при недоступности сервера.

Перспективы развития:

  • Динамическое обновление приоритетов.

  • Интеграция с сервисами обнаружения.

  • Расширение логики выбора на основе метрик производительности.

Надеюсь, эта статья поможет вам в создании кастомных решений для ваших gRPC-приложений. 

Ссылки и дополнительные материалы


ссылка на оригинал статьи https://habr.com/ru/articles/858290/