
База
Параллельность — выполнение задач в один момент времени на разных логических ядрах.
Конкурентность — выполнение задач последовательно, но со сменой контекста на другую задачу в ожидание завершения иной задачи. У пользователя может возникнуть иллюзия многозадачности даже в однопроцессорной системе, поскольку смена контекста происходит быстро (микросекунды).
Процессы:
-
Раздельная память
-
Раздельные ресурсы
-
Раздельные регистры
Потоки:
-
Общая память
-
Общие ресурсы
-
Раздельные стэк и регистры
Горутины:
-
Общая память
-
Общие ресурсы
-
Общий системный стэк
-
Общие регистры
Go runtime представляет модель P:M:G.
P — представляет логическое ядро процессора.
M — поток ОС по числу процессоров P.
G — структура, которая выполняет переданную функцию, создаётся по необходимости, минимум одна на старте программы (main). Стэк всего 2кб, может расширятся до 1гб для 64x и до 250кб для 32х систем.
Управление горутинами осуществляется планировщиком Go, а не ОС. Планировщик Go работает в пользовательском пространстве. Мы не можем напрямую управлять на каком процессоре будет исполняться горутина, за это отвечает планировщик.
Канал — очередь сообщений, которая умеет работать в многопоточной среде, работает по принципу FIFO.
Есть два типа каналов: буферизованный и небуферизованный.
Первый может хранить несколько сообщений, второй только одно.
Синхронизация
sync.WaitGroup — счётчик, который позволяет подождать завершения горутин.
sync.Mutex — блокирует доступ к ресурсу.
sync.RwMutex — разделяемая блокировка на чтение и запись. Читать могут несколько горутин, но мутировать данные только одна.
sync.Atomic — атомарная операция чтения и записи. Работает только с простыми значениями.
sync.Map — lock-free структура. Работает так же, как и обычная map, но потокобезопасная, можно использовать в многопоточной среде. Хорошо подходит для случаев, где надо много читать и мало писать. Если надо много писать, то лучше использовать обычную map и sync.RwMutex.
Небуферизованный канал
|
Действие |
Открытый канал |
Закрытый канал |
Неинициализированный канал |
|---|---|---|---|
|
Чтение |
Блокировка до прихода писателя |
Zero value |
Блокировка навсегда |
|
Запись |
Блокировка до прихода читателя |
Panic |
Блокировка навсегда |
|
Закрытие |
Канал закроется |
Panic |
Panic |
Буферизованный канал
|
Действие |
Открытый и частично заполненный |
Открытый и полностью заполненный |
Открытый и пустой |
Закрытый и частично заполненный |
|---|---|---|---|---|
|
Чтение |
Прочитаем значение |
Прочитаем значение |
Блокировка до прихода писателя |
Прочитаем значение |
|
Запись |
Запишем значение |
Блокировка до прихода читателя |
Запишем значение |
Panic |
Ограничения канала
|
Действие |
Канал только на чтение |
Канал только на запись |
|---|---|---|
|
Чтение |
|
Ошибка компиляции |
|
Запись |
Ошибка компиляции |
|
|
Закрытие |
Ошибка компиляции |
Важные правила
-
Закрывает канал тот, кто в него пишет.
-
Если пишет несколько продюсеров, то закрывает тот, кто создал продюсеров.
-
Не закрытый канал держит ресурсы. Закрывать надо явно.
Паттерны
Generator — микропаттерн, который наполняет канал. Закрываем канал, чтобы не было проблем.
func generator() <- chan int { ch := make(chant int) go func(){ for i := 0; i <= 12; i++ { ch <- i + 1 } close(ch) }() return ch }
Wrapper — оборачиваем функцию, добавляя функциональность. Если вам что-то говорит слово декоратор, то это тот самый паттерн.
func wrapper(wg *sync.WaitGroup, fn func()) { wg.Add(1) go func() { defer wg.Done() fmt.Println("Work before func") fn() time.Sleep(1 * time.Second) fmt.Println("Work after func") }() } func main() { var wg sync.WaitGroup wrapper(&wg, func() { time.Sleep(1 * time.Second) fmt.Println("heavy work") }) wg.Wait() }
Fan-in — собирает результаты из нескольких каналов в один.
func fanIn(input1, input2 <-chan string) <-chan string { ch := make(chan string) go func(){ for { select { case s := <-input1: ch <- s case s := <-input2: ch <- s } } }() return ch }
Fan-out — одна или несколько горутин пишут в канал, с другой стороны рабочие горутины читают канал, делают работу и умирают.
func worker(ch <-chan int, wg *sync.WaitGroup) { wg.Done() for v := range ch { fmt.Println(v) time.Sleep(1 * time.Second) } } func sender() { ch := make(chan int) var wg sync.WaitGroup for i := 0; i < 2; i++ { wg.Add(1) go worker(ch, &wg) } for i := 0; i < 10; i++ { ch <- i } close(ch) wg.Wait() fmt.Println("done") }
Pipeline — данные обрабатываются цепочкой. Producer -> Producer/Consumer -> Consumer. Стадий обработки может быть сколько угодно.
func producer() <-chan int { c := make(chan int) go func() { for i := 0; i <= 10; i++ { c <- i + 1 } close(c) }() return c } func producerConsumer(c <-chan int) <-chan int { out := make(chan int) go func() { for v := range c { out <- v * 2 } close(out) }() return out } func consumer(ch <-chan int) { for v := range ch { fmt.Println(v) } }
Cancellation — Способ прерывания горутин. Необходим, чтобы избегать висящих горутин, останавливать слишком долгие операции.
// 1. WithCancel func worker(ctx context.Context) { for { select { case <-ctx.Done(): fmt.Println("Done") return default: fmt.Println("Working...") time.Sleep(500 * time.Millisecond) } } } func main() { ctx, cancel := context.WithCancel(context.Background()) go worker(ctx) time.Sleep(1 * time.Second) // работаем cancel() // отменяем time.Sleep(1 * time.Second) // время на завершение } // 2. WithTimeout ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) // спустя 2 секунды воркер перестанет работать // 3. WithDeadline. Можно указать точное время остановки. ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second)) // прибавляем к текущему времени две секунды
Worker pool — каждый воркер берёт задачу, делает работу и отправляет результат в канал, другая горутина, в нашем случае main, читает результат из канала.
func worker(jobs <-chan int, results chan<- int, wg *sync.WaitGroup) { defer wg.Done() for j := range jobs { time.Sleep(1 * time.Second) fmt.Println("job", j) results <- j * j } } func main() { jobs := make(chan int) results := make(chan int) var wg sync.WaitGroup for i := 0; i < 3; i++ { wg.Add(1) go worker(jobs, results, &wg) } go func() { for i := 0; i < 10; i++ { jobs <- i } close(jobs) }() go func() { wg.Wait() close(results) }() for result := range results { fmt.Println(result) } }
ссылка на оригинал статьи https://habr.com/ru/articles/895922/
Добавить комментарий