В предыдущих статьях [ноль, один] мы рассмотрели основные концепции Kafka Streams и сравнили их со стандартными подходами обработки потоковых данных. В этой части мы сосредоточимся на stateless processing (обработке без сохранения состояния) и поймем как применять различные операции Kafka Streams для решения практических задач. Мы создадим приложение для обработки данных медицинской клиники.
Требования к ETL пайплайну:
Приложение должно обрабатывать поток JSON-сообщений из входного топика Kafka (patient-records), содержащих информацию о пациентах медицинской клиники. Цель состоит в том, чтобы применять различные stateless операции для трансформации данных и отправлять результаты в выходной топик (clinic-notifications-topic). Это позволит продемонстрировать, как использовать основные возможности Kafka Streams для обработки данных без сохранения состояния.
Требуемые операции:
-
Фильтрация: исключить записи пациентов младше 18 лет.
-
Изменение ключей: установить patientId в качестве нового ключа.
-
Добавление и удаление полей:
-
Добавить поле nextAppointmentDate, если followUpNeeded равно true.
-
Удалить поле assignedDoctor, если оно null или пустое.
-
-
Ветвление потоков:
-
Поток A: пациенты с установленным диагнозом.
-
Поток B: пациенты без диагноза.
-
-
Преобразование записей:
-
В Потоке A создать уведомления для врачей.
-
В Потоке B создать напоминания для пациентов.
-
-
Поочередное обогащение: добавить информацию об ответственном враче из локального справочника.
-
Слияние потоков: объединить потоки A и B.
-
Вывод данных: отправить обработанные записи в clinic-notifications-topic
Настройка проекта
I. Создание проекта
Создадим новый проект на Java с использованием Gradle и Kotlin DSL (build.gradle.kts
).
Структура проекта:
kafka-streams-stateless/ ├── build.gradle.kts ├── src/ │ ├── main/ │ │ ├── java/ │ │ │ └── com.example.kafka/ │ │ │ ├── StreamsApp.java │ │ │ ├── PatientRecord.java │ │ │ ├── Notification.java │ │ │ ├── Reminder.java │ │ │ └── Doctor.java │ │ └── resources/ │ └── test/ └── settings.gradle.kts
II. Файл build.gradle.kts
plugins { java application } repositories { mavenCentral() } dependencies { implementation("org.apache.kafka:kafka-streams:3.8.0") implementation("com.fasterxml.jackson.core:jackson-databind:2.17.2") implementation("com.fasterxml.jackson.core:jackson-core:2.17.2") implementation("com.fasterxml.jackson.core:jackson-annotations:2.17.2") implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.2") } application { mainClass.set("com.example.kafka.StreamsApp") }
III. Конфигурация Kafka Streams
Создадим класс StreamsApp.java и настроим конфигурацию Kafka Streams.
import java.util.Properties; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsConfig; public class StreamsApp { private static final String APPLICATION_NAME = "stateless-processing-app"; private static final String BOOTSTRAP_SERVERS = "localhost:9092"; public static Properties getStreamsConfig() { var config = new Properties(); config.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME); config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); return config; } }
Описание входных данных
Входные данные представляют собой JSON-сообщения о пациентах медицинской клиники, поступающие из входного топика Kafka (patient-records). Каждое сообщение содержит информацию о пациенте, его записи на прием и медицинском состоянии.
// 1 Взрослый пациент с диагнозом “Грипп” и необходимостью повторного приема. // Поле assignedDoctor пустое. { "recordId": "1", "patientId": "P001", "name": "Иван Иванов", "age": 30, "appointmentDate": "2023-10-15", "diagnosis": "Грипп", "followUpNeeded": true, "assignedDoctor": "" } // 2 Взрослый пациент без диагноза и без необходимости повторного приема. // Поле assignedDoctor равно null. { "recordId": "2", "patientId": "P002", "name": "Анна Петрова", "age": 22, "appointmentDate": "2023-10-16", "diagnosis": "", "followUpNeeded": false, "assignedDoctor": null } // 3 Пациент младше 18 лет. Эта запись должна быть отфильтрована и // не попадет в дальнейшую обработку. { "recordId": "3", "patientId": "P003", "name": "Сергей Сидоров", "age": 16, "appointmentDate": "2023-10-17", "diagnosis": "Ангина", "followUpNeeded": true, "assignedDoctor": "Др. Алексей Смирнов" } // 4 Взрослый пациент с диагнозом “Мигрень”, но без необходимости // повторного приема. Указан ответственный врач. { "recordId": "4", "patientId": "P004", "name": "Мария Кузнецова", "age": 45, "appointmentDate": "2023-10-18", "diagnosis": "Мигрень", "followUpNeeded": false, "assignedDoctor": "Др. Елена Иванова" }
Реализация Stateless Операций
I. Фильтрация записей
Отфильтруем записи пациентов младше 18 лет.
StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> sourceStream = builder.stream("patient-records"); KStream<String, String> filteredStream = sourceStream.filter((key, value) -> { PatientRecord record = PatientRecord.fromJson(value); return record.getAge() >= 18; });
+---------------+ --> +-------------+ --> +----------------+ | Patient Data | | Age < 18 | | Filtered (<18) | +---------------+ +-------------+ +----------------+
II. Изменение ключей записей
Изменим ключ записи на patientId.
KStream<String, String> rekeyedStream = filteredStream.selectKey((key, value) -> { PatientRecord record = PatientRecord.fromJson(value); return record.getPatientId(); });
+---------------+ --> +----------------+ --> +-------------------+ | Record Data | | Change Key to | | Key = patientId | +---------------+ | patientId | +-------------------+
III. Добавление и удаление полей
Добавим поле nextAppointmentDate и удалим поле assignedDoctor, если оно null или пустое.
KStream<String, String> augmentedStream = rekeyedStream.mapValues(value -> { PatientRecord record = PatientRecord.fromJson(value); if (record.isFollowUpNeeded()) { record.setNextAppointmentDate(record.getAppointmentDate().plusDays(30)); } if (record.getAssignedDoctor() == null || record.getAssignedDoctor().isEmpty()) { record.setAssignedDoctor(null); } return record.toJson(); });
+-------------------+ --> +----------------------------+ --> +---------------------+ | Record Data | | Add: nextAppointmentDate | | Field Added/Removed | | | | Remove: assignedDoctor if | | nextAppointmentDate | | | | null or empty | | assignedDoctor (if) | +-------------------+ +----------------------------+ +---------------------+
IV. Ветвление потоков
Разделим поток на два на основе наличия диагноза.
KStream<String, String>[] branchedStreams = augmentedStream.branch( (key, value) -> { PatientRecord record = PatientRecord.fromJson(value); return record.getDiagnosis() != null && !record.getDiagnosis().isEmpty(); }, (key, value) -> { PatientRecord record = PatientRecord.fromJson(value); return record.getDiagnosis() == null || record.getDiagnosis().isEmpty(); } ); KStream<String, String> diagnosedStream = branchedStreams[0]; KStream<String, String> undiagnosedStream = branchedStreams[1];
+---------------+ --> +----------------------+ --> +-----------------+ | Patient Data | | Has Diagnosis? | | With Diagnosis | +---------------+ +----------------------+ +----------------_-+ \\ \\ \\ --> +---------------------+ \\----------------------------> | Without Diagnosis | +--------------------+
V. Преобразование записей в один или несколько выходов
В Потоке A создадим уведомления для врачей.
KStream<String, String> doctorNotifications = diagnosedStream.mapValues(value -> { PatientRecord record = PatientRecord.fromJson(value); Notification notification = createDoctorNotification(record); return notification.toJson(); });
В Потоке B создадим напоминания для пациентов.
KStream<String, String> patientReminders = undiagnosedStream.mapValues(value -> { PatientRecord record = PatientRecord.fromJson(value); Reminder reminder = createPatientReminder(record); return reminder.toJson(); });
+---------------+ --> +-----------+ --> +------------------+ | Patient Data | | Stream A | | Notifications for | +---------------+ | | | Doctors | | | +-------------------+ | | | | +------------------+ | Stream B | ---> | Reminders for | | | | Patients | +-----------+ +------------------+
VI. Поочередное обогащение данных
Добавим информацию об ответственном враче из локального справочника.
KStream<String, String> enrichedDoctorNotifications = doctorNotifications.mapValues(value -> { Notification notification = Notification.fromJson(value); Doctor doctor = getAssignedDoctor(notification.getPatientId()); notification.setAssignedDoctor(doctor); return notification.toJson(); });
VII. Слияние потоков
Объединим оба потока обратно в один.
KStream<String, String> mergedStream = enrichedDoctorNotifications.merge(patientReminders);
+------------------+ +----------+ +------------------+ | Notifications | -----> | Merge | -----> | Combined Output | | for Doctors | | | | | +------------------+ | | +------------------+ | | +------------------+ | | | Reminders for | -----> | | | Patients | +----------+ +------------------+
VIII. Вывод данных
Отправим обработанные записи в выходной топик clinic-notifications-topic.
mergedStream.to("clinic-notifications-topic");
Полный исходный код нашего приложения
-
StreamsApp класс
-
Doctor класс
-
PatientRecord класс
-
Notification класс
-
Reminder класс
Запуск и тестирование приложения
-
Убедитесь, что Kafka запущена локально.
-
Создайте входной и выходной топики.
bin/kafka-topics.sh --create --topic patient-records --bootstrap-server localhost:9092 bin/kafka-topics.sh --create --topic clinic-notifications-topic --bootstrap-server localhost:9092
-
Соберите и запустите приложение
./gradlew run
-
Отправка тестовых сообщений в топик patient-records
-
Просмотрите выходные сообщения в топике clinic-notifications-topic
{ "patientId": "P001", "message": "Проверьте план лечения пациента Иван Иванов.", "assignedDoctor": { "doctorId": "D001", "name": "Др. Сергей Петров" }, "nextAppointmentDate": "2023-11-14" }
Заключение
В этой статье мы подробно рассмотрели, как использовать stateless операции в Kafka Streams для обработки данных медицинской клиники. Мы включили техническое задание, описали структуру входных данных и поэтапно реализовали необходимые операции: фильтрацию, изменение ключей, добавление и удаление полей, ветвление и слияние потоков, преобразование записей и поочередное обогащение данных.
Использование stateless операций позволяет создавать эффективные и масштабируемые приложения для потоковой обработки данных без необходимости управления состоянием. Такие приложения проще в разработке и обслуживании, а также легко масштабируются горизонтально.
ссылка на оригинал статьи https://habr.com/ru/articles/858668/
Добавить комментарий