Проблема CRUD-подхода
// Проблемы: // 1. История изменений теряется // 2. Конфликты при конкурентных обновлениях (или использование блокировок) func UpdateOrderStatus(orderID string, status Status) error { return db.Exec("UPDATE orders SET status=? WHERE id=?", status, orderID) }
Решение: CQRS и Event Sourcing
Архитектурное ядро
Ключевые компоненты архитектуры
1. Команда (Command)
Запрос на выполнение действия («Завершить заказ», «Списать средства»)
Характеристики:
-
Может быть отклонена бизнес-правилами
-
Не возвращает данные (только статус выполнения)
Пример:
type CompleteOrderCommand struct { OrderID string UserID string }
2. Агрегат (Aggregate)
Хранитель бизнес-правил, который:
-
Восстанавливает текущее состояние из истории событий
-
Проверяет возможность выполнения команды
-
Генерирует новые события при успешной проверке
Важно: Агрегат не сохраняет состояние, только содержит логику.
Пример:
// LoadFromHistory восстанавливает состояние (оптимизировано с учетом снапшотов) func LoadFromHistory(events []Event, snapshot *Snapshot) *OrderAggregate { agg := &OrderAggregate{ id: events[0].AggregateID(), } if snapshot != nil { agg.applySnapshot(snapshot) } for _, event := range events { if event.Version() > agg.version { agg.applyEvent(event) } } return agg } // Complete обрабатывает команду func (a *OrderAggregate) Complete(cmd CompleteOrderCommand) ([]Event, error) { if a.status != Paid { return nil, fmt.Errorf("order %s must be paid first", a.id) } return []Event{ NewOrderCompletedEvent(a.id, cmd.UserID, a.version+1), }, nil } // applyEvent применяет событие к состоянию func (a *OrderAggregate) applyEvent(event Event) { switch e := event.(type) { case OrderCompletedEvent: a.status = Completed a.version = e.Version() // обработка других типов событий } } // applySnapshot применяет снапшот func (a *OrderAggregate) applySnapshot(s Snapshot) { a.version = s.Version a.status = s.Status // другие поля } // TakeSnapshot создает снапшот func (a *OrderAggregate) TakeSnapshot() Snapshot { return Snapshot{ AggregateID: a.id, Version: a.version, Status: a.status, // другие поля } }
3. Событие (Event)
Неизменяемая запись о произошедшем изменении.
Свойства:
-
Содержит все релевантные данные
-
Сериализуемо и сохраняемо
Пример:
// Факт произошедшего изменения type OrderCompletedEvent struct { OrderID string UserID string CreatedAt time.Time }
4. Хранилище событий (Event Store)
Append-only журнал, который:
-
Гарантирует сохранение событий
-
Позволяет воспроизвести историю для любого агрегата
-
Реализует оптимистичные блокировки через версии
Пример:
func (es *EventStore) Append(aggregateID string, events []Event, expectedVersion int) error { currentVersion := es.GetVersion(aggregateID) if expectedVersion != currentVersion { return ErrConcurrentModification } // Append-only запись for _, event := range events { record := EventRecord{ ID: uuid.New(), AggregateID: aggregateID, Version: currentVersion+1, Type: event.Type(), Data: event.Data(), Timestamp: time.Now(), } es.db.Create(&record) currentVersion++ } return nil }
5. Шина событий (Event Bus)
Асинхронная доставка событий подписчикам через:
-
Внутрипроцессные каналы
-
Внешние брокеры
6. Проектор (Projector)
Трансформирует события в оптимизированные модели чтения.
Особенности:
-
Создает несколько различных представлений
-
Работает асинхронно и независимо
-
Допускает eventual consistency
Пример:
func (p *OrderProjector) HandleEvent(event Event) { switch e := event.(type) { case OrderCompletedEvent: return p.updateOrderStatus(e.OrderID, "completed") // ... другие типы событий } return nil }
7. Обработчик запросов (Query Handler)
Поставщик данных для чтения, работающий с проекциями.
Пример:
func GetOrderSummary(orderID string) (*OrderSummary, error) { var summary OrderSummary err := db.Where("id = ?", orderID).First(&summary).Error return &summary, err }
Минимальный цикл
// 1. Получение команды cmd := CompleteOrderCommand{OrderID: "123", UserID: "u456"} // 2. Загрузка событий events := eventStore.Load("123") // 3. Восстановление агрегата order := LoadOrder(events) // 4. Обработка команды newEvents, err := order.Complete(cmd) // 5. Сохранение событий eventStore.Save("123", newEvents, order.Version) // 6. Публикация событий for _, event := range newEvents { eventBus.Publish(event) } // 7. Запрос данных (где-то в другом месте) summary := GetOrderSummary("123") fmt.Println(summary.Status) // "completed"
Ключевые потоки данных
Командный поток
Клиент → CommandHandler → EventStore → Aggregate → Сохранение → EventBus
Поток запросов
EventBus → Projector → ReadDB ← QueryHandler ← Клиент
Сравнение с традиционным подходом
|
Традиционный CRUD |
CQRS/ES |
|---|---|
|
|
|
|
Текущее состояние в БД |
Состояние = Σ всех событий |
|
Нет истории изменений |
Полный аудит автоматически |
|
Блокировки для консистентности |
Оптимистичные блокировки через версии |
|
Одна модель для чтения/записи |
Раздельные оптимизированные модели |
Ключевые выводы
-
Фундаментальный сдвиг парадигмы
Состояние системы = история всех событий, а не последний snapshot данных.
Преимущество: Полный аудит и возможность «переиграть» историю. -
Жесткое разделение ответственности
Команды (Write): «Сделай что-то» → Генерируют события
Запросы (Read): «Покажи данные» → Читают оптимизированные проекции Результат: Независимое масштабирование операций записи и чтения.
-
Агрегаты — хранители бизнес-логики
Не хранят состояние постоянно, а воссоздают его из событий и принимают решения. -
События как источник истины
Неизменяемые факты
Содержат всю информацию об изменении
Append-only хранилище = гарантия сохранности истории
-
Проекции — гибкие представления
Можно создавать несколько специализированных моделей для разных задач.
ссылка на оригинал статьи https://habr.com/ru/articles/932510/
Добавить комментарий