Go: нужно ли закрывать канал?

от автора

Введение

При знакомстве с Go, наверное, первое, что узнаешь, так это о возможностях параллелизма. Насколько просто писать многопоточный код на этой технологии. В обеспечении этой простоты не последнюю роль играет наличие такого типа данных, как каналы, которые обеспечивают взаимодействие горутин — легковесных потоков Go. Чтение и запись в этот тип данных приводит к блокировкам, благодаря которым удаётся управлять многопоточностью. Но помимо очевидных операций над каналами их ещё можно закрывать. Для этой цели в Go есть встроенная функция close. В литературе встречал, что незакрытый канал может привести к утечке ресурсов. Но не припомню, чтобы уточнялось о какой именно утечке идёт речь. Поэтому с необходимостью закрытия каналов может возникнуть путаница, жертвой которой я однажды стал.

Забавный случай

На момент написания статьи Go являлась достаточно популярной технологией. Трудно было найти коллектив, в котором не было бы новоиспечённых go‑разработчиков или кто‑то не планировал сделать такой переход. Я тоже в своё время перешёл на Go с PHP. Это было не самое простое время — необходимо было получить определённый объём знаний и опыта в кратчайшие сроки. Сложно переоценить помощь, которую мне «оказали» тогда книги. Всё‑таки книга — лучший друг любого разработчика. Именно из литературы я узнал, что незакрытый канал может привести к утечке ресурсов. Но то ли не было приведено примеров, то ли нужно было параллельно разбираться со сборщиком мусора (GC), почему‑то у меня сложилось мнение, что незакрытый канал GC не сможет удалить из памяти. Копать глубже тогда времени не было, но рекомендацию о необходимости закрытия каналов я запомнил. Немного позднее на одном из проектов я решал такие вопросы, как проведение собеседований, внутренних аттестаций и написание курса по Go для сотрудников, которые желают перейти на него с другой технологии. При проведении собеседований всегда писал в системе для наема сотрудников подробный отчёт: на какие вопросы кандидат не ответил, и что я ожидал услышать. Но проведя n‑ое количество собеседований, я осознал, что мой опросник далёк от идеала — сложно по нему оценить реальные знания кандидатов. А тут ещё произошла не самая приятная ситуация — PHP‑разработчик был не согласен с грейдом, который ему присвоил архитектор. Чтобы не раздувать скандал, меня попросили провести аттестацию повторно. В общем, я улучшил опросник — что‑то убрал, что‑то добавил, но вопрос о необходимости закрывать канал я всё же оставил. И вот…повторная аттестация… Коллега отвечал довольно слабо. Но меня сильно удивил один момент: он ошибался в элементарных новых вопросах, но правильно ответил про необходимость закрытия канала: GC не удалит из памяти канал пока его не закроют, даже если на него никто больше не ссылается. Я был просто потрясён — никто правильно не отвечал на этот вопрос. После аттестации мне нужно было подумать и вынести вердикт, а тем временем я приступил к написанию лекции про каналы. Решил подтвердить своё убеждение в книге Алана Донована «Язык программирования Go», но был несколько взволнован, когда нашёл утверждение, что функция close всего лишь изменяет значение некоторого флага канала, означающего, что в него никто больше ничего писать не будет…Что поделать? — все ошибаются. Тем не менее, та уникальная чушь, которую я нёс, когда объяснял причину необходимости закрывать канал, позволила выявить аферу разработчика, зазубрившего ожидаемые ответы из системы наема сотрудников.

Где живёт канал?

Прежде, чем анализировать удаление каналов из памяти, необходимо понять, где они создаются — в куче или стеке? GC работает с кучей, поэтому требуется создавать каналы именно в куче, причём в таком количестве, чтобы результат был хорошо виден в трассировке. Как известно, в Go при создании переменной в теле функции:

package main  func main() { var a int64 = 10  print(a) }

Переменная обычно попадает в стек. Можно провести escape-анализ:

go build -gcflags=-m

После выполнения команды на моей локальной машине получился следующий результат:

# app ./main.go:3:6: can inline main

Сообщения «escapes to heap» нет, что говорит об отсутствии перемещения переменной из стека в кучу. Но такое поведение наблюдается не всегда — переменная может попасть в кучу из‑за объёма занимаемой памяти. Следующий код демонстрирует такое поведение:

package main  func main() { s1 := make([]int, 10) s2 := make([]int, 10000)  print(s1[0]) print(s2[0]) }

В этом случае escape-анализ вывел следующий результат:

# app ./main.go:3:6: can inline main ./main.go:4:12: make([]int, 10) does not escape ./main.go:5:12: make([]int, 10000) escapes to heap

Видно, что срез в 10 элементов типа int создаётся в стеке, а в 10 000 элементов — в куче. То есть срез до определённой длины будет создаваться в стеке, после чего будет наблюдаться перемещение переменной в кучу. Когда именно произойдёт этот переход, можно более подробно ознакомиться здесь. В случае канала:

package main  func main() { ch := make(chan int)  print(ch) }

Escape-анализ не сообщит о перемещении в кучу:

# app ./main.go:3:6: can inline main

Но это не означает, что канал создался в стеке, т.к. он по умолчанию может создаваться в куче. В Go стек выделяется для горутины. Причём стек одной горутины недоступен для другой, а вот куча у них общая. В то же время каналы обеспечивают взаимодействие горутин, поэтому можно предположить, что они всегда создаются в куче. Для подтверждения этой мысли сначала нужно понять, какой объём памяти требует канал:

package main  import ( "fmt" "unsafe" )  func main() { ch := make(chan int)  fmt.Println("channel size:", unsafe.Sizeof(ch)) }

Запустив этот код:

go run main.go 

Видно, что канал занимает 8 байт:

channel size: 8

Это очень мало, чтобы увидеть изменения объёма кучи при трассировке. Нужно занять больший объём памяти. Но как? — или увеличить занимаемую память за счёт буфера канала, или создать большое количество каналов. Проанализируем, влияет ли буфер на занимаемый объём памяти каналом:

package main  import ( "fmt" "unsafe" )  func main() { ch := make(chan int, 1000) for i := 0; i < 999; i++ { ch <- 1 }  fmt.Println("channel size:", unsafe.Sizeof(ch)) }

Выполнение программы выведет в stdout те же 8 байт:

channel size: 8

Т. е. буффер никак не влияет на размер памяти, которую занимает канал. Поэтому остаётся лишь создать n-ое количество каналов:

package main  import ( "os" "runtime/debug" "runtime/trace" )  func main() { debug.SetGCPercent(-1)  f, _ := os.Create("trace.out") trace.Start(f) defer trace.Stop()  for i := 0; i < 10000; i++ { ch := make(chan struct{}) close(ch) } }

Для большей наглядности в коде отключен запуск GC:

debug.SetGCPercent(-1)

Трассировка записывается в файл trace.out:

f, _ := os.Create(«trace.out»)
trace.Start(f)
defer trace.Stop()

Программа создаёт 10 000 небуферизованных каналов и закрывает их (уж, будем хорошими мальчиками и девочками!). Для получения трассировки необходимо запустить программу:

go run main.go 

Для просмотра результата требуется выполнить:

go tool trace trace.out

После чего, у меня в терминале появился результат:

2024/11/18 12:08:43 Preparing trace for viewer... 2024/11/18 12:08:43 Splitting trace for viewer... 2024/11/18 12:08:43 Opening browser. Trace viewer is listening on http://127.0.0.1:54503

И автоматически открылась страница http://127.0.0.1:54503 в браузере. Для просмотра трассировки нужно открыть http://127.0.0.1:54503/trace:

Рис. 1. Создание в куче каналов при выключенном GC

Рис. 1. Создание в куче каналов при выключенном GC

По рис. 1 видно, что куча растёт. При этом в программе просто создаются каналы, значит Go всегда создаёт каналы в куче… можете даже у Алисы спросить 🙂

Каналы и GC

После выяснения, где создаётся канал, ясно, как обеспечить их создание в куче. Обладая такой информацией, можно включить GC и посмотреть, как он чистит память от закрытых каналов:

package main  import ( "os" "runtime" "runtime/trace" "time" )  func main() { f, _ := os.Create("trace.out") trace.Start(f) defer trace.Stop()  for j := 0; j < 10; j++ { for i := 0; i < 10000; i++ { ch := make(chan struct{}) close(ch) } time.Sleep(time.Second) runtime.GC() time.Sleep(time.Second) } } 

Для лучшей визуализации изменения кучи используются 2 цикла — 10 раз создаются 10 000 небуферизованных каналов. После каждой итерации происходит задержка:

time.Sleep(time.Second)

И принудительный запуск GC:

runtime.GC()

После запуска программы и просмотра трассировки:

Рис. 2. Чистка GC закрытых каналов

Рис. 2. Чистка GC закрытых каналов

Чётко видны ступени, т. е. GC периодически запускается и чистит кучу от закрытых каналов. Причём аналогичное поведение будет и для буферизованных каналов с непустым буфером:

package main  import ( "os" "runtime" "runtime/trace" "time" )  func main() { f, _ := os.Create("trace.out") trace.Start(f) defer trace.Stop()  for j := 0; j < 10; j++ { for i := 0; i < 10000; i++ { ch := make(chan struct{}, 1) ch <- struct{}{} close(ch) } time.Sleep(time.Second) runtime.GC() time.Sleep(time.Second) } } 
Рис. 3. Чистка GC закрытых каналов с непустым буфером

Рис. 3. Чистка GC закрытых каналов с непустым буфером

Похоже, самое время опровергнуть моё заблуждение и подтвердить мысль из литературы, что функция close всего лишь изменяет значения флага, который показывает готовность канала принимать новые значения:

package main  import ( "os" "runtime" "runtime/trace" "time" )  func main() { f, _ := os.Create("trace.out") trace.Start(f) defer trace.Stop()  for j := 0; j < 10; j++ { for i := 0; i < 10000; i++ { _ = make(chan struct{}) } time.Sleep(time.Second) runtime.GC() time.Sleep(time.Second) } } 
Рис. 4. Чистка GC незакрытых каналов

Рис. 4. Чистка GC незакрытых каналов

Всё те же ступени, а значит GC чистит память от незакрытых каналов. Аналогичное поведение можно наблюдать и для буферизованных каналов с заполненным буфером:

package main  import ( "os" "runtime" "runtime/trace" "time" )  func main() { f, _ := os.Create("trace.out") trace.Start(f) defer trace.Stop()  for j := 0; j < 10; j++ { for i := 0; i < 10000; i++ { ch := make(chan struct{}, 1) ch <- struct{}{} } time.Sleep(time.Second) runtime.GC() time.Sleep(time.Second) } } 
Рис. 5. Чистка GC незакрытых каналов с непустым буфером

Рис. 5. Чистка GC незакрытых каналов с непустым буфером

Из трассировки видно, что GC одинаково успешно очищает память от закрытых и незакрытых каналов. Причём заполненность буфера канала никак не влияет на работу сборщика мусора. Для успешного освобождения памяти необходимо лишь, чтобы рассматриваемая переменная в коде более не использовалась.

Каналы и горутины

После проведённого анализа возникает вполне естественный вопрос: тогда что понималось под утечкой ресурсов при незакрытых каналах? Для ответа, достаточно вспомнить, зачем нужны каналы — для взаимодействия горутин. Рассмотрим код программы, которая в течение 1-й миллисекунды собирает лог из разных источников и записывает его в результирующий канал:

package main  import ( "fmt" "os" "runtime/trace" "sync" "time" )  func main() { f, _ := os.Create("trace.out") trace.Start(f) defer trace.Stop()  timer := time.After(time.Millisecond) OUT: for { select { case <-timer: break OUT default: run() } }  time.Sleep(15 * time.Millisecond) }  func run() { ch1 := make(chan string) ch2 := make(chan string) ch3 := make(chan string)  go func() { for _, msg := range []string{"create user", "update user", "select user"} { ch1 <- msg } close(ch1) }()  go func() { for _, msg := range []string{"create order", "update order", "select order"} { ch2 <- msg } close(ch2) }()  go func() { for _, msg := range []string{"create task", "update task", "run task"} { ch3 <- msg } close(ch3) }()  output := merge(ch1, ch2, ch3) go func() { for { select { case v, ok := <-output: if !ok { return } fmt.Println(v) default: fmt.Println("waiting...") } } }() }  func merge(sources ...<-chan string) <-chan string { var wg sync.WaitGroup result := make(chan string)  output := func(ch <-chan string) { for v := range ch { result <- v } wg.Done() }  for _, source := range sources { wg.Add(1) go output(source) }  go func() { wg.Wait() close(result) }()  return result }

В коде присутствуют 3 функции: main, run и merge. merge представляет собой реализацию fan‑in — значения из нескольких каналов помещаются в один, который возвращается в качестве результата. run имитирует множество источников данных: ch1, ch2, ch3, для передачи в merge и получения агрегирующего эти данные канала. Далее эта функция, в отдельной горутине, передаёт значения из полученного канала в stdout. В main функция run запускается на протяжении 1-й миллисекунды, после чего программа приостанавливает своё выполнение на 15 миллисекунд. Многократный запуск функции run обеспечивает создание большого количества горутин, что облегчает визуальное восприятие при исследовании трассировки, а временный останов программы позволяет узнать дальнейшее развитие событий, т.к. функции run и merge не содержат логику, гарантирующую ожидание выполнения дочерних горутин. После запуска программы в терминале можно увидеть что‑то подобное:

select user select order .... waiting... ...

Сократил вывод, т. к. лога довольно много. При открытии трассировки у меня получился следующий результат:

Трассировка горутин при закрытых каналах

Трассировка горутин при закрытых каналах

Видно, что в конечном итоге все горутины завершают своё выполнение. А что произойдёт, если не закрывать результирующий канал? Т.е. функцию merge переписать следующим образом:

func merge(sources ...<-chan string) <-chan string { result := make(chan string) output := func(ch <-chan string) { for v := range ch { result <- v } }  for _, source := range sources { go output(source) }  return result }

Заодно и WaitGroup убрали — упростили код. Запуск программы с изменённым кодом функции merge по-прежнему выведет лог в stdout:

... select user waiting... ...

Его по-прежнему будет много. Т. е. работа программы никак не изменилась, но если посмотреть на новую трассировку:

Видно, что часть горутин продолжает работать. Но это и ожидаемо, т.к. в функции run чтение результирующего канала происходит в отдельной горутине:

go func() { for { select { case v, ok := <-output: if !ok { return } fmt.Println(v) default: fmt.Println("waiting...") } } }()

Канал output никогда не закрывается, а значит бесконечный цикл никогда не прервётся. Таким образом, если бы программа продолжала вызывать функцию run, то количество незавершённых горутин постоянно увеличивалось бы. Такое поведение называется утечкой горутин. Как известно, для горутины выделяется стек, который стартует с 2 Кбайт и, на 64-битной архитектуре, может достигнуть 1 Гбайта. Соответственно утечка горутин приводит, как минимум, к утечке по памяти. Да и вообще, работающая горутина занимает процессорное время, что оказывает влияние на скорость выполнения других горутин.

Где закрывать канал?

Думаю, после приведённых примеров и трассировки, понятно, что закрытие канала не стремление «быть хорошим», а стремление избежать утечки ресурсов. Но вот только где нужно закрывать канал? В предложенном ранее примере создавались каналы в функции run и там же закрывались:

ch1 := make(chan string) ch2 := make(chan string) ch3 := make(chan string)  go func() { for _, msg := range []string{"create user", "update user", "select user"} { ch1 &lt;- msg } close(ch1) }()  go func() { for _, msg := range []string{"create order", "update order", "select order"} { ch2 &lt;- msg } close(ch2) }()  go func() { for _, msg := range []string{"create task", "update task", "run task"} { ch3 &lt;- msg } close(ch3) }() 

А что, если немного изменить код? Например, функция merge при обнаружении, что один из переданных каналов закрыт, сама будет закрывать оставшиеся каналы:

package main  import ( "fmt" "sync" "time" )  func main() { run() time.Sleep(15 * time.Millisecond) }  func run() { ch1 := make(chan string) ch2 := make(chan string) ch3 := make(chan string)  go func() { for _, msg := range []string{"create user", "update user", "select user"} { time.Sleep(10 * time.Microsecond) ch1 <- msg } }()  go func() { for _, msg := range []string{"create order", "update order", "select order"} { ch2 <- msg } }()  go func() { for _, msg := range []string{"create task", "update task", "run task"} { ch3 <- msg } close(ch3) }()  output := merge(ch1, ch2, ch3) go func() { for { select { case v, ok := <-output: if !ok { return } fmt.Println(v) default: fmt.Println("waiting...") } } }() }  func merge(sources ...chan string) <-chan string { var wg sync.WaitGroup result := make(chan string) stop := make(chan struct{}) output := func(ch <-chan string) { defer wg.Done() for { v, ok := <-ch if !ok { stop <- struct{}{} return } result <- v } }  for _, source := range sources { wg.Add(1) go output(source) }  go func() { wg.Wait() close(result) }()  go func() { <-stop for _, ch := range sources { select { case _, ok := <-ch: if !ok { continue } close(ch) default: close(ch) } } }()  return result }

В предложенном коде сильно упростилась функция main — достаточно однократного запуска run с временной задержкой (не стал усложнять код — суть не в этом). В функции run закрывается только ch3, а при записи в ch1 добавлена временная задержка:

time.Sleep(10 * time.Microsecond)

Больше всего правок в merge. Теперь эта функция самостоятельно отслеживает закрытые каналы из числа переданных. И, если какой‑то канал по каким‑либо причинам был закрыт, merge самостоятельно закрывает оставшиеся каналы. Для этих целей добавлен сигнальный канал:

stop := make(chan struct{})

Также изменена логика записи в результирующий канал:

output := func(ch <-chan string) {
defer wg.Done()
for {
v, ok := <-ch
if !ok {
stop <- struct{}{}
return
}
result <- v
}
}

Если канал закрыт (ok равно false), то записывается пустая структура в сигнальный, и прерывается бесконечный цикл. В противном случае, будет запись значения в результирующий канал. Также запущена отдельная горутина, которая, при получении значения из сигнального канала, закрывает все переданные каналы:

go func() {
<-stop
for _, ch := range sources {
select {
case _, ok := <-ch:
if !ok {
continue
}
close(ch)
default:
close(ch)
}
}
}()

Причём для закрытия каналов используется select, т.к возможны 3 ситуации: канал имеет значение, канал закрыт, канал пуст. Первые две ситуации обыгрывает ветка:

case _, ok := <-ch:

Т.к. закрытый канал возвращает значение по умолчанию. Именно поэтому проверяется открытость канала:

if !ok {
continue
}

т.к. повторное закрытие создаст аварию:

panic: close of closed channel

Далее, если канал открыт, и из него просто было вычитано значение, то он закрывается. Ветка default будет отрабатывать на пустых каналах. Также пришлось изменить сигнатуру функции merge:

func merge(sources …chan string) <-chan string {

Отказаться от однонаправленности канала. В противном случае возникала ошибка компиляции:

# command-line-arguments ./main.go:89:11: invalid operation: cannot close receive-only channel ch (variable of type <-chan string) ./main.go:91:11: invalid operation: cannot close receive-only channel ch (variable of type <-chan string)

Но, внеся все корректировки, удалось успешно скомпилировать программу:

go build main.go

Перед запуском бинарного файла main, хочется ещё раз посмотреть на код и отметить: теперь в run не нужно закрывать каждый канал. Закрытие каналов происходит в merge, в цикле. Т.е. при большом количестве каналов достаточно закрыть всего один, а остальные закроютсяавтоматически. Кажется, внесены существенные улучшения. Запустим «бинарь»:

./main

И увидим результат:

... waiting... waiting... waiting... panic: send on closed channel  goroutine 34 [running]: main.run.func1() /usr/local/go/src/app/main.go:22 +0xac created by main.run in goroutine 1 /usr/local/go/src/app/main.go:19 +0x94

Произошла авария! — попытка записи в закрытый канал. Коварство этой ситуации заключается в том, что её успешно пропускает компилятор. А значит она может возникнуть в любое время, например в 03:00 ночи (привет дежурным!). Почему возникла авария? Дело в том, что функци merge только читает и закрывает каналы. Пишет в эти каналы другая логика, которая не знает, что один из каналов уже закрыли. Да она и не должна знать, т.к. любая функция или метод существуют для того, что инкапсулировать определённую логику. Поэтому в Go существует один подход при работе с каналами — кто в канал пишет, тот и закрывает. Справедливости ради, добавим временную задержку при записи в ch1 в ранее предложенное решение:

package main  import ( "fmt" "sync" "time" )  func main() { run() time.Sleep(15 * time.Millisecond) }  func run() { ch1 := make(chan string) ch2 := make(chan string) ch3 := make(chan string)  go func() { for _, msg := range []string{"create user", "update user", "select user"} { time.Sleep(10 * time.Microsecond) ch1 <- msg } close(ch1) }()  go func() { for _, msg := range []string{"create order", "update order", "select order"} { ch2 <- msg } close(ch2) }()  go func() { for _, msg := range []string{"create task", "update task", "run task"} { ch3 <- msg } close(ch3) }()  output := merge(ch1, ch2, ch3) go func() { for { select { case v, ok := <-output: if !ok { return } fmt.Println(v) default: fmt.Println("waiting...") } } }() }  func merge(sources ...<-chan string) <-chan string { var wg sync.WaitGroup result := make(chan string)  output := func(ch <-chan string) { for v := range ch { result <- v } wg.Done() }  for _, source := range sources { wg.Add(1) go output(source) }  go func() { wg.Wait() close(result) }()  return result }

После запуска программы можно убедиться, что она отработала в штатном режиме:

... waiting... select user waiting... ...

Итого

  • Каналы всегда создаются в куче

  • GC удаляет как закрытые, так и незакрытые каналы. Наличие и заполненность буфера у канала тоже никакого влияния не оказывает на работу GC

  • Незакрытые каналы могут привести к утечке горутин

  • Кто пишет в канал, тот его и закрывает

Из проведённого анализа видно, что строгой рекомендации закрывать канал нет. Если есть уверенность, что не произойдёт утечки горутин, то оставленный открытый канал никак не повлияет на работу приложения. Тем не менее, ненужные каналы лучше закрывать. Это вопрос дисциплины — лучше приучать себя писать аккуратный код.

P.S.

Также хочется отметить, что указание направленности канала, не просто облегчает понимание кода, как пишут в некоторой литературе, но в некоторых случаях может уберечь от ошибочной оптимизации.


ссылка на оригинал статьи https://habr.com/ru/articles/861846/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *