Привет, Хабр!
Сегодня я расскажу о разработке кастомного планировщика pod’ов для HPC‑нагрузок в Kubernetes с учётом NUMA и специфичных требований. Рассмотрим код с примером для магазина корма для собачек и всеми нюансами реализации.
Но для начала… зачем вообще нужен кастомный планировщик для HPC?
Kubernetes в своём стандартном виде отлично справляется с большинством задач. Но когда речь идёт о высокопроизводительных вычислениях (HPC), стандартный scheduler может не выдержать требования:
-
Тщательное распределение ресурсов. HPC‑приложения чувствительны к задержкам, фрагментации памяти и неправильному распределению вычислительных ядер.
-
Учёт NUMA‑топологии. В архитектуре NUMA скорость доступа к памяти зависит от её физического расположения относительно процессора — и тут ошибка может стоить дорого.
-
Специфичные требования к CPU, памяти и даже специализированному оборудованию (GPU, RDMA и т. п.).
Поэтому напишем собственный планировщик, который:
-
Отбирает pod’ы с меткой hpc=true.
-
Анализирует аннотации узлов с описанием NUMA‑топологии.
-
Оценивает узлы по доступным ресурсам и специфическим требованиям pod’ов.
-
Надёжно связывает pod с выбранным узлом через Kubernetes API.
Основные функции нашего кастомного HPC-планировщика
Фильтрация pod’ов по меткам HPC
Не все pod’ы требуют особой обработки. Будем отбирать только те, что действительно настроены для HPC, используя метку hpc=true. Таким образом, стандартный scheduler продолжит работать для остальных задач, а наш планировщик займётся только критичными нагрузками.
Анализ NUMA‑топологии узлов
Каждый узел в кластере снабжён аннотацией, описывающей его NUMA‑топологию, например:
[ {"socketId": 0, "cores": 8, "memory": 32768}, {"socketId": 1, "cores": 8, "memory": 32768} ]
Это позволяет учитывать не только общее количество ядер и памяти, но и распределение этих ресурсов по сокетам, что критично для HPC‑задач.
Оценка узлов
После фильтрации узлов планировщик оценивает их по нескольким параметрам:
-
Доступность требуемых ядер и памяти.
-
Оптимальное распределение по NUMA‑сокетам.
-
Наличие дополнительных ресурсов (например, GPU).
-
Специфичные требования pod’а, передаваемые через аннотации (например, минимальное число ядер для приложения DogFoodShop).
Безопасное связывание pod»ов с узлами
Наконец, когда выбран наилучший узел, планировщик связывает pod с ним с помощью Kubernetes API. Обработка ошибок на каждом этапе гарантирует, что никакие сбои не останутся незамеченными.
Архитектура и реализация
Будем писать на моем любимом Go, ведь это язык, на котором написан сам Kubernetes. Структура проекта довольно проста:
-
Фильтрация pod’ов: отслеживание событий создания pod’ов с помощью информеров.
-
Оценка узлов: считывание аннотаций узлов, разбор NUMA‑топологии и вычисление баллов.
-
Binding: связывание pod’а с выбранным узлом через API Kubernetes.
Объявление пакета и импорт необходимых библиотек
Объявляем пакет main и подключаем все нужные пакеты. Стандартные пакеты (например, context, encoding/json, flag, log, sort и time) понадобятся для работы с контекстами, JSON, аргументами командной строки, логированием, сортировкой и таймерами. Библиотеки из k8s.io — это мост к Kubernetes API, информерам и настройке клиента.
package main import ( "context" "encoding/json" "flag" "fmt" "log" "sort" "time" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" )
Определение структур: NumaInfo и Scheduler
В этой части кода мы объявляем структуру NumaInfo, которая описывает характеристики одного NUMA‑сокета (идентификатор сокета, количество ядер и объём памяти). Также создаём структуру Scheduler — кастомный планировщик, который будет хранить клиента Kubernetes и информер для отслеживания событий pod’ов.
apiVersion: v1 kind: Pod metadata: name: dogfoodshop-hpc labels: app: dogfoodshop hpc: "true" # Обязательно для нашего кастомного планировщика! annotations: dogshop.minCores: "16" # Минимальное требование к количеству ядер spec: containers: - name: dogfoodshop image: dogfoodshop/hpc:latest resources: limits: cpu: "16" memory: "32Gi" requests: cpu: "16" memory: "32Gi"
Конструктор планировщика
Здесь создаём функцию‑конструктор, которая принимает клиент Kubernetes и создаёт фабрику информеров для отслеживания pod’ов. Именно информер будет следить за событиями создания pod’ов и передавать их нашему планировщику.
apiVersion: v1 kind: Node metadata: name: node-1 annotations: hpc.numa/topology: '[{"socketId":0,"cores":8,"memory":32768},{"socketId":1,"cores":8,"memory":32768}]'
Запуск планировщика и обработка событий
Функция Run запускает информер и регистрирует обработчик событий. Обработчик onAddPod вызывается при создании нового pod’а. Здесь фильтруем pod’ы по метке hpc=true, чтобы работать только с нужными HPC‑задачами.
// Run запускает планировщик func (s *Scheduler) Run(stopCh <-chan struct{}) { defer func() { if r := recover(); r != nil { log.Printf("Recovered from panic: %v", r) } }() log.Println("Запускаем кастомный HPC-планировщик...") s.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: s.onAddPod, }) s.informer.Run(stopCh) } // onAddPod вызывается при создании нового pod’а func (s *Scheduler) onAddPod(obj interface{}) { pod, ok := obj.(*v1.Pod) if !ok { return } // Фильтруем pod’ы, которые не предназначены для HPC if val, ok := pod.Labels["hpc"]; !ok || val != "true" { return } log.Printf("Найден HPC-pod: %s/%s", pod.Namespace, pod.Name) // Пробуем назначить pod на подходящий узел err := s.schedulePod(context.Background(), pod) if err != nil { log.Printf("Ошибка при планировании pod’а %s/%s: %v", pod.Namespace, pod.Name, err) } }
Выбор узла для pod’а и связывание
Здесь переходим к «умному» выбору узла: функция schedulePod получает список узлов, фильтрует их по готовности и NUMA‑аннотациям, оценивая их баллами, чтобы выбрать лучший кандидат; при этом filterAndScoreNodes для каждого узла проверяет его готовность, считывает NUMA‑информацию и суммирует количество ядер, учитывая специальные требования (например, для DogFoodShop требуется минимум 16 ядер), а функция isNodeReady просто определяет, находится ли узел в рабочем состоянии; далее getNodeNumaInfo декодирует NUMA‑топологию, сохранённую в аннотациях узла, и, наконец, bindPod создаёт объект binding и связывает pod с выбранным узлом через API.
// schedulePod – логика выбора узла и связывания pod’а с ним func (s *Scheduler) schedulePod(ctx context.Context, pod *v1.Pod) error { // Получаем список всех узлов nodes, err := s.clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) if err != nil { return fmt.Errorf("не удалось получить список узлов: %w", err) } // Фильтруем узлы с учётом NUMA и ресурсных требований candidateNodes := s.filterAndScoreNodes(nodes.Items, pod) if len(candidateNodes) == 0 { return fmt.Errorf("нет подходящих узлов для pod %s/%s", pod.Namespace, pod.Name) } // Выбираем лучший узел (с максимальным баллом) bestNode := candidateNodes[0].nodeName log.Printf("Выбран узел '%s' для pod %s/%s (балл: %d)", bestNode, pod.Namespace, pod.Name, candidateNodes[0].score) // Выполняем binding pod’а к выбранному узлу return s.bindPod(ctx, pod, bestNode) } // nodeScore – структура для хранения оценки узла type nodeScore struct { nodeName string score int } // filterAndScoreNodes фильтрует узлы и присваивает им баллы func (s *Scheduler) filterAndScoreNodes(nodes []v1.Node, pod *v1.Pod) []nodeScore { var scores []nodeScore for _, node := range nodes { // Пропускаем недоступные узлы if !isNodeReady(&node) { continue } // Получаем NUMA-информацию из аннотаций узла numaInfo, err := getNodeNumaInfo(node) if err != nil { log.Printf("Ошибка получения NUMA-информации с узла %s: %v", node.Name, err) continue } // Оцениваем узел: суммируем количество ядер всех сокетов totalCores := 0 for _, socket := range numaInfo { totalCores += socket.Cores } // Здесь можно учитывать дополнительные требования pod’а (например, память, GPU, RDMA) score := totalCores // Пример: если pod предназначен для DogFoodShop и требует минимум 16 ядер, узлы с меньшим количеством получат штраф if val, ok := pod.Annotations["dogshop.minCores"]; ok && val == "16" && totalCores < 16 { score = 0 } if score > 0 { scores = append(scores, nodeScore{ nodeName: node.Name, score: score, }) } } // Сортируем узлы по убыванию баллов sort.Slice(scores, func(i, j int) bool { return scores[i].score > scores[j].score }) return scores } // isNodeReady проверяет, находится ли узел в состоянии Ready func isNodeReady(node *v1.Node) bool { for _, condition := range node.Status.Conditions { if condition.Type == v1.NodeReady && condition.Status == v1.ConditionTrue { return true } } return false } // getNodeNumaInfo декодирует NUMA-информацию из аннотаций узла func getNodeNumaInfo(node v1.Node) ([]NumaInfo, error) { annotation, ok := node.Annotations["hpc.numa/topology"] if !ok { return nil, fmt.Errorf("аннотация hpc.numa/topology не найдена") } var numaInfo []NumaInfo err := json.Unmarshal([]byte(annotation), &numaInfo) if err != nil { return nil, fmt.Errorf("ошибка декодирования NUMA-информации: %w", err) } return numaInfo, nil } // bindPod связывает pod с выбранным узлом func (s *Scheduler) bindPod(ctx context.Context, pod *v1.Pod, nodeName string) error { // Создаем объект Binding binding := &v1.Binding{ ObjectMeta: metav1.ObjectMeta{ Namespace: pod.Namespace, Name: pod.Name, }, Target: v1.ObjectReference{ Kind: "Node", Name: nodeName, }, } // Выполняем вызов API для binding-а err := s.clientset.CoreV1().Pods(pod.Namespace).Bind(ctx, binding, metav1.CreateOptions{}) if err != nil { return fmt.Errorf("не удалось выполнить binding: %w", err) } log.Printf("Pod %s/%s успешно связан с узлом %s", pod.Namespace, pod.Name, nodeName) return nil }
Функция main
В функции main настраиваем подключение к Kubernetes. Если указан kubeconfig, используем его для локальной разработки, иначе — предполагаем, что код работает внутри кластера (in‑cluster config). Затем создаём клиента, инициализируем планировщик и запускаем его в отдельном горутине. В конце блокируем выполнение, чтобы приложение не завершилось.
func main() { // Читаем параметры командной строки kubeconfig := flag.String("kubeconfig", "", "Путь до kubeconfig файла") flag.Parse() var config *rest.Config var err error if *kubeconfig != "" { // Локальная разработка: используем kubeconfig config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig) if err != nil { log.Fatalf("Ошибка создания конфигурации из kubeconfig: %v", err) } } else { // На продакшене: предполагаем работу внутри кластера config, err = rest.InClusterConfig() if err != nil { log.Fatalf("Ошибка создания in-cluster конфигурации: %v", err) } } clientset, err := kubernetes.NewForConfig(config) if err != nil { log.Fatalf("Ошибка создания клиента Kubernetes: %v", err) } scheduler := NewScheduler(clientset) stopCh := make(chan struct{}) go scheduler.Run(stopCh) // Блокируем выполнение, чтобы приложение не завершалось select {} }
Полный код в спойлере
package main import ( "context" "encoding/json" "flag" "fmt" "log" "sort" "time" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" ) // NumaInfo описывает характеристики одного NUMA-сокета type NumaInfo struct { SocketID int `json:"socketId"` Cores int `json:"cores"` Memory int `json:"memory"` // в мегабайтах } // Scheduler – структура нашего кастомного планировщика type Scheduler struct { clientset kubernetes.Interface informer cache.SharedIndexInformer } // NewScheduler создаёт новый экземпляр Scheduler func NewScheduler(clientset kubernetes.Interface) *Scheduler { // Создаём фабрику информеров для отслеживания pod’ов factory := informers.NewSharedInformerFactory(clientset, time.Minute) podInformer := factory.Core().V1().Pods().Informer() return &Scheduler{ clientset: clientset, informer: podInformer, } } // Run запускает планировщик func (s *Scheduler) Run(stopCh <-chan struct{}) { defer func() { if r := recover(); r != nil { log.Printf("Recovered from panic: %v", r) } }() log.Println("Запускаем кастомный HPC-планировщик...") s.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: s.onAddPod, }) s.informer.Run(stopCh) } // onAddPod вызывается при создании нового pod’а func (s *Scheduler) onAddPod(obj interface{}) { pod, ok := obj.(*v1.Pod) if !ok { return } // Фильтруем pod’ы, которые не предназначены для HPC if val, ok := pod.Labels["hpc"]; !ok || val != "true" { return } log.Printf("Найден HPC-pod: %s/%s", pod.Namespace, pod.Name) // Пробуем назначить pod на подходящий узел err := s.schedulePod(context.Background(), pod) if err != nil { log.Printf("Ошибка при планировании pod’а %s/%s: %v", pod.Namespace, pod.Name, err) } } // schedulePod – логика выбора узла и связывания pod’а с ним func (s *Scheduler) schedulePod(ctx context.Context, pod *v1.Pod) error { // Получаем список всех узлов nodes, err := s.clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) if err != nil { return fmt.Errorf("не удалось получить список узлов: %w", err) } // Фильтруем узлы с учётом NUMA и ресурсных требований candidateNodes := s.filterAndScoreNodes(nodes.Items, pod) if len(candidateNodes) == 0 { return fmt.Errorf("нет подходящих узлов для pod %s/%s", pod.Namespace, pod.Name) } // Выбираем лучший узел (с максимальным баллом) bestNode := candidateNodes[0].nodeName log.Printf("Выбран узел '%s' для pod %s/%s (балл: %d)", bestNode, pod.Namespace, pod.Name, candidateNodes[0].score) // Выполняем binding pod’а к выбранному узлу return s.bindPod(ctx, pod, bestNode) } // nodeScore – структура для хранения оценки узла type nodeScore struct { nodeName string score int } // filterAndScoreNodes фильтрует узлы и присваивает им баллы func (s *Scheduler) filterAndScoreNodes(nodes []v1.Node, pod *v1.Pod) []nodeScore { var scores []nodeScore for _, node := range nodes { // Пропускаем недоступные узлы if !isNodeReady(&node) { continue } // Получаем NUMA-информацию из аннотаций узла numaInfo, err := getNodeNumaInfo(node) if err != nil { log.Printf("Ошибка получения NUMA-информации с узла %s: %v", node.Name, err) continue } // Оцениваем узел: суммируем количество ядер всех сокетов totalCores := 0 for _, socket := range numaInfo { totalCores += socket.Cores } // Здесь можно учитывать дополнительные требования pod’а (например, память, GPU, RDMA) score := totalCores // Пример: если pod предназначен для DogFoodShop и требует минимум 16 ядер, узлы с меньшим количеством получат штраф if val, ok := pod.Annotations["dogshop.minCores"]; ok && val == "16" && totalCores < 16 { score = 0 } if score > 0 { scores = append(scores, nodeScore{ nodeName: node.Name, score: score, }) } } // Сортируем узлы по убыванию баллов sort.Slice(scores, func(i, j int) bool { return scores[i].score > scores[j].score }) return scores } // isNodeReady проверяет, находится ли узел в состоянии Ready func isNodeReady(node *v1.Node) bool { for _, condition := range node.Status.Conditions { if condition.Type == v1.NodeReady && condition.Status == v1.ConditionTrue { return true } } return false } // getNodeNumaInfo декодирует NUMA-информацию из аннотаций узла func getNodeNumaInfo(node v1.Node) ([]NumaInfo, error) { annotation, ok := node.Annotations["hpc.numa/topology"] if !ok { return nil, fmt.Errorf("аннотация hpc.numa/topology не найдена") } var numaInfo []NumaInfo err := json.Unmarshal([]byte(annotation), &numaInfo) if err != nil { return nil, fmt.Errorf("ошибка декодирования NUMA-информации: %w", err) } return numaInfo, nil } // bindPod связывает pod с выбранным узлом func (s *Scheduler) bindPod(ctx context.Context, pod *v1.Pod, nodeName string) error { // Создаем объект Binding binding := &v1.Binding{ ObjectMeta: metav1.ObjectMeta{ Namespace: pod.Namespace, Name: pod.Name, }, Target: v1.ObjectReference{ Kind: "Node", Name: nodeName, }, } // Выполняем вызов API для binding-а err := s.clientset.CoreV1().Pods(pod.Namespace).Bind(ctx, binding, metav1.CreateOptions{}) if err != nil { return fmt.Errorf("не удалось выполнить binding: %w", err) } log.Printf("Pod %s/%s успешно связан с узлом %s", pod.Namespace, pod.Name, nodeName) return nil } func main() { // Читаем параметры командной строки kubeconfig := flag.String("kubeconfig", "", "Путь до kubeconfig файла") flag.Parse() var config *rest.Config var err error if *kubeconfig != "" { // Локальная разработка: используем kubeconfig config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig) if err != nil { log.Fatalf("Ошибка создания конфигурации из kubeconfig: %v", err) } } else { // На продакшене: предполагаем работу внутри кластера config, err = rest.InClusterConfig() if err != nil { log.Fatalf("Ошибка создания in-cluster конфигурации: %v", err) } } clientset, err := kubernetes.NewForConfig(config) if err != nil { log.Fatalf("Ошибка создания клиента Kubernetes: %v", err) } scheduler := NewScheduler(clientset) stopCh := make(chan struct{}) go scheduler.Run(stopCh) // Блокируем выполнение, чтобы приложение не завершалось select {} }
Пример: магазин корма для собачек с HPC-нагрузками
Представим, что есть приложение для магазина корма для собачек — DogFoodShop. Манифест для pod’а DogFoodShop:
apiVersion: v1 kind: Pod metadata: name: dogfoodshop-hpc labels: app: dogfoodshop hpc: "true" # Обязательно для нашего кастомного планировщика! annotations: dogshop.minCores: "16" # Минимальное требование к количеству ядер spec: containers: - name: dogfoodshop image: dogfoodshop/hpc:latest resources: limits: cpu: "16" memory: "32Gi" requests: cpu: "16" memory: "32Gi"
Для корректной работы scheduler ожидает, что узлы будут иметь аннотации с NUMA‑топологией, например:
apiVersion: v1 kind: Node metadata: name: node-1 annotations: hpc.numa/topology: '[{"socketId":0,"cores":8,"memory":32768},{"socketId":1,"cores":8,"memory":32768}]'
Работали ли вы с чем‑то подобным? Делитесь в комментариях.
Уже сегодня, 20 февраля в 20:00, пройдёт открытый урок, посвящённый CI/CD. Всего за 100 секунд вы увидите, как можно перейти от пустого проекта к работающей CI/CD-платформе. После этого мы разберём каждый этап: создание пайплайнов, настройку тестирования, автоматический деплой, обработку ошибок и масштабирование.
Записаться можно на странице курса «Инфраструктурная платформа на основе Kubernetes».
ссылка на оригинал статьи https://habr.com/ru/articles/884012/
Добавить комментарий