Привет! Я Александр Зверев, инженер архитектурных решений в компании «Флант». Сегодня поговорим про всеми любимый Kubernetes. Этот оркестратор стал таким популярным не только потому что он эффективно управляет контейнерами. Ещё его можно прокачать под себя. И для этого у него есть все необходимые инструменты. Новые возможности можно добавить с помощью контроллеров, операторов и создания новых типов ресурсов.
При этом уже есть проекты, которые пошли дальше и научились управлять ресурсами за пределами кластера. Например, Deckhouse Commander, Argo CD, Crossplane. Они позволяют описывать ресурсы в кластере, а операторы «идут» во внешний мир и развёртывают новые кластеры или приложения.

В статье попробуем научить контроллер взаимодействовать с внешним миром. Мы создадим простой оператор, который будет следить за доступностью HTTP-серверов вне кластера, используя kubebuilder. Статья будет полезна тем, кто уже пробовал писать свои контроллеры Kubernetes и хочет расширить их возможности.
Анатомия контроллера: знакомство с kubebuilder
В своё время для написания контролеров появилось много подходов, библиотек и фреймворков. Среди популярных библиотек выделю controller-runtime, а у фреймворков можно выделить kubebuilder и operator SDK, созданные на базе controller-runtime.
Давайте ближе познакомимся с kubebuilder. Верхнеуровнево созданный на нём контроллер включает в себя:
-
менеджер, который организовывает работу других компонентов и кэширование, осуществляет leader-election, если в нём есть необходимость, и отвечает за обработку сигналов от ОС;
-
вебхуки, являющиеся опциональными, отвечают за установку пропущенных полей в спецификации и валидацию ресурсов. Они останутся за рамками данной статьи;
-
один и более контроллеров, которые «следят» за ресурсами определённого типа в кластере и дочерними ресурсами.
Про контроллеры поговорим подробнее. Их основная задача — получать события, связанные с нужными ресурсами: создание, удаление, изменения. Дальше осуществляется фильтрация этих событий. И затем событие передаётся reconciler’у, который сравнивает желаемое состояние ресурса с существующим в кластере и пытается привести к желаемому, если они расходятся.
Когда контроллер реагирует на изменения внутри кластера, всё просто. У него есть методы For и Owns, которые за это отвечают. А что касается кейса, когда контроллер реагирует на изменения вне кластера, тут информации мало. Этот пробел я и постараюсь исправить.
Задача: мониторинг HTTP-серверов кластера
Чтобы показать, как контроллеры управляют внешними ресурсами, я придумал простую задачку. Главное — понять принцип работы. Представим, что у нас есть ресурсы, которые описывают HTTP-серверы вне кластера. Контроллер постоянно проверяет, доступны ли эти серверы, и обновляет статус соответствующих ресурсов в кластере.
Процесс инсталляции kubebuilder под различные платформы описан в официальной документации.
Инициализируем проект в kubebuilder:
mkdir probes && cd probes kubebuilder init --domain network.io --repo probes
Создаём наш ресурс:
kubebuilder create api --group test --version v1alpha1 --kind WebChecker --namespaced=true
Система задаст два вопроса про создание ресурса и контроллера. Отвечаем утвердительно.
На выходе получаем сгенерированный проект на Go со всей необходимой структурой и Makefile. Открываем проект в любимом редакторе и находим файл probes/api/v1alpha1/webchecker_types.go. В нём мы опишем структуру нашего ресурса, а затем сгенерируем нужные методы, CustomResourceDefinitions и манифесты.
Отредактируем структуры, описывающие спецификации и статус ресурса, следующим образом:
type WebCheckerSpec struct { Host string `json:"host"` // +kubebuilder:default:="/" Path string `json:"path,omitempty"` } type WebCheckerStatus struct { Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"` } type WebChecker struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` Spec WebCheckerSpec `json:"spec,omitempty"` Status WebCheckerStatus `json:"status,omitempty"` }
Запустим кодогенерацию:
make generate && make manifests
В файле config/crd/bases/test.network.io_webcheckers.yaml сгенерируется CustomResourceDefinintion. Также в файле api/v1alpha1/zz_generated.deepcopy.go будут сгенерированы методы, реализующие DeepCopy.
Файл main.go мы редактировать не будем. Предлагаю познакомиться с его содержимым самостоятельно. В нём считываются ключи запуска, закодирован выбор лидера, запускается единственный наш контроллер и регистрируются ответные действия на сигналы от ОС.
Сердце оператора
А мы переходим к файлу internal/controller/webchecker_controller.go, так как там находится всё, что интересует нас сейчас.
Определяем типы и константы:
package controller ... const ( WebCheckerWorkersCount = 2 WebCheckerProbeTaskChannelSize = 100 WebCheckerProbeResultChannelSize = 100 ) type WebCheckerState struct { Host string Path string LastCheckTime time.Time IsSuccessful bool LastError string } type WebCheckerProbeTask struct { NamespacedName types.NamespacedName Host string Path string } type WebCheckerProbeResult struct { NamespacedName types.NamespacedName IsSuccessful bool LastError string }
Наш основной тип данных, который содержит все необходимые поля и определяет методы для reconcile:
// WebCheckerReconciler reconciles a WebChecker object type WebCheckerReconciler struct { client.Client Scheme *runtime.Scheme events chan event.GenericEvent mu sync.RWMutex webCheckersStates map[types.NamespacedName]WebCheckerState // канал для задач на проверку tasks chan WebCheckerProbeTask // канал для результатов проверки tasksResults chan WebCheckerProbeResult cancelFunc context.CancelFunc }
Ниже основной метод для нашего контроллера. В нём определено, как реагировать на добавление, удаление или изменение ресурсов в кластере. В нашем примере этот метод ещё и обрабатывает запросы от внешних раздражителей:
func (r *WebCheckerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := log.FromContext(ctx) logger.Info("Reconciling WebChecker", "request", req) var reconciledResource testv1alpha1.WebChecker // Вместо ресурса в метод Reconcile приходит запрос с указанием пространства имён / имени ресурса, // поэтому нужно запросить ресурс из API кластера. if err := r.Get(ctx, req.NamespacedName, &reconciledResource); err != nil { logger.Error(err, "Failed to get WebChecker resource") if errors.IsNotFound(err) { r.mu.Lock() delete(r.webCheckersStates, req.NamespacedName) r.mu.Unlock() } return ctrl.Result{}, client.IgnoreNotFound(err) } // Если ресурс помечен на удаление, то удаляем запись о нём из контроллера, // чтобы по нему не формировались задачи на проверку. if reconciledResource.DeletionTimestamp != nil { logger.Info("WebChecker resource is being deleted") r.mu.Lock() delete(r.webCheckersStates, req.NamespacedName) r.mu.Unlock() return ctrl.Result{}, nil } // Если ресурс создан в кластере, добавляем его в список проверяемых ресурсов. r.mu.Lock() defer r.mu.Unlock() var cachedState WebCheckerState var ok bool if cachedState, ok = r.webCheckersStates[req.NamespacedName]; !ok { r.webCheckersStates[req.NamespacedName] = WebCheckerState{ Host: reconciledResource.Spec.Host, Path: reconciledResource.Spec.Path, } return ctrl.Result{}, nil } if cachedState.Host != reconciledResource.Spec.Host || cachedState.Path != reconciledResource.Spec.Path { r.webCheckersStates[req.NamespacedName] = WebCheckerState{ Host: reconciledResource.Spec.Host, Path: reconciledResource.Spec.Path, } return ctrl.Result{}, nil } // Если запрос пришёл из канала событий, необходимо обновить статус ресурса. webCheckerStatus := testv1alpha1.WebCheckerStatus{ Conditions: []metav1.Condition{ statusFromWebCheckerState(cachedState), }, } reconciledResource.Status = webCheckerStatus err := r.Status().Update(ctx, &reconciledResource) if err != nil { logger.Error(err, "Failed to update WebChecker status") return ctrl.Result{}, err } return ctrl.Result{}, nil }
Небольшая вспомогательная функция, которая формирует статус ресурса:
func statusFromWebCheckerState(webCheckerState WebCheckerState) metav1.Condition { cond := metav1.Condition{ LastTransitionTime: metav1.Now(), Type: "Ready", Reason: "WebResourceReady", } if webCheckerState.IsSuccessful { cond.Status = metav1.ConditionTrue cond.Message = "WebChecker is ready" } else { cond.Status = metav1.ConditionFalse cond.Message = webCheckerState.LastError } return cond }
Ниже метод, который реализован в виде удобного билдера. Позволяет указать, какие ресурсы будут обрабатываться в кластере. В нашем случае, помимо обычных операций добавления и удаления, мы добавляем дополнительный канал для событий, в который собираются данные от проверок внешних серверов:
func (r *WebCheckerReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&testv1alpha1.WebChecker{}). Named("webchecker"). // Именно WatchesRawSource отвечает за обработку "сторонних событий". WatchesRawSource(source.Channel(r.events, &handler.EnqueueRequestForObject{})). Complete(r) }
Воркер, который выполняет проверку внешних серверов:
func (r *WebCheckerReconciler) RunTasksWorker(ctx context.Context) { logger := log.FromContext(ctx) logger.Info("Starting tasks worker") for task := range r.tasks { logger.Info("Processing task", "task", task) checker := WebCheckerProbe{ NamespacedName: task.NamespacedName, Host: task.Host, Path: task.Path, } taskResult := checker.PerformCheck(ctx) r.tasksResults <- taskResult } }
Запускаем все горутины, которые будут формировать задачи на проверку, выполнять проверку и обрабатывать результаты:
func (r *WebCheckerReconciler) RunWorkers(ctx context.Context) { ctx, cancel := context.WithCancel(ctx) r.cancelFunc = cancel go r.runTasksScheduler(ctx) for i := 0; i < WebCheckerWorkersCount; i++ { go r.RunTasksWorker(ctx) } go r.runTaskResultsAnalyzer(ctx) }
Данный метод анализирует результаты проверки и записывает их в канал событий:
func (r *WebCheckerReconciler) runTaskResultsAnalyzer(ctx context.Context) { logger := log.FromContext(ctx) logger.Info("Starting task results analyzer") for result := range r.tasksResults { r.mu.Lock() if resultDetail, ok := r.webCheckersStates[result.NamespacedName]; !ok { logger.Info("WebChecker resource not found", "namespacedName", result.NamespacedName) } else { resultDetail.LastCheckTime = time.Now() resultDetail.IsSuccessful = result.IsSuccessful resultDetail.LastError = result.LastError r.webCheckersStates[result.NamespacedName] = resultDetail // Отправляем событие в канал, чтобы контроллер в reconcile обновил статус ресурса. r.events <- event.GenericEvent{ Object: &testv1alpha1.WebChecker{ ObjectMeta: metav1.ObjectMeta{ Namespace: result.NamespacedName.Namespace, Name: result.NamespacedName.Name, }, }, } } r.mu.Unlock() } }
Метод, который по заданному временному интервалу в 10 секунд определяет, нужно ли запустить очередную проверку для внешних серверов:
func (r *WebCheckerReconciler) runTasksScheduler(ctx context.Context) { logger := log.FromContext(ctx) logger.Info("Starting tasks scheduler") ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: r.mu.RLock() for namespacedName, state := range r.webCheckersStates { now := time.Now() // Если ресурс не проверялся более 10 секунд, формируем задачу на проверку. if now.Sub(state.LastCheckTime) > 10*time.Second { r.tasks <- WebCheckerProbeTask{ NamespacedName: namespacedName, Host: state.Host, Path: state.Path, } } } r.mu.RUnlock() case <-ctx.Done(): return } } }
Код, который проверяет доступность хоста по HTTP, вынесем в отдельный файл — internal/controller/types.go. Я его максимально упростил. В реальной жизни ресурс в кластере мог бы символизировать что-то другое. Например, мы могли бы обращаться к API внешних сервисов, получать от них данные и анализировать их.
package controller import ( "context" "fmt" "net/http" "slices" "time" "k8s.io/apimachinery/pkg/types" ) var ( successfulStatuses = []int{200, 300, 301, 302, 303} ) type WebCheckerProbe struct { NamespacedName types.NamespacedName Host string Path string } func (p *WebCheckerProbe) PerformCheck(ctx context.Context) WebCheckerProbeResult { c := http.Client{ Timeout: 1 * time.Second, } req, err := http.NewRequest("GET", p.Host+p.Path, nil) if err != nil { return WebCheckerProbeResult{ NamespacedName: p.NamespacedName, IsSuccessful: false, LastError: err.Error(), } } resp, err := c.Do(req) if err != nil { return WebCheckerProbeResult{ NamespacedName: p.NamespacedName, IsSuccessful: false, LastError: err.Error(), } } defer resp.Body.Close() if slices.Contains(successfulStatuses, resp.StatusCode) { return WebCheckerProbeResult{ NamespacedName: p.NamespacedName, IsSuccessful: true, LastError: "", } } return WebCheckerProbeResult{ NamespacedName: p.NamespacedName, IsSuccessful: false, LastError: fmt.Sprintf("Status code: %d", resp.StatusCode), } }
Я попытался сделать проверку максимально простой. Не стал заморачиваться, например, с sync.Pool и повторным использованием соединений.
Запуск контроллера
В файле cmd/main.go немного изменим вызов SetupWithManager, чтобы он выглядел так:
webcheckerController := controller.NewWebChecker(mgr.GetClient(), mgr.GetScheme()) if err = webcheckerController.SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "WebChecker") os.Exit(1) } webcheckerController.RunWorkers(context.Background())
Теперь можно посмотреть, как наш контроллер будет работать в кластере. Используем kind и создадим локальный кластер:
kind create cluster
Установим наш CustomResourceDefinition в кластер:
make install
Запустим наш контроллер:
make build && make run
После запуска контроллера сможем наблюдать его логи. Запустим ещё один терминал и задеплоим в кластер два ресурса. Раз:
apiVersion: test.network.io/v1alpha1 kind: WebChecker metadata: name: flant spec: host: http://flant.ru
Два:
apiVersion: test.network.io/v1alpha1 kind: WebChecker metadata: name: badnews spec: host: http://badnews.me
Теперь можно посмотреть статус этих ресурсов. Раз:
kubectl get webcheckers.test.network.io badnews -o yaml apiVersion: test.network.io/v1alpha1 kind: WebChecker metadata: annotations: creationTimestamp: "2025-02-04T12:32:55Z" generation: 1 name: badnews namespace: default resourceVersion: "3118" uid: 942765a7-e3d1-4f2a-b73f-f11377e1c72d spec: host: http://badnews.me path: / status: conditions: - lastTransitionTime: "2025-02-04T12:33:07Z" message: 'Get "http://badnews.me/": dial tcp: lookup badnews.me: no such host' reason: WebResourceReady status: "False" type: Ready
Два:
kubectl get webcheckers.test.network.io flant -o yaml apiVersion: test.network.io/v1alpha1 kind: WebChecker metadata: annotations: creationTimestamp: "2025-02-04T12:32:51Z" generation: 1 name: flant namespace: default resourceVersion: "3112" uid: 4dc3f052-4893-4089-8e6d-ec98fad393f5 spec: host: http://flant.ru path: / status: conditions: - lastTransitionTime: "2025-02-04T12:33:02Z" message: WebChecker is ready reason: WebResourceReady status: "True" type: Ready
Итоги
В этой статье мы рассмотрели пример создания Kubernetes-оператора, который наблюдает за внешними HTTP-серверами. Мы использовали kubebuilder для создания базовой структуры контроллера, определили CustomResourceDefinition для описания внешних ресурсов и реализовали логику для проверки доступности этих ресурсов.
Не стоит рассматривать данный пример как рабочий контроллер. Он создан исключительно в ознакомительных целях. Исходный код можно найти здесь. Но его можно расширить для управления более сложными внешними ресурсами, такими как базы данных или облачные сервисы.
P. S.
Читайте также в нашем блоге:
ссылка на оригинал статьи https://habr.com/ru/articles/884566/
Добавить комментарий