Вредные советы Java: просто используй Parallel Stream

от автора

Привет, Хабр! В ежегодных поисках ответа на вопрос «как же удобно и просто распараллелить исполнение задач на Java», я частенько натыкаюсь на вариант основанный на использование Stream API.

Есть много статей о том когда использовать Parallel Stream и где он даст выигрыш в производительности. Хабр и Baeldung плохого не посоветуют. И вроде все хорошо — эвристики в Parallel Stream неплохие. Так и возникает соблазн задешево распараллелить любые задачи:

List<Runnable> tasks = generateExpansiveTasks(); tasks.stream().parallel().forEach(Runnable::run);

Вот и всё! Никаких тебе ExecutorService#invokeAll и join’а к Future или Future#get с его InterruptedException. Декларативно, коротко, читаемо. Казалось бы, хороший план, надежный как швейцарские часы. Но закон Мёрфи никто не отменял. Давайте посмотрим, как всё может пойти не так.

AtomicInteger callsCounter = new AtomicInteger(); List<Runnable> tasks = IntStream.range(0, 100)   .<Runnable>mapToObj(taskNumber -> () -> {     if (callsCounter.incrementAndGet() >= 10) {         throw new RuntimeException();     }     try {         Thread.sleep(1); // simulate intensive work     } catch (InterruptedException e) {         System.err.println("!INTERRUPTED!");         throw new IllegalStateException(e);     } }).toList(); try {     tasks.stream().parallel().forEach(Runnable::run); } catch (Exception ignore) { } System.out.println(callsCounter);

В sequential случае всё предельно просто — callsCounter замрёт на отметке 10. В случае parallel — как повезёт. Может 77, может 42. Но внимательные читатели к этому определенно были готовы — очевидно если больше 10 задач запущено параллельно, то callsCounter будет больше 10. Но одной проверки мало! Давайте посмотрим на callsCounter внимательнее, например 2 раза:

int check1 = 0, check2 = 0; while (check1 == check2) {     AtomicInteger callsCounter = new AtomicInteger();     List<Runnable> tasks = IntStream.range(0, 100)             .<Runnable>mapToObj(taskNumber -> () -> {                 if (callsCounter.incrementAndGet() >= 10) {                     throw new RuntimeException();                 }                 try {                     Thread.sleep(1); // simulate intensive work                 } catch (InterruptedException e) {                     System.err.println("!INTERRUPTED!");                     throw new IllegalStateException(e);                 }             }).toList();     try {         tasks.stream().parallel().forEach(Runnable::run);     } catch (Exception ignore) {     }     check1 = callsCounter.get();     ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.SECONDS);     check2 = callsCounter.get();     System.out.println("First: " + check1);     System.out.println("Second: " + check2); }

Здесь пора наконец сказать, что Parallel Stream выполняет задачи на common ForkJoinPool. Если быть точнее, то вызывается ForkJoinTask#fork который выберет pool в зависимости от потока, вызывающего терминальную операцию. В простом случае, при помощи awaitQuiescence мы может дождаться, пока все активные задачи в common ForkJoinPool будут завершены. В какой‑то момент мы получим разные значения для счетчика вызовов. Намного чаще расхождение можно получать, если поток вызывающий терминальную операцию не отправлять в сон.

Скрытый текст
int check1 = 0, check2 = 0; while (check1 == check2) {     AtomicInteger callsCounter = new AtomicInteger();     Thread main = Thread.currentThread();     AtomicBoolean exceptionCaught = new AtomicBoolean();     List<Runnable> tasks = IntStream.range(0, 100)             .<Runnable>mapToObj(taskNumber -> () -> {                 if (exceptionCaught.get()) {                     System.out.println("EXECUTED AFTER EXCEPTION CAUGHT IN CALLER THREAD");                 }                 if (callsCounter.incrementAndGet() >= 10) {                     throw new RuntimeException();                 }                 int pause = Thread.currentThread() == main? 0 : 10;                 try {                     Thread.sleep(pause); // simulate intensive work                 } catch (InterruptedException e) {                     System.err.println("!INTERRUPTED!");                     throw new IllegalStateException(e);                 }             }).toList();     try {         tasks.stream().parallel().forEach(Runnable::run);     } catch (Exception ignore) {         exceptionCaught.set(true);     }     check1 = callsCounter.get();     ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.SECONDS);     check2 = callsCounter.get();     System.out.println("First: " + check1);     System.out.println("Second: " + check2); }

В сухом остатке, если одна из операций в цепочке бросила Exception, то после возвращения ошибки вызывающему потоку, исполнение уже запущенных задач может всё еще продолжаться.

Допустим, оставшиеся в исполнении задачи не сделают ничего предосудительного. Наверное можно выдохнуть. Все стало понятно? Не совсем. Угадайте какой вариант быстрее для следующего примера:

public class Main {     public static void main(String[] args) throws Exception {         assert ForkJoinPool.getCommonPoolParallelism() == 8;         System.out.println(ForkJoinPool.commonPool());         List<Integer> sleepMillis = IntStream.range(0, 98).boxed()           .collect(toCollection(ArrayList::new));         sleepMillis.add(1000);         sleepMillis.add(2000);         Instant start = Instant.now();         sleepMillis.stream().parallel().forEach(Main::sleepMillis);         System.out.println("Parallel stream took:" + Duration.between(start, Instant.now()));         ExecutorService es = Executors.newFixedThreadPool(8);         start = Instant.now();         sleepMillis.stream()                 .<Runnable>map(num -> () -> sleepMillis(num))                 .forEach(es::execute);         es.shutdown();         es.awaitTermination(1, TimeUnit.DAYS);         System.out.println("Executor service took:" + Duration.between(start, Instant.now()));     }      private static void sleepMillis(int ms) {         try {             Thread.sleep(ms);         } catch (InterruptedException e) {             throw new IllegalStateException(e);         }     } }

Интуитивно может казаться, что время выполнения должно быть примерно одинаково. Или даже небольшой выигрыш в случае Parallel Stream, ведь там исполнению помогает main поток. Однако на деле может получится наоборот.

Parallel stream took:PT3.4147801S
Executor service took:PT2.5742424S

Дело в том, что Parrallel Stream берет за минимальную единицу работы берет не отдельный вызов consumer#accept, а последовательность вызовов над частью элементов коллекции элементов коллекции. Детальнее можно посмотреть например в классе java.util.stream.ForEachOps Соответственно, в данном случае 2 самые долгие задачи будут выполнены последовательно в одном thread’е. В какой‑то мере этот аспект можно нивелировать за счет использования random shuffle.

Увы, но и это не всё. Долгие блокирующие методы могут заблокировать common ForkJoinPool. Для того чтобы понять к каким последствиям приведет его блокировка, достаточно сказать, что common Pool используется в async методах CompletableFuture, если executor не указан явно. Для примера рассмотрим следующий метод:

private static Duration measureCompletableFutureAsyncDelay() {     CompletableFuture<Long> cf = new CompletableFuture<>();     CompletableFuture<Duration> asyncStage = cf             .thenApplyAsync(start -> Duration.ofNanos(System.nanoTime() - start));     long completionStartTime = System.nanoTime();     cf.complete(completionStartTime);     return asyncStage.join(); }

Если common pool не занят, то начало выполнения thenApplyAsync callback’а будет отложено на период от нескольких микросекунд до миллисекунд. Но если common pool занят, до выполнения *Async callback’ов может пройти достаточно долгое время.

Код для экспериментов
public class Main {   public static void main(String[] args) throws Exception {     int pp = Integer.parseInt(System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "-1"));     int expectedPp = 2;     if (pp != expectedPp) {       System.err.println("Please restart with -Djava.util.concurrent.ForkJoinPool.common.parallelism=" + expectedPp);       throw new IllegalStateException("BAD PARALLELISM");     }     System.out.println("First completable callback future delay: " + measureCompletableFutureAsyncDelay());     System.out.println("Second completable callback future delay: " + measureCompletableFutureAsyncDelay());     CountDownLatch fjpExhausted = new CountDownLatch(pp);     Future<Duration> streamDuration = startLongTasksInParallelStream(fjpExhausted);     fjpExhausted.await();     System.out.println("Third completable callback future delay: " + measureCompletableFutureAsyncDelay());     System.out.println("Stream duration: " + streamDuration.get());   }    private static Duration measureCompletableFutureAsyncDelay() {     CompletableFuture<Long> cf = new CompletableFuture<>();     CompletableFuture<Duration> asyncStage = cf         .thenApplyAsync(start -> Duration.ofNanos(System.nanoTime() - start));     long completionStartTime = System.nanoTime();     cf.complete(completionStartTime);     return asyncStage.join();   }    private static Future<Duration> startLongTasksInParallelStream(CountDownLatch fjpExhausted) {     ThreadFactory tf = Executors.defaultThreadFactory();     ThreadFactory dtf = runnable -> {       Thread thread = tf.newThread(runnable);       thread.setDaemon(true);       return thread;     };     ExecutorService service = Executors.newSingleThreadExecutor(dtf);     Future<Duration> res = service.submit(() -> {       long startTime = System.nanoTime();       IntStream.range(0, 100).parallel().forEach(num -> {         if (Thread.currentThread() instanceof ForkJoinWorkerThread) {           fjpExhausted.countDown();         }         expensiveProcess(num);       });       return Duration.ofNanos(System.nanoTime() - startTime);     });     service.shutdown();     return res;   }    private static void expensiveProcess(Object unusedArg) {     try {       Thread.sleep(1000); //simulate hard IO/blocking work     } catch (Exception e) {       throw new IllegalStateException(e);     }   } }

В этом случае в качестве частичного решения можно подставлять костыли в виде ForkJoinPool#managedBlock для операций в Parallel Stream, однако это уже совсем другая история…

Подводя итог, обязательно используйте Parallel Stream, если:

  • вы ищите серебряную пулю;

  • вы не хотите координировать завершение запущенных задач при ошибках, ведь так интересно посмотреть на проде какие side effect’ы успеет совершить исполняемый код;

  • вам очень хочется заблокировать common ForkJoinPool и посмотреть к чему это приведет.

И конечно:

Народная мудрость. Image by Kandinsky AI 4.1.

Народная мудрость. Image by Kandinsky AI 4.1.


ссылка на оригинал статьи https://habr.com/ru/articles/929120/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *