Следующий уровень автоматизации Kubernetes. Создаем свой оператор

от автора

Оператором в Kubernetes принято называть развертывание, которое самостоятельно управляет ресурсами кластера, регистрирует новые Custom Resource Definition (CRD) и, в некоторых случаях, добавляется для наблюдения за существующими ресурсами (через механизмы Dynamic Admission Control). В этой статье на примере создания оператора для развертывания и управления кластером Aerospike мы попробуем разобраться с этапами создания оператора, способами взаимодействия с кластером и проблемами, с которыми можно встретиться в реальной практике. Всех практикующих DevOps и желающих поднять автоматизацию развертывания своих сервисов на новый уровень приглашаю под кат.

Прежде всего нужно определить, какую функциональность будет поддерживать будущий оператор. Поскольку база данных Aerospike допускает работу в режиме кластера, будет разумно создать CRD для описания характеристик кластера (количество подов, ограничения по памяти и процессору и др), конфигурации пространства имен, а также настройки резервного копирования. Также оператор должен отслеживать изменения в существующих ресурсах, связанных с ним, и динамически изменять топологию системы и/или настройки подов.

Для разработки оператора мы будем использовать язык программирования Go и набор инструментов Operator SDK, который нужно будет предварительно установить:

git clone https://github.com/operator-framework/operator-sdk cd operator-sdk make install

После установки SDK можно выполнить инициализацию заготовки проекта оператора:

operator-sdk init --domain dzolotov.tech --owner "Dmitrii Zolotov"

Поскольку мы предполагаем создание дополнительных типов ресурсов, необходимо определить идентификатор версии API, который представляет собой URI на произвольном домене. Для удобства использования operator-sdk можно сразу установить поддержку автодополнения в shell Debian/Ubuntu:

apt install bash-completion echo 'source <(operator-sdk completion bash)' >>~/.bashrc

Operator SDK поддерживает создание операторов для разных DevOps-решений, включая Ansible и Helm, для этого при инициализации можно указывать необходимые плагины. Также инструмент командной строки позволяет выполнять развертывание оператора (operator-sdk olm), запуск оператора в выбранном контексте (operator-sdk run) и выполнять тесты (operator-sdk scorecard). Также можно создавать фрагменты кода для реализации разных типов Kubernetes-ресурсов. Начнем с создания версии apiVersion:

operator-sdk create api --group aerospike --version v1alpha1 --kind AerospikeCluster --resource --controller

Генератор создает несколько файлов для взаимодействия с Kubernetes API:

  • api/v1alpha1/aerospikecluster_types.go — описание типов (CRD);

  • controllers/aerospikecluster_controller.go — контроллер для создания и отслеживания типов CRD;

  • /go.mod — содержит зависимости для подключения к API (k8s.io/apimachinery — доступ к API для управления ресурсами, k8s.io/client-go — Go-клиент для взаимодействия с API, sigs.k8s.io/controller-runtime — базовые классы для реализации контроллера, github.com/onsi/ginkgo — тестовый фреймворк для проверки оператора).

Попробуем сгенерировать манифесты для Kubernetes, которые будут определять Custom Resource Definition и развертывания оператора: make manifests. Результатом выполнения будет генерация файлов в crd/bases — регистрация CRD-ресурсов (apiextensions.k8s.io/v1/CustomResourceDefinition) с названием aerospikeclusters.aerospike.dzolotov.tech для создаваемого API (группа aerospike.my.domain, тип ресурса AerospikeCluster). По умолчанию схема ресурса содержит строковое поле foo, далее мы переопределим схему в коде описания типов на Go:

apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata:   annotations:     controller-gen.kubebuilder.io/version: v0.8.0   name: aerospikeclusters.aerospike.my.domain spec:   group: aerospike.dzolotov.tech   names:     kind: AerospikeCluster     listKind: AerospikeClusterList     plural: aerospikeclusters     singular: aerospikecluster   # будет локальным для namespace (может быть Cluster для создания ресурса на уровне кластера, без привязки к namespace)   scope: Namespaced   versions:   # создаваемая версия CRD   - name: v1alpha1     schema:       # описание схемы yaml-документа       openAPIV3Schema:         description: AerospikeCluster is the Schema for the aerospikeclusters API         properties:           # версия api (позволит делать разные версии для API)           apiVersion:             description: 'APIVersion defines the versioned schema of this representation               of an object. Servers should convert recognized schemas to the latest               internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources'             type: string           # тип ресурса (в нашем случае будет AerospikeCluster)           kind:             description: 'Kind is a string value representing the REST resource this               object represents. Servers may infer this from the endpoint the client               submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'             type: string           # описание метаданных (.meta) - определяется по схеме metav1.ObjectMeta           metadata:             type: object           # описание спецификации кластера (.spec) - в соответствии со схемой           spec:             description: AerospikeClusterSpec defines the desired state of AerospikeCluster             properties:               # свойства спецификации кластера (.spec)               foo:                 description: Foo is an example field of AerospikeCluster. Edit aerospikecluster_types.go                   to remove/update                 type: string             type: object           # назначение ключа объекта yaml .status (пустой объект)           status:             description: AerospikeClusterStatus defines the observed state of AerospikeCluster             type: object         type: object     # флаг актуальности версии CRD     served: true     # версия по умолчанию     storage: true 

Примером корректного ресурса с использованием указанного CRD:

apiVersion: aerospike.my.domain/alpha1 kind: AerospikeCluster metadata:   name: demo spec:   foo: bar status: {}

Прежде чем мы пойдем рассматривать код контроллера дополним схему данных и добавим следующие поля:

  • deployments (int) — количество серверов в кластере;

  • maxMemory (int) — ограничение по размеру памяти (в Гб);

  • maxCPU (string) — ограничение по использованию процессора;

  • logging (string) — уровень протоколирования;

  • compressionLevel (int) — уровень сжатия;

  • replicationFactor (int) — фактор репликации для кластера;

  • strongConsistency (bool) — включение режима строгой согласованности (в ущерб производительности);

  • heartbeat:

    • interval (int) — интервал для проверки доступности соседних узлов (в миллисекундах).

  • keepalive:

    • enabled (bool) — разрешено использование keepalive-сообщений;

    • interval (int) — интервал в секундах между сообщениями.

Для демонстрации такого набора будет достаточно, в реальном кластере разумеется нужно предусмотреть наличие всех необходимых опций для конфигурации узлов. Структура данных для спецификации кластера определяется в файле api/alphav1/aerospikecluster_types.go, сейчас там даны определения для всех создаваемых типов данных (кластер, список кластеров и связанные с ними типы):

package v1alpha1  import (         metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" )  //ожидаемое состояние кластера (спецификация) type AerospikeClusterSpec struct {         Foo string `json:"foo,omitempty"` }  //текущее состояние кластера type AerospikeClusterStatus struct { }  //+kubebuilder:object:root=true //+kubebuilder:subresource:status  type AerospikeCluster struct {         metav1.TypeMeta   `json:",inline"`   //стандартное определение метаданных (namespace, name, annotations, ...)         metav1.ObjectMeta `json:"metadata,omitempty"`    //кластер содержит спецификацию и текущий статус         Spec   AerospikeClusterSpec   `json:"spec,omitempty"`         Status AerospikeClusterStatus `json:"status,omitempty"` }  //+kubebuilder:object:root=true  type AerospikeClusterList struct {         metav1.TypeMeta `json:",inline"`         metav1.ListMeta `json:"metadata,omitempty"`         Items           []AerospikeCluster `json:"items"` }  //регистрация типов объектов и их схем func init() {         SchemeBuilder.Register(&AerospikeCluster{}, &AerospikeClusterList{}) }

Добавим необходимые поля (и структуры) в структуру AerospikeClusterSpec и обозначим в статусе текущее состояние кластера: deployed (bool) — устанавливается в true когда развертывание завершено, brokenNodes (int) — количество недоступных узлов. Обновим определение типов и выполним повторно команду make в корневом каталоге проекта, это приведет к выполнению форматирования и проверки синтаксиса файлов.

type AerospikeHeartBeat struct {         // Heartbeat interval in milliseconds         Interval int `json:"interval,omitempty"` }  type AerospikeKeepAlive struct {         // Keep alive is enabled         Enabled bool `json:"enabled,omitempty"`         // Keep alive messages interval in seconds         Interval int `json:"interval,omitempty"` }  // ожидаемое состояние кластера (спецификация) type AerospikeClusterSpec struct {         // Deployments count         Deployments int `json:"deployments,omitempty"`         // Maximum memory limit in GB (for example, 4)         MaxMemory *int `json:"maxMemory,omitempty"`         // Maximum cpu limit (100 = full power)         MaxCPU *int `json:"maxCPU,omitempty"`         // Logging level (https://docs.aerospike.com/reference/configuration)         Logging *string `json:"logging,omitempty"`         // Compression level (1 - lower compression, 9 - higher, but slower compression)         CompressionLevel *int `json:"compressionLevel,omitempty"`         // Min replicas count         ReplicationFactor int `json:"replicationFactor,omitempty"`         // Enable strong consistency mode         StrongConsistency bool `json:"strongConsistency,omitempty"`         // Heartbeat configuration         HeartBeat *AerospikeHeartBeat `json:"heartbeat,omitempty"`         // Keepalive configuration         Keepalive *AerospikeKeepAlive `json:"keepalive,omitempty"` }  // текущее состояние кластера type AerospikeClusterStatus struct {         // All nodes are prepared and ready         Deployed bool `json:"deployed"`         // How many nodes isn't available         BrokenNodes int `json:"brokenNodes"` }

После применения make manifests можем убедиться, что описание создания CRD-ресурса соответствует измененной схеме (фрагмент приведен ниже):

compressionLevel: description: Compression level (1 - lower compression, 9 - higher, but slower compression) type: integer deployments: description: Deployments count type: integer heartbeat: description: Heartbeat configuration properties: interval: description: Heartbeat interval in milliseconds type: integer type: object

При необходимости создать дополнительную версию существующего ресурса или создать новый можно использовать как инструмент operator-sdk create api, так и создавать дополнительные каталоги и файлы на основе существующего шаблона.

Дальше посмотрим на контроллер и тут ключевую роль играет Reconciler, который выполняет изменение конфигурации в соответствии с разностью между текущим и ожидаемым состоянием. Кроме это для проверки корректности согласованности спецификации может быть добавлено использование webhook и здесь нужно сделать небольшое введение в Dynamic Admission Control. Динамическое управление позволяет зарегистрировать два вида дополнительных обработчиков во время операций над ресурсами — ValidatingWebhook (проверяет корректность схемы создаваемого ресурса, позволяет отменить операцию при нарушении схемы) и MutatingWebhook (допускает возможность внесения изменений в ресурс перед развертыванием, например используется для встраивания sidecar-контейнеров в операторе управления резервным копированием Stash, который мы рассматривали в этой статье). Operator SDK позволяет также создавать и регистрировать webhook и даже генерировать заготовку ValidatingWebhook для проверки согласованности данных .

operator-sdk create webhook --group aerospike --version v1alpha1 --kind AerospikeCluster --defaulting --programmatic-validation

После вызова генератора создается файл aerospikecluster_webhook.go, в котором нужно будет реализовать функции проверки конфигурации при создании ValidateCreate(), при обновлении ValidateUpdate() и при удалении ValidateDelete(). Добавим дополнительную проверку, что количество реплик не превышает количество узлов и количество узлов является натуральным числом. В результате получим следующий код:

package v1alpha1  import (         apierrors "k8s.io/apimachinery/pkg/api/errors"         "k8s.io/apimachinery/pkg/runtime"         "k8s.io/apimachinery/pkg/runtime/schema"         "k8s.io/apimachinery/pkg/util/validation/field"         ctrl "sigs.k8s.io/controller-runtime"         logf "sigs.k8s.io/controller-runtime/pkg/log"         "sigs.k8s.io/controller-runtime/pkg/webhook"         "strconv" )  // log is for logging in this package. var aerospikeclusterlog = logf.Log.WithName("aerospikecluster-resource")  func (r *AerospikeCluster) SetupWebhookWithManager(mgr ctrl.Manager) error {         return ctrl.NewWebhookManagedBy(mgr).                 For(r).                 Complete() }  //+kubebuilder:webhook:path=/mutate-aerospike-dzolotov-tech-v1alpha1-aerospikecluster,mutating=true,failurePolicy=fail,sideEffects=None,groups=aerospike.dzolotov.tech,resources=aerospikeclusters,verbs=create;update,versions=v1alpha1,name=maerospikecluster.kb.io,admissionReviewVersions=v1  var _ webhook.Defaulter = &AerospikeCluster{}  //здесь можно применить значения по умолчанию (выполняется до валидации) func (r *AerospikeCluster) Default() {         aerospikeclusterlog.Info("default", "name", r.Name) }  // TODO(user): change verbs to "verbs=create;update;delete" if you want to enable deletion validation. //+kubebuilder:webhook:path=/validate-aerospike-dzolotov-tech-v1alpha1-aerospikecluster,mutating=false,failurePolicy=fail,sideEffects=None,groups=aerospike.dzolotov.tech,resources=aerospikeclusters,verbs=create;update,versions=v1alpha1,name=vaerospikecluster.kb.io,admissionReviewVersions=v1  var _ webhook.Validator = &AerospikeCluster{}  func (r *AerospikeCluster) validateReplicas() error {         var allErrs field.ErrorList         if r.Spec.ReplicationFactor > r.Spec.Deployments {                 fldPath := field.NewPath("spec").Child("replicationFactor")                 allErrs = append(allErrs, field.Invalid(fldPath, strconv.Itoa(r.Spec.ReplicationFactor), "Replication factor must be lower than deployments count"))         }         if r.Spec.Deployments < 1 {                 fldPath := field.NewPath("spec").Child("deployments")                 allErrs = append(allErrs, field.Invalid(fldPath, strconv.Itoa(r.Spec.ReplicationFactor), "Deployments counter must be greater than zero"))         }         if len(allErrs) == 0 {                 return nil         }         return apierrors.NewInvalid(schema.GroupKind{Group: "aerospike.dzolotov.tech", Kind: "AerospikeCluster"}, r.Name, allErrs) }  //проверка при создании нового ресурса func (r *AerospikeCluster) ValidateCreate() error {         aerospikeclusterlog.Info("validate create", "name", r.Name)         return r.validateReplicas() }  //проверка при обновлении ресурса func (r *AerospikeCluster) ValidateUpdate(old runtime.Object) error {         aerospikeclusterlog.Info("validate update", "name", r.Name)         return r.validateReplicas() }  //проверка при удалении ресурса (сейчас отключена в метаданных kubebuilder) func (r *AerospikeCluster) ValidateDelete() error {         aerospikeclusterlog.Info("validate delete", "name", r.Name)         return nil }

Здесь мы используем механизм apierrors, рекомендуемый Kubernetes для подробного описания ошибки и места ее возникновения, который позволяет объединить все ошибки в один объект, но можно было использовать и обычный объект ошибки. Обратите внимание на комментарии //+kubebuilder:webhook, они содержат метаданные, которые используются для генерации манифеста и здесь, например, можно изменить список валидаций (сейчас используются только create,update, но при необходимости можно сделать валидацию на удаление (например, так можно запретить удаление ресурса, но лучше такого не делать). После вызова make / make manifests посмотрим на сгенерированный файл манифеста для установки AdmissionWebhooks:

apiVersion: admissionregistration.k8s.io/v1 kind: MutatingWebhookConfiguration metadata:   creationTimestamp: null   name: mutating-webhook-configuration webhooks: - admissionReviewVersions:   - v1   clientConfig:     service:       name: webhook-service       namespace: system       path: /mutate-aerospike-dzolotov-tech-v1alpha1-aerospikecluster   failurePolicy: Fail   name: maerospikecluster.kb.io   rules:   - apiGroups:     - aerospike.dzolotov.tech     apiVersions:     - v1alpha1     operations:     - CREATE     - UPDATE     resources:     - aerospikeclusters   sideEffects: None --- apiVersion: admissionregistration.k8s.io/v1 kind: ValidatingWebhookConfiguration metadata:   creationTimestamp: null   name: validating-webhook-configuration webhooks: - admissionReviewVersions:   - v1   clientConfig:     service:       name: webhook-service       namespace: system       path: /validate-aerospike-dzolotov-tech-v1alpha1-aerospikecluster   failurePolicy: Fail   name: vaerospikecluster.kb.io   rules:   - apiGroups:     - aerospike.dzolotov.tech     apiVersions:     - v1alpha1     operations:     - CREATE     - UPDATE     resources:     - aerospikeclusters   sideEffects: None

Здесь можно увидеть, что и MutatingWebhook и ValidatingWebhook применяются на ресурсы в apiVersion: aerospike.dzolotov.tech/v1alpha1 при операциях создания и обновления на ресурсах aerospikecluster. Теперь посмотрим на другие подкаталоги в config и поговорим о правах доступа (RBAC) и развертывании контроллера.

  • certmanager — манифест для издания сертификата для валидации webhook (создается автоматически), привязывается к сервису контроллера;

  • crd — описание типов ресурсов (CRD), мы рассмотрели его выше, но добавлю что кроме наших определений есть еще патчи, которые позволяют пробросить созданный через certmanager сертификат в аннотации ресурса для использования в коде;

  • default — содержит патчи для контроллера, которые публикуют webhook-сервер и разворачивают system/controller-manager из gcr.io/kubebuilder/kube-rbac-proxy:v0.11.0;

  • manifests — конфигурация для kustomization, который создает манифесты при сборке финального артефакта для установки оператора;

  • prometheus — регистрация ServiceMonitor для экспорта метрик из controller-manager в prometheus;

  • rbac — набор прав для обеспечения доступа контроллера к управлению ресурсами aerospikeclusters, а также для публикации метрик, сервисной записи для выполнения controller-manager и др.;

  • scorecard — конфигурация для запуска тестов через OLM (Operator Lifecycle Manager, позволяет управлять запуском и тестирование операторов внутри существующего кластера kubernetes);

  • webhook — конфигурация для регистрации AdminissionWebhooks (рассмотрели ранее).

Коротко об RBAC (Role-Based Access Control) — это внутренний механизм контроля доступа Kubernetes, который определяет понятие Role (или ClusterRole, если без привязки к пространству имен), перечисляющее какие действия и над какими ресурсами могут быть выполнены и RoleBinding (ClusterRoleBinding) для связывания учетной записи пользователи или сервисной записи (ServiceAccount) с соответствующей ролью. Таким образом, чтобы оператор мог создавать новые ресурсы Deployment, требуется определить роль с разрешениями на create/update/patch/delete для ресурсов kind: Deployment в apiVersion: apps/v1 и связать ее с сервисной записью, под которой происходит развертывание оператора. Для управление нашим контроллером будет запущен процесс controller-manager, он также будет отслеживать доступность контроллера и перезапускать его при необходимости.

Вернемся теперь к контроллеру (controllers/aerospikecluster_controller.go) и посмотрим внимательно на его структуру:

package controllers  import (         "context"          "k8s.io/apimachinery/pkg/runtime"         ctrl "sigs.k8s.io/controller-runtime"         "sigs.k8s.io/controller-runtime/pkg/client"         "sigs.k8s.io/controller-runtime/pkg/log"          aerospikev1alpha1 "github.com/operator-framework/operator-sdk/api/v1alpha1" )  // Объект реконсилера для приведения кластера к ожидаемому состоянию type AerospikeClusterReconciler struct {         client.Client         Scheme *runtime.Scheme }  // Описание разрешений (будут транслированы в роли) //+kubebuilder:rbac:groups=aerospike.dzolotov.tech,resources=aerospikeclusters,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=aerospike.dzolotov.tech,resources=aerospikeclusters/status,verbs=get;update;patch //+kubebuilder:rbac:groups=aerospike.dzolotov.tech,resources=aerospikeclusters/finalizers,verbs=update  func (r *AerospikeClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {         _ = log.FromContext(ctx)    //здесь будет наш код управления кластером         return ctrl.Result{}, nil }  // Регистрация с использованием ControllerManager func (r *AerospikeClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {         return ctrl.NewControllerManagedBy(mgr).                 For(&aerospikev1alpha1.AerospikeCluster{}).                 Complete(r) }

Поскольку мы будем управлять развертыванием кластера серверов (будем использовать StatefulSet, поскольку нам важно сохранить связь экземпляра пода и его местоположения, также нам будет нужно иметь возможность создания и изменения ConfigMap для определения конфигурации сервера по параметрам из манифеста), то нам нужно будет сообщить контроллеру о том, что мы будем наблюдать за ресурсами StatefulSet и Pod и добавить соответствующие правила в комментариях +kubebuilder. Прежде всего добавим импорт структур, описывающих пространство имен apps/v1 и core/v1 (для ConfigMap):

import ( "context"  appsv1 "k8s.io/api/apps/v1"   corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log"  aerospikev1alpha1 "github.com/operator-framework/operator-sdk/api/v1alpha1" )

И сообщим менеджеру о том, что мы будем управлять объектами StatefulSet и ConfigMap:

func (r *AerospikeClusterReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&aerospikev1alpha1.AerospikeCluster{}). Owns(&appsv1.StatefulSet{}). Owns(&corev1.ConfigMap{}). Complete(r) } 

И добавим метаданные перед методом Reconcile для описания требуемых прав доступа:

//+kubebuilder:rbac:groups=apps/v1,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=apps/v1,resources=pods,verbs=get;list //+kubebuilder:rbac:groups=v1,resources=configmaps,verbs=get;list;create;update;patch;delete

Сама реализация будет использовать публичные методы API Kubernetes, прежде всего для нас интересны методы:

  • r.List(ctx, objectList, opts) — возвращает список объектов в objectList с отбором по opts (требует разрешения list на этот тип объектов в RBAC-роли), например:

podList := &v1.PodList{} opts := []client.ListOption{ client.InNamespace(request.NamespacedName.Namespace), client.MatchingLabels{"instance": request.NamespacedName.Name}, client.MatchingFields{"status.phase": "Running"}, } err := r.List(ctx, podList, opts...)

Выбирает все запущенные поды с меткой instance и значением, совпадающих с именем кластера (поиск осуществляется в том же пространстве имен, где развернут ресурс AerospikeCluster).

  • r.Get(ctx, namespacedName, object) — сохраняет объект в object, идентификацию объекта выполняет NamespacedName, включающий пространство имен Namespace и имя внутри него Name (требует разрешение get на этот тип объектов в RBAC-роли);

  • r.Create(ctx, object, opts) — создает новый объект в кластере (требуется наличие разрешений create на этот тип объекта в RBAC-роли), например:

func (r *AerospikeClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { _ = log.FromContext(ctx)    //прочитаем ресурс описания кластера cluster := &aerospikev1alpha1.AerospikeCluster{} err := r.Get(ctx, req.NamespacedName, cluster) if err != nil { return ctrl.Result{}, err }    //преобразование числовых полей в строки и в тип int32 для передачи в структуру   //описания StatefulSet var deployments int32 deployments = int32(cluster.Spec.Deployments) var replicationFactor string replicationFactor = strconv.Itoa(cluster.Spec.ReplicationFactor) var maxMemory string maxMemory = strconv.Itoa(*cluster.Spec.MaxMemory) var maxCpu string maxCpu = strconv.Itoa(*cluster.Spec.MaxCPU)    //описание нового объекта StatefulSet sts := appsv1.StatefulSet{ObjectMeta: ctrl.ObjectMeta{Namespace: req.NamespacedName.Namespace, Name: req.NamespacedName.Name}, Spec: appsv1.StatefulSetSpec{ Replicas: &deployments, Selector: &v1.LabelSelector{ MatchLabels: map[string]string{"label": req.Name}, }, Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ Containers: []corev1.Container{ { Name:    "aerospike", Image:   "aerospike", Args:    nil, EnvFrom: nil, Env: []corev1.EnvVar{ { Name:  "REPL_FACTOR", Value replicationFactor, }, { Name:  "MEM_GB", Value: maxMemory, }, }, Resources: corev1.ResourceRequirements{ Limits: corev1.ResourceList{ corev1.ResourceCPU: resource.MustParse(maxCpu), }, }, }, }, }, }, UpdateStrategy: appsv1.StatefulSetUpdateStrategy{}, }} r.Create(ctx, &sts)  return ctrl.Result{Requeue: true}, nil } 
  • r.Update(ctx, object, opts) — обновление конфигурации объекта (для StatefulSet например можно изменить переменные окружения и количество реплик), требуется разрешение update на этот тип ресурсов в RBAC-роли;

  • r.Patch(ctx, object, patch) — внесение исправлений в структуру, описывающую запущенный объект;

  • r.Delete(ctx, objects, opts) — удаление объекта из кластера (требуется разрешение delete на этот тип ресурсов в RBAC-роли);

  • r.Status().Update(ctx, cluster) — дает доступ к модификации объекта состояния (виден в манифесте и в объекте cluster как .status).

Поведение reconciler в нашем случае может быть описано несколькими возможными сценариями:

  • объект кластера существует, но нет statefulset — создаем новый statefulset и configmap;

  • объект кластера существует и есть statefulset — обновляем параметры конфигурации (лимиты, переменные окружения, количество реплик) и пересоздаем configmap;

  • объект кластера не существует — удаляем statefulset и configmap (игнорируем ошибки).

В целом код можно разделить на несколько частей: генерация и обновление configmap, обработка удаления кластера, обновление конфигурации существующего кластера. Код может выглядеть таким образом. Обновление или создание configmap:

func (r *AerospikeClusterReconciler) generateConfigMap(ctx context.Context, cluster *aerospikev1alpha1.AerospikeCluster) error { //проверим на наличие существующего configmap var cm corev1.ConfigMap err := r.Get(ctx, types.NamespacedName{ Namespace: cluster.Namespace, Name:      cluster.Name, }, &cm) if err != nil { if errors.IsNotFound(err) { //создаем новый configmap newConfigMap := corev1.ConfigMap{ ObjectMeta: v1.ObjectMeta{ Namespace: cluster.Namespace, Name:      cluster.Name, }, Data: map[string]string{ "aerospike": createConfigMap(cluster), }, } err := r.Create(ctx, &newConfigMap) return err } else { return err } } //если объект уже существует, то изменяем данные cm.Data = map[string]string{ "aerospike": createConfigMap(cluster), } err = r.Update(ctx, &cm) return err }  func createConfigMap(cluster *aerospikev1alpha1.AerospikeCluster) string { result := make([]string, 0, 0) if cluster.Spec.Logging != nil { result = append(result, "logging "+*cluster.Spec.Logging) } result = append(result, "strong-consistency "+strconv.FormatBool(cluster.Spec.StrongConsistency)) if cluster.Spec.CompressionLevel != nil { result = append(result, "compression-level "+strconv.Itoa(*cluster.Spec.CompressionLevel)) } result = append(result, "interval "+strconv.Itoa(cluster.Spec.HeartBeat.Interval)) if cluster.Spec.Keepalive != nil { result = append(result, "keepalive-enabled "+strconv.FormatBool(cluster.Spec.Keepalive.Enabled)) result = append(result, "keepalive-intvl "+strconv.Itoa(cluster.Spec.Keepalive.Interval)) } return strings.Join(result, "\n") }

Подготовка объекта StatefulSet по описанию кластера:

func (r *AerospikeClusterReconciler) getConfiguration(cluster *aerospikev1alpha1.AerospikeCluster) *appsv1.StatefulSet { var deployments int32 deployments = int32(cluster.Spec.Deployments) var replicationFactor string replicationFactor = strconv.Itoa(cluster.Spec.ReplicationFactor) var maxMemory string maxMemory = strconv.Itoa(*cluster.Spec.MaxMemory) var maxCpu string maxCpu = strconv.Itoa(*cluster.Spec.MaxCPU)  sts := appsv1.StatefulSet{ObjectMeta: ctrl.ObjectMeta{Namespace: cluster.Namespace, Name: cluster.Name}, Spec: appsv1.StatefulSetSpec{ Replicas: &deployments, Selector: &v1.LabelSelector{ MatchLabels: map[string]string{"instance": cluster.Name}, }, Template: corev1.PodTemplateSpec{ ObjectMeta: ctrl.ObjectMeta{ Labels: map[string]string{"instance": cluster.Name}, }, Spec: corev1.PodSpec{ Containers: []corev1.Container{ { Name:    "aerospike", Image:   "aerospike", Args:    nil, EnvFrom: nil, VolumeMounts: []corev1.VolumeMount{ { Name:      "config", MountPath: "/etc/aerospike/", }, }, Env: []corev1.EnvVar{ { Name:  "REPL_FACTOR", Value: replicationFactor, }, { Name:  "MEM_GB", Value: maxMemory, }, }, Resources: corev1.ResourceRequirements{ Limits: corev1.ResourceList{ corev1.ResourceCPU: resource.MustParse(maxCpu), }, }, }, }, Volumes: []corev1.Volume{ { Name: "config", VolumeSource: corev1.VolumeSource{ ConfigMap: &corev1.ConfigMapVolumeSource{ LocalObjectReference: corev1.LocalObjectReference{Name: cluster.Name}, Items: []corev1.KeyToPath{ { Key:  "aerospike", Path: "aerospike.conf", }, }, }, }, }, }, }, }, UpdateStrategy: appsv1.StatefulSetUpdateStrategy{}, }} return &sts }

Логика удаления, создания и обновления ресурсов кластера и его состояния:

func (r *AerospikeClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { _ = log.FromContext(ctx) //можно сохранить в logger и использовать для сообщений об ошибках или диагностики  cluster := &aerospikev1alpha1.AerospikeCluster{} err := r.Get(ctx, req.NamespacedName, cluster) if err != nil { if errors.IsNotFound(err) { //кластер был удален, удалим развертывание sts := &appsv1.StatefulSet{} err = r.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, sts) if err == nil { err = r.Delete(ctx, sts) if err == nil { cm := &corev1.ConfigMap{} err = r.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, cm) } } } //и завершаем reconfile успешно (или с ошибкой) return ctrl.Result{}, err }  // Check if the deployment already exists, if not create a new deployment. found := &appsv1.StatefulSet{} err = r.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, found) if err != nil { if errors.IsNotFound(err) { // существующего развертывания нет - создаем новое err2 := r.generateConfigMap(ctx, cluster) //создаем configmap if err2 != nil { return ctrl.Result{}, err } dep := r.getConfiguration(cluster) if err = r.Create(ctx, dep); err != nil { return ctrl.Result{}, err } cluster.Status.BrokenNodes = 0 cluster.Status.Deployed = true r.Status().Update(ctx, cluster) return ctrl.Result{Requeue: true}, nil } else { return ctrl.Result{}, err } }  deployments := int32(cluster.Spec.Deployments) //здесь можно сравнить текущий и предыдущий размер кластера и изменить конфигурацию, если требуется //но у нас могут поменяться обновременно и переменные окружения и configmap found.Spec.Replicas = &deployments var replicationFactor string replicationFactor = strconv.Itoa(cluster.Spec.ReplicationFactor) var maxMemory string maxMemory = strconv.Itoa(*cluster.Spec.MaxMemory)  found.Spec.Template.Spec.Containers[0].Env = []corev1.EnvVar{ { Name:  "REPL_FACTOR", Value: replicationFactor, }, { Name:  "MEM_GB", Value: maxMemory, }, }  found.Spec.Template.Spec.Containers[0].Resources.Limits.Cpu().Set(int64(*cluster.Spec.MaxCPU)) err = r.generateConfigMap(ctx, cluster) if err != nil { return ctrl.Result{}, err }  // получим активное количество подов и обновим BrokenNodes в состоянии podList := &corev1.PodList{} listOpts := []client.ListOption{ client.InNamespace(cluster.Namespace), client.MatchingLabels{"instance": cluster.Name}, client.MatchingFields{"status.phase": "Running"}, } if err = r.List(ctx, podList, listOpts...); err != nil { return ctrl.Result{}, err }  // обновим количество неактивных узлов cluster.Status.BrokenNodes = cluster.Spec.Deployments - len(podList.Items) err2 := r.Status().Update(ctx, cluster)  return ctrl.Result{}, err2 }

Для применения созданного оператора можно использовать make deploy в корневом каталоге проекта или через OLM:

operator-sdk olm install make bundle IMG="dzolotov.tech/aerospike-operator:v0.0.1" make bundle-build bundle-push BUNDLE_IMG="dzolotov.tech/aerospike-operator-bundle:v0.0.1" operator-sdk run bundle dzolotov.tech/aerospike-operator-bundle:v0.0.1

Здесь будет собран образ контейнера (его можно отправить, например, в Docker hub) и запущен оператор на текущем кластере Kubernetes. Для остановки оператора применяются действия make undeployили operator-sdk cleanup aerospike-operator (для OLM).

Проверить работу оператора можно через создание ресурса, который был определен ранее:

cat <<< EOF | kubectl apply -f - apiVersion: dzolotov.tech/v1alpha1 kind: AerospikeCluster metadata:   namespace: cluster   name: demo spec:   deployments: 2   maxMemory: 2   compressionLevel: 4   replicationFactor: 1   strongConsistency: false   heartbeat:     interval: 10000 EOF

С использованием операторов возможно реализовать сложную логику по управлению распределенными системами или обеспечить возможность конфигурирования разворачиваемых приложений в терминах предметной области. Большое количество операторов можно найти на https://operatorhub.io/, а документацию и статьи по разработке в этой коллекции ссылок.

Также хочу пригласить всех заинтересованных на бесплатный вебинар, в рамках которого мы комплексно рассмотрим основные векторы по обеспечению безопасности kubernetes кластера и подробно остановимся на каждом из них. Затронем тему безопасности docker-образов, безопасность в рантайме, network и application security.


ссылка на оригинал статьи https://habr.com/ru/company/otus/blog/669806/