Author: Rustem Galiev
IBM Senior DevOps Engineer & Integration Architect
Hello, Habr!
Today we will learn how to write a microservice that stores data in Apache Cassandra using the DataStax Java driver and publishes data-change events to Apache Kafka.
This post is based on the design of a simple microservice for managing hotel reservation data, called the Reservation Service. You can work through a series of exercises on writing and reading data in Cassandra with the Reservation Service in the set "Cassandra: Application Development with the DataStax Java Driver". The Reservation Service source code is available on GitHub.
The Reservation Service uses a hotel reservation data model that includes the tables below.
Cassandra and Kafka are often used together in microservice architectures, as shown below. In this design, the Reservation Service receives an API request to perform an action such as creating, updating, or deleting a reservation. After persisting the change to Cassandra, the Reservation Service sends a message to the reservation topic in the Kafka cluster.
Other services consume the reservation topic and take various actions in response to each message: for example, an inventory service updates its inventory tables to mark the dates as reserved, or an email service sends the guest a thank-you email for the booking. In this style of interaction, Cassandra and Kafka are not connected directly; they are used in a complementary way.
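The interaction described above can be sketched with plain Java collections. This is only an in-memory illustration, not the Reservation Service's code: the map stands in for the Cassandra tables, the list of subscribers stands in for the Kafka topic, and every name here (`EventFlowSketch`, `createReservation`, `demo`, the sample confirmation number) is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical in-memory sketch: a Map stands in for Cassandra and a list of
// subscribers stands in for the Kafka "reservation" topic.
class EventFlowSketch {
    // Stand-in for the Cassandra tables: confirmation number -> reservation payload.
    final Map<String, String> store = new HashMap<>();
    // Stand-in for the topic: each subscriber receives (key, value) for every message.
    final List<BiConsumer<String, String>> subscribers = new ArrayList<>();

    void subscribe(BiConsumer<String, String> subscriber) {
        subscribers.add(subscriber);
    }

    // Persist first, then publish, mirroring the order described in the text.
    void createReservation(String confirmationNumber, String payload) {
        store.put(confirmationNumber, payload);
        for (BiConsumer<String, String> subscriber : subscribers) {
            subscriber.accept(confirmationNumber, payload);
        }
    }

    // Wires up two independent consumers and returns a log of what they did.
    static List<String> demo() {
        EventFlowSketch service = new EventFlowSketch();
        List<String> log = new ArrayList<>();
        service.subscribe((key, value) -> log.add("inventory: mark dates reserved for " + key));
        service.subscribe((key, value) -> log.add("email: send thank-you for " + key));
        service.createReservation("CONF-1", "{\"hotelId\":\"NY456\"}");
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Neither consumer knows about the other or about the store; each one only sees messages, which is the decoupling this design relies on.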
Setting up the Reservation Service
Let's start by cloning the Reservation Service source code from GitHub and checking out the branch that will serve as our starting point:
git clone -b kafka https://github.com/jeffreyscarpenter/reservation-service
Once this completes, we should see the reservation-service directory.
Let's look at some of the code.
First, we'll look at the schema we will use for the reservation tables, located at reservation-service/src/main/resources/reservation.cql:
/*
 * Copyright (C) 2016-2020 Jeff Carpenter
 */

/* This file contains a slightly modified version of the "reservation" keyspace and table definitions
 * for the example defined in Chapter 5 of Cassandra: The Definitive Guide, 2nd and 3rd Editions.
 * The changes are to facilitate development exercises.
 */

CREATE KEYSPACE reservation
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};

CREATE TABLE reservation.reservations_by_hotel_date (
    hotel_id text,
    start_date date,
    end_date date,
    room_number smallint,
    confirm_number text,
    guest_id uuid,
    PRIMARY KEY ((hotel_id, start_date), room_number)
);

CREATE TABLE reservation.reservations_by_confirmation (
    confirm_number text PRIMARY KEY,
    hotel_id text,
    start_date date,
    end_date date,
    room_number smallint,
    guest_id uuid
);

/* The following tables are provided for completeness with the book text, but they are not used
   in the current implementation of the Reservation Service */

CREATE TABLE reservation.reservations_by_guest (
    guest_last_name text,
    hotel_id text,
    start_date date,
    end_date date,
    room_number smallint,
    confirm_number text,
    guest_id uuid,
    PRIMARY KEY ((guest_last_name), hotel_id)
);

CREATE TYPE reservation.address (
    street text,
    city text,
    state_or_province text,
    postal_code text,
    country text
);

CREATE TABLE reservation.guests (
    guest_id uuid PRIMARY KEY,
    first_name text,
    last_name text,
    title text,
    emails set<text>,
    phone_numbers list<text>,
    addresses map<text, frozen<address>>
);
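The two tables the service actually uses hold the same reservation under different keys, which is why a write or a delete has to touch both. The sketch below mimics that layout with in-memory maps. It is an illustration only: the class `DenormalizedTablesSketch`, its methods, the `|` separator in the partition key, and the string-array row format are all hypothetical and not part of the Reservation Service.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of the two denormalized tables.
class DenormalizedTablesSketch {
    // reservations_by_hotel_date: partition (hotel_id, start_date) -> rows ordered by room_number.
    final Map<String, TreeMap<Short, String>> byHotelDate = new HashMap<>();
    // reservations_by_confirmation: confirm_number -> {hotel_id, start_date, room_number}.
    final Map<String, String[]> byConfirmation = new HashMap<>();

    // Naive composite key; assumes hotel ids never contain '|'.
    static String partitionKey(String hotelId, String startDate) {
        return hotelId + "|" + startDate;
    }

    // Writes the same reservation into both "tables".
    void upsert(String confirm, String hotelId, String startDate, short room) {
        byHotelDate.computeIfAbsent(partitionKey(hotelId, startDate), k -> new TreeMap<>())
                   .put(room, confirm);
        byConfirmation.put(confirm, new String[] {hotelId, startDate, Short.toString(room)});
    }

    // Deleting by confirmation number needs a read first: the other table's
    // key (hotel, date, room) is only known from the stored row.
    boolean delete(String confirm) {
        String[] row = byConfirmation.get(confirm);
        if (row == null) {
            return false;
        }
        String pk = partitionKey(row[0], row[1]);
        TreeMap<Short, String> partition = byHotelDate.get(pk);
        if (partition != null) {
            partition.remove(Short.parseShort(row[2]));
            if (partition.isEmpty()) {
                byHotelDate.remove(pk);
            }
        }
        byConfirmation.remove(confirm);
        return true;
    }

    // Room numbers reserved for a hotel and start date, in ascending order.
    List<Short> roomsReserved(String hotelId, String startDate) {
        List<Short> rooms = new ArrayList<>();
        TreeMap<Short, String> partition = byHotelDate.get(partitionKey(hotelId, startDate));
        if (partition != null) {
            rooms.addAll(partition.keySet());
        }
        return rooms;
    }
}
```

The read-before-delete step in `delete` exists because the confirmation number alone does not identify a row in the hotel-and-date layout.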
We should also note the dependencies for the DataStax Java driver and the Kafka client libraries in reservation-service/pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.cassandraguide</groupId>
  <artifactId>reservation-service</artifactId>
  <version>1.0.0</version>
  <packaging>jar</packaging>
  <name>reservation-service</name>
  <description>Demo service using Cassandra driver and Spring Boot</description>

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.6.RELEASE</version>
    <relativePath />
  </parent>

  <properties>
    <java.version>11</java.version>
    <swagger.version>2.9.2</swagger.version>
    <junit-jupiter.version>5.4.2</junit-jupiter.version>
    <junit-platform.version>1.7.0-M1</junit-platform.version>
    <oss-java-driver.version>4.5.1</oss-java-driver.version>
    <kafka-client.version>2.5.0</kafka-client.version>
    <testcontainers.version>1.14.1</testcontainers.version>
    <version.maven.plugin.compiler>3.8.0</version.maven.plugin.compiler>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <start-class>dev.cassandraguide.ReservationServiceApp</start-class>
    <dockerfile-maven-version>1.4.13</dockerfile-maven-version>
  </properties>

  <dependencies>

    <!-- Create a RESTFul Service -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- Document for REST Service -->
    <dependency>
      <groupId>io.springfox</groupId>
      <artifactId>springfox-swagger2</artifactId>
      <version>${swagger.version}</version>
    </dependency>
    <dependency>
      <groupId>io.springfox</groupId>
      <artifactId>springfox-swagger-ui</artifactId>
      <version>${swagger.version}</version>
    </dependency>

    <!-- Cassandra Driver -->
    <dependency>
      <groupId>com.datastax.oss</groupId>
      <artifactId>java-driver-core</artifactId>
      <version>${oss-java-driver.version}</version>
    </dependency>
    <dependency>
      <groupId>com.datastax.oss</groupId>
      <artifactId>java-driver-query-builder</artifactId>
      <version>${oss-java-driver.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>${kafka-client.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
    </dependency>

    <!-- Provides JSON serialization/deserialization for date/time types -->
    <dependency>
      <groupId>com.fasterxml.jackson.datatype</groupId>
      <artifactId>jackson-datatype-jsr310</artifactId>
    </dependency>

    <!-- Tests -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
      <exclusions>
        <exclusion>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <!-- Junit 5 -->
    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter-api</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter-engine</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter-params</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.junit.platform</groupId>
      <artifactId>junit-platform-engine</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.junit.platform</groupId>
      <artifactId>junit-platform-launcher</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.junit.platform</groupId>
      <artifactId>junit-platform-runner</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.junit.platform</groupId>
      <artifactId>junit-platform-console-standalone</artifactId>
      <version>${junit-platform.version}</version>
      <scope>test</scope>
    </dependency>

    <!-- Test against Docker Containers -->
    <dependency>
      <groupId>org.testcontainers</groupId>
      <artifactId>cassandra</artifactId>
      <scope>test</scope>
      <version>${testcontainers.version}</version>
    </dependency>
    <dependency>
      <groupId>org.testcontainers</groupId>
      <artifactId>testcontainers</artifactId>
      <scope>test</scope>
      <version>${testcontainers.version}</version>
    </dependency>
    <dependency>
      <groupId>org.testcontainers</groupId>
      <artifactId>kafka</artifactId>
      <scope>test</scope>
      <version>${testcontainers.version}</version>
    </dependency>

    <!-- Add driver keys to spring-boot config file -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-configuration-processor</artifactId>
      <optional>true</optional>
    </dependency>

  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
        <configuration>
          <fork>true</fork>
          <mainClass>${start-class}</mainClass>
        </configuration>
        <executions>
          <execution>
            <goals>
              <goal>repackage</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>${java.version}</source>
          <target>${java.version}</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>com.spotify</groupId>
        <artifactId>dockerfile-maven-plugin</artifactId>
        <version>${dockerfile-maven-version}</version>
        <configuration>
          <repository>reservation-service</repository>
          <tag>${project.version}</tag>
          <buildArgs>
            <JAR_FILE>${project.build.finalName}.jar</JAR_FILE>
          </buildArgs>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
The source code contains several key Java classes that are also worth studying:
The entity class used in the HTTP API, which represents the type of data we store in Cassandra and publish to the Kafka topic, reservation-service/src/main/java/dev/cassandraguide/model/Reservation.java:
/*
 * Copyright (C) 2017-2020 Jeff Carpenter
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package dev.cassandraguide.model;

import java.io.Serializable;
import java.time.LocalDate;
import java.util.UUID;

/**
 * Entity working with Reservation on Cassandra.
 *
 * @author Jeff Carpenter
 */
public class Reservation implements Serializable {

    /** Serial. */
    private static final long serialVersionUID = -3392237616280919281L;

    /** Hotel identifier, as Text not UUID (for simplicity). */
    private String hotelId;

    /** Formatted as YYYY-MM-DD in interfaces. */
    private LocalDate startDate;

    /** Formatted as YYYY-MM-DD in interfaces. */
    private LocalDate endDate;

    /** Room number. */
    private short roomNumber;

    /** UUID. */
    private UUID guestId;

    /** Confirmation for this Reservation. */
    private String confirmationNumber;

    /**
     * Default constructor
     */
    public Reservation() {
    }

    /**
     * Constructor copying the fields of a reservation request
     */
    public Reservation(ReservationRequest form) {
        setStartDate(form.getStartDate());
        setEndDate(form.getEndDate());
        setHotelId(form.getHotelId());
        setGuestId(form.getGuestId());
        setRoomNumber(form.getRoomNumber());
    }

    /**
     * Constructor copying the fields of a reservation request and setting the confirmation number
     */
    public Reservation(ReservationRequest form, String confirmationNumber) {
        this(form);
        this.confirmationNumber = confirmationNumber;
    }

    /**
     * Getter accessor for attribute 'hotelId'.
     *
     * @return
     *       current value of 'hotelId'
     */
    public String getHotelId() {
        return hotelId;
    }

    /**
     * Setter accessor for attribute 'hotelId'.
     * @param hotelId
     *      new value for 'hotelId '
     */
    public void setHotelId(String hotelId) {
        this.hotelId = hotelId;
    }

    /**
     * Getter accessor for attribute 'startDate'.
     *
     * @return
     *       current value of 'startDate'
     */
    public LocalDate getStartDate() {
        return startDate;
    }

    /**
     * Setter accessor for attribute 'startDate'.
     * @param startDate
     *      new value for 'startDate '
     */
    public void setStartDate(LocalDate startDate) {
        this.startDate = startDate;
    }

    /**
     * Getter accessor for attribute 'endDate'.
     *
     * @return
     *       current value of 'endDate'
     */
    public LocalDate getEndDate() {
        return endDate;
    }

    /**
     * Setter accessor for attribute 'endDate'.
     * @param endDate
     *      new value for 'endDate '
     */
    public void setEndDate(LocalDate endDate) {
        this.endDate = endDate;
    }

    /**
     * Getter accessor for attribute 'roomNumber'.
     *
     * @return
     *       current value of 'roomNumber'
     */
    public short getRoomNumber() {
        return roomNumber;
    }

    /**
     * Setter accessor for attribute 'roomNumber'.
     * @param roomNumber
     *      new value for 'roomNumber '
     */
    public void setRoomNumber(short roomNumber) {
        this.roomNumber = roomNumber;
    }

    /**
     * Getter accessor for attribute 'guestId'.
     *
     * @return
     *       current value of 'guestId'
     */
    public UUID getGuestId() {
        return guestId;
    }

    /**
     * Setter accessor for attribute 'guestId'.
     * @param guestId
     *      new value for 'guestId '
     */
    public void setGuestId(UUID guestId) {
        this.guestId = guestId;
    }

    /**
     * Getter accessor for attribute 'confirmationNumber'.
     *
     * @return
     *       current value of 'confirmationNumber'
     */
    public String getConfirmationNumber() {
        return confirmationNumber;
    }

    /**
     * Setter accessor for attribute 'confirmationNumber'.
     * @param confirmationNumber
     *      new value for 'confirmationNumber '
     */
    public void setConfirmationNumber(String confirmationNumber) {
        this.confirmationNumber = confirmationNumber;
    }

    /** {@inheritDoc} */
    @Override
    public String toString() {
        return "Confirmation Number = " + confirmationNumber +
                ", Hotel ID: " + getHotelId() +
                ", Start Date = " + getStartDate() +
                ", End Date = " + getEndDate() +
                ", Room Number = " + getRoomNumber() +
                ", Guest ID = " + getGuestId();
    }
}
The service's data storage logic, where we will do most of our work, reservation-service/src/main/java/dev/cassandraguide/repository/ReservationRepository.java:
/*
 * Copyright (C) 2017-2020 Jeff Carpenter
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package dev.cassandraguide.repository;

import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.deleteFrom;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom;
import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createTable;
import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createType;
import static com.datastax.oss.driver.api.querybuilder.relation.Relation.column;

import java.time.LocalDate;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;

import javax.annotation.PreDestroy;

import com.fasterxml.jackson.core.JsonProcessingException;
import dev.cassandraguide.model.Reservation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Profile;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Repository;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.DefaultBatchType;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.core.type.UserDefinedType;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;

// TODO: Review imports for publishing to Kafka
import org.apache.kafka.clients.producer.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

/**
 * The goal of this project is to provide a minimally functional implementation of a microservice
 * that uses Apache Cassandra for its data storage. The reservation service is implemented as a
 * RESTful service using Spring Boot.
 *
 * @author Jeff Carpenter, Cedrick Lunven
 */
@Repository
@Profile("!unit-test") // When I do some 'unit-test' no connectivity to DB
public class ReservationRepository {

    /** Logger for the class. */
    private static final Logger logger = LoggerFactory.getLogger(ReservationRepository.class);

    // Reservation Schema Constants
    public static final CqlIdentifier TYPE_ADDRESS = CqlIdentifier.fromCql("address");
    public static final CqlIdentifier TABLE_RESERVATION_BY_HOTEL_DATE =
            CqlIdentifier.fromCql("reservations_by_hotel_date");
    public static final CqlIdentifier TABLE_RESERVATION_BY_CONFI =
            CqlIdentifier.fromCql("reservations_by_confirmation");
    public static final CqlIdentifier TABLE_RESERVATION_BY_GUEST =
            CqlIdentifier.fromCql("reservations_by_guest");
    public static final CqlIdentifier TABLE_GUESTS = CqlIdentifier.fromCql("guests");
    public static final CqlIdentifier STREET = CqlIdentifier.fromCql("street");
    public static final CqlIdentifier CITY = CqlIdentifier.fromCql("city");
    public static final CqlIdentifier STATE_PROVINCE = CqlIdentifier.fromCql("state_or_province");
    public static final CqlIdentifier POSTAL_CODE = CqlIdentifier.fromCql("postal_code");
    public static final CqlIdentifier COUNTRY = CqlIdentifier.fromCql("country");
    public static final CqlIdentifier HOTEL_ID = CqlIdentifier.fromCql("hotel_id");
    public static final CqlIdentifier START_DATE = CqlIdentifier.fromCql("start_date");
    public static final CqlIdentifier END_DATE = CqlIdentifier.fromCql("end_date");
    public static final CqlIdentifier ROOM_NUMBER = CqlIdentifier.fromCql("room_number");
    public static final CqlIdentifier CONFIRM_NUMBER = CqlIdentifier.fromCql("confirm_number");
    public static final CqlIdentifier GUEST_ID = CqlIdentifier.fromCql("guest_id");
    public static final CqlIdentifier GUEST_LAST_NAME = CqlIdentifier.fromCql("guest_last_name");
    public static final CqlIdentifier FIRSTNAME = CqlIdentifier.fromCql("first_name");
    public static final CqlIdentifier LASTNAME = CqlIdentifier.fromCql("last_name");
    public static final CqlIdentifier TITLE = CqlIdentifier.fromCql("title");
    public static final CqlIdentifier EMAILS = CqlIdentifier.fromCql("emails");
    public static final CqlIdentifier PHONE_NUMBERS = CqlIdentifier.fromCql("phone_numbers");
    public static final CqlIdentifier ADDRESSES = CqlIdentifier.fromCql("addresses");

    private PreparedStatement psExistReservation;
    private PreparedStatement psFindReservation;
    private PreparedStatement psInsertReservationByHotelDate;
    private PreparedStatement psInsertReservationByConfirmation;
    private PreparedStatement psDeleteReservationByHotelDate;
    private PreparedStatement psDeleteReservationByConfirmation;
    private PreparedStatement psSearchReservation;

    /** CqlSession holding metadata to interact with Cassandra. */
    private CqlSession cqlSession;
    private CqlIdentifier keyspaceName;

    // TODO: Review variables used for publishing to Kafka
    /** KafkaProducer for publishing messages to Kafka. */
    private KafkaProducer<String, String> kafkaProducer;
    private String kafkaTopicName;
    private ObjectMapper objectMapper;

    /** External Initialization. */
    public ReservationRepository(
            @NonNull CqlSession cqlSession,
            @Qualifier("keyspace") @NonNull CqlIdentifier keyspaceName,
            @NonNull KafkaProducer<String, String> kafkaProducer,
            @NonNull String kafkaTopicName) {
        this.cqlSession = cqlSession;
        this.keyspaceName = keyspaceName;

        // TODO: Review initialization of objects needed for publishing to Kafka
        this.kafkaProducer = kafkaProducer;
        this.kafkaTopicName = kafkaTopicName;
        objectMapper = new ObjectMapper();
        objectMapper.registerModule(new JavaTimeModule());
        objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        objectMapper.enable(SerializationFeature.INDENT_OUTPUT);

        // Will create tables (if they do not exist)
        createReservationTables();

        // Prepare Statements of reservation
        prepareStatements();
        logger.info("Application initialized.");
    }

    /**
     * CqlSession is a stateful object handling TCP connections to nodes in the cluster.
     * This operation properly closes sockets when the application is stopped
     */
    @PreDestroy
    public void cleanup() {
        if (null != cqlSession) {
            cqlSession.close();
            logger.info("+ CqlSession has been successfully closed");
        }
    }

    /**
     * Testing existence is useful for building correct semantics in the RESTful API. To evaluate existence find the
     * table where confirmation number is partition key which is reservations_by_confirmation
     *
     * @param confirmationNumber
     *      unique identifier for confirmation
     * @return
     *      true if the reservation exists, false if it does not
     */
    public boolean exists(String confirmationNumber) {
        return cqlSession.execute(psExistReservation.bind(confirmationNumber))
                         .getAvailableWithoutFetching() > 0;
    }

    /**
     * Similar to exists() but maps and parses results.
     *
     * @param confirmationNumber
     *      unique identifier for confirmation
     * @return
     *      reservation if present or empty
     */
    @NonNull
    public Optional<Reservation> findByConfirmationNumber(@NonNull String confirmationNumber) {

        ResultSet resultSet = cqlSession.execute(psFindReservation.bind(confirmationNumber));

        // Hint: an empty result might not be an error as this method is sometimes used to check whether a
        // reservation with this confirmation number exists
        Row row = resultSet.one();
        if (row == null) {
            logger.debug("Unable to load reservation with confirmation number: " + confirmationNumber);
            return Optional.empty();
        }

        // Hint: If there is a result, create a new reservation object and set the values
        // Bonus: factor the logic to extract a reservation from a row into a separate method
        // (you will reuse it again later in getAllReservations())
        return Optional.of(mapRowToReservation(row));
    }

    /**
     * Create new entry in multiple tables for this reservation.
     *
     * @param reservation
     *      current reservation object
     * @return
     *      confirmation number for the reservation
     *
     */
    public String upsert(Reservation reservation) {
        Objects.requireNonNull(reservation);
        if (null == reservation.getConfirmationNumber()) {
            // Generating a new reservation number if none has been provided
            reservation.setConfirmationNumber(UUID.randomUUID().toString());
        }

        // Insert into 'reservations_by_hotel_date'
        BoundStatement bsInsertReservationByHotel =
                psInsertReservationByHotelDate.bind(reservation.getHotelId(), reservation.getStartDate(),
                        reservation.getEndDate(), reservation.getRoomNumber(),
                        reservation.getConfirmationNumber(), reservation.getGuestId());

        // Insert into 'reservations_by_confirmation'
        BoundStatement bsInsertReservationByConfirmation =
                psInsertReservationByConfirmation.bind(reservation.getConfirmationNumber(), reservation.getHotelId(),
                        reservation.getStartDate(), reservation.getEndDate(),
                        reservation.getRoomNumber(), reservation.getGuestId());

        BatchStatement batchInsertReservation = BatchStatement
                .builder(DefaultBatchType.LOGGED)
                .addStatement(bsInsertReservationByHotel)
                .addStatement(bsInsertReservationByConfirmation)
                .build();
        cqlSession.execute(batchInsertReservation);

        // TODO: Publish message to Kafka containing reservation
        try {
            String reservationJson = objectMapper.writeValueAsString(reservation);

            // HINT: use the constructor ProducerRecord(String topic, K key, V value)
            // with the reservation confirmation number as the key, and the JSON string as the value
            ProducerRecord<String, String> record = null;
            kafkaProducer.send(record);
        } catch (Exception e) {
            logger.warn("Error publishing reservation message to Kafka: {}", e);
        }

        return reservation.getConfirmationNumber();
    }

    /**
     * We pick 'reservations_by_confirmation' table to list reservations
     * BUT we could have used 'reservations_by_hotel_date' (as no key provided in request)
     *
     * @return
     *      list containing all reservations
     */
    public List<Reservation> findAll() {
        return cqlSession.execute(selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).all().build())
                  .all()                          // no paging we retrieve all objects
                  .stream()                       // because we are good people
                  .map(this::mapRowToReservation) // Mapping row as Reservation
                  .collect(Collectors.toList());  // Back to list objects
    }

    /**
     * Deleting a reservation.
     *
     * @param confirmationNumber
     *      unique identifier for confirmation.
     */
    public boolean delete(String confirmationNumber) {

        // Retrieving entire reservation in order to obtain the attributes we will need to delete from
        // reservations_by_hotel_date table
        Optional<Reservation> reservationToDelete = this.findByConfirmationNumber(confirmationNumber);

        if (reservationToDelete.isPresent()) {

            // Delete from 'reservations_by_hotel_date'
            Reservation reservation = reservationToDelete.get();
            BoundStatement bsDeleteReservationByHotelDate =
                    psDeleteReservationByHotelDate.bind(reservation.getHotelId(),
                            reservation.getStartDate(), reservation.getRoomNumber());

            // Delete from 'reservations_by_confirmation'
            BoundStatement bsDeleteReservationByConfirmation =
                    psDeleteReservationByConfirmation.bind(confirmationNumber);

            BatchStatement batchDeleteReservation = BatchStatement
                    .builder(DefaultBatchType.LOGGED)
                    .addStatement(bsDeleteReservationByHotelDate)
                    .addStatement(bsDeleteReservationByConfirmation)
                    .build();

            cqlSession.execute(batchDeleteReservation);

            // TODO: Publish message to Kafka with empty payload to indicate deletion
            // HINT: use the constructor ProducerRecord(String topic, K key, V value)
            // with the reservation confirmation number as the key, and an empty string as the value
            ProducerRecord<String, String> record = null;
            kafkaProducer.send(record);

            return true;
        }
        return false;
    }

    /**
     * Search all reservation for an hotel id and LocalDate.
     *
     * @param hotelId
     *      hotel identifier
     * @param date
     *      searched Date
     * @return
     *      list of reservations matching the search criteria
     */
    public List<Reservation> findByHotelAndDate(String hotelId, LocalDate date) {
        Objects.requireNonNull(hotelId);
        Objects.requireNonNull(date);

        return cqlSession.execute(psSearchReservation.bind(hotelId, date))
                         .all()                          // no paging we retrieve all objects
                         .stream()                       // because we are good people
                         .map(this::mapRowToReservation) // Mapping row as Reservation
                         .collect(Collectors.toList());  // Back to list objects
    }

    /**
     * Utility method to marshal a row as expected Reservation Bean.
     *
     * @param row
     *      current row from ResultSet
     * @return
     *      object
     */
    private Reservation mapRowToReservation(Row row) {
        Reservation reservation = new Reservation();
        reservation.setHotelId(row.getString(HOTEL_ID));
        reservation.setConfirmationNumber(row.getString(CONFIRM_NUMBER));
        reservation.setGuestId(row.getUuid(GUEST_ID));
        reservation.setRoomNumber(row.getShort(ROOM_NUMBER));
        reservation.setStartDate(row.getLocalDate(START_DATE));
        reservation.setEndDate(row.getLocalDate(END_DATE));
        return reservation;
    }

    /**
     * Create Keyspace and relevant tables as per defined in 'reservation.cql'
     */
    public void createReservationTables() {

        /**
         * Create TYPE 'Address' if not exists
         *
         * CREATE TYPE reservation.address (
         *   street text,
         *   city text,
         *   state_or_province text,
         *   postal_code text,
         *   country text
         * );
         */
        cqlSession.execute(
                createType(keyspaceName, TYPE_ADDRESS)
                .ifNotExists()
                .withField(STREET, DataTypes.TEXT)
                .withField(CITY, DataTypes.TEXT)
                .withField(STATE_PROVINCE, DataTypes.TEXT)
                .withField(POSTAL_CODE, DataTypes.TEXT)
                .withField(COUNTRY, DataTypes.TEXT)
                .build());
        logger.debug("+ Type '{}' has been created (if needed)", TYPE_ADDRESS.asInternal());

        /**
         * CREATE TABLE reservation.reservations_by_hotel_date (
         *  hotel_id text,
         *  start_date date,
         *  end_date date,
         *  room_number smallint,
         *  confirm_number text,
         *  guest_id uuid,
         *  PRIMARY KEY ((hotel_id, start_date), room_number)
         * );
         */
        cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
                .ifNotExists()
                .withPartitionKey(HOTEL_ID, DataTypes.TEXT)
                .withPartitionKey(START_DATE, DataTypes.DATE)
                .withClusteringColumn(ROOM_NUMBER, DataTypes.SMALLINT)
                .withColumn(END_DATE, DataTypes.DATE)
                .withColumn(CONFIRM_NUMBER, DataTypes.TEXT)
                .withColumn(GUEST_ID, DataTypes.UUID)
                .withClusteringOrder(ROOM_NUMBER, ClusteringOrder.ASC)
                .withComment("Q7. Find reservations by hotel and date")
                .build());
        logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_HOTEL_DATE.asInternal());

        /**
         * CREATE TABLE reservation.reservations_by_confirmation (
         *   confirm_number text PRIMARY KEY,
         *   hotel_id text,
         *   start_date date,
         *   end_date date,
         *   room_number smallint,
         *   guest_id uuid
         * );
         */
        cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_CONFI)
                .ifNotExists()
                .withPartitionKey(CONFIRM_NUMBER, DataTypes.TEXT)
                .withColumn(HOTEL_ID, DataTypes.TEXT)
                .withColumn(START_DATE, DataTypes.DATE)
                .withColumn(END_DATE, DataTypes.DATE)
                .withColumn(ROOM_NUMBER, DataTypes.SMALLINT)
                .withColumn(GUEST_ID, DataTypes.UUID)
                .build());
        logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_CONFI.asInternal());

        /**
         * CREATE TABLE reservation.reservations_by_guest (
         *  guest_last_name text,
         *  hotel_id text,
         *  start_date date,
         *  end_date date,
         *  room_number smallint,
         *  confirm_number text,
         *  guest_id uuid,
         *  PRIMARY KEY ((guest_last_name), hotel_id)
         * );
         */
        cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_GUEST)
                .ifNotExists()
                .withPartitionKey(GUEST_LAST_NAME, DataTypes.TEXT)
                .withClusteringColumn(HOTEL_ID, DataTypes.TEXT)
                .withColumn(START_DATE, DataTypes.DATE)
                .withColumn(END_DATE, DataTypes.DATE)
                .withColumn(ROOM_NUMBER, DataTypes.SMALLINT)
                .withColumn(CONFIRM_NUMBER, DataTypes.TEXT)
                .withColumn(GUEST_ID, DataTypes.UUID)
                .withComment("Q8. Find reservations by guest name")
                .build());
        logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_GUEST.asInternal());

        /**
         * CREATE TABLE reservation.guests (
         *   guest_id uuid PRIMARY KEY,
         *   first_name text,
         *   last_name text,
         *   title text,
         *   emails set<text>,
         *   phone_numbers list<text>,
         *   addresses map<text, frozen<address>>,
         *   confirm_number text
         * );
         */
        UserDefinedType udtAddressType =
                cqlSession.getMetadata().getKeyspace(keyspaceName).get() // Retrieving KeySpaceMetadata
                          .getUserDefinedType(TYPE_ADDRESS).get();        // Looking for UDT (extending DataType)
        cqlSession.execute(createTable(keyspaceName, TABLE_GUESTS)
                .ifNotExists()
                .withPartitionKey(GUEST_ID, DataTypes.UUID)
                .withColumn(FIRSTNAME, DataTypes.TEXT)
                .withColumn(LASTNAME, DataTypes.TEXT)
                .withColumn(TITLE, DataTypes.TEXT)
                .withColumn(EMAILS, DataTypes.setOf(DataTypes.TEXT))
                .withColumn(PHONE_NUMBERS, DataTypes.listOf(DataTypes.TEXT))
                .withColumn(ADDRESSES, DataTypes.mapOf(DataTypes.TEXT, udtAddressType, true))
                .withColumn(CONFIRM_NUMBER, DataTypes.TEXT)
                .withComment("Q9. Find guest by ID")
                .build());
        logger.debug("+ Table '{}' has been created (if needed)", TABLE_GUESTS.asInternal());
        logger.info("Schema has been successfully initialized.");
    }

    private void prepareStatements() {
        if (psExistReservation == null) {
            psExistReservation = cqlSession.prepare(selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).column(CONFIRM_NUMBER)
                    .where(column(CONFIRM_NUMBER).isEqualTo(bindMarker(CONFIRM_NUMBER)))
                    .build());
            psFindReservation = cqlSession.prepare(
                    selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).all()
                    .where(column(CONFIRM_NUMBER).isEqualTo(bindMarker(CONFIRM_NUMBER)))
                    .build());
            psSearchReservation = cqlSession.prepare(
                    selectFrom(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE).all()
                    .where(column(HOTEL_ID).isEqualTo(bindMarker(HOTEL_ID)))
                    .where(column(START_DATE).isEqualTo(bindMarker(START_DATE)))
                    .build());
            psDeleteReservationByConfirmation = cqlSession.prepare(
                    deleteFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI)
                    .where(column(CONFIRM_NUMBER).isEqualTo(bindMarker(CONFIRM_NUMBER)))
                    .build());
            psDeleteReservationByHotelDate = cqlSession.prepare(
                    deleteFrom(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
                    .where(column(HOTEL_ID).isEqualTo(bindMarker(HOTEL_ID)))
                    .where(column(START_DATE).isEqualTo(bindMarker(START_DATE)))
                    .where(column(ROOM_NUMBER).isEqualTo(bindMarker(ROOM_NUMBER)))
                    .build());
            psInsertReservationByHotelDate = cqlSession.prepare(QueryBuilder.insertInto(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE)
                    .value(HOTEL_ID, bindMarker(HOTEL_ID))
                    .value(START_DATE, bindMarker(START_DATE))
                    .value(END_DATE, bindMarker(END_DATE))
                    .value(ROOM_NUMBER, bindMarker(ROOM_NUMBER))
                    .value(CONFIRM_NUMBER, bindMarker(CONFIRM_NUMBER))
                    .value(GUEST_ID, bindMarker(GUEST_ID))
                    .build());
            psInsertReservationByConfirmation = cqlSession.prepare(QueryBuilder.insertInto(keyspaceName, TABLE_RESERVATION_BY_CONFI)
                    .value(CONFIRM_NUMBER, bindMarker(CONFIRM_NUMBER))
                    .value(HOTEL_ID, bindMarker(HOTEL_ID))
                    .value(START_DATE, bindMarker(START_DATE))
                    .value(END_DATE, bindMarker(END_DATE))
                    .value(ROOM_NUMBER, bindMarker(ROOM_NUMBER))
                    .value(GUEST_ID, bindMarker(GUEST_ID))
                    .build());
            logger.info("Statements have been successfully prepared.");
        }
    }
}
The code that creates the connection to Kafka is in reservation-service/src/main/java/dev/cassandraguide/conf/KafkaConfiguration.java:
package dev.cassandraguide.conf;

import dev.cassandraguide.repository.ReservationRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

/**
 * Import Configuration from Configuration File
 *
 * @author Jeff Carpenter
 */
@Configuration
public class KafkaConfiguration {

    // Logger
    private static final Logger logger = LoggerFactory.getLogger(ReservationRepository.class);

    // Bootstrap servers
    @Value("${kafka.bootstrap-servers:localhost:9092}")
    protected String bootstrapServers;

    // Kafka Client ID
    @Value("${kafka.client-id:ReservationService}")
    protected String clientId = "ReservationService";

    // Topic Name
    @Value("${kafka.topicName:reservation}")
    public String topicName = "reservation";

    /**
     * Default configuration.
     */
    public KafkaConfiguration() {}

    /**
     * Initialization of Configuration.
     *
     * @param bootstrapServers
     * @param clientId
     * @param topicName
     */
    public KafkaConfiguration(
            String bootstrapServers, String clientId, String topicName) {
        super();
        this.bootstrapServers = bootstrapServers;
        this.clientId = clientId;
        this.topicName = topicName;
    }

    @Bean
    public KafkaProducer<String, String> kafkaProducer() {
        logger.info("Creating Kafka Producer.");
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }

    /**
     * Getter accessor for attribute 'bootstrapServers'.
     *
     * @return
     *       current value of 'bootstrapServers'
     */
    public String getBootstrapServers() {
        return bootstrapServers;
    }

    /**
     * Setter accessor for attribute 'bootstrapServers'.
     * @param bootstrapServers
     *       new value for 'bootstrapServers'
     */
    public void setBootstrapServers(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
    }

    /**
     * Getter accessor for attribute 'clientId'.
     *
     * @return
     *       current value of 'clientId'
     */
    public String getClientId() {
        return clientId;
    }

    /**
     * Setter accessor for attribute 'clientId'.
     * @param clientId
     *       new value for 'clientId '
     */
    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    /**
     * Getter accessor for attribute 'topicName'.
     *
     * @return
     *       current value of 'topicName'
     */
    public String getTopicName() {
        return topicName;
    }

    /**
     * Setter accessor for attribute 'topicName'.
     * @param topicName
     *       new value for 'topicName '
     */
    public void setTopicName(String topicName) {
        this.topicName = topicName;
    }
}
Note that the Reservation Service keeps its Cassandra and Kafka settings in a configuration file, reservation-service/src/main/resources/application.yml:
# ----------------------------------------------------------
# Spring Boot Config
# ----------------------------------------------------------
spring:
  application:
    name: Reservation Reservicess
  jackson:
    serialization:
      WRITE_DATES_AS_TIMESTAMPS: false
server:
  port: 8080

# ----------------------------------------------------------
# DataStax Enterprise Java Driver Config
# ----------------------------------------------------------
cassandra:
  contactPoint: 127.0.0.1
  port: 9042
  keyspaceName: reservation
  localDataCenterName: datacenter1
  dropSchema: false

# ----------------------------------------------------------
# Kafka Producer Config
# ----------------------------------------------------------
kafka:
  bootstrap-servers: localhost:9092
  client-id: ReservationService
  topicName: reservation
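The @Value annotations in KafkaConfiguration, such as ${kafka.bootstrap-servers:localhost:9092}, fall back to the text after the first colon when application.yml does not define the key. The lookup rule can be sketched roughly as follows. This is a simplified, standard-library illustration and not Spring's actual resolver; the class PlaceholderSketch and the method resolve are hypothetical names.

```java
import java.util.Map;

public class PlaceholderSketch {

    // Resolves a single "${key:default}" placeholder against the given settings.
    // Simplified illustration only; Spring's real resolver handles nesting and more.
    static String resolve(String placeholder, Map<String, String> settings) {
        if (!placeholder.startsWith("${") || !placeholder.endsWith("}")) {
            return placeholder; // not a placeholder, return it unchanged
        }
        String body = placeholder.substring(2, placeholder.length() - 1);
        int colon = body.indexOf(':'); // the first colon separates key from default
        String key = colon < 0 ? body : body.substring(0, colon);
        String fallback = colon < 0 ? null : body.substring(colon + 1);
        return settings.getOrDefault(key, fallback);
    }

    public static void main(String[] args) {
        // Hypothetical settings, standing in for values loaded from application.yml.
        Map<String, String> fromYaml = Map.of("kafka.topicName", "reservation");
        System.out.println(resolve("${kafka.topicName:other}", fromYaml));
        System.out.println(resolve("${kafka.bootstrap-servers:localhost:9092}", fromYaml));
    }
}
```

Because the split happens at the first colon, a default that itself contains a colon (localhost:9092) is preserved whole.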
Cassandra tables for storing reservations
The Cassandra distribution includes cqlsh, a shell for issuing CQL commands. Let's start it:
apache-cassandra-4.0-alpha4/bin/cqlsh
We should see the cqlsh> prompt in the terminal.
Once the prompt is available, we can load the schema file we looked at earlier to create the keyspace that contains the reservation tables:
SOURCE 'reservation-service/src/main/resources/reservation.cql';
Now we can view the schema we just created with the DESCRIBE command:
DESCRIBE KEYSPACE reservation;
Let's make it the default keyspace for the commands that follow:
USE reservation;
Note that there are three different tables for storing reservation data, with similar columns but different primary keys.
As data modelers, our natural tendency would be to design the tables for reservations and other records such as guest profiles first, and only then think about the queries that will access them. In Cassandra data modeling we start with the queries instead. These tables are designed to support the queries through which users will access reservations, which leads to a denormalized design.
First, the reservations_by_confirmation table supports looking up a reservation by the unique confirmation number given to the customer at booking time:
CREATE TABLE reservations_by_confirmation ( confirm_number text, hotel_id text, start_date date, end_date date, room_number smallint, guest_id uuid, PRIMARY KEY (confirm_number) );
Here is an example of loading a row into this table:
INSERT INTO reservations_by_confirmation (confirm_number, hotel_id, start_date, end_date, room_number, guest_id) VALUES ('RS2G0Z', 'NY456', '2020-06-08', '2020-06-10', 111, 1b4d86f4-ccff-4256-a63d-45c905df2677);
To retrieve that row later, we would use a query like this:
SELECT * FROM reservations_by_confirmation WHERE confirm_number = 'RS2G0Z';
Second, the reservations_by_hotel_date table lets hotel staff review upcoming reservations by date to get a picture of how the hotel is doing, for example when it is or is not fully booked:
CREATE TABLE reservations_by_hotel_date ( hotel_id text, start_date date, end_date date, room_number smallint, confirm_number text, guest_id uuid, PRIMARY KEY ((hotel_id, start_date), room_number) );
Here is an example of loading the same reservation we just inserted into reservations_by_confirmation into this second table:
INSERT INTO reservations_by_hotel_date (confirm_number, hotel_id, start_date, end_date, room_number, guest_id) VALUES ('RS2G0Z', 'NY456', '2020-06-08', '2020-06-10', 111, 1b4d86f4-ccff-4256-a63d-45c905df2677);
This table supports two queries. First, because the partition key consists of hotel_id and start_date, we can retrieve all reservations for a given hotel and date:
SELECT * FROM reservations_by_hotel_date WHERE hotel_id = 'NY456' AND start_date = '2020-06-08';
We can also find the reservation for a specific room on that date:
SELECT * FROM reservations_by_hotel_date WHERE hotel_id = 'NY456' AND start_date = '2020-06-08' AND room_number = 111;
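Both queries are cheap because of the primary key: (hotel_id, start_date) selects a single partition, and room_number orders the rows inside it. The following toy in-memory model in Java illustrates that access pattern. It is only a sketch of the idea, not how Cassandra stores data, and the class PartitionSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PartitionSketch {

    // Partition key (hotel_id, start_date) -> rows sorted by clustering column room_number.
    // The stored value is the confirmation number, standing in for the rest of the row.
    private final Map<String, TreeMap<Short, String>> partitions = new HashMap<>();

    private static String partitionKey(String hotelId, String startDate) {
        return hotelId + "|" + startDate;
    }

    // Writing the same primary key again overwrites the row (upsert semantics).
    void insert(String hotelId, String startDate, short roomNumber, String confirmNumber) {
        partitions.computeIfAbsent(partitionKey(hotelId, startDate), k -> new TreeMap<>())
                  .put(roomNumber, confirmNumber);
    }

    // Query 1: all reservations for a hotel and date (one whole partition).
    List<String> findByHotelAndDate(String hotelId, String startDate) {
        TreeMap<Short, String> rows = partitions.get(partitionKey(hotelId, startDate));
        return rows == null ? new ArrayList<>() : new ArrayList<>(rows.values());
    }

    // Query 2: one room within that partition (partition key plus clustering column).
    String findByRoom(String hotelId, String startDate, short roomNumber) {
        TreeMap<Short, String> rows = partitions.get(partitionKey(hotelId, startDate));
        return rows == null ? null : rows.get(roomNumber);
    }

    public static void main(String[] args) {
        PartitionSketch table = new PartitionSketch();
        table.insert("NY456", "2020-06-08", (short) 111, "RS2G0Z");
        System.out.println(table.findByHotelAndDate("NY456", "2020-06-08"));
        System.out.println(table.findByRoom("NY456", "2020-06-08", (short) 111));
    }
}
```

A query that omits hotel_id or start_date has no partition to go to in this model, which mirrors why such a query needs a different table in Cassandra.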
Similarly, the reservations_by_guest table can be used to look up a reservation by guest name for a guest who has forgotten their confirmation number. For the purposes of this post, we will focus on the reservations_by_confirmation and reservations_by_hotel_date tables.
Note that the ReservationRepository class uses Cassandra BATCH statements (logged batches, DefaultBatchType.LOGGED in the code) to group the changes to the reservations_by_confirmation and reservations_by_hotel_date tables so that they are executed as a single CQL operation. The class is in reservation-service/src/main/java/dev/cassandraguide/repository/ReservationRepository.java:
ReservationRepository.java
/* * Copyright (C) 2017-2020 Jeff Carpenter * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package dev.cassandraguide.repository; import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker; import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.deleteFrom; import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom; import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createTable; import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createType; import static com.datastax.oss.driver.api.querybuilder.relation.Relation.column; import java.time.LocalDate; import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.UUID; import java.util.stream.Collectors; import javax.annotation.PreDestroy; import com.fasterxml.jackson.core.JsonProcessingException; import dev.cassandraguide.model.Reservation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Profile; import org.springframework.lang.NonNull; import org.springframework.stereotype.Repository; import com.datastax.oss.driver.api.core.CqlIdentifier; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.BatchStatement; import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.DefaultBatchType; import 
com.datastax.oss.driver.api.core.cql.PreparedStatement; import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.cql.Row; import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder; import com.datastax.oss.driver.api.core.type.DataTypes; import com.datastax.oss.driver.api.core.type.UserDefinedType; import com.datastax.oss.driver.api.querybuilder.QueryBuilder; // TODO: Review imports for publishing to Kafka import org.apache.kafka.clients.producer.*; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; /** * The goal of this project is to provide a minimally functional implementation of a microservice * that uses Apache Cassandra for its data storage. The reservation service is implemented as a * RESTful service using Spring Boot. * * @author Jeff Carpenter, Cedrick Lunven */ @Repository @Profile("!unit-test") // When I do some 'unit-test' no connectivity to DB public class ReservationRepository { /** Logger for the class. 
*/ private static final Logger logger = LoggerFactory.getLogger(ReservationRepository.class); // Reservation Schema Constants public static final CqlIdentifier TYPE_ADDRESS = CqlIdentifier.fromCql("address"); public static final CqlIdentifier TABLE_RESERVATION_BY_HOTEL_DATE = CqlIdentifier.fromCql("reservations_by_hotel_date"); public static final CqlIdentifier TABLE_RESERVATION_BY_CONFI = CqlIdentifier.fromCql("reservations_by_confirmation"); public static final CqlIdentifier TABLE_RESERVATION_BY_GUEST = CqlIdentifier.fromCql("reservations_by_guest"); public static final CqlIdentifier TABLE_GUESTS = CqlIdentifier.fromCql("guests"); public static final CqlIdentifier STREET = CqlIdentifier.fromCql("street"); public static final CqlIdentifier CITY = CqlIdentifier.fromCql("city"); public static final CqlIdentifier STATE_PROVINCE = CqlIdentifier.fromCql("state_or_province"); public static final CqlIdentifier POSTAL_CODE = CqlIdentifier.fromCql("postal_code"); public static final CqlIdentifier COUNTRY = CqlIdentifier.fromCql("country"); public static final CqlIdentifier HOTEL_ID = CqlIdentifier.fromCql("hotel_id"); public static final CqlIdentifier START_DATE = CqlIdentifier.fromCql("start_date"); public static final CqlIdentifier END_DATE = CqlIdentifier.fromCql("end_date"); public static final CqlIdentifier ROOM_NUMBER = CqlIdentifier.fromCql("room_number"); public static final CqlIdentifier CONFIRM_NUMBER = CqlIdentifier.fromCql("confirm_number"); public static final CqlIdentifier GUEST_ID = CqlIdentifier.fromCql("guest_id"); public static final CqlIdentifier GUEST_LAST_NAME = CqlIdentifier.fromCql("guest_last_name"); public static final CqlIdentifier FIRSTNAME = CqlIdentifier.fromCql("first_name"); public static final CqlIdentifier LASTNAME = CqlIdentifier.fromCql("last_name"); public static final CqlIdentifier TITLE = CqlIdentifier.fromCql("title"); public static final CqlIdentifier EMAILS = CqlIdentifier.fromCql("emails"); public static final CqlIdentifier 
PHONE_NUMBERS = CqlIdentifier.fromCql("phone_numbers"); public static final CqlIdentifier ADDRESSES = CqlIdentifier.fromCql("addresses"); private PreparedStatement psExistReservation; private PreparedStatement psFindReservation; private PreparedStatement psInsertReservationByHotelDate; private PreparedStatement psInsertReservationByConfirmation; private PreparedStatement psDeleteReservationByHotelDate; private PreparedStatement psDeleteReservationByConfirmation; private PreparedStatement psSearchReservation; /** CqlSession holding metadata to interact with Cassandra. */ private CqlSession cqlSession; private CqlIdentifier keyspaceName; // TODO: Review variables used for publishing to Kafka /** KafkaProducer for publishing messages to Kafka. */ private KafkaProducer<String, String> kafkaProducer; private String kafkaTopicName; private ObjectMapper objectMapper; /** External Initialization. */ public ReservationRepository( @NonNull CqlSession cqlSession, @Qualifier("keyspace") @NonNull CqlIdentifier keyspaceName, @NonNull KafkaProducer<String, String> kafkaProducer, @NonNull String kafkaTopicName) { this.cqlSession = cqlSession; this.keyspaceName = keyspaceName; // TODO: Review initialization of objects needed for publishing to Kafka this.kafkaProducer = kafkaProducer; this.kafkaTopicName = kafkaTopicName; objectMapper = new ObjectMapper(); objectMapper.registerModule(new JavaTimeModule()); objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); objectMapper.enable(SerializationFeature.INDENT_OUTPUT); // Will create tables (if they do not exist) createReservationTables(); // Prepare Statements of reservation prepareStatements(); logger.info("Application initialized."); } /** * CqlSession is a stateful object handling TCP connections to nodes in the cluster. 
* This operation properly closes sockets when the application is stopped */ @PreDestroy public void cleanup() { if (null != cqlSession) { cqlSession.close(); logger.info("+ CqlSession has been successfully closed"); } } /** * Testing existence is useful for building correct semantics in the RESTful API. To evaluate existence find the * table where confirmation number is partition key which is reservations_by_confirmation * * @param confirmationNumber * unique identifier for confirmation * @return * true if the reservation exists, false if it does not */ public boolean exists(String confirmationNumber) { return cqlSession.execute(psExistReservation.bind(confirmationNumber)) .getAvailableWithoutFetching() > 0; } /** * Similar to exists() but maps and parses results. * * @param confirmationNumber * unique identifier for confirmation * @return * reservation if present or empty */ @NonNull public Optional<Reservation> findByConfirmationNumber(@NonNull String confirmationNumber) { ResultSet resultSet = cqlSession.execute(psFindReservation.bind(confirmationNumber)); // Hint: an empty result might not be an error as this method is sometimes used to check whether a // reservation with this confirmation number exists Row row = resultSet.one(); if (row == null) { logger.debug("Unable to load reservation with confirmation number: " + confirmationNumber); return Optional.empty(); } // Hint: If there is a result, create a new reservation object and set the values // Bonus: factor the logic to extract a reservation from a row into a separate method // (you will reuse it again later in getAllReservations()) return Optional.of(mapRowToReservation(row)); } /** * Create new entry in multiple tables for this reservation. 
* * @param reservation * current reservation object * @return * confirmation number for the reservation * */ public String upsert(Reservation reservation) { Objects.requireNonNull(reservation); if (null == reservation.getConfirmationNumber()) { // Generating a new reservation number if none has been provided reservation.setConfirmationNumber(UUID.randomUUID().toString()); } // Insert into 'reservations_by_hotel_date' BoundStatement bsInsertReservationByHotel = psInsertReservationByHotelDate.bind(reservation.getHotelId(), reservation.getStartDate(), reservation.getEndDate(), reservation.getRoomNumber(), reservation.getConfirmationNumber(), reservation.getGuestId()); // Insert into 'reservations_by_confirmation' BoundStatement bsInsertReservationByConfirmation = psInsertReservationByConfirmation.bind(reservation.getConfirmationNumber(), reservation.getHotelId(), reservation.getStartDate(), reservation.getEndDate(), reservation.getRoomNumber(), reservation.getGuestId()); BatchStatement batchInsertReservation = BatchStatement .builder(DefaultBatchType.LOGGED) .addStatement(bsInsertReservationByHotel) .addStatement(bsInsertReservationByConfirmation) .build(); cqlSession.execute(batchInsertReservation); // TODO: Publish message to Kafka containing reservation try { String reservationJson = objectMapper.writeValueAsString(reservation); // HINT: use the constructor ProducerRecord(String topic, K key, V value) // with the reservation confirmation number as the key, and the JSON string as the value ProducerRecord<String, String> record = null; kafkaProducer.send(record); } catch (Exception e) { logger.warn("Error publishing reservation message to Kafka: {}", e); } return reservation.getConfirmationNumber(); } /** * We pick 'reservations_by_confirmation' table to list reservations * BUT we could have used 'reservations_by_hotel_date' (as no key provided in request) * * @return * list containing all reservations */ public List<Reservation> findAll() { return 
cqlSession.execute(selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).all().build()) .all() // no paging we retrieve all objects .stream() // because we are good people .map(this::mapRowToReservation) // Mapping row as Reservation .collect(Collectors.toList()); // Back to list objects } /** * Deleting a reservation. * * @param confirmationNumber * unique identifier for confirmation. */ public boolean delete(String confirmationNumber) { // Retrieving entire reservation in order to obtain the attributes we will need to delete from // reservations_by_hotel_date table Optional<Reservation> reservationToDelete = this.findByConfirmationNumber(confirmationNumber); if (reservationToDelete.isPresent()) { // Delete from 'reservations_by_hotel_date' Reservation reservation = reservationToDelete.get(); BoundStatement bsDeleteReservationByHotelDate = psDeleteReservationByHotelDate.bind(reservation.getHotelId(), reservation.getStartDate(), reservation.getRoomNumber()); // Delete from 'reservations_by_confirmation' BoundStatement bsDeleteReservationByConfirmation = psDeleteReservationByConfirmation.bind(confirmationNumber); BatchStatement batchDeleteReservation = BatchStatement .builder(DefaultBatchType.LOGGED) .addStatement(bsDeleteReservationByHotelDate) .addStatement(bsDeleteReservationByConfirmation) .build(); cqlSession.execute(batchDeleteReservation); // TODO: Publish message to Kafka with empty payload to indicate deletion // HINT: use the constructor ProducerRecord(String topic, K key, V value) // with the reservation confirmation number as the key, and an empty string as the value ProducerRecord<String, String> record = null; kafkaProducer.send(record); return true; } return false; } /** * Search all reservation for an hotel id and LocalDate. 
* * @param hotelId * hotel identifier * @param date * searched Date * @return * list of reservations matching the search criteria */ public List<Reservation> findByHotelAndDate(String hotelId, LocalDate date) { Objects.requireNonNull(hotelId); Objects.requireNonNull(date); return cqlSession.execute(psSearchReservation.bind(hotelId, date)) .all() // no paging we retrieve all objects .stream() // because we are good people .map(this::mapRowToReservation) // Mapping row as Reservation .collect(Collectors.toList()); // Back to list objects } /** * Utility method to marshal a row as expected Reservation Bean. * * @param row * current row from ResultSet * @return * object */ private Reservation mapRowToReservation(Row row) { Reservation reservation = new Reservation(); reservation.setHotelId(row.getString(HOTEL_ID)); reservation.setConfirmationNumber(row.getString(CONFIRM_NUMBER)); reservation.setGuestId(row.getUuid(GUEST_ID)); reservation.setRoomNumber(row.getShort(ROOM_NUMBER)); reservation.setStartDate(row.getLocalDate(START_DATE)); reservation.setEndDate(row.getLocalDate(END_DATE)); return reservation; } /** * Create Keyspace and relevant tables as per defined in 'reservation.cql' */ public void createReservationTables() { /** * Create TYPE 'Address' if not exists * * CREATE TYPE reservation.address ( * street text, * city text, * state_or_province text, * postal_code text, * country text * ); */ cqlSession.execute( createType(keyspaceName, TYPE_ADDRESS) .ifNotExists() .withField(STREET, DataTypes.TEXT) .withField(CITY, DataTypes.TEXT) .withField(STATE_PROVINCE, DataTypes.TEXT) .withField(POSTAL_CODE, DataTypes.TEXT) .withField(COUNTRY, DataTypes.TEXT) .build()); logger.debug("+ Type '{}' has been created (if needed)", TYPE_ADDRESS.asInternal()); /** * CREATE TABLE reservation.reservations_by_hotel_date ( * hotel_id text, * start_date date, * end_date date, * room_number smallint, * confirm_number text, * guest_id uuid, * PRIMARY KEY ((hotel_id, start_date), 
room_number) * ); */ cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE) .ifNotExists() .withPartitionKey(HOTEL_ID, DataTypes.TEXT) .withPartitionKey(START_DATE, DataTypes.DATE) .withClusteringColumn(ROOM_NUMBER, DataTypes.SMALLINT) .withColumn(END_DATE, DataTypes.DATE) .withColumn(CONFIRM_NUMBER, DataTypes.TEXT) .withColumn(GUEST_ID, DataTypes.UUID) .withClusteringOrder(ROOM_NUMBER, ClusteringOrder.ASC) .withComment("Q7. Find reservations by hotel and date") .build()); logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_HOTEL_DATE.asInternal()); /** * CREATE TABLE reservation.reservations_by_confirmation ( * confirm_number text PRIMARY KEY, * hotel_id text, * start_date date, * end_date date, * room_number smallint, * guest_id uuid * ); */ cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_CONFI) .ifNotExists() .withPartitionKey(CONFIRM_NUMBER, DataTypes.TEXT) .withColumn(HOTEL_ID, DataTypes.TEXT) .withColumn(START_DATE, DataTypes.DATE) .withColumn(END_DATE, DataTypes.DATE) .withColumn(ROOM_NUMBER, DataTypes.SMALLINT) .withColumn(GUEST_ID, DataTypes.UUID) .build()); logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_CONFI.asInternal()); /** * CREATE TABLE reservation.reservations_by_guest ( * guest_last_name text, * hotel_id text, * start_date date, * end_date date, * room_number smallint, * confirm_number text, * guest_id uuid, * PRIMARY KEY ((guest_last_name), hotel_id) * ); */ cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_GUEST) .ifNotExists() .withPartitionKey(GUEST_LAST_NAME, DataTypes.TEXT) .withClusteringColumn(HOTEL_ID, DataTypes.TEXT) .withColumn(START_DATE, DataTypes.DATE) .withColumn(END_DATE, DataTypes.DATE) .withColumn(ROOM_NUMBER, DataTypes.SMALLINT) .withColumn(CONFIRM_NUMBER, DataTypes.TEXT) .withColumn(GUEST_ID, DataTypes.UUID) .withComment("Q8. 
Find reservations by guest name") .build()); logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_GUEST.asInternal()); /** * CREATE TABLE reservation.guests ( * guest_id uuid PRIMARY KEY, * first_name text, * last_name text, * title text, * emails set<text>, * phone_numbers list<text>, * addresses map<text, frozen<address>>, * confirm_number text * ); */ UserDefinedType udtAddressType = cqlSession.getMetadata().getKeyspace(keyspaceName).get() // Retrieving KeySpaceMetadata .getUserDefinedType(TYPE_ADDRESS).get(); // Looking for UDT (extending DataType) cqlSession.execute(createTable(keyspaceName, TABLE_GUESTS) .ifNotExists() .withPartitionKey(GUEST_ID, DataTypes.UUID) .withColumn(FIRSTNAME, DataTypes.TEXT) .withColumn(LASTNAME, DataTypes.TEXT) .withColumn(TITLE, DataTypes.TEXT) .withColumn(EMAILS, DataTypes.setOf(DataTypes.TEXT)) .withColumn(PHONE_NUMBERS, DataTypes.listOf(DataTypes.TEXT)) .withColumn(ADDRESSES, DataTypes.mapOf(DataTypes.TEXT, udtAddressType, true)) .withColumn(CONFIRM_NUMBER, DataTypes.TEXT) .withComment("Q9. 
Find guest by ID") .build()); logger.debug("+ Table '{}' has been created (if needed)", TABLE_GUESTS.asInternal()); logger.info("Schema has been successfully initialized."); } private void prepareStatements() { if (psExistReservation == null) { psExistReservation = cqlSession.prepare(selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).column(CONFIRM_NUMBER) .where(column(CONFIRM_NUMBER).isEqualTo(bindMarker(CONFIRM_NUMBER))) .build()); psFindReservation = cqlSession.prepare( selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).all() .where(column(CONFIRM_NUMBER).isEqualTo(bindMarker(CONFIRM_NUMBER))) .build()); psSearchReservation = cqlSession.prepare( selectFrom(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE).all() .where(column(HOTEL_ID).isEqualTo(bindMarker(HOTEL_ID))) .where(column(START_DATE).isEqualTo(bindMarker(START_DATE))) .build()); psDeleteReservationByConfirmation = cqlSession.prepare( deleteFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI) .where(column(CONFIRM_NUMBER).isEqualTo(bindMarker(CONFIRM_NUMBER))) .build()); psDeleteReservationByHotelDate = cqlSession.prepare( deleteFrom(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE) .where(column(HOTEL_ID).isEqualTo(bindMarker(HOTEL_ID))) .where(column(START_DATE).isEqualTo(bindMarker(START_DATE))) .where(column(ROOM_NUMBER).isEqualTo(bindMarker(ROOM_NUMBER))) .build()); psInsertReservationByHotelDate = cqlSession.prepare(QueryBuilder.insertInto(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE) .value(HOTEL_ID, bindMarker(HOTEL_ID)) .value(START_DATE, bindMarker(START_DATE)) .value(END_DATE, bindMarker(END_DATE)) .value(ROOM_NUMBER, bindMarker(ROOM_NUMBER)) .value(CONFIRM_NUMBER, bindMarker(CONFIRM_NUMBER)) .value(GUEST_ID, bindMarker(GUEST_ID)) .build()); psInsertReservationByConfirmation = cqlSession.prepare(QueryBuilder.insertInto(keyspaceName, TABLE_RESERVATION_BY_CONFI) .value(CONFIRM_NUMBER, bindMarker(CONFIRM_NUMBER)) .value(HOTEL_ID, bindMarker(HOTEL_ID)) .value(START_DATE, bindMarker(START_DATE)) 
.value(END_DATE, bindMarker(END_DATE)) .value(ROOM_NUMBER, bindMarker(ROOM_NUMBER)) .value(GUEST_ID, bindMarker(GUEST_ID)) .build()); logger.info("Statements have been successfully prepared."); } } }
Now that we are done with cqlsh, let's exit:
Exit
Installing and starting Apache Kafka
Now let's focus on extending the Reservation Service so that it publishes events to Apache Kafka.
First, download and unpack Kafka:
wget http://archive.apache.org/dist/kafka/2.5.0/kafka_2.12-2.5.0.tgz
tar xzf kafka_2.12-2.5.0.tgz
You now have a folder named kafka_2.12-2.5.0 on your file system (the two version numbers are the Scala and Kafka versions, respectively).
To run a single-node Kafka cluster you need to start two processes: ZooKeeper and the Kafka broker. Kafka uses ZooKeeper to manage the cluster. It runs alongside the brokers and ensures that each broker can communicate with the cluster. ZooKeeper must be running before the broker can receive or deliver messages. Start ZooKeeper with:
kafka_2.12-2.5.0/bin/zookeeper-server-start.sh kafka_2.12-2.5.0/config/zookeeper.properties &> zookeeper_start.log &
Next, start the Kafka broker:
kafka_2.12-2.5.0/bin/kafka-server-start.sh kafka_2.12-2.5.0/config/server.properties &> kafka_start.log &
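Both processes start in the background, so it can be useful to confirm that they are actually listening before moving on. A minimal sketch in plain Java, assuming the default ports used in this walkthrough (2181 for ZooKeeper, 9092 for the broker); the class PortCheck and the method isListening are hypothetical names. An open port only shows that something accepts TCP connections there, not that the service is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, unreachable, or timed out
        }
    }

    public static void main(String[] args) {
        // Assumed default ports: 2181 for ZooKeeper and 9092 for the Kafka broker.
        System.out.println("ZooKeeper (2181): " + isListening("localhost", 2181, 1000));
        System.out.println("Kafka broker (9092): " + isListening("localhost", 9092, 1000));
    }
}
```

If either check prints false, the zookeeper_start.log and kafka_start.log files written by the commands above are the first place to look.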
Creating a Kafka topic and publishing messages
Kafka supports publish-subscribe messaging, in which messages are published to topics as key-value pairs. Like Cassandra, Kafka partitions data by key and replicates it across multiple nodes, which Kafka calls brokers.
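To make "partitions data by key" concrete, here is a rough sketch of how a record key can be mapped to a partition. Kafka's default partitioner hashes the serialized key bytes with murmur2; this sketch swaps in Arrays.hashCode to stay within the standard library, so the partition numbers it produces will not match a real cluster. KeyPartitionSketch and partitionFor are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class KeyPartitionSketch {

    // Maps a record key to a partition index in [0, numPartitions).
    // Assumption: Arrays.hashCode stands in for the murmur2 hash that Kafka's
    // default partitioner applies to the serialized key bytes.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);
        return (hash & 0x7fffffff) % numPartitions; // mask the sign bit to stay non-negative
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("RS2G0Z", 3));
        System.out.println(partitionFor("RS2G0Z", 3)); // same key, same partition
        System.out.println(partitionFor("RS2G0Z", 1)); // a single-partition topic always yields 0
    }
}
```

The practical consequence is that all events keyed by the same confirmation number land in the same partition, so consumers see the changes to one reservation in order.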
Now let's create a Kafka topic for reservation data:
kafka_2.12-2.5.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic reservation --config retention.ms=-1
Next we will update the Reservation Service to publish to the reservation Kafka topic every time a reservation is changed or deleted. We will introduce the API elements a little at a time, trying them out first in jshell, the Java REPL, before updating the actual code in the Reservation Service:
cd reservation-service
mvn -q compile com.github.johnpoth:jshell-maven-plugin:1.3:run
Let's create a KafkaProducer that we can use to publish messages to the Kafka topic:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "ReservationService");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
Let's create a Reservation object, using the class the Reservation Service uses to represent the reservations it handles:
import java.time.LocalDate;
import java.util.UUID;
import dev.cassandraguide.model.Reservation;

String confirmationNumber = "RS2G0Z";
Reservation reservation = new Reservation();
reservation.setConfirmationNumber(confirmationNumber);
reservation.setStartDate(LocalDate.now());
reservation.setEndDate(LocalDate.now().plusDays(2));
reservation.setHotelId("NY456");
reservation.setGuestId(UUID.fromString("1b4d86f4-ccff-4256-a63d-45c905df2677"));
reservation.setRoomNumber((short)111);
Now serialize this object to a JSON string using a Jackson ObjectMapper:
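The listing for this step does not appear here. A minimal sketch, assuming the same ObjectMapper setup that the ReservationRepository constructor uses and that it is typed into the jshell session started above, where Jackson is on the service's classpath and the reservation variable already exists:

```java
// Assumed reconstruction, mirroring the ObjectMapper setup in ReservationRepository.
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

ObjectMapper objectMapper = new ObjectMapper();
objectMapper.registerModule(new JavaTimeModule());                     // java.time support (LocalDate)
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);  // write dates as ISO-8601 strings
objectMapper.enable(SerializationFeature.INDENT_OUTPUT);               // pretty-print the JSON
String reservationJson = objectMapper.writeValueAsString(reservation);
```

In jshell the checked JsonProcessingException thrown by writeValueAsString does not need a try/catch; inside the service the call is wrapped in one, as the upsert method shows.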
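The options set on objectMapper (the same ones used in the ReservationRepository constructor: the JavaTimeModule is registered, WRITE_DATES_AS_TIMESTAMPS is disabled and INDENT_OUTPUT is enabled) simplify date formatting and pretty-print the JSON. To see the resulting string, we run:
System.out.println(reservationJson);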
Now we can publish a message to the reservation Kafka topic, using confirmationNumber as the key and reservationJson as the payload:
ProducerRecord<String, String> record = new ProducerRecord<>("reservation", confirmationNumber, reservationJson);
producer.send(record);
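Note that send() is asynchronous: it hands the record to a background sender thread and returns a Future. If the jshell session ends right away, the message may not have left the client yet. A cautious addition, continuing the same jshell session and assuming the producer variable created earlier, is to flush and close the producer first:

```java
// Continuing the same jshell session; `producer` is the KafkaProducer created earlier.
producer.flush();   // block until all buffered records have been sent
producer.close();   // release network resources before leaving jshell
```

Inside the Reservation Service the producer is a long-lived Spring bean, so flushing after every send is less of a concern there.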
Let's exit jshell:
/exit
Let's use the simple command-line consumer to read the message we just published (run it from the directory that contains the kafka_2.12-2.5.0 folder, since we changed into reservation-service earlier):
kafka_2.12-2.5.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic reservation --max-messages 10
You can stop the consumer with Ctrl+C.
Updating the Reservation Service to publish to Kafka
Now that we have some experience publishing to Kafka, let's update the Reservation Service to publish events to Kafka whenever a reservation is changed or deleted. Let's look at the ReservationRepository class:
ReservationRepository.java
/* * Copyright (C) 2017-2020 Jeff Carpenter * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package dev.cassandraguide.repository; import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker; import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.deleteFrom; import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom; import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createTable; import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createType; import static com.datastax.oss.driver.api.querybuilder.relation.Relation.column; import java.time.LocalDate; import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.UUID; import java.util.stream.Collectors; import javax.annotation.PreDestroy; import com.fasterxml.jackson.core.JsonProcessingException; import dev.cassandraguide.model.Reservation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Profile; import org.springframework.lang.NonNull; import org.springframework.stereotype.Repository; import com.datastax.oss.driver.api.core.CqlIdentifier; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.BatchStatement; import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.DefaultBatchType; import 
com.datastax.oss.driver.api.core.cql.PreparedStatement; import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.cql.Row; import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder; import com.datastax.oss.driver.api.core.type.DataTypes; import com.datastax.oss.driver.api.core.type.UserDefinedType; import com.datastax.oss.driver.api.querybuilder.QueryBuilder; // TODO: Review imports for publishing to Kafka import org.apache.kafka.clients.producer.*; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; /** * The goal of this project is to provide a minimally functional implementation of a microservice * that uses Apache Cassandra for its data storage. The reservation service is implemented as a * RESTful service using Spring Boot. * * @author Jeff Carpenter, Cedrick Lunven */ @Repository @Profile("!unit-test") // When I do some 'unit-test' no connectivity to DB public class ReservationRepository { /** Logger for the class. 
*/ private static final Logger logger = LoggerFactory.getLogger(ReservationRepository.class); // Reservation Schema Constants public static final CqlIdentifier TYPE_ADDRESS = CqlIdentifier.fromCql("address"); public static final CqlIdentifier TABLE_RESERVATION_BY_HOTEL_DATE = CqlIdentifier.fromCql("reservations_by_hotel_date"); public static final CqlIdentifier TABLE_RESERVATION_BY_CONFI = CqlIdentifier.fromCql("reservations_by_confirmation"); public static final CqlIdentifier TABLE_RESERVATION_BY_GUEST = CqlIdentifier.fromCql("reservations_by_guest"); public static final CqlIdentifier TABLE_GUESTS = CqlIdentifier.fromCql("guests"); public static final CqlIdentifier STREET = CqlIdentifier.fromCql("street"); public static final CqlIdentifier CITY = CqlIdentifier.fromCql("city"); public static final CqlIdentifier STATE_PROVINCE = CqlIdentifier.fromCql("state_or_province"); public static final CqlIdentifier POSTAL_CODE = CqlIdentifier.fromCql("postal_code"); public static final CqlIdentifier COUNTRY = CqlIdentifier.fromCql("country"); public static final CqlIdentifier HOTEL_ID = CqlIdentifier.fromCql("hotel_id"); public static final CqlIdentifier START_DATE = CqlIdentifier.fromCql("start_date"); public static final CqlIdentifier END_DATE = CqlIdentifier.fromCql("end_date"); public static final CqlIdentifier ROOM_NUMBER = CqlIdentifier.fromCql("room_number"); public static final CqlIdentifier CONFIRM_NUMBER = CqlIdentifier.fromCql("confirm_number"); public static final CqlIdentifier GUEST_ID = CqlIdentifier.fromCql("guest_id"); public static final CqlIdentifier GUEST_LAST_NAME = CqlIdentifier.fromCql("guest_last_name"); public static final CqlIdentifier FIRSTNAME = CqlIdentifier.fromCql("first_name"); public static final CqlIdentifier LASTNAME = CqlIdentifier.fromCql("last_name"); public static final CqlIdentifier TITLE = CqlIdentifier.fromCql("title"); public static final CqlIdentifier EMAILS = CqlIdentifier.fromCql("emails"); public static final CqlIdentifier 
PHONE_NUMBERS = CqlIdentifier.fromCql("phone_numbers");
    public static final CqlIdentifier ADDRESSES = CqlIdentifier.fromCql("addresses");

    private PreparedStatement psExistReservation;
    private PreparedStatement psFindReservation;
    private PreparedStatement psInsertReservationByHotelDate;
    private PreparedStatement psInsertReservationByConfirmation;
    private PreparedStatement psDeleteReservationByHotelDate;
    private PreparedStatement psDeleteReservationByConfirmation;
    private PreparedStatement psSearchReservation;

    /** CqlSession holding metadata to interact with Cassandra. */
    private CqlSession cqlSession;
    private CqlIdentifier keyspaceName;

    // TODO: Review variables used for publishing to Kafka
    /** KafkaProducer for publishing messages to Kafka. */
    private KafkaProducer<String, String> kafkaProducer;
    private String kafkaTopicName;
    private ObjectMapper objectMapper;

    /** External Initialization. */
    public ReservationRepository(
            @NonNull CqlSession cqlSession,
            @Qualifier("keyspace") @NonNull CqlIdentifier keyspaceName,
            @NonNull KafkaProducer<String, String> kafkaProducer,
            @NonNull String kafkaTopicName) {
        this.cqlSession = cqlSession;
        this.keyspaceName = keyspaceName;

        // TODO: Review initialization of objects needed for publishing to Kafka
        this.kafkaProducer = kafkaProducer;
        this.kafkaTopicName = kafkaTopicName;
        objectMapper = new ObjectMapper();
        objectMapper.registerModule(new JavaTimeModule());
        objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        objectMapper.enable(SerializationFeature.INDENT_OUTPUT);

        // Will create tables (if they do not exist)
        createReservationTables();

        // Prepare Statements of reservation
        prepareStatements();
        logger.info("Application initialized.");
    }

    /**
     * CqlSession is a stateful object handling TCP connections to nodes in the cluster.
* This operation properly closes sockets when the application is stopped */ @PreDestroy public void cleanup() { if (null != cqlSession) { cqlSession.close(); logger.info("+ CqlSession has been successfully closed"); } } /** * Testing existence is useful for building correct semantics in the RESTful API. To evaluate existence find the * table where confirmation number is partition key which is reservations_by_confirmation * * @param confirmationNumber * unique identifier for confirmation * @return * true if the reservation exists, false if it does not */ public boolean exists(String confirmationNumber) { return cqlSession.execute(psExistReservation.bind(confirmationNumber)) .getAvailableWithoutFetching() > 0; } /** * Similar to exists() but maps and parses results. * * @param confirmationNumber * unique identifier for confirmation * @return * reservation if present or empty */ @NonNull public Optional<Reservation> findByConfirmationNumber(@NonNull String confirmationNumber) { ResultSet resultSet = cqlSession.execute(psFindReservation.bind(confirmationNumber)); // Hint: an empty result might not be an error as this method is sometimes used to check whether a // reservation with this confirmation number exists Row row = resultSet.one(); if (row == null) { logger.debug("Unable to load reservation with confirmation number: " + confirmationNumber); return Optional.empty(); } // Hint: If there is a result, create a new reservation object and set the values // Bonus: factor the logic to extract a reservation from a row into a separate method // (you will reuse it again later in getAllReservations()) return Optional.of(mapRowToReservation(row)); } /** * Create new entry in multiple tables for this reservation. 
     *
     * @param reservation
     *      current reservation object
     * @return
     *      confirmation number for the reservation
     *
     */
    public String upsert(Reservation reservation) {
        Objects.requireNonNull(reservation);
        if (null == reservation.getConfirmationNumber()) {
            // Generating a new reservation number if none has been provided
            reservation.setConfirmationNumber(UUID.randomUUID().toString());
        }

        // Insert into 'reservations_by_hotel_date'
        BoundStatement bsInsertReservationByHotel =
                psInsertReservationByHotelDate.bind(reservation.getHotelId(), reservation.getStartDate(),
                        reservation.getEndDate(), reservation.getRoomNumber(),
                        reservation.getConfirmationNumber(), reservation.getGuestId());

        // Insert into 'reservations_by_confirmation'
        BoundStatement bsInsertReservationByConfirmation =
                psInsertReservationByConfirmation.bind(reservation.getConfirmationNumber(),
                        reservation.getHotelId(), reservation.getStartDate(), reservation.getEndDate(),
                        reservation.getRoomNumber(), reservation.getGuestId());

        BatchStatement batchInsertReservation = BatchStatement
                .builder(DefaultBatchType.LOGGED)
                .addStatement(bsInsertReservationByHotel)
                .addStatement(bsInsertReservationByConfirmation)
                .build();
        cqlSession.execute(batchInsertReservation);

        // TODO: Publish message to Kafka containing reservation
        try {
            String reservationJson = objectMapper.writeValueAsString(reservation);
            // HINT: use the constructor ProducerRecord(String topic, K key, V value)
            // with the reservation confirmation number as the key, and the JSON string as the value
            ProducerRecord<String, String> record = null;
            kafkaProducer.send(record);
        } catch (Exception e) {
            logger.warn("Error publishing reservation message to Kafka: {}", e);
        }
        return reservation.getConfirmationNumber();
    }

    /**
     * We pick 'reservations_by_confirmation' table to list reservations
     * BUT we could have used 'reservations_by_hotel_date' (as no key provided in request)
     *
     * @return
     *      list containing all reservations
     */
    public List<Reservation> findAll() {
        return
cqlSession.execute(selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).all().build())
                .all()                          // no paging we retrieve all objects
                .stream()                       // because we are good people
                .map(this::mapRowToReservation) // Mapping row as Reservation
                .collect(Collectors.toList());  // Back to list objects
    }

    /**
     * Deleting a reservation.
     *
     * @param confirmationNumber
     *      unique identifier for confirmation.
     */
    public boolean delete(String confirmationNumber) {
        // Retrieving entire reservation in order to obtain the attributes we will need to delete from
        // reservations_by_hotel_date table
        Optional<Reservation> reservationToDelete = this.findByConfirmationNumber(confirmationNumber);

        if (reservationToDelete.isPresent()) {
            // Delete from 'reservations_by_hotel_date'
            Reservation reservation = reservationToDelete.get();
            BoundStatement bsDeleteReservationByHotelDate = psDeleteReservationByHotelDate.bind(
                    reservation.getHotelId(), reservation.getStartDate(), reservation.getRoomNumber());

            // Delete from 'reservations_by_confirmation'
            BoundStatement bsDeleteReservationByConfirmation =
                    psDeleteReservationByConfirmation.bind(confirmationNumber);

            BatchStatement batchDeleteReservation = BatchStatement
                    .builder(DefaultBatchType.LOGGED)
                    .addStatement(bsDeleteReservationByHotelDate)
                    .addStatement(bsDeleteReservationByConfirmation)
                    .build();
            cqlSession.execute(batchDeleteReservation);

            // TODO: Publish message to Kafka with empty payload to indicate deletion
            // HINT: use the constructor ProducerRecord(String topic, K key, V value)
            // with the reservation confirmation number as the key, and an empty string as the value
            ProducerRecord<String, String> record = null;
            kafkaProducer.send(record);
            return true;
        }
        return false;
    }

    /**
     * Search all reservation for an hotel id and LocalDate.
* * @param hotelId * hotel identifier * @param date * searched Date * @return * list of reservations matching the search criteria */ public List<Reservation> findByHotelAndDate(String hotelId, LocalDate date) { Objects.requireNonNull(hotelId); Objects.requireNonNull(date); return cqlSession.execute(psSearchReservation.bind(hotelId, date)) .all() // no paging we retrieve all objects .stream() // because we are good people .map(this::mapRowToReservation) // Mapping row as Reservation .collect(Collectors.toList()); // Back to list objects } /** * Utility method to marshal a row as expected Reservation Bean. * * @param row * current row from ResultSet * @return * object */ private Reservation mapRowToReservation(Row row) { Reservation reservation = new Reservation(); reservation.setHotelId(row.getString(HOTEL_ID)); reservation.setConfirmationNumber(row.getString(CONFIRM_NUMBER)); reservation.setGuestId(row.getUuid(GUEST_ID)); reservation.setRoomNumber(row.getShort(ROOM_NUMBER)); reservation.setStartDate(row.getLocalDate(START_DATE)); reservation.setEndDate(row.getLocalDate(END_DATE)); return reservation; } /** * Create Keyspace and relevant tables as per defined in 'reservation.cql' */ public void createReservationTables() { /** * Create TYPE 'Address' if not exists * * CREATE TYPE reservation.address ( * street text, * city text, * state_or_province text, * postal_code text, * country text * ); */ cqlSession.execute( createType(keyspaceName, TYPE_ADDRESS) .ifNotExists() .withField(STREET, DataTypes.TEXT) .withField(CITY, DataTypes.TEXT) .withField(STATE_PROVINCE, DataTypes.TEXT) .withField(POSTAL_CODE, DataTypes.TEXT) .withField(COUNTRY, DataTypes.TEXT) .build()); logger.debug("+ Type '{}' has been created (if needed)", TYPE_ADDRESS.asInternal()); /** * CREATE TABLE reservation.reservations_by_hotel_date ( * hotel_id text, * start_date date, * end_date date, * room_number smallint, * confirm_number text, * guest_id uuid, * PRIMARY KEY ((hotel_id, start_date), 
room_number) * ); */ cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE) .ifNotExists() .withPartitionKey(HOTEL_ID, DataTypes.TEXT) .withPartitionKey(START_DATE, DataTypes.DATE) .withClusteringColumn(ROOM_NUMBER, DataTypes.SMALLINT) .withColumn(END_DATE, DataTypes.DATE) .withColumn(CONFIRM_NUMBER, DataTypes.TEXT) .withColumn(GUEST_ID, DataTypes.UUID) .withClusteringOrder(ROOM_NUMBER, ClusteringOrder.ASC) .withComment("Q7. Find reservations by hotel and date") .build()); logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_HOTEL_DATE.asInternal()); /** * CREATE TABLE reservation.reservations_by_confirmation ( * confirm_number text PRIMARY KEY, * hotel_id text, * start_date date, * end_date date, * room_number smallint, * guest_id uuid * ); */ cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_CONFI) .ifNotExists() .withPartitionKey(CONFIRM_NUMBER, DataTypes.TEXT) .withColumn(HOTEL_ID, DataTypes.TEXT) .withColumn(START_DATE, DataTypes.DATE) .withColumn(END_DATE, DataTypes.DATE) .withColumn(ROOM_NUMBER, DataTypes.SMALLINT) .withColumn(GUEST_ID, DataTypes.UUID) .build()); logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_CONFI.asInternal()); /** * CREATE TABLE reservation.reservations_by_guest ( * guest_last_name text, * hotel_id text, * start_date date, * end_date date, * room_number smallint, * confirm_number text, * guest_id uuid, * PRIMARY KEY ((guest_last_name), hotel_id) * ); */ cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_GUEST) .ifNotExists() .withPartitionKey(GUEST_LAST_NAME, DataTypes.TEXT) .withClusteringColumn(HOTEL_ID, DataTypes.TEXT) .withColumn(START_DATE, DataTypes.DATE) .withColumn(END_DATE, DataTypes.DATE) .withColumn(ROOM_NUMBER, DataTypes.SMALLINT) .withColumn(CONFIRM_NUMBER, DataTypes.TEXT) .withColumn(GUEST_ID, DataTypes.UUID) .withComment("Q8. 
Find reservations by guest name") .build()); logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_GUEST.asInternal()); /** * CREATE TABLE reservation.guests ( * guest_id uuid PRIMARY KEY, * first_name text, * last_name text, * title text, * emails set<text>, * phone_numbers list<text>, * addresses map<text, frozen<address>>, * confirm_number text * ); */ UserDefinedType udtAddressType = cqlSession.getMetadata().getKeyspace(keyspaceName).get() // Retrieving KeySpaceMetadata .getUserDefinedType(TYPE_ADDRESS).get(); // Looking for UDT (extending DataType) cqlSession.execute(createTable(keyspaceName, TABLE_GUESTS) .ifNotExists() .withPartitionKey(GUEST_ID, DataTypes.UUID) .withColumn(FIRSTNAME, DataTypes.TEXT) .withColumn(LASTNAME, DataTypes.TEXT) .withColumn(TITLE, DataTypes.TEXT) .withColumn(EMAILS, DataTypes.setOf(DataTypes.TEXT)) .withColumn(PHONE_NUMBERS, DataTypes.listOf(DataTypes.TEXT)) .withColumn(ADDRESSES, DataTypes.mapOf(DataTypes.TEXT, udtAddressType, true)) .withColumn(CONFIRM_NUMBER, DataTypes.TEXT) .withComment("Q9. 
Find guest by ID") .build()); logger.debug("+ Table '{}' has been created (if needed)", TABLE_GUESTS.asInternal()); logger.info("Schema has been successfully initialized."); } private void prepareStatements() { if (psExistReservation == null) { psExistReservation = cqlSession.prepare(selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).column(CONFIRM_NUMBER) .where(column(CONFIRM_NUMBER).isEqualTo(bindMarker(CONFIRM_NUMBER))) .build()); psFindReservation = cqlSession.prepare( selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).all() .where(column(CONFIRM_NUMBER).isEqualTo(bindMarker(CONFIRM_NUMBER))) .build()); psSearchReservation = cqlSession.prepare( selectFrom(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE).all() .where(column(HOTEL_ID).isEqualTo(bindMarker(HOTEL_ID))) .where(column(START_DATE).isEqualTo(bindMarker(START_DATE))) .build()); psDeleteReservationByConfirmation = cqlSession.prepare( deleteFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI) .where(column(CONFIRM_NUMBER).isEqualTo(bindMarker(CONFIRM_NUMBER))) .build()); psDeleteReservationByHotelDate = cqlSession.prepare( deleteFrom(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE) .where(column(HOTEL_ID).isEqualTo(bindMarker(HOTEL_ID))) .where(column(START_DATE).isEqualTo(bindMarker(START_DATE))) .where(column(ROOM_NUMBER).isEqualTo(bindMarker(ROOM_NUMBER))) .build()); psInsertReservationByHotelDate = cqlSession.prepare(QueryBuilder.insertInto(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE) .value(HOTEL_ID, bindMarker(HOTEL_ID)) .value(START_DATE, bindMarker(START_DATE)) .value(END_DATE, bindMarker(END_DATE)) .value(ROOM_NUMBER, bindMarker(ROOM_NUMBER)) .value(CONFIRM_NUMBER, bindMarker(CONFIRM_NUMBER)) .value(GUEST_ID, bindMarker(GUEST_ID)) .build()); psInsertReservationByConfirmation = cqlSession.prepare(QueryBuilder.insertInto(keyspaceName, TABLE_RESERVATION_BY_CONFI) .value(CONFIRM_NUMBER, bindMarker(CONFIRM_NUMBER)) .value(HOTEL_ID, bindMarker(HOTEL_ID)) .value(START_DATE, bindMarker(START_DATE)) 
.value(END_DATE, bindMarker(END_DATE)) .value(ROOM_NUMBER, bindMarker(ROOM_NUMBER)) .value(GUEST_ID, bindMarker(GUEST_ID)) .build()); logger.info("Statements have been successfully prepared."); } } }
The TODO comments in this class mark several points we should review:
- TODO: Review the imports for publishing to Kafka. Note that the classes you used earlier are imported.
- TODO: Review the variables used for publishing to Kafka. The repository holds an ObjectMapper and a KafkaProducer.
- TODO: Review the initialization of the objects needed for publishing to Kafka, that is, how these variables are set up in the constructor.
Now for the fun part: implementing the code that publishes messages:
- TODO: Publish a message to Kafka containing the reservation. In the upsert() method, publish a message that contains the reservation as a JSON string.
- TODO: Publish a message to Kafka with an empty payload to indicate deletion. In the delete() method, publish a message indicating that the reservation was deleted.
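To see how a downstream service could interpret these two kinds of messages, here is a minimal sketch under stated assumptions. The class ReservationEventSketch and its methods are hypothetical and are not part of the Reservation Service; the sketch uses no Kafka client and simply applies (key, payload) pairs to an in-memory map, treating an empty payload as a deletion.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical consumer-side view of the reservation topic; illustration only.
class ReservationEventSketch {

    // Latest known JSON payload per confirmation number.
    private final Map<String, String> reservations = new HashMap<>();

    // Applies one message: an empty (or null) payload means the reservation
    // was deleted, anything else is treated as an upsert of the JSON string.
    void apply(String confirmationNumber, String payload) {
        if (payload == null || payload.isEmpty()) {
            reservations.remove(confirmationNumber);
        } else {
            reservations.put(confirmationNumber, payload);
        }
    }

    boolean contains(String confirmationNumber) {
        return reservations.containsKey(confirmationNumber);
    }

    int size() {
        return reservations.size();
    }

    public static void main(String[] args) {
        ReservationEventSketch view = new ReservationEventSketch();
        view.apply("RS2G0Z", "{\"hotelId\":\"NY456\"}"); // message published by upsert()
        System.out.println(view.contains("RS2G0Z"));       // true
        view.apply("RS2G0Z", "");                          // message published by delete()
        System.out.println(view.contains("RS2G0Z"));       // false
    }
}
```

Because both messages carry the confirmation number as the key, a consumer needs nothing beyond the key and the payload to keep a view like this up to date.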
Let's fix up our code:
ReservationRepositorySolution.java
/* * Copyright (C) 2017-2020 Jeff Carpenter * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package dev.cassandraguide.repository; import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker; import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.deleteFrom; import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom; import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createTable; import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createType; import static com.datastax.oss.driver.api.querybuilder.relation.Relation.column; import java.time.LocalDate; import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.UUID; import java.util.stream.Collectors; import javax.annotation.PreDestroy; import com.fasterxml.jackson.core.JsonProcessingException; import dev.cassandraguide.model.Reservation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Profile; import org.springframework.lang.NonNull; import org.springframework.stereotype.Repository; import com.datastax.oss.driver.api.core.CqlIdentifier; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.BatchStatement; import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.DefaultBatchType; import 
com.datastax.oss.driver.api.core.cql.PreparedStatement; import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.cql.Row; import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder; import com.datastax.oss.driver.api.core.type.DataTypes; import com.datastax.oss.driver.api.core.type.UserDefinedType; import com.datastax.oss.driver.api.querybuilder.QueryBuilder; // TODO: Review imports for publishing to Kafka import org.apache.kafka.clients.producer.*; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; /** * The goal of this project is to provide a minimally functional implementation of a microservice * that uses Apache Cassandra for its data storage. The reservation service is implemented as a * RESTful service using Spring Boot. * * @author Jeff Carpenter, Cedrick Lunven */ @Repository @Profile("!unit-test") // When I do some 'unit-test' no connectivity to DB public class ReservationRepository { /** Logger for the class. 
*/ private static final Logger logger = LoggerFactory.getLogger(ReservationRepository.class); // Reservation Schema Constants public static final CqlIdentifier TYPE_ADDRESS = CqlIdentifier.fromCql("address"); public static final CqlIdentifier TABLE_RESERVATION_BY_HOTEL_DATE = CqlIdentifier.fromCql("reservations_by_hotel_date"); public static final CqlIdentifier TABLE_RESERVATION_BY_CONFI = CqlIdentifier.fromCql("reservations_by_confirmation"); public static final CqlIdentifier TABLE_RESERVATION_BY_GUEST = CqlIdentifier.fromCql("reservations_by_guest"); public static final CqlIdentifier TABLE_GUESTS = CqlIdentifier.fromCql("guests"); public static final CqlIdentifier STREET = CqlIdentifier.fromCql("street"); public static final CqlIdentifier CITY = CqlIdentifier.fromCql("city"); public static final CqlIdentifier STATE_PROVINCE = CqlIdentifier.fromCql("state_or_province"); public static final CqlIdentifier POSTAL_CODE = CqlIdentifier.fromCql("postal_code"); public static final CqlIdentifier COUNTRY = CqlIdentifier.fromCql("country"); public static final CqlIdentifier HOTEL_ID = CqlIdentifier.fromCql("hotel_id"); public static final CqlIdentifier START_DATE = CqlIdentifier.fromCql("start_date"); public static final CqlIdentifier END_DATE = CqlIdentifier.fromCql("end_date"); public static final CqlIdentifier ROOM_NUMBER = CqlIdentifier.fromCql("room_number"); public static final CqlIdentifier CONFIRMATION_NUMBER = CqlIdentifier.fromCql("confirmation_number"); public static final CqlIdentifier GUEST_ID = CqlIdentifier.fromCql("guest_id"); public static final CqlIdentifier GUEST_LAST_NAME = CqlIdentifier.fromCql("guest_last_name"); public static final CqlIdentifier FIRSTNAME = CqlIdentifier.fromCql("first_name"); public static final CqlIdentifier LASTNAME = CqlIdentifier.fromCql("last_name"); public static final CqlIdentifier TITLE = CqlIdentifier.fromCql("title"); public static final CqlIdentifier EMAILS = CqlIdentifier.fromCql("emails"); public static final 
CqlIdentifier PHONE_NUMBERS = CqlIdentifier.fromCql("phone_numbers");
    public static final CqlIdentifier ADDRESSES = CqlIdentifier.fromCql("addresses");

    private PreparedStatement psExistReservation;
    private PreparedStatement psFindReservation;
    private PreparedStatement psInsertReservationByHotelDate;
    private PreparedStatement psInsertReservationByConfirmation;
    private PreparedStatement psDeleteReservationByHotelDate;
    private PreparedStatement psDeleteReservationByConfirmation;
    private PreparedStatement psSearchReservation;

    /** CqlSession holding metadata to interact with Cassandra. */
    private CqlSession cqlSession;
    private CqlIdentifier keyspaceName;

    // TODO: Review variables used for publishing to Kafka
    /** KafkaProducer for publishing messages to Kafka. */
    private KafkaProducer<String, String> kafkaProducer;
    private String kafkaTopicName;
    private ObjectMapper objectMapper;

    /** External Initialization. */
    public ReservationRepository(
            @NonNull CqlSession cqlSession,
            @Qualifier("keyspace") @NonNull CqlIdentifier keyspaceName,
            @NonNull KafkaProducer<String, String> kafkaProducer,
            @NonNull String kafkaTopicName) {
        this.cqlSession = cqlSession;
        this.keyspaceName = keyspaceName;

        // TODO: Review initialization of objects needed for publishing to Kafka
        this.kafkaProducer = kafkaProducer;
        this.kafkaTopicName = kafkaTopicName;
        objectMapper = new ObjectMapper();
        objectMapper.registerModule(new JavaTimeModule());
        objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        objectMapper.enable(SerializationFeature.INDENT_OUTPUT);

        // Will create tables (if they do not exist)
        createReservationTables();

        // Prepare Statements of reservation
        prepareStatements();
        logger.info("Application initialized.");
    }

    /**
     * CqlSession is a stateful object handling TCP connections to nodes in the cluster.
* This operation properly closes sockets when the application is stopped */ @PreDestroy public void cleanup() { if (null != cqlSession) { cqlSession.close(); logger.info("+ CqlSession has been successfully closed"); } } /** * Testing existence is useful for building correct semantics in the RESTful API. To evaluate existence find the * table where confirmation number is partition key which is reservations_by_confirmation * * @param confirmationNumber * unique identifier for confirmation * @return * true if the reservation exists, false if it does not */ public boolean exists(String confirmationNumber) { return cqlSession.execute(psExistReservation.bind(confirmationNumber)) .getAvailableWithoutFetching() > 0; } /** * Similar to exists() but maps and parses results. * * @param confirmationNumber * unique identifier for confirmation * @return * reservation if present or empty */ @NonNull public Optional<Reservation> findByConfirmationNumber(@NonNull String confirmationNumber) { ResultSet resultSet = cqlSession.execute(psFindReservation.bind(confirmationNumber)); // Hint: an empty result might not be an error as this method is sometimes used to check whether a // reservation with this confirmation number exists Row row = resultSet.one(); if (row == null) { logger.debug("Unable to load reservation with confirmation number: " + confirmationNumber); return Optional.empty(); } // Hint: If there is a result, create a new reservation object and set the values // Bonus: factor the logic to extract a reservation from a row into a separate method // (you will reuse it again later in getAllReservations()) return Optional.of(mapRowToReservation(row)); } /** * Create new entry in multiple tables for this reservation. 
     *
     * @param reservation
     *      current reservation object
     * @return
     *      confirmation number for the reservation
     *
     */
    public String upsert(Reservation reservation) {
        Objects.requireNonNull(reservation);
        if (null == reservation.getConfirmationNumber()) {
            // Generating a new reservation number if none has been provided
            reservation.setConfirmationNumber(UUID.randomUUID().toString());
        }

        // Insert into 'reservations_by_hotel_date'
        BoundStatement bsInsertReservationByHotel =
                psInsertReservationByHotelDate.bind(reservation.getHotelId(), reservation.getStartDate(),
                        reservation.getEndDate(), reservation.getRoomNumber(),
                        reservation.getConfirmationNumber(), reservation.getGuestId());

        // Insert into 'reservations_by_confirmation'
        BoundStatement bsInsertReservationByConfirmation =
                psInsertReservationByConfirmation.bind(reservation.getConfirmationNumber(),
                        reservation.getHotelId(), reservation.getStartDate(), reservation.getEndDate(),
                        reservation.getRoomNumber(), reservation.getGuestId());

        BatchStatement batchInsertReservation = BatchStatement
                .builder(DefaultBatchType.LOGGED)
                .addStatement(bsInsertReservationByHotel)
                .addStatement(bsInsertReservationByConfirmation)
                .build();
        cqlSession.execute(batchInsertReservation);

        // TODO: Publish message to Kafka containing reservation
        try {
            String reservationJson = objectMapper.writeValueAsString(reservation);
            ProducerRecord<String, String> record =
                    new ProducerRecord<>(kafkaTopicName, reservation.getConfirmationNumber(), reservationJson);
            kafkaProducer.send(record);
        } catch (Exception e) {
            logger.warn("Error publishing reservation message to Kafka: {}", e);
        }
        return reservation.getConfirmationNumber();
    }

    /**
     * We pick 'reservations_by_confirmation' table to list reservations
     * BUT we could have used 'reservations_by_hotel_date' (as no key provided in request)
     *
     * @return
     *      list containing all reservations
     */
    public List<Reservation> findAll() {
        return cqlSession.execute(selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).all().build())
                .all()
                // no paging we retrieve all objects
                .stream()                       // because we are good people
                .map(this::mapRowToReservation) // Mapping row as Reservation
                .collect(Collectors.toList());  // Back to list objects
    }

    /**
     * Deleting a reservation.
     *
     * @param confirmationNumber
     *      unique identifier for confirmation.
     */
    public boolean delete(String confirmationNumber) {
        // Retrieving entire reservation in order to obtain the attributes we will need to delete from
        // reservations_by_hotel_date table
        Optional<Reservation> reservationToDelete = this.findByConfirmationNumber(confirmationNumber);

        if (reservationToDelete.isPresent()) {
            // Delete from 'reservations_by_hotel_date'
            Reservation reservation = reservationToDelete.get();
            BoundStatement bsDeleteReservationByHotelDate = psDeleteReservationByHotelDate.bind(
                    reservation.getHotelId(), reservation.getStartDate(), reservation.getRoomNumber());

            // Delete from 'reservations_by_confirmation'
            BoundStatement bsDeleteReservationByConfirmation =
                    psDeleteReservationByConfirmation.bind(confirmationNumber);

            BatchStatement batchDeleteReservation = BatchStatement
                    .builder(DefaultBatchType.LOGGED)
                    .addStatement(bsDeleteReservationByHotelDate)
                    .addStatement(bsDeleteReservationByConfirmation)
                    .build();
            cqlSession.execute(batchDeleteReservation);

            // TODO: Publish message to Kafka with empty payload to indicate deletion
            ProducerRecord<String, String> record =
                    new ProducerRecord<>(kafkaTopicName, reservation.getConfirmationNumber(), "");
            kafkaProducer.send(record);
            return true;
        }
        return false;
    }

    /**
     * Search all reservation for an hotel id and LocalDate.
* * @param hotelId * hotel identifier * @param date * searched Date * @return * list of reservations matching the search criteria */ public List<Reservation> findByHotelAndDate(String hotelId, LocalDate date) { Objects.requireNonNull(hotelId); Objects.requireNonNull(date); return cqlSession.execute(psSearchReservation.bind(hotelId, date)) .all() // no paging we retrieve all objects .stream() // because we are good people .map(this::mapRowToReservation) // Mapping row as Reservation .collect(Collectors.toList()); // Back to list objects } /** * Utility method to marshal a row as expected Reservation Bean. * * @param row * current row from ResultSet * @return * object */ private Reservation mapRowToReservation(Row row) { Reservation reservation = new Reservation(); reservation.setHotelId(row.getString(HOTEL_ID)); reservation.setConfirmationNumber(row.getString(CONFIRMATION_NUMBER)); reservation.setGuestId(row.getUuid(GUEST_ID)); reservation.setRoomNumber(row.getShort(ROOM_NUMBER)); reservation.setStartDate(row.getLocalDate(START_DATE)); reservation.setEndDate(row.getLocalDate(END_DATE)); return reservation; } /** * Create Keyspace and relevant tables as per defined in 'reservation.cql' */ public void createReservationTables() { /** * Create TYPE 'Address' if not exists * * CREATE TYPE reservation.address ( * street text, * city text, * state_or_province text, * postal_code text, * country text * ); */ cqlSession.execute( createType(keyspaceName, TYPE_ADDRESS) .ifNotExists() .withField(STREET, DataTypes.TEXT) .withField(CITY, DataTypes.TEXT) .withField(STATE_PROVINCE, DataTypes.TEXT) .withField(POSTAL_CODE, DataTypes.TEXT) .withField(COUNTRY, DataTypes.TEXT) .build()); logger.debug("+ Type '{}' has been created (if needed)", TYPE_ADDRESS.asInternal()); /** * CREATE TABLE reservation.reservations_by_hotel_date ( * hotel_id text, * start_date date, * end_date date, * room_number smallint, * confirmation_number text, * guest_id uuid, * PRIMARY KEY ((hotel_id, 
start_date), room_number) * ) WITH comment = 'Q7. Find reservations by hotel and date'; */ cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE) .ifNotExists() .withPartitionKey(HOTEL_ID, DataTypes.TEXT) .withPartitionKey(START_DATE, DataTypes.DATE) .withClusteringColumn(ROOM_NUMBER, DataTypes.SMALLINT) .withColumn(END_DATE, DataTypes.DATE) .withColumn(CONFIRMATION_NUMBER, DataTypes.TEXT) .withColumn(GUEST_ID, DataTypes.UUID) .withClusteringOrder(ROOM_NUMBER, ClusteringOrder.ASC) .withComment("Q7. Find reservations by hotel and date") .build()); logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_HOTEL_DATE.asInternal()); /** * CREATE TABLE reservation.reservations_by_confirmation ( * confirmation_number text PRIMARY KEY, * hotel_id text, * start_date date, * end_date date, * room_number smallint, * guest_id uuid * ); */ cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_CONFI) .ifNotExists() .withPartitionKey(CONFIRMATION_NUMBER, DataTypes.TEXT) .withColumn(HOTEL_ID, DataTypes.TEXT) .withColumn(START_DATE, DataTypes.DATE) .withColumn(END_DATE, DataTypes.DATE) .withColumn(ROOM_NUMBER, DataTypes.SMALLINT) .withColumn(GUEST_ID, DataTypes.UUID) .build()); logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_CONFI.asInternal()); /** * CREATE TABLE reservation.reservations_by_guest ( * guest_last_name text, * hotel_id text, * start_date date, * end_date date, * room_number smallint, * confirmation_number text, * guest_id uuid, * PRIMARY KEY ((guest_last_name), hotel_id) * ) WITH comment = 'Q8. 
Find reservations by guest name'; */ cqlSession.execute(createTable(keyspaceName, TABLE_RESERVATION_BY_GUEST) .ifNotExists() .withPartitionKey(GUEST_LAST_NAME, DataTypes.TEXT) .withClusteringColumn(HOTEL_ID, DataTypes.TEXT) .withColumn(START_DATE, DataTypes.DATE) .withColumn(END_DATE, DataTypes.DATE) .withColumn(ROOM_NUMBER, DataTypes.SMALLINT) .withColumn(CONFIRMATION_NUMBER, DataTypes.TEXT) .withColumn(GUEST_ID, DataTypes.UUID) .withComment("Q8. Find reservations by guest name") .build()); logger.debug("+ Table '{}' has been created (if needed)", TABLE_RESERVATION_BY_GUEST.asInternal()); /** * CREATE TABLE reservation.guests ( * guest_id uuid PRIMARY KEY, * first_name text, * last_name text, * title text, * emails set<text>, * phone_numbers list<text>, * addresses map<text, frozen<address>>, * confirmation_number text * ) WITH comment = 'Q9. Find guest by ID'; */ UserDefinedType udtAddressType = cqlSession.getMetadata().getKeyspace(keyspaceName).get() // Retrieving KeySpaceMetadata .getUserDefinedType(TYPE_ADDRESS).get(); // Looking for UDT (extending DataType) cqlSession.execute(createTable(keyspaceName, TABLE_GUESTS) .ifNotExists() .withPartitionKey(GUEST_ID, DataTypes.UUID) .withColumn(FIRSTNAME, DataTypes.TEXT) .withColumn(LASTNAME, DataTypes.TEXT) .withColumn(TITLE, DataTypes.TEXT) .withColumn(EMAILS, DataTypes.setOf(DataTypes.TEXT)) .withColumn(PHONE_NUMBERS, DataTypes.listOf(DataTypes.TEXT)) .withColumn(ADDRESSES, DataTypes.mapOf(DataTypes.TEXT, udtAddressType, true)) .withColumn(CONFIRMATION_NUMBER, DataTypes.TEXT) .withComment("Q9. 
Find guest by ID") .build()); logger.debug("+ Table '{}' has been created (if needed)", TABLE_GUESTS.asInternal()); logger.info("Schema has been successfully initialized."); } private void prepareStatements() { if (psExistReservation == null) { psExistReservation = cqlSession.prepare(selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).column(CONFIRMATION_NUMBER) .where(column(CONFIRMATION_NUMBER).isEqualTo(bindMarker(CONFIRMATION_NUMBER))) .build()); psFindReservation = cqlSession.prepare( selectFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI).all() .where(column(CONFIRMATION_NUMBER).isEqualTo(bindMarker(CONFIRMATION_NUMBER))) .build()); psSearchReservation = cqlSession.prepare( selectFrom(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE).all() .where(column(HOTEL_ID).isEqualTo(bindMarker(HOTEL_ID))) .where(column(START_DATE).isEqualTo(bindMarker(START_DATE))) .build()); psDeleteReservationByConfirmation = cqlSession.prepare( deleteFrom(keyspaceName, TABLE_RESERVATION_BY_CONFI) .where(column(CONFIRMATION_NUMBER).isEqualTo(bindMarker(CONFIRMATION_NUMBER))) .build()); psDeleteReservationByHotelDate = cqlSession.prepare( deleteFrom(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE) .where(column(HOTEL_ID).isEqualTo(bindMarker(HOTEL_ID))) .where(column(START_DATE).isEqualTo(bindMarker(START_DATE))) .where(column(ROOM_NUMBER).isEqualTo(bindMarker(ROOM_NUMBER))) .build()); psInsertReservationByHotelDate = cqlSession.prepare(QueryBuilder.insertInto(keyspaceName, TABLE_RESERVATION_BY_HOTEL_DATE) .value(HOTEL_ID, bindMarker(HOTEL_ID)) .value(START_DATE, bindMarker(START_DATE)) .value(END_DATE, bindMarker(END_DATE)) .value(ROOM_NUMBER, bindMarker(ROOM_NUMBER)) .value(CONFIRMATION_NUMBER, bindMarker(CONFIRMATION_NUMBER)) .value(GUEST_ID, bindMarker(GUEST_ID)) .build()); psInsertReservationByConfirmation = cqlSession.prepare(QueryBuilder.insertInto(keyspaceName, TABLE_RESERVATION_BY_CONFI) .value(CONFIRMATION_NUMBER, bindMarker(CONFIRMATION_NUMBER)) .value(HOTEL_ID, 
bindMarker(HOTEL_ID)) .value(START_DATE, bindMarker(START_DATE)) .value(END_DATE, bindMarker(END_DATE)) .value(ROOM_NUMBER, bindMarker(ROOM_NUMBER)) .value(GUEST_ID, bindMarker(GUEST_ID)) .build()); logger.info("Statements have been successfully prepared."); } } }
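In the listing above, upsert sends the reservation as JSON keyed by its confirmation number, while delete sends an empty payload under the same key. A downstream service has to interpret that convention somehow. The following is only a minimal sketch of one way to do it, using plain Java collections instead of a real Kafka consumer. The class ReservationEventView, its methods, and the sample JSON field names are hypothetical and are not part of the Reservation Service; treating a null value like an empty one is likewise an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical consumer-side view of the reservation topic.
 * It keeps the latest JSON payload seen for each confirmation number.
 */
final class ReservationEventView {

    // Latest reservation JSON, keyed by confirmation number
    private final Map<String, String> latestByConfirmation = new LinkedHashMap<>();

    /**
     * Applies one record read from the topic.
     * Convention taken from the service code: key = confirmation number,
     * value = reservation JSON for an upsert, empty string for a delete.
     * Assumption: a null value is treated the same way as an empty one.
     */
    void apply(String key, String value) {
        if (value == null || value.isEmpty()) {
            latestByConfirmation.remove(key);     // deletion event
        } else {
            latestByConfirmation.put(key, value); // create or update event
        }
    }

    /** Returns the latest known JSON for a confirmation number, if any. */
    Optional<String> find(String confirmationNumber) {
        return Optional.ofNullable(latestByConfirmation.get(confirmationNumber));
    }

    /** Number of reservations currently known to this view. */
    int size() {
        return latestByConfirmation.size();
    }

    public static void main(String[] args) {
        ReservationEventView view = new ReservationEventView();
        // Sample payloads are illustrative; the real JSON shape depends on the Reservation bean
        view.apply("abc-123", "{\"hotelId\":\"NYC123\",\"roomNumber\":101}");
        view.apply("abc-123", "{\"hotelId\":\"NYC123\",\"roomNumber\":102}"); // update
        view.apply("def-456", "{\"hotelId\":\"NYC123\",\"roomNumber\":205}");
        view.apply("def-456", "");                                             // delete
        System.out.println(view.size());
        System.out.println(view.find("abc-123").orElse("<none>"));
    }
}
```

Because every event for a reservation carries the same key, a consumer built this way only needs the most recent record per key to reconstruct the current state.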
Now let's move on to the tests.
The Reservation Service also includes tests that run against a Cassandra container in Docker; the container is started for the test run and destroyed afterwards.
Before running the tests, we need to stop the Cassandra node:
kill `cat /tmp/cassandra-pid`
Now we can run the tests with Maven:
mvn test
We have learned how to write a microservice that stores data in Apache Cassandra using the DataStax Java driver and publishes data change events to Apache Kafka.
This article was prepared ahead of the launch of the Software Architect course. I would also like to share a recording of a free lesson on "The 'Maintainability' architectural property, illustrated with k8s services".
Link to the original article: https://habr.com/ru/articles/722278/