Оператором в Kubernetes принято называть развертывание, которое самостоятельно управляет ресурсами кластера, регистрирует новые Custom Resource Definition (CRD) и, в некоторых случаях, добавляется для наблюдения за существующими ресурсами (через механизмы Dynamic Admission Control). В этой статье на примере создания оператора для развертывания и управления кластером Aerospike мы попробуем разобраться с этапами создания оператора, способами взаимодействия с кластером и проблемами, с которыми можно встретиться в реальной практике. Всех практикующих DevOps и желающих поднять автоматизацию развертывания своих сервисов на новый уровень приглашаю под кат.
Прежде всего нужно определить, какую функциональность будет поддерживать будущий оператор. Поскольку база данных Aerospike допускает работу в режиме кластера, будет разумно создать CRD для описания характеристик кластера (количество подов, ограничения по памяти и процессору и др), конфигурации пространства имен, а также настройки резервного копирования. Также оператор должен отслеживать изменения в существующих ресурсах, связанных с ним, и динамически изменять топологию системы и/или настройки подов.
Для разработки оператора мы будем использовать язык программирования Go и набор инструментов Operator SDK, который нужно будет предварительно установить:
git clone https://github.com/operator-framework/operator-sdk cd operator-sdk make install
После установки SDK можно выполнить инициализацию заготовки проекта оператора:
operator-sdk init --domain dzolotov.tech --owner "Dmitrii Zolotov"
Поскольку мы предполагаем создание дополнительных типов ресурсов, необходимо определить идентификатор версии API, который представляет собой URI на произвольном домене. Для удобства использования operator-sdk можно сразу установить поддержку автодополнения в shell Debian/Ubuntu:
apt install bash-completion echo 'source <(operator-sdk completion bash)' >>~/.bashrc
Operator SDK поддерживает создание операторов для разных DevOps-решений, включая Ansible и Helm, для этого при инициализации можно указывать необходимые плагины. Также инструмент командной строки позволяет выполнять развертывание оператора (operator-sdk olm
), запуск оператора в выбранном контексте (operator-sdk run
) и выполнять тесты (operator-sdk scorecard
). Также можно создавать фрагменты кода для реализации разных типов Kubernetes-ресурсов. Начнем с создания версии apiVersion:
operator-sdk create api --group aerospike --version v1alpha1 --kind AerospikeCluster --resource --controller
Генератор создает несколько файлов для взаимодействия с Kubernetes API:
-
api/v1alpha1/aerospikecluster_types.go
— описание типов (CRD); -
controllers/aerospikecluster_controller.go
— контроллер для создания и отслеживания типов CRD; -
/go.mod
— содержит зависимости для подключения к API (k8s.io/apimachinery — доступ к API для управления ресурсами, k8s.io/client-go — Go-клиент для взаимодействия с API, sigs.k8s.io/controller-runtime — базовые классы для реализации контроллера, github.com/onsi/ginkgo — тестовый фреймворк для проверки оператора).
Попробуем сгенерировать манифесты для Kubernetes, которые будут определять Custom Resource Definition и развертывания оператора: make manifests
. Результатом выполнения будет генерация файлов в crd/bases — регистрация CRD-ресурсов (apiextensions.k8s.io/v1/CustomResourceDefinition
) с названием aerospikeclusters.aerospike.dzolotov.tech
для создаваемого API (группа aerospike.my.domain, тип ресурса AerospikeCluster
). По умолчанию схема ресурса содержит строковое поле foo, далее мы переопределим схему в коде описания типов на Go:
apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: controller-gen.kubebuilder.io/version: v0.8.0 name: aerospikeclusters.aerospike.my.domain spec: group: aerospike.dzolotov.tech names: kind: AerospikeCluster listKind: AerospikeClusterList plural: aerospikeclusters singular: aerospikecluster # будет локальным для namespace (может быть Cluster для создания ресурса на уровне кластера, без привязки к namespace) scope: Namespaced versions: # создаваемая версия CRD - name: v1alpha1 schema: # описание схемы yaml-документа openAPIV3Schema: description: AerospikeCluster is the Schema for the aerospikeclusters API properties: # версия api (позволит делать разные версии для API) apiVersion: description: 'APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' type: string # тип ресурса (в нашем случае будет AerospikeCluster) kind: description: 'Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' type: string # описание метаданных (.meta) - определяется по схеме metav1.ObjectMeta metadata: type: object # описание спецификации кластера (.spec) - в соответствии со схемой spec: description: AerospikeClusterSpec defines the desired state of AerospikeCluster properties: # свойства спецификации кластера (.spec) foo: description: Foo is an example field of AerospikeCluster. Edit aerospikecluster_types.go to remove/update type: string type: object # назначение ключа объекта yaml .status (пустой объект) status: description: AerospikeClusterStatus defines the observed state of AerospikeCluster type: object type: object # флаг актуальности версии CRD served: true # версия по умолчанию storage: true
Примером корректного ресурса с использованием указанного CRD:
apiVersion: aerospike.my.domain/alpha1 kind: AerospikeCluster metadata: name: demo spec: foo: bar status: {}
Прежде чем мы пойдем рассматривать код контроллера дополним схему данных и добавим следующие поля:
-
deployments (int) — количество серверов в кластере;
-
maxMemory (int) — ограничение по размеру памяти (в Гб);
-
maxCPU (string) — ограничение по использованию процессора;
-
logging (string) — уровень протоколирования;
-
compressionLevel (int) — уровень сжатия;
-
replicationFactor (int) — фактор репликации для кластера;
-
strongConsistency (bool) — включение режима строгой согласованности (в ущерб производительности);
-
heartbeat:
-
interval (int) — интервал для проверки доступности соседних узлов (в миллисекундах).
-
-
keepalive:
-
enabled (bool) — разрешено использование keepalive-сообщений;
-
interval (int) — интервал в секундах между сообщениями.
-
Для демонстрации такого набора будет достаточно, в реальном кластере разумеется нужно предусмотреть наличие всех необходимых опций для конфигурации узлов. Структура данных для спецификации кластера определяется в файле api/alphav1/aerospikecluster_types.go, сейчас там даны определения для всех создаваемых типов данных (кластер, список кластеров и связанные с ними типы):
package v1alpha1 import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) //ожидаемое состояние кластера (спецификация) type AerospikeClusterSpec struct { Foo string `json:"foo,omitempty"` } //текущее состояние кластера type AerospikeClusterStatus struct { } //+kubebuilder:object:root=true //+kubebuilder:subresource:status type AerospikeCluster struct { metav1.TypeMeta `json:",inline"` //стандартное определение метаданных (namespace, name, annotations, ...) metav1.ObjectMeta `json:"metadata,omitempty"` //кластер содержит спецификацию и текущий статус Spec AerospikeClusterSpec `json:"spec,omitempty"` Status AerospikeClusterStatus `json:"status,omitempty"` } //+kubebuilder:object:root=true type AerospikeClusterList struct { metav1.TypeMeta `json:",inline"` metav1.ListMeta `json:"metadata,omitempty"` Items []AerospikeCluster `json:"items"` } //регистрация типов объектов и их схем func init() { SchemeBuilder.Register(&AerospikeCluster{}, &AerospikeClusterList{}) }
Добавим необходимые поля (и структуры) в структуру AerospikeClusterSpec и обозначим в статусе текущее состояние кластера: deployed (bool) — устанавливается в true когда развертывание завершено, brokenNodes (int) — количество недоступных узлов. Обновим определение типов и выполним повторно команду make в корневом каталоге проекта, это приведет к выполнению форматирования и проверки синтаксиса файлов.
type AerospikeHeartBeat struct { // Heartbeat interval in milliseconds Interval int `json:"interval,omitempty"` } type AerospikeKeepAlive struct { // Keep alive is enabled Enabled bool `json:"enabled,omitempty"` // Keep alive messages interval in seconds Interval int `json:"interval,omitempty"` } // ожидаемое состояние кластера (спецификация) type AerospikeClusterSpec struct { // Deployments count Deployments int `json:"deployments,omitempty"` // Maximum memory limit in GB (for example, 4) MaxMemory *int `json:"maxMemory,omitempty"` // Maximum cpu limit (100 = full power) MaxCPU *int `json:"maxCPU,omitempty"` // Logging level (https://docs.aerospike.com/reference/configuration) Logging *string `json:"logging,omitempty"` // Compression level (1 - lower compression, 9 - higher, but slower compression) CompressionLevel *int `json:"compressionLevel,omitempty"` // Min replicas count ReplicationFactor int `json:"replicationFactor,omitempty"` // Enable strong consistency mode StrongConsistency bool `json:"strongConsistency,omitempty"` // Heartbeat configuration HeartBeat *AerospikeHeartBeat `json:"heartbeat,omitempty"` // Keepalive configuration Keepalive *AerospikeKeepAlive `json:"keepalive,omitempty"` } // текущее состояние кластера type AerospikeClusterStatus struct { // All nodes are prepared and ready Deployed bool `json:"deployed"` // How many nodes isn't available BrokenNodes int `json:"brokenNodes"` }
После применения make manifests можем убедиться, что описание создания CRD-ресурса соответствует измененной схеме (фрагмент приведен ниже):
compressionLevel: description: Compression level (1 - lower compression, 9 - higher, but slower compression) type: integer deployments: description: Deployments count type: integer heartbeat: description: Heartbeat configuration properties: interval: description: Heartbeat interval in milliseconds type: integer type: object
При необходимости создать дополнительную версию существующего ресурса или создать новый можно использовать как инструмент operator-sdk create api, так и создавать дополнительные каталоги и файлы на основе существующего шаблона.
Дальше посмотрим на контроллер и тут ключевую роль играет Reconciler, который выполняет изменение конфигурации в соответствии с разностью между текущим и ожидаемым состоянием. Кроме это для проверки корректности согласованности спецификации может быть добавлено использование webhook и здесь нужно сделать небольшое введение в Dynamic Admission Control. Динамическое управление позволяет зарегистрировать два вида дополнительных обработчиков во время операций над ресурсами — ValidatingWebhook (проверяет корректность схемы создаваемого ресурса, позволяет отменить операцию при нарушении схемы) и MutatingWebhook (допускает возможность внесения изменений в ресурс перед развертыванием, например используется для встраивания sidecar-контейнеров в операторе управления резервным копированием Stash, который мы рассматривали в этой статье). Operator SDK позволяет также создавать и регистрировать webhook и даже генерировать заготовку ValidatingWebhook для проверки согласованности данных .
operator-sdk create webhook --group aerospike --version v1alpha1 --kind AerospikeCluster --defaulting --programmatic-validation
После вызова генератора создается файл aerospikecluster_webhook.go, в котором нужно будет реализовать функции проверки конфигурации при создании ValidateCreate(), при обновлении ValidateUpdate() и при удалении ValidateDelete(). Добавим дополнительную проверку, что количество реплик не превышает количество узлов и количество узлов является натуральным числом. В результате получим следующий код:
package v1alpha1 import ( apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/validation/field" ctrl "sigs.k8s.io/controller-runtime" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/webhook" "strconv" ) // log is for logging in this package. var aerospikeclusterlog = logf.Log.WithName("aerospikecluster-resource") func (r *AerospikeCluster) SetupWebhookWithManager(mgr ctrl.Manager) error { return ctrl.NewWebhookManagedBy(mgr). For(r). Complete() } //+kubebuilder:webhook:path=/mutate-aerospike-dzolotov-tech-v1alpha1-aerospikecluster,mutating=true,failurePolicy=fail,sideEffects=None,groups=aerospike.dzolotov.tech,resources=aerospikeclusters,verbs=create;update,versions=v1alpha1,name=maerospikecluster.kb.io,admissionReviewVersions=v1 var _ webhook.Defaulter = &AerospikeCluster{} //здесь можно применить значения по умолчанию (выполняется до валидации) func (r *AerospikeCluster) Default() { aerospikeclusterlog.Info("default", "name", r.Name) } // TODO(user): change verbs to "verbs=create;update;delete" if you want to enable deletion validation. //+kubebuilder:webhook:path=/validate-aerospike-dzolotov-tech-v1alpha1-aerospikecluster,mutating=false,failurePolicy=fail,sideEffects=None,groups=aerospike.dzolotov.tech,resources=aerospikeclusters,verbs=create;update,versions=v1alpha1,name=vaerospikecluster.kb.io,admissionReviewVersions=v1 var _ webhook.Validator = &AerospikeCluster{} func (r *AerospikeCluster) validateReplicas() error { var allErrs field.ErrorList if r.Spec.ReplicationFactor > r.Spec.Deployments { fldPath := field.NewPath("spec").Child("replicationFactor") allErrs = append(allErrs, field.Invalid(fldPath, strconv.Itoa(r.Spec.ReplicationFactor), "Replication factor must be lower than deployments count")) } if r.Spec.Deployments < 1 { fldPath := field.NewPath("spec").Child("deployments") allErrs = append(allErrs, field.Invalid(fldPath, strconv.Itoa(r.Spec.ReplicationFactor), "Deployments counter must be greater than zero")) } if len(allErrs) == 0 { return nil } return apierrors.NewInvalid(schema.GroupKind{Group: "aerospike.dzolotov.tech", Kind: "AerospikeCluster"}, r.Name, allErrs) } //проверка при создании нового ресурса func (r *AerospikeCluster) ValidateCreate() error { aerospikeclusterlog.Info("validate create", "name", r.Name) return r.validateReplicas() } //проверка при обновлении ресурса func (r *AerospikeCluster) ValidateUpdate(old runtime.Object) error { aerospikeclusterlog.Info("validate update", "name", r.Name) return r.validateReplicas() } //проверка при удалении ресурса (сейчас отключена в метаданных kubebuilder) func (r *AerospikeCluster) ValidateDelete() error { aerospikeclusterlog.Info("validate delete", "name", r.Name) return nil }
Здесь мы используем механизм apierrors, рекомендуемый Kubernetes для подробного описания ошибки и места ее возникновения, который позволяет объединить все ошибки в один объект, но можно было использовать и обычный объект ошибки. Обратите внимание на комментарии //+kubebuilder:webhook, они содержат метаданные, которые используются для генерации манифеста и здесь, например, можно изменить список валидаций (сейчас используются только create,update, но при необходимости можно сделать валидацию на удаление (например, так можно запретить удаление ресурса, но лучше такого не делать). После вызова make / make manifests посмотрим на сгенерированный файл манифеста для установки AdmissionWebhooks:
apiVersion: admissionregistration.k8s.io/v1 kind: MutatingWebhookConfiguration metadata: creationTimestamp: null name: mutating-webhook-configuration webhooks: - admissionReviewVersions: - v1 clientConfig: service: name: webhook-service namespace: system path: /mutate-aerospike-dzolotov-tech-v1alpha1-aerospikecluster failurePolicy: Fail name: maerospikecluster.kb.io rules: - apiGroups: - aerospike.dzolotov.tech apiVersions: - v1alpha1 operations: - CREATE - UPDATE resources: - aerospikeclusters sideEffects: None --- apiVersion: admissionregistration.k8s.io/v1 kind: ValidatingWebhookConfiguration metadata: creationTimestamp: null name: validating-webhook-configuration webhooks: - admissionReviewVersions: - v1 clientConfig: service: name: webhook-service namespace: system path: /validate-aerospike-dzolotov-tech-v1alpha1-aerospikecluster failurePolicy: Fail name: vaerospikecluster.kb.io rules: - apiGroups: - aerospike.dzolotov.tech apiVersions: - v1alpha1 operations: - CREATE - UPDATE resources: - aerospikeclusters sideEffects: None
Здесь можно увидеть, что и MutatingWebhook и ValidatingWebhook применяются на ресурсы в apiVersion: aerospike.dzolotov.tech/v1alpha1 при операциях создания и обновления на ресурсах aerospikecluster. Теперь посмотрим на другие подкаталоги в config и поговорим о правах доступа (RBAC) и развертывании контроллера.
-
certmanager
— манифест для издания сертификата для валидации webhook (создается автоматически), привязывается к сервису контроллера; -
crd
— описание типов ресурсов (CRD), мы рассмотрели его выше, но добавлю что кроме наших определений есть еще патчи, которые позволяют пробросить созданный через certmanager сертификат в аннотации ресурса для использования в коде; -
default
— содержит патчи для контроллера, которые публикуют webhook-сервер и разворачивают system/controller-manager из gcr.io/kubebuilder/kube-rbac-proxy:v0.11.0; -
manifests
— конфигурация для kustomization, который создает манифесты при сборке финального артефакта для установки оператора; -
prometheus
— регистрация ServiceMonitor для экспорта метрик из controller-manager в prometheus; -
rbac
— набор прав для обеспечения доступа контроллера к управлению ресурсами aerospikeclusters, а также для публикации метрик, сервисной записи для выполнения controller-manager и др.; -
scorecard
— конфигурация для запуска тестов через OLM (Operator Lifecycle Manager, позволяет управлять запуском и тестирование операторов внутри существующего кластера kubernetes); -
webhook
— конфигурация для регистрации AdminissionWebhooks (рассмотрели ранее).
Коротко об RBAC (Role-Based Access Control) — это внутренний механизм контроля доступа Kubernetes, который определяет понятие Role (или ClusterRole, если без привязки к пространству имен), перечисляющее какие действия и над какими ресурсами могут быть выполнены и RoleBinding (ClusterRoleBinding) для связывания учетной записи пользователи или сервисной записи (ServiceAccount) с соответствующей ролью. Таким образом, чтобы оператор мог создавать новые ресурсы Deployment, требуется определить роль с разрешениями на create/update/patch/delete для ресурсов kind: Deployment в apiVersion: apps/v1 и связать ее с сервисной записью, под которой происходит развертывание оператора. Для управление нашим контроллером будет запущен процесс controller-manager, он также будет отслеживать доступность контроллера и перезапускать его при необходимости.
Вернемся теперь к контроллеру (controllers/aerospikecluster_controller.go) и посмотрим внимательно на его структуру:
package controllers import ( "context" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" aerospikev1alpha1 "github.com/operator-framework/operator-sdk/api/v1alpha1" ) // Объект реконсилера для приведения кластера к ожидаемому состоянию type AerospikeClusterReconciler struct { client.Client Scheme *runtime.Scheme } // Описание разрешений (будут транслированы в роли) //+kubebuilder:rbac:groups=aerospike.dzolotov.tech,resources=aerospikeclusters,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=aerospike.dzolotov.tech,resources=aerospikeclusters/status,verbs=get;update;patch //+kubebuilder:rbac:groups=aerospike.dzolotov.tech,resources=aerospikeclusters/finalizers,verbs=update func (r *AerospikeClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { _ = log.FromContext(ctx) //здесь будет наш код управления кластером return ctrl.Result{}, nil } // Регистрация с использованием ControllerManager func (r *AerospikeClusterReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&aerospikev1alpha1.AerospikeCluster{}). Complete(r) }
Поскольку мы будем управлять развертыванием кластера серверов (будем использовать StatefulSet, поскольку нам важно сохранить связь экземпляра пода и его местоположения, также нам будет нужно иметь возможность создания и изменения ConfigMap для определения конфигурации сервера по параметрам из манифеста), то нам нужно будет сообщить контроллеру о том, что мы будем наблюдать за ресурсами StatefulSet и Pod и добавить соответствующие правила в комментариях +kubebuilder. Прежде всего добавим импорт структур, описывающих пространство имен apps/v1 и core/v1 (для ConfigMap):
import ( "context" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" aerospikev1alpha1 "github.com/operator-framework/operator-sdk/api/v1alpha1" )
И сообщим менеджеру о том, что мы будем управлять объектами StatefulSet и ConfigMap:
func (r *AerospikeClusterReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&aerospikev1alpha1.AerospikeCluster{}). Owns(&appsv1.StatefulSet{}). Owns(&corev1.ConfigMap{}). Complete(r) }
И добавим метаданные перед методом Reconcile для описания требуемых прав доступа:
//+kubebuilder:rbac:groups=apps/v1,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=apps/v1,resources=pods,verbs=get;list //+kubebuilder:rbac:groups=v1,resources=configmaps,verbs=get;list;create;update;patch;delete
Сама реализация будет использовать публичные методы API Kubernetes, прежде всего для нас интересны методы:
-
r.List(ctx, objectList, opts)
— возвращает список объектов в objectList с отбором по opts (требует разрешения list на этот тип объектов в RBAC-роли), например:
podList := &v1.PodList{} opts := []client.ListOption{ client.InNamespace(request.NamespacedName.Namespace), client.MatchingLabels{"instance": request.NamespacedName.Name}, client.MatchingFields{"status.phase": "Running"}, } err := r.List(ctx, podList, opts...)
Выбирает все запущенные поды с меткой instance и значением, совпадающих с именем кластера (поиск осуществляется в том же пространстве имен, где развернут ресурс AerospikeCluster).
-
r.Get(ctx, namespacedName, object)
— сохраняет объект в object, идентификацию объекта выполняет NamespacedName, включающий пространство имен Namespace и имя внутри него Name (требует разрешение get на этот тип объектов в RBAC-роли);
-
r.Create(ctx, object, opts)
— создает новый объект в кластере (требуется наличие разрешений create на этот тип объекта в RBAC-роли), например:
func (r *AerospikeClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { _ = log.FromContext(ctx) //прочитаем ресурс описания кластера cluster := &aerospikev1alpha1.AerospikeCluster{} err := r.Get(ctx, req.NamespacedName, cluster) if err != nil { return ctrl.Result{}, err } //преобразование числовых полей в строки и в тип int32 для передачи в структуру //описания StatefulSet var deployments int32 deployments = int32(cluster.Spec.Deployments) var replicationFactor string replicationFactor = strconv.Itoa(cluster.Spec.ReplicationFactor) var maxMemory string maxMemory = strconv.Itoa(*cluster.Spec.MaxMemory) var maxCpu string maxCpu = strconv.Itoa(*cluster.Spec.MaxCPU) //описание нового объекта StatefulSet sts := appsv1.StatefulSet{ObjectMeta: ctrl.ObjectMeta{Namespace: req.NamespacedName.Namespace, Name: req.NamespacedName.Name}, Spec: appsv1.StatefulSetSpec{ Replicas: &deployments, Selector: &v1.LabelSelector{ MatchLabels: map[string]string{"label": req.Name}, }, Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ Containers: []corev1.Container{ { Name: "aerospike", Image: "aerospike", Args: nil, EnvFrom: nil, Env: []corev1.EnvVar{ { Name: "REPL_FACTOR", Value replicationFactor, }, { Name: "MEM_GB", Value: maxMemory, }, }, Resources: corev1.ResourceRequirements{ Limits: corev1.ResourceList{ corev1.ResourceCPU: resource.MustParse(maxCpu), }, }, }, }, }, }, UpdateStrategy: appsv1.StatefulSetUpdateStrategy{}, }} r.Create(ctx, &sts) return ctrl.Result{Requeue: true}, nil }
-
r.Update(ctx, object, opts)
— обновление конфигурации объекта (для StatefulSet например можно изменить переменные окружения и количество реплик), требуется разрешение update на этот тип ресурсов в RBAC-роли; -
r.Patch(ctx, object, patch)
— внесение исправлений в структуру, описывающую запущенный объект; -
r.Delete(ctx, objects, opts)
— удаление объекта из кластера (требуется разрешение delete на этот тип ресурсов в RBAC-роли); -
r.Status().Update(ctx, cluster)
— дает доступ к модификации объекта состояния (виден в манифесте и в объекте cluster как .status).
Поведение reconciler в нашем случае может быть описано несколькими возможными сценариями:
-
объект кластера существует, но нет statefulset — создаем новый statefulset и configmap;
-
объект кластера существует и есть statefulset — обновляем параметры конфигурации (лимиты, переменные окружения, количество реплик) и пересоздаем configmap;
-
объект кластера не существует — удаляем statefulset и configmap (игнорируем ошибки).
В целом код можно разделить на несколько частей: генерация и обновление configmap, обработка удаления кластера, обновление конфигурации существующего кластера. Код может выглядеть таким образом. Обновление или создание configmap:
func (r *AerospikeClusterReconciler) generateConfigMap(ctx context.Context, cluster *aerospikev1alpha1.AerospikeCluster) error { //проверим на наличие существующего configmap var cm corev1.ConfigMap err := r.Get(ctx, types.NamespacedName{ Namespace: cluster.Namespace, Name: cluster.Name, }, &cm) if err != nil { if errors.IsNotFound(err) { //создаем новый configmap newConfigMap := corev1.ConfigMap{ ObjectMeta: v1.ObjectMeta{ Namespace: cluster.Namespace, Name: cluster.Name, }, Data: map[string]string{ "aerospike": createConfigMap(cluster), }, } err := r.Create(ctx, &newConfigMap) return err } else { return err } } //если объект уже существует, то изменяем данные cm.Data = map[string]string{ "aerospike": createConfigMap(cluster), } err = r.Update(ctx, &cm) return err } func createConfigMap(cluster *aerospikev1alpha1.AerospikeCluster) string { result := make([]string, 0, 0) if cluster.Spec.Logging != nil { result = append(result, "logging "+*cluster.Spec.Logging) } result = append(result, "strong-consistency "+strconv.FormatBool(cluster.Spec.StrongConsistency)) if cluster.Spec.CompressionLevel != nil { result = append(result, "compression-level "+strconv.Itoa(*cluster.Spec.CompressionLevel)) } result = append(result, "interval "+strconv.Itoa(cluster.Spec.HeartBeat.Interval)) if cluster.Spec.Keepalive != nil { result = append(result, "keepalive-enabled "+strconv.FormatBool(cluster.Spec.Keepalive.Enabled)) result = append(result, "keepalive-intvl "+strconv.Itoa(cluster.Spec.Keepalive.Interval)) } return strings.Join(result, "\n") }
Подготовка объекта StatefulSet по описанию кластера:
func (r *AerospikeClusterReconciler) getConfiguration(cluster *aerospikev1alpha1.AerospikeCluster) *appsv1.StatefulSet { var deployments int32 deployments = int32(cluster.Spec.Deployments) var replicationFactor string replicationFactor = strconv.Itoa(cluster.Spec.ReplicationFactor) var maxMemory string maxMemory = strconv.Itoa(*cluster.Spec.MaxMemory) var maxCpu string maxCpu = strconv.Itoa(*cluster.Spec.MaxCPU) sts := appsv1.StatefulSet{ObjectMeta: ctrl.ObjectMeta{Namespace: cluster.Namespace, Name: cluster.Name}, Spec: appsv1.StatefulSetSpec{ Replicas: &deployments, Selector: &v1.LabelSelector{ MatchLabels: map[string]string{"instance": cluster.Name}, }, Template: corev1.PodTemplateSpec{ ObjectMeta: ctrl.ObjectMeta{ Labels: map[string]string{"instance": cluster.Name}, }, Spec: corev1.PodSpec{ Containers: []corev1.Container{ { Name: "aerospike", Image: "aerospike", Args: nil, EnvFrom: nil, VolumeMounts: []corev1.VolumeMount{ { Name: "config", MountPath: "/etc/aerospike/", }, }, Env: []corev1.EnvVar{ { Name: "REPL_FACTOR", Value: replicationFactor, }, { Name: "MEM_GB", Value: maxMemory, }, }, Resources: corev1.ResourceRequirements{ Limits: corev1.ResourceList{ corev1.ResourceCPU: resource.MustParse(maxCpu), }, }, }, }, Volumes: []corev1.Volume{ { Name: "config", VolumeSource: corev1.VolumeSource{ ConfigMap: &corev1.ConfigMapVolumeSource{ LocalObjectReference: corev1.LocalObjectReference{Name: cluster.Name}, Items: []corev1.KeyToPath{ { Key: "aerospike", Path: "aerospike.conf", }, }, }, }, }, }, }, }, UpdateStrategy: appsv1.StatefulSetUpdateStrategy{}, }} return &sts }
Логика удаления, создания и обновления ресурсов кластера и его состояния:
func (r *AerospikeClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { _ = log.FromContext(ctx) //можно сохранить в logger и использовать для сообщений об ошибках или диагностики cluster := &aerospikev1alpha1.AerospikeCluster{} err := r.Get(ctx, req.NamespacedName, cluster) if err != nil { if errors.IsNotFound(err) { //кластер был удален, удалим развертывание sts := &appsv1.StatefulSet{} err = r.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, sts) if err == nil { err = r.Delete(ctx, sts) if err == nil { cm := &corev1.ConfigMap{} err = r.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, cm) } } } //и завершаем reconfile успешно (или с ошибкой) return ctrl.Result{}, err } // Check if the deployment already exists, if not create a new deployment. found := &appsv1.StatefulSet{} err = r.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, found) if err != nil { if errors.IsNotFound(err) { // существующего развертывания нет - создаем новое err2 := r.generateConfigMap(ctx, cluster) //создаем configmap if err2 != nil { return ctrl.Result{}, err } dep := r.getConfiguration(cluster) if err = r.Create(ctx, dep); err != nil { return ctrl.Result{}, err } cluster.Status.BrokenNodes = 0 cluster.Status.Deployed = true r.Status().Update(ctx, cluster) return ctrl.Result{Requeue: true}, nil } else { return ctrl.Result{}, err } } deployments := int32(cluster.Spec.Deployments) //здесь можно сравнить текущий и предыдущий размер кластера и изменить конфигурацию, если требуется //но у нас могут поменяться обновременно и переменные окружения и configmap found.Spec.Replicas = &deployments var replicationFactor string replicationFactor = strconv.Itoa(cluster.Spec.ReplicationFactor) var maxMemory string maxMemory = strconv.Itoa(*cluster.Spec.MaxMemory) found.Spec.Template.Spec.Containers[0].Env = []corev1.EnvVar{ { Name: "REPL_FACTOR", Value: replicationFactor, }, { Name: "MEM_GB", Value: maxMemory, }, } found.Spec.Template.Spec.Containers[0].Resources.Limits.Cpu().Set(int64(*cluster.Spec.MaxCPU)) err = r.generateConfigMap(ctx, cluster) if err != nil { return ctrl.Result{}, err } // получим активное количество подов и обновим BrokenNodes в состоянии podList := &corev1.PodList{} listOpts := []client.ListOption{ client.InNamespace(cluster.Namespace), client.MatchingLabels{"instance": cluster.Name}, client.MatchingFields{"status.phase": "Running"}, } if err = r.List(ctx, podList, listOpts...); err != nil { return ctrl.Result{}, err } // обновим количество неактивных узлов cluster.Status.BrokenNodes = cluster.Spec.Deployments - len(podList.Items) err2 := r.Status().Update(ctx, cluster) return ctrl.Result{}, err2 }
Для применения созданного оператора можно использовать make deploy
в корневом каталоге проекта или через OLM:
operator-sdk olm install make bundle IMG="dzolotov.tech/aerospike-operator:v0.0.1" make bundle-build bundle-push BUNDLE_IMG="dzolotov.tech/aerospike-operator-bundle:v0.0.1" operator-sdk run bundle dzolotov.tech/aerospike-operator-bundle:v0.0.1
Здесь будет собран образ контейнера (его можно отправить, например, в Docker hub) и запущен оператор на текущем кластере Kubernetes. Для остановки оператора применяются действия make undeploy
или operator-sdk cleanup aerospike-operator
(для OLM).
Проверить работу оператора можно через создание ресурса, который был определен ранее:
cat <<< EOF | kubectl apply -f - apiVersion: dzolotov.tech/v1alpha1 kind: AerospikeCluster metadata: namespace: cluster name: demo spec: deployments: 2 maxMemory: 2 compressionLevel: 4 replicationFactor: 1 strongConsistency: false heartbeat: interval: 10000 EOF
С использованием операторов возможно реализовать сложную логику по управлению распределенными системами или обеспечить возможность конфигурирования разворачиваемых приложений в терминах предметной области. Большое количество операторов можно найти на https://operatorhub.io/, а документацию и статьи по разработке в этой коллекции ссылок.
Также хочу пригласить всех заинтересованных на бесплатный вебинар, в рамках которого мы комплексно рассмотрим основные векторы по обеспечению безопасности kubernetes кластера и подробно остановимся на каждом из них. Затронем тему безопасности docker-образов, безопасность в рантайме, network и application security.
ссылка на оригинал статьи https://habr.com/ru/company/otus/blog/669806/
Добавить комментарий