Spring Batch научился работать с MongoDB

от автора

Spring Batch появился за много лет до появления MongoDB, и его архитектура изначально предполагала наличие SQL-базы данных для хранения состояния заданий Spring Batch.

Но это было десятилетия назад, а один из самых частых вопросов у тех, кто впервые сталкивался со Spring Batch, звучал так: «Почему этой штуке нужно обращаться к SQL-базе данных?» Ответ, конечно, заключался в том, что Spring Batch ведет подробнейший учет каждого задания, шага и выполнения в JobRepository, и многие годы этот репозиторий говорил на одном диалекте — SQL. Если вы счастливо жили в мире MongoDB, вам все равно приходилось тащить за собой экземпляр Postgres или MySQL лишь для того, чтобы Batch мог записать, что он сделал в прошлый вторник.

В последних версиях Spring Batch JobRepository был отвязан от JDBC, а Spring Boot 4.1 наконец доводит этот опыт до полноценного состояния с помощью корректной автоконфигурации spring-boot-starter-batch-data-mongodb. Вы получаете такой же zero-config-опыт Boot для метаданных batch-задач, каким пользователи JDBC наслаждались с самого начала.

Занятный факт: доктор Дэйв Сайер, сооснователь Spring Boot, был основателем и многолетним руководителем Spring Batch. Естественно, первой автоконфигурацией, которую он написал для Spring Boot, была автоконфигурация для Spring Batch! Так что, когда я говорю, что пользователи Spring Boot пользовались поддержкой на базе JDBC в Spring Boot с самого начала, я именно это имею в виду 🙂

В этом материале мы разберем небольшой, но полноценный пример. Он:

  • Хранит Spring Batch JobRepository в MongoDB через новый стартер 4.1.

  • Читает customers.csv из classpath.

  • Записывает строки в таблицу PostgreSQL customers.

  • Запускает все это на сервисах, поднятых через compose.yaml в корне проекта.

Поднимаем инфраструктуру

Прежде чем работать с одной конкретной строкой в Java, запустим два вспомогательных сервиса:

docker compose up

compose.yaml поднимает экземпляр MongoDB, настроенный как одноузловой replica set — поддержке MongoDB в Batch нужны транзакции, а для транзакций нужен replica set, — экземпляр PostgreSQL для целевой таблицы и контейнер Grafana LGTM для наблюдаемости, если он понадобится позже.

Задание, которое мы определим, — это простой ETL-процесс: extraction, transformation, load, то есть извлечение, преобразование и загрузка. Он читает данные из файла customers.csv и записывает их в таблицу customers в базе данных PostgreSQL.

Нам нужно инициализировать таблицу Postgres customers; это делает src/main/resources/schema.sql:

create table if not exists customers (    id    serial primary key,    name  varchar(255),    email varchar(255));

SQL-инициализация Spring Boot (spring.sql.init.mode=always) выполняет этот скрипт при запуске. Сторона MongoDB устроена так же автономно — spring.batch.data.mongodb.schema.initialize=true говорит новому стартеру создать коллекции, необходимые JobRepository.

Ключевые фрагменты application.properties:

spring.mongodb.host=localhostspring.mongodb.port=27017spring.mongodb.database=mydatabasespring.batch.data.mongodb.schema.initialize=truespring.datasource.url=jdbc:postgresql://localhost/mydatabasespring.datasource.username=myuserspring.datasource.password=secret
Комментарий от Михаила Поливаха

Тут можно использовать Spring Boot Docker Compose starter, который избавит почти от всей ручной настройки какой-либо конфигурации (опять же, если мы говорим про простой «demo» кейс).

Мы также о нем писали:

https://habr.com/ru/companies/spring_aio/articles/1031216/

Задание

Наш batch job называется etl и выполняет два шага последовательно:

@BeanJob job(@Qualifier(STEP_RESET) Step stepReset,        @Qualifier(STEP_FILES_TO_DB) Step stepFilesToDb) {    return new JobBuilder("etl", this.repository)            .start(stepReset)            .next(stepFilesToDb)            .incrementer(new RunIdIncrementer())            .build();}

Сначала reset — tasklet, который очищает целевую таблицу, затем files-to-db — reader → processor → writer, который фактически переносит данные. RunIdIncrementer — небольшая, но важная деталь: он увеличивает параметр run.id при каждом запуске, чтобы Spring Batch рассматривал каждый вызов как новый экземпляр задания, а не отказывался повторно запускать уже завершенное задание.

Шаг первый: Reset

Самый простой тип шага в Spring Batch — это Tasklet: единый фрагмент работы без понятия чтения или записи элементов. Это подходящий инструмент, когда между шагами нужно просто что-то выполнить. Здесь — начать с чистого листа:

@Bean(STEP_RESET)Step cleanTableStep(JdbcClient db, JobRepository repository) {    return new StepBuilder("reset", repository)            .tasklet((contribution, chunkContext) -> {                db.sql("delete from customers").update();                return RepeatStatus.FINISHED;            })            .build();}

Tasklet выполняется один раз, возвращает RepeatStatus.FINISHED, и мы движемся дальше.

Шаг второй: reader, processor, writer

Самый интересный шаг — chunked-шаг. Базовый паттерн Spring Batch — прочитать элемент, обработать его, накопить chunk, записать chunk. Reader извлекает строки из customers.csv:

@BeanFlatFileItemReader<Customer> customerFlatFileItemReader(        @Value("classpath:/customers.csv") Resource csv) {    return new FlatFileItemReaderBuilder<Customer>()            .name("customer-reader")            .resource(csv)            .delimited(c -> c.delimiter(",").names("id", "name", "email"))            .fieldSetMapper(fs -> new Customer(                    fs.readInt("id"),                    fs.readString("name"),                    fs.readString("email")))            .build();}

Сам CSV:

id,name,email1,josh,josh@joshlong.com2,dashaun,dashaun@dashaun.com3,james,james@jamesward.dev

Идея понятна.

Writer отправляет каждый chunk в Postgres, используя ON CONFLICT DO NOTHING, чтобы повторные запуски не падали из-за конфликтов по первичному ключу:

    @Bean    FlatFileItemReader<Customer> customerFlatFileItemReader(@Value("classpath:/customers.csv") Resource csv) {        return new FlatFileItemReaderBuilder<Customer>() //                .name("customer-reader")                .resource(csv)                .delimited(c -> c.delimiter(",").names("id", "name", "email"))                .targetType(Customer.class)                .build();    }

А сам шаг связывает все вместе с небольшим сквозным processor — отличным местом, куда позже можно добавить трансформацию, обогащение или фильтрацию, — и размером chunk, равным 10:

    @Bean    JdbcBatchItemWriter<Customer> customerJdbcBatchItemWriter(DataSource dataSource) {        return new JdbcBatchItemWriterBuilder<Customer>()//                .assertUpdates(true)//                .dataSource(dataSource)//                .sql("INSERT INTO customers(id, name, email) VALUES (:id, :name, :email) on conflict do nothing")//                .beanMapped()//                .itemPreparedStatementSetter((item, ps) -> {                    ps.setInt(1, item.id());                    ps.setString(2, item.name());                    ps.setString(3, item.email());                })//                .build();    }

Обратите внимание на переключатель faultTolerant() и политику retry — Spring Batch будет незаметно повторять обработку элементов, выбрасывающих IllegalArgumentException, до десяти раз, прежде чем пометит chunk как завершившийся с ошибкой. Это одна строка, потому что всю “бухгалтерию” за вас ведет фреймворк, и именно эта бухгалтерия теперь живет в MongoDB.

Запуск

При запущенном Docker Compose:

./mvnw spring-boot:run

Задание стартует, reset очищает Postgres, files-to-db прогоняет CSV через chunk-пайплайн в Postgres, а каждый переход между шагами, число элементов, статус завершения и временная метка выполнения записываются в MongoDB. Откройте mongosh — и увидите знакомые Batch-коллекции: BATCH_JOB_INSTANCE, BATCH_JOB_EXECUTION, BATCH_STEP_EXECUTION. Только теперь это документы, а не таблицы.

Наблюдаемость

Spring Batch-задания публикуют массу интересных Spring ApplicationEvents! Я слушаю одно из них — JobExecutionEvent, которое публикуется всякий раз, когда задание завершилось, успешно или нет.

    @EventListener    void after(JobExecutionEvent event) {        IO.println("Job execution #" + event.getJobExecution() + " finished");    }

Когда я создавал программу через Spring Initializr, я не забыл добавить и OpenTelemetry Spring Boot starter. Spring Boot и Micrometer давно поддерживают OpenTelemetry, но раньше всегда требовалась некоторая возня. Теперь, если в classpath есть OpenTelemetry starter, он будет публиковать метрики в любой OpenTelemetry endpoint — по умолчанию предполагается порт 3000. Если выбрать поддержку Docker Compose в Spring Initializr, он также выдаст конфигурацию Grafana, которая будет слушать OpenTelemetry-информацию на порту 3000.

Итак, запустите приложение, затем откройте localhost:3000, нажмите Drilldown, затем Metrics и найдите spring_batch в поле поиска.

Либо можно открыть http://localhost:8080/actuator/metrics и увидеть те же метрики. Но мне нравятся яркие и красочные графики, так что страница Grafana меня весьма радует.

Бонус: Native Images с GraalVM

Технология native image в GraalVM потенциально позволяет снизить общее потребление памяти. Spring Batch уже в основном работает с GraalVM native images, но появились новые классы, которые мне нужно было учесть. И несколько новых schema-файлов.

    static class Hints implements RuntimeHintsRegistrar {        @Override        public void registerHints(@NonNull RuntimeHints hints, @Nullable ClassLoader classLoader) {            for (var c : new Class[]{                    org.springframework.batch.core.repository.persistence.JobInstance.class,                    org.springframework.batch.core.repository.persistence.ExecutionContext.class,                    org.springframework.batch.core.repository.persistence.ExitStatus.class,                    org.springframework.batch.core.repository.persistence.StepExecution.class,                    org.springframework.batch.core.repository.persistence.JobExecution.class,                    org.springframework.batch.core.repository.persistence.JobParameter.class,            }) {                hints.reflection().registerType(c, MemberCategory.values());            }            var prefix = "org/springframework/batch/core/";            for (var r : new String[]{                    "schema-mongodb", //                    "schema-drop-mongodb"}) {                for (var suffix : "jsonl,js".split(",")) {                    var path = prefix + r + "." + suffix;                    var resource = new ClassPathResource(path);                    if (resource.exists()) {                        hints.resources().registerResource(resource);                    }                }            }        }    }

И нам также нужно сообщить GraalVM о customers.csv.

    static class ResourceHints implements RuntimeHintsRegistrar {        @Override        public void registerHints(RuntimeHints hints, @Nullable ClassLoader classLoader) {            hints.reflection().registerType(Customer.class, MemberCategory.values());            hints.resources().registerResource(new ClassPathResource("/customers.csv"));        }    }

Зарегистрируйте оба в обычном порядке, добавив это в класс BatchConfiguration или в любой класс с @Configuration:

@ImportRuntimeHints({BatchConfiguration.ResourceHints.class, BatchConfiguration.Hints.class})

После этого можно собрать GraalVM native image обычным способом. Я записал шаги в скрипте native.sh в корне репозитория:

#!/usr/bin/env bashls -la target && rm -rf target./mvnw -DskipTests -Pnative native:compile./target/batch

Запустите приложение: ./target/batch — и убедитесь, что оно стартует практически мгновенно и потребляет заметно меньше RAM, чем при запуске на JVM. На моей машине — Apple M5 с macOS — оно стартует примерно за одну десятую секунды и использует около 150 МБ RAM. Долгоживущие batch-задания обычно не относятся к тем вещам, которым критически важен быстрый старт, но экономия RAM приятна, а быстрое время запуска точно не мешает.

Ленивые подключения DataSource

Еще одна оптимизация, которая в общем масштабе не слишком меняет именно эту нагрузку, но сама по себе весьма приятна: в Spring Boot 4.1 теперь поддерживается ленивое получение соединения. Помните: по умолчанию Spring Boot инициализирует DataSource и создает соединение всякий раз, когда начинается транзакция, даже если нет гарантии, что вы это соединение вообще используете. Этой платы можно избежать с помощью нового конфигурационного свойства Spring Boot spring.datasource.connection-fetch=lazy.

Получить исходники

Как обычно, полный код этого примера доступен здесь.

Почему это важно

Историческая связка Spring Batch с реляционной базой данных всегда была прагматичным компромиссом, а не архитектурным идеалом. Фреймворку нужно надежное место, где можно помнить, что он сделал, и SQL был путем наименьшего сопротивления. Теперь, когда абстракция JobRepository по-настоящему отвязана, а Spring Boot 4.1 поставляет полноценную первоклассную автоконфигурацию для MongoDB, командам, работающим на document stores, больше не нужно держать JDBC-базу только ради того, чтобы угодить batch-слою.

Выбирайте базу данных, которая подходит вашим данным. Spring Batch подстроится под ваш выбор.

Присоединяйтесь к русскоязычному сообществу разработчиков на Spring Boot в телеграм — Spring АйО, чтобы быть в курсе последних новостей из мира разработки на Spring Boot и всего, что с ним связано.

ссылка на оригинал статьи https://habr.com/ru/articles/1052358/