Кастомный процессор для OpenTelemetry Collector. А почему бы и нет?

от автора

Привет! Я Артём, скромный платформенный инженер: код пишу, метрики смотрю и иногда даже понимаю, что происходит. В работе мне часто приходится сталкиваться с Observability‑инструментами. Одним из таких инструментов, о котором я хотел бы рассказать — OpenTelemetry Collector. Это мощный инструмент, который позволяет работать с различной телеметрией и строить гибкие пайплайны для метрик, логов и трейсов.

Но иногда возможностей стандартного набора компонентов не хватает, чтобы справиться с поставленными задачами без использования костылей и изоленты. Тогда на сцену выходят кастомные компоненты для Otel-Collector.

Содержание

Введение

В стандартном наборе opentelemetry-collector-contrib много готовых процессоров: фильтрация, нормализация, лимит памяти и так далее. Но у нас возникла задача: обогащать спаны данными из внешнего справочника (в нашем случае выбор пал на CSV‑файл).

Например, у нас есть спан с атрибутом SolObjectID.
А в CSV лежит таблица, где этому SolObjectID соответствует поле trace_code, очень нужное нам, по каким-то причинам. Хочется, чтобы в трейсе появлялся этот trace_code без дополнительного изменения кода в сервисах. К сожалению, готовых решений в contrib версии OpenTelemetry Collector не предусмотрено, однако, такой функционал очень даже хочется иметь. Взять и написать свой процессор — кажется хорошей идеей.

В этой статье я расскажу про свой опыт написания кастомного процессора для otel-collector. Он обогащает спаны данными из CSV‑файла: находит совпадение по атрибуту и добавляет дополнительные поля прямо внутрь трейсов. Мы разберём архитектуру процессора, посмотрим код и конфигурацию, а в конце покажу, как собрать и запустить Collector с этим расширением.

Архитектура кастомного процессора

Любой компонент в Collector (receiver, processor, exporter, connector) устроен по одному принципу: у него есть Config, Factory и сама логика (в нашем случае Processor).

1. Config (config.go)

Config описывает, какие параметры мы можем задать в YAML‑конфигурации Collector.
В OpenTelemetry Collector каждый компонент имеет свой конфиг. Collector ожидает, что этот конфиг будет реализовывать интерфейс component.Config.

Collector не знает заранее, какие именно поля есть у нашего конфига. Но он знает, что любой конфиг — это объект, который реализует интерфейс component.Config. Это позволяет Collector работать с любым компонентом одинаково.

Фрагмент Config кода:

type Config struct {CSVPath        string        `mapstructure:"csv_path"`MatchField     string        `mapstructure:"match_field"`EnrichColumns  []string      `mapstructure:"enrich_columns"`ReloadInterval time.Duration `mapstructure:"reload_interval"`}var _ component.Config = (*Config)(nil)

Что здесь важно:

  • CSVPath — путь к CSV‑файлу.

  • MatchField — атрибут спана, по которому ищем совпадения.

  • EnrichColumns — список колонок, которые нужно добавить в спан.

  • ReloadInterval — как часто обновлять данные из CSV без перезапуска Collector.

Пример использования в collector-config.yaml:

processors:  csvenricherprocessor:    csv_path: "/etc/mapping.csv"    match_field: "SolObjectID"    enrich_columns: [ "trace_code", "product_id" ]    reload_interval: "5m"

2. Factory (factory.go)

Factory — это «фабрика» для создания процессора. Collector работает по контракту: каждый компонент обязан предоставить фабрику, чтобы Collector мог его корректно зарегистрировать и собрать в пайплайн.

Фабрика отвечает за три ключевых момента:

  1. Тип компонента — строковый идентификатор, по которому Collector узнаёт наш процессор.

var (      strType = component.MustNewType("csvenricherprocessor")  )
  1. Конфигурация по умолчанию — значения, которые будут использоваться, если параметры не заданы в YAML.

func createDefaultConfig() component.Config {return &Config{MatchField:    "SolObjectID", // дефолтное поле для поискаEnrichColumns: []string{"trace_code", "another_code"}, // дефолтные колонки для обогащения    }}
  1. Создание самого процессора — функция, которая принимает контекст, конфигурацию и nextConsumer (следующий в цепочке компонент), а возвращает рабочий экземпляр процессора.

   func createTracesProcessor(   ctx context.Context,   set processor.Settings,   cfg component.Config,   nextConsumer consumer.Traces,) (processor.Traces, error) {processorCfg, ok := cfg.(*Config)if !ok {        return nil, fmt.Errorf("configuration parsing error")    }    // создаём наш процессор    proc, err := newProcessor(processorCfg, set.Logger)    if err != nil {        return nil, fmt.Errorf("cannot create csvenricher processor: %w", err)    }    // оборачиваем в хелпер    return processorhelper.NewTraces(        ctx,        set,        cfg,        nextConsumer,        proc.processTraces,        processorhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}),        processorhelper.WithStart(proc.start),        processorhelper.WithShutdown(proc.shutdown),    )}
  1. Всё это объединяется в фабрике:

   func NewFactory() processor.Factory {   return processor.NewFactory(   strType,   createDefaultConfig,   processor.WithTraces(createTracesProcessor, stability),)   }

Таким образом, Factory:

  • регистрирует наш процессор в Collector (под уникальным именем csvenricherprocessor);

  • гарантирует, что у процессора всегда есть валидная конфигурация (даже если YAML пустой);

  • описывает, как создать рабочий экземпляр для обработки трейсов;

  • подключает lifecycle-хуки: start, shutdown, processTraces.

Без фабрики Collector просто не сможет «узнать» о существовании процессора и включить его в пайплайн.

3. Processor (processor.go)

Processor — это место, где реализуется бизнес-логика обработки данных.
В отличие от Factory (которая только регистрирует и создаёт компонент), Processor отвечает за всё: загрузку данных, обработку входящих спанов и graceful shutdown.

Разберём ключевые части.

Структура процессора

type csvEnricherProcessor struct {      logger     *zap.Logger      config     *Config      csvData    []map[string]string  // мапа с данными из таблицы    matchIndex map[string]int // индекс для поиска по значению `MatchField`     mu         sync.RWMutex        ticker   *time.Ticker      stopChan chan struct{}  }

Что тут есть:

  • logger — для логирования (стандартная практика в Collector).

  • config — ссылка на Config, чтобы знать, где искать CSV, какое поле использовать и т.д.

  • csvData и matchIndex — подготовленные данные из CSV для быстрого поиска.

  • musync.RWMutex для потокобезопасного доступа к данным (Collector обрабатывает данные конкурентно).

  • ticker и stopChan — управление циклом периодической перезагрузки CSV.

Жизненный цикл

Processor должен уметь запускаться и корректно завершаться:

start() — поднимает бэкграунд-процесс для регулярной перезагрузки CSV.

func (p *csvEnricherProcessor) start(ctx context.Context, host component.Host) error {p.logger.Info("Starting CSV Enricher Processor")// Если интервал <= 0, просто загружаем один раз и выходимif p.config.ReloadInterval <= 0 {...return nil}p.logger.Info("Starting background CSV reload loop",zap.Duration("interval", p.config.ReloadInterval))go func() {ticker := time.NewTicker(p.config.ReloadInterval)defer ticker.Stop()for {select {case <-ticker.C:p.logger.Info("Reloading CSV data")if err := p.loadCSVData(); err != nil {p.logger.Warn("Failed to reload CSV data", zap.Error(err))}case <-ctx.Done():p.logger.Info("CSV reload context done, stopping reload loop")return}}}()return nil}

shutdown() — останавливает тикеры и чистит ресурсы.

func (p *csvEnricherProcessor) shutdown(ctx context.Context) error {      p.logger.Info("Shutting Down CSV Enricher Processor")        if p.ticker != nil {         p.ticker.Stop()      }      close(p.stopChan)        return nil  }

loadCSVData() — загрузка данных из csv в мапу

func (p *csvEnricherProcessor) loadCSVData() error {file, err := os.Open(p.config.CSVPath)...records, err := reader.ReadAll()...// headers + построение индекса по MatchField }

Тут алгоритм такой:

  • Открываем файл.

  • Читаем все строки.

  • Берём заголовок (header).

  • Строим map[matchValue] → индекс строки для быстрого поиска.

Обработка трейсов

Collector передаёт процессору батч трейсов через функцию processTraces.

func (p *csvEnricherProcessor) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) {      p.mu.RLock()      defer p.mu.RUnlock()        resourceSpans := td.ResourceSpans()      for i := 0; i < resourceSpans.Len(); i++ {         resourceSpan := resourceSpans.At(i)           // enrichment на уровне resource         resourceAttrs := resourceSpan.Resource().Attributes()         p.enrichResource(resourceAttrs)      }      return td, nil  }

В нашем случае мы решили обогащать атрибуты ресурса (Resource.Attributes), а не отдельные спаны.

Почему так:

  • ресурс привязан к сервису/экземпляру, и логично обогащать его.

  • меньше дублирования — обогащение на уровне Resource применяется ко всем спанам внутри.

Логика enrichment

func (p *csvEnricherProcessor) enrichResource(resourceAttrs pcommon.Map) {matchValue, exists := resourceAttrs.Get(p.config.MatchField)if !exists || matchValue.Type() != pcommon.ValueTypeStr {      return  }...recordIdx, found := p.matchIndex[matchValue.Str()]...for _, column := range p.config.EnrichColumns {if value, ok := record[column]; ok {resourceAttrs.PutStr(column, value)}}}

Если в ресурсных атрибутах есть поле MatchField, мы ищем его в CSV и добавляем все указанные колонки как новые атрибуты.

Итого, Processor:

  • управляет данными (CSV → память → быстрый поиск);

  • добавляет атрибуты к ресурсам;

  • живёт по жизненному циклу (start/shutdown);

  • потокобезопасен (sync.RWMutex);

  • умеет обновлять данные «на лету» без рестарта Collector.

Как встроить процессор в Collector

Чтобы кастомный процессор оказался в финальном бинарнике Collector, нужно пересобрать его с помощью otelcol‑builder. Подробно про сборку бинарника можно прочитать в официальной документации

Мы же готовим Docker Image, поэтому опишу, как собрать кастомный образ.

Шаг 1. Настройка builder-config.yaml

В файле builder-config.yaml в конец блока processors добавить ссылку на репозиторий с кодом кастомного процессора, версия релиза обязательна.

processors:  - gomod: go.opentelemetry.io/collector/processor/batchprocessor v0.128.0  - gomod: go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.128.0  ...  # наш процессор  - gomod: github.com/hiphopzeliboba/csvenricherprocessorr v0.3.0

При разработке процессора или его тестировании, можно использовать replaces и подключить локальный пакет:

   replaces:     - github.com/hiphopzeliboba/csvenricherprocessor => /path/to/local/csvenricherprocessor

В файле builder-config.yaml указать корректный name и output_path.
output_path должен соответствовать пути ENTRYPOINT ["/otelcol-contrib"] в Dockerfile

   dist:     module: github.com/open-telemetry/opentelemetry-collector-contrib/cmd/otelcontribcol     name: otelcol-contrib     decription: Local OpenTelemetry Collector Contrib binary, testing only.     version: 0.128.0-dev     output_path: ./otelcol-contrib

Шаг 2. Сборка Docker Image

Пример готового Doсkerfile можно найти тут

    docker build -t opentelemetry-collector-contrib-custom:1.128.4 .

Шаг 3. Запускаем Collector (локально, для тестов)

Тут мы пробрасываем необходимый collector-config.yaml и монтируем enrich.csv внутрь контейнера

  docker run -it --rm \  -v $(pwd)/enrich.csv:/data/enrich.csv \  -v $(pwd)/collector-config.yaml:/otelcol/collector-config.yaml \  -p 4317:4317 -p 4318:4318 -p 8888:8888 \  --name otelcol-custom opentelemetry-collector-contrib-custom:0.128.4

На этом этапе всё должно заработать 🙂

Тестирование

Чтобы проверить работу процессора, я написал маленькую утилиту test_tracer на Go. Она отправляет тестовые спаны с нужными атрибутами, чтобы убедиться, что обогащение работает. (Работает на стандартных портах Collector)

cd test_tracer/go mod tidygo run tracer.go

Итог

Мы реализовали кастомный процессор для OpenTelemetry Collector, который:

  • читает данные из CSV;

  • сопоставляет их по ключу с атрибутами спана;

  • добавляет новые поля в трассировки;

  • обновляет справочник без перезапуска Collector.

Благодаря модульной архитектуре Collector, сделать такой процессор оказалось не так сложно — главное понять три кита: Config, Factory и Processor.

Кастомные компоненты — это удобный способ расширить возможности OpenTelemetry Collector под свои бизнес‑нужды.
Если стандартных компонентов не хватает, можно писать свои: достаточно один раз разобраться в структуре, и дальше получится быстро собирать собственные «кирпичики» для Observability.

Надеюсь, эта статья будет полезна и поможет вам сэкономить кучу времени, экспериментов и поисков по коду contrib‑репозитория :-).

Репозиторий c проектом

Полный код проекта доступен на GitHub:
hiphopzeliboba/otel-collector-contrib-custom-processor

ссылка на оригинал статьи https://habr.com/ru/articles/947724/