В CompletableFuture есть два метода, дизайн которых меня удивляет:
- CompletableFuture#allOf
- CompletableFuture#anyOf
В этой статье мы посмотрим, что с ними не так и как их можно сделать более удобными.
CompletableFuture#allOf
Давайте посмотрим на сигнатуру метода:
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) { // ... }
Здесь есть как минимум два спорных момента:
- Метод принимает несколько объектов
CompletableFuture, возвращающих объекты разных типов. - Метод возвращает
CompletableFuture, который возвращаетVoid
Также некоторым может не нравится переменное число параметров, поэтому давайте посмотрим и на эту часть.
часто используется в качестве сигнала завершения операции, однако, внеся небольшое изменение в API, этот метод можно использовать как в качестве сигнального устройства, так и в качестве носителя результатов всех завершенных операций. Давайте попробуем это сделать.CompletableFuture<Void>
Асинхронный CompletableFuture#allOf
Во-первых, давайте придумаем нужную сигнатуру.
Справедливо предположить, что в большинстве случаев потребуется обработка списка однородных CompletableFuture и возврат CompletableFuture, содержащего список результатов:
public static <T> CompletableFuture<List<T>> allOf( Collection<CompletableFuture<T>> futures) { // ... }
Внутренности оригинального метода, скорее всего, более сложные, чем вы ожидаете:
static CompletableFuture<Void> andTree( CompletableFuture<?>[] cfs, int lo, int hi) { CompletableFuture<Void> d = new CompletableFuture<Void>(); if (lo > hi) // empty d.result = NIL; else { CompletableFuture<?> a, b; int mid = (lo + hi) >>> 1; if ((a = (lo == mid ? cfs[lo] : andTree(cfs, lo, mid))) == null || (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] : andTree(cfs, mid+1, hi))) == null) throw new NullPointerException(); if (!d.biRelay(a, b)) { BiRelay<?,?> c = new BiRelay<>(d, a, b); a.bipush(b, c); c.tryFire(SYNC); } } return d; }
Поэтому, вместо того, чтобы создавать его с нуля, попробуем использовать повторно то, что уже есть в оригинальном методе так, как если бы он был предназначен для использования в качестве сигнализатора завершения… а затем просто поменяем void-результат на список future:
CompletableFuture<List<CompletableFuture<T>>> i = futures.stream() .collect(collectingAndThen( toList(), l -> CompletableFuture.allOf(l.toArray(new CompletableFuture[0])) .thenApply(__ -> l)));
Пока неплохо. Нам удалось получить
вместо CompletableFuture<List<CompletableFuture<T>>>, что уже хорошо. Но нам не нужен список future с результатами, нам нужен список результатов.CompletableFuture<Void>
Теперь мы можем просто обработать список и удалить из него нежелательные future. Совершенно нормально вызвать методы CompletableFuture#join, потому что мы знаем, что они никогда не будут блокироваться (на этот момент все future уже завершены):
CompletableFuture<List<T>> result = intermediate .thenApply(list -> list.stream() .map(CompletableFuture::join) .collect(toList()));
А теперь давайте объединим все это в окончательное решение:
public static <T> CompletableFuture<List<T>> allOf( Collection<CompletableFuture<T>> futures) { return futures.stream() .collect(collectingAndThen( toList(), l -> CompletableFuture.allOf(l.toArray(new CompletableFuture[0])) .thenApply(__ -> l.stream() .map(CompletableFuture::join) .collect(Collectors.toList())))); }
Асинхронный и падающий CompletableFuture#allOf
При наличии исключений оригинальный CompletableFuture#allOf ожидает завершения всех оставшихся операций.
И если мы хотим сообщить о завершении операции при возникновении в ней исключения, то нам придется изменить реализацию.
Для этого нужно создать новый экземпляр CompletableFuture и завершить его вручную, после того как одна из операций вызовет исключение:
CompletableFuture<List<T>> result = new CompletableFuture<>(); futures.forEach(f -> f .handle((__, ex) -> ex == null || result.completeExceptionally(ex)));
… но тогда нам нужно разобраться со сценарием, когда все future завершатся успешно. Это можно легко сделать, используя улучшенный метод allOf(), а затем просто завершить future вручную:
allOf(futures).thenAccept(result::complete);
Теперь мы можем объединить все вместе, чтобы сформировать окончательное решение:
public static <T> CompletableFuture<List<T>> allOfShortcircuiting(Collection<CompletableFuture<T>> futures) { CompletableFuture<List<T>> result = new CompletableFuture<>(); for (CompletableFuture<?> f : futures) { f.handle((__, ex) -> ex == null || result.completeExceptionally(ex)); } allOf(futures).thenAccept(result::complete); return result; }
CompletableFuture#anyOf
Начнем также с сигнатуры метода:
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) { // ... }
Мы сразу можем обнаружить такие же проблемы, как с методом, рассмотренным выше:
- Метод принимает несколько объектов
CompletableFuture, содержащих объекты разных типов. - Метод возвращает
CompletableFuture, содержащий объект типаObject.
Насколько я понимаю, метод CompletableFuture#allOf был спроектирован так, чтобы он использовался в качестве сигнального устройства. Но CompletableFuture#anyOf не соответствует этой философии, возвращая , что еще более запутывает.CompletableFuture<Object>
Посмотрите на следующий пример, где я пытаюсь обработать CompletableFuture, содержащие данные разных типов:
CompletableFuture<Integer> f1 = CompletableFuture.completedFuture(1); CompletableFuture<String> f2 = CompletableFuture.completedFuture("2"); Integer result = CompletableFuture.anyOf(f1, f2) .thenApply(r -> { if (r instanceof Integer) { return (Integer) r; } else if (r instanceof String) { return Integer.valueOf((String) r); } throw new IllegalStateException("unexpected object type!"); }).join();
Довольно неудобно, не так ли?
К счастью, это довольно легко приспособить для более распространенного сценария (ожидание одного из множества future, содержащих значения одного и того же типа), изменив сигнатуру и введя прямое приведение типов.
Таким образом, с нашими улучшениями, мы можем повторно использовать существующие методы и безопасно привести результат:
public static <T> CompletableFuture<T> anyOf(List<CompletableFuture<T>> cfs) { return CompletableFuture.anyOf(cfs.toArray(new CompletableFuture[0])) .thenApply(o -> (T) o); } public static <T> CompletableFuture<T> anyOf(CompletableFuture<T>... cfs) { return CompletableFuture.anyOf(cfs).thenApply(o -> (T) o); }
Исходный код
Исходный код можно найти на Github.
На этом все. До встречи на курсе.
ссылка на оригинал статьи https://habr.com/ru/company/otus/blog/481804/
Добавить комментарий