Улучшаем allOf и anyOf в CompletableFuture

от автора

И снова здравствуйте. В преддверии старта курса «Разработчик Java» подготовили для вас перевод полезного материала.


В CompletableFuture есть два метода, дизайн которых меня удивляет:

  • CompletableFuture#allOf
  • CompletableFuture#anyOf

В этой статье мы посмотрим, что с ними не так и как их можно сделать более удобными.

CompletableFuture#allOf

Давайте посмотрим на сигнатуру метода:

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {     // ... }

Здесь есть как минимум два спорных момента:

  1. Метод принимает несколько объектов CompletableFuture, возвращающих объекты разных типов.
  2. Метод возвращает CompletableFuture, который возвращает Void

Также некоторым может не нравится переменное число параметров, поэтому давайте посмотрим и на эту часть.

CompletableFuture<Void> часто используется в качестве сигнала завершения операции, однако, внеся небольшое изменение в API, этот метод можно использовать как в качестве сигнального устройства, так и в качестве носителя результатов всех завершенных операций. Давайте попробуем это сделать.

Асинхронный CompletableFuture#allOf

Во-первых, давайте придумаем нужную сигнатуру.

Справедливо предположить, что в большинстве случаев потребуется обработка списка однородных CompletableFuture и возврат CompletableFuture, содержащего список результатов:

public static <T> CompletableFuture<List<T>> allOf(   Collection<CompletableFuture<T>> futures) {     // ... }

Внутренности оригинального метода, скорее всего, более сложные, чем вы ожидаете:

static CompletableFuture<Void> andTree(   CompletableFuture<?>[] cfs, int lo, int hi) {     CompletableFuture<Void> d = new CompletableFuture<Void>();     if (lo > hi) // empty         d.result = NIL;     else {         CompletableFuture<?> a, b;         int mid = (lo + hi) >>> 1;         if ((a = (lo == mid ? cfs[lo] :                   andTree(cfs, lo, mid))) == null ||             (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :                   andTree(cfs, mid+1, hi)))  == null)             throw new NullPointerException();         if (!d.biRelay(a, b)) {             BiRelay<?,?> c = new BiRelay<>(d, a, b);             a.bipush(b, c);             c.tryFire(SYNC);         }     }     return d; }

Поэтому, вместо того, чтобы создавать его с нуля, попробуем использовать повторно то, что уже есть в оригинальном методе так, как если бы он был предназначен для использования в качестве сигнализатора завершения… а затем просто поменяем void-результат на список future:

CompletableFuture<List<CompletableFuture<T>>> i = futures.stream()     .collect(collectingAndThen(       toList(),        l -> CompletableFuture.allOf(l.toArray(new CompletableFuture[0]))         .thenApply(__ -> l)));

Пока неплохо. Нам удалось получить
CompletableFuture<List<CompletableFuture<T>>> вместо CompletableFuture<Void>, что уже хорошо. Но нам не нужен список future с результатами, нам нужен список результатов.

Теперь мы можем просто обработать список и удалить из него нежелательные future. Совершенно нормально вызвать методы CompletableFuture#join, потому что мы знаем, что они никогда не будут блокироваться (на этот момент все future уже завершены):

CompletableFuture<List<T>> result = intermediate     .thenApply(list -> list.stream()         .map(CompletableFuture::join)         .collect(toList()));

А теперь давайте объединим все это в окончательное решение:

public static <T> CompletableFuture<List<T>> allOf(   Collection<CompletableFuture<T>> futures) {     return futures.stream()         .collect(collectingAndThen(           toList(),            l -> CompletableFuture.allOf(l.toArray(new CompletableFuture[0]))         .thenApply(__ -> l.stream()            .map(CompletableFuture::join)            .collect(Collectors.toList())))); }

Асинхронный и падающий CompletableFuture#allOf

При наличии исключений оригинальный CompletableFuture#allOf ожидает завершения всех оставшихся операций.

И если мы хотим сообщить о завершении операции при возникновении в ней исключения, то нам придется изменить реализацию.

Для этого нужно создать новый экземпляр CompletableFuture и завершить его вручную, после того как одна из операций вызовет исключение:

CompletableFuture<List<T>> result = new CompletableFuture<>(); futures.forEach(f -> f   .handle((__, ex) -> ex == null || result.completeExceptionally(ex)));

… но тогда нам нужно разобраться со сценарием, когда все future завершатся успешно. Это можно легко сделать, используя улучшенный метод allOf(), а затем просто завершить future вручную:

allOf(futures).thenAccept(result::complete);

Теперь мы можем объединить все вместе, чтобы сформировать окончательное решение:

public static <T> CompletableFuture<List<T>>    allOfShortcircuiting(Collection<CompletableFuture<T>> futures) {     CompletableFuture<List<T>> result = new CompletableFuture<>();      for (CompletableFuture<?> f : futures) {         f.handle((__, ex) -> ex == null || result.completeExceptionally(ex));     }      allOf(futures).thenAccept(result::complete);      return result; }

CompletableFuture#anyOf

Начнем также с сигнатуры метода:

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {     // ... }

Мы сразу можем обнаружить такие же проблемы, как с методом, рассмотренным выше:

  1. Метод принимает несколько объектов CompletableFuture, содержащих объекты разных типов.
  2. Метод возвращает CompletableFuture, содержащий объект типа Object.

Насколько я понимаю, метод CompletableFuture#allOf был спроектирован так, чтобы он использовался в качестве сигнального устройства. Но CompletableFuture#anyOf не соответствует этой философии, возвращая CompletableFuture<Object>, что еще более запутывает.

Посмотрите на следующий пример, где я пытаюсь обработать CompletableFuture, содержащие данные разных типов:

CompletableFuture<Integer> f1 = CompletableFuture.completedFuture(1); CompletableFuture<String> f2 = CompletableFuture.completedFuture("2");  Integer result = CompletableFuture.anyOf(f1, f2)   .thenApply(r -> {       if (r instanceof Integer) {           return (Integer) r;       } else if (r instanceof String) {           return Integer.valueOf((String) r);       }       throw new IllegalStateException("unexpected object type!");   }).join();

Довольно неудобно, не так ли?

К счастью, это довольно легко приспособить для более распространенного сценария (ожидание одного из множества future, содержащих значения одного и того же типа), изменив сигнатуру и введя прямое приведение типов.

Таким образом, с нашими улучшениями, мы можем повторно использовать существующие методы и безопасно привести результат:

public static <T> CompletableFuture<T> anyOf(List<CompletableFuture<T>> cfs) {     return CompletableFuture.anyOf(cfs.toArray(new CompletableFuture[0]))       .thenApply(o -> (T) o); }   public static <T> CompletableFuture<T> anyOf(CompletableFuture<T>... cfs) {     return CompletableFuture.anyOf(cfs).thenApply(o -> (T) o); }

Исходный код

Исходный код можно найти на Github.

На этом все. До встречи на курсе.


ссылка на оригинал статьи https://habr.com/ru/company/otus/blog/481804/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *