Structured Concurrency в Java: наконец-то находит опору

от автора

API structured concurrency в Java наконец-то обрёл устойчивость. В новом переводе от команды Spring АйО подробно рассматриваются последние изменения, появившиеся с выходом JEP 505: фабричный метод open(), политики Joiner’ов, улучшенная отмена задач, дедлайны, передача контекста через ScopedValues и строгая защита от ошибок использования. Всё это делает параллельное программирование в Java более безопасным, читаемым и управляемым.


API structured concurrency вновь изменился — после двух инкубационных этапов и четырёх раундов превью.

В идеале такой сценарий кажется неожиданным.

Однако, учитывая статус API как предварительного, подобные изменения возможны — как и произошло в данном случае.

Эти изменения значительно повлияли на зрелость API, и есть надежда, что теперь он стабилизируется и больше не потребует доработок.

Что на самом деле изменилось на этот раз

Когда я впервые начал работать со structured concurrency ещё в инкубационной фазе, меня вдохновила идея более чистого и понятного concurrent кода.

Идея была проста: рассматривать параллельные задачи как структурированный блок, в котором все порождённые (spawned) задачи завершаются до выхода из блока. В теории звучало идеально, но API продолжал меняться, и за этими изменениями было сложно уследить.

Последняя итерация в JEP 505 привнесла важные усовершенствования, которые, на мой взгляд, наконец-то сделали этот функционал устойчивым. Самое заметное изменение — это введение более гибкой обработки задач и лучшая интеграция с виртуальными потоками. В этой статье я расскажу о различиях и поясню, почему они важны.

Основная концепция остаётся неизменной

Прежде чем перейти к изменениям, давайте вспомним, какую проблему пытается решить structured concurrency. В традиционном конкурентном программировании управление тасками часто оказывается разрозненным:

import java.util.Random; import java.util.concurrent.*;  public class TraditionalConcurrencyExample {   private static final Random random = new Random();    private static String fetchUserData(String userId) throws InterruptedException {     Thread.sleep(1000 + random.nextInt(2000)); // 1-3 seconds     if (random.nextBoolean()) {       throw new RuntimeException("User service unavailable");     }     return "UserData[" + userId + "]";   }    private static String fetchUserPreferences(String userId) throws InterruptedException {     Thread.sleep(800 + random.nextInt(1500)); // 0.8-2.3 seconds     if (random.nextBoolean()) {       throw new RuntimeException("Preferences service down");     }     return "Preferences[" + userId + "]";   }    private static String combineUserInfo(String userData, String preferences) {     return userData + " + " + preferences;   }    public static String getUserInfoTraditional(String userId) throws Exception {     try (ExecutorService executor = Executors.newCachedThreadPool()) {       Future<String> future1 = executor.submit(() -> fetchUserData(userId));       Future<String> future2 = executor.submit(() -> fetchUserPreferences(userId));        try {         String userData = future1.get();         String preferences = future2.get();         return combineUserInfo(userData, preferences);       } catch (Exception e) {         // Cleanup is messy - what about the other task?         System.out.println("Error occurred, attempting cleanup...");         future1.cancel(true);         future2.cancel(true);         throw e;       }     }   }    void main() {     for (int i = 0; i < 5; i++) {       try {         System.out.println("Attempt " + (i + 1) + ": " +             getUserInfoTraditional("user123"));       } catch (Exception e) {         System.out.println("Attempt " + (i + 1) + " failed: " +             e.getMessage());       }       System.out.println();     }   } }

Когда вы запускаете этот код, обычно проявляется несколько проблем:

  • Сложная обработка ошибок: если одна из задач завершается с ошибкой, приходится вручную отменять вторую. В противном случае она продолжит выполняться, несмотря на то что больше не нужна, что приводит к утечке ресурсов.

  • Управление жизненным циклом потоков: вы сами несёте ответственность за полный жизненный цикл потоков.

  • Передача исключений: проверяемые (checked) исключения часто оборачиваются не самым правильным образом.

  • Отсутствие гарантии очистки: если основной поток завершается неожиданно, задачи могут продолжить выполняться.

Structured concurrency призвана решить эти проблемы.

Главное изменение: статические фабричные методы

Наиболее заметное новшество в JEP 505 — больше не нужно вызывать new StructuredTaskScope<>(). Вместо этого используется метод open():

try (var scope = StructuredTaskScope.open()) {   // ... }

Метод open() без аргументов возвращает область (scope), которая ожидает успешного завершения всех подзадач либо сбоя хотя бы одной — это политика по умолчанию «все-или-ошибка» (“all-or-fail”). Если требуется более гибкое поведение, используйте перегруженный вариант open(joiner) и передайте пользовательскую политику завершения через Joiner (об этом чуть позже). Почему используется фабрика? Она предоставляет вменяемые значения по умолчанию и, что особенно важно, допускает развитие имплементации без нарушения совместимости с вашим кодом. Я считаю это изменение удачным: использование одного ключевого метода делает код более лаконичным и снижает вероятность ошибок.

Теперь перепишем предыдущий пример с использованием нового API:

public static String getUserInfoTraditional(String userId) throws Exception {   try (var scope = StructuredTaskScope.open()) {     StructuredTaskScope.Subtask<String> task1 = scope.fork(() -> fetchUserData(userId));     StructuredTaskScope.Subtask<String> task2 = scope.fork(() -> fetchUserPreferences(userId));      scope.join();      String userData = task1.get();     String preferences = task2.get();      return combineUserInfo(userData, preferences);   } }

Разница колоссальна. С использованием structured concurrency очистка выполняется автоматически и гарантированно. Если какая-либо из задач завершается с ошибкой, все остальные задачи в области отменяются. Если область завершается (нормально или с исключением), все ресурсы освобождаются. Это сопоставимо с механизмом try-with-resources — но для параллельных задач.

Такой подход даёт несколько очевидных преимуществ, которые я особенно ценю:

  • Гарантированная очистка: задачи не могут пережить свою область.

  • Явное владение: задачи принадлежат конкретной области.

  • Безопасность при исключениях: сбои обрабатываются последовательно.

  • Управление ресурсами: управление пулами потоков не требуется.

  • Композиционность: scope-ы/области можно складывать и комбинировать.

Joiner’ы: задаём политику успеха

Joiner перехватывает события завершения задач и определяет:

  1. следует ли отменять одноуровневые (соседние) задачи,

  2. что должен возвращать метод join().

JDK включает несколько фабричных помощников для таких случаев:

«Побеждает первая» (также известен как гонка реплик).

try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow())) {     urls.forEach(url -> scope.fork(() -> fetchFrom(url)));     return scope.join();             // returns first successful String }

«Все должны завершиться успешно, и мне нужны их результаты»

try (var scope = StructuredTaskScope.open(Joiner.<Result>allSuccessfulOrThrow())) {     tasks.forEach(scope::fork);     return scope.join()              // Stream<Subtask<Result>>                  .map(Subtask::get)                  .toList(); }

Эти небольшие помощники упрощают распространённые шаблоны — «гонка» (“race”), «сбор» (“gather”), «ожидание всех» (“wait-for-all”).

Создание собственного Joiner’а

Иногда требуется своя политика. Предположим, я хочу собрать все успешные подзадачи, проигнорировав сбои:

import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.StructuredTaskScope; import java.util.stream.Stream;  void main() {    List<String> urls = List.of("https://bazlur.ca", "https://foojay.io", "https://github.com");    try (var scope = StructuredTaskScope.open(new MyCollectingJoiner<String>())) {     urls.forEach(url -> scope.fork(() -> fetchFrom(url)));     List<String> fetchedContent = scope.join().toList();      System.out.println("Total fetched content: " + fetchedContent.size());   } catch (InterruptedException e) {     throw new RuntimeException(e);   }  }  private String fetchFrom(String url) {   return "fetched from " + url + ""; }  class MyCollectingJoiner<T> implements StructuredTaskScope.Joiner<T, Stream<T>> {   private final Queue<T> results = new ConcurrentLinkedQueue<>();    @Override   public boolean onComplete(StructuredTaskScope.Subtask<? extends T> st) {     if (st.state() == StructuredTaskScope.Subtask.State.SUCCESS)       results.add(st.get());     return false;   }    @Override   public Stream<T> result() {     return results.stream();   } }

Интерфейс минимален — onFork, onComplete и result() — но при этом достаточно мощный для большинства пользовательских сценариев. Чтобы запустить этот код, потребуется JDK 25, и его можно выполнить из командной строки с помощью следующей команды:

java --enable-preview CollectingJoiner.java
Комментарий от эксперта команды Spring АйО, Михаила Поливахи

Java 25 GA на момент публикации статьи ещё официально не вышла. Выход 25-ой Java планируется на Сентябрь 2025. Пока есть только early access билды

Улучшенная отмена и дедлайны

Правила отмены по сути не изменились, но API стал строже. Если поток-владелец прерывается до или во время вызова join(), область автоматически отменяет все незавершённые подзадачи. Подзадачи должны корректно обрабатывать InterruptedException; в противном случае close() будет блокироваться в ожидании их завершения. (Если вы используете блокирующий ввод-вывод — всё в порядке; если используете polling — не забудьте проверять Thread.currentThread().isInterrupted().)

Нужен дедлайн? Передайте конфигурационную лямбду:

try (var scope = StructuredTaskScope.open(          Joiner.<String>anySuccessfulResultOrThrow(),          cfg -> cfg.withTimeout(Duration.ofSeconds(2)))) {     // ... }

Если срабатывает таймаут, scope отменяется, а join() выбрасывает TimeoutException. На практике я привязываю таймаут к каждому внешнему вызову, чтобы держать неуправляемые задачи под контролем.

Также можно заменить фабрику виртуальных потоков по умолчанию на ту, которая задаёт имена или устанавливает thread-local переменные:

ThreadFactory tagged = Thread.ofVirtual().name("api-%d").factory();  try (var scope = StructuredTaskScope.open(          Joiner.<Integer>allSuccessfulOrThrow(),          cfg -> cfg.withThreadFactory(tagged))) {     // ... }

Одни только имена потоков делают дампы значительно читаемее.

ScopedValues передаются автоматически

Все подзадачи наследуют привязки ScopedValues, установленные в родительском потоке. Это означает, что можно передавать контекст запроса, учетные данные безопасности или информацию MDC без необходимости вручную упаковывать их в каждую лямбду. Опробовав эту возможность, к ThreadLocal возвращаться уже не хочется.

Комментарий от команды Spring АйО

Про Scoped Values мы как-то уже рассказывали. Вот первая и вторая части.

Защита от неправильного использования

StructuredTaskScope строго соблюдает структуру. Если вызвать fork() из потока, отличного от владельца, будет выброшено исключение StructureViolationException. Забыли использовать try-with-resources и позволили scope выйти за пределы метода? Результат тот же. Подход строгий, но эффективно предотвращает случайное истощение ресурсов (по аналогии с «fork-бомбами»).

Комментарий от Михаила Поливахи

Речь про fork-bomb уязвимость, суть которой заключается в том, что вредоносный процесс fork`ается бесконтрольно, потребляя при этом все ресурсы сервера. https://en.wikipedia.org/wiki/Fork_bomb

Улучшения в Observability

Теперь thread дампы включают дерево скоупов, так что инструменты могут напрямую отображать отношения родитель–потомок. Когда я запускаю jcmd <pid> Thread.dump_to_file -format=json, каждый scope отображается со своими forked потоками, вложенными под владельцем. Найти зависшую задачу, блокирующую виртуальный пул, теперь занимает пару секунд с grep — вместо получасового разбора.

Ещё несколько примеров для практики

Пример 1 — 360°-обзор продукта (сбор–затем–ошибка)

Классический endpoint в e-commerce: один HTTP-запрос должен агрегировать основную информацию о продукте, данные о наличии в реальном времени и персонализированную цену. Каждый подсервис вызывается параллельно внутри StructuredTaskScope, где применяется политика «всё или ничего»: любая ошибка или превышение односекундного дедлайна отменяет всю группу и возвращает ошибку вызывающему.

Таймаут области, пользовательские имена потоков и joiner allSuccessfulOrThrow() позволяют выразить то, что обычно требует сложной конфигурации CompletableFuture, всего в трёх декларативных строках.

import java.time.Duration; import java.util.Random; import java.util.concurrent.StructuredTaskScope; import java.util.concurrent.ThreadFactory;  public class ThreeSixtyProductView {   record Product(long id, String name) {}   record Stock(long productId, int quantity) {}   record Price(long productId, double amount) {}   record ProductPayload(Product core, Stock stock, Price price) {}    private static Product coreApi(long id) throws InterruptedException {     Thread.sleep(100); // simulate latency     return new Product(id, "Gadget‑" + id);   }    private static Stock stockApi(long id) throws InterruptedException {     Thread.sleep(120);     return new Stock(id, new Random().nextInt(100));   }    private static Price priceApi(long id) throws InterruptedException {     Thread.sleep(150);     return new Price(id, 99.99);   }    static ProductPayload fetchProduct(long id) throws Exception {     ThreadFactory named = Thread.ofVirtual().name("prod-%d", 1).factory();      try (var scope = StructuredTaskScope.open(         StructuredTaskScope.Joiner.<Object>allSuccessfulOrThrow(),         cfg -> cfg.withTimeout(Duration.ofSeconds(1))             .withThreadFactory(named))) {        StructuredTaskScope.Subtask<Product> core = scope.fork(() -> coreApi(id));       StructuredTaskScope.Subtask<Stock> stock = scope.fork(() -> stockApi(id));       StructuredTaskScope.Subtask<Price> price = scope.fork(() -> priceApi(id));        scope.join(); // throws on first failure / timeout       return new ProductPayload(core.get(), stock.get(), price.get());     }   }    void main() throws Exception {     ProductPayload productPayload = fetchProduct(1L);     System.out.println(productPayload);   } }

Пример 2 — «Гонка зеркал» для загрузки файлов

Крупные бинарные файлы размещаются на нескольких зеркалах CDN. Поскольку задержки различаются, запросы отправляются на все зеркала одновременно, а Joiner.anySuccessfulResultOrThrow() используется для выбора первого успешно возвращённого InputStream, при этом все остальные задачи отменяются.

Пропускная способность и соединения освобождаются мгновенно, а пользователи получают максимально быструю загрузку — без необходимости вручную реализовывать логику отмены.

import java.io.*; import java.net.URI; import java.nio.file.*; import java.util.List; import java.util.Random; import java.util.concurrent.StructuredTaskScope;  public class MirrorDownloaderDemo {   void main() throws Exception {     List<URI> mirrors = List.of(         URI.create("https://mirror‑a.example.com"),         URI.create("https://mirror‑b.example.com"),         URI.create("https://mirror‑c.example.com"));      Path target = Files.createFile(Path.of("download1.txt"));     download(target, mirrors);     System.out.println("Saved to " + target.toAbsolutePath());   }    static Path download(Path target, List<URI> mirrors) throws Exception {     try (var scope = StructuredTaskScope.open(         StructuredTaskScope.Joiner.<InputStream>anySuccessfulResultOrThrow())) {        mirrors.forEach(uri -> scope.fork(() -> fetchFromMirror(uri)));       try (InputStream in = scope.join()) {         Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);       }       return target;     }   }    private static InputStream fetchFromMirror(URI uri) throws InterruptedException {     Thread.sleep(50 + new Random().nextInt(300));     String data = "Downloaded from " + uri + "\n";     return new ByteArrayInputStream(data.getBytes());   } }

Пример 3 — Пакетный генератор миниатюр с вложенными областями

Этап медиапайплайна получает каталог изображений. Внешний скоуп перебирает файлы, а внутренний — для каждого изображения — распараллеливает внутри ещё на три задачи ресайза (маленький, средний и большой размеры). Внутренний скоуп работает по принципу «быстрый отказ»: если один из ресайзов завершается с ошибкой, изображение пропускается, но внешний скуоп и его батчёвая обработка продолжается без прерывания.

Вложенные скоупы позволяют разделить консистентность на уровне отдельного элемента и общую производительность партии — при минимуме кода.

import java.io.IOException; import java.nio.file.*; import java.util.concurrent.StructuredTaskScope;  public class ThumbnailBatchDemo {   enum Size {SMALL, MEDIUM, LARGE}    void main() throws Exception {     Path tmpDir = Files.createTempDirectory("images");     for (int i = 0; i < 3; i++) Files.createTempFile(tmpDir, "img" + i, ".jpg");     processBatch(tmpDir);   }    static void processBatch(Path dir) throws IOException, InterruptedException {     try (var batch = StructuredTaskScope.open()) {       try (var files = Files.list(dir)) {         files.filter(Files::isRegularFile)             .forEach(img -> batch.fork(() -> handleOne(img)));       }       batch.join();     }   }    private static void handleOne(Path image) {     try (var scope = StructuredTaskScope.open(         StructuredTaskScope.Joiner.<Void>allSuccessfulOrThrow())) {       scope.fork(() -> resizeAndUpload(image, Size.SMALL));       scope.fork(() -> resizeAndUpload(image, Size.MEDIUM));       scope.fork(() -> resizeAndUpload(image, Size.LARGE));       scope.join();     } catch (Exception ex) {       System.err.println("Skipping " + image.getFileName() + ": " + ex);     }   }    private static Void resizeAndUpload(Path image, Size size) throws InterruptedException {     Thread.sleep(80); // simulate resize     Thread.sleep(40); // simulate upload     System.out.println("Uploaded " + image.getFileName() + " [" + size + "]");     return null;   }

Пример 4 — Служба котировок в реальном времени с резервным механизмом по таймауту

Трейдинговый UI требует котировку не позднее чем через 30 мс. Пользовательский joiner захватывает первую успешную цену из основного рыночного источника, с таймаутом области в 30 мс. Если источник зависает, scope.join() возвращает пустой результат, и сервис мгновенно переключается на вчерашнюю кэшированную цену закрытия.

Вызывающие стороны всегда получают значение вовремя, а вся логика обработки таймаута выражена одной декларативной строкой.

import java.time.Duration; import java.util.*; import java.util.concurrent.StructuredTaskScope; import java.util.concurrent.StructuredTaskScope.Subtask;  public class QuoteServiceDemo {   void main() throws Exception {     double q = quote("ACME");     System.out.printf("Quote for ACME: %.2f%n", q);   }    static double quote(String symbol) throws InterruptedException {     var firstSuccess = new StructuredTaskScope.Joiner<Double, Optional<Double>>() {       private volatile Double value;        public boolean onComplete(Subtask<? extends Double> st) {         if (st.state() == Subtask.State.SUCCESS) value = st.get();         return value != null;           // stop when we have one       }        public Optional<Double> result() {         return Optional.ofNullable(value);       }     };      try (var scope = StructuredTaskScope.open(firstSuccess,         cfg -> cfg.withTimeout(Duration.ofMillis(30)))) {       scope.fork(() -> marketFeed(symbol));       Optional<Double> latest = scope.join();       return latest.orElseGet(() -> cache(symbol));     }   }    private static double marketFeed(String symbol) throws InterruptedException {     long delay = new Random().nextBoolean() ? 20 : 60; // 50 % chance timeout     Thread.sleep(delay);     return 100 + new Random().nextDouble();   }    //for demo purposes only   private static double cache(String symbol) {     return 95.00;   } }

Заключительные мысли

Эти изменения представляют значительный этап в развитии API structured concurrency.

Сегодняшняя версия API structured concurrency значительно лучше той, с которой всё начиналось, и я уверен, что она станет прочной основой для многопоточного программирования на Java в будущем.


Присоединяйтесь к русскоязычному сообществу разработчиков на Spring Boot в телеграм — Spring АйО, чтобы быть в курсе последних новостей из мира разработки на Spring Boot и всего, что с ним связано


ссылка на оригинал статьи https://habr.com/ru/articles/930812/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *