Почему стек?
Потому что это одна из простейших в реализации структур данных, так что это не будет «затенять» многопоточную логику. Также из трех основных операций (push, pop, peek) — есть как операции исключительно чтения мутирующей совместно используемой памяти, так и операции записи.
Целью статьи не было проведение сравнительного анализа различных подходов. Задача статьи — показать разнообразие возможностей. Однако в целом стоит отметить, что основная проблема демонстрируемых реализаций стека — наличие одной «горячей точки».
Существуют реализации, которые ослабляют семантику FIFO (или, в других терминах, являются нелинериализуемыми) и «расщепляют» эту точку в «пятно», что улучшает показатели при высококонкурентном доступе. Возможно, это тема для еще одной статьи «Еще 10 способов …».
Это не просто статья, это — материал к весеннему вебинару «Multicore programming in Java». Видео к занятию #13 я выкладываю в свободный доступ для сообщества.
1. Не синхронизироваться, использовать чужой happens-before
2. На основе synchronized
3. На основе synchronized + идиома Private Monitor
4. На основе ReentrantLock
5. На основе Semaphore
6. На основе ReentrantReadWriteLock
7. На основе Spin Lock (неблокирующий)
8. Treiber stack (неблокирующий)
9. Используем идиому Copy-on-write
10. В функциональном стиле: Persistent stack
Вот видео вебинара (Лекция #13), где мы разбираем данные 10 способов
Везде ниже используется один и тот же вариант стека на односвязном списке. Методы pop() и peek() на пустом стеке приводят к NullPointerException.
public class Stack<T> { private Node<T> top = null; public void push(T newElem) { this.top = new Node<>(newElem, this.top); } public T peek() { return top.value; } public T pop() { Node<T> oldTop = this.top; this.top = this.top.next; return oldTop.value; } private static class Node<E> { private final E value; private final Node<E> next; private Node(E value, Node<E> next) { this.value = value; this.next = next; } } }
1. Не синхронизироваться, использовать чужой happens-before
Если вы полностью контролируете использование вашего стека, то может оказаться, что
1) его передача между потоками всегда сопряжена с happens-before ребром
2) логика приложения такова, что потоки используют (читают/пишут) стек «по очереди»
Собственно берем наш, ничем не защищенный стек
public class Stack<T> { private Node<T> top = null; public void push(T newElem) { this.top = new Node<>(newElem, this.top); } public T peek() { return top.value; } public T pop() { Node<T> oldTop = this.top; this.top = this.top.next; return oldTop.value; } private static class Node<E> { private final E value; private final Node<E> next; private Node(E value, Node<E> next) { this.value = value; this.next = next; } } }
И передаем через happens-before ребро образованное вызовом метода start() и первой инструкцией метода run() (как говорит Святая Книга: «A call to start() on a thread happens-before any actions in the started thread.»)
public class Demo { public static void main(String[] args) { final Stack<Integer> stack = new Stack<>(); // меняем стек в потоке main stack.push(1); stack.push(2); stack.push(3); stack.push(4); stack.push(5); new Thread(new Runnable() { public void run() { // меняем стек в другом потоке stack.pop(); // и читаем, это все не проблема System.out.println(stack.pop()); } }).start(); } }
Стоит ли напоминать что за такое
public class Demo { public static void main(String[] args) { final Stack<Integer> stack = new Stack<>(); stack.push(1); stack.push(2); stack.push(3); stack.push(4); stack.push(5); new Thread(new Runnable() { public void run() { stack.push(100); } }).start(); new Thread(new Runnable() { public void run() { System.out.println(stack.peek()); } }).start(); } }
Вы будете вечно гореть в Java-Аду (за data-racefull программу)!
Но практически все способы передачи через потокозащищенные коллекции создают happens-before ребро
import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; public class Demo { public static void main(String[] args) { final Stack<Integer> stack = new Stack<>(); final BlockingQueue<Stack<Integer>> interThreadQueue = new LinkedBlockingQueue<>(); stack.push(1); stack.push(2); stack.push(3); stack.push(4); stack.push(5); new Thread(new Runnable() { public void run() { stack.push(100); try { interThreadQueue.put(stack); } catch (InterruptedException ignore) {/*NOP*/} } }).start(); new Thread(new Runnable() { public void run() { try { Stack<Integer> myStack = interThreadQueue.take(); System.out.println(myStack.pop()); } catch (InterruptedException ignore) {/*NOP*/} } }).start(); } }
Мораль: создавайте специальные очереди сообщений, для обмена непотокозащищенными данными между потоками.
2. На основе synchronized
Что может быть проще чем
public class Stack<T> { private Node<T> top = null; public synchronized void push(T newElem) { this.top = new Node<>(newElem, this.top); } public synchronized T peek() { return top.value; } public synchronized T pop() { Node<T> oldTop = this.top; this.top = this.top.next; return oldTop.value; } private static class Node<E> { private final E value; private final Node<E> next; private Node(E value, Node<E> next) { this.value = value; this.next = next; } } }
Из забавного — мы теперь сами стали той самой потокозащищенной коллекцией, передача данных через которую создает happens-before ребро!
3. На основе synchronized + идиома Private Monitor
В предыдущем примере все хорошо, кроме того, что мы синхронизируемся по встроенному монитору стека, который «виден всем», кто видит стек. Таким образом, мы открываем наружу детали реализации синхронизации (нарушение инкапсуляции) и предыдущий пример можно атаковать вот так
public class Demo { public static void main(String[] args) { final Stack<String> stack = new Stack<>(); // Ну ооочень полезный поток, делает push()/pop() new Thread(new Runnable() { public void run() { while (true) { stack.push("A"); stack.pop(); System.out.println("push()/pop()"); } } }).start(); // поток-паразит new Thread(new Runnable() { public void run() { synchronized (stack) { while (true) ; } } }).start(); } } >> push()/pop() >> push()/pop() >> push()/pop() >> push()/pop() >> ... висим
Поток-паразит использовал встроенный стек и «повис», повесив операции со стеком. Вряд ли он это сделал со злости, скорее ошибка программиста.
Просто используйте идиому Private Mutex
public class Stack<T> { // Private Mutex! private final Object lock = new Object(); private Node<T> top = null; public void push(T newElem) { synchronized (lock) { this.top = new Node<>(newElem, this.top); } } public T peek() { synchronized (lock) { return this.top.value; } } public T pop() { synchronized (lock) { Node<T> oldTop = this.top; this.top = this.top.next; return oldTop.value; } } private static class Node<E> { private final E value; private final Node<E> next; private Node(E value, Node<E> next) { this.value = value; this.next = next; } } }
4. На основе ReentrantLock
Все как в предыдущем примере, но с ReentrantLock
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class Stack<T> { private final Lock lock = new ReentrantLock(); private Node<T> top = null; public void push(T newElem) { lock.lock(); try { this.top = new Node<>(newElem, this.top); } finally {lock.unlock();} } public T peek() { lock.lock(); try { return top.value; } finally {lock.unlock();} } public T pop() { lock.lock(); try { Node<T> oldTop = this.top; this.top = this.top.next; return oldTop.value; } finally {lock.unlock();} } private static class Node<E> { private final E value; private final Node<E> next; private Node(E value, Node<E> next) { this.value = value; this.next = next; } } }
Вопрос, а зачем же использовать ReentrantLock, а не встроенный монитор/synchronized?
Ну, во-первых, обратите свой взор на его богатое API (честность/fairness, lock, lockInterruptibly, tryLock, …), хотя в данном случае, вряд ли какой-то другой поток надолго «зависнет» в методах стека
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class Stack<T> { private final Lock lock; private Node<T> top = null; public Stack(boolean fair) { this.lock = new ReentrantLock(fair); } // просто-push public void push(T newElem) { lock.lock(); try { //Thread.stop() this.top = new Node<>(newElem, this.top); } finally {lock.unlock();} } // push, с возможностью прервать ожидание захвата блокировки // посредством Thread.interrupt() -> InterruptedException public void pushInterruptibly(T newElem) throws InterruptedException { lock.lockInterruptibly(); try { this.top = new Node<>(newElem, this.top); } finally {lock.unlock();} } // push, с возможностью захвата блокировки только // в том случае, если она свободна public boolean tryPush(T newElem) { if (lock.tryLock()) { try { this.top = new Node<>(newElem, this.top); return true; } finally {lock.unlock();} } else { return false; } } // push, с возможностью захвата блокировки // с ограниченным временем ожидания public boolean tryPush(T newElem, long time, TimeUnit unit) throws InterruptedException { if (lock.tryLock(time, unit)) { try { this.top = new Node<>(newElem, this.top); return true; } finally {lock.unlock();} } else { return false; } } /*В этом примере расписан только метод push(). peek() и pop() пропустили*/ private static class Node<E> { private final E value; private final Node<E> next; private Node(E value, Node<E> next) { this.value = value; this.next = next; } } }
А, во-вторых, обратите свой взор на главы «13.1. Lock and ReentrantLock», «13.2. Performance Considerations» и «13.4. Choosing Between Synchronized and ReentrantLock» книги Brian Goetz и других «Java Concurrency in Practice», где проводится сравнение synchronized и ReentrantLock.
5. На основе Semaphore
Аналогично предыдущему примеру (на ReentrantLock) можно сделать на семафоре
import java.util.concurrent.Semaphore; public class Stack<T> { // binary semaphore private final Semaphore sem = new Semaphore(1); private Node<T> top = null; public void push(T newElem) { sem.acquireUninterruptibly(); try { this.top = new Node<>(newElem, this.top); } finally {sem.release();} } public T peek() { sem.acquireUninterruptibly(); try { return top.value; } finally {sem.release();} } public T pop() { sem.acquireUninterruptibly(); try { Node<T> oldTop = this.top; this.top = this.top.next; return oldTop.value; } finally {sem.release();} } private static class Node<E> { private final E value; private final Node<E> next; private Node(E value, Node<E> next) { this.value = value; this.next = next; } } }
Семафор, «заряженный единицей», работает как обычная блокировка и называется Binary Semaphore.
Однако, как и другие жители java.util.concurrent и наследники Великого и Могучего AbstractQueuedSynchronizer обладает таким же богатым API
import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; public class Stack<T> { // binary semaphore private final Semaphore sem; private Node<T> top = null; public Stack(boolean fair) { this.sem = new Semaphore(1, fair); } public void push(T newElem) { sem.acquireUninterruptibly(); try { this.top = new Node<>(newElem, this.top); } finally {sem.release();} } public void pushInterruptibly(T newElem) throws InterruptedException { sem.acquire(); try { this.top = new Node<>(newElem, this.top); } finally {sem.release();} } public boolean tryPush(T newElem) { if (sem.tryAcquire()) { try { this.top = new Node<>(newElem, this.top); return true; } finally {sem.release();} } else { return false; } } public boolean tryPush(T newElem, long time, TimeUnit unit) throws InterruptedException { if (sem.tryAcquire(time, unit)) { try { this.top = new Node<>(newElem, this.top); return true; } finally {sem.release();} } else { return false; } } /*В этом примере расписан только метод push(). peek() и pop() пропустили*/ private static class Node<E> { private final E value; private final Node<E> next; private Node(E value, Node<E> next) { this.value = value; this.next = next; } } }
6. На основе ReentrantReadWriteLock
Давайте, наконец-то, обратим внимание на то, что у нас два рода операций — мутаторы (push, pop) и читатель (peek) и используем отдельные режимы блокировки — exclusive и shared
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class Stack<T> { private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); private final Lock rLock = rwLock.readLock(); private final Lock wLock = rwLock.writeLock(); private Node<T> top = null; public void push(T newElem) { // wLock - EXCLUSIVE mode! wLock.lock(); try { this.top = new Node<>(newElem, this.top); } finally {wLock.unlock();} } public T peek() { // rLock - SHARED mode! rLock.lock(); try { return this.top.value; } finally {rLock.unlock();} } public T pop() { // wLock - EXCLUSIVE mode! wLock.lock(); try { Node<T> oldTop = this.top; this.top = this.top.next; return oldTop.value; } finally {wLock.unlock();} } private static class Node<E> { private final E value; private final Node<E> next; private Node(E value, Node<E> next) { this.value = value; this.next = next; } } }
Да, ReentrantReadWriteLock тоже произошел от AbstractQueuedSynchronizer и тоже обладает всеми этими fairness, lock, lockInterruptible, tryLock,…
7. На основе Spin Lock (неблокирующий)
Мы можем сделать свой Spin Lock на основе java.util.concurrent.atomic
Тут мы, в случае занятого стека, не передаем управление JVM/OS (в отличии от senchronized, ReentrantLock, Semaphore, ReadWriteReentrantLock, …)
import java.util.concurrent.atomic.AtomicBoolean; public class Stack<T> { private final AtomicBoolean locked = new AtomicBoolean(false); private Node<T> top = null; public void push(T newElem) { // false->true while (!locked.compareAndSet(false, true)) {/*NOP*/} try { this.top = new Node<>(newElem, this.top); } finally {locked.set(false);} } public T peek() { while (!locked.compareAndSet(false, true)) {/*NOP*/} try { return top.value; } finally {locked.set(false);} } public T pop() { while (!locked.compareAndSet(false, true)) {/*NOP*/} try { Node<T> oldTop = this.top; this.top = this.top.next; return oldTop.value; } finally { locked.set(false); } } private static class Node<E> { private final E value; private final Node<E> next; private Node(E value, Node<E> next) { this.value = value; this.next = next; } } }
Протокол захвата — перевод AtomicBoolean: false -> true (при конкуренции со стороны других потоков).
Протокол освобождения — перевод AtomicBoolean: true -> false (без конкуренции со стороны других потоков).
8. Treiber stack (неблокирующий)
Ну раз уже взялись за на основе java.util.concurrent.atomic, то надо делать неблокирующий стек Трейбера
import java.util.concurrent.atomic.AtomicReference; public class Stack<T> { private final AtomicReference<Node<T>> top = new AtomicReference<>(null); public void push(T newElem) { Node<T> newTop = new Node<>(newElem, null); while (true) { Node<T> oldTop = top.get(); newTop.next = oldTop; if (top.compareAndSet(oldTop, newTop)) { break; } } } public T peek() { return top.get().value; } public T pop() { while (true) { Node<T> oldTop = this.top.get(); Node<T> newTop = oldTop.next; if (top.compareAndSet(oldTop, newTop)) { return oldTop.value; } } } private static class Node<E> { private final E value; private Node<E> next; private Node(E value, Node<E> next) { this.value = value; this.next = next; } } }
9. Используем идиому Copy-on-write
Это не совсем copy-on-write, мы не создаем полноценную копию, но это дань уважения старому доброму я взял эту идею у старого доброго CopyOnWriteArrayList — мутации с захватом монопольной блокировки, чтение через volatile
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class Stack<T> { private final Lock lock = new ReentrantLock(); private volatile Node<T> top = null; public void push(T newElem) { lock.lock(); try { this.top = new Node<>(newElem, this.top); } finally {lock.unlock();} } public T peek() { return top.value; } public T pop() { lock.lock(); try { Node<T> oldTop = this.top; this.top = this.top.next; return oldTop.value; } finally {lock.unlock();} } private static class Node<E> { private final E value; private final Node<E> next; private Node(E value, Node<E> next) { this.value = value; this.next = next; } } }
10. В функциональном стиле: Persistent stack
Функциональные языки от императивных отличает многое. Один из аспектов — нет изменяемых переменных. Точнее нет переменных вообще, есть значения. Вопрос — а что же они делают с коллекциями? Как известно в отсутствии коллекций жизнь зародиться не может, а коллекции, кажется, по определению изменчивые сущности.
Хитрые функциональщики на каждое действие-мутацию (add, remove, …) порождают новую коллекцию. А для уменьшения потребления памяти и процессора на такие действия — стараются, что бы новая версия использовала как можно больше «материала» от предыдущей.
public class Stack<T> { private final Node<T> top; private Stack(Node<T> top) { this.top = top; } public Stack() { this.top = null; } public Stack<T> push(T newElem) { return new Stack<>(new Node<>(newElem, this.top)); } public T peek() { return this.top.value; } public Stack<T> pop() { return new Stack<>(top.next); } private static class Node<E> { private final E value; private final Node<E> next; private Node(E value, Node<E> next) { this.value = value; this.next = next; } } }
У такого варианта немного необычное API на первый взгляд. Его надо привыкнуть использовать определенным образом
public class Demo { public static void main(String[] args) { Stack<String> stack = new Stack<>(); stack = stack.push("A"); stack = stack.push("B").push("C"); System.out.println("stack.peek(): " + stack.peek()); } } >> stack.peek(): C
Так как каждая мутация порождает новую версию
public class Demo { public static void main(String[] args) { Stack<String> stack = new Stack<>(); Stack<String> stackA = stack.push("A"); Stack<String> stackAB = stackA.push("B"); Stack<String> stackABC = stackAB.push("C"); System.out.println("stackA.peek(): " + stackA.peek()); System.out.println("stackAB.peek(): " + stackAB.peek()); System.out.println("stackABC.peek(): " + stackABC.peek()); } } >> stackA.peek(): A >> stackAB.peek(): B >> stackABC.peek(): C
Здесь я непрерывно передаю «мутирующие» стеки от одного потока к другому. Конечно, надо делать safe publication (я делаю через volatile-read/volatile-write), но после чтения можно свободно «менять» свою версию.
public class Demo { static volatile Stack<String> stack = new Stack<>(); static { stack.push("#"); stack.push("#"); } public static void main(String[] args) { new Thread(new Runnable() { public void run() { for (int k = 0; ; k++) { // добавляем в стек элемент stack = stack.push("" + k); } } }).start(); new Thread(new Runnable() { public void run() { while (true) { // удаляем из стека элемент Stack<String> newStack = stack.pop(); System.out.println(newStack.peek()); } } }).start(); } }
Данный подход (метод-мутатор возвращает новую версию и не меняет предыдущую) достаточно хорошо представлен в JDK
public class Demo { public static void main(String[] args) { String origin = "Hello!"; String mutated = origin.toUpperCase(); System.out.println("origin: " + origin); System.out.println("mutated: " + mutated); } } >> origin: Hello! >> mutated: HELLO!
import java.math.BigInteger; public class Demo { public static void main(String[] args) { BigInteger origin = new BigInteger("40"); BigInteger mutated = origin.add(new BigInteger("2")); System.out.println("origin: " + origin); System.out.println("mutated: " + mutated); } } >> origin: 40 >> mutated: 42
Контакты
Кратко о курсе «Multicore programming in Java»: стартует 1 сентября, ведется в режиме вебинаров дважды в неделю (понедельник + четверг) в 19.00-22.00 (по московскому времени), состоит из 16 лекций по 2.5 часа (=40 лекционных часов), рассчитан на Java Middle.
Стоимость курса
— при оплате до 9 августа — 375$
— при оплате до 16 августа — 400$
— при оплате до 23 августа — 425$
— при оплате до 30 августа — 450$
Я занимаюсь онлайн обучением Java (вот курсы программирования) и публикую часть учебных материалов в рамках переработки курса Java Core. Видеозаписи лекций в аудитории Вы можете увидеть на youtube-канале, возможно, видео канала лучше систематизировано в этой статье.
На все вопросы с удовольствием отвечу по следующим контактам (или в комментариях)
skype: GolovachCourses
email: GolovachCourses@gmail.com
ссылка на оригинал статьи http://habrahabr.ru/post/231953/
Добавить комментарий