Выходим за рамки: создание оператора для наблюдения за внешними ресурсами в Kubernetes

от автора

Привет! Я Александр Зверев, инженер архитектурных решений в компании «Флант». Сегодня поговорим про всеми любимый Kubernetes. Этот оркестратор стал таким популярным не только потому что он эффективно управляет контейнерами. Ещё его можно прокачать под себя. И для этого у него есть все необходимые инструменты. Новые возможности можно добавить с помощью контроллеров, операторов и создания новых типов ресурсов.

При этом уже есть проекты, которые пошли дальше и научились управлять ресурсами за пределами кластера. Например, Deckhouse Commander, Argo CD, Crossplane. Они позволяют описывать ресурсы в кластере, а операторы «идут» во внешний мир и развёртывают новые кластеры или приложения.

В статье попробуем научить контроллер взаимодействовать с внешним миром. Мы создадим простой оператор, который будет следить за доступностью HTTP-серверов вне кластера, используя kubebuilder. Статья будет полезна тем, кто уже пробовал писать свои контроллеры Kubernetes и хочет расширить их возможности.

Анатомия контроллера: знакомство с kubebuilder

В своё время для написания контролеров появилось много подходов, библиотек и фреймворков. Среди популярных библиотек выделю controller-runtime, а у фреймворков можно выделить kubebuilder и operator SDK, созданные на базе controller-runtime.

Давайте ближе познакомимся с kubebuilder. Верхнеуровнево созданный на нём контроллер включает в себя:

  • менеджер, который организовывает работу других компонентов и кэширование, осуществляет leader-election, если в нём есть необходимость, и отвечает за обработку сигналов от ОС;

  • вебхуки, являющиеся опциональными, отвечают за установку пропущенных полей в спецификации и валидацию ресурсов. Они останутся за рамками данной статьи;

  • один и более контроллеров, которые «следят» за ресурсами определённого типа в кластере и дочерними ресурсами.

Про контроллеры поговорим подробнее. Их основная задача — получать события, связанные с нужными ресурсами: создание, удаление, изменения. Дальше осуществляется фильтрация этих событий. И затем событие передаётся reconciler’у, который сравнивает желаемое состояние ресурса с существующим в кластере и пытается привести к желаемому, если они расходятся. 

Когда контроллер реагирует на изменения внутри кластера, всё просто. У него есть методы For и Owns, которые за это отвечают. А что касается кейса, когда контроллер реагирует на изменения вне кластера, тут информации мало. Этот пробел я и постараюсь исправить.

Задача: мониторинг HTTP-серверов кластера

Чтобы показать, как контроллеры управляют внешними ресурсами, я придумал простую задачку. Главное — понять принцип работы. Представим, что у нас есть ресурсы, которые описывают HTTP-серверы вне кластера. Контроллер постоянно проверяет, доступны ли эти серверы, и обновляет статус соответствующих ресурсов в кластере.

Процесс инсталляции kubebuilder под различные платформы описан в официальной документации.

Инициализируем проект в kubebuilder:

mkdir probes && cd probes kubebuilder init --domain network.io --repo probes

Создаём наш ресурс:

kubebuilder create api --group test --version v1alpha1 --kind WebChecker --namespaced=true

Система задаст два вопроса про создание ресурса и контроллера. Отвечаем утвердительно.

На выходе получаем сгенерированный проект на Go со всей необходимой структурой и Makefile. Открываем проект в любимом редакторе и находим файл probes/api/v1alpha1/webchecker_types.go. В нём мы опишем структуру нашего ресурса, а затем сгенерируем нужные методы, CustomResourceDefinitions и манифесты.

Отредактируем структуры, описывающие спецификации и статус ресурса, следующим образом:

type WebCheckerSpec struct { Host string `json:"host"` // +kubebuilder:default:="/" Path   string `json:"path,omitempty"` }  type WebCheckerStatus struct { Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"` }  type WebChecker struct { metav1.TypeMeta   `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"`  Spec   WebCheckerSpec   `json:"spec,omitempty"` Status WebCheckerStatus `json:"status,omitempty"` }

Запустим кодогенерацию:

make generate && make manifests

В файле config/crd/bases/test.network.io_webcheckers.yaml сгенерируется CustomResourceDefinintion. Также в файле api/v1alpha1/zz_generated.deepcopy.go будут сгенерированы методы, реализующие DeepCopy.

Файл main.go мы редактировать не будем. Предлагаю познакомиться с его содержимым самостоятельно. В нём считываются ключи запуска, закодирован выбор лидера, запускается единственный наш контроллер и регистрируются ответные действия на сигналы от ОС.

Сердце оператора

А мы переходим к файлу internal/controller/webchecker_controller.go, так как там находится всё, что интересует нас сейчас.

Определяем типы и константы:

package controller  ...  const ( WebCheckerWorkersCount           = 2 WebCheckerProbeTaskChannelSize   = 100 WebCheckerProbeResultChannelSize = 100 )  type WebCheckerState struct { Host          string Path          string LastCheckTime time.Time IsSuccessful  bool LastError     string }  type WebCheckerProbeTask struct { NamespacedName types.NamespacedName Host           string Path           string }  type WebCheckerProbeResult struct { NamespacedName types.NamespacedName IsSuccessful   bool LastError      string }

Наш основной тип данных, который содержит все необходимые поля и определяет методы для reconcile:

// WebCheckerReconciler reconciles a WebChecker object type WebCheckerReconciler struct { client.Client Scheme            *runtime.Scheme events            chan event.GenericEvent mu                sync.RWMutex webCheckersStates map[types.NamespacedName]WebCheckerState // канал для задач на проверку tasks chan WebCheckerProbeTask // канал для результатов проверки tasksResults chan WebCheckerProbeResult cancelFunc   context.CancelFunc }

Ниже основной метод для нашего контроллера. В нём определено, как реагировать на добавление, удаление или изменение ресурсов в кластере. В нашем примере этот метод ещё и обрабатывает запросы от внешних раздражителей:

func (r *WebCheckerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := log.FromContext(ctx) logger.Info("Reconciling WebChecker", "request", req)  var reconciledResource testv1alpha1.WebChecker // Вместо ресурса в метод Reconcile приходит запрос с указанием пространства имён / имени ресурса, // поэтому нужно запросить ресурс из API кластера. if err := r.Get(ctx, req.NamespacedName, &reconciledResource); err != nil { logger.Error(err, "Failed to get WebChecker resource") if errors.IsNotFound(err) { r.mu.Lock() delete(r.webCheckersStates, req.NamespacedName) r.mu.Unlock() } return ctrl.Result{}, client.IgnoreNotFound(err) }  // Если ресурс помечен на удаление, то удаляем запись о нём из контроллера, // чтобы по нему не формировались задачи на проверку. if reconciledResource.DeletionTimestamp != nil { logger.Info("WebChecker resource is being deleted") r.mu.Lock() delete(r.webCheckersStates, req.NamespacedName) r.mu.Unlock() return ctrl.Result{}, nil }  // Если ресурс создан в кластере, добавляем его в список проверяемых ресурсов. r.mu.Lock() defer r.mu.Unlock() var cachedState WebCheckerState var ok bool if cachedState, ok = r.webCheckersStates[req.NamespacedName]; !ok { r.webCheckersStates[req.NamespacedName] = WebCheckerState{ Host: reconciledResource.Spec.Host, Path: reconciledResource.Spec.Path, } return ctrl.Result{}, nil }  if cachedState.Host != reconciledResource.Spec.Host || cachedState.Path != reconciledResource.Spec.Path { r.webCheckersStates[req.NamespacedName] = WebCheckerState{ Host: reconciledResource.Spec.Host, Path: reconciledResource.Spec.Path, } return ctrl.Result{}, nil }  // Если запрос пришёл из канала событий, необходимо обновить статус ресурса. webCheckerStatus := testv1alpha1.WebCheckerStatus{ Conditions: []metav1.Condition{ statusFromWebCheckerState(cachedState), }, }  reconciledResource.Status = webCheckerStatus err := r.Status().Update(ctx, &reconciledResource) if err != nil { logger.Error(err, "Failed to update WebChecker status") return ctrl.Result{}, err } return ctrl.Result{}, nil  }

Небольшая вспомогательная функция, которая формирует статус ресурса:

func statusFromWebCheckerState(webCheckerState WebCheckerState) metav1.Condition { cond := metav1.Condition{ LastTransitionTime: metav1.Now(), Type:               "Ready", Reason:             "WebResourceReady", } if webCheckerState.IsSuccessful { cond.Status = metav1.ConditionTrue cond.Message = "WebChecker is ready" } else { cond.Status = metav1.ConditionFalse cond.Message = webCheckerState.LastError } return cond }

Ниже метод, который реализован в виде удобного билдера. Позволяет указать, какие ресурсы будут обрабатываться в кластере. В нашем случае, помимо обычных операций добавления и удаления, мы добавляем дополнительный канал для событий, в который собираются данные от проверок внешних серверов:

func (r *WebCheckerReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&testv1alpha1.WebChecker{}). Named("webchecker"). // Именно WatchesRawSource отвечает за обработку "сторонних событий". WatchesRawSource(source.Channel(r.events, &handler.EnqueueRequestForObject{})). Complete(r) }

Воркер, который выполняет проверку внешних серверов:

func (r *WebCheckerReconciler) RunTasksWorker(ctx context.Context) { logger := log.FromContext(ctx) logger.Info("Starting tasks worker") for task := range r.tasks { logger.Info("Processing task", "task", task) checker := WebCheckerProbe{ NamespacedName: task.NamespacedName, Host:           task.Host, Path:           task.Path, } taskResult := checker.PerformCheck(ctx) r.tasksResults <- taskResult } }

Запускаем все горутины, которые будут формировать задачи на проверку, выполнять проверку и обрабатывать результаты:

func (r *WebCheckerReconciler) RunWorkers(ctx context.Context) { ctx, cancel := context.WithCancel(ctx) r.cancelFunc = cancel  go r.runTasksScheduler(ctx)  for i := 0; i < WebCheckerWorkersCount; i++ { go r.RunTasksWorker(ctx) }  go r.runTaskResultsAnalyzer(ctx) }

Данный метод анализирует результаты проверки и записывает их в канал событий:

func (r *WebCheckerReconciler) runTaskResultsAnalyzer(ctx context.Context) { logger := log.FromContext(ctx) logger.Info("Starting task results analyzer") for result := range r.tasksResults { r.mu.Lock() if resultDetail, ok := r.webCheckersStates[result.NamespacedName]; !ok { logger.Info("WebChecker resource not found", "namespacedName", result.NamespacedName) } else { resultDetail.LastCheckTime = time.Now() resultDetail.IsSuccessful = result.IsSuccessful resultDetail.LastError = result.LastError r.webCheckersStates[result.NamespacedName] = resultDetail // Отправляем событие в канал, чтобы контроллер в reconcile обновил статус ресурса. r.events <- event.GenericEvent{ Object: &testv1alpha1.WebChecker{ ObjectMeta: metav1.ObjectMeta{ Namespace: result.NamespacedName.Namespace, Name:      result.NamespacedName.Name, }, }, } } r.mu.Unlock() } } 

Метод, который по заданному временному интервалу в 10 секунд определяет, нужно ли запустить очередную проверку для внешних серверов:

func (r *WebCheckerReconciler) runTasksScheduler(ctx context.Context) { logger := log.FromContext(ctx) logger.Info("Starting tasks scheduler") ticker := time.NewTicker(1 * time.Second) defer ticker.Stop()  for { select { case <-ticker.C: r.mu.RLock() for namespacedName, state := range r.webCheckersStates { now := time.Now() // Если ресурс не проверялся более 10 секунд, формируем задачу на проверку. if now.Sub(state.LastCheckTime) > 10*time.Second { r.tasks <- WebCheckerProbeTask{ NamespacedName: namespacedName, Host:           state.Host, Path:           state.Path, } } } r.mu.RUnlock() case <-ctx.Done(): return } } }

Код, который проверяет доступность хоста по HTTP, вынесем в отдельный файл — internal/controller/types.go. Я его максимально упростил. В реальной жизни ресурс в кластере мог бы символизировать что-то другое. Например, мы могли бы обращаться к API внешних сервисов, получать от них данные и анализировать их.

package controller  import ( "context" "fmt" "net/http" "slices" "time"  "k8s.io/apimachinery/pkg/types" )  var ( successfulStatuses = []int{200, 300, 301, 302, 303} )  type WebCheckerProbe struct { NamespacedName types.NamespacedName Host           string Path           string }  func (p *WebCheckerProbe) PerformCheck(ctx context.Context) WebCheckerProbeResult { c := http.Client{ Timeout: 1 * time.Second, }  req, err := http.NewRequest("GET", p.Host+p.Path, nil) if err != nil { return WebCheckerProbeResult{ NamespacedName: p.NamespacedName, IsSuccessful:   false, LastError:      err.Error(), } }  resp, err := c.Do(req) if err != nil { return WebCheckerProbeResult{ NamespacedName: p.NamespacedName, IsSuccessful:   false, LastError:      err.Error(), } }  defer resp.Body.Close()  if slices.Contains(successfulStatuses, resp.StatusCode) { return WebCheckerProbeResult{ NamespacedName: p.NamespacedName, IsSuccessful:   true, LastError:      "", } } return WebCheckerProbeResult{ NamespacedName: p.NamespacedName, IsSuccessful:   false, LastError:      fmt.Sprintf("Status code: %d", resp.StatusCode), } }

Я попытался сделать проверку максимально простой. Не стал заморачиваться, например, с sync.Pool и повторным использованием соединений.

Запуск контроллера

В файле cmd/main.go немного изменим вызов SetupWithManager, чтобы он выглядел так:

webcheckerController := controller.NewWebChecker(mgr.GetClient(), mgr.GetScheme()) if err = webcheckerController.SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "WebChecker") os.Exit(1) } webcheckerController.RunWorkers(context.Background())

Теперь можно посмотреть, как наш контроллер будет работать в кластере. Используем kind и создадим локальный кластер:

kind create cluster

Установим наш CustomResourceDefinition в кластер:

make install

Запустим наш контроллер:

make build && make run

После запуска контроллера сможем наблюдать его логи. Запустим ещё один терминал и задеплоим в кластер два ресурса. Раз:

apiVersion: test.network.io/v1alpha1 kind: WebChecker metadata:   name: flant spec:   host: http://flant.ru

Два:

apiVersion: test.network.io/v1alpha1 kind: WebChecker metadata:   name: badnews spec:   host: http://badnews.me

Теперь можно посмотреть статус этих ресурсов. Раз:

kubectl get webcheckers.test.network.io badnews -o yaml apiVersion: test.network.io/v1alpha1 kind: WebChecker metadata:   annotations:   creationTimestamp: "2025-02-04T12:32:55Z"   generation: 1   name: badnews   namespace: default   resourceVersion: "3118"   uid: 942765a7-e3d1-4f2a-b73f-f11377e1c72d spec:   host: http://badnews.me   path: / status:   conditions:   - lastTransitionTime: "2025-02-04T12:33:07Z"     message: 'Get "http://badnews.me/": dial tcp: lookup badnews.me: no such host'     reason: WebResourceReady     status: "False"     type: Ready

Два:

kubectl get webcheckers.test.network.io flant -o yaml apiVersion: test.network.io/v1alpha1 kind: WebChecker metadata:   annotations:   creationTimestamp: "2025-02-04T12:32:51Z"   generation: 1   name: flant   namespace: default   resourceVersion: "3112"   uid: 4dc3f052-4893-4089-8e6d-ec98fad393f5 spec:   host: http://flant.ru   path: / status:   conditions:   - lastTransitionTime: "2025-02-04T12:33:02Z"     message: WebChecker is ready     reason: WebResourceReady     status: "True"     type: Ready

Итоги

В этой статье мы рассмотрели пример создания Kubernetes-оператора, который наблюдает за внешними HTTP-серверами. Мы использовали kubebuilder для создания базовой структуры контроллера, определили CustomResourceDefinition для описания внешних ресурсов и реализовали логику для проверки доступности этих ресурсов.

Не стоит рассматривать данный пример как рабочий контроллер. Он создан исключительно в ознакомительных целях. Исходный код можно найти здесь. Но его можно расширить для управления более сложными внешними ресурсами, такими как базы данных или облачные сервисы.

P. S.

Читайте также в нашем блоге:


ссылка на оригинал статьи https://habr.com/ru/articles/884566/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *