HPC на коленке: кастомный планировщик pod’ов для Kubernetes с NUMA-awareness

от автора

Привет, Хабр!

Сегодня я расскажу о разработке кастомного планировщика pod’ов для HPC‑нагрузок в Kubernetes с учётом NUMA и специфичных требований. Рассмотрим код с примером для магазина корма для собачек и всеми нюансами реализации.

Но для начала… зачем вообще нужен кастомный планировщик для HPC?

Kubernetes в своём стандартном виде отлично справляется с большинством задач. Но когда речь идёт о высокопроизводительных вычислениях (HPC), стандартный scheduler может не выдержать требования:

  • Тщательное распределение ресурсов. HPC‑приложения чувствительны к задержкам, фрагментации памяти и неправильному распределению вычислительных ядер.

  • Учёт NUMA‑топологии. В архитектуре NUMA скорость доступа к памяти зависит от её физического расположения относительно процессора — и тут ошибка может стоить дорого.

  • Специфичные требования к CPU, памяти и даже специализированному оборудованию (GPU, RDMA и т. п.).

Поэтому напишем собственный планировщик, который:

  • Отбирает pod’ы с меткой hpc=true.

  • Анализирует аннотации узлов с описанием NUMA‑топологии.

  • Оценивает узлы по доступным ресурсам и специфическим требованиям pod’ов.

  • Надёжно связывает pod с выбранным узлом через Kubernetes API.

Основные функции нашего кастомного HPC-планировщика

Фильтрация pod’ов по меткам HPC

Не все pod’ы требуют особой обработки. Будем отбирать только те, что действительно настроены для HPC, используя метку hpc=true. Таким образом, стандартный scheduler продолжит работать для остальных задач, а наш планировщик займётся только критичными нагрузками.

Анализ NUMA‑топологии узлов

Каждый узел в кластере снабжён аннотацией, описывающей его NUMA‑топологию, например:

[   {"socketId": 0, "cores": 8, "memory": 32768},   {"socketId": 1, "cores": 8, "memory": 32768} ]

Это позволяет учитывать не только общее количество ядер и памяти, но и распределение этих ресурсов по сокетам, что критично для HPC‑задач.

Оценка узлов

После фильтрации узлов планировщик оценивает их по нескольким параметрам:

  • Доступность требуемых ядер и памяти.

  • Оптимальное распределение по NUMA‑сокетам.

  • Наличие дополнительных ресурсов (например, GPU).

  • Специфичные требования pod’а, передаваемые через аннотации (например, минимальное число ядер для приложения DogFoodShop).

Безопасное связывание pod»ов с узлами

Наконец, когда выбран наилучший узел, планировщик связывает pod с ним с помощью Kubernetes API. Обработка ошибок на каждом этапе гарантирует, что никакие сбои не останутся незамеченными.

Архитектура и реализация

Будем писать на моем любимом Go, ведь это язык, на котором написан сам Kubernetes. Структура проекта довольно проста:

  • Фильтрация pod’ов: отслеживание событий создания pod’ов с помощью информеров.

  • Оценка узлов: считывание аннотаций узлов, разбор NUMA‑топологии и вычисление баллов.

  • Binding: связывание pod’а с выбранным узлом через API Kubernetes.

Объявление пакета и импорт необходимых библиотек

Объявляем пакет main и подключаем все нужные пакеты. Стандартные пакеты (например, context, encoding/json, flag, log, sort и time) понадобятся для работы с контекстами, JSON, аргументами командной строки, логированием, сортировкой и таймерами. Библиотеки из k8s.io — это мост к Kubernetes API, информерам и настройке клиента.

package main  import ( "context" "encoding/json" "flag" "fmt" "log" "sort" "time"  v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" )

Определение структур: NumaInfo и Scheduler

В этой части кода мы объявляем структуру NumaInfo, которая описывает характеристики одного NUMA‑сокета (идентификатор сокета, количество ядер и объём памяти). Также создаём структуру Scheduler — кастомный планировщик, который будет хранить клиента Kubernetes и информер для отслеживания событий pod’ов.

apiVersion: v1 kind: Pod metadata:   name: dogfoodshop-hpc   labels:     app: dogfoodshop     hpc: "true"  # Обязательно для нашего кастомного планировщика!   annotations:     dogshop.minCores: "16"  # Минимальное требование к количеству ядер spec:   containers:     - name: dogfoodshop       image: dogfoodshop/hpc:latest       resources:         limits:           cpu: "16"           memory: "32Gi"         requests:           cpu: "16"           memory: "32Gi"

Конструктор планировщика

Здесь создаём функцию‑конструктор, которая принимает клиент Kubernetes и создаёт фабрику информеров для отслеживания pod’ов. Именно информер будет следить за событиями создания pod’ов и передавать их нашему планировщику.

apiVersion: v1 kind: Node metadata:   name: node-1   annotations:     hpc.numa/topology: '[{"socketId":0,"cores":8,"memory":32768},{"socketId":1,"cores":8,"memory":32768}]'

Запуск планировщика и обработка событий

Функция Run запускает информер и регистрирует обработчик событий. Обработчик onAddPod вызывается при создании нового pod’а. Здесь фильтруем pod’ы по метке hpc=true, чтобы работать только с нужными HPC‑задачами.

// Run запускает планировщик func (s *Scheduler) Run(stopCh <-chan struct{}) { defer func() { if r := recover(); r != nil { log.Printf("Recovered from panic: %v", r) } }() log.Println("Запускаем кастомный HPC-планировщик...") s.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: s.onAddPod, }) s.informer.Run(stopCh) }  // onAddPod вызывается при создании нового pod’а func (s *Scheduler) onAddPod(obj interface{}) { pod, ok := obj.(*v1.Pod) if !ok { return }  // Фильтруем pod’ы, которые не предназначены для HPC if val, ok := pod.Labels["hpc"]; !ok || val != "true" { return }  log.Printf("Найден HPC-pod: %s/%s", pod.Namespace, pod.Name) // Пробуем назначить pod на подходящий узел err := s.schedulePod(context.Background(), pod) if err != nil { log.Printf("Ошибка при планировании pod’а %s/%s: %v", pod.Namespace, pod.Name, err) } }

Выбор узла для pod’а и связывание

Здесь переходим к «умному» выбору узла: функция schedulePod получает список узлов, фильтрует их по готовности и NUMA‑аннотациям, оценивая их баллами, чтобы выбрать лучший кандидат; при этом filterAndScoreNodes для каждого узла проверяет его готовность, считывает NUMA‑информацию и суммирует количество ядер, учитывая специальные требования (например, для DogFoodShop требуется минимум 16 ядер), а функция isNodeReady просто определяет, находится ли узел в рабочем состоянии; далее getNodeNumaInfo декодирует NUMA‑топологию, сохранённую в аннотациях узла, и, наконец, bindPod создаёт объект binding и связывает pod с выбранным узлом через API.

// schedulePod – логика выбора узла и связывания pod’а с ним func (s *Scheduler) schedulePod(ctx context.Context, pod *v1.Pod) error { // Получаем список всех узлов nodes, err := s.clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) if err != nil { return fmt.Errorf("не удалось получить список узлов: %w", err) }  // Фильтруем узлы с учётом NUMA и ресурсных требований candidateNodes := s.filterAndScoreNodes(nodes.Items, pod) if len(candidateNodes) == 0 { return fmt.Errorf("нет подходящих узлов для pod %s/%s", pod.Namespace, pod.Name) }  // Выбираем лучший узел (с максимальным баллом) bestNode := candidateNodes[0].nodeName log.Printf("Выбран узел '%s' для pod %s/%s (балл: %d)", bestNode, pod.Namespace, pod.Name, candidateNodes[0].score)  // Выполняем binding pod’а к выбранному узлу return s.bindPod(ctx, pod, bestNode) }  // nodeScore – структура для хранения оценки узла type nodeScore struct { nodeName string score    int }  // filterAndScoreNodes фильтрует узлы и присваивает им баллы func (s *Scheduler) filterAndScoreNodes(nodes []v1.Node, pod *v1.Pod) []nodeScore { var scores []nodeScore  for _, node := range nodes { // Пропускаем недоступные узлы if !isNodeReady(&node) { continue }  // Получаем NUMA-информацию из аннотаций узла numaInfo, err := getNodeNumaInfo(node) if err != nil { log.Printf("Ошибка получения NUMA-информации с узла %s: %v", node.Name, err) continue }  // Оцениваем узел: суммируем количество ядер всех сокетов totalCores := 0 for _, socket := range numaInfo { totalCores += socket.Cores }  // Здесь можно учитывать дополнительные требования pod’а (например, память, GPU, RDMA) score := totalCores  // Пример: если pod предназначен для DogFoodShop и требует минимум 16 ядер, узлы с меньшим количеством получат штраф if val, ok := pod.Annotations["dogshop.minCores"]; ok && val == "16" && totalCores < 16 { score = 0 }  if score > 0 { scores = append(scores, nodeScore{ nodeName: node.Name, score:    score, }) } }  // Сортируем узлы по убыванию баллов sort.Slice(scores, func(i, j int) bool { return scores[i].score > scores[j].score })  return scores }  // isNodeReady проверяет, находится ли узел в состоянии Ready func isNodeReady(node *v1.Node) bool { for _, condition := range node.Status.Conditions { if condition.Type == v1.NodeReady && condition.Status == v1.ConditionTrue { return true } } return false }  // getNodeNumaInfo декодирует NUMA-информацию из аннотаций узла func getNodeNumaInfo(node v1.Node) ([]NumaInfo, error) { annotation, ok := node.Annotations["hpc.numa/topology"] if !ok { return nil, fmt.Errorf("аннотация hpc.numa/topology не найдена") }  var numaInfo []NumaInfo err := json.Unmarshal([]byte(annotation), &numaInfo) if err != nil { return nil, fmt.Errorf("ошибка декодирования NUMA-информации: %w", err) } return numaInfo, nil }  // bindPod связывает pod с выбранным узлом func (s *Scheduler) bindPod(ctx context.Context, pod *v1.Pod, nodeName string) error { // Создаем объект Binding binding := &v1.Binding{ ObjectMeta: metav1.ObjectMeta{ Namespace: pod.Namespace, Name:      pod.Name, }, Target: v1.ObjectReference{ Kind: "Node", Name: nodeName, }, }  // Выполняем вызов API для binding-а err := s.clientset.CoreV1().Pods(pod.Namespace).Bind(ctx, binding, metav1.CreateOptions{}) if err != nil { return fmt.Errorf("не удалось выполнить binding: %w", err) } log.Printf("Pod %s/%s успешно связан с узлом %s", pod.Namespace, pod.Name, nodeName) return nil }

Функция main

В функции main настраиваем подключение к Kubernetes. Если указан kubeconfig, используем его для локальной разработки, иначе — предполагаем, что код работает внутри кластера (in‑cluster config). Затем создаём клиента, инициализируем планировщик и запускаем его в отдельном горутине. В конце блокируем выполнение, чтобы приложение не завершилось.

func main() { // Читаем параметры командной строки kubeconfig := flag.String("kubeconfig", "", "Путь до kubeconfig файла") flag.Parse()  var config *rest.Config var err error if *kubeconfig != "" { // Локальная разработка: используем kubeconfig config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig) if err != nil { log.Fatalf("Ошибка создания конфигурации из kubeconfig: %v", err) } } else { // На продакшене: предполагаем работу внутри кластера config, err = rest.InClusterConfig() if err != nil { log.Fatalf("Ошибка создания in-cluster конфигурации: %v", err) } }  clientset, err := kubernetes.NewForConfig(config) if err != nil { log.Fatalf("Ошибка создания клиента Kubernetes: %v", err) }  scheduler := NewScheduler(clientset) stopCh := make(chan struct{}) go scheduler.Run(stopCh)  // Блокируем выполнение, чтобы приложение не завершалось select {} }
Полный код в спойлере
package main  import ( "context" "encoding/json" "flag" "fmt" "log" "sort" "time"  v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" )  // NumaInfo описывает характеристики одного NUMA-сокета type NumaInfo struct { SocketID int `json:"socketId"` Cores    int `json:"cores"` Memory   int `json:"memory"` // в мегабайтах }  // Scheduler – структура нашего кастомного планировщика type Scheduler struct { clientset kubernetes.Interface informer  cache.SharedIndexInformer }  // NewScheduler создаёт новый экземпляр Scheduler func NewScheduler(clientset kubernetes.Interface) *Scheduler { // Создаём фабрику информеров для отслеживания pod’ов factory := informers.NewSharedInformerFactory(clientset, time.Minute) podInformer := factory.Core().V1().Pods().Informer()  return &Scheduler{ clientset: clientset, informer:  podInformer, } }  // Run запускает планировщик func (s *Scheduler) Run(stopCh <-chan struct{}) { defer func() { if r := recover(); r != nil { log.Printf("Recovered from panic: %v", r) } }() log.Println("Запускаем кастомный HPC-планировщик...") s.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: s.onAddPod, }) s.informer.Run(stopCh) }  // onAddPod вызывается при создании нового pod’а func (s *Scheduler) onAddPod(obj interface{}) { pod, ok := obj.(*v1.Pod) if !ok { return }  // Фильтруем pod’ы, которые не предназначены для HPC if val, ok := pod.Labels["hpc"]; !ok || val != "true" { return }  log.Printf("Найден HPC-pod: %s/%s", pod.Namespace, pod.Name) // Пробуем назначить pod на подходящий узел err := s.schedulePod(context.Background(), pod) if err != nil { log.Printf("Ошибка при планировании pod’а %s/%s: %v", pod.Namespace, pod.Name, err) } }  // schedulePod – логика выбора узла и связывания pod’а с ним func (s *Scheduler) schedulePod(ctx context.Context, pod *v1.Pod) error { // Получаем список всех узлов nodes, err := s.clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) if err != nil { return fmt.Errorf("не удалось получить список узлов: %w", err) }  // Фильтруем узлы с учётом NUMA и ресурсных требований candidateNodes := s.filterAndScoreNodes(nodes.Items, pod) if len(candidateNodes) == 0 { return fmt.Errorf("нет подходящих узлов для pod %s/%s", pod.Namespace, pod.Name) }  // Выбираем лучший узел (с максимальным баллом) bestNode := candidateNodes[0].nodeName log.Printf("Выбран узел '%s' для pod %s/%s (балл: %d)", bestNode, pod.Namespace, pod.Name, candidateNodes[0].score)  // Выполняем binding pod’а к выбранному узлу return s.bindPod(ctx, pod, bestNode) }  // nodeScore – структура для хранения оценки узла type nodeScore struct { nodeName string score    int }  // filterAndScoreNodes фильтрует узлы и присваивает им баллы func (s *Scheduler) filterAndScoreNodes(nodes []v1.Node, pod *v1.Pod) []nodeScore { var scores []nodeScore  for _, node := range nodes { // Пропускаем недоступные узлы if !isNodeReady(&node) { continue }  // Получаем NUMA-информацию из аннотаций узла numaInfo, err := getNodeNumaInfo(node) if err != nil { log.Printf("Ошибка получения NUMA-информации с узла %s: %v", node.Name, err) continue }  // Оцениваем узел: суммируем количество ядер всех сокетов totalCores := 0 for _, socket := range numaInfo { totalCores += socket.Cores }  // Здесь можно учитывать дополнительные требования pod’а (например, память, GPU, RDMA) score := totalCores  // Пример: если pod предназначен для DogFoodShop и требует минимум 16 ядер, узлы с меньшим количеством получат штраф if val, ok := pod.Annotations["dogshop.minCores"]; ok && val == "16" && totalCores < 16 { score = 0 }  if score > 0 { scores = append(scores, nodeScore{ nodeName: node.Name, score:    score, }) } }  // Сортируем узлы по убыванию баллов sort.Slice(scores, func(i, j int) bool { return scores[i].score > scores[j].score })  return scores }  // isNodeReady проверяет, находится ли узел в состоянии Ready func isNodeReady(node *v1.Node) bool { for _, condition := range node.Status.Conditions { if condition.Type == v1.NodeReady && condition.Status == v1.ConditionTrue { return true } } return false }  // getNodeNumaInfo декодирует NUMA-информацию из аннотаций узла func getNodeNumaInfo(node v1.Node) ([]NumaInfo, error) { annotation, ok := node.Annotations["hpc.numa/topology"] if !ok { return nil, fmt.Errorf("аннотация hpc.numa/topology не найдена") }  var numaInfo []NumaInfo err := json.Unmarshal([]byte(annotation), &numaInfo) if err != nil { return nil, fmt.Errorf("ошибка декодирования NUMA-информации: %w", err) } return numaInfo, nil }  // bindPod связывает pod с выбранным узлом func (s *Scheduler) bindPod(ctx context.Context, pod *v1.Pod, nodeName string) error { // Создаем объект Binding binding := &v1.Binding{ ObjectMeta: metav1.ObjectMeta{ Namespace: pod.Namespace, Name:      pod.Name, }, Target: v1.ObjectReference{ Kind: "Node", Name: nodeName, }, }  // Выполняем вызов API для binding-а err := s.clientset.CoreV1().Pods(pod.Namespace).Bind(ctx, binding, metav1.CreateOptions{}) if err != nil { return fmt.Errorf("не удалось выполнить binding: %w", err) } log.Printf("Pod %s/%s успешно связан с узлом %s", pod.Namespace, pod.Name, nodeName) return nil }  func main() { // Читаем параметры командной строки kubeconfig := flag.String("kubeconfig", "", "Путь до kubeconfig файла") flag.Parse()  var config *rest.Config var err error if *kubeconfig != "" { // Локальная разработка: используем kubeconfig config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig) if err != nil { log.Fatalf("Ошибка создания конфигурации из kubeconfig: %v", err) } } else { // На продакшене: предполагаем работу внутри кластера config, err = rest.InClusterConfig() if err != nil { log.Fatalf("Ошибка создания in-cluster конфигурации: %v", err) } }  clientset, err := kubernetes.NewForConfig(config) if err != nil { log.Fatalf("Ошибка создания клиента Kubernetes: %v", err) }  scheduler := NewScheduler(clientset) stopCh := make(chan struct{}) go scheduler.Run(stopCh)  // Блокируем выполнение, чтобы приложение не завершалось select {} } 

Пример: магазин корма для собачек с HPC-нагрузками

Представим, что есть приложение для магазина корма для собачек — DogFoodShop. Манифест для pod’а DogFoodShop:

apiVersion: v1 kind: Pod metadata:   name: dogfoodshop-hpc   labels:     app: dogfoodshop     hpc: "true"  # Обязательно для нашего кастомного планировщика!   annotations:     dogshop.minCores: "16"  # Минимальное требование к количеству ядер spec:   containers:     - name: dogfoodshop       image: dogfoodshop/hpc:latest       resources:         limits:           cpu: "16"           memory: "32Gi"         requests:           cpu: "16"           memory: "32Gi"

Для корректной работы scheduler ожидает, что узлы будут иметь аннотации с NUMA‑топологией, например:

apiVersion: v1 kind: Node metadata:   name: node-1   annotations:     hpc.numa/topology: '[{"socketId":0,"cores":8,"memory":32768},{"socketId":1,"cores":8,"memory":32768}]' 

Работали ли вы с чем‑то подобным? Делитесь в комментариях.

Уже сегодня, 20 февраля в 20:00, пройдёт открытый урок, посвящённый CI/CD. Всего за 100 секунд вы увидите, как можно перейти от пустого проекта к работающей CI/CD-платформе. После этого мы разберём каждый этап: создание пайплайнов, настройку тестирования, автоматический деплой, обработку ошибок и масштабирование.

Записаться можно на странице курса «Инфраструктурная платформа на основе Kubernetes».


ссылка на оригинал статьи https://habr.com/ru/articles/884012/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *