Синхронизация кеша в распределенных Go (и не только) приложениях

от автора

Заранее оговорюсь, всё что описано в данной статье, касается runtime (децентрализованного) кеша.

Зачем нам такое может понадобиться? По нескольким причинам:

  • У нас высокие требования к скорости работы приложения, дополнительные запросы к централизованному кешу нежелательны, с целью избежать сетевых взаимодействий.

  • Компания не знает как готовить отказоустойчивый кеш (например redis), или просто не хочет/не может затягивать новую технологию, усложнять инфраструктуру.

  • У нас нет четкого представления о том, как инвалидировать кеш на основе TTL (time to live), поскольку бизнес правила не позволяют жить невалидному кешу хоть какое-то время. (будьте осторожны, если у вас высокие требования к синхронизации данных, возникает множество дополнительных проблем в виде проблем с сетью/производительность реплик/etc…)

Представим типичную ситуацию в распределенной среде: имеется N реплик, на каждой из них свой независимый кеш. Например в Repository (я намеренно опустил работу с примитивами синхронизации при обновлении значений кеша, так как цель продемонстрировать суть происходящего):

type db interface {   Find(key string) string   Update(key string, value string) }  type Repository struct {   cache map[string]string   db    db }  func NewRepository(db db) Repository {   return Repository{db: db, cache: make(map[string]string)} }  func (r *Repository) Update(key string, value string) {   r.db.Update(key, value)   r.updateCacheValue(key, value) }  func (r *Repository) updateCacheValue(key string, value string)  {   r.cache[key] = value }  func (r *Repository) UpdateCache(key string) {   r.updateCacheValue(key, r.db.Find(key)) }  func (r *Repository) Find(key string) string {   if val, ok := r.cache[key]; ok {    return val   }   value := r.db.Find(key)   r.updateCacheValue(key, value)   return value }

Это прекрасно работает, но не в распределенном приложении. Мы сталкиваемся с ситуацией, когда запрос пользователя на обновление, попал на одну случайную реплику и нам нужно уведомить всех об изменениях в данных, так как кеш локальный:

Пользователь операции SELECT может попасть на реплику, где устаревшее значение кеша

Пользователь операции SELECT может попасть на реплику, где устаревшее значение кеша

Можем использовать событийно-ориентированный подход и с помощью kafka мы легко решим эту проблему.
Для синхронизации мы можем использовать sync-topic, как это может выглядеть? А примерно так:

Напишем небольшой компонент для работы с топиком:

type KafkaSync struct {   reader *kafka.Reader   writer *kafka.Writer }  func NewKafkaSync(reader *kafka.Reader, writer *kafka.Writer) KafkaSync {   return KafkaSync{reader: reader, writer: writer} }  func (sync KafkaSync) Sync(ctx context.Context, key string) error {   return sync.writer.WriteMessages(ctx, kafka.Message{Value: []byte(key)}) }  func (sync KafkaSync) OnSync(ctx context.Context, cb func(key string)) error {   err := sync.reader.SetOffsetAt(ctx, time.Now())   if err != nil {    return fmt.Errorf("setting offset: %w", err)   }   for ctx.Err() == nil {     message, err := sync.reader.ReadMessage(ctx)     if err != nil {      return fmt.Errorf("read message: %w", err)     }     cb(string(message.value))   }   return nil }

Суть компонента в том, чтобы генерировать события в момент обновления данных — метод Sync, а также уметь их отлавливать — метод OnSync.

Псевдокод, который отразит принцип синхронизации:

kafkaSync := NewKafkaSync() repo := NewRepository() eg, ctx := errgroup.WithContext(context.Background())  eg.Go(func() error {   return kafkaSync.OnSync(ctx, func(key string) error {      // Тут мы отловим новое событие синхронизации на всех репликах      // и можем обновить кеш на каждой из них      repo.UpdateCache(key)   }) })  eg.Go(func() error {   // Представим что тут сработал HTTP Update эндпойнт, на какой то реплике   repo.Update("key1", "val1")   return kafka.sync() })  eg.Wait()

При любом изменении хранилища, мы генерируем событие обновления в sync-topic, которое в свою очередь получат остальные, и обновят локальный кеш.

В топике мы можем пересылать:

  • Ключ записи которую нужно обновить, но необходимо идти в хранилище за новым значением.

  • Пустое сообщение. Иногда это нужно, чтобы просто уведомить о факте обновления хранилища, например если нужно обновить множество ключей.

  • Обновленное значение и ключ. Например, когда мы хотим избежать дополнительных походов в хранилище, чтобы не создавать нагрузку. Мы можем просто переслать это значение в топике.

Нужно учитывать важный момент при работе с kafka

Если sync-topic имеет несколько партиций, то распределение в рамках consumer group может сделать данную схему неработоспособной. Решение проблемы заключается в чтении топика без consumer group, тогда каждая реплика будет уведомлена о любом изменении, в любой партиции.


ссылка на оригинал статьи https://habr.com/ru/articles/908876/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *