Многопоточная среда в разработке JAVA

от автора

Если ты разработчик, то тебя это коснется. Спустя какое то время, кто-то раньше, кто-то позже, но каждый приходит к тому, что пора реально взять и разобраться в многопоточке. Я как то писал статью для себя, когда готовился к собесам и понял, что мне самому очень вкатывает такой формат обучения, когда ты пишешь статью на тему, которую исследуешь изучаешь. Так информация осваивается в разы лучше. Поэтому я собираюсь пропустить через себя огромный объем информации по многопоточности. Буду работать как супер компьютер обрабатывать сразу все возможные ветки в направлении освоения многопоточности. 

Дороги, которые ведут тебя к истине

Дороги, которые ведут тебя к истине

Дорогу осилит идущий

Готовы погрузится настолько глубоко, насколько это возможно в рамках этой статьи? Делаем. Изучая тему многопоточки, мне стало интересно, что физически происходит в процессоре. Мы написали код на JAVA компилятор, скомпилировал наш код и передал его на уровень ниже, теперь JVM превращает скомпилированный код в набор инструкций понятных процессору. Сам процессор в свою очередь состоит из ядер, каждое ядро может выполнять единовременно только одну инструкцию (в нашем понимании это один поток), это связанно с тем, что для одного ядра выделяется один управляющий, который переключает датчики в соответсвии с инструкциями. Ядра могут работать одновременно каждый со своим набором инструкций. 

Внутри ядра есть регистры, ранее я называл их датчиками. Правильно будет все таки понимать их как регистры, супер быстрые ячейки, которые хранят (числа, адреса, данные) без задержки в один такт

Такт — это минимальная единица времени, за которую процессор способен реализовать самую простую операцию например сдвинуть бит в регистре. Один набор инструкций на ядре может состоять из десятков тысяч тактов.

Бит — это буквально два возможных состояния в транзисторе. Сдвиг бита, это переключение из двух возможных состояний, высокое напряжение или низкое, мы можем это понимать как 0 и 1.

Еще один очень интересный момент. Одно ядро и правда может выполнять один поток, однако сейчас очень мощные процессоры и ядра, которые исполняют именно аппаратный поток. Благодаря суперсклярной архитектуре один аппаратный поток может за один такт выполнить 2-6 инструкций того же потока. Называется это Hyper-Threading, он же создаёт иллюзию двух потоков на одном ядре через быстрое переключение, но в каждый момент времени инструкции исполняются только одного из них.

Мой товарищ Паша Сорокин дал четкое руководство по которому мы и двинемся в изучении многопоточки, шаг первый понять как потоки устроены внутри.

Начнем с базы, мы знаем что когда запускается Java приложения то под каждый поток выделяется свой СТЕК памяти эта область памяти так и называется STACK и там хранятся все временные переменные. Однако напомню что все остальные объекты находятся в HEAP в стеках вообще нет объектов там есть только ссылки на объекты.

Получается мы имеем объекты в HEAP и в разных стеках под разные потоки данных могут находится ссылки на один и тот же объект. Вот так мы и получаем пространство вариантов. Где каждый поток по своему хочет и будет влиять на объект. Возникающие коллизии на основе такой структуры называют Race Condition в переводе — гонка потоков. Где потоки гонятся за объектами и пытаются на них воздействовать изменять по принципу кто успел тот и съел. Результат зависит от того, какой поток успел прочитать, изменить и записать значение — это и есть Race Condition. Как раз на этом фоне и возникают ошибки, которые иногда воспроизводятся, а иногда нет. Локально зачастую причем все будет работать корректно, потому что вы запустили Сервис и тестируете один метод. Этот метод отрабатывает и ни с кем не соревнуется. А на проде возможно к тем же самым объектам в этот момент будут пытаться обратиться другие потоки и у вас с некоторой периодичностью будут возникать коллизии и вы не будете знать от чего они, потому что просто не понимаете как устроены потоки внутри.

Давайте разберем, что происходит на каждом уровне. Первый уровень -> запуск программы, буквально файл который запускает приложение. Второй уровень -> это запуск процесса в операционной системе и внутри процесса третий уровень -> где потоки делят между собой одну общую память.

Получается, что процесс — это запущенный экземпляр программы, то есть Операционная Система ОС выделяет для этого процесса определенную область памяти, выделяет ресурсы для данного приложения. А поток это легковесная единица выполнения внутри процесса, таких потоков в процессе может быть множество. Физически один реальный поток занимает одно ядро процессора, значит количество реально одновременно работающих потоков ограниченно ядрами вашего процессора. Мы помним, что потоки делят между собой память и для каждого потока свой Стек памяти. 

Поймай дзен

Поймай дзен

Поговорим про методы синхронизации работы потоков между собой. КОгда мы работаем с потоками, иногда нам нужно чтобы они работали параллельно, иногда последовательно. Порой, нужно, чтобы потом проделал работу и ждал, когда он снова понадобиться. Для этого есть инструменты, давайте познакомимся с ними:

wait/notify, notifyAll — это основные методы, которые позволят управлять потоками, из названий понятно, что wait() — заставляет поток ждать, то есть уснуть, пока какое-то условие не заставит его проснуться и дальше выполнять свою работу. А вот как раз методы notify() и notifyAll() — уведомляют либо один поток, либо сразу все, о том, что пора проснуться и продолжить свою работу.

Познакомимся с понятием монитор, комната ожидания и захват монитора synchronized

Как это работает, представим, что у каждого объекта есть монитор, его можно понимать как охранник, распределитель. И есть две важные зоны:

  1. Entry Set — очередь потоков, которые хотят захватить synchronized блок.

  2. Wait Set — комната ожидания для потоков, которые ожидают, пока не произойдет событие и тогда они активизируются.

ВАЖНОЕ ПРАВИЛО! Команды wait(), notify(), notifyAll() могут быть вызванны только в блоке synchronized (или метода) того объекта, чей монитор используется. Иначе выбросится IllegalMonitorStateException

На заметку! notify() — пробуждает любой поток из очереди потоков Wait Set, notifyAll() — будит все потоки, тако вариант лучше использовать чтобы не терять уснувшие потоки. Если будто один случайный поток, то если мы потеряем этот поток, то оставшиеся потоки будут спать вечно — так называется потерянное пробуждение (lost wakeup)

За счет синхронизации захватывается монитор объекта, у каждого объекта, который наследуется от Object, а в джава это буквально каждый объект есть встроенная блокировка, которая и называется монитором. Блок синхронизации захватывает монитор у объекта и следующие потоки отправляются в Wait Set в комнату ожиданий. За счет synchronized достигаются два главных свойства многопоточности:

Взаимное исключение (Mutual Exclusion) — гарантирует, что только один поток захвативший монитор будет производить над ним изменения в один момент времени. Достигается видимость между всеми потоками с помощью переменных volatile

Видимость (Visibility) — гарантия того, что все другие объекты увидят изменения после выхода потока из блока синхронизации.

Важно отметить, что мы говорили про блокировку у объектов, однако у класса тоже можно захватить монитор, с помощью статического метода. Помним, что статические методы и переменные относятся к классу, а не статические к объекту. Нужно так же понимать, что эти блокировки не конфликтуют друг с другом, так как одна блокирует монитор объекта this, а другая Utils.class.

Когда я первый раз разбирал задачку по многопоточке, там в начале был создан новый объект и был назван как lock, я не понимал зачем был создан объект и почему назван как блокировка. Дальше этот объект выступал монитором, который блокировал остальные потоки. И понимание этой логики к вам придет, только после того, как вы поймете, что каждый объект имеет свою одну блокировку, которую можно использовать если не хотите блокировать тот объект в котором производите логику.

Бывает, так что вы не хотите использовать монитор класса this (тот класс в котором вы находитесь), а хотите параллельно реализовывать логику в одном объекте в разных методах. Тогда вы можете передать в блок синхронизации любой другой объект, монитор которого, будет использоваться. Для этого просто создаете объект с помощью new Object(); и передаете его в блок синхронизации и теперь synchronized понимает монитор какого объекта он блокирует. 

Вот тут еще хочется поговорить про концепцию happens-before, по факту это гарантия, на то, что события произошедшие до, будут считанны и известны другим потокам.

Если вы хотите управлять потоками более гибким способом, то вы можете передать не просто объект для захвата монитора, а объект new ReentrantLock(), тип данных Lock. Тогда у такого объекта вы сможете захватывать монитор и иметь дополнительные методы управления, такие как: 

  • tryLock() — попытаться захватить блокировку без ожидания (вернуть false, если занята).

  • tryLock(timeout, unit) — ждать не бесконечно.

  • lockInterruptibly() — позволить прервать ожидание блокировки.

  • Справедливость (fairness) — можно создать new ReentrantLock(true), тогда блокировка будет отдаваться самому долго ждущему потоку (но медленнее).

Объект Semaphore sem = new Semaphore(3); позволит управлять количество потоков.

Deadlock (взаимная блокировка): Поток А захватил монитор одного объекта и ждет освобождения монитора у другого объекта, а тот объект находится в захвате у другого блока синхронизации и ждет освобождения первого. В итоге оба стоят вечно — программа зависла, потоки не выполняются и не реагируют.

Livelock (живая блокировка): Потоки не заблокированы, но постоянно уступают друг другу — например, видят, что ресурс занят, отпускают свой и пробуют заново в том же порядке. Они активно крутятся, загружая процессор, но прогресса ноль.

Давайте притормозим тут, нам важно качественно осознать информацию выше, поэтому пушим в мастер.

git commit -m "Базовые знания по многопоточности: потоки, процессор, синхронизация, deadlock, livelock"

Поговорим про синхронные и асинхронные методы, это не обязательно многопоточка, однако для понимания работы потоков нам необходимо понимание синхронности.

Синхронный метод — это обычный метод, который работает по принципу, поток зашел в метод, начал выполнять свою работу и например обратился в базу данных с запросом, запрос обрабатывается на стороне БД, поэтому поток приложения в это время ничем не занят. И он ждет ответа. В этот момент поток не выполняет никакой работы — простаивает. Сравнить можно с покупками на рынке, вы подходите в мясной отдел и запрашиваете курицу 2 килограмма, вам говорят сейчас упакуем, вы поток, ничего полезного в этот момент не делаете, просто ждете, когда вам отдадут курицу, вы пойдете дальше выполнять свою большую задачу по закупке.

Асинхронный метод — это эффективно работающий метод с потоком, поток внутри метода встретившись в обращением к внешней системе не будет ждать выполнения, он тут же заключает контракт на обещание, что результаты будут позже. И поток идет дальше в комнату ожидания, если это многопоточная среда или выполнять другие асинхронные методы

CompletableFuture — это контейнер для результата, которого ещё нет. Он может быть в трёх состояниях: ещё не завершён, завершён с результатом, завершён с ошибкой. Ты можешь навесить на него колбэки: что сделать, когда результат появится. Главная фишка — методы thenApply, thenCompose, thenAccept, которые позволяют строить цепочки асинхронных операций. 

Пример: CompletableFuture.supplyAsync(() -> userService.getUser(123L)) .thenCompose(user -> orderService.getOrdersAsync(user)) .thenAccept(orders -> System.out.println(orders));

Такой метод, будет выполнять все запросы асинхронно, то есть он пойдет в юзер сервис, дойдет до момента, когда обратиться в бд, получит обещание, что выполнится например через 1 секунду,  CompletableFuture — это значит, что выполнятся все метода последовательно. Это значит пока первый блок асинхронных методов не будет выполнен, поток не будет выполнять следующий шаг. Однако простаивать тоже не будет, он уйдет в пул потоков и будет там ждать новую работу.

Самое интересное, что начать исполнять CompletableFuture может один поток, а закончить другой, так как потоки не привязаны к методам, они как унифицированные работники, каждый может заменить другого. Они просто эффективно распределяют свое время.

Ассинхронность это мощный инструмент, эффективного распределения ресурсов. Однако есть цена, которую придется заплатить, писать асинхронные методы довольно сложная работа, нетривиальная. Вы никогда не знаете какой поток, пойдет исполнять какой метод, ошибки будут неочевидными, поставив брейкпоинт в сервисе с синхронными методами вы будете читать код сверху вниз и ожил=дать остановку в вашем месте. Однако с асинхронными методами, вы не можете быть уверенны, что потоки пойдут исполнять именно этот метод. Вам нужно уметь прогнозировать предсказывать поведение асинхронных методов, связывать их корректно, чтобы не допускать не консистентности данных.

Вот вам и многомировая интерпретация — потоки выбирают свою вселенную, вопрос имеют ли потоки сознание и свободу воли, чтобы выбирать какую работу они пойдут выполнять? 

Определиолся?

Определиолся?

Какой главный риск, когда мы используем асинхронные методы Через CompletableFuture в многопоточке?

Мы знаем, что количество возможных потоков в пуле потоков ForkJoinPool.commonPool(). Равно количество ядер минус один. Итого на 8 ядерном устройстве будет доступно в пуле потоков 7 потоков. А CompletableFuture буквально берет потоки из пула потоков. 

И все бы хорошо, поток получает обещание от запроса к бд внутри метода и уходит обратно в пул. Но бывает такое, что внутренние методы в рамках CompletableFuture на этапе обращения к бд встречаются с внутренними методами, которые не являются асинхронными и там поток засыпает, не переходя дальше, тогда и возникает риск захвата всех потоков, если все потоки от 7 одновременных запросов кнут на пути к бд на синхронном методе. Восьмой запрос просто не получит свободного потока и будет ждать. Девятый и десятый тоже. Очередь будет расти, приложение перестанет отвечать, а ты будешь гадать, почему всё работало на тесте, а под нагрузкой упало. Диагностировать такую проблему сложно, потому что внешне код выглядит как асинхронный, а внутри спрятана синхронная бомба замедленного действия.

Внутренние методы: supplyAsync — запускает асинхронную операцию, которая возвращает результатthenApply — преобразует результат (синхронно)thenCompose — преобразует результат в новый CompletableFuture (асинхронно)thenAccept — потребляет результат, ничего не возвращаяexceptionally — обрабатывает ошибку

Итог: проверь себя

Если ты честно осилил статью, пропустил через свой мыслительный аппарат, почувствовал сложность в осмыслении, если вопросы которые у тебя появлялись ты гуглил, чтобы закрыть пробелы, то я официально считаю тебя красавчиком. Респект тебе! Можешь закрепить материал и ответить на эти вопросы:

  1. Почему потенциально может возникать гонка потоков Race Condition при создании потоков? Подсказка: STACK

  2. За счет чего достигается создание 20, 30 и более потоков на 8 ядерном процессоре? Ведь мы знаем, что количество потоков в пуле ограниченно количеством ядер минус один.Подсказка: Суперсклярность ядер процессора

  3. Что делает блок synchronized ? Чей монитор захватывается по умолчанию?В чем особенности с блоком synchronized в рамках статического метода?Подсказка: Только ли у объекта можно захватить монитор?

  4. Почему есть риск положить приложение из за нехватки потоков при использовании Асинхронных методов посредством CompletableFuture?Подсказка: Всегда ли гарантия Ассинхронности на метод распространяется на внутренние методы?


Ответил на все? Значит, тему ты честно преодолел большой шаг в понимании. На этом многопоточка только начинается, дальше предлагаю тебе порешать задачки, например задача "PING-PONG" Если на какие-то вопросы ответить не можешь, пиши мне, разберем вместе. Если в статье увидел не точности или противоречия, тоже пиши мне -разберемся почистим.

Мой тг -@karim_product на связи родной/родная 😉

ссылка на оригинал статьи https://habr.com/ru/articles/1022934/