В процессе разработки микросервисных приложений часто необходимо наладить эффективную и быструю коммуникацию между сервисами. Разработанный Google gRPC предоставляет высокопроизводительный фреймворк для организации такого взаимодействия. Однако стандартные балансировщики нагрузки в gRPC не всегда удовлетворяют специфическим требованиям, особенно когда требуется приоритизация адресов для минимизации сетевых задержек и обеспечения отказоустойчивости.
В этой статье я поделюсь опытом создания кастомного балансировщика нагрузки на Go для gRPC, который использует приоритеты адресов для выбора наилучшего соединения. Это решение позволяет гибко управлять распределением клиентских запросов между серверами с разными уровнями доступности и обеспечивает подключение к оптимальному ЦОД с минимальными задержками.
Постановка задачи
При разработке одного из проектов VK Tech мне потребовалось реализовать балансировщик, который выбирает первый доступный адрес из приоритетного списка. Приоритеты адресов определяются порядком в конфигурационном файле: чем выше адрес в списке, тем выше его приоритет. В случае недоступности адреса с наивысшим приоритетом балансировщик должен автоматически переключаться на следующий доступный адрес по приоритету.
Требования к балансировщику:
-
Приоритизация адресов: выбор адреса с наивысшим приоритетом из списка.
-
Отказоустойчивость: автоматическое переключение на следующий адрес при недоступности текущего.
-
Минимизация задержек: подключение к ближайшему или наиболее оптимальному ЦОД.
Почему стандартные балансировщики не подходят
Стандартные балансировщики в gRPC, такие как round-robin (циклический) и pick-first («первый доступный»), не учитывают приоритизацию адресов в списке.
Round-robin равномерно распределяет запросы между всеми доступными серверами, что может привести к увеличению сетевых задержек, если некоторые серверы географически удалены или менее производительны.
Pick-first всегда выбирает первый доступный адрес, но не переключается на адреса с более высоким приоритетом, если они становятся доступными после первоначального подключения.
Таким образом, для решения задачи минимизации задержек и обеспечения гибкости подключения к различным ЦОДам стандартные балансировщики не подходят.
Основная идея кастомного балансировщика
Наш кастомный балансировщик использует приоритизацию адресов, заданную в конфигурационном файле, для выбора наилучшего соединения.
-
Порядок адресов: адреса упорядочены по приоритету; индекс 0 — наивысший приоритет.
-
Выбор соединения: всегда выбирается первое доступное соединение с наивысшим приоритетом.
-
Автоматическое переключение: при недоступности текущего соединения балансировщик переключается на следующий по приоритету.
Преимущества такого подхода:
-
Минимизация сетевых задержек.
-
Повышенная отказоустойчивость.
-
Гибкость настройки.
Обзор архитектуры решения
Перед тем как перейти к реализации, рассмотрим основные компоненты нашего балансировщика и их взаимодействие.
BalancerBuilder
Балансировщик в gRPC создаётся с помощью билдера. Наш BalancerBuilder регистрирует балансировщик с определённым именем и схемой, чтобы gRPC-клиент мог его использовать.
type BalancerBuilder struct{} func (b BalancerBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer { return &Balancer{ cc: cc, subConns: resolver.NewAddressMap(), scStates: make(map[balancer.SubConn]connectivity.State), csEvltr: &balancer.ConnectivityStateEvaluator{}, state:connectivity.Connecting, } } func (b BalancerBuilder) Name() string { return balancerName } func init() { balancer.Register(&BalancerBuilder{}) }
Основные задачи билдера:
-
Создание и инициализация балансировщика.
-
Настройка взаимодействия с ClientConn.
-
Регистрация балансировщика для использования клиентом.
Resolver
Резолвер предоставляет балансировщику список адресов с их приоритетами. Он преобразует адреса из конфигурационного файла в resolver.Address, присваивая каждому адресу атрибут index, соответствующий его приоритету.
type resolverBuilder struct { addresses []resolver.Address } func (b *resolverBuilder) Build( target resolver.Target, clientConn resolver.ClientConn, _ resolver.BuildOptions, ) (resolver.Resolver, error) { ctx, cancel := context.WithCancel(context.Background()) res := &fiResolver{ ctx: ctx, cancel: cancel, target: target, cc: clientConn, addressesStore: b.addresses, } if len(b.addresses) > 1 { res.serviceConfig = clientConn.ParseServiceConfig(defaultConfig) } go res.start() return res, nil } func (*resolverBuilder) Scheme() string { return scheme } func initResolver(addresses []string) { addressesStore := make([]resolver.Address, len(addresses)) for i, addr := range addresses { addressesStore[i] = resolver.Address{ Addr: addr, Attributes: attributes.New("index", i), } } resolver.Register(&resolverBuilder{addresses: addressesStore}) }
Функции резолвера:
-
Динамическое обновление адресов.
-
Предоставление адресов с приоритетами балансировщику.
-
Сообщение об ошибках в случае недоступности адресов.
Picker
Picker выбирает соединение с наименьшим индексом (наивысшим приоритетом) из доступных. Если соединение с более высоким приоритетом становится доступным, балансировщик автоматически переключается на него.
type firstIdxPicker struct { result balancer.PickResult err error } func (p *firstIdxPicker) Pick(_ balancer.PickInfo) (balancer.PickResult, error) { return p.result, p.err } func NewFIPicker(info base.PickerBuildInfo) balancer.Picker { if len(info.ReadySCs) == 0 { return &firstIdxPicker{err: balancer.ErrNoSubConnAvailable} } minIdx := math.MaxInt var selectedConn balancer.SubConn for sc, scInfo := range info.ReadySCs { idx, ok := scInfo.Address.Attributes.Value("index").(int) // <- наш простенький алгоритм определения оптимального соединения if ok && idx < minIdx { minIdx = idx selectedConn = sc } } if selectedConn != nil { return &firstIdxPicker{result: balancer.PickResult{SubConn: selectedConn}} } return &firstIdxPicker{err: balancer.ErrNoSubConnAvailable} }
Алгоритм выбора:
-
Проходит по всем готовым соединениям.
-
Выбирает соединение с наименьшим index.
-
Возвращает выбранное соединение для обработки запроса.
Balancer
Балансировщик отслеживает состояния соединений и регенерирует Picker при их изменении.
type Balancer struct { cc balancer.ClientConn csEvltr *balancer.ConnectivityStateEvaluator stateconnectivity.State subConns *resolver.AddressMap scStates map[balancer.SubConn]connectivity.State picker balancer.Picker resolverErr error connErr error } func (b *Balancer) UpdateClientConnState(ccs balancer.ClientConnState) error { b.resolverErr = nil addressMap := b.createNewSubConnections(ccs) for _, addr := range b.subConns.Keys() { if _, ok := addressMap.Get(addr); !ok { sci, _ := b.subConns.Get(addr) sc := sci.(balancer.SubConn) sc.Shutdown() b.subConns.Delete(addr) } } if len(ccs.ResolverState.Addresses) == 0 { b.ResolverError(errZeroAddresses) return balancer.ErrBadResolverState } b.regeneratePicker() b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker}) return nil } func (b *Balancer) UpdateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) { oldState, ok := b.scStates[subConn] if !ok { return } b.scStates[subConn] = state.ConnectivityState switch state.ConnectivityState { case connectivity.Idle: subConn.Connect() case connectivity.Shutdown: delete(b.scStates, subConn) case connectivity.TransientFailure: b.connErr = state.ConnectionError } b.state = b.csEvltr.RecordTransition(oldState, state.ConnectivityState) if (state.ConnectivityState == connectivity.Ready) != (oldState == connectivity.Ready) || b.state == connectivity.TransientFailure { b.regeneratePicker() } b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker}) } func (b *Balancer) regeneratePicker() { if b.state == connectivity.TransientFailure { b.picker = &firstIdxPicker{err: errors.Join(b.resolverErr, b.connErr)} return } readySCs := make(map[balancer.SubConn]base.SubConnInfo) for _, addr := range b.subConns.Keys() { sci, _ := b.subConns.Get(addr) sc := sci.(balancer.SubConn) if state, ok := b.scStates[sc]; ok && state == connectivity.Ready { readySCs[sc] = base.SubConnInfo{Address: addr} } } b.picker = NewFIPicker(base.PickerBuildInfo{ReadySCs: readySCs}) }
Отслеживание состояний соединений:
-
UpdateClientConnState: создание новых и удаление неактуальных соединений.
-
UpdateSubConnState: обновление состояний существующих соединений.
-
regeneratePicker: обновление пикера при изменении состояний для выбора оптимального соединения.
Настройка и конфигурация
Для использования кастомного балансировщика необходимо определить его имя и схему, а также настроить подключение.
const ( scheme = "scheme-name" balancerName = "pick_idx_first" defaultConfig = `{"loadBalancingConfig": [{"pick_idx_first": {}}]}` retryTimeout = time.Millisecond * 100 maxRetries= 10 ) type ConnOptions struct { Addrs []string Opts []grpc.DialOption } func NewConn(ctx context.Context, connOptions ConnOptions) (*grpc.ClientConn, error) { conn, err := dialContext(ctx, connOptions.Addrs, connOptions.Opts...) if err != nil { return nil, fmt.Errorf("unable to initialize conn: %w", err) } return conn, nil } func dialContext(ctx context.Context, addresses []string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { ... opts = append(opts, grpc.WithDefaultServiceConfig(defaultConfig), grpc.WithStreamInterceptor( retry.StreamClientInterceptor(retry.WithPerRetryTimeout(retryTimeout), retry.WithMax(maxRetries)), ), grpc.WithUnaryInterceptor( retry.UnaryClientInterceptor(retry.WithPerRetryTimeout(retryTimeout), retry.WithMax(maxRetries)), ), ) ... initResolver(addresses) return grpc.DialContext(ctx, fmt.Sprintf("%s:///", scheme), opts...)
Параметры подключения:
-
scheme и balancerName: определяют кастомный балансировщик.
-
defaultConfig: задаёт конфигурацию балансировки.
-
Интерсепторы: добавлены для повторных подключений при кратковременных сбоях. Я использовал пакет github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry.
Тестирование и результаты
В процессе тестирования балансировщик показал стабильную работу при переключении между адресами в случае недоступности сервера с более высоким приоритетом. Задержки были минимизированы благодаря приоритетному подключению к ближайшему ЦОДу.
Заключение
Создание кастомного gRPC-балансировщика с приоритизацией адресов позволяет более точно контролировать распределение клиентских запросов и улучшить производительность приложения. Такое решение обеспечивает гибкость настройки, минимизацию сетевых задержек и повышенную отказоустойчивость, что особенно важно в современных микросервисных архитектурах.
Преимущества кастомного решения:
-
Гибкость: настройка приоритетов адресов.
-
Эффективность: минимизация задержек за счёт выбора оптимального соединения.
-
Отказоустойчивость: автоматическое переключение при недоступности сервера.
Перспективы развития:
-
Динамическое обновление приоритетов.
-
Интеграция с сервисами обнаружения.
-
Расширение логики выбора на основе метрик производительности.
Надеюсь, эта статья поможет вам в создании кастомных решений для ваших gRPC-приложений.
Ссылки и дополнительные материалы
ссылка на оригинал статьи https://habr.com/ru/articles/858290/
Добавить комментарий