Заранее оговорюсь, всё что описано в данной статье, касается runtime (децентрализованного) кеша.
Зачем нам такое может понадобиться? По нескольким причинам:
-
У нас высокие требования к скорости работы приложения, дополнительные запросы к централизованному кешу нежелательны, с целью избежать сетевых взаимодействий.
-
Компания не знает как готовить отказоустойчивый кеш (например redis), или просто не хочет/не может затягивать новую технологию, усложнять инфраструктуру.
-
У нас нет четкого представления о том, как инвалидировать кеш на основе TTL (time to live), поскольку бизнес правила не позволяют жить невалидному кешу хоть какое-то время. (будьте осторожны, если у вас высокие требования к синхронизации данных, возникает множество дополнительных проблем в виде проблем с сетью/производительность реплик/etc…)
Представим типичную ситуацию в распределенной среде: имеется N реплик, на каждой из них свой независимый кеш. Например в Repository (я намеренно опустил работу с примитивами синхронизации при обновлении значений кеша, так как цель продемонстрировать суть происходящего):
type db interface { Find(key string) string Update(key string, value string) } type Repository struct { cache map[string]string db db } func NewRepository(db db) Repository { return Repository{db: db, cache: make(map[string]string)} } func (r *Repository) Update(key string, value string) { r.db.Update(key, value) r.updateCacheValue(key, value) } func (r *Repository) updateCacheValue(key string, value string) { r.cache[key] = value } func (r *Repository) UpdateCache(key string) { r.updateCacheValue(key, r.db.Find(key)) } func (r *Repository) Find(key string) string { if val, ok := r.cache[key]; ok { return val } value := r.db.Find(key) r.updateCacheValue(key, value) return value }
Это прекрасно работает, но не в распределенном приложении. Мы сталкиваемся с ситуацией, когда запрос пользователя на обновление, попал на одну случайную реплику и нам нужно уведомить всех об изменениях в данных, так как кеш локальный:
Можем использовать событийно-ориентированный подход и с помощью kafka мы легко решим эту проблему.
Для синхронизации мы можем использовать sync-topic, как это может выглядеть? А примерно так:

Напишем небольшой компонент для работы с топиком:
type KafkaSync struct { reader *kafka.Reader writer *kafka.Writer } func NewKafkaSync(reader *kafka.Reader, writer *kafka.Writer) KafkaSync { return KafkaSync{reader: reader, writer: writer} } func (sync KafkaSync) Sync(ctx context.Context, key string) error { return sync.writer.WriteMessages(ctx, kafka.Message{Value: []byte(key)}) } func (sync KafkaSync) OnSync(ctx context.Context, cb func(key string)) error { err := sync.reader.SetOffsetAt(ctx, time.Now()) if err != nil { return fmt.Errorf("setting offset: %w", err) } for ctx.Err() == nil { message, err := sync.reader.ReadMessage(ctx) if err != nil { return fmt.Errorf("read message: %w", err) } cb(string(message.value)) } return nil }
Суть компонента в том, чтобы генерировать события в момент обновления данных — метод Sync, а также уметь их отлавливать — метод OnSync.
Псевдокод, который отразит принцип синхронизации:
kafkaSync := NewKafkaSync() repo := NewRepository() eg, ctx := errgroup.WithContext(context.Background()) eg.Go(func() error { return kafkaSync.OnSync(ctx, func(key string) error { // Тут мы отловим новое событие синхронизации на всех репликах // и можем обновить кеш на каждой из них repo.UpdateCache(key) }) }) eg.Go(func() error { // Представим что тут сработал HTTP Update эндпойнт, на какой то реплике repo.Update("key1", "val1") return kafka.sync() }) eg.Wait()
При любом изменении хранилища, мы генерируем событие обновления в sync-topic, которое в свою очередь получат остальные, и обновят локальный кеш.
В топике мы можем пересылать:
-
Ключ записи которую нужно обновить, но необходимо идти в хранилище за новым значением.
-
Пустое сообщение. Иногда это нужно, чтобы просто уведомить о факте обновления хранилища, например если нужно обновить множество ключей.
-
Обновленное значение и ключ. Например, когда мы хотим избежать дополнительных походов в хранилище, чтобы не создавать нагрузку. Мы можем просто переслать это значение в топике.
Нужно учитывать важный момент при работе с kafka
Если sync-topic имеет несколько партиций, то распределение в рамках consumer group может сделать данную схему неработоспособной. Решение проблемы заключается в чтении топика без consumer group, тогда каждая реплика будет уведомлена о любом изменении, в любой партиции.
ссылка на оригинал статьи https://habr.com/ru/articles/908876/
Добавить комментарий