Как правило, в Go для безопасного доступа к общим данным используются мьютексы. Да, каналы тоже можно приспособить для изменения общих данных, так как они потокобезопасны, но это усложняет и замедляет логику.
Но в этой статье мы поговорим о другом. Современные процессоры имеют поддержку атомарных операций, что позволяет на основе них организовывать работу с общими данными до нескольких раз быстрее, чем с помощью общепринятых вариантов. Так как мьютексы реализованы на основе ОС, каналы сделаны на основе внутреннего кода Go с использованием тех же мьютексов из ОС под капотом, а атомарные операции делает сам процессор аппаратно за существенно меньшее количество тактов.
Пример кода, который при многопоточности неожиданно начинает глючить
Начнём с основ. Вначале будет код, который параллельно к переменной прибавляет и убавляет по миллиону единиц. В типе A лежит счётчик, тип B добавляет 1 (метод B.II), тип С убавляет 1 (метод C.DD). И всё это параллельно. Казалось бы, тут не нужны никакие примитивы синхронизации, так как общее число прибавлений и убавлений равно миллиону, а поэтому результат будет равен нулю.
package main import ( "fmt" "sync" ) type A struct { i int } func (a *A) Inc(delta int) { a.i += delta } func (a *A) Get() int { return a.i } type B struct { } func (b *B) II(c int) { for i := 0; i < c; i++ { a.Inc(1) } wg.Done() } type C struct { } func (c *C) DD(count int) { for i := 0; i < count; i++ { a.Inc(-1) } wg.Done() } var a A var b B var c C var wg sync.WaitGroup func main() { var nCount = 1000000 a.i = 0 wg.Add(1) go b.II(nCount) wg.Add(1) go c.DD(nCount) wg.Wait() fmt.Println("Sum", a.Get()) }
Однако если мы запустим этот код, то получим что угодно, но только не ноль. Пробуем!
Объяснить это можно только тем, что процессы параллельно лезут к одним и тем же ячейкам памяти, их действия накладываются друг на друга (хотя чему тут накладываться? инкремент/декремент — одна из самых примитивных операций), но компилятор не догадывается их использовать, а использует несколько регистров, вступают в действие кеши L1, L2 и L3, и на выходе вместо нуля белиберда.
Классика: мьютексы решают вопрос
Хорошо, теперь перепишем этот код классически, на базе мьютексов:
package main import ( "fmt" "sync" ) type A struct { i int m sync.Mutex } func (a *A) Inc(delta int) { a.m.Lock() defer a.m.Unlock() a.i += delta } func (a *A) Get() int { a.m.Lock() defer a.m.Unlock() return a.i } type B struct { } func (b *B) II(c int) { for i := 0; i < c; i++ { a.Inc(1) } wg.Done() } type C struct { } func (c *C) DD(count int) { for i := 0; i < count; i++ { a.Inc(-1) } wg.Done() } var a A var b B var c C var wg sync.WaitGroup func main() { var nCount = 1000000 a.i = 0 wg.Add(1) go b.II(nCount) wg.Add(1) go c.DD(nCount) wg.Wait() fmt.Println("Sum", a.Get()) }
Мьютекс ожидаемо делает своё дело, и на выходе мы получаем ноль. Проверить самому. Кстати, мьютекс можно разместить в любом объекте, не только в защищаемом, и он сделает своё благородное дело. Только из соображений понятности и поддержки кода мьютексы обычно размещают в защищаемых объектах.
На сцену выходят примы: атомарные операции
Теперь же решим эту задачу неблокирующим, быстрым и красивым способом. Атомарным.
package main import ( "fmt" "sync" "sync/atomic" ) type A struct { i int64 } func (a *A) Inc(delta int64) { atomic.AddInt64(&a.i, delta) } func (a *A) Get() int64 { return atomic.LoadInt64(&a.i) } type B struct { } func (b *B) II(c int) { for i := 0; i < c; i++ { a.Inc(1) } wg.Done() } type C struct { } func (c *C) DD(count int) { for i := 0; i < count; i++ { a.Inc(-1) } wg.Done() } var a A var b B var c C var wg sync.WaitGroup func main() { var nCount = 1000000 a.i = 0 wg.Add(1) go b.II(nCount) wg.Add(1) go c.DD(nCount) wg.Wait() fmt.Println("Sum", a.Get()) }
Во дела! Работает! Мы тоже получили на выходе ноль после параллельной работы увеличивающей и уменьшающей процедуры.
Достаточно было только использовать специфическую атомарную функцию atomic.AddInt64. Атомарное сложение — очень полезная функция, так как неблокирующим способом мы можем корректно складывать некие результаты из разных потоков практически без затрат времени на синхронизацию. Также я использовал atomic.LoadInt64, но в данном примере она необязательна.
Специфика атомарных операций в том, что они работают только с узким набором данных — это 32-х, 64-х битные данные и указатели.
▍ Сравнение скорости мьютексов и атомарных операций
Для того чтобы сравнить скорость действия атомарных операций и мьютексов я увеличил число операций до 100 миллионов.
Итак, атомарные операции: 2.12 секунды, мьютексы 5.85 секунды.
Как и следовало ожидать, атомарные операции существенно быстрее, примерно в 3 раза.
Частый случай использования атомарных операций, когда несколько горутин что-то делают с общим аккумулятором. Например, суммы убытков и прибылей из разных источников складываются и отображаются онлайн. А вот суммирование массива несколькими горутинами через атомарное сложение не ускоришь. Так как оно существенно дороже по процессорному времени, чем операция обычного сложения из-за сбросов кешей процессора.
С точки зрения процессора (и всех его многочисленных ядер) есть мир до атомарной операции и после. На основе этого принципа из атомарных операций можно делать самопальные конструкции для синхронизации многопоточной работы. И довольно быстрые.
Самодельный мьютекс на основе CompareAndSwap
Для того, чтобы создавать самодельные мьютексы, можно использовать специальную атомарную фунцию
CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
С ней всё ещё проще. Сделаем самодельный мьютекс.
type MyMutex struct { i int64 } func (m *MyMutex) get() int64 { return atomic.LoadInt64(&m.i) } func (m *MyMutex) set(val int64) { atomic.StoreInt64(&m.i, val) } func (m *MyMutex) Lock() { for atomic.CompareAndSwapInt64(&m.i, 0, 1) != true { } return } func (m *MyMutex) Unlock() { if m.get() == 0 { panic(fmt.Errorf("Double unlocking")) } m.set(0) }
Этот подход известен в системном программировании как Compare and swap. Также для этой цели можно использовать подход Test and set, но пакет sync/atomic не предоставляет функций для этого подхода. «Compare and swap» в википедии считают более хорошим подходом. Дополнительная информация. При тестировании этот самодельный мьютекс показывает на 40% худшие результаты, чем оригинальный. Из-за нескольких причин:
- Есть цикл (ниже), на который тратится много процессорного времени.
for atomic.CompareAndSwapInt64(&m.i, 0, 1) != true { }
Если бы могли в него вставить, что-то типа ассемблерных инструкций NO_OPERATION или какой-то микрозадержки, то стал бы лучше работать. Стандартная функция time.Sleep() плохо работает для этой цели. При выставлении 1 наносекунды там может быть в тысячи раз большая реальная задержка.
- Планировщик ОС, при использовании системного мьютекса, выделяет поменьше квантов времени тому потоку, который просто ждёт захвата мьютекса.
- Библиотечный мьютекс, наверняка, написан на более низком уровне.
- Обвязка для самодельного мьютекса, скорее всего, менее эффективно компилируется в ассемблерные инструкции.
Безопасная многопоточная работа с общими данными без каналов и мьютексов
Хорошо, а если нам нужны какие-то более сложные операции и структуры данных? Можно ли тут получить пользу от атомарных операций?
Да, можно! Перейдём к самому интересному. Создадим на основе атомарных операций самодельную потокобезопасную конструкцию, которая будет контролировать состояние конечного автомата. И сделаем это по другому, чем ранее в статье.
▍ Конечный автомат, обслуживающий несколько потоков на атомарных операциях
Некий автомат имеет 3 состояния: открыт, занят уменьшением, занят увеличением. Это условно. Состояний может быть больше. Они могут быть называться как угодно. Суть в том, что есть общие данные любого вида и мы организуем к ним бесконфликтный доступ разных методов в зависимости от состояния объекта. Без использования мьютексов и каналов.
Например, потому что мы знаем, что атомарные операции могут быть сильно быстрее, или просто любим ненормальное программирование.
Итак, представляю код. В нём (для демонстрации потенциальных вариантов реализации) я обошёлся без функции CompareAndSwap. Хотя с ней было бы чуть проще.
package main import ( "fmt" "sync" "sync/atomic" ) const ( Open int64 = iota Increasing Decreasing ) type A struct { state int64 i int64 } func (a *A) SetState(state int64) { atomic.StoreInt64(&a.state, state) } func (a *A) GetState() int64 { return atomic.LoadInt64(&a.state) } func (a *A) AddState(delta int64) int64 { return atomic.AddInt64(&a.state, delta) } // По хорошему тут должно быть 2 процедуры: для состояния Increasing и для Decreasing, // но не хотелось плодить почти одинаковый код func (a *A) Inc(delta int64) { if delta > 0 { a.SetState(Increasing) } else { a.SetState(Decreasing) } a.i += delta a.SetState(Open) } func (a *A) Init() { a.SetState(Open) } func (a *A) Get() int64 { return a.i } type B struct { } func (b *B) II(c int) { for i := 0; i < c; i++ { i_begin: for a.GetState() != Open { } if a.AddState(Increasing) == Increasing { a.Inc(1) } else { goto i_begin } } wg.Done() } type C struct { } func (c *C) DD(count int) { for i := 0; i < count; i++ { d_begin: for a.GetState() != Open { } if a.AddState(Decreasing) == Decreasing { a.Inc(-1) } else { goto d_begin } } wg.Done() } var a A var b B var c C var wg sync.WaitGroup func main() { a.Init() var nCount = 1000000 wg.Add(1) go b.II(nCount) wg.Add(1) go c.DD(nCount) wg.Wait() fmt.Println("Sum", a.Get()) }
Запустить в Go Playground. Если проблем с многопоточностью нет, то код должен вывести ноль как результат. И выводит.
Обратите внимание, что в этом коде операция инкремента теперь обычная, а не атомарная, но тем не менее всё корректно считается.
Логика: когда объект A объявляет о том, что он открыт, то объекты B и C пытаются переключить его состояние в Increasing или Decreasing с помощью добавления к нулю (это состояние Open) значений Increasing или Decreasing. Тот, кто первый добавил, получает, например, Increasing. Второй уже получит значение Increasing + Decreasing, которое не даст ему начать делать свою работу и переведёт в состояние ожидания, пока состояние не станет Open. Мы можем использовать сколько угодно состояний, главное, чтобы сумма номеров любых остальных не равнялась никакому номеру другого состояния.
Вот кусочек с главным хаком, который делает нашу работу потокобезопасной:
for a.GetState() != Open { } if a.AddState(Decreasing) == Decreasing { a.Inc(-1) }
Например, можно использовать степени двойки: 1, 2, 4, 8, 16, 32. Сумма любого количества состояний не будет равна никакому другому номеру состояния. Каждая степень двойки представлена бинарно отдельным битом, который не пересекается с другими битами. Каждое состояние будет представлено машинным словом со всего одной битовой единицей, только в разном месте. И поэтому при сложении состояний мы будем получать результат с несколькими единицами.
Наш переключатель реагирует только на чистые состояния, в которых только по одному включённому биту. И поэтому каждая горутина легко отличает ситуации, когда ей удалось занять очередь на исполнение, и начинает исполнять свою работу.
Для 100 миллионов циклов этот код выполнялся 14 секунд, аналогичный код с тремя состояними на мьютексах выполнялся 25.3 секунды. Код на мьютексах немного сложнее, чем мог бы быть, но только для того, чтобы имитировать объект с 3 состояниями и с минимально блокирующим доступом к нему. Наверное, более правильно его переписать на каналах, но тогда он будет выполняться ещё дольше, так как под капотом каналов всё те же мьютексы.
Сразу скажу, что в Go Playground не получится потестить мои скрипты со 100М выполнений, и функция времени там работает некорректно. Для того, чтобы воспроизвести мои эксперименты вам придётся компилировать самому.
Я не призываю переписывать все мьютексы на атомарных операциях, то что я рассказал, это скорее proof-of-concept. Но при этом нужно знать об атомарных операциях и почаще их использовать — инструмент великолепный и быстрый!
Вывод
Атомарные функции в Go дают возможность организовать как блокирующие, так и в особенности неблокирующие алгоритмы с механизмами синхронизации, которые иногда работают в 2-3 раза быстрее, чем мьютексы и каналы. В некоторых ситуациях низкоуровневого программирования это может дать существенную пользу, хотя и требует экстремальной внимательности во время разработки.
Во всех остальных ситуациях разработчики Go рекомендуют использовать каналы и мьютексы, как более понятные и наглядные в применении.
Нативные мьютексы от ОС имеют более тесную связь с планировшиком задач, поэтому нет смысла переписывать именно мьютексы на атомарных операциях, но в других задачах, в особенности в неблокирующих алгоритмах, стоит рассмотреть атомарные операции для ускорения алгоритмов.
Более детально посмотреть на атомарные функции вы можете в документации Go.
Ранее на Хабре на эту тему: Атомарные и неатомарные операции, Atomic operation.
Telegram-канал со скидками, розыгрышами призов и новостями IT ?
ссылка на оригинал статьи https://habr.com/ru/articles/833264/
Добавить комментарий