10 способов реализовать потокозащищенный Stack на Java

от автора

В данной статье я предлагаю рассмотреть 10 способов реализовать потокозащищенный стек на Java.

Почему стек?
Потому что это одна из простейших в реализации структур данных, так что это не будет «затенять» многопоточную логику. Также из трех основных операций (push, pop, peek) — есть как операции исключительно чтения мутирующей совместно используемой памяти, так и операции записи.

Целью статьи не было проведение сравнительного анализа различных подходов. Задача статьи — показать разнообразие возможностей. Однако в целом стоит отметить, что основная проблема демонстрируемых реализаций стека — наличие одной «горячей точки».

Существуют реализации, которые ослабляют семантику FIFO (или, в других терминах, являются нелинериализуемыми) и «расщепляют» эту точку в «пятно», что улучшает показатели при высококонкурентном доступе. Возможно, это тема для еще одной статьи «Еще 10 способов …».

Это не просто статья, это — материал к весеннему вебинару «Multicore programming in Java». Видео к занятию #13 я выкладываю в свободный доступ для сообщества.

1. Не синхронизироваться, использовать чужой happens-before
2. На основе synchronized
3. На основе synchronized + идиома Private Monitor
4. На основе ReentrantLock
5. На основе Semaphore
6. На основе ReentrantReadWriteLock
7. На основе Spin Lock (неблокирующий)
8. Treiber stack (неблокирующий)
9. Используем идиому Copy-on-write
10. В функциональном стиле: Persistent stack

Вот видео вебинара (Лекция #13), где мы разбираем данные 10 способов


Везде ниже используется один и тот же вариант стека на односвязном списке. Методы pop() и peek() на пустом стеке приводят к NullPointerException.

public class Stack<T> {     private Node<T> top = null;     public void push(T newElem) {         this.top = new Node<>(newElem, this.top);     }     public T peek() {         return top.value;     }         public T pop() {         Node<T> oldTop = this.top;         this.top = this.top.next;         return oldTop.value;     }              private static class Node<E> {         private final E value;         private final Node<E> next;         private Node(E value, Node<E> next) {             this.value = value;             this.next = next;         }     }     } 

1. Не синхронизироваться, использовать чужой happens-before

Если вы полностью контролируете использование вашего стека, то может оказаться, что
1) его передача между потоками всегда сопряжена с happens-before ребром
2) логика приложения такова, что потоки используют (читают/пишут) стек «по очереди»

Собственно берем наш, ничем не защищенный стек

public class Stack<T> {     private Node<T> top = null;     public void push(T newElem) {         this.top = new Node<>(newElem, this.top);     }     public T peek() {         return top.value;     }         public T pop() {         Node<T> oldTop = this.top;         this.top = this.top.next;         return oldTop.value;     }              private static class Node<E> {         private final E value;         private final Node<E> next;         private Node(E value, Node<E> next) {             this.value = value;             this.next = next;         }     }     } 

И передаем через happens-before ребро образованное вызовом метода start() и первой инструкцией метода run() (как говорит Святая Книга: «A call to start() on a thread happens-before any actions in the started thread.»)

public class Demo {     public static void main(String[] args) {         final Stack<Integer> stack = new Stack<>();         // меняем стек в потоке main         stack.push(1);         stack.push(2);         stack.push(3);         stack.push(4);         stack.push(5);          new Thread(new Runnable() {             public void run() {                 // меняем стек в другом потоке                 stack.pop();                 // и читаем, это все не проблема                 System.out.println(stack.pop());             }         }).start();     } } 

Стоит ли напоминать что за такое

public class Demo {     public static void main(String[] args) {         final Stack<Integer> stack = new Stack<>();         stack.push(1);         stack.push(2);         stack.push(3);         stack.push(4);         stack.push(5);          new Thread(new Runnable() {             public void run() {                 stack.push(100);             }         }).start();                  new Thread(new Runnable() {                         public void run() {                 System.out.println(stack.peek());             }         }).start();     } } 

Вы будете вечно гореть в Java-Аду (за data-racefull программу)!

Но практически все способы передачи через потокозащищенные коллекции создают happens-before ребро

import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue;  public class Demo {     public static void main(String[] args) {         final Stack<Integer> stack = new Stack<>();         final BlockingQueue<Stack<Integer>> interThreadQueue                                                                                    = new LinkedBlockingQueue<>();         stack.push(1);         stack.push(2);         stack.push(3);         stack.push(4);         stack.push(5);          new Thread(new Runnable() {             public void run() {                 stack.push(100);                 try {                     interThreadQueue.put(stack);                 } catch (InterruptedException ignore) {/*NOP*/}             }         }).start();                  new Thread(new Runnable() {             public void run() {                 try {                     Stack<Integer> myStack = interThreadQueue.take();                     System.out.println(myStack.pop());                 } catch (InterruptedException ignore) {/*NOP*/}             }         }).start();     } } 

Мораль: создавайте специальные очереди сообщений, для обмена непотокозащищенными данными между потоками.

2. На основе synchronized

Что может быть проще чем

public class Stack<T> {     private Node<T> top = null;     public synchronized void push(T newElem) {         this.top = new Node<>(newElem, this.top);     }     public synchronized T peek() {         return top.value;     }             public synchronized T pop() {         Node<T> oldTop = this.top;         this.top = this.top.next;         return oldTop.value;     }              private static class Node<E> {         private final E value;         private final Node<E> next;         private Node(E value, Node<E> next) {             this.value = value;             this.next = next;         }     }     } 

Из забавного — мы теперь сами стали той самой потокозащищенной коллекцией, передача данных через которую создает happens-before ребро!

3. На основе synchronized + идиома Private Monitor

В предыдущем примере все хорошо, кроме того, что мы синхронизируемся по встроенному монитору стека, который «виден всем», кто видит стек. Таким образом, мы открываем наружу детали реализации синхронизации (нарушение инкапсуляции) и предыдущий пример можно атаковать вот так

public class Demo {     public static void main(String[] args) {         final Stack<String> stack = new Stack<>();          // Ну ооочень полезный поток, делает push()/pop()         new Thread(new Runnable() {             public void run() {                 while (true) {                     stack.push("A");                     stack.pop();                     System.out.println("push()/pop()");                 }             }         }).start();          // поток-паразит         new Thread(new Runnable() {             public void run() {                 synchronized (stack) {                     while (true) ;                 }             }         }).start();     } }  >> push()/pop() >> push()/pop() >> push()/pop() >> push()/pop() >> ... висим 

Поток-паразит использовал встроенный стек и «повис», повесив операции со стеком. Вряд ли он это сделал со злости, скорее ошибка программиста.

Просто используйте идиому Private Mutex

public class Stack<T> {     // Private Mutex!     private final Object lock = new Object();     private Node<T> top = null;     public void push(T newElem) {         synchronized (lock) {             this.top = new Node<>(newElem, this.top);         }     }     public T peek() {         synchronized (lock) {             return this.top.value;         }     }     public T pop() {         synchronized (lock) {             Node<T> oldTop = this.top;             this.top = this.top.next;             return oldTop.value;         }     }      private static class Node<E> {         private final E value;         private final Node<E> next;         private Node(E value, Node<E> next) {             this.value = value;             this.next = next;         }     } } 

4. На основе ReentrantLock

Все как в предыдущем примере, но с ReentrantLock

import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;  public class Stack<T> {     private final Lock lock = new ReentrantLock();     private Node<T> top = null;     public void push(T newElem) {         lock.lock();         try {             this.top = new Node<>(newElem, this.top);         } finally {lock.unlock();}     }     public T peek() {         lock.lock();         try {             return top.value;         } finally {lock.unlock();}     }     public T pop() {         lock.lock();         try {             Node<T> oldTop = this.top;             this.top = this.top.next;             return oldTop.value;         } finally {lock.unlock();}     }      private static class Node<E> {         private final E value;         private final Node<E> next;         private Node(E value, Node<E> next) {             this.value = value;             this.next = next;         }     } } 

Вопрос, а зачем же использовать ReentrantLock, а не встроенный монитор/synchronized?

Ну, во-первых, обратите свой взор на его богатое API (честность/fairness, lock, lockInterruptibly, tryLock, …), хотя в данном случае, вряд ли какой-то другой поток надолго «зависнет» в методах стека

import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;  public class Stack<T> {     private final Lock lock;     private Node<T> top = null;     public Stack(boolean fair) {         this.lock = new ReentrantLock(fair);     }     // просто-push     public void push(T newElem) {         lock.lock();         try { //Thread.stop()             this.top = new Node<>(newElem, this.top);         } finally {lock.unlock();}     }     // push, с возможностью прервать ожидание захвата блокировки     // посредством Thread.interrupt() -> InterruptedException      public void pushInterruptibly(T newElem) throws InterruptedException {         lock.lockInterruptibly();         try {             this.top = new Node<>(newElem, this.top);         } finally {lock.unlock();}     }     // push, с возможностью захвата блокировки только      // в том случае, если она свободна      public boolean tryPush(T newElem) {         if (lock.tryLock()) {             try {                 this.top = new Node<>(newElem, this.top);                 return true;             } finally {lock.unlock();}         } else {             return false;         }     }     // push, с возможностью захвата блокировки      // с ограниченным временем ожидания     public boolean tryPush(T newElem, long time, TimeUnit unit) throws InterruptedException {         if (lock.tryLock(time, unit)) {             try {                 this.top = new Node<>(newElem, this.top);                 return true;             } finally {lock.unlock();}         } else {             return false;         }     }      /*В этом примере расписан только метод push().         peek() и pop() пропустили*/      private static class Node<E> {         private final E value;         private final Node<E> next;         private Node(E value, Node<E> next) {             this.value = value;             this.next = next;         }     } } 

А, во-вторых, обратите свой взор на главы «13.1. Lock and ReentrantLock», «13.2. Performance Considerations» и «13.4. Choosing Between Synchronized and ReentrantLock» книги Brian Goetz и других «Java Concurrency in Practice», где проводится сравнение synchronized и ReentrantLock.

5. На основе Semaphore

Аналогично предыдущему примеру (на ReentrantLock) можно сделать на семафоре

import java.util.concurrent.Semaphore;  public class Stack<T> {     // binary semaphore     private final Semaphore sem = new Semaphore(1);     private Node<T> top = null;     public void push(T newElem) {         sem.acquireUninterruptibly();         try {             this.top = new Node<>(newElem, this.top);         } finally {sem.release();}     }     public T peek() {         sem.acquireUninterruptibly();         try {             return top.value;         } finally {sem.release();}     }     public T pop() {         sem.acquireUninterruptibly();         try {             Node<T> oldTop = this.top;             this.top = this.top.next;             return oldTop.value;         } finally {sem.release();}     }      private static class Node<E> {         private final E value;         private final Node<E> next;         private Node(E value, Node<E> next) {             this.value = value;             this.next = next;         }     } } 

Семафор, «заряженный единицей», работает как обычная блокировка и называется Binary Semaphore.
Однако, как и другие жители java.util.concurrent и наследники Великого и Могучего AbstractQueuedSynchronizer обладает таким же богатым API

import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit;  public class Stack<T> {     // binary semaphore     private final Semaphore sem;     private Node<T> top = null;     public Stack(boolean fair) {         this.sem = new Semaphore(1, fair);     }     public void push(T newElem) {         sem.acquireUninterruptibly();         try {             this.top = new Node<>(newElem, this.top);         } finally {sem.release();}     }     public void pushInterruptibly(T newElem) throws InterruptedException {         sem.acquire();         try {             this.top = new Node<>(newElem, this.top);         } finally {sem.release();}     }     public boolean tryPush(T newElem) {         if (sem.tryAcquire()) {             try {                 this.top = new Node<>(newElem, this.top);                 return true;             } finally {sem.release();}         } else {             return false;         }     }     public boolean tryPush(T newElem, long time, TimeUnit unit) throws InterruptedException {         if (sem.tryAcquire(time, unit)) {             try {                 this.top = new Node<>(newElem, this.top);                 return true;             } finally {sem.release();}         } else {             return false;         }     }      /*В этом примере расписан только метод push().         peek() и pop() пропустили*/      private static class Node<E> {         private final E value;         private final Node<E> next;         private Node(E value, Node<E> next) {             this.value = value;             this.next = next;         }     } } 

6. На основе ReentrantReadWriteLock

Давайте, наконец-то, обратим внимание на то, что у нас два рода операций — мутаторы (push, pop) и читатель (peek) и используем отдельные режимы блокировки — exclusive и shared

import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;  public class Stack<T> {     private final ReadWriteLock rwLock = new ReentrantReadWriteLock();     private final Lock rLock = rwLock.readLock();     private final Lock wLock = rwLock.writeLock();     private Node<T> top = null;      public void push(T newElem) {         // wLock - EXCLUSIVE mode!         wLock.lock();         try {             this.top = new Node<>(newElem, this.top);         } finally {wLock.unlock();}     }     public T peek() {         // rLock - SHARED mode!         rLock.lock();         try {             return this.top.value;         } finally {rLock.unlock();}     }             public T pop() {         // wLock - EXCLUSIVE mode!         wLock.lock();         try {             Node<T> oldTop = this.top;             this.top = this.top.next;             return oldTop.value;         } finally {wLock.unlock();}     }      private static class Node<E> {         private final E value;         private final Node<E> next;         private Node(E value, Node<E> next) {             this.value = value;             this.next = next;         }     } } 

Да, ReentrantReadWriteLock тоже произошел от AbstractQueuedSynchronizer и тоже обладает всеми этими fairness, lock, lockInterruptible, tryLock,…

7. На основе Spin Lock (неблокирующий)

Мы можем сделать свой Spin Lock на основе java.util.concurrent.atomic

Тут мы, в случае занятого стека, не передаем управление JVM/OS (в отличии от senchronized, ReentrantLock, Semaphore, ReadWriteReentrantLock, …)

import java.util.concurrent.atomic.AtomicBoolean;  public class Stack<T> {     private final AtomicBoolean locked = new AtomicBoolean(false);     private Node<T> top = null;     public void push(T newElem) {         // false->true         while (!locked.compareAndSet(false, true)) {/*NOP*/}         try {             this.top = new Node<>(newElem, this.top);         } finally {locked.set(false);}     }     public T peek() {         while (!locked.compareAndSet(false, true)) {/*NOP*/}         try {             return top.value;         } finally {locked.set(false);}     }            public T pop() {         while (!locked.compareAndSet(false, true)) {/*NOP*/}         try {             Node<T> oldTop = this.top;             this.top = this.top.next;             return oldTop.value;         } finally {             locked.set(false);         }     }          private static class Node<E> {         private final E value;         private final Node<E> next;         private Node(E value, Node<E> next) {             this.value = value;             this.next = next;         }     } } 

Протокол захвата — перевод AtomicBoolean: false -> true (при конкуренции со стороны других потоков).
Протокол освобождения — перевод AtomicBoolean: true -> false (без конкуренции со стороны других потоков).

8. Treiber stack (неблокирующий)

Ну раз уже взялись за на основе java.util.concurrent.atomic, то надо делать неблокирующий стек Трейбера

import java.util.concurrent.atomic.AtomicReference;  public class Stack<T> {     private final AtomicReference<Node<T>> top = new AtomicReference<>(null);      public void push(T newElem) {         Node<T> newTop = new Node<>(newElem, null);         while (true) {             Node<T> oldTop = top.get();             newTop.next = oldTop;             if (top.compareAndSet(oldTop, newTop)) {                 break;             }         }     }     public T peek() {         return top.get().value;     }          public T pop() {         while (true) {             Node<T> oldTop = this.top.get();             Node<T> newTop = oldTop.next;             if (top.compareAndSet(oldTop, newTop)) {                 return oldTop.value;             }         }     }      private static class Node<E> {         private final E value;         private Node<E> next;         private Node(E value, Node<E> next) {             this.value = value;             this.next = next;         }     } } 

9. Используем идиому Copy-on-write

Это не совсем copy-on-write, мы не создаем полноценную копию, но это дань уважения старому доброму я взял эту идею у старого доброго CopyOnWriteArrayList — мутации с захватом монопольной блокировки, чтение через volatile

import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;  public class Stack<T> {     private final Lock lock = new ReentrantLock();     private volatile Node<T> top = null;     public void push(T newElem) {         lock.lock();         try {             this.top = new Node<>(newElem, this.top);         } finally {lock.unlock();}     }     public T peek() {         return top.value;     }     public T pop() {         lock.lock();         try {             Node<T> oldTop = this.top;             this.top = this.top.next;             return oldTop.value;         } finally {lock.unlock();}     }      private static class Node<E> {         private final E value;         private final Node<E> next;         private Node(E value, Node<E> next) {             this.value = value;             this.next = next;         }     } } 

10. В функциональном стиле: Persistent stack

Функциональные языки от императивных отличает многое. Один из аспектов — нет изменяемых переменных. Точнее нет переменных вообще, есть значения. Вопрос — а что же они делают с коллекциями? Как известно в отсутствии коллекций жизнь зародиться не может, а коллекции, кажется, по определению изменчивые сущности.
Хитрые функциональщики на каждое действие-мутацию (add, remove, …) порождают новую коллекцию. А для уменьшения потребления памяти и процессора на такие действия — стараются, что бы новая версия использовала как можно больше «материала» от предыдущей.

public class Stack<T> {     private final Node<T> top;     private Stack(Node<T> top) {         this.top = top;     }     public Stack() {         this.top = null;     }     public Stack<T> push(T newElem) {         return new Stack<>(new Node<>(newElem, this.top));     }     public T peek() {         return this.top.value;     }     public Stack<T> pop() {         return new Stack<>(top.next);     }          private static class Node<E> {         private final E value;         private final Node<E> next;         private Node(E value, Node<E> next) {             this.value = value;             this.next = next;         }     } } 

У такого варианта немного необычное API на первый взгляд. Его надо привыкнуть использовать определенным образом

public class Demo {     public static void main(String[] args) {         Stack<String> stack = new Stack<>();          stack = stack.push("A");         stack = stack.push("B").push("C");          System.out.println("stack.peek(): " + stack.peek());     } }  >> stack.peek(): C 

Так как каждая мутация порождает новую версию

public class Demo {     public static void main(String[] args) {         Stack<String> stack = new Stack<>();         Stack<String> stackA = stack.push("A");         Stack<String> stackAB = stackA.push("B");         Stack<String> stackABC = stackAB.push("C");          System.out.println("stackA.peek(): " + stackA.peek());         System.out.println("stackAB.peek(): " + stackAB.peek());         System.out.println("stackABC.peek(): " + stackABC.peek());     } }  >> stackA.peek(): A >> stackAB.peek(): B >> stackABC.peek(): C 

Здесь я непрерывно передаю «мутирующие» стеки от одного потока к другому. Конечно, надо делать safe publication (я делаю через volatile-read/volatile-write), но после чтения можно свободно «менять» свою версию.

public class Demo {     static volatile Stack<String> stack = new Stack<>();     static {         stack.push("#");         stack.push("#");     }      public static void main(String[] args) {         new Thread(new Runnable() {             public void run() {                 for (int k = 0; ; k++) {                     // добавляем в стек элемент                     stack = stack.push("" + k);                  }             }         }).start();          new Thread(new Runnable() {             public void run() {                 while (true) {                     // удаляем из стека элемент                     Stack<String> newStack = stack.pop();                      System.out.println(newStack.peek());                 }             }         }).start();     } } 

Данный подход (метод-мутатор возвращает новую версию и не меняет предыдущую) достаточно хорошо представлен в JDK

public class Demo {     public static void main(String[] args) {         String origin = "Hello!";         String mutated = origin.toUpperCase();          System.out.println("origin: " + origin);         System.out.println("mutated: " + mutated);     } }  >> origin: Hello! >> mutated: HELLO! 
import java.math.BigInteger;  public class Demo {     public static void main(String[] args) {         BigInteger origin = new BigInteger("40");         BigInteger mutated = origin.add(new BigInteger("2"));          System.out.println("origin: " + origin);         System.out.println("mutated: " + mutated);     } }  >> origin: 40 >> mutated: 42 

Контакты

Кратко о курсе «Multicore programming in Java»: стартует 1 сентября, ведется в режиме вебинаров дважды в неделю (понедельник + четверг) в 19.00-22.00 (по московскому времени), состоит из 16 лекций по 2.5 часа (=40 лекционных часов), рассчитан на Java Middle.

Стоимость курса
— при оплате до 9 августа — 375$
— при оплате до 16 августа — 400$
— при оплате до 23 августа — 425$
— при оплате до 30 августа — 450$

Я занимаюсь онлайн обучением Java (вот курсы программирования) и публикую часть учебных материалов в рамках переработки курса Java Core. Видеозаписи лекций в аудитории Вы можете увидеть на youtube-канале, возможно, видео канала лучше систематизировано в этой статье.

На все вопросы с удовольствием отвечу по следующим контактам (или в комментариях)
skype: GolovachCourses
email: GolovachCourses@gmail.com

ссылка на оригинал статьи http://habrahabr.ru/post/231953/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *