Integrating with Apache Cassandra: Building a Microservice with Cassandra and Kafka

Author: Rustem Galiev

IBM Senior DevOps Engineer & Integration Architect

Hello, Habr!

Today we will look at how to write a microservice that stores data in Apache Cassandra using the DataStax Java driver and publishes data-change events to Apache Kafka.

This post is based on the design of a simple microservice for managing hotel reservation data, called the Reservation Service. You can work through a series of exercises on writing and reading Cassandra data with the Reservation Service in the "Cassandra: Application Development with the DataStax Java Driver" series. The source code of the Reservation Service is available on GitHub.

The Reservation Service uses a hotel reservation data model that includes the tables below.

Cassandra and Kafka are often used together in microservice architectures, as shown below. In this design, the Reservation Service receives an API request to perform an action such as creating, updating, or deleting a reservation. After persisting the change to Cassandra, the Reservation Service sends a message to the reservation topic in the Kafka cluster.

Other services consume the reservation topic and act on each message: for example, an inventory service updates its inventory tables to mark the dates as reserved, and an email service might send the guest a thank-you email for the reservation. In this style of interaction, Cassandra and Kafka are not connected directly; they are used in a complementary way.
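The flow described above can be sketched in plain Java without a broker. This is only an in-memory illustration: `Topic` and `ReservationService` are hypothetical stand-ins invented here, not classes from the Reservation Service or from the Kafka client library, and the delivery is synchronous, whereas real Kafka consumers pull messages from the broker asynchronously.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/** In-memory stand-in for the flow: persist the change first, then publish an event. */
class ReservationFlowSketch {

    /** Hypothetical stand-in for a Kafka topic: hands (key, value) to every subscriber. */
    static class Topic {
        private final List<BiConsumer<String, String>> subscribers = new ArrayList<>();

        void subscribe(BiConsumer<String, String> subscriber) {
            subscribers.add(subscriber);
        }

        void publish(String key, String value) {
            for (BiConsumer<String, String> subscriber : subscribers) {
                subscriber.accept(key, value);
            }
        }
    }

    /** Hypothetical stand-in for the Reservation Service; the map plays the role of Cassandra. */
    static class ReservationService {
        private final Map<String, String> store = new HashMap<>();
        private final Topic topic;

        ReservationService(Topic topic) {
            this.topic = topic;
        }

        void save(String confirmationNumber, String reservationJson) {
            store.put(confirmationNumber, reservationJson);     // 1. persist the change
            topic.publish(confirmationNumber, reservationJson); // 2. then announce it
        }
    }

    /** Runs the flow once and returns what each downstream service recorded. */
    static List<String> run() {
        Topic reservationTopic = new Topic();
        List<String> log = new ArrayList<>();

        // Two independent consumers react to the same message.
        reservationTopic.subscribe((key, value) -> log.add("inventory: mark dates reserved for " + key));
        reservationTopic.subscribe((key, value) -> log.add("email: send thank-you for " + key));

        new ReservationService(reservationTopic).save("ABC123", "{\"hotelId\":\"NYC001\"}");
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The point of the sketch is the ordering and the decoupling: the service writes to its own store, publishes once, and knows nothing about which services react to the message.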

Setting up the Reservation Service


Let's start by cloning the Reservation Service source code from GitHub and checking out the branch that will serve as our starting point:

git clone -b kafka https://github.com/jeffreyscarpenter/reservation-service

Once this completes, we should see the reservation-service directory.

Let's look at some of the code.

First, we'll look at the schema we will use for the reservation tables, located at reservation-service/src/main/resources/reservation.cql:

/*
 * Copyright (C) 2016-2020 Jeff Carpenter
 */

/* This file contains a slightly modified version of the "reservation" keyspace and table definitions
 * for the example defined in Chapter 5 of Cassandra: The Definitive Guide, 2nd and 3rd Editions.
 * The changes are to facilitate development exercises.
 */

CREATE KEYSPACE reservation
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};

CREATE TABLE reservation.reservations_by_hotel_date (
    hotel_id text,
    start_date date,
    end_date date,
    room_number smallint,
    confirm_number text,
    guest_id uuid,
    PRIMARY KEY ((hotel_id, start_date), room_number)
);

CREATE TABLE reservation.reservations_by_confirmation (
    confirm_number text PRIMARY KEY,
    hotel_id text,
    start_date date,
    end_date date,
    room_number smallint,
    guest_id uuid
);

/*
 The following tables are provided for completeness with the book text, but they are not used in the current
 implementation of the Reservation Service
 */

CREATE TABLE reservation.reservations_by_guest (
    guest_last_name text,
    hotel_id text,
    start_date date,
    end_date date,
    room_number smallint,
    confirm_number text,
    guest_id uuid,
    PRIMARY KEY ((guest_last_name), hotel_id)
);

CREATE TYPE reservation.address (
    street text,
    city text,
    state_or_province text,
    postal_code text,
    country text
);

CREATE TABLE reservation.guests (
    guest_id uuid PRIMARY KEY,
    first_name text,
    last_name text,
    title text,
    emails set<text>,
    phone_numbers list<text>,
    addresses map<text, frozen<address>>
);
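The two tables the service actually uses hold the same reservation under different keys, one per query. As a rough illustration of that idea (plain Java collections only, not driver code; the class and method names are made up for this sketch), a composite partition key can be pictured as a map key, and the clustering column `room_number` as the sort order inside a partition:

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** In-memory illustration of the two query-driven tables; hypothetical names, not driver code. */
class DenormalizedTablesSketch {

    // reservations_by_confirmation: one row per partition key confirm_number.
    private final Map<String, String> byConfirmation = new HashMap<>();

    // reservations_by_hotel_date: partition key (hotel_id, start_date);
    // rows inside a partition are ordered by the clustering column room_number.
    private final Map<String, TreeMap<Short, String>> byHotelDate = new HashMap<>();

    /** Writes the same reservation to both "tables", as a denormalized model requires. */
    void insert(String hotelId, LocalDate startDate, short roomNumber, String confirmNumber) {
        String partitionKey = hotelId + "|" + startDate;
        byConfirmation.put(confirmNumber, partitionKey + "|" + roomNumber);
        byHotelDate.computeIfAbsent(partitionKey, k -> new TreeMap<>())
                   .put(roomNumber, confirmNumber);
    }

    /** Lookup by confirmation number; returns null when there is no such reservation. */
    String findByConfirmation(String confirmNumber) {
        return byConfirmation.get(confirmNumber);
    }

    /** Confirmation numbers for one hotel and start date, ordered by room number. */
    List<String> findByHotelAndDate(String hotelId, LocalDate startDate) {
        TreeMap<Short, String> partition = byHotelDate.get(hotelId + "|" + startDate);
        if (partition == null) {
            return new ArrayList<>();
        }
        return new ArrayList<>(partition.values());
    }

    public static void main(String[] args) {
        DenormalizedTablesSketch tables = new DenormalizedTablesSketch();
        LocalDate date = LocalDate.of(2020, 6, 1);
        tables.insert("NYC001", date, (short) 305, "CONF-B");
        tables.insert("NYC001", date, (short) 101, "CONF-A");
        System.out.println(tables.findByHotelAndDate("NYC001", date));
        System.out.println(tables.findByConfirmation("CONF-B"));
    }
}
```

Each read touches exactly one "partition", at the cost of writing every reservation twice, which is why the repository shown later groups both inserts into a single batch.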

Also note the dependencies on the DataStax Java driver and the Kafka client libraries in reservation-service/pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.cassandraguide</groupId>
    <artifactId>reservation-service</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>

    <name>reservation-service</name>
    <description>Demo service using Cassandra driver and Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.6.RELEASE</version>
        <relativePath />
    </parent>

    <properties>
        <java.version>11</java.version>
        <swagger.version>2.9.2</swagger.version>
        <junit-jupiter.version>5.4.2</junit-jupiter.version>
        <junit-platform.version>1.7.0-M1</junit-platform.version>
        <oss-java-driver.version>4.5.1</oss-java-driver.version>
        <kafka-client.version>2.5.0</kafka-client.version>
        <testcontainers.version>1.14.1</testcontainers.version>

        <version.maven.plugin.compiler>3.8.0</version.maven.plugin.compiler>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <start-class>dev.cassandraguide.ReservationServiceApp</start-class>
        <dockerfile-maven-version>1.4.13</dockerfile-maven-version>
    </properties>

    <dependencies>

        <!-- Create a RESTFul Service -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Document for REST Service -->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>${swagger.version}</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>${swagger.version}</version>
        </dependency>

        <!-- Cassandra Driver -->
        <dependency>
            <groupId>com.datastax.oss</groupId>
            <artifactId>java-driver-core</artifactId>
            <version>${oss-java-driver.version}</version>
        </dependency>
        <dependency>
            <groupId>com.datastax.oss</groupId>
            <artifactId>java-driver-query-builder</artifactId>
            <version>${oss-java-driver.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka-client.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <!-- Provides JSON serialization/deserialization for date/time types -->
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jsr310</artifactId>
        </dependency>

        <!-- Tests -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>junit</groupId>
                    <artifactId>junit</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- Junit 5 -->
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-params</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.platform</groupId>
            <artifactId>junit-platform-engine</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.platform</groupId>
            <artifactId>junit-platform-launcher</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.platform</groupId>
            <artifactId>junit-platform-runner</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.platform</groupId>
            <artifactId>junit-platform-console-standalone</artifactId>
            <version>${junit-platform.version}</version>
            <scope>test</scope>
        </dependency>
        <!-- Test against Docker Containers -->
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>cassandra</artifactId>
            <scope>test</scope>
            <version>${testcontainers.version}</version>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>testcontainers</artifactId>
            <scope>test</scope>
            <version>${testcontainers.version}</version>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>kafka</artifactId>
            <scope>test</scope>
            <version>${testcontainers.version}</version>
        </dependency>
        <!-- Add driver keys to spring-boot config file -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <fork>true</fork>
                    <mainClass>${start-class}</mainClass>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>com.spotify</groupId>
                <artifactId>dockerfile-maven-plugin</artifactId>
                <version>${dockerfile-maven-version}</version>
                <configuration>
                    <repository>reservation-service</repository>
                    <tag>${project.version}</tag>
                    <buildArgs>
                        <JAR_FILE>${project.build.finalName}.jar</JAR_FILE>
                    </buildArgs>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

The source code contains a few key Java classes that are also worth examining:

The entity class used in the HTTP API, which represents the data we store in Cassandra and publish to the Kafka topic, reservation-service/src/main/java/dev/cassandraguide/model/Reservation.java:

/*
 * Copyright (C) 2017-2020 Jeff Carpenter
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package dev.cassandraguide.model;

import java.io.Serializable;
import java.time.LocalDate;
import java.util.UUID;

/**
 * Entity working with Reservation on Cassandra.
 *
 * @author Jeff Carpenter
 */
public class Reservation implements Serializable {

    /** Serial. */
    private static final long serialVersionUID = -3392237616280919281L;

    /** Hotel identifier, as Text not UUID (for simplicity). */
    private String hotelId;

    /** Formatted as YYYY-MM-DD in interfaces. */
    private LocalDate startDate;

    /** Formatted as YYYY-MM-DD in interfaces. */
    private LocalDate endDate;

    /** Room number. */
    private short roomNumber;

    /** UUID. */
    private UUID guestId;

    /** Confirmation for this Reservation. */
    private String confirmationNumber;

    /**
     * Default constructor
     */
    public Reservation() {
    }

    /**
     * Constructor copying the fields of a reservation request
     */
    public Reservation(ReservationRequest form) {
        setStartDate(form.getStartDate());
        setEndDate(form.getEndDate());
        setHotelId(form.getHotelId());
        setGuestId(form.getGuestId());
        setRoomNumber(form.getRoomNumber());
    }

    /**
     * Constructor from a reservation request and a confirmation number
     */
    public Reservation(ReservationRequest form, String confirmationNumber) {
        this(form);
        this.confirmationNumber = confirmationNumber;
    }

    /**
     * Getter accessor for attribute 'hotelId'.
     *
     * @return
     *       current value of 'hotelId'
     */
    public String getHotelId() {
        return hotelId;
    }

    /**
     * Setter accessor for attribute 'hotelId'.
     * @param hotelId
     *      new value for 'hotelId '
     */
    public void setHotelId(String hotelId) {
        this.hotelId = hotelId;
    }

    /**
     * Getter accessor for attribute 'startDate'.
     *
     * @return
     *       current value of 'startDate'
     */
    public LocalDate getStartDate() {
        return startDate;
    }

    /**
     * Setter accessor for attribute 'startDate'.
     * @param startDate
     *      new value for 'startDate '
     */
    public void setStartDate(LocalDate startDate) {
        this.startDate = startDate;
    }

    /**
     * Getter accessor for attribute 'endDate'.
     *
     * @return
     *       current value of 'endDate'
     */
    public LocalDate getEndDate() {
        return endDate;
    }

    /**
     * Setter accessor for attribute 'endDate'.
     * @param endDate
     *      new value for 'endDate '
     */
    public void setEndDate(LocalDate endDate) {
        this.endDate = endDate;
    }

    /**
     * Getter accessor for attribute 'roomNumber'.
     *
     * @return
     *       current value of 'roomNumber'
     */
    public short getRoomNumber() {
        return roomNumber;
    }

    /**
     * Setter accessor for attribute 'roomNumber'.
     * @param roomNumber
     *      new value for 'roomNumber '
     */
    public void setRoomNumber(short roomNumber) {
        this.roomNumber = roomNumber;
    }

    /**
     * Getter accessor for attribute 'guestId'.
     *
     * @return
     *       current value of 'guestId'
     */
    public UUID getGuestId() {
        return guestId;
    }

    /**
     * Setter accessor for attribute 'guestId'.
     * @param guestId
     *      new value for 'guestId '
     */
    public void setGuestId(UUID guestId) {
        this.guestId = guestId;
    }

    /**
     * Getter accessor for attribute 'confirmationNumber'.
     *
     * @return
     *       current value of 'confirmationNumber'
     */
    public String getConfirmationNumber() {
        return confirmationNumber;
    }

    /**
     * Setter accessor for attribute 'confirmationNumber'.
     * @param confirmationNumber
     *      new value for 'confirmationNumber '
     */
    public void setConfirmationNumber(String confirmationNumber) {
        this.confirmationNumber = confirmationNumber;
    }

    /** {@inheritDoc} */
    @Override
    public String toString() {
        return "Confirmation Number = " + confirmationNumber +
                ", Hotel ID: " + getHotelId() +
                ", Start Date = " + getStartDate() +
                ", End Date = " + getEndDate() +
                ", Room Number = " + getRoomNumber() +
                ", Guest ID = " + getGuestId();
    }
}
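To make the shape of the published event concrete, here is a small stand-alone sketch of the message key and value for a saved and for a deleted reservation. The hints in the repository code call for the confirmation number as the key, the JSON form of the reservation as the value, and an empty string as the value on deletion. The real service builds the JSON with Jackson's `ObjectMapper` (pretty-printed, dates as ISO strings); the hand-written `toJson` below is only an approximation of that output, the class and method names are hypothetical, and the JSON field names are assumed to follow the getters of `Reservation`.

```java
import java.time.LocalDate;
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.UUID;

/** Hypothetical illustration of the event key/value shape; not code from the service. */
class ReservationMessageSketch {

    /** Approximates the JSON payload; no string escaping, so inputs must not contain quotes. */
    static String toJson(String hotelId, LocalDate startDate, LocalDate endDate,
                         short roomNumber, UUID guestId, String confirmationNumber) {
        return "{"
                + "\"hotelId\":\"" + hotelId + "\","
                + "\"startDate\":\"" + startDate + "\","   // LocalDate prints as YYYY-MM-DD
                + "\"endDate\":\"" + endDate + "\","
                + "\"roomNumber\":" + roomNumber + ","
                + "\"guestId\":\"" + guestId + "\","
                + "\"confirmationNumber\":\"" + confirmationNumber + "\""
                + "}";
    }

    /** Key = confirmation number, value = JSON document of the reservation. */
    static Map.Entry<String, String> upsertMessage(String hotelId, LocalDate startDate, LocalDate endDate,
                                                   short roomNumber, UUID guestId, String confirmationNumber) {
        return new SimpleEntry<>(confirmationNumber,
                toJson(hotelId, startDate, endDate, roomNumber, guestId, confirmationNumber));
    }

    /** Key = confirmation number, value = empty string to signal deletion. */
    static Map.Entry<String, String> deleteMessage(String confirmationNumber) {
        return new SimpleEntry<>(confirmationNumber, "");
    }

    public static void main(String[] args) {
        Map.Entry<String, String> saved = upsertMessage("NYC001", LocalDate.of(2020, 6, 1),
                LocalDate.of(2020, 6, 3), (short) 101, UUID.randomUUID(), "ABC123");
        System.out.println(saved.getKey() + " -> " + saved.getValue());
        System.out.println(deleteMessage("ABC123"));
    }
}
```

Using the confirmation number as the key means that all events for one reservation carry the same key, so a consumer can match a later deletion to the reservation it refers to.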

The service's data storage logic, where we will do most of our work, reservation-service/src/main/java/dev/cassandraguide/repository/ReservationRepository.java:

/*
 * Copyright (C) 2017-2020 Jeff Carpenter
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package dev.cassandraguide.repository;

import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.deleteFrom;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom;
import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createTable;
import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createType;
import static com.datastax.oss.driver.api.querybuilder.relation.Relation.column;

import java.time.LocalDate;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;

import javax.annotation.PreDestroy;

import com.fasterxml.jackson.core.JsonProcessingException;
import dev.cassandraguide.model.Reservation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Profile;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Repository;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.DefaultBatchType;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.core.type.UserDefinedType;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;

// TODO: Review imports for publishing to Kafka
import org.apache.kafka.clients.producer.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

/**
 * The goal of this project is to provide a minimally functional implementation of a microservice
 * that uses Apache Cassandra for its data storage. The reservation service is implemented as a
 * RESTful service using Spring Boot.
 *
 * @author Jeff Carpenter, Cedrick Lunven
 */
@Repository
@Profile("!unit-test") // When I do some 'unit-test' no connectivity to DB
public class ReservationRepository {

    /** Logger for the class. */
    private static final Logger logger = LoggerFactory.getLogger(ReservationRepository.class);

    // Reservation Schema Constants
    public static final CqlIdentifier TYPE_ADDRESS               = CqlIdentifier.fromCql("address");
    public static final CqlIdentifier TABLE_RESERVATION_BY_HOTEL_DATE =
            CqlIdentifier.fromCql("reservations_by_hotel_date");
    public static final CqlIdentifier TABLE_RESERVATION_BY_CONFI = CqlIdentifier.fromCql("reservations_by_confirmation");
    public static final CqlIdentifier TABLE_RESERVATION_BY_GUEST = CqlIdentifier.fromCql("reservations_by_guest");
    public static final CqlIdentifier TABLE_GUESTS               = CqlIdentifier.fromCql("guests");
    public static final CqlIdentifier STREET                     = CqlIdentifier.fromCql("street");
    public static final CqlIdentifier CITY                       = CqlIdentifier.fromCql("city");
    public static final CqlIdentifier STATE_PROVINCE             = CqlIdentifier.fromCql("state_or_province");
    public static final CqlIdentifier POSTAL_CODE                = CqlIdentifier.fromCql("postal_code");
    public static final CqlIdentifier COUNTRY                    = CqlIdentifier.fromCql("country");
    public static final CqlIdentifier HOTEL_ID                   = CqlIdentifier.fromCql("hotel_id");
    public static final CqlIdentifier START_DATE                 = CqlIdentifier.fromCql("start_date");
    public static final CqlIdentifier END_DATE                   = CqlIdentifier.fromCql("end_date");
    public static final CqlIdentifier ROOM_NUMBER                = CqlIdentifier.fromCql("room_number");
    public static final CqlIdentifier CONFIRM_NUMBER             = CqlIdentifier.fromCql("confirm_number");
    public static final CqlIdentifier GUEST_ID                   = CqlIdentifier.fromCql("guest_id");
    public static final CqlIdentifier GUEST_LAST_NAME            = CqlIdentifier.fromCql("guest_last_name");
    public static final CqlIdentifier FIRSTNAME                  = CqlIdentifier.fromCql("first_name");
    public static final CqlIdentifier LASTNAME                   = CqlIdentifier.fromCql("last_name");
    public static final CqlIdentifier TITLE                      = CqlIdentifier.fromCql("title");
    public static final CqlIdentifier EMAILS                     = CqlIdentifier.fromCql("emails");
    public static final CqlIdentifier PHONE_NUMBERS              = CqlIdentifier.fromCql("phone_numbers");
    public static final CqlIdentifier ADDRESSES                  = CqlIdentifier.fromCql("addresses");

    private PreparedStatement psExistReservation;
    private PreparedStatement psFindReservation;
    private PreparedStatement psInsertReservationByHotelDate;
    private PreparedStatement psInsertReservationByConfirmation;
    private PreparedStatement psDeleteReservationByHotelDate;
    private PreparedStatement psDeleteReservationByConfirmation;
    private PreparedStatement psSearchReservation;

    /** CqlSession holding metadata to interact with Cassandra. */
    private CqlSession     cqlSession;
    private CqlIdentifier  keyspaceName;

    // TODO: Review variables used for publishing to Kafka
    /** KafkaProducer for publishing messages to Kafka. */
    private KafkaProducer<String, String> kafkaProducer;
    private String kafkaTopicName;
    private ObjectMapper objectMapper;

    /** External Initialization. */
    public ReservationRepository(
            @NonNull CqlSession cqlSession,
            @Qualifier("keyspace") @NonNull CqlIdentifier keyspaceName,
            @NonNull KafkaProducer<String, String> kafkaProducer,
            @NonNull String kafkaTopicName) {
        this.cqlSession   = cqlSession;
        this.keyspaceName = keyspaceName;

        // TODO: Review initialization of objects needed for publishing to Kafka
        this.kafkaProducer = kafkaProducer;
        this.kafkaTopicName = kafkaTopicName;

        objectMapper = new ObjectMapper();
        objectMapper.registerModule(new JavaTimeModule());
        objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        objectMapper.enable(SerializationFeature.INDENT_OUTPUT);

        // Will create tables (if they do not exist)
        createReservationTables();

        // Prepare Statements of reservation
        prepareStatements();
        logger.info("Application initialized.");
    }

    /**
     * CqlSession is a stateful object handling TCP connections to nodes in the cluster.
     * This operation properly closes sockets when the application is stopped
     */
    @PreDestroy
    public void cleanup() {
        if (null != cqlSession) {
            cqlSession.close();
            logger.info("+ CqlSession has been successfully closed");
        }
    }

    /**
     * Testing existence is useful for building correct semantics in the RESTful API. To evaluate existence find the
     * table where confirmation number is partition key which is reservations_by_confirmation
     *
     * @param confirmationNumber
     *      unique identifier for confirmation
     * @return
     *      true if the reservation exists, false if it does not
     */
    public boolean exists(String confirmationNumber) {
        return cqlSession.execute(psExistReservation.bind(confirmationNumber))
                         .getAvailableWithoutFetching() > 0;
    }

    /**
     * Similar to exists() but maps and parses results.
     *
     * @param confirmationNumber
     *      unique identifier for confirmation
     * @return
     *      reservation if present or empty
     */
    @NonNull
    public Optional<Reservation> findByConfirmationNumber(@NonNull String confirmationNumber) {

        ResultSet resultSet = cqlSession.execute(psFindReservation.bind(confirmationNumber));

        // Hint: an empty result might not be an error as this method is sometimes used to check whether a
        // reservation with this confirmation number exists
        Row row = resultSet.one();
        if (row == null) {
            logger.debug("Unable to load reservation with confirmation number: " + confirmationNumber);
            return Optional.empty();
        }

        // Hint: If there is a result, create a new reservation object and set the values
        // Bonus: factor the logic to extract a reservation from a row into a separate method
        // (you will reuse it again later in getAllReservations())
        return Optional.of(mapRowToReservation(row));
    }

    /**
     * Create new entry in multiple tables for this reservation.
     *
     * @param reservation
     *      current reservation object
     * @return
     *      confirmation number for the reservation
     *
     */
    public String upsert(Reservation reservation) {
        Objects.requireNonNull(reservation);
        if (null == reservation.getConfirmationNumber()) {
            // Generating a new reservation number if none has been provided
            reservation.setConfirmationNumber(UUID.randomUUID().toString());
        }
        // Insert into 'reservations_by_hotel_date'
        BoundStatement bsInsertReservationByHotel =
                psInsertReservationByHotelDate.bind(reservation.getHotelId(), reservation.getStartDate(),
                        reservation.getEndDate(), reservation.getRoomNumber(), reservation.getConfirmationNumber(),
                        reservation.getGuestId());
        // Insert into 'reservations_by_confirmation'
        BoundStatement bsInsertReservationByConfirmation =
                psInsertReservationByConfirmation.bind(reservation.getConfirmationNumber(), reservation.getHotelId(),
                        reservation.getStartDate(), reservation.getEndDate(), reservation.getRoomNumber(),
                        reservation.getGuestId());
        BatchStatement batchInsertReservation = BatchStatement
                    .builder(DefaultBatchType.LOGGED)
                    .addStatement(bsInsertReservationByHotel)
                    .addStatement(bsInsertReservationByConfirmation)
                    .build();
        cqlSession.execute(batchInsertReservation);

        // TODO: Publish message to Kafka containing reservation
        try {
            String reservationJson = objectMapper.writeValueAsString(reservation);
            // HINT: use the constructor ProducerRecord(String topic, K key, V value)
            // with the reservation confirmation number as the key, and the JSON string as the value
            ProducerRecord<String, String> record = null;
            kafkaProducer.send(record);
        } catch (Exception e) {
            logger.warn("Error publishing reservation message to Kafka: {}", e);
        }

        return reservation.getConfirmationNumber();
    }

    /**
     * We pick 'reservations_by_confirmation' table to list reservations
     * BUT we could have used 'reservations_by_hotel_date' (as no key provided in request)
     *
     * @return
     *      list containing all reservations
     */
    public List<Reservation> findAll() {
        return cqlSession.execute(selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).all().build())
                  .all()                          // no paging we retrieve all objects
                  .stream()                       // because we are good people
                  .map(this::mapRowToReservation) // Mapping row as Reservation
                  .collect(Collectors.toList());  // Back to list objects
    }

    /**
     * Deleting a reservation.
     *
     * @param confirmationNumber
     *      unique identifier for confirmation.
     */
    public boolean delete(String confirmationNumber) {

        // Retrieving entire reservation in order to obtain the attributes we will need to delete from
        // reservations_by_hotel_date table
        Optional<Reservation> reservationToDelete = this.findByConfirmationNumber(confirmationNumber);

        if (reservationToDelete.isPresent()) {

            // Delete from 'reservations_by_hotel_date'
            Reservation reservation = reservationToDelete.get();
            BoundStatement bsDeleteReservationByHotelDate =
                    psDeleteReservationByHotelDate.bind(reservation.getHotelId(),
                            reservation.getStartDate(), reservation.getRoomNumber());

            // Delete from 'reservations_by_confirmation'
            BoundStatement bsDeleteReservationByConfirmation =
                    psDeleteReservationByConfirmation.bind(confirmationNumber);

            BatchStatement batchDeleteReservation = BatchStatement
                    .builder(DefaultBatchType.LOGGED)
                    .addStatement(bsDeleteReservationByHotelDate)
                    .addStatement(bsDeleteReservationByConfirmation)
                    .build();
            cqlSession.execute(batchDeleteReservation);

            // TODO: Publish message to Kafka with empty payload to indicate deletion
            // HINT: use the constructor ProducerRecord(String topic, K key, V value)
            // with the reservation confirmation number as the key, and an empty string as the value
            ProducerRecord<String, String> record = null;
            kafkaProducer.send(record);

            return true;
        }
        return false;
    }

    /**
     * Search all reservation for an hotel id and LocalDate.
     *
     * @param hotelId
     *      hotel identifier
     * @param date
     *      searched Date
     * @return
     *      list of reservations matching the search criteria
     */
    public List<Reservation> findByHotelAndDate(String hotelId, LocalDate date) {
        Objects.requireNonNull(hotelId);
        Objects.requireNonNull(date);
        return cqlSession.execute(psSearchReservation.bind(hotelId, date))
                         .all()                          // no paging we retrieve all objects
                         .stream()                       // because we are good people
                         .map(this::mapRowToReservation) // Mapping row as Reservation
                         .collect(Collectors.toList());  // Back to list objects
    }

    /**
     * Utility method to marshal a row as expected Reservation Bean.
     *
     * @param row
     *      current row from ResultSet
     * @return
     *      object
     */
    private Reservation mapRowToReservation(Row row) {
        Reservation reservation = new Reservation();
        reservation.setHotelId(row.getString(HOTEL_ID));
        reservation.setConfirmationNumber(row.getString(CONFIRM_NUMBER));
        reservation.setGuestId(row.getUuid(GUEST_ID));
        reservation.setRoomNumber(row.getShort(ROOM_NUMBER));
        reservation.setStartDate(row.getLocalDate(START_DATE));
        reservation.setEndDate(row.getLocalDate(END_DATE));
        return reservation;
    }

    /**
     * Create Keyspace and relevant tables as per defined in 'reservation.cql'
     */
    public void createReservationTables() {

        /**
         * Create TYPE 'Address' if not exists
         *
         * CREATE TYPE reservation.address (
         *   street text,
         *   city text,
         *   state_or_province text,
         *   postal_code text,
         *   country text
         * );
         */
        cqlSession.execute(
                createType(keyspaceName, TYPE_ADDRESS)
                .ifNotExists()
                .withField(STREET, DataTypes.TEXT)
                .withField(CITY, DataTypes.TEXT)
                .withField(STATE_PROVINCE, DataTypes.TEXT)
                .withField(POSTAL_CODE, DataTypes.TEXT)
                .withField(COUNTRY, DataTypes.TEXT)
                .build());
        logger.debug("+ Type '{}' has been created (if needed)", TYPE_ADDRESS.asInternal());

        /**
         * CREATE TABLE reservation.reservations_by_hotel_date (
         *  hotel_id text,
         *  start_date date,
         *  end_date date,
         *  room_number smallint,
         *  confirm_number text,
         *  guest_id uuid,
         *  PRIMARY KEY ((hotel_id, start_date), room_number)
         * );
         */
        cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
                .ifNotExists()
                .withPartitionKey(HOTEL_ID, DataTypes.TEXT)
                .withPartitionKey(START_DATE, DataTypes.DATE)
                .withClusteringColumn(ROOM_NUMBER, DataTypes.SMALLINT)
                .withColumn(END_DATE, DataTypes.DATE)
                .withColumn(CONFIRM_NUMBER, DataTypes.TEXT)
                .withColumn(GUEST_ID, DataTypes.UUID)
                .withClusteringOrder(ROOM_NUMBER, ClusteringOrder.ASC)
                .withComment("Q7. Find reservations by hotel and date")
                .build());
        logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_HOTEL_DATE.asInternal());

        /**
         * CREATE TABLE reservation.reservations_by_confirmation (
         *   confirm_number text PRIMARY KEY,
         *   hotel_id text,
         *   start_date date,
         *   end_date date,
         *   room_number smallint,
         *   guest_id uuid
         * );
         */
        cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_CONFI)
                .ifNotExists()
                .withPartitionKey(CONFIRM_NUMBER, DataTypes.TEXT)
                .withColumn(HOTEL_ID, DataTypes.TEXT)
                .withColumn(START_DATE, DataTypes.DATE)
                .withColumn(END_DATE, DataTypes.DATE)
                .withColumn(ROOM_NUMBER, DataTypes.SMALLINT)
                .withColumn(GUEST_ID, DataTypes.UUID)
                .build());
        logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_CONFI.asInternal());

        /**
         * CREATE TABLE reservation.reservations_by_guest (
         *  guest_last_name text,
         *  hotel_id text,
         *  start_date date,
         *  end_date date,
         *  room_number smallint,
         *  confirm_number text,
         *  guest_id uuid,
         *  PRIMARY KEY ((guest_last_name), hotel_id)
         * );
         */
        cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_GUEST)
                .ifNotExists()
                .withPartitionKey(GUEST_LAST_NAME, DataTypes.TEXT)
                .withClusteringColumn(HOTEL_ID, DataTypes.TEXT)
                .withColumn(START_DATE, DataTypes.DATE)
                .withColumn(END_DATE, DataTypes.DATE)
                .withColumn(ROOM_NUMBER, DataTypes.SMALLINT)
                .withColumn(CONFIRM_NUMBER, DataTypes.TEXT)
                .withColumn(GUEST_ID, DataTypes.UUID)
                .withComment("Q8. Find reservations by guest name")
                .build());
        logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_GUEST.asInternal());

        /**
         * CREATE TABLE reservation.guests (
         *   guest_id uuid PRIMARY KEY,
         *   first_name text,
         *   last_name text,
         *   title text,
         *   emails set<text>,
         *   phone_numbers list<text>,
         *   addresses map<text, frozen<address>>,
         *   confirm_number text
         * );
         */
        UserDefinedType  udtAddressType =
                cqlSession.getMetadata().getKeyspace(keyspaceName).get() // Retrieving KeySpaceMetadata
                          .getUserDefinedType(TYPE_ADDRESS).get();        // Looking for UDT (extending DataType)
        cqlSession.execute(createTable(keyspaceName, TABLE_GUESTS)
                .ifNotExists()
                .withPartitionKey(GUEST_ID, DataTypes.UUID)
                .withColumn(FIRSTNAME, DataTypes.TEXT)
                .withColumn(LASTNAME, DataTypes.TEXT)
                .withColumn(TITLE, DataTypes.TEXT)
                .withColumn(EMAILS, DataTypes.setOf(DataTypes.TEXT))
                .withColumn(PHONE_NUMBERS, DataTypes.listOf(DataTypes.TEXT))
                .withColumn(ADDRESSES, DataTypes.mapOf(DataTypes.TEXT, udtAddressType, true))
                .withColumn(CONFIRM_NUMBER, DataTypes.TEXT)
                .withComment("Q9. 
Find guest by ID")                   .build());            logger.debug("+ Table '{}' has been created (if needed)", TABLE_GUESTS.asInternal());            logger.info("Schema has been successfully initialized.");     }      private void prepareStatements() {         if (psExistReservation == null) {             psExistReservation = cqlSession.prepare(selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).column(CONFIRM_NUMBER)                                 .where(column(CONFIRM_NUMBER).isEqualTo(bindMarker(CONFIRM_NUMBER)))                                 .build());             psFindReservation = cqlSession.prepare(                                 selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).all()                                 .where(column(CONFIRM_NUMBER).isEqualTo(bindMarker(CONFIRM_NUMBER)))                                 .build());             psSearchReservation = cqlSession.prepare(                                 selectFrom(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE).all()                                 .where(column(HOTEL_ID).isEqualTo(bindMarker(HOTEL_ID)))                                 .where(column(START_DATE).isEqualTo(bindMarker(START_DATE)))                                 .build());             psDeleteReservationByConfirmation = cqlSession.prepare(                                 deleteFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI)                                 .where(column(CONFIRM_NUMBER).isEqualTo(bindMarker(CONFIRM_NUMBER)))                                 .build());             psDeleteReservationByHotelDate = cqlSession.prepare(                     deleteFrom(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)                     .where(column(HOTEL_ID).isEqualTo(bindMarker(HOTEL_ID)))                     .where(column(START_DATE).isEqualTo(bindMarker(START_DATE)))                     .where(column(ROOM_NUMBER).isEqualTo(bindMarker(ROOM_NUMBER)))                     .build());             psInsertReservationByHotelDate = 
cqlSession.prepare(QueryBuilder.insertInto(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)                     .value(HOTEL_ID, bindMarker(HOTEL_ID))                     .value(START_DATE, bindMarker(START_DATE))                     .value(END_DATE, bindMarker(END_DATE))                     .value(ROOM_NUMBER, bindMarker(ROOM_NUMBER))                     .value(CONFIRM_NUMBER, bindMarker(CONFIRM_NUMBER))                     .value(GUEST_ID, bindMarker(GUEST_ID))                     .build());             psInsertReservationByConfirmation = cqlSession.prepare(QueryBuilder.insertInto(keyspaceName, TABLE_RESERVATION_BY_CONFI)                     .value(CONFIRM_NUMBER, bindMarker(CONFIRM_NUMBER))                     .value(HOTEL_ID, bindMarker(HOTEL_ID))                     .value(START_DATE, bindMarker(START_DATE))                     .value(END_DATE, bindMarker(END_DATE))                     .value(ROOM_NUMBER, bindMarker(ROOM_NUMBER))                     .value(GUEST_ID, bindMarker(GUEST_ID))                     .build());             logger.info("Statements have been successfully prepared.");         }     } }

The code that creates the Kafka connection is in reservation-service/src/main/java/dev/cassandraguide/conf/KafkaConfiguration.java:

package dev.cassandraguide.conf;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

/**
 * Import Configuration from Configuration File
 *
 * @author Jeff Carpenter
 */
@Configuration
public class KafkaConfiguration {

    // Logger (bound to this class so log lines are attributed correctly)
    private static final Logger logger = LoggerFactory.getLogger(KafkaConfiguration.class);

    // Bootstrap servers
    @Value("${kafka.bootstrap-servers:localhost:9092}")
    protected String bootstrapServers;

    // Kafka Client ID
    @Value("${kafka.client-id:ReservationService}")
    protected String clientId = "ReservationService";

    // Topic Name
    @Value("${kafka.topicName:reservation}")
    public String topicName = "reservation";

    /**
     * Default configuration.
     */
    public KafkaConfiguration() {}

    /**
     * Initialization of Configuration.
     *
     * @param bootstrapServers
     * @param clientId
     * @param topicName
     */
    public KafkaConfiguration(
            String bootstrapServers, String clientId, String topicName) {
        super();
        this.bootstrapServers = bootstrapServers;
        this.clientId = clientId;
        this.topicName = topicName;
    }

    /**
     * Producer with String keys and String values, shared as a Spring bean.
     */
    @Bean
    public KafkaProducer<String, String> kafkaProducer() {
        logger.info("Creating Kafka Producer.");

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }

    /**
     * Getter accessor for attribute 'bootstrapServers'.
     *
     * @return
     *       current value of 'bootstrapServers'
     */
    public String getBootstrapServers() {
        return bootstrapServers;
    }

    /**
     * Setter accessor for attribute 'bootstrapServers'.
     * @param bootstrapServers
     * new value for 'bootstrapServers'
     */
    public void setBootstrapServers(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
    }

    /**
     * Getter accessor for attribute 'clientId'.
     *
     * @return
     *       current value of 'clientId'
     */
    public String getClientId() {
        return clientId;
    }

    /**
     * Setter accessor for attribute 'clientId'.
     * @param clientId
     * new value for 'clientId'
     */
    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    /**
     * Getter accessor for attribute 'topicName'.
     *
     * @return
     *       current value of 'topicName'
     */
    public String getTopicName() {
        return topicName;
    }

    /**
     * Setter accessor for attribute 'topicName'.
     * @param topicName
     * new value for 'topicName'
     */
    public void setTopicName(String topicName) {
        this.topicName = topicName;
    }

}
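To make the producer settings in KafkaConfiguration easier to see at a glance, here is a minimal sketch that builds the same four properties using only the JDK. The class name ProducerPropsSketch and the method producerProps are hypothetical names introduced for illustration; the literal keys are the string values behind the ProducerConfig constants used in the listing.

```java
import java.util.Properties;

class ProducerPropsSketch {

    // Builds the four settings that kafkaProducer() hands to the Kafka client.
    // Hypothetical helper: it only assembles Properties, it does not connect to Kafka.
    static Properties producerProps(String bootstrapServers, String clientId) {
        String stringSerializer = "org.apache.kafka.common.serialization.StringSerializer";
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers); // ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
        props.put("client.id", clientId);                 // ProducerConfig.CLIENT_ID_CONFIG
        props.put("key.serializer", stringSerializer);    // ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
        props.put("value.serializer", stringSerializer);  // ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps("localhost:9092", "ReservationService");
        // Print every setting; iteration order of Properties is not guaranteed.
        for (String name : props.stringPropertyNames()) {
            System.out.println(name + " = " + props.getProperty(name));
        }
    }
}
```

Both the key and the value use the String serializer, which matches the KafkaProducer<String, String> type of the bean.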

Note that the Reservation Service keeps its Cassandra and Kafka settings in a configuration file, which we can view at reservation-service/src/main/resources/application.yml:

# ----------------------------------------------------------
# Spring Boot Config
# ----------------------------------------------------------
spring:
  application:
    name: Reservation Reservicess
  jackson:
    serialization:
      WRITE_DATES_AS_TIMESTAMPS: false
server:
  port: 8080

# ----------------------------------------------------------
# DataStax Enterprise Java Driver Config
# ----------------------------------------------------------
cassandra:
  contactPoint: 127.0.0.1
  port: 9042
  keyspaceName: reservation
  localDataCenterName: datacenter1
  dropSchema: false

# ----------------------------------------------------------
# Kafka Producer Config
# ----------------------------------------------------------
kafka:
  bootstrap-servers: localhost:9092
  client-id: ReservationService
  topicName: reservation
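The kafka keys in this file are the ones referenced by the @Value placeholders in KafkaConfiguration, such as ${kafka.bootstrap-servers:localhost:9092}, where the text after the first colon is the default used when the key is absent. The sketch below is a simplified model of that lookup in plain Java, not Spring's implementation; the class PlaceholderSketch and the method resolve are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

class PlaceholderSketch {

    // Resolves "${key:default}": the configured value if present, otherwise the default.
    // The key ends at the first ':' so a default such as "localhost:9092" stays intact.
    static String resolve(String placeholder, Map<String, String> config) {
        String body = placeholder.substring(2, placeholder.length() - 1); // strip "${" and "}"
        int sep = body.indexOf(':');
        String key = sep < 0 ? body : body.substring(0, sep);
        String fallback = sep < 0 ? null : body.substring(sep + 1);
        return config.getOrDefault(key, fallback);
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("kafka.topicName", "reservation");

        // Key is configured, so the configured value wins over the default.
        System.out.println(resolve("${kafka.topicName:other}", config));
        // Key is missing, so the default after the first ':' is used.
        System.out.println(resolve("${kafka.bootstrap-servers:localhost:9092}", config));
    }
}
```

In this service the values in application.yml happen to match the defaults, so removing the kafka section would leave the producer pointing at the same local broker.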

Cassandra tables for storing reservations

The Cassandra distribution includes cqlsh, a shell for issuing CQL commands. Let's start the shell:

apache-cassandra-4.0-alpha4/bin/cqlsh

We should see the cqlsh> prompt in the terminal.

Once the prompt is available, we can load the schema file we reviewed earlier to create the keyspace that contains the reservation tables:

SOURCE 'reservation-service/src/main/resources/reservation.cql';

Now we can view the schema we just created with the DESCRIBE command:

DESCRIBE KEYSPACE reservation;

Let's set this as the keyspace for subsequent commands:

USE reservation;

Note that there are three different tables for storing reservation data, with similar columns but different primary keys.

Our natural tendency as data modelers would be to focus first on designing tables to store reservations and other records such as guest profiles, and only then start thinking about the queries that will access them. In Cassandra data modeling, however, we start with the queries. These tables are designed to support the queries that define how users will access reservations, which leads to a denormalized design.

First, the reservations_by_confirmation table supports looking up reservations by the unique confirmation number given to the customer at booking time:

CREATE TABLE reservations_by_confirmation (
  confirm_number text,
  hotel_id text,
  start_date date,
  end_date date,
  room_number smallint,
  guest_id uuid,
  PRIMARY KEY (confirm_number)
);

Here is an example of loading a row of data into this table:

INSERT INTO reservations_by_confirmation (confirm_number, hotel_id, start_date, end_date, room_number, guest_id) VALUES ('RS2G0Z', 'NY456', '2020-06-08', '2020-06-10', 111, 1b4d86f4-ccff-4256-a63d-45c905df2677);

To retrieve this row later, we would use a query like this:

SELECT * FROM reservations_by_confirmation WHERE confirm_number = 'RS2G0Z';

Second, the reservations_by_hotel_date table lets hotel staff view upcoming reservations by date to get a picture of how the hotel is doing, for example on which dates the hotel is full or not full:

CREATE TABLE reservations_by_hotel_date (
  hotel_id text,
  start_date date,
  end_date date,
  room_number smallint,
  confirm_number text,
  guest_id uuid,
  PRIMARY KEY ((hotel_id, start_date), room_number)
);

Here is an example of loading the same reservation we just loaded into reservations_by_confirmation into this additional table:

INSERT INTO reservations_by_hotel_date (confirm_number, hotel_id, start_date, end_date, room_number, guest_id) VALUES ('RS2G0Z', 'NY456', '2020-06-08', '2020-06-10', 111, 1b4d86f4-ccff-4256-a63d-45c905df2677);

This table can support two queries. First, because the partition key contains hotel_id and start_date, we can retrieve all reservations for a given hotel and date:

SELECT * FROM reservations_by_hotel_date WHERE hotel_id = 'NY456' AND start_date = '2020-06-08';

Because room_number is the clustering column, we can also find the reservation for a specific room on that date:

SELECT * FROM reservations_by_hotel_date WHERE hotel_id = 'NY456' AND start_date = '2020-06-08' AND room_number = 111;
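The two queries above follow directly from the shape of the primary key. As a rough mental model only, and not how Cassandra stores data internally, the table can be pictured as a map from the partition key (hotel_id, start_date) to rows sorted by the clustering column room_number. The class HotelDateTableSketch and the confirmation number HYPOTH1 are hypothetical and exist only for this illustration.

```java
import java.time.LocalDate;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class HotelDateTableSketch {

    // Partition key (hotel_id, start_date) -> rows ordered by clustering column room_number.
    // Each row is reduced to its confirmation number to keep the sketch short.
    private final Map<List<Object>, TreeMap<Short, String>> partitions = new HashMap<>();

    private static List<Object> key(String hotelId, LocalDate startDate) {
        return Arrays.<Object>asList(hotelId, startDate);
    }

    void insert(String hotelId, LocalDate startDate, short roomNumber, String confirmNumber) {
        partitions.computeIfAbsent(key(hotelId, startDate), k -> new TreeMap<>())
                  .put(roomNumber, confirmNumber);
    }

    // Models: WHERE hotel_id = ? AND start_date = ?
    Collection<String> findByHotelAndDate(String hotelId, LocalDate startDate) {
        TreeMap<Short, String> rows = partitions.get(key(hotelId, startDate));
        return rows == null ? Collections.<String>emptyList() : rows.values();
    }

    // Models: WHERE hotel_id = ? AND start_date = ? AND room_number = ?
    String findByRoom(String hotelId, LocalDate startDate, short roomNumber) {
        TreeMap<Short, String> rows = partitions.get(key(hotelId, startDate));
        return rows == null ? null : rows.get(roomNumber);
    }

    public static void main(String[] args) {
        HotelDateTableSketch table = new HotelDateTableSketch();
        LocalDate date = LocalDate.parse("2020-06-08");
        table.insert("NY456", date, (short) 111, "RS2G0Z");
        table.insert("NY456", date, (short) 105, "HYPOTH1"); // hypothetical second booking

        // All reservations in the partition, in room_number order.
        System.out.println(table.findByHotelAndDate("NY456", date));
        // A single row selected by the clustering column.
        System.out.println(table.findByRoom("NY456", date, (short) 111));
    }
}
```

The model also shows why a query on room_number alone does not fit this table: without hotel_id and start_date there is no partition to look in.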

Similarly, the reservations_by_guest table can be used to look up a reservation by guest name for a guest who has forgotten their confirmation number. For the purposes of this post, we will focus on the reservations_by_confirmation and reservations_by_hotel_date tables.

Notice that the ReservationRepository class uses Cassandra BATCH statements to group the changes to the reservations_by_confirmation and reservations_by_hotel_date tables so that they execute as a single CQL operation. The class is in reservation-service/src/main/java/dev/cassandraguide/repository/ReservationRepository.java:

ReservationRepository.java
/*  * Copyright (C) 2017-2020 Jeff Carpenter  *  * Licensed under the Apache License, Version 2.0 (the "License");  * you may not use this file except in compliance with the License.  * You may obtain a copy of the License at  *  * http://www.apache.org/licenses/LICENSE-2.0  *  * Unless required by applicable law or agreed to in writing, software  * distributed under the License is distributed on an "AS IS" BASIS,  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  * See the License for the specific language governing permissions and  * limitations under the License.  */ package dev.cassandraguide.repository;  import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker; import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.deleteFrom; import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom; import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createTable; import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createType; import static com.datastax.oss.driver.api.querybuilder.relation.Relation.column;  import java.time.LocalDate; import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.UUID; import java.util.stream.Collectors;  import javax.annotation.PreDestroy;  import com.fasterxml.jackson.core.JsonProcessingException; import dev.cassandraguide.model.Reservation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Profile; import org.springframework.lang.NonNull; import org.springframework.stereotype.Repository;  import com.datastax.oss.driver.api.core.CqlIdentifier; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.BatchStatement; import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.DefaultBatchType; import 
com.datastax.oss.driver.api.core.cql.PreparedStatement; import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.cql.Row; import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder; import com.datastax.oss.driver.api.core.type.DataTypes; import com.datastax.oss.driver.api.core.type.UserDefinedType; import com.datastax.oss.driver.api.querybuilder.QueryBuilder;  // TODO: Review imports for publishing to Kafka import org.apache.kafka.clients.producer.*; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;  /**  * The goal of this project is to provide a minimally functional implementation of a microservice   * that uses Apache Cassandra for its data storage. The reservation service is implemented as a   * RESTful service using Spring Boot.  *  * @author Jeff Carpenter, Cedrick Lunven  */ @Repository @Profile("!unit-test") // When I do some 'unit-test' no connectivity to DB public class ReservationRepository {      /** Logger for the class. 
*/     private static final Logger logger = LoggerFactory.getLogger(ReservationRepository.class);          // Reservation Schema Constants     public static final CqlIdentifier TYPE_ADDRESS               = CqlIdentifier.fromCql("address");     public static final CqlIdentifier TABLE_RESERVATION_BY_HOTEL_DATE =             CqlIdentifier.fromCql("reservations_by_hotel_date");     public static final CqlIdentifier TABLE_RESERVATION_BY_CONFI = CqlIdentifier.fromCql("reservations_by_confirmation");     public static final CqlIdentifier TABLE_RESERVATION_BY_GUEST = CqlIdentifier.fromCql("reservations_by_guest");     public static final CqlIdentifier TABLE_GUESTS               = CqlIdentifier.fromCql("guests");     public static final CqlIdentifier STREET                     = CqlIdentifier.fromCql("street");     public static final CqlIdentifier CITY                       = CqlIdentifier.fromCql("city");     public static final CqlIdentifier STATE_PROVINCE             = CqlIdentifier.fromCql("state_or_province");     public static final CqlIdentifier POSTAL_CODE                = CqlIdentifier.fromCql("postal_code");     public static final CqlIdentifier COUNTRY                    = CqlIdentifier.fromCql("country");     public static final CqlIdentifier HOTEL_ID                   = CqlIdentifier.fromCql("hotel_id");     public static final CqlIdentifier START_DATE                 = CqlIdentifier.fromCql("start_date");     public static final CqlIdentifier END_DATE                   = CqlIdentifier.fromCql("end_date");     public static final CqlIdentifier ROOM_NUMBER                = CqlIdentifier.fromCql("room_number");     public static final CqlIdentifier CONFIRM_NUMBER             = CqlIdentifier.fromCql("confirm_number");     public static final CqlIdentifier GUEST_ID                   = CqlIdentifier.fromCql("guest_id");     public static final CqlIdentifier GUEST_LAST_NAME            = CqlIdentifier.fromCql("guest_last_name");     public static final CqlIdentifier 
FIRSTNAME                  = CqlIdentifier.fromCql("first_name");     public static final CqlIdentifier LASTNAME                   = CqlIdentifier.fromCql("last_name");     public static final CqlIdentifier TITLE                      = CqlIdentifier.fromCql("title");     public static final CqlIdentifier EMAILS                     = CqlIdentifier.fromCql("emails");     public static final CqlIdentifier PHONE_NUMBERS              = CqlIdentifier.fromCql("phone_numbers");     public static final CqlIdentifier ADDRESSES                  = CqlIdentifier.fromCql("addresses");          private PreparedStatement psExistReservation;     private PreparedStatement psFindReservation;     private PreparedStatement psInsertReservationByHotelDate;     private PreparedStatement psInsertReservationByConfirmation;     private PreparedStatement psDeleteReservationByHotelDate;     private PreparedStatement psDeleteReservationByConfirmation;     private PreparedStatement psSearchReservation;          /** CqlSession holding metadata to interact with Cassandra. */     private CqlSession     cqlSession;     private CqlIdentifier  keyspaceName;      // TODO: Review variables used for publishing to Kafka     /** KafkaProducer for publishing messages to Kafka. */     private KafkaProducer<String, String> kafkaProducer;     private String kafkaTopicName;     private ObjectMapper objectMapper;          /** External Initialization. 
*/     public ReservationRepository(             @NonNull CqlSession cqlSession,              @Qualifier("keyspace") @NonNull CqlIdentifier keyspaceName,             @NonNull KafkaProducer<String, String> kafkaProducer,             @NonNull String kafkaTopicName) {         this.cqlSession   = cqlSession;         this.keyspaceName = keyspaceName;          // TODO: Review initialization of objects needed for publishing to Kafka         this.kafkaProducer = kafkaProducer;         this.kafkaTopicName = kafkaTopicName;          objectMapper = new ObjectMapper();         objectMapper.registerModule(new JavaTimeModule());         objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);         objectMapper.enable(SerializationFeature.INDENT_OUTPUT);          // Will create tables (if they do not exist)         createReservationTables();                  // Prepare Statements of reservation         prepareStatements();         logger.info("Application initialized.");     }          /**      * CqlSession is a stateful object handling TCP connections to nodes in the cluster.      * This operation properly closes sockets when the application is stopped      */     @PreDestroy     public void cleanup() {         if (null != cqlSession) {             cqlSession.close();             logger.info("+ CqlSession has been successfully closed");         }     }          /**      * Testing existence is useful for building correct semantics in the RESTful API. 
To evaluate existence find the      * table where confirmation number is partition key which is reservations_by_confirmation      *       * @param confirmationNumber      *      unique identifier for confirmation      * @return      *      true if the reservation exists, false if it does not      */     public boolean exists(String confirmationNumber) {         return cqlSession.execute(psExistReservation.bind(confirmationNumber))                          .getAvailableWithoutFetching() > 0;     }          /**      * Similar to exists() but maps and parses results.      *       * @param confirmationNumber      *      unique identifier for confirmation      * @return      *      reservation if present or empty      */     @NonNull     public Optional<Reservation> findByConfirmationNumber(@NonNull String confirmationNumber) {                  ResultSet resultSet = cqlSession.execute(psFindReservation.bind(confirmationNumber));                  // Hint: an empty result might not be an error as this method is sometimes used to check whether a         // reservation with this confirmation number exists         Row row = resultSet.one();         if (row == null) {             logger.debug("Unable to load reservation with confirmation number: " + confirmationNumber);             return Optional.empty();         }                  // Hint: If there is a result, create a new reservation object and set the values         // Bonus: factor the logic to extract a reservation from a row into a separate method         // (you will reuse it again later in getAllReservations())         return Optional.of(mapRowToReservation(row));     }          /**      * Create new entry in multiple tables for this reservation.      
*      * @param reservation      *      current reservation object      * @return      *      confirmation number for the reservation      *            */      public String upsert(Reservation reservation) {         Objects.requireNonNull(reservation);         if (null == reservation.getConfirmationNumber()) {             // Generating a new reservation number if none has been provided             reservation.setConfirmationNumber(UUID.randomUUID().toString());         }         // Insert into 'reservations_by_hotel_date'         BoundStatement bsInsertReservationByHotel =                  psInsertReservationByHotelDate.bind(reservation.getHotelId(), reservation.getStartDate(),                         reservation.getEndDate(), reservation.getRoomNumber(), reservation.getConfirmationNumber(),                         reservation.getGuestId());         // Insert into 'reservations_by_confirmation'         BoundStatement bsInsertReservationByConfirmation =                  psInsertReservationByConfirmation.bind(reservation.getConfirmationNumber(), reservation.getHotelId(),                         reservation.getStartDate(), reservation.getEndDate(), reservation.getRoomNumber(),                         reservation.getGuestId());         BatchStatement batchInsertReservation = BatchStatement                     .builder(DefaultBatchType.LOGGED)                     .addStatement(bsInsertReservationByHotel)                     .addStatement(bsInsertReservationByConfirmation)                     .build();         cqlSession.execute(batchInsertReservation);          // TODO: Publish message to Kafka containing reservation          try {              String reservationJson = objectMapper.writeValueAsString(reservation);              // HINT: use the constructor ProducerRecord(String topic, K key, V value)              // with the reservation confirmation number as the key, and the JSON string as the value              ProducerRecord<String, String> record = null;              
kafkaProducer.send(record);          } catch (Exception e) {              logger.warn("Error publishing reservation message to Kafka: {}", e);          }          return reservation.getConfirmationNumber();     }      /**      * We pick 'reservations_by_confirmation' table to list reservations      * BUT we could have used 'reservations_by_hotel_date' (as no key provided in request)      *        * @return      *      list containing all reservations      */     public List<Reservation> findAll() {         return cqlSession.execute(selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).all().build())                   .all()                          // no paging we retrieve all objects                   .stream()                       // because we are good people                   .map(this::mapRowToReservation) // Mapping row as Reservation                   .collect(Collectors.toList());  // Back to list objects     }            /**      * Deleting a reservation.      *      * @param confirmationNumber      *      unique identifier for confirmation.      
*/     public boolean delete(String confirmationNumber) {          // Retrieving entire reservation in order to obtain the attributes we will need to delete from         // reservations_by_hotel_date table         Optional<Reservation> reservationToDelete = this.findByConfirmationNumber(confirmationNumber);          if (reservationToDelete.isPresent()) {              // Delete from 'reservations_by_hotel_date'             Reservation reservation = reservationToDelete.get();             BoundStatement bsDeleteReservationByHotelDate =                     psDeleteReservationByHotelDate.bind(reservation.getHotelId(),                             reservation.getStartDate(), reservation.getRoomNumber());              // Delete from 'reservations_by_confirmation'             BoundStatement bsDeleteReservationByConfirmation =                     psDeleteReservationByConfirmation.bind(confirmationNumber);              BatchStatement batchDeleteReservation = BatchStatement                     .builder(DefaultBatchType.LOGGED)                     .addStatement(bsDeleteReservationByHotelDate)                     .addStatement(bsDeleteReservationByConfirmation)                     .build();             cqlSession.execute(batchDeleteReservation);              // TODO: Publish message to Kafka with empty payload to indicate deletion             // HINT: use the constructor ProducerRecord(String topic, K key, V value)             // with the reservation confirmation number as the key, and an empty string as the value             ProducerRecord<String, String> record = null;             kafkaProducer.send(record);              return true;         }         return false;     }          /**      * Search all reservation for an hotel id and LocalDate.      
     *
     * @param hotelId
     *      hotel identifier
     * @param date
     *      searched Date
     * @return
     *      list of reservations matching the search criteria
     */
    public List<Reservation> findByHotelAndDate(String hotelId, LocalDate date) {
        Objects.requireNonNull(hotelId);
        Objects.requireNonNull(date);
        return cqlSession.execute(psSearchReservation.bind(hotelId, date))
                         .all()                          // no paging we retrieve all objects
                         .stream()                       // because we are good people
                         .map(this::mapRowToReservation) // Mapping row as Reservation
                         .collect(Collectors.toList());  // Back to list objects
    }

    /**
     * Utility method to marshal a row as expected Reservation Bean.
     *
     * @param row
     *      current row from ResultSet
     * @return
     *      object
     */
    private Reservation mapRowToReservation(Row row) {
        Reservation reservation = new Reservation();
        reservation.setHotelId(row.getString(HOTEL_ID));
        reservation.setConfirmationNumber(row.getString(CONFIRM_NUMBER));
        reservation.setGuestId(row.getUuid(GUEST_ID));
        reservation.setRoomNumber(row.getShort(ROOM_NUMBER));
        reservation.setStartDate(row.getLocalDate(START_DATE));
        reservation.setEndDate(row.getLocalDate(END_DATE));
        return reservation;
    }

    /**
     * Create Keyspace and relevant tables as per defined in 'reservation.cql'
     */
    public void createReservationTables() {

        /**
         * Create TYPE 'Address' if not exists
         *
         * CREATE TYPE reservation.address (
         *   street text,
         *   city text,
         *   state_or_province text,
         *   postal_code text,
         *   country text
         * );
         */
        cqlSession.execute(
                createType(keyspaceName, TYPE_ADDRESS)
                .ifNotExists()
                .withField(STREET, DataTypes.TEXT)
                .withField(CITY, DataTypes.TEXT)
                .withField(STATE_PROVINCE, DataTypes.TEXT)
                .withField(POSTAL_CODE, DataTypes.TEXT)
                .withField(COUNTRY, DataTypes.TEXT)
                .build());
        logger.debug("+ Type '{}' has been created (if needed)", TYPE_ADDRESS.asInternal());

        /**
         * CREATE TABLE reservation.reservations_by_hotel_date (
         *  hotel_id text,
         *  start_date date,
         *  end_date date,
         *  room_number smallint,
         *  confirm_number text,
         *  guest_id uuid,
         *  PRIMARY KEY ((hotel_id, start_date), room_number)
         * );
         */
        cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
                .ifNotExists()
                .withPartitionKey(HOTEL_ID, DataTypes.TEXT)
                .withPartitionKey(START_DATE, DataTypes.DATE)
                .withClusteringColumn(ROOM_NUMBER, DataTypes.SMALLINT)
                .withColumn(END_DATE, DataTypes.DATE)
                .withColumn(CONFIRM_NUMBER, DataTypes.TEXT)
                .withColumn(GUEST_ID, DataTypes.UUID)
                .withClusteringOrder(ROOM_NUMBER, ClusteringOrder.ASC)
                .withComment("Q7. Find reservations by hotel and date")
                .build());
        logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_HOTEL_DATE.asInternal());

        /**
         * CREATE TABLE reservation.reservations_by_confirmation (
         *   confirm_number text PRIMARY KEY,
         *   hotel_id text,
         *   start_date date,
         *   end_date date,
         *   room_number smallint,
         *   guest_id uuid
         * );
         */
        cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_CONFI)
                .ifNotExists()
                .withPartitionKey(CONFIRM_NUMBER, DataTypes.TEXT)
                .withColumn(HOTEL_ID, DataTypes.TEXT)
                .withColumn(START_DATE, DataTypes.DATE)
                .withColumn(END_DATE, DataTypes.DATE)
                .withColumn(ROOM_NUMBER, DataTypes.SMALLINT)
                .withColumn(GUEST_ID, DataTypes.UUID)
                .build());
        logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_CONFI.asInternal());

        /**
         * CREATE TABLE reservation.reservations_by_guest (
         *  guest_last_name text,
         *  hotel_id text,
         *  start_date date,
         *  end_date date,
         *  room_number smallint,
         *  confirm_number text,
         *  guest_id uuid,
         *  PRIMARY KEY ((guest_last_name), hotel_id)
         * );
         */
        cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_GUEST)
                .ifNotExists()
                .withPartitionKey(GUEST_LAST_NAME, DataTypes.TEXT)
                .withClusteringColumn(HOTEL_ID, DataTypes.TEXT)
                .withColumn(START_DATE, DataTypes.DATE)
                .withColumn(END_DATE, DataTypes.DATE)
                .withColumn(ROOM_NUMBER, DataTypes.SMALLINT)
                .withColumn(CONFIRM_NUMBER, DataTypes.TEXT)
                .withColumn(GUEST_ID, DataTypes.UUID)
                .withComment("Q8. Find reservations by guest name")
                .build());
        logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_GUEST.asInternal());

        /**
         * CREATE TABLE reservation.guests (
         *   guest_id uuid PRIMARY KEY,
         *   first_name text,
         *   last_name text,
         *   title text,
         *   emails set<text>,
         *   phone_numbers list<text>,
         *   addresses map<text, frozen<address>>,
         *   confirm_number text
         * );
         */
        UserDefinedType udtAddressType =
                cqlSession.getMetadata().getKeyspace(keyspaceName).get() // Retrieving KeySpaceMetadata
                          .getUserDefinedType(TYPE_ADDRESS).get();       // Looking for UDT (extending DataType)
        cqlSession.execute(createTable(keyspaceName, TABLE_GUESTS)
                .ifNotExists()
                .withPartitionKey(GUEST_ID, DataTypes.UUID)
                .withColumn(FIRSTNAME, DataTypes.TEXT)
                .withColumn(LASTNAME, DataTypes.TEXT)
                .withColumn(TITLE, DataTypes.TEXT)
                .withColumn(EMAILS, DataTypes.setOf(DataTypes.TEXT))
                .withColumn(PHONE_NUMBERS, DataTypes.listOf(DataTypes.TEXT))
                .withColumn(ADDRESSES, DataTypes.mapOf(DataTypes.TEXT, udtAddressType, true))
                .withColumn(CONFIRM_NUMBER, DataTypes.TEXT)
                .withComment("Q9. Find guest by ID")
                .build());
        logger.debug("+ Table '{}' has been created (if needed)", TABLE_GUESTS.asInternal());
        logger.info("Schema has been successfully initialized.");
    }

    private void prepareStatements() {
        if (psExistReservation == null) {
            psExistReservation = cqlSession.prepare(
                    selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).column(CONFIRM_NUMBER)
                    .where(column(CONFIRM_NUMBER).isEqualTo(bindMarker(CONFIRM_NUMBER)))
                    .build());
            psFindReservation = cqlSession.prepare(
                    selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).all()
                    .where(column(CONFIRM_NUMBER).isEqualTo(bindMarker(CONFIRM_NUMBER)))
                    .build());
            psSearchReservation = cqlSession.prepare(
                    selectFrom(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE).all()
                    .where(column(HOTEL_ID).isEqualTo(bindMarker(HOTEL_ID)))
                    .where(column(START_DATE).isEqualTo(bindMarker(START_DATE)))
                    .build());
            psDeleteReservationByConfirmation = cqlSession.prepare(
                    deleteFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI)
                    .where(column(CONFIRM_NUMBER).isEqualTo(bindMarker(CONFIRM_NUMBER)))
                    .build());
            psDeleteReservationByHotelDate = cqlSession.prepare(
                    deleteFrom(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
                    .where(column(HOTEL_ID).isEqualTo(bindMarker(HOTEL_ID)))
                    .where(column(START_DATE).isEqualTo(bindMarker(START_DATE)))
                    .where(column(ROOM_NUMBER).isEqualTo(bindMarker(ROOM_NUMBER)))
                    .build());
            psInsertReservationByHotelDate = cqlSession.prepare(
                    QueryBuilder.insertInto(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
                    .value(HOTEL_ID, bindMarker(HOTEL_ID))
                    .value(START_DATE, bindMarker(START_DATE))
                    .value(END_DATE, bindMarker(END_DATE))
                    .value(ROOM_NUMBER, bindMarker(ROOM_NUMBER))
                    .value(CONFIRM_NUMBER, bindMarker(CONFIRM_NUMBER))
                    .value(GUEST_ID, bindMarker(GUEST_ID))
                    .build());
            psInsertReservationByConfirmation = cqlSession.prepare(
                    QueryBuilder.insertInto(keyspaceName, TABLE_RESERVATION_BY_CONFI)
                    .value(CONFIRM_NUMBER, bindMarker(CONFIRM_NUMBER))
                    .value(HOTEL_ID, bindMarker(HOTEL_ID))
                    .value(START_DATE, bindMarker(START_DATE))
                    .value(END_DATE, bindMarker(END_DATE))
                    .value(ROOM_NUMBER, bindMarker(ROOM_NUMBER))
                    .value(GUEST_ID, bindMarker(GUEST_ID))
                    .build());
            logger.info("Statements have been successfully prepared.");
        }
    }
}

We are now done with cqlsh, so let's exit:

exit

Installing and Starting Apache Kafka

Now let's focus on extending the Reservation Service so that it publishes events to Apache Kafka.

First, download and unpack Kafka:

wget http://archive.apache.org/dist/kafka/2.5.0/kafka_2.12-2.5.0.tgz

tar xzf kafka_2.12-2.5.0.tgz

You now have a directory named kafka_2.12-2.5.0 on your file system (the name encodes the Scala and Kafka versions, respectively).

To run a single-node Kafka cluster, you need to start two processes: ZooKeeper and a Kafka broker. Kafka uses ZooKeeper for cluster management: it runs alongside the brokers and coordinates them so that each broker can take part in the cluster. ZooKeeper must be running before a broker can receive or deliver messages. Start ZooKeeper with:

kafka_2.12-2.5.0/bin/zookeeper-server-start.sh kafka_2.12-2.5.0/config/zookeeper.properties &> zookeeper_start.log &

Next, start the Kafka broker:

kafka_2.12-2.5.0/bin/kafka-server-start.sh kafka_2.12-2.5.0/config/server.properties &> kafka_start.log &
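
Both processes start in the background, so it can be worth confirming that they are listening before moving on. Below is a minimal sketch in Java, assuming the ports used elsewhere in this walkthrough (2181 for ZooKeeper, 9092 for the broker). PortCheck and isListening are hypothetical helper names, not part of Kafka:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    public static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing is accepting connections there.
            return false;
        }
    }

    public static void main(String[] args) {
        // Ports are assumptions taken from the commands in this walkthrough.
        System.out.println("ZooKeeper (2181): " + isListening("localhost", 2181, 1000));
        System.out.println("Kafka broker (9092): " + isListening("localhost", 9092, 1000));
    }
}
```

If either check reports false, the zookeeper_start.log and kafka_start.log files written by the commands above are the first place to look.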

Creating a Kafka Topic and Publishing Messages

Kafka supports a publish/subscribe messaging style in which messages are published to topics as key-value pairs. Like Cassandra, Kafka partitions data by key and replicates it across multiple nodes, which Kafka calls brokers.
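
To make partitioning by key more concrete, here is a minimal sketch of how a key can be mapped to a partition. It is an illustration only: it uses String.hashCode, whereas Kafka's default partitioner applies a murmur2 hash to the serialized key, and KeyPartitionSketch is a hypothetical class name. What it does show is that equal keys always land in the same partition, so all events for one confirmation number stay in order relative to each other.

```java
public class KeyPartitionSketch {
    /** Maps a key to a partition index in the range [0, numPartitions). */
    public static int partitionFor(String key, int numPartitions) {
        // Math.floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 3; // illustrative value; the topic in this walkthrough has one partition
        for (String key : new String[] {"RS2G0Z", "AB12CD", "RS2G0Z"}) {
            System.out.println(key + " -> partition " + partitionFor(key, partitions));
        }
    }
}
```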

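Now let's create a Kafka topic for reservation data. The topic has one partition and a replication factor of 1, and retention.ms=-1 turns off time-based deletion of messages: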

kafka_2.12-2.5.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic reservation --config retention.ms=-1

Next we will update the Reservation Service so that it publishes to the reservation Kafka topic every time a reservation is changed or deleted. We will introduce the API elements a little at a time, first trying them out in jshell, the Java REPL, before updating the actual code in the Reservation Service:

cd reservation-service
mvn -q compile com.github.johnpoth:jshell-maven-plugin:1.3:run

Let's create a KafkaProducer that we can use to publish messages to a Kafka topic:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "ReservationService");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

Let's create a Reservation object, using the class that the Reservation Service uses to represent the reservation being processed:

import java.time.LocalDate;
import java.util.UUID;
import dev.cassandraguide.model.Reservation;

String confirmationNumber = "RS2G0Z";
Reservation reservation = new Reservation();
reservation.setConfirmationNumber(confirmationNumber);
reservation.setStartDate(LocalDate.now());
reservation.setEndDate(LocalDate.now().plusDays(2));
reservation.setHotelId("NY456");
reservation.setGuestId(UUID.fromString("1b4d86f4-ccff-4256-a63d-45c905df2677"));
reservation.setRoomNumber((short)111);

Now serialize this object to a JSON string using the Jackson ObjectMapper:
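
The snippet for this step is not shown here. The following is a sketch reconstructed from the ObjectMapper setup in the ReservationRepository constructor listed later in this article, so treat the exact lines as an assumption. It relies on the reservation variable created in the previous step and on the Jackson libraries being on the jshell classpath through the project's Maven dependencies:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

ObjectMapper objectMapper = new ObjectMapper();
// Adds support for java.time types such as LocalDate.
objectMapper.registerModule(new JavaTimeModule());
// Write dates as ISO-8601 strings instead of numeric values.
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
// Pretty-print the resulting JSON.
objectMapper.enable(SerializationFeature.INDENT_OUTPUT);

String reservationJson = objectMapper.writeValueAsString(reservation);
```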

The options set on objectMapper simplify date formatting and pretty-print the JSON. To see the resulting string, run:

System.out.println(reservationJson);

Now we can publish a message to the reservation Kafka topic, using confirmationNumber as the key and reservationJson as the payload:

ProducerRecord<String, String> record = new ProducerRecord<>("reservation", confirmationNumber, reservationJson);
producer.send(record);
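
Note that send() is asynchronous: it hands the record to a background sender and returns a Future. An optional addition, not part of the original walkthrough, is to flush the producer before leaving jshell so that the record has actually been sent, and then to close it:

```java
producer.flush();   // blocks until all buffered records have been sent
producer.close();   // releases the resources held by the producer
```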

Let's exit jshell:

/exit

Let's use the simple command-line consumer to read the message we just published. A bare cd first returns to the home directory, which is assumed to be where Kafka was unpacked:

cd
kafka_2.12-2.5.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic reservation --max-messages 10

You can exit the consumer with Ctrl+C.

Updating the Reservation Service to Publish to Kafka

Now that we have some experience publishing to Kafka, let's update the Reservation Service to publish events to Kafka when a reservation is changed or deleted. Take a look at the ReservationRepository class:

ReservationRepository.java
/*
 * Copyright (C) 2017-2020 Jeff Carpenter
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package dev.cassandraguide.repository;

import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.deleteFrom;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom;
import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createTable;
import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createType;
import static com.datastax.oss.driver.api.querybuilder.relation.Relation.column;

import java.time.LocalDate;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;

import javax.annotation.PreDestroy;

import com.fasterxml.jackson.core.JsonProcessingException;
import dev.cassandraguide.model.Reservation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Profile;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Repository;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.DefaultBatchType;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.core.type.UserDefinedType;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;

// TODO: Review imports for publishing to Kafka
import org.apache.kafka.clients.producer.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

/**
 * The goal of this project is to provide a minimally functional implementation of a microservice
 * that uses Apache Cassandra for its data storage. The reservation service is implemented as a
 * RESTful service using Spring Boot.
 *
 * @author Jeff Carpenter, Cedrick Lunven
 */
@Repository
@Profile("!unit-test") // When I do some 'unit-test' no connectivity to DB
public class ReservationRepository {

    /** Logger for the class. */
    private static final Logger logger = LoggerFactory.getLogger(ReservationRepository.class);

    // Reservation Schema Constants
    public static final CqlIdentifier TYPE_ADDRESS               = CqlIdentifier.fromCql("address");
    public static final CqlIdentifier TABLE_RESERVATION_BY_HOTEL_DATE =
            CqlIdentifier.fromCql("reservations_by_hotel_date");
    public static final CqlIdentifier TABLE_RESERVATION_BY_CONFI = CqlIdentifier.fromCql("reservations_by_confirmation");
    public static final CqlIdentifier TABLE_RESERVATION_BY_GUEST = CqlIdentifier.fromCql("reservations_by_guest");
    public static final CqlIdentifier TABLE_GUESTS               = CqlIdentifier.fromCql("guests");
    public static final CqlIdentifier STREET                     = CqlIdentifier.fromCql("street");
    public static final CqlIdentifier CITY                       = CqlIdentifier.fromCql("city");
    public static final CqlIdentifier STATE_PROVINCE             = CqlIdentifier.fromCql("state_or_province");
    public static final CqlIdentifier POSTAL_CODE                = CqlIdentifier.fromCql("postal_code");
    public static final CqlIdentifier COUNTRY                    = CqlIdentifier.fromCql("country");
    public static final CqlIdentifier HOTEL_ID                   = CqlIdentifier.fromCql("hotel_id");
    public static final CqlIdentifier START_DATE                 = CqlIdentifier.fromCql("start_date");
    public static final CqlIdentifier END_DATE                   = CqlIdentifier.fromCql("end_date");
    public static final CqlIdentifier ROOM_NUMBER                = CqlIdentifier.fromCql("room_number");
    public static final CqlIdentifier CONFIRM_NUMBER             = CqlIdentifier.fromCql("confirm_number");
    public static final CqlIdentifier GUEST_ID                   = CqlIdentifier.fromCql("guest_id");
    public static final CqlIdentifier GUEST_LAST_NAME            = CqlIdentifier.fromCql("guest_last_name");
    public static final CqlIdentifier FIRSTNAME                  = CqlIdentifier.fromCql("first_name");
    public static final CqlIdentifier LASTNAME                   = CqlIdentifier.fromCql("last_name");
    public static final CqlIdentifier TITLE                      = CqlIdentifier.fromCql("title");
    public static final CqlIdentifier EMAILS                     = CqlIdentifier.fromCql("emails");
    public static final CqlIdentifier PHONE_NUMBERS              = CqlIdentifier.fromCql("phone_numbers");
    public static final CqlIdentifier ADDRESSES                  = CqlIdentifier.fromCql("addresses");

    private PreparedStatement psExistReservation;
    private PreparedStatement psFindReservation;
    private PreparedStatement psInsertReservationByHotelDate;
    private PreparedStatement psInsertReservationByConfirmation;
    private PreparedStatement psDeleteReservationByHotelDate;
    private PreparedStatement psDeleteReservationByConfirmation;
    private PreparedStatement psSearchReservation;

    /** CqlSession holding metadata to interact with Cassandra. */
    private CqlSession     cqlSession;
    private CqlIdentifier  keyspaceName;

    // TODO: Review variables used for publishing to Kafka
    /** KafkaProducer for publishing messages to Kafka. */
    private KafkaProducer<String, String> kafkaProducer;
    private String kafkaTopicName;
    private ObjectMapper objectMapper;

    /** External Initialization. */
    public ReservationRepository(
            @NonNull CqlSession cqlSession,
            @Qualifier("keyspace") @NonNull CqlIdentifier keyspaceName,
            @NonNull KafkaProducer<String, String> kafkaProducer,
            @NonNull String kafkaTopicName) {
        this.cqlSession   = cqlSession;
        this.keyspaceName = keyspaceName;

        // TODO: Review initialization of objects needed for publishing to Kafka
        this.kafkaProducer = kafkaProducer;
        this.kafkaTopicName = kafkaTopicName;

        objectMapper = new ObjectMapper();
        objectMapper.registerModule(new JavaTimeModule());
        objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        objectMapper.enable(SerializationFeature.INDENT_OUTPUT);

        // Will create tables (if they do not exist)
        createReservationTables();

        // Prepare Statements of reservation
        prepareStatements();
        logger.info("Application initialized.");
    }

    /**
     * CqlSession is a stateful object handling TCP connections to nodes in the cluster.
     * This operation properly closes sockets when the application is stopped
     */
    @PreDestroy
    public void cleanup() {
        if (null != cqlSession) {
            cqlSession.close();
            logger.info("+ CqlSession has been successfully closed");
        }
    }

    /**
     * Testing existence is useful for building correct semantics in the RESTful API. To evaluate existence find the
     * table where confirmation number is partition key which is reservations_by_confirmation
     *
     * @param confirmationNumber
     *      unique identifier for confirmation
     * @return
     *      true if the reservation exists, false if it does not
     */
    public boolean exists(String confirmationNumber) {
        return cqlSession.execute(psExistReservation.bind(confirmationNumber))
                         .getAvailableWithoutFetching() > 0;
    }

    /**
     * Similar to exists() but maps and parses results.
     *
     * @param confirmationNumber
     *      unique identifier for confirmation
     * @return
     *      reservation if present or empty
     */
    @NonNull
    public Optional<Reservation> findByConfirmationNumber(@NonNull String confirmationNumber) {

        ResultSet resultSet = cqlSession.execute(psFindReservation.bind(confirmationNumber));

        // Hint: an empty result might not be an error as this method is sometimes used to check whether a
        // reservation with this confirmation number exists
        Row row = resultSet.one();
        if (row == null) {
            logger.debug("Unable to load reservation with confirmation number: " + confirmationNumber);
            return Optional.empty();
        }

        // Hint: If there is a result, create a new reservation object and set the values
        // Bonus: factor the logic to extract a reservation from a row into a separate method
        // (you will reuse it again later in getAllReservations())
        return Optional.of(mapRowToReservation(row));
    }

    /**
     * Create new entry in multiple tables for this reservation.
     *
     * @param reservation
     *      current reservation object
     * @return
     *      confirmation number for the reservation
     *
     */
    public String upsert(Reservation reservation) {
        Objects.requireNonNull(reservation);
        if (null == reservation.getConfirmationNumber()) {
            // Generating a new reservation number if none has been provided
            reservation.setConfirmationNumber(UUID.randomUUID().toString());
        }
        // Insert into 'reservations_by_hotel_date'
        BoundStatement bsInsertReservationByHotel =
                psInsertReservationByHotelDate.bind(reservation.getHotelId(), reservation.getStartDate(),
                        reservation.getEndDate(), reservation.getRoomNumber(), reservation.getConfirmationNumber(),
                        reservation.getGuestId());
        // Insert into 'reservations_by_confirmation'
        BoundStatement bsInsertReservationByConfirmation =
                psInsertReservationByConfirmation.bind(reservation.getConfirmationNumber(), reservation.getHotelId(),
                        reservation.getStartDate(), reservation.getEndDate(), reservation.getRoomNumber(),
                        reservation.getGuestId());
        BatchStatement batchInsertReservation = BatchStatement
                    .builder(DefaultBatchType.LOGGED)
                    .addStatement(bsInsertReservationByHotel)
                    .addStatement(bsInsertReservationByConfirmation)
                    .build();
        cqlSession.execute(batchInsertReservation);

        // TODO: Publish message to Kafka containing reservation
        try {
            String reservationJson = objectMapper.writeValueAsString(reservation);
            // HINT: use the constructor ProducerRecord(String topic, K key, V value)
            // with the reservation confirmation number as the key, and the JSON string as the value
            ProducerRecord<String, String> record = null;
            kafkaProducer.send(record);
        } catch (Exception e) {
            logger.warn("Error publishing reservation message to Kafka: {}", e);
        }

        return reservation.getConfirmationNumber();
    }

    /**
     * We pick 'reservations_by_confirmation' table to list reservations
     * BUT we could have used 'reservations_by_hotel_date' (as no key provided in request)
     *
     * @return
     *      list containing all reservations
     */
    public List<Reservation> findAll() {
        return cqlSession.execute(selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).all().build())
                  .all()                          // no paging we retrieve all objects
                  .stream()                       // because we are good people
                  .map(this::mapRowToReservation) // Mapping row as Reservation
                  .collect(Collectors.toList());  // Back to list objects
    }

    /**
     * Deleting a reservation.
     *
     * @param confirmationNumber
     *      unique identifier for confirmation.
     */
    public boolean delete(String confirmationNumber) {

        // Retrieving entire reservation in order to obtain the attributes we will need to delete from
        // reservations_by_hotel_date table
        Optional<Reservation> reservationToDelete = this.findByConfirmationNumber(confirmationNumber);

        if (reservationToDelete.isPresent()) {

            // Delete from 'reservations_by_hotel_date'
            Reservation reservation = reservationToDelete.get();
            BoundStatement bsDeleteReservationByHotelDate =
                    psDeleteReservationByHotelDate.bind(reservation.getHotelId(),
                            reservation.getStartDate(), reservation.getRoomNumber());

            // Delete from 'reservations_by_confirmation'
            BoundStatement bsDeleteReservationByConfirmation =
                    psDeleteReservationByConfirmation.bind(confirmationNumber);

            BatchStatement batchDeleteReservation = BatchStatement
                    .builder(DefaultBatchType.LOGGED)
                    .addStatement(bsDeleteReservationByHotelDate)
                    .addStatement(bsDeleteReservationByConfirmation)
                    .build();
            cqlSession.execute(batchDeleteReservation);

            // TODO: Publish message to Kafka with empty payload to indicate deletion
            // HINT: use the constructor ProducerRecord(String topic, K key, V value)
            // with the reservation confirmation number as the key, and an empty string as the value
            ProducerRecord<String, String> record = null;
            kafkaProducer.send(record);

            return true;
        }
        return false;
    }

    /**
     * Search all reservation for an hotel id and LocalDate.
     *
     * @param hotelId
     *      hotel identifier
     * @param date
     *      searched Date
     * @return
     *      list of reservations matching the search criteria
     */
    public List<Reservation> findByHotelAndDate(String hotelId, LocalDate date) {
        Objects.requireNonNull(hotelId);
        Objects.requireNonNull(date);
        return cqlSession.execute(psSearchReservation.bind(hotelId, date))
                         .all()                          // no paging we retrieve all objects
                         .stream()                       // because we are good people
                         .map(this::mapRowToReservation) // Mapping row as Reservation
                         .collect(Collectors.toList());  // Back to list objects
    }

    /**
     * Utility method to marshal a row as expected Reservation Bean.
     *
     * @param row
     *      current row from ResultSet
     * @return
     *      object
     */
    private Reservation mapRowToReservation(Row row) {
        Reservation reservation = new Reservation();
        reservation.setHotelId(row.getString(HOTEL_ID));
        reservation.setConfirmationNumber(row.getString(CONFIRM_NUMBER));
        reservation.setGuestId(row.getUuid(GUEST_ID));
        reservation.setRoomNumber(row.getShort(ROOM_NUMBER));
        reservation.setStartDate(row.getLocalDate(START_DATE));
        reservation.setEndDate(row.getLocalDate(END_DATE));
        return reservation;
    }

    /**
     * Create Keyspace and relevant tables as per defined in 'reservation.cql'
     */
    public void createReservationTables() {

        /**
         * Create TYPE 'Address' if not exists
         *
         * CREATE TYPE reservation.address (
         *   street text,
         *   city text,
         *   state_or_province text,
         *   postal_code text,
         *   country text
         * );
         */
        cqlSession.execute(
                createType(keyspaceName, TYPE_ADDRESS)
                .ifNotExists()
                .withField(STREET, DataTypes.TEXT)
                .withField(CITY, DataTypes.TEXT)
                .withField(STATE_PROVINCE, DataTypes.TEXT)
                .withField(POSTAL_CODE, DataTypes.TEXT)
                .withField(COUNTRY, DataTypes.TEXT)
                .build());
        logger.debug("+ Type '{}' has been created (if needed)", TYPE_ADDRESS.asInternal());

        /**
         * CREATE TABLE reservation.reservations_by_hotel_date (
         *  hotel_id text,
         *  start_date date,
         *  end_date date,
         *  room_number smallint,
         *  confirm_number text,
         *  guest_id uuid,
         *  PRIMARY KEY ((hotel_id, start_date), room_number)
         * );
         */
        cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
                .ifNotExists()
                .withPartitionKey(HOTEL_ID, DataTypes.TEXT)
                .withPartitionKey(START_DATE, DataTypes.DATE)
                .withClusteringColumn(ROOM_NUMBER, DataTypes.SMALLINT)
                .withColumn(END_DATE, DataTypes.DATE)
                .withColumn(CONFIRM_NUMBER, DataTypes.TEXT)
                .withColumn(GUEST_ID, DataTypes.UUID)
                .withClusteringOrder(ROOM_NUMBER, ClusteringOrder.ASC)
                .withComment("Q7. Find reservations by hotel and date")
                .build());
        logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_HOTEL_DATE.asInternal());

        /**
         * CREATE TABLE reservation.reservations_by_confirmation (
         *   confirm_number text PRIMARY KEY,
         *   hotel_id text,
         *   start_date date,
         *   end_date date,
         *   room_number smallint,
         *   guest_id uuid
         * );
         */
        cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_CONFI)
                .ifNotExists()
                .withPartitionKey(CONFIRM_NUMBER, DataTypes.TEXT)
                .withColumn(HOTEL_ID, DataTypes.TEXT)
                .withColumn(START_DATE, DataTypes.DATE)
                .withColumn(END_DATE, DataTypes.DATE)
                .withColumn(ROOM_NUMBER, DataTypes.SMALLINT)
                .withColumn(GUEST_ID, DataTypes.UUID)
                .build());
        logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_CONFI.asInternal());

        /**
         * CREATE TABLE reservation.reservations_by_guest (
         *  guest_last_name text,
         *  hotel_id text,
         *  start_date date,
         *  end_date date,
         *  room_number smallint,
         *  confirm_number text,
         *  guest_id uuid,
         *  PRIMARY KEY ((guest_last_name), hotel_id)
         * );
         */
        cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_GUEST)
                .ifNotExists()
                .withPartitionKey(GUEST_LAST_NAME, DataTypes.TEXT)
                .withClusteringColumn(HOTEL_ID, DataTypes.TEXT)
                .withColumn(START_DATE, DataTypes.DATE)
                .withColumn(END_DATE, DataTypes.DATE)
                .withColumn(ROOM_NUMBER, DataTypes.SMALLINT)
                .withColumn(CONFIRM_NUMBER, DataTypes.TEXT)
                .withColumn(GUEST_ID, DataTypes.UUID)
                .withComment("Q8. Find reservations by guest name")
                .build());
        logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_GUEST.asInternal());

        /**
         * CREATE TABLE reservation.guests (
         *   guest_id uuid PRIMARY KEY,
         *   first_name text,
         *   last_name text,
         *   title text,
         *   emails set<text>,
         *   phone_numbers list<text>,
         *   addresses map<text, frozen<address>>,
         *   confirm_number text
         * );
         */
        UserDefinedType udtAddressType =
                cqlSession.getMetadata().getKeyspace(keyspaceName).get() // Retrieving KeySpaceMetadata
                          .getUserDefinedType(TYPE_ADDRESS).get();       // Looking for UDT (extending DataType)
        cqlSession.execute(createTable(keyspaceName, TABLE_GUESTS)
                .ifNotExists()
                .withPartitionKey(GUEST_ID, DataTypes.UUID)
                .withColumn(FIRSTNAME, DataTypes.TEXT)
                .withColumn(LASTNAME, DataTypes.TEXT)
                .withColumn(TITLE, DataTypes.TEXT)
                .withColumn(EMAILS, DataTypes.setOf(DataTypes.TEXT))
                .withColumn(PHONE_NUMBERS, DataTypes.listOf(DataTypes.TEXT))
                .withColumn(ADDRESSES, DataTypes.mapOf(DataTypes.TEXT, udtAddressType, true))
                .withColumn(CONFIRM_NUMBER, DataTypes.TEXT)
                .withComment("Q9. Find guest by ID")
                .build());
        logger.debug("+ Table '{}' has been created (if needed)", TABLE_GUESTS.asInternal());
        logger.info("Schema has been successfully initialized.");
    }

    private void prepareStatements() {
        if (psExistReservation == null) {
            psExistReservation = cqlSession.prepare(
                    selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).column(CONFIRM_NUMBER)
                    .where(column(CONFIRM_NUMBER).isEqualTo(bindMarker(CONFIRM_NUMBER)))
                    .build());
            psFindReservation = cqlSession.prepare(
                    selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).all()
                    .where(column(CONFIRM_NUMBER).isEqualTo(bindMarker(CONFIRM_NUMBER)))
                    .build());
            psSearchReservation = cqlSession.prepare(
                    selectFrom(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE).all()
                    .where(column(HOTEL_ID).isEqualTo(bindMarker(HOTEL_ID)))
                    .where(column(START_DATE).isEqualTo(bindMarker(START_DATE)))
                    .build());
            psDeleteReservationByConfirmation = cqlSession.prepare(
                    deleteFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI)
                    .where(column(CONFIRM_NUMBER).isEqualTo(bindMarker(CONFIRM_NUMBER)))
                    .build());
            psDeleteReservationByHotelDate = cqlSession.prepare(
                    deleteFrom(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
                    .where(column(HOTEL_ID).isEqualTo(bindMarker(HOTEL_ID)))
                    .where(column(START_DATE).isEqualTo(bindMarker(START_DATE)))
                    .where(column(ROOM_NUMBER).isEqualTo(bindMarker(ROOM_NUMBER)))
                    .build());
            psInsertReservationByHotelDate = cqlSession.prepare(
                    QueryBuilder.insertInto(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
                    .value(HOTEL_ID, bindMarker(HOTEL_ID))
                    .value(START_DATE, bindMarker(START_DATE))
                    .value(END_DATE, bindMarker(END_DATE))
                    .value(ROOM_NUMBER, bindMarker(ROOM_NUMBER))
                    .value(CONFIRM_NUMBER, bindMarker(CONFIRM_NUMBER))
                    .value(GUEST_ID, bindMarker(GUEST_ID))
                    .build());
            psInsertReservationByConfirmation = cqlSession.prepare(
                    QueryBuilder.insertInto(keyspaceName, TABLE_RESERVATION_BY_CONFI)
                    .value(CONFIRM_NUMBER, bindMarker(CONFIRM_NUMBER))
                    .value(HOTEL_ID, bindMarker(HOTEL_ID))
                    .value(START_DATE, bindMarker(START_DATE))
                    .value(END_DATE, bindMarker(END_DATE))
                    .value(ROOM_NUMBER, bindMarker(ROOM_NUMBER))
                    .value(GUEST_ID, bindMarker(GUEST_ID))
                    .build());
            logger.info("Statements have been successfully prepared.");
        }
    }
}

The TODO comments in this class mark several items we should review:

  • TODO: Review the imports for publishing to Kafka. Note that the classes you used earlier are already imported.

  • TODO: Review the variables used for publishing to Kafka. The repository holds an ObjectMapper and a KafkaProducer, along with the topic name.

  • TODO: Review the initialization of the objects needed for publishing to Kafka.
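
The repository receives a ready-made KafkaProducer<String, String>, so both keys and values have to be serialized as strings. As a rough sketch of how such a producer might be configured (the class name ProducerConfigSketch and the broker address localhost:9092 are assumptions for illustration, not taken from the Reservation Service sources), the settings could look like this:

```java
import java.util.Properties;

/** Sketch: settings for a KafkaProducer<String, String> like the one injected into the repository. */
final class ProducerConfigSketch {

    /** Builds producer settings; the broker address is supplied by the caller. */
    static Properties producerProperties(String bootstrapServers) {
        Properties props = new Properties();
        // Standard Kafka producer configuration keys.
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Keys (confirmation numbers) and values (JSON text) are both plain strings.
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is an assumed local broker address.
        Properties props = producerProperties("localhost:9092");
        // With the Kafka client on the classpath, these properties would be
        // passed to: new KafkaProducer<String, String>(props)
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The sketch only builds the Properties object, so it runs without the Kafka client library; creating the actual producer is left as a comment.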

Now for the interesting part, implementing the code that publishes messages:

  • TODO: Publish a message to Kafka containing the reservation. In the upsert() method, publish a message that contains the reservation as a JSON string.

  • TODO: Publish a message to Kafka with an empty payload to indicate deletion. In the delete() method, publish a message signaling that the reservation was deleted.
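
The convention behind these two items can be sketched in isolation: the confirmation number is the message key, the value is the reservation as JSON, and an empty value signals a deletion. The example below is a simplified illustration, not the service's code: Publisher is a hypothetical stand-in for KafkaProducer.send(new ProducerRecord<>(topic, key, value)), and the class name, the sample confirmation number, and the sample JSON are made up.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of the event convention: key = confirmation number, empty value = deletion. */
final class ReservationEventSketch {

    /** Hypothetical stand-in for sending a ProducerRecord through a KafkaProducer. */
    interface Publisher {
        void publish(String topic, String key, String value);
    }

    private final Publisher publisher;
    private final String topic;

    ReservationEventSketch(Publisher publisher, String topic) {
        this.publisher = publisher;
        this.topic = topic;
    }

    /** Called after an upsert: the value carries the reservation as a JSON string. */
    void reservationSaved(String confirmationNumber, String reservationJson) {
        publisher.publish(topic, confirmationNumber, reservationJson);
    }

    /** Called after a delete: the empty value marks the reservation as removed. */
    void reservationDeleted(String confirmationNumber) {
        publisher.publish(topic, confirmationNumber, "");
    }

    /** How a consumer could tell a deletion apart from an upsert. */
    static boolean isDeletion(String value) {
        return value.isEmpty();
    }

    public static void main(String[] args) {
        // Collect messages in memory instead of sending them to a broker.
        List<String> sent = new ArrayList<>();
        ReservationEventSketch events = new ReservationEventSketch(
                (topic, key, value) -> sent.add(topic + "|" + key + "|" + value),
                "reservation");

        events.reservationSaved("RS2G0Z", "{\"hotelId\":\"NY456\"}");
        events.reservationDeleted("RS2G0Z");
        sent.forEach(System.out::println);
    }
}
```

Using the confirmation number as the key has a practical benefit: with Kafka's default partitioner, records that share a key go to the same partition, so consumers see the events for one reservation in the order they were published.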

Let's update our code:

ReservationRepositorySolution.java
/*  * Copyright (C) 2017-2020 Jeff Carpenter  *  * Licensed under the Apache License, Version 2.0 (the "License");  * you may not use this file except in compliance with the License.  * You may obtain a copy of the License at  *  * http://www.apache.org/licenses/LICENSE-2.0  *  * Unless required by applicable law or agreed to in writing, software  * distributed under the License is distributed on an "AS IS" BASIS,  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  * See the License for the specific language governing permissions and  * limitations under the License.  */ package dev.cassandraguide.repository;  import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker; import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.deleteFrom; import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom; import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createTable; import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createType; import static com.datastax.oss.driver.api.querybuilder.relation.Relation.column;  import java.time.LocalDate; import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.UUID; import java.util.stream.Collectors;  import javax.annotation.PreDestroy;  import com.fasterxml.jackson.core.JsonProcessingException; import dev.cassandraguide.model.Reservation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Profile; import org.springframework.lang.NonNull; import org.springframework.stereotype.Repository;  import com.datastax.oss.driver.api.core.CqlIdentifier; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.BatchStatement; import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.DefaultBatchType; import 
com.datastax.oss.driver.api.core.cql.PreparedStatement; import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.cql.Row; import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder; import com.datastax.oss.driver.api.core.type.DataTypes; import com.datastax.oss.driver.api.core.type.UserDefinedType; import com.datastax.oss.driver.api.querybuilder.QueryBuilder;  // TODO: Review imports for publishing to Kafka import org.apache.kafka.clients.producer.*; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;  /**  * The goal of this project is to provide a minimally functional implementation of a microservice   * that uses Apache Cassandra for its data storage. The reservation service is implemented as a   * RESTful service using Spring Boot.  *  * @author Jeff Carpenter, Cedrick Lunven  */ @Repository @Profile("!unit-test") // When I do some 'unit-test' no connectivity to DB public class ReservationRepository {      /** Logger for the class. 
*/     private static final Logger logger = LoggerFactory.getLogger(ReservationRepository.class);          // Reservation Schema Constants     public static final CqlIdentifier TYPE_ADDRESS               = CqlIdentifier.fromCql("address");     public static final CqlIdentifier TABLE_RESERVATION_BY_HOTEL_DATE =             CqlIdentifier.fromCql("reservations_by_hotel_date");     public static final CqlIdentifier TABLE_RESERVATION_BY_CONFI = CqlIdentifier.fromCql("reservations_by_confirmation");     public static final CqlIdentifier TABLE_RESERVATION_BY_GUEST = CqlIdentifier.fromCql("reservations_by_guest");     public static final CqlIdentifier TABLE_GUESTS               = CqlIdentifier.fromCql("guests");     public static final CqlIdentifier STREET                     = CqlIdentifier.fromCql("street");     public static final CqlIdentifier CITY                       = CqlIdentifier.fromCql("city");     public static final CqlIdentifier STATE_PROVINCE             = CqlIdentifier.fromCql("state_or_province");     public static final CqlIdentifier POSTAL_CODE                = CqlIdentifier.fromCql("postal_code");     public static final CqlIdentifier COUNTRY                    = CqlIdentifier.fromCql("country");     public static final CqlIdentifier HOTEL_ID                   = CqlIdentifier.fromCql("hotel_id");     public static final CqlIdentifier START_DATE                 = CqlIdentifier.fromCql("start_date");     public static final CqlIdentifier END_DATE                   = CqlIdentifier.fromCql("end_date");     public static final CqlIdentifier ROOM_NUMBER                = CqlIdentifier.fromCql("room_number");     public static final CqlIdentifier CONFIRMATION_NUMBER        = CqlIdentifier.fromCql("confirmation_number");     public static final CqlIdentifier GUEST_ID                   = CqlIdentifier.fromCql("guest_id");     public static final CqlIdentifier GUEST_LAST_NAME            = CqlIdentifier.fromCql("guest_last_name");     public static final 
CqlIdentifier FIRSTNAME                  = CqlIdentifier.fromCql("first_name");     public static final CqlIdentifier LASTNAME                   = CqlIdentifier.fromCql("last_name");     public static final CqlIdentifier TITLE                      = CqlIdentifier.fromCql("title");     public static final CqlIdentifier EMAILS                     = CqlIdentifier.fromCql("emails");     public static final CqlIdentifier PHONE_NUMBERS              = CqlIdentifier.fromCql("phone_numbers");     public static final CqlIdentifier ADDRESSES                  = CqlIdentifier.fromCql("addresses");          private PreparedStatement psExistReservation;     private PreparedStatement psFindReservation;     private PreparedStatement psInsertReservationByHotelDate;     private PreparedStatement psInsertReservationByConfirmation;     private PreparedStatement psDeleteReservationByHotelDate;     private PreparedStatement psDeleteReservationByConfirmation;     private PreparedStatement psSearchReservation;          /** CqlSession holding metadata to interact with Cassandra. */     private CqlSession     cqlSession;     private CqlIdentifier  keyspaceName;      // TODO: Review variables used for publishing to Kafka     /** KafkaProducer for publishing messages to Kafka. */     private KafkaProducer<String, String> kafkaProducer;     private String kafkaTopicName;     private ObjectMapper objectMapper;          /** External Initialization. 
     */
    public ReservationRepository(
            @NonNull CqlSession cqlSession,
            @Qualifier("keyspace") @NonNull CqlIdentifier keyspaceName,
            @NonNull KafkaProducer<String, String> kafkaProducer,
            @NonNull String kafkaTopicName) {
        this.cqlSession   = cqlSession;
        this.keyspaceName = keyspaceName;

        // TODO: Review initialization of objects needed for publishing to Kafka
        this.kafkaProducer = kafkaProducer;
        this.kafkaTopicName = kafkaTopicName;

        // ObjectMapper that writes java.time values (e.g. LocalDate) as ISO-8601 strings
        objectMapper = new ObjectMapper();
        objectMapper.registerModule(new JavaTimeModule());
        objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        objectMapper.enable(SerializationFeature.INDENT_OUTPUT);

        // Will create tables (if they do not exist)
        createReservationTables();

        // Prepare Statements of reservation
        prepareStatements();
        logger.info("Application initialized.");
    }

    /**
     * CqlSession is a stateful object handling TCP connections to nodes in the cluster.
     * This operation properly closes sockets when the application is stopped
     */
    @PreDestroy
    public void cleanup() {
        if (null != cqlSession) {
            cqlSession.close();
            logger.info("+ CqlSession has been successfully closed");
        }
    }

    /**
     * Testing existence is useful for building correct semantics in the RESTful API.
To evaluate existence find the      * table where confirmation number is partition key which is reservations_by_confirmation      *       * @param confirmationNumber      *      unique identifier for confirmation      * @return      *      true if the reservation exists, false if it does not      */     public boolean exists(String confirmationNumber) {         return cqlSession.execute(psExistReservation.bind(confirmationNumber))                          .getAvailableWithoutFetching() > 0;     }          /**      * Similar to exists() but maps and parses results.      *       * @param confirmationNumber      *      unique identifier for confirmation      * @return      *      reservation if present or empty      */     @NonNull     public Optional<Reservation> findByConfirmationNumber(@NonNull String confirmationNumber) {                  ResultSet resultSet = cqlSession.execute(psFindReservation.bind(confirmationNumber));                  // Hint: an empty result might not be an error as this method is sometimes used to check whether a         // reservation with this confirmation number exists         Row row = resultSet.one();         if (row == null) {             logger.debug("Unable to load reservation with confirmation number: " + confirmationNumber);             return Optional.empty();         }                  // Hint: If there is a result, create a new reservation object and set the values         // Bonus: factor the logic to extract a reservation from a row into a separate method         // (you will reuse it again later in getAllReservations())         return Optional.of(mapRowToReservation(row));     }          /**      * Create new entry in multiple tables for this reservation.      
*      * @param reservation      *      current reservation object      * @return      *      confirmation number for the reservation      *            */      public String upsert(Reservation reservation) {         Objects.requireNonNull(reservation);         if (null == reservation.getConfirmationNumber()) {             // Generating a new reservation number if none has been provided             reservation.setConfirmationNumber(UUID.randomUUID().toString());         }         // Insert into 'reservations_by_hotel_date'         BoundStatement bsInsertReservationByHotel =                  psInsertReservationByHotelDate.bind(reservation.getHotelId(), reservation.getStartDate(),                         reservation.getEndDate(), reservation.getRoomNumber(), reservation.getConfirmationNumber(),                         reservation.getGuestId());         // Insert into 'reservations_by_confirmation'         BoundStatement bsInsertReservationByConfirmation =                  psInsertReservationByConfirmation.bind(reservation.getConfirmationNumber(), reservation.getHotelId(),                         reservation.getStartDate(), reservation.getEndDate(), reservation.getRoomNumber(),                         reservation.getGuestId());         BatchStatement batchInsertReservation = BatchStatement                     .builder(DefaultBatchType.LOGGED)                     .addStatement(bsInsertReservationByHotel)                     .addStatement(bsInsertReservationByConfirmation)                     .build();         cqlSession.execute(batchInsertReservation);          // TODO: Publish message to Kafka containing reservation          try {              String reservationJson = objectMapper.writeValueAsString(reservation);              ProducerRecord<String, String> record = new ProducerRecord<>(kafkaTopicName, reservation.getConfirmationNumber(), reservationJson);              kafkaProducer.send(record);          } catch (Exception e) {              logger.warn("Error 
publishing reservation message to Kafka: {}", e);          }          return reservation.getConfirmationNumber();     }      /**      * We pick 'reservations_by_confirmation' table to list reservations      * BUT we could have used 'reservations_by_hotel_date' (as no key provided in request)      *        * @return      *      list containing all reservations      */     public List<Reservation> findAll() {         return cqlSession.execute(selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).all().build())                   .all()                          // no paging we retrieve all objects                   .stream()                       // because we are good people                   .map(this::mapRowToReservation) // Mapping row as Reservation                   .collect(Collectors.toList());  // Back to list objects     }            /**      * Deleting a reservation.      *      * @param confirmationNumber      *      unique identifier for confirmation.      */     public boolean delete(String confirmationNumber) {          // Retrieving entire reservation in order to obtain the attributes we will need to delete from         // reservations_by_hotel_date table         Optional<Reservation> reservationToDelete = this.findByConfirmationNumber(confirmationNumber);          if (reservationToDelete.isPresent()) {              // Delete from 'reservations_by_hotel_date'             Reservation reservation = reservationToDelete.get();             BoundStatement bsDeleteReservationByHotelDate =                     psDeleteReservationByHotelDate.bind(reservation.getHotelId(),                             reservation.getStartDate(), reservation.getRoomNumber());              // Delete from 'reservations_by_confirmation'             BoundStatement bsDeleteReservationByConfirmation =                     psDeleteReservationByConfirmation.bind(confirmationNumber);              BatchStatement batchDeleteReservation = BatchStatement                     
.builder(DefaultBatchType.LOGGED)                     .addStatement(bsDeleteReservationByHotelDate)                     .addStatement(bsDeleteReservationByConfirmation)                     .build();             cqlSession.execute(batchDeleteReservation);              // TODO: Publish message to Kafka with empty payload to indicate deletion             ProducerRecord<String, String> record = new ProducerRecord<>(kafkaTopicName, reservation.getConfirmationNumber(), "");             kafkaProducer.send(record);              return true;         }         return false;     }          /**      * Search all reservation for an hotel id and LocalDate.      *      * @param hotelId      *      hotel identifier      * @param date      *      searched Date      * @return      *      list of reservations matching the search criteria      */     public List<Reservation> findByHotelAndDate(String hotelId, LocalDate date) {         Objects.requireNonNull(hotelId);         Objects.requireNonNull(date);         return cqlSession.execute(psSearchReservation.bind(hotelId, date))                          .all()                          // no paging we retrieve all objects                          .stream()                       // because we are good people                          .map(this::mapRowToReservation) // Mapping row as Reservation                          .collect(Collectors.toList());  // Back to list objects     }      /**      * Utility method to marshal a row as expected Reservation Bean.      
*      * @param row      *      current row from ResultSet      * @return      *      object      */     private Reservation mapRowToReservation(Row row) {         Reservation reservation = new Reservation();         reservation.setHotelId(row.getString(HOTEL_ID));         reservation.setConfirmationNumber(row.getString(CONFIRMATION_NUMBER));         reservation.setGuestId(row.getUuid(GUEST_ID));         reservation.setRoomNumber(row.getShort(ROOM_NUMBER));         reservation.setStartDate(row.getLocalDate(START_DATE));         reservation.setEndDate(row.getLocalDate(END_DATE));         return reservation;     }          /**      * Create Keyspace and relevant tables as per defined in 'reservation.cql'      */     public void createReservationTables() {                  /**          * Create TYPE 'Address' if not exists          *           * CREATE TYPE reservation.address (          *   street text,          *   city text,          *   state_or_province text,          *   postal_code text,          *   country text          * );          */         cqlSession.execute(                 createType(keyspaceName, TYPE_ADDRESS)                 .ifNotExists()                 .withField(STREET, DataTypes.TEXT)                 .withField(CITY, DataTypes.TEXT)                 .withField(STATE_PROVINCE, DataTypes.TEXT)                 .withField(POSTAL_CODE, DataTypes.TEXT)                 .withField(COUNTRY, DataTypes.TEXT)                 .build());         logger.debug("+ Type '{}' has been created (if needed)", TYPE_ADDRESS.asInternal());                  /**           * CREATE TABLE reservation.reservations_by_hotel_date (          *  hotel_id text,          *  start_date date,          *  end_date date,          *  room_number smallint,          *  confirmation_number text,          *  guest_id uuid,          *  PRIMARY KEY ((hotel_id, start_date), room_number)          * ) WITH comment = 'Q7. 
Find reservations by hotel and date';          */         cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)                         .ifNotExists()                         .withPartitionKey(HOTEL_ID, DataTypes.TEXT)                         .withPartitionKey(START_DATE, DataTypes.DATE)                         .withClusteringColumn(ROOM_NUMBER, DataTypes.SMALLINT)                         .withColumn(END_DATE, DataTypes.DATE)                         .withColumn(CONFIRMATION_NUMBER, DataTypes.TEXT)                         .withColumn(GUEST_ID, DataTypes.UUID)                         .withClusteringOrder(ROOM_NUMBER, ClusteringOrder.ASC)                         .withComment("Q7. Find reservations by hotel and date")                         .build());         logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_HOTEL_DATE.asInternal());                  /**          * CREATE TABLE reservation.reservations_by_confirmation (          *   confirmation_number text PRIMARY KEY,          *   hotel_id text,          *   start_date date,          *   end_date date,          *   room_number smallint,          *   guest_id uuid          * );          */         cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_CONFI)                 .ifNotExists()                 .withPartitionKey(CONFIRMATION_NUMBER, DataTypes.TEXT)                 .withColumn(HOTEL_ID, DataTypes.TEXT)                 .withColumn(START_DATE, DataTypes.DATE)                 .withColumn(END_DATE, DataTypes.DATE)                 .withColumn(ROOM_NUMBER, DataTypes.SMALLINT)                 .withColumn(GUEST_ID, DataTypes.UUID)                 .build());          logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_CONFI.asInternal());                    /**           * CREATE TABLE reservation.reservations_by_guest (           *  guest_last_name text,           *  hotel_id text,           *  start_date date,           *  
end_date date,           *  room_number smallint,           *  confirmation_number text,           *  guest_id uuid,           *  PRIMARY KEY ((guest_last_name), hotel_id)           * ) WITH comment = 'Q8. Find reservations by guest name';           */          cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_GUEST)                  .ifNotExists()                  .withPartitionKey(GUEST_LAST_NAME, DataTypes.TEXT)                  .withClusteringColumn(HOTEL_ID, DataTypes.TEXT)                  .withColumn(START_DATE, DataTypes.DATE)                  .withColumn(END_DATE, DataTypes.DATE)                  .withColumn(ROOM_NUMBER, DataTypes.SMALLINT)                  .withColumn(CONFIRMATION_NUMBER, DataTypes.TEXT)                  .withColumn(GUEST_ID, DataTypes.UUID)                  .withComment("Q8. Find reservations by guest name")                  .build());           logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_GUEST.asInternal());                      /**            * CREATE TABLE reservation.guests (            *   guest_id uuid PRIMARY KEY,            *   first_name text,            *   last_name text,            *   title text,            *   emails set<text>,            *   phone_numbers list<text>,            *   addresses map<text, frozen<address>>,            *   confirmation_number text            * ) WITH comment = 'Q9. 
Find guest by ID';            */           UserDefinedType  udtAddressType =                    cqlSession.getMetadata().getKeyspace(keyspaceName).get() // Retrieving KeySpaceMetadata                             .getUserDefinedType(TYPE_ADDRESS).get();        // Looking for UDT (extending DataType)           cqlSession.execute(createTable(keyspaceName, TABLE_GUESTS)                   .ifNotExists()                   .withPartitionKey(GUEST_ID, DataTypes.UUID)                   .withColumn(FIRSTNAME, DataTypes.TEXT)                   .withColumn(LASTNAME, DataTypes.TEXT)                   .withColumn(TITLE, DataTypes.TEXT)                   .withColumn(EMAILS, DataTypes.setOf(DataTypes.TEXT))                   .withColumn(PHONE_NUMBERS, DataTypes.listOf(DataTypes.TEXT))                   .withColumn(ADDRESSES, DataTypes.mapOf(DataTypes.TEXT, udtAddressType, true))                   .withColumn(CONFIRMATION_NUMBER, DataTypes.TEXT)                   .withComment("Q9. Find guest by ID")                   .build());            logger.debug("+ Table '{}' has been created (if needed)", TABLE_GUESTS.asInternal());            logger.info("Schema has been successfully initialized.");     }      private void prepareStatements() {         if (psExistReservation == null) {             psExistReservation = cqlSession.prepare(selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).column(CONFIRMATION_NUMBER)                                 .where(column(CONFIRMATION_NUMBER).isEqualTo(bindMarker(CONFIRMATION_NUMBER)))                                 .build());             psFindReservation = cqlSession.prepare(                                 selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).all()                                 .where(column(CONFIRMATION_NUMBER).isEqualTo(bindMarker(CONFIRMATION_NUMBER)))                                 .build());             psSearchReservation = cqlSession.prepare(                                 selectFrom(keyspaceName, 
TABLE_RESERVATION_BY_HOTEL_DATE).all()                                 .where(column(HOTEL_ID).isEqualTo(bindMarker(HOTEL_ID)))                                 .where(column(START_DATE).isEqualTo(bindMarker(START_DATE)))                                 .build());             psDeleteReservationByConfirmation = cqlSession.prepare(                                 deleteFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI)                                 .where(column(CONFIRMATION_NUMBER).isEqualTo(bindMarker(CONFIRMATION_NUMBER)))                                 .build());             psDeleteReservationByHotelDate = cqlSession.prepare(                     deleteFrom(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)                     .where(column(HOTEL_ID).isEqualTo(bindMarker(HOTEL_ID)))                     .where(column(START_DATE).isEqualTo(bindMarker(START_DATE)))                     .where(column(ROOM_NUMBER).isEqualTo(bindMarker(ROOM_NUMBER)))                     .build());             psInsertReservationByHotelDate = cqlSession.prepare(QueryBuilder.insertInto(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)                     .value(HOTEL_ID, bindMarker(HOTEL_ID))                     .value(START_DATE, bindMarker(START_DATE))                     .value(END_DATE, bindMarker(END_DATE))                     .value(ROOM_NUMBER, bindMarker(ROOM_NUMBER))                     .value(CONFIRMATION_NUMBER, bindMarker(CONFIRMATION_NUMBER))                     .value(GUEST_ID, bindMarker(GUEST_ID))                     .build());             psInsertReservationByConfirmation = cqlSession.prepare(QueryBuilder.insertInto(keyspaceName, TABLE_RESERVATION_BY_CONFI)                     .value(CONFIRMATION_NUMBER, bindMarker(CONFIRMATION_NUMBER))                     .value(HOTEL_ID, bindMarker(HOTEL_ID))                     .value(START_DATE, bindMarker(START_DATE))                     .value(END_DATE, bindMarker(END_DATE))                     .value(ROOM_NUMBER, bindMarker(ROOM_NUMBER))   
                  .value(GUEST_ID, bindMarker(GUEST_ID))                     .build());             logger.info("Statements have been successfully prepared.");         }     } }

Now let's move on to the tests.

The Reservation Service also includes tests that run against a Cassandra container in Docker, which is started for the test run and destroyed afterwards.

Before running these tests, we want to shut down the running Cassandra node:

kill `cat /tmp/cassandra-pid`

Now we can run the tests with Maven:

mvn test

We have learned how to write a microservice that stores data in Apache Cassandra using the DataStax Java driver and generates events for data changes in Apache Kafka.

This article was prepared ahead of the start of the Software Architect course. I would also like to share a recording of a free lesson on the topic "The Maintainability Architectural Property Illustrated with k8s Services".


Link to the original article: https://habr.com/ru/articles/722278/

