modern-cpp-kafka для C++. Решаем проблемы владения и сериализации

от автора

Доброго времени суток! Эта статья написана для тех, кто в общих чертах знаком с тем, что такое и для чего используется Apache Kafka, кто такие Producer и Consumer и как они друг с другом работают. Целью этой статьи является показать способ использования библиотеки modern-cpp-kafka для работы с Apache Kafka на современном C++. В общих чертах с темой можно ознакомится, например, здесь (отсюда же взяты скриншоты), а в этой статье будет рассмотрено решение проблем с владением и (де)сериализацией наиболее простым способом.

Идея написания этого небольшого руководства появилась у меня, когда я начал изучать одну из самых популярных библиотек для работы с Apache Kafka — а именно modern-cpp-kafka. Она основана в виде оболочки над старой доброй реализацией для C — lirdbkaka. Версия для C++ подкупила меня тем, что она предоставляет довольно удобный интерфейс для работы, использует современные и актуальные семантики языка вроде RAII и шаблонов, а также, как утверждает создатель, отсутствие оверхеда и самую быструю скорость при работе с данными размерами в пределах 256 B ~ 2 KB.

Однако, как водится, ничто не идеально, а именно — владение данными, сериализация и десериализация, которые реализованы что в librdkafka, что в modern-cpp-kafka примерно никак. Для того, чтобы разобраться, что именно не так, необходимо: рассмотреть механизм работы самой Kafka; ознакомиться с реализацией функций отправки и приема в librdkafka и modern-cpp-kafka; понять, причем здесь владение; разработать способ сериализации и десериализации.

В конце статьи я приведу код собственного решения проблемы с сериализацией. Решение отчасти банальное и не годится для промышленного использования, но оно хорошо подойдет начинающим пользователям ввиду своей минималистичности и простоты.

Механизм работы с данными в Apache Kafka

Максимально кратко рассмотрим механизм добавления и извлечения данных в Apache Kafka. Данные доставляются, хранятся и извлекаются исключительно в двоичном представлении, что автоматически поднимает вопрос о том, как получить данные в нужном пользователю виде — то есть о сериализации и десериализации.

Пример добавления сообщения с ключом int, равным 123, и значением string, равным "hello world"

Пример добавления сообщения с ключом int, равным 123, и значением string, равным «hello world»

Как видно на скриншоте, при добавлении ключа и соответствующего ему значения требуется преобразовать их в двоичный вид (сериализовать), для чего используются сериализаторы как для ключа, так и значения. Эти задачи берет на себя Producer.

Ситуация с извлечением данных выглядит очень похоже.

Пример извлечения сообщения с ключом int, равным 123, и значением string, равным "hello world"

Пример извлечения сообщения с ключом int, равным 123, и значением string, равным «hello world»

На этом скриншоте показана схема работы Consumer-а, который занимается извлечением двоичных данных и преобразованием их в нужный тип (десериализацией).

Механизм работы с данными в librdkafka и modern-cpp-kafka

Библиотека librdkafka, будучи написанной на C, использует именно C-шный механизм так называемого стирания типов (type erasure) путем использования функции rd_kafka_produce, а также связки void-указателя и длины данных.

RD_EXPORT int rd_kafka_produce(rd_kafka_topic_t *rkt,                      int32_t partition,                      int msgflags,                      void *payload,                      size_t len,                      const void *key,                      size_t keylen,                      void *msg_opaque);

Критика такого подхода стара как сам язык C, однако это единственный доступный в C способ передавать любые данные, нужно лишь их сериализовать. Тем не менее, сама библиотека не предоставляет никаких инструментов для сериализации.

Не решает ее, увы, и библиотека modern-cpp-kafka. Рассмотрим функцию KafkaProducer::send для отправки сообщения:

inline void KafkaProducer::send(const producer::ProducerRecord& record,     // Структура с данными (ключ, значение, хедеры, топик, партишен и id)                     const producer::Callback&       deliveryCb, // Callback-функция                     SendOption                      option,     // Опции для отправки                     ActionWhileQueueIsFull          action);    // Опции действия на заполненность очереди (блокировать или нет)

Нам нужен первый тип первого параметра — ProducerRecord. Его структура (не включая методы) выглядит так:

/**  * A key/value pair to be sent to Kafka.  * This consists of a topic name to which the record is being sent, an optional partition number, and an optional key and value.  * Note: `ProducerRecord` would not take the ownership from the memory block of `Value`.  */ class ProducerRecord { public:     using Id  = std::uint64_t;      // конструкторы, геттеры, сеттеры  private:     Topic        _topic;     Partition    _partition;     Key          _key;     Value        _value;     Headers      _headers;     Optional<Id> _id; };

Типы Key и Value являются type alias-ами на тип ConstBuffer:

// Which is similar with `boost::const_buffer` (thus avoid the dependency towards `boost`) class ConstBuffer { public:     // побайтовое копирование указателя     explicit ConstBuffer(const void* data = nullptr, std::size_t size = 0): _data(data), _size(size) {}     const void* data()     const { return _data; }     std::size_t size()     const { return _size; }     std::string toString() const     {         // опустим реализацию     } private:     const void* _data;     std::size_t _size; };

Поля класса выглядят как-то знакомо, не правда ли? Да, это именно та связка void-указателя и длины, которая используется и в функции rd_kafka_produce.

Копирование указателей выполняется побайтово, из-за этого мы сталкиваемся с проблемой «висящего» указателя и, как следствие, use-after-free. Именно реализация класса ConstBuffer делает класс ProducerRecord невладеющим (и крайне неудобным в нетривиальных случаях), что и сказано прямиков в коментарии в описании класса (для удобства переведенного на русский):

Примечание: ProducerRecord не будет принимать права владения блоком памяти Value.

Как мы видим, что в librdkafka, что в modern-cpp-kafka используется невладеющий механизм для передачи или хранения параметров.

Функции Serialize и Deserialize будут описаны ниже, в соответствующем пункте, пока же нам важно знать, что Serialize возврашает std::vector<std::byte>.Уже на этапе формировании данных для отправки возникает проблема с потерей данных:

// Хотим получить готовый объект из сериализованных данных // но получаем "висящий" указатель - увы и ах! template <typename T> ConstBuffer ValueFromNonOwning(const T& value) {   const std::vector<byte> serialized = Serialize(value);    // глубокого копирования не происходит - "висящий" указатель   return ConstBuffer(serialized.data(), serialized.size()); }  // Не спасет тут и умный указатель: ни на локальную переменную serialized // т.к. данные из умного указателя будут просто скопированы по значению // вместо создания копии; ни на сам возвращаемый объект ConstBuffer // (по изначальной причине) - и снова увы и ах! template <typename T> ConstBuffer ValueFromNonOwningUniquePtr(const T& value) {   // выделяем память в куче (с shared_ptr та же история, ведь счетчик ссылок равен 0)   const auto serialized = std::make_unique<std::vector<std::byte>>(Serialize(value));    // проблема остается, потому что теперь разрушается сам умный указатель   return ConstBuffer(serialized->data(), serialized->size()); }

Владеющий ConstBuffer

Чтобы починить проблему с «висящим» указателем, мне пришла идея написать тот владеющий буфер, который будет гарантировать сохранение данных на протяжении всей жизни объекта. Так появился OwningBuffer:

// Владеющий аналог класса ConstBuffer class OwningBuffer {  public:   explicit OwningBuffer(const void* data = nullptr, const std::size_t size = 0) {     if (data && size > 0) {       m_rawData.assign(static_cast<const std::byte*>(data),                        static_cast<const std::byte*>(data) + size);     }   }    explicit OwningBuffer(const ConstBuffer& buffer)       : OwningBuffer(buffer.data(), buffer.size()) {}    // конструктор копирования для создания из сырых данных   explicit OwningBuffer(const std::vector<std::byte>& bytes) : m_rawData(bytes) {}      // конструктор перемещения для создания из сырых данных   explicit OwningBuffer(std::vector<std::byte>&& bytes) noexcept       : m_rawData(std::move(bytes)) {}    [[nodiscard]] const void* data() const { return m_rawData.data(); }   [[nodiscard]] std::size_t size() const { return m_rawData.size(); }    [[nodiscard]] std::string toString() const {     // так как создание ConstBuffer не содержит сложных операций,     // нет и оверхеда на создание временного объекта     const ConstBuffer buffer(m_rawData.data(), m_rawData.size());     return buffer.toString();   }    // получение всегда валидного объекта ConstBuffer   [[nodiscard]] ConstBuffer asConstBuffer() const {     return ConstBuffer(m_rawData.data(), m_rawData.size());   }   private:   // сырые данные объекта   std::vector<std::byte> m_rawData; };  // Такой вариант функции работает корректно, никаких "висящих" указателей template <typename T> OwningBuffer ValueFrom(const T& value) {   const auto serialized = Serialize(value);    return OwningBuffer(serialized); }

Интерфейс класса аналогичен интерфейсу ConstBuffer, но содержит некоторые дополнения в виде новых конструкторов для создания объекта из сырых данных, а также вспомогательного метода для получения валидного объекта ConstBuffer. Владеющий подход полностью решает проблему с «висящими» указателями и use-after-free, и теперь можно перейти к сериализации и десериализации.

Сериализация и десериализация

Теперь, решив проблему с владением, можно перейти к самому главному — сериализации и десериализации данных. Несмотря на обилие библиотек, которые предоставляют такую функциональность, я не буду их использовать, чтобы не перегружать статью лишними зависимостями и усложнять материал разбором доступных средств для сериализации. Поэтому мы будем использовать старый добрый std::byte из C++17 в еще более старом std::vector и reinterpret_cast.

#include <cstring> #include <stdexcept> #include <type_traits> #include <vector>  template <typename T> std::vector<std::byte> Serialize(const T& value) {   static_assert(std::is_trivially_copyable_v<T>, "Type must be trivially copyable");    const auto begin = reinterpret_cast<const std::byte*>(&value);   return {begin, begin + sizeof(T)}; }  template <typename T> T Deserialize(const std::vector<std::byte>& serializedData) {   static_assert(std::is_trivially_copyable_v<T>, "Type must be trivially copyable");   static_assert(std::is_default_constructible_v<T>, "Type must be default constructible");    // примечание: проверяется только размер объекта.   // никто не мешает записать в int32_t содержимое std::string размером 5 байт   if (serializedData.size() != sizeof(T)) {     throw std::runtime_error("Serialized data size does not match target type size");   }    T value;   std::memcpy(&value, serializedData.data(), sizeof(T));   return value; }  // специализация для std::string template <> inline std::vector<std::byte> Serialize<std::string>(const std::string& value) {   auto begin = reinterpret_cast<const std::byte*>(value.data());   return {begin, begin + value.size()}; }  // специализация для std::string template <> inline std::string Deserialize<std::string>(     const std::vector<std::byte>& serializedData) {   return {reinterpret_cast<const char*>(serializedData.data()), serializedData.size()}; }

В static_assert-ах, по большому счету, нет такой необходимости, ведь код и так и так не скомпилируется, если что-то пойдет не так, но всегда приятнее видеть красивые сообщения об ошибках, верно? Сеханизм сериализации банален: берем адрес объекта и прибавляем его размер, таким образом «охватывая» все содержимое объекта, на выходе получаем вектор байтов, то есть сырые данные объекта.

С десериализацией все немного сложнее (и не так гладко). Здесь мы используем функцию memcpy, чтобы заполнить созданный при помощи конструктора по умолчанию объект переданными сырыми байтами. Также нужно иметь в виду, что проверяются только размеры десериализуемого объекта, то есть можно записать объект, например, std::string в объект типа float и получить бессмыслицу.

Остается лишь создать обертки для сериализации и десериализации. В этом нам помогут функции ValueTo и ValueFrom:

// сериализация template <typename T> OwningBuffer ValueFrom(const T& value) {   const auto serialized = Serialize(value);    return OwningBuffer(serialized); }  // десериализация template <typename T> T ValueTo(const Value& value) {   if (!value.data()) {     throw std::runtime_error("Received empty value");   }    // преобразуем void* к const byte* и "захватываем" все значения   const std::vector<std::byte> serializedData(       static_cast<const std::byte*>(value.data()),       static_cast<const std::byte*>(value.data()) + value.size());    // возвращаем объект, полученный при помощи десериализации   return Deserialize<T>(serializedData); }

Демонстрация работы

В качестве примера рассмотрим классическую модель Single Producer — SIngle Consumer. Код Producer-а:

#include <kafka/KafkaProducer.h>  #include <iostream> #include <string>  #include "KafkaUtils.h"  using namespace kafka::clients::producer;  void SendValues(const kafka::Topic& topic, KafkaProducer& producer) {   // Prepare delivery callback   auto deliveryCb = [](const RecordMetadata& metadata, const kafka::Error& error) {     if (!error) {       std::cout << "Message delivered: " << metadata.toString() << std::endl;     } else {       std::cerr << "Message failed to be delivered: " << error.message() << std::endl;     }   };    {     std::cout << "Sending messages with FLOAT type" << std::endl;      constexpr float f = 2.34;      kafka::extensions::SendValue(producer, topic, kafka::NullKey,                                  kafka::extensions::ValueFrom(f).AsConstBuffer(),                                  deliveryCb);   }    {     std::cout << "Sending messages with STRINGS type" << std::endl;      const auto values = std::vector<std::string>{"amogus", "breakpoint", "cappa",                                                  "delta",  "extension",  "final"};      for (const auto& value : values) {       kafka::extensions::SendValue(producer, topic, kafka::NullKey,                                    kafka::extensions::ValueFrom(value).AsConstBuffer(),                                    deliveryCb);     }   } }  void DoProducerWork() {   // взято из конфигурации Docker Compose   const std::string brokers = "localhost:29092";   const kafka::Topic topic = "test-topic";    // Prepare the configuration   kafka::Properties props;   props.put("bootstrap.servers", brokers);    // Create a producer   KafkaProducer producer(props);    std::println(std::cout, "SendValues function");   SendValues(topic, producer); }  int main() {   try {     DoProducerWork();   } catch (const std::exception& e) {     std::cerr << "Error: " << e.what() << std::endl;     return 1;   }    return 0; } 

Код Consumer-а:

#include <KafkaUtils.h> #include <kafka/KafkaConsumer.h>  #include <csignal> #include <iostream> #include <string>  std::atomic_bool running = {true};  void StopRunning(int sig) {   if (sig != SIGINT) return;    if (running) {     running = false;   } else {     // Restore the signal handler, -- to avoid stuck with this handler     signal(SIGINT, SIG_IGN);  // NOLINT   } }  void DoConsumerWork() {   using namespace kafka;   using namespace kafka::clients::consumer;    const std::string brokers = "localhost:29092";   const Topic topic = "test-topic";    // Prepare the configuration   Properties props;   props.put("bootstrap.servers", brokers);    // Create a consumer instance   KafkaConsumer consumer(props);    // Subscribe to topics   consumer.subscribe({topic});    while (running) {     // Poll messages from Kafka brokers     for (const auto records = consumer.poll(std::chrono::milliseconds(100));          const auto& record : records) {       if (record.error()) {         std::cerr << record.toString() << std::endl;         continue;       }        std::cout << "Got a new message..." << std::endl;       std::cout << "    Topic    : " << record.topic() << std::endl;       std::cout << "    Partition: " << record.partition() << std::endl;       std::cout << "    Offset   : " << record.offset() << std::endl;       std::cout << "    Timestamp: " << record.timestamp().toString() << std::endl;       std::cout << "    Headers  : " << toString(record.headers()) << std::endl;        try {         const auto stringValue = kafka::extensions::ValueTo<std::string>(record.value());         std::cout << "    STRING [" << stringValue << "]" << std::endl;       } catch (const std::exception& e) {         std::cerr << "    Failed to deserialize as string: " << e.what() << std::endl;       }        try {         const auto floatValue = kafka::extensions::ValueTo<float>(record.value());         std::cout << "    FLOAT [" << floatValue << "]" << std::endl;       } catch (const std::exception& e) {         std::cerr << "    Failed to deserialize as float: " << e.what() << std::endl;       }     }   } }  int main() {   // Use Ctrl-C to terminate the program   signal(SIGINT, StopRunning);  // NOLINT    DoConsumerWork();    return 0; } 

Заключение

В этой статье я постарался совместить сразу несколько вещей: рассказать про Apache Kafka, про хранение данных внутри, про наличие определенных проблем и подходы к их решению. Написание текстов (помимо курсовых и дипломной работ) для меня в новинку, поэтому призываю каждого, кто прочитал, поделиться мыслями — с удовольствием почитаю и отвечу.

Демонстрацию сериализации и десериализации прямо в браузере можно посмотреть на сайте godbolt. Исходный код с настройкой Apache Kafka в Docker Compose находится в моем github-репозитории.


ссылка на оригинал статьи https://habr.com/ru/articles/880094/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *