Привет, Хабр! В ежегодных поисках ответа на вопрос «как же удобно и просто распараллелить исполнение задач на Java», я частенько натыкаюсь на вариант основанный на использование Stream API.
Есть много статей о том когда использовать Parallel Stream и где он даст выигрыш в производительности. Хабр и Baeldung плохого не посоветуют. И вроде все хорошо — эвристики в Parallel Stream неплохие. Так и возникает соблазн задешево распараллелить любые задачи:
List<Runnable> tasks = generateExpansiveTasks(); tasks.stream().parallel().forEach(Runnable::run);
Вот и всё! Никаких тебе ExecutorService#invokeAll и join’а к Future или Future#get с его InterruptedException. Декларативно, коротко, читаемо. Казалось бы, хороший план, надежный как швейцарские часы. Но закон Мёрфи никто не отменял. Давайте посмотрим, как всё может пойти не так.
AtomicInteger callsCounter = new AtomicInteger(); List<Runnable> tasks = IntStream.range(0, 100) .<Runnable>mapToObj(taskNumber -> () -> { if (callsCounter.incrementAndGet() >= 10) { throw new RuntimeException(); } try { Thread.sleep(1); // simulate intensive work } catch (InterruptedException e) { System.err.println("!INTERRUPTED!"); throw new IllegalStateException(e); } }).toList(); try { tasks.stream().parallel().forEach(Runnable::run); } catch (Exception ignore) { } System.out.println(callsCounter);
В sequential случае всё предельно просто — callsCounter замрёт на отметке 10. В случае parallel — как повезёт. Может 77, может 42. Но внимательные читатели к этому определенно были готовы — очевидно если больше 10 задач запущено параллельно, то callsCounter будет больше 10. Но одной проверки мало! Давайте посмотрим на callsCounter внимательнее, например 2 раза:
int check1 = 0, check2 = 0; while (check1 == check2) { AtomicInteger callsCounter = new AtomicInteger(); List<Runnable> tasks = IntStream.range(0, 100) .<Runnable>mapToObj(taskNumber -> () -> { if (callsCounter.incrementAndGet() >= 10) { throw new RuntimeException(); } try { Thread.sleep(1); // simulate intensive work } catch (InterruptedException e) { System.err.println("!INTERRUPTED!"); throw new IllegalStateException(e); } }).toList(); try { tasks.stream().parallel().forEach(Runnable::run); } catch (Exception ignore) { } check1 = callsCounter.get(); ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.SECONDS); check2 = callsCounter.get(); System.out.println("First: " + check1); System.out.println("Second: " + check2); }
Здесь пора наконец сказать, что Parallel Stream выполняет задачи на common ForkJoinPool. Если быть точнее, то вызывается ForkJoinTask#fork который выберет pool в зависимости от потока, вызывающего терминальную операцию. В простом случае, при помощи awaitQuiescence мы может дождаться, пока все активные задачи в common ForkJoinPool будут завершены. В какой‑то момент мы получим разные значения для счетчика вызовов. Намного чаще расхождение можно получать, если поток вызывающий терминальную операцию не отправлять в сон.
Скрытый текст
int check1 = 0, check2 = 0; while (check1 == check2) { AtomicInteger callsCounter = new AtomicInteger(); Thread main = Thread.currentThread(); AtomicBoolean exceptionCaught = new AtomicBoolean(); List<Runnable> tasks = IntStream.range(0, 100) .<Runnable>mapToObj(taskNumber -> () -> { if (exceptionCaught.get()) { System.out.println("EXECUTED AFTER EXCEPTION CAUGHT IN CALLER THREAD"); } if (callsCounter.incrementAndGet() >= 10) { throw new RuntimeException(); } int pause = Thread.currentThread() == main? 0 : 10; try { Thread.sleep(pause); // simulate intensive work } catch (InterruptedException e) { System.err.println("!INTERRUPTED!"); throw new IllegalStateException(e); } }).toList(); try { tasks.stream().parallel().forEach(Runnable::run); } catch (Exception ignore) { exceptionCaught.set(true); } check1 = callsCounter.get(); ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.SECONDS); check2 = callsCounter.get(); System.out.println("First: " + check1); System.out.println("Second: " + check2); }
В сухом остатке, если одна из операций в цепочке бросила Exception, то после возвращения ошибки вызывающему потоку, исполнение уже запущенных задач может всё еще продолжаться.
Допустим, оставшиеся в исполнении задачи не сделают ничего предосудительного. Наверное можно выдохнуть. Все стало понятно? Не совсем. Угадайте какой вариант быстрее для следующего примера:
public class Main { public static void main(String[] args) throws Exception { assert ForkJoinPool.getCommonPoolParallelism() == 8; System.out.println(ForkJoinPool.commonPool()); List<Integer> sleepMillis = IntStream.range(0, 98).boxed() .collect(toCollection(ArrayList::new)); sleepMillis.add(1000); sleepMillis.add(2000); Instant start = Instant.now(); sleepMillis.stream().parallel().forEach(Main::sleepMillis); System.out.println("Parallel stream took:" + Duration.between(start, Instant.now())); ExecutorService es = Executors.newFixedThreadPool(8); start = Instant.now(); sleepMillis.stream() .<Runnable>map(num -> () -> sleepMillis(num)) .forEach(es::execute); es.shutdown(); es.awaitTermination(1, TimeUnit.DAYS); System.out.println("Executor service took:" + Duration.between(start, Instant.now())); } private static void sleepMillis(int ms) { try { Thread.sleep(ms); } catch (InterruptedException e) { throw new IllegalStateException(e); } } }
Интуитивно может казаться, что время выполнения должно быть примерно одинаково. Или даже небольшой выигрыш в случае Parallel Stream, ведь там исполнению помогает main поток. Однако на деле может получится наоборот.
Parallel stream took:PT3.4147801S
Executor service took:PT2.5742424S
Дело в том, что Parrallel Stream берет за минимальную единицу работы берет не отдельный вызов consumer#accept, а последовательность вызовов над частью элементов коллекции элементов коллекции. Детальнее можно посмотреть например в классе java.util.stream.ForEachOps Соответственно, в данном случае 2 самые долгие задачи будут выполнены последовательно в одном thread’е. В какой‑то мере этот аспект можно нивелировать за счет использования random shuffle.
Увы, но и это не всё. Долгие блокирующие методы могут заблокировать common ForkJoinPool. Для того чтобы понять к каким последствиям приведет его блокировка, достаточно сказать, что common Pool используется в async методах CompletableFuture, если executor не указан явно. Для примера рассмотрим следующий метод:
private static Duration measureCompletableFutureAsyncDelay() { CompletableFuture<Long> cf = new CompletableFuture<>(); CompletableFuture<Duration> asyncStage = cf .thenApplyAsync(start -> Duration.ofNanos(System.nanoTime() - start)); long completionStartTime = System.nanoTime(); cf.complete(completionStartTime); return asyncStage.join(); }
Если common pool не занят, то начало выполнения thenApplyAsync callback’а будет отложено на период от нескольких микросекунд до миллисекунд. Но если common pool занят, до выполнения *Async callback’ов может пройти достаточно долгое время.
Код для экспериментов
public class Main { public static void main(String[] args) throws Exception { int pp = Integer.parseInt(System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "-1")); int expectedPp = 2; if (pp != expectedPp) { System.err.println("Please restart with -Djava.util.concurrent.ForkJoinPool.common.parallelism=" + expectedPp); throw new IllegalStateException("BAD PARALLELISM"); } System.out.println("First completable callback future delay: " + measureCompletableFutureAsyncDelay()); System.out.println("Second completable callback future delay: " + measureCompletableFutureAsyncDelay()); CountDownLatch fjpExhausted = new CountDownLatch(pp); Future<Duration> streamDuration = startLongTasksInParallelStream(fjpExhausted); fjpExhausted.await(); System.out.println("Third completable callback future delay: " + measureCompletableFutureAsyncDelay()); System.out.println("Stream duration: " + streamDuration.get()); } private static Duration measureCompletableFutureAsyncDelay() { CompletableFuture<Long> cf = new CompletableFuture<>(); CompletableFuture<Duration> asyncStage = cf .thenApplyAsync(start -> Duration.ofNanos(System.nanoTime() - start)); long completionStartTime = System.nanoTime(); cf.complete(completionStartTime); return asyncStage.join(); } private static Future<Duration> startLongTasksInParallelStream(CountDownLatch fjpExhausted) { ThreadFactory tf = Executors.defaultThreadFactory(); ThreadFactory dtf = runnable -> { Thread thread = tf.newThread(runnable); thread.setDaemon(true); return thread; }; ExecutorService service = Executors.newSingleThreadExecutor(dtf); Future<Duration> res = service.submit(() -> { long startTime = System.nanoTime(); IntStream.range(0, 100).parallel().forEach(num -> { if (Thread.currentThread() instanceof ForkJoinWorkerThread) { fjpExhausted.countDown(); } expensiveProcess(num); }); return Duration.ofNanos(System.nanoTime() - startTime); }); service.shutdown(); return res; } private static void expensiveProcess(Object unusedArg) { try { Thread.sleep(1000); //simulate hard IO/blocking work } catch (Exception e) { throw new IllegalStateException(e); } } }
В этом случае в качестве частичного решения можно подставлять костыли в виде ForkJoinPool#managedBlock для операций в Parallel Stream, однако это уже совсем другая история…
Подводя итог, обязательно используйте Parallel Stream, если:
-
вы ищите серебряную пулю;
-
вы не хотите координировать завершение запущенных задач при ошибках, ведь так интересно посмотреть на проде какие side effect’ы успеет совершить исполняемый код;
-
вам очень хочется заблокировать common ForkJoinPool и посмотреть к чему это приведет.
И конечно:
ссылка на оригинал статьи https://habr.com/ru/articles/929120/
Добавить комментарий