Практический CQRS и Event Sourcing на Go

от автора

Проблема CRUD-подхода

// Проблемы: // 1. История изменений теряется // 2. Конфликты при конкурентных обновлениях (или использование блокировок) func UpdateOrderStatus(orderID string, status Status) error {     return db.Exec("UPDATE orders SET status=? WHERE id=?", status, orderID) }

Решение: CQRS и Event Sourcing

Архитектурное ядро

Архитектура

Архитектура

Ключевые компоненты архитектуры

1. Команда (Command)

Запрос на выполнение действия («Завершить заказ», «Списать средства»)

Компонент на диаграмме

Компонент на диаграмме

Характеристики:

  • Может быть отклонена бизнес-правилами

  • Не возвращает данные (только статус выполнения)

Пример:

type CompleteOrderCommand struct {     OrderID string     UserID  string } 

2. Агрегат (Aggregate)

Компонент на диаграмме

Компонент на диаграмме

Хранитель бизнес-правил, который:

  • Восстанавливает текущее состояние из истории событий

  • Проверяет возможность выполнения команды

  • Генерирует новые события при успешной проверке

Важно: Агрегат не сохраняет состояние, только содержит логику.

Пример:

// LoadFromHistory восстанавливает состояние (оптимизировано с учетом снапшотов) func LoadFromHistory(events []Event, snapshot *Snapshot) *OrderAggregate {     agg := &OrderAggregate{         id: events[0].AggregateID(),     }          if snapshot != nil {         agg.applySnapshot(snapshot)     }          for _, event := range events {         if event.Version() > agg.version {             agg.applyEvent(event)         }     }          return agg }  // Complete обрабатывает команду func (a *OrderAggregate) Complete(cmd CompleteOrderCommand) ([]Event, error) {     if a.status != Paid {         return nil, fmt.Errorf("order %s must be paid first", a.id)     }      return []Event{         NewOrderCompletedEvent(a.id, cmd.UserID, a.version+1),     }, nil }  // applyEvent применяет событие к состоянию func (a *OrderAggregate) applyEvent(event Event) {     switch e := event.(type) {     case OrderCompletedEvent:         a.status = Completed         a.version = e.Version()     // обработка других типов событий     } }  // applySnapshot применяет снапшот func (a *OrderAggregate) applySnapshot(s Snapshot) {     a.version = s.Version     a.status = s.Status     // другие поля }  // TakeSnapshot создает снапшот func (a *OrderAggregate) TakeSnapshot() Snapshot {     return Snapshot{         AggregateID: a.id,         Version:    a.version,         Status:     a.status,         // другие поля     } }

3. Событие (Event)

Неизменяемая запись о произошедшем изменении.

Компонент на диаграмме

Компонент на диаграмме

Свойства:

  • Содержит все релевантные данные

  • Сериализуемо и сохраняемо

Пример:

 // Факт произошедшего изменения  type OrderCompletedEvent struct {    OrderID      string    UserID       string    CreatedAt    time.Time  }

4. Хранилище событий (Event Store)

Компонент на диаграмме

Компонент на диаграмме

Append-only журнал, который:

  • Гарантирует сохранение событий

  • Позволяет воспроизвести историю для любого агрегата

  • Реализует оптимистичные блокировки через версии

Пример:

func (es *EventStore) Append(aggregateID string, events []Event, expectedVersion int) error {     currentVersion := es.GetVersion(aggregateID)     if expectedVersion != currentVersion {         return ErrConcurrentModification     }          // Append-only запись     for _, event := range events {         record := EventRecord{             ID:         uuid.New(),             AggregateID: aggregateID,             Version:    currentVersion+1,             Type:       event.Type(),             Data:       event.Data(),             Timestamp:  time.Now(),         }         es.db.Create(&record)         currentVersion++     }          return nil }

5. Шина событий (Event Bus)

Компонент на диаграмме

Компонент на диаграмме

Асинхронная доставка событий подписчикам через:

  • Внутрипроцессные каналы

  • Внешние брокеры

6. Проектор (Projector)

Трансформирует события в оптимизированные модели чтения.

Компонент на диаграмме

Компонент на диаграмме

Особенности:

  • Создает несколько различных представлений

  • Работает асинхронно и независимо

  • Допускает eventual consistency

Пример:

func (p *OrderProjector) HandleEvent(event Event) {         switch e := event.(type) {         case OrderCompletedEvent:             return p.updateOrderStatus(e.OrderID, "completed")         // ... другие типы событий         }         return nil }

7. Обработчик запросов (Query Handler)

Поставщик данных для чтения, работающий с проекциями.

Компонент на диаграмме

Компонент на диаграмме

Пример:

func GetOrderSummary(orderID string) (*OrderSummary, error) {     var summary OrderSummary     err := db.Where("id = ?", orderID).First(&summary).Error     return &summary, err }

Минимальный цикл

// 1. Получение команды cmd := CompleteOrderCommand{OrderID: "123", UserID: "u456"}  // 2. Загрузка событий events := eventStore.Load("123")  // 3. Восстановление агрегата order := LoadOrder(events)  // 4. Обработка команды newEvents, err := order.Complete(cmd)  // 5. Сохранение событий eventStore.Save("123", newEvents, order.Version)  // 6. Публикация событий for _, event := range newEvents {     eventBus.Publish(event) }  // 7. Запрос данных (где-то в другом месте) summary := GetOrderSummary("123") fmt.Println(summary.Status) // "completed" 

Ключевые потоки данных

Командный поток

Клиент → CommandHandler → EventStore → Aggregate → Сохранение → EventBus

Поток запросов

EventBus → Projector → ReadDB ← QueryHandler ← Клиент

Сравнение с традиционным подходом

Традиционный CRUD

CQRS/ES

UPDATE orders SET status=?

Append OrderCompleted event

Текущее состояние в БД

Состояние = Σ всех событий

Нет истории изменений

Полный аудит автоматически

Блокировки для консистентности

Оптимистичные блокировки через версии

Одна модель для чтения/записи

Раздельные оптимизированные модели

Ключевые выводы

  1. Фундаментальный сдвиг парадигмы
    Состояние системы = история всех событий, а не последний snapshot данных.
    Преимущество: Полный аудит и возможность «переиграть» историю.

  2. Жесткое разделение ответственности

    Команды (Write): «Сделай что-то» → Генерируют события

    Запросы (Read): «Покажи данные» → Читают оптимизированные проекции Результат: Независимое масштабирование операций записи и чтения.

  3. Агрегаты — хранители бизнес-логики
    Не хранят состояние постоянно, а воссоздают его из событий и принимают решения.

  4. События как источник истины

    Неизменяемые факты

    Содержат всю информацию об изменении

    Append-only хранилище = гарантия сохранности истории

  5. Проекции — гибкие представления
    Можно создавать несколько специализированных моделей для разных задач.


ссылка на оригинал статьи https://habr.com/ru/articles/932510/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *