Kafka Streams ч3: Stateless processing

от автора

В предыдущих статьях [ноль, один] мы рассмотрели основные концепции Kafka Streams и сравнили их со стандартными подходами обработки потоковых данных. В этой части мы сосредоточимся на stateless processing (обработке без сохранения состояния) и поймем как применять различные операции Kafka Streams для решения практических задач. Мы создадим приложение для обработки данных медицинской клиники.

Требования к ETL пайплайну:

Приложение должно обрабатывать поток JSON-сообщений из входного топика Kafka (patient-records), содержащих информацию о пациентах медицинской клиники. Цель состоит в том, чтобы применять различные stateless операции для трансформации данных и отправлять результаты в выходной топик (clinic-notifications-topic). Это позволит продемонстрировать, как использовать основные возможности Kafka Streams для обработки данных без сохранения состояния.

Требуемые операции:

  • Фильтрация: исключить записи пациентов младше 18 лет.

  • Изменение ключей: установить patientId в качестве нового ключа.

  • Добавление и удаление полей:

    • Добавить поле nextAppointmentDate, если followUpNeeded равно true.

    • Удалить поле assignedDoctor, если оно null или пустое.

  • Ветвление потоков:

    • Поток A: пациенты с установленным диагнозом.

    • Поток B: пациенты без диагноза.

  • Преобразование записей:

    • В Потоке A создать уведомления для врачей.

    • В Потоке B создать напоминания для пациентов.

  • Поочередное обогащение: добавить информацию об ответственном враче из локального справочника.

  • Слияние потоков: объединить потоки A и B.

  • Вывод данных: отправить обработанные записи в clinic-notifications-topic

Настройка проекта

I. Создание проекта

Создадим новый проект на Java с использованием Gradle и Kotlin DSL (build.gradle.kts).

Структура проекта:

kafka-streams-stateless/ ├── build.gradle.kts ├── src/ │   ├── main/ │   │   ├── java/ │   │   │   └── com.example.kafka/ │   │   │       ├── StreamsApp.java │   │   │       ├── PatientRecord.java │   │   │       ├── Notification.java │   │   │       ├── Reminder.java │   │   │       └── Doctor.java │   │   └── resources/ │   └── test/ └── settings.gradle.kts 

II. Файл build.gradle.kts

plugins {     java     application }  repositories {     mavenCentral() }  dependencies {     implementation("org.apache.kafka:kafka-streams:3.8.0")     implementation("com.fasterxml.jackson.core:jackson-databind:2.17.2")     implementation("com.fasterxml.jackson.core:jackson-core:2.17.2")     implementation("com.fasterxml.jackson.core:jackson-annotations:2.17.2")     implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.2") }  application {     mainClass.set("com.example.kafka.StreamsApp") }

III. Конфигурация Kafka Streams

Создадим класс StreamsApp.java и настроим конфигурацию Kafka Streams.

import java.util.Properties; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsConfig;  public class StreamsApp {     private static final String APPLICATION_NAME = "stateless-processing-app";     private static final String BOOTSTRAP_SERVERS = "localhost:9092";      public static Properties getStreamsConfig() {         var config = new Properties();                  config.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);         config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);         config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);         config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);                  return config;     } }

Описание входных данных

Входные данные представляют собой JSON-сообщения о пациентах медицинской клиники, поступающие из входного топика Kafka (patient-records). Каждое сообщение содержит информацию о пациенте, его записи на прием и медицинском состоянии.

// 1 Взрослый пациент с диагнозом “Грипп” и необходимостью повторного приема.  // Поле assignedDoctor пустое. {   "recordId": "1",   "patientId": "P001",   "name": "Иван Иванов",   "age": 30,   "appointmentDate": "2023-10-15",   "diagnosis": "Грипп",   "followUpNeeded": true,   "assignedDoctor": "" } // 2 Взрослый пациент без диагноза и без необходимости повторного приема.  // Поле assignedDoctor равно null. {   "recordId": "2",   "patientId": "P002",   "name": "Анна Петрова",   "age": 22,   "appointmentDate": "2023-10-16",   "diagnosis": "",   "followUpNeeded": false,   "assignedDoctor": null } // 3 Пациент младше 18 лет. Эта запись должна быть отфильтрована и  // не попадет в дальнейшую обработку. {   "recordId": "3",   "patientId": "P003",   "name": "Сергей Сидоров",   "age": 16,   "appointmentDate": "2023-10-17",   "diagnosis": "Ангина",   "followUpNeeded": true,   "assignedDoctor": "Др. Алексей Смирнов" } // 4 Взрослый пациент с диагнозом “Мигрень”, но без необходимости  // повторного приема. Указан ответственный врач. {   "recordId": "4",   "patientId": "P004",   "name": "Мария Кузнецова",   "age": 45,   "appointmentDate": "2023-10-18",   "diagnosis": "Мигрень",   "followUpNeeded": false,   "assignedDoctor": "Др. Елена Иванова" }

Реализация Stateless Операций

I. Фильтрация записей

Отфильтруем записи пациентов младше 18 лет.

StreamsBuilder builder = new StreamsBuilder();  KStream<String, String> sourceStream = builder.stream("patient-records");  KStream<String, String> filteredStream = sourceStream.filter((key, value) -> { PatientRecord record = PatientRecord.fromJson(value);     return record.getAge() >= 18; }); 
+---------------+  -->  +-------------+  -->  +----------------+ | Patient Data  |       | Age < 18    |       | Filtered (<18) | +---------------+       +-------------+       +----------------+

II. Изменение ключей записей

Изменим ключ записи на patientId.

KStream<String, String> rekeyedStream = filteredStream.selectKey((key, value) -> {     PatientRecord record = PatientRecord.fromJson(value);     return record.getPatientId(); }); 
+---------------+  -->  +----------------+  -->  +-------------------+ | Record Data   |       | Change Key to  |       | Key = patientId   | +---------------+       | patientId      |       +-------------------+

III. Добавление и удаление полей

Добавим поле nextAppointmentDate и удалим поле assignedDoctor, если оно null или пустое.

  KStream<String, String> augmentedStream = rekeyedStream.mapValues(value -> {     PatientRecord record = PatientRecord.fromJson(value);      if (record.isFollowUpNeeded()) {         record.setNextAppointmentDate(record.getAppointmentDate().plusDays(30));     }      if (record.getAssignedDoctor() == null || record.getAssignedDoctor().isEmpty()) {         record.setAssignedDoctor(null);     }      return record.toJson(); }); 
+-------------------+  -->  +----------------------------+  -->  +---------------------+ | Record Data       |       | Add: nextAppointmentDate   |       | Field Added/Removed | |                   |       | Remove: assignedDoctor if  |       | nextAppointmentDate | |                   |       | null or empty              |       | assignedDoctor (if) | +-------------------+       +----------------------------+       +---------------------+

IV. Ветвление потоков

Разделим поток на два на основе наличия диагноза.

KStream<String, String>[] branchedStreams = augmentedStream.branch(     (key, value) -> {         PatientRecord record = PatientRecord.fromJson(value);         return record.getDiagnosis() != null && !record.getDiagnosis().isEmpty();     },     (key, value) -> {         PatientRecord record = PatientRecord.fromJson(value);         return record.getDiagnosis() == null || record.getDiagnosis().isEmpty();     } );  KStream<String, String> diagnosedStream = branchedStreams[0]; KStream<String, String> undiagnosedStream = branchedStreams[1]; 
+---------------+  -->  +----------------------+  -->  +-----------------+ | Patient Data  |       | Has Diagnosis?       |      | With Diagnosis   | +---------------+       +----------------------+      +----------------_-+                        \\                         \\                         \\                         -->  +---------------------+                          \\----------------------------> | Without Diagnosis  |                                                         +--------------------+

V. Преобразование записей в один или несколько выходов

В Потоке A создадим уведомления для врачей.

KStream<String, String> doctorNotifications = diagnosedStream.mapValues(value -> {     PatientRecord record = PatientRecord.fromJson(value);     Notification notification = createDoctorNotification(record);     return notification.toJson(); }); 

В Потоке B создадим напоминания для пациентов.

KStream<String, String> patientReminders = undiagnosedStream.mapValues(value -> {     PatientRecord record = PatientRecord.fromJson(value);     Reminder reminder = createPatientReminder(record);     return reminder.toJson(); }); 
+---------------+  -->  +-----------+  -->  +------------------+ | Patient Data  |       | Stream A  |      | Notifications for | +---------------+       |           |      | Doctors           |                         |           |      +-------------------+                         |           |                         |           |      +------------------+                         | Stream B  | ---> | Reminders for    |                         |           |      | Patients         |                         +-----------+      +------------------+

VI. Поочередное обогащение данных

Добавим информацию об ответственном враче из локального справочника.

KStream<String, String> enrichedDoctorNotifications = doctorNotifications.mapValues(value -> {     Notification notification = Notification.fromJson(value);     Doctor doctor = getAssignedDoctor(notification.getPatientId());     notification.setAssignedDoctor(doctor);     return notification.toJson(); }); 

VII. Слияние потоков

Объединим оба потока обратно в один.

KStream<String, String> mergedStream = enrichedDoctorNotifications.merge(patientReminders);
+------------------+        +----------+        +------------------+ | Notifications    | -----> |  Merge   | -----> |  Combined Output  | | for Doctors      |        |          |        |                  | +------------------+        |          |        +------------------+                             |          | +------------------+        |          | | Reminders for    | -----> |          | | Patients         |        +----------+ +------------------+

VIII. Вывод данных

Отправим обработанные записи в выходной топик clinic-notifications-topic.

mergedStream.to("clinic-notifications-topic"); 

Полный исходный код нашего приложения

Запуск и тестирование приложения

  • Убедитесь, что Kafka запущена локально.

  • Создайте входной и выходной топики.

    bin/kafka-topics.sh --create --topic patient-records --bootstrap-server localhost:9092 bin/kafka-topics.sh --create --topic clinic-notifications-topic --bootstrap-server localhost:9092
  • Соберите и запустите приложение ./gradlew run

  • Отправка тестовых сообщений в топик patient-records

  • Просмотрите выходные сообщения в топике clinic-notifications-topic

    { "patientId": "P001", "message": "Проверьте план лечения пациента Иван Иванов.", "assignedDoctor": { "doctorId": "D001", "name": "Др. Сергей Петров" }, "nextAppointmentDate": "2023-11-14" }

Заключение

В этой статье мы подробно рассмотрели, как использовать stateless операции в Kafka Streams для обработки данных медицинской клиники. Мы включили техническое задание, описали структуру входных данных и поэтапно реализовали необходимые операции: фильтрацию, изменение ключей, добавление и удаление полей, ветвление и слияние потоков, преобразование записей и поочередное обогащение данных.

Использование stateless операций позволяет создавать эффективные и масштабируемые приложения для потоковой обработки данных без необходимости управления состоянием. Такие приложения проще в разработке и обслуживании, а также легко масштабируются горизонтально.


ссылка на оригинал статьи https://habr.com/ru/articles/858668/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *