Как это работает в мире java. Пул потоков

от автора

Основной принцип программирования гласит: не изобретать велосипед. Но иногда, чтобы понять, что происходит и как использовать инструмент неправильно, нам нужно это сделать. Сегодня изобретаем паттерн многопоточного выполнения задач.

Представим, что у вас которая вызывает большую загрузку процессора:

public class Counter {      public Double count(double a) {         for (int i = 0; i < 1000000; i++) {             a = a + Math.tan(a);         }          return a;     } }

Мы хотим как можно быстрее обработать ряд таких задач, попробуем*:

public class SingleThreadClient {      public static void main(String[] args) {         Counter counter = new Counter();          long start = System.nanoTime();          double value = 0;         for (int i = 0; i < 400; i++) {             value += counter.count(i);         }          System.out.println(format("Executed by %d s, value : %f",                 (System.nanoTime() - start) / (1000_000_000),                 value));     } }

На моей тачке с 4 физическими ядрами использование ресурсов процессора
top -pid {pid}:

image

Время выполнения 104 сек.

Как вы заметили, загрузка одного процессора на один java-процесс с одним выполняемым потоком составляет 100%, но общая загрузка процессора в пользовательском пространстве составляет всего 2,5%, и у нас есть много неиспользуемых системных ресурсов.
Давайте попробуем использовать больше, добавив больше рабочих потоков:

public class MultithreadClient {      public static void main(String[] args) throws ExecutionException, InterruptedException {         ExecutorService threadPool = Executors.newFixedThreadPool(8);         Counter counter = new Counter();          long start = System.nanoTime();          List<Future<Double>> futures = new ArrayList<>();         for (int i = 0; i < 400; i++) {             final int j = i;             futures.add(                     CompletableFuture.supplyAsync(                             () -> counter.count(j),                             threadPool                     ));         }          double value = 0;         for (Future<Double> future : futures) {             value += future.get();         }          System.out.println(format("Executed by %d s, value : %f",                 (System.nanoTime() - start) / (1000_000_000),                 value));          threadPool.shutdown();     } }

Занятые ресурсы:

image

ThreadPoolExecutor

Для ускорения мы использовали ThreadPool — в java его роль играет ThreadPoolExecutor, который может быть реализован непосредственно или из одного из методов в классе Utilities. Если мы заглянем внутрь ThreadPoolExecutor, мы можем найти очередь:

private final BlockingQueue<Runnable> workQueue;

в которой задачи собираются, если запущено больше потоков чем размер начального пула. Если запущено меньше потоков начального размера пула, пул попробует стартовать новый поток:

public void execute(Runnable command) {    ...     if (workerCountOf(c) < corePoolSize) {         if (addWorker(command, true))             return;    ...    if (isRunning(c) && workQueue.offer(command)) {    ...        addWorker(null, false);    ...     } }

Каждый addWorker запускает новый поток с задачей Runnable, которая опрашивает workQueue на наличие новых задач и выполняет их.

final void runWorker(Worker w) {    ...     try {         while (task != null || (task = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)) != null) {            ...                  task.run();    ... }

ThreadPoolExecutor имеет очень понятный javadoc, поэтому нет смысла его перефразировать. Вместо этого, давайте попробуем сделать наш собственный:

public class ThreadPool implements Executor {     private final static Queue<Runnable> workQueue = new ConcurrentLinkedQueue<>();     private static volatile boolean isRunning = true;      public ThreadPool(int nThreads) {         for (int i = 0; i < nThreads; i++) {             new Thread(new TaskWorker()).start();         }     }      @Override     public void execute(Runnable command) {         if (isRunning) {             workQueue.offer(command);         }     }      public void shutdown() {         isRunning = false;     }      private final class TaskWorker implements Runnable {          @Override         public void run() {             while (isRunning) {                 Runnable nextTask = workQueue.poll();                 if (nextTask != null) {                     nextTask.run();                 }             }         }     } }

Теперь давайте выполним ту же задачу, что и выше, с нашим пулом.
Меняем строку в MultithreadClient:

// ExecutorService threadPool = Executors.newFixedThreadPool (8); ThreadPool threadPool = новый ThreadPool (8);

Время выполнения практически одинаковое — 15 секунд.

Размер пула потоков

Попробуем еще больше увеличить количество запущенных потоков в пуле — до 100.

ThreadPool threadPool = new ThreadPool(100);

Мы можем видеть, что время выполнения уменьшилось до 28 секунд — почему это произошло?

Существует несколько независимых причин, по которым производительность могла упасть, например, из-за постоянных переключений контекста процессора, когда он приостанавливает работу над одной задачей и должен переключаться на другую, переключение включает сохранение состояния и восстановление состояния. Пока процессор ​​занято переключением состояний, оно не делает никакой полезной работы над какой-либо задачей.

Количество переключений контекста процесса можно увидеть, посмотрев на csw параметр при выводе команды top.

На 8 потоках:
image

На 100 потоках:
image

Как выбрать размер пула?

Размер зависит от типа выполняемых задач. Разумеется, размер пула потоков редко должен быть захардокожен, скорее он должен быть настраиваемый а оптимальный размер выводится из мониторинга пропускной способности исполняемых задач.

Предполагая, что потоки не блокируют друг друга, нет циклов ожидания I/O, и время обработки задач одинаково, оптимальный
пул потоков = Runtime.getRuntime().availProcessors () + 1.

Если потоки в основном ожидают I/O, то оптимальный размер пула должен быть увеличен на отношение между временем ожидания процесса и временем вычисления. Например. У нас есть процесс, который тратит 50% времени в iowait, тогда размер пула может быть
2 * Runtime.getRuntime().availableProcessors () + 1.

Другие виды пулов

  1. Пул потоков с ограничением по памяти, который блокирует отправку задачи, когда в очереди слишком много задач
    MemoryAwareThreadPoolExecutor

  2. Пул потоков, который регистрирует JMX-компонент для контроля и настройки размера пула в runtime.
    JMXEnabledThreadPoolExecutor

Исходный код можно найти здесь.

[*] — тест не является точным, для более точных тестов используйте: http://openjdk.java.net/projects/code-tools/jmh/

ссылка на оригинал статьи https://habrahabr.ru/post/326146/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *