А давайте в Go сделаем TryLock(context.Context)

от автора

Привет!

Вэтой статье я хотел бы расcказать, как можно было бы сделать свой RWMutex, но с возможностью по таймауту или по срабатыванию контекста пропустить блокировку. То есть реализовать TryLock(context.Context) и RTryLock(context.Context), но уже для своего Mutex.

image

На картинке изображено, как нужно наливать воду в очень узкое горлышко.

Для начала следует уточнить, что для 99% задач такие методы вообще не нужны. Нужны они становятся тогда, когда блокируемый ресурс может очень долго не отпускаться. Хочу отметить, что если блокируемый ресурс долгое время остаётся занятым, стоит в начале попробовать оптимизировать логику таким образом, что бы минимизировать время блокировки.

Подробнее об этом написано в посте Танцы с мьютексами в Go в примере 2.

Но если всё же нам приходится иметь долгое удержание одним потоком ресурсов, то как мне кажется без TryLock будет сложно обойтись.

Когда мы работаем так, что у нас нет параллельных потоков, мы будем просто менять переменную методами из пакета atomic, записывая факт блокировки или разблокировки мьютекса. Но вот если наш запрос пришел в заблокированнй мьютекс, то мы пойдём по длинному сценарию. В этом случае нам нужно будет разблокировать ожидающие потоки по завершении блокировки, и для этого, можно воспользоваться каналом. А точнее свойством канала, что если мы его закрываем, то все читающие из него потоки разблокируются.

Создадим Mutex:

// RWTMutex - Read Write and Try Mutex type RWTMutex struct {     state int32     mx    sync.Mutex     ch    chan struct{} }

state — состояние mutex, мы будем работать с ним через atomic.AddInt32, atomic.LoadInt32 и atomic.CompareAndSwapInt32

ch — канал, который будет разблокировать потоки.

mx — мьютекс, который используется для того, что бы разблокировка потоков и создания каналов не происходили параллельно.

А теперь можно перейти к реализации:

// TryLock - try locks mutex with context func (m *RWTMutex) TryLock(ctx context.Context) bool {     if atomic.CompareAndSwapInt32(&m.state, 0, -1) {         return true     }      // Slow way     return m.lockST(ctx) } // RTryLock - try read locks mutex with context func (m *RWTMutex) RTryLock(ctx context.Context) bool {     k := atomic.LoadInt32(&m.state)     if k >= 0 && atomic.CompareAndSwapInt32(&m.state, k, k+1) {         return true     }      // Slow way     return m.rlockST(ctx) }

Как можно увидеть, если Мьютекс не залочен, то его можно просто заблокировать, но вот если нет, то мы перейдём к более сложной схеме.

В начале получим канал, и переходим в бесконечный цикл, если получилось залочить, выходим с успехом, а если нет, то мы начинаем ждать одного из 2х событий, или что канал разблокируется, или что разблокирует поток ctx.Done():

func (m *RWTMutex) chGet() chan struct{} {     m.mx.Lock()     if m.ch == nil {         m.ch = make(chan struct{}, 1)     }     r := m.ch     m.mx.Unlock()     return r }  func (m *RWTMutex) lockST(ctx context.Context) bool {     ch := m.chGet()     for {         if atomic.CompareAndSwapInt32(&m.state, 0, -1) {             return true         }         if ctx == nil {             return false         }         select {         case <-ch:             ch = m.chGet()         case <-ctx.Done():             return false         }     } }  func (m *RWTMutex) rlockST(ctx context.Context) bool {     ch := m.chGet()     var k int32     for {         k = atomic.LoadInt32(&m.state)         if k >= 0 && atomic.CompareAndSwapInt32(&m.state, k, k+1) {             return true         }         if ctx == nil {             return false         }         select {         case <-ch:             ch = m.chGet()         case <-ctx.Done():             return false         }     } }

Давайте раблокируем мьютекс.

Нам потребуется поменять состояние и при необходимости разблокировать канал.

Как я уже писал выше, если канал закрыть, то case <-ch пропустит поток выполнения дальше.

func (m *RWTMutex) chClose() {     if m.ch == nil {         return     }      var o chan struct{}     m.mx.Lock()     if m.ch != nil {         o = m.ch         m.ch = nil     }     m.mx.Unlock()     if o != nil {         close(o)     } } // Unlock - unlocks mutex func (m *RWTMutex) Unlock() {     if atomic.CompareAndSwapInt32(&m.state, -1, 0) {         m.chClose()         return     }      panic("RWTMutex: Unlock fail") } // RUnlock - unlocks mutex func (m *RWTMutex) RUnlock() {     i := atomic.AddInt32(&m.state, -1)     if i > 0 {         return     } else if i == 0 {         m.chClose()         return     }      panic("RWTMutex: RUnlock fail") }

Собственно мьютекс готов, нужно к нему написать пару тестов и стандартных методов типа Lock() и RLock()

Бенчмарки на моей машине показали вот такие скорости

Описанные выше методы BenchmarkRWTMutexTryLockUnlock-8        92154297                12.8 ns/op             0 B/op          0 allocs/op BenchmarkRWTMutexTryRLockRUnlock-8      64337136                18.4 ns/op             0 B/op          0 allocs/op  Стандартный RWMutex BenchmarkRWMutexLockUnlock-8            44187962                25.8 ns/op             0 B/op          0 allocs/op BenchmarkRWMutexRLockRUnlock-8          94655520                12.6 ns/op             0 B/op          0 allocs/op  Стандартный Mutex BenchmarkMutexLockUnlock-8              94345815                12.7 ns/op             0 B/op          0 allocs/op

То есть скорость работы сопоставима с обычным RWMutex и Mutex.

Код на github

ссылка на оригинал статьи https://habr.com/ru/post/527412/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *