Доброго времени суток! Эта статья написана для тех, кто в общих чертах знаком с тем, что такое и для чего используется Apache Kafka, кто такие Producer и Consumer и как они друг с другом работают. Целью этой статьи является показать способ использования библиотеки modern-cpp-kafka для работы с Apache Kafka на современном C++. В общих чертах с темой можно ознакомится, например, здесь (отсюда же взяты скриншоты), а в этой статье будет рассмотрено решение проблем с владением и (де)сериализацией наиболее простым способом.
Идея написания этого небольшого руководства появилась у меня, когда я начал изучать одну из самых популярных библиотек для работы с Apache Kafka — а именно modern-cpp-kafka. Она основана в виде оболочки над старой доброй реализацией для C — lirdbkaka. Версия для C++ подкупила меня тем, что она предоставляет довольно удобный интерфейс для работы, использует современные и актуальные семантики языка вроде RAII и шаблонов, а также, как утверждает создатель, отсутствие оверхеда и самую быструю скорость при работе с данными размерами в пределах 256 B ~ 2 KB.
Однако, как водится, ничто не идеально, а именно — владение данными, сериализация и десериализация, которые реализованы что в librdkafka, что в modern-cpp-kafka примерно никак. Для того, чтобы разобраться, что именно не так, необходимо: рассмотреть механизм работы самой Kafka; ознакомиться с реализацией функций отправки и приема в librdkafka и modern-cpp-kafka; понять, причем здесь владение; разработать способ сериализации и десериализации.
В конце статьи я приведу код собственного решения проблемы с сериализацией. Решение отчасти банальное и не годится для промышленного использования, но оно хорошо подойдет начинающим пользователям ввиду своей минималистичности и простоты.
Механизм работы с данными в Apache Kafka
Максимально кратко рассмотрим механизм добавления и извлечения данных в Apache Kafka. Данные доставляются, хранятся и извлекаются исключительно в двоичном представлении, что автоматически поднимает вопрос о том, как получить данные в нужном пользователю виде — то есть о сериализации и десериализации.
Как видно на скриншоте, при добавлении ключа и соответствующего ему значения требуется преобразовать их в двоичный вид (сериализовать), для чего используются сериализаторы как для ключа, так и значения. Эти задачи берет на себя Producer.
Ситуация с извлечением данных выглядит очень похоже.
На этом скриншоте показана схема работы Consumer-а, который занимается извлечением двоичных данных и преобразованием их в нужный тип (десериализацией).
Механизм работы с данными в librdkafka и modern-cpp-kafka
Библиотека librdkafka, будучи написанной на C, использует именно C-шный механизм так называемого стирания типов (type erasure) путем использования функции rd_kafka_produce, а также связки void-указателя и длины данных.
RD_EXPORT int rd_kafka_produce(rd_kafka_topic_t *rkt, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t keylen, void *msg_opaque);
Критика такого подхода стара как сам язык C, однако это единственный доступный в C способ передавать любые данные, нужно лишь их сериализовать. Тем не менее, сама библиотека не предоставляет никаких инструментов для сериализации.
Не решает ее, увы, и библиотека modern-cpp-kafka. Рассмотрим функцию KafkaProducer::send для отправки сообщения:
inline void KafkaProducer::send(const producer::ProducerRecord& record, // Структура с данными (ключ, значение, хедеры, топик, партишен и id) const producer::Callback& deliveryCb, // Callback-функция SendOption option, // Опции для отправки ActionWhileQueueIsFull action); // Опции действия на заполненность очереди (блокировать или нет)
Нам нужен первый тип первого параметра — ProducerRecord. Его структура (не включая методы) выглядит так:
/** * A key/value pair to be sent to Kafka. * This consists of a topic name to which the record is being sent, an optional partition number, and an optional key and value. * Note: `ProducerRecord` would not take the ownership from the memory block of `Value`. */ class ProducerRecord { public: using Id = std::uint64_t; // конструкторы, геттеры, сеттеры private: Topic _topic; Partition _partition; Key _key; Value _value; Headers _headers; Optional<Id> _id; };
Типы Key и Value являются type alias-ами на тип ConstBuffer:
// Which is similar with `boost::const_buffer` (thus avoid the dependency towards `boost`) class ConstBuffer { public: // побайтовое копирование указателя explicit ConstBuffer(const void* data = nullptr, std::size_t size = 0): _data(data), _size(size) {} const void* data() const { return _data; } std::size_t size() const { return _size; } std::string toString() const { // опустим реализацию } private: const void* _data; std::size_t _size; };
Поля класса выглядят как-то знакомо, не правда ли? Да, это именно та связка void-указателя и длины, которая используется и в функции rd_kafka_produce.
Копирование указателей выполняется побайтово, из-за этого мы сталкиваемся с проблемой «висящего» указателя и, как следствие, use-after-free. Именно реализация класса ConstBuffer делает класс ProducerRecord невладеющим (и крайне неудобным в нетривиальных случаях), что и сказано прямиков в коментарии в описании класса (для удобства переведенного на русский):
Примечание:
ProducerRecordне будет принимать права владения блоком памятиValue.
Как мы видим, что в librdkafka, что в modern-cpp-kafka используется невладеющий механизм для передачи или хранения параметров.
Функции Serialize и Deserialize будут описаны ниже, в соответствующем пункте, пока же нам важно знать, что Serialize возврашает std::vector<std::byte>.Уже на этапе формировании данных для отправки возникает проблема с потерей данных:
// Хотим получить готовый объект из сериализованных данных // но получаем "висящий" указатель - увы и ах! template <typename T> ConstBuffer ValueFromNonOwning(const T& value) { const std::vector<byte> serialized = Serialize(value); // глубокого копирования не происходит - "висящий" указатель return ConstBuffer(serialized.data(), serialized.size()); } // Не спасет тут и умный указатель: ни на локальную переменную serialized // т.к. данные из умного указателя будут просто скопированы по значению // вместо создания копии; ни на сам возвращаемый объект ConstBuffer // (по изначальной причине) - и снова увы и ах! template <typename T> ConstBuffer ValueFromNonOwningUniquePtr(const T& value) { // выделяем память в куче (с shared_ptr та же история, ведь счетчик ссылок равен 0) const auto serialized = std::make_unique<std::vector<std::byte>>(Serialize(value)); // проблема остается, потому что теперь разрушается сам умный указатель return ConstBuffer(serialized->data(), serialized->size()); }
Владеющий ConstBuffer
Чтобы починить проблему с «висящим» указателем, мне пришла идея написать тот владеющий буфер, который будет гарантировать сохранение данных на протяжении всей жизни объекта. Так появился OwningBuffer:
// Владеющий аналог класса ConstBuffer class OwningBuffer { public: explicit OwningBuffer(const void* data = nullptr, const std::size_t size = 0) { if (data && size > 0) { m_rawData.assign(static_cast<const std::byte*>(data), static_cast<const std::byte*>(data) + size); } } explicit OwningBuffer(const ConstBuffer& buffer) : OwningBuffer(buffer.data(), buffer.size()) {} // конструктор копирования для создания из сырых данных explicit OwningBuffer(const std::vector<std::byte>& bytes) : m_rawData(bytes) {} // конструктор перемещения для создания из сырых данных explicit OwningBuffer(std::vector<std::byte>&& bytes) noexcept : m_rawData(std::move(bytes)) {} [[nodiscard]] const void* data() const { return m_rawData.data(); } [[nodiscard]] std::size_t size() const { return m_rawData.size(); } [[nodiscard]] std::string toString() const { // так как создание ConstBuffer не содержит сложных операций, // нет и оверхеда на создание временного объекта const ConstBuffer buffer(m_rawData.data(), m_rawData.size()); return buffer.toString(); } // получение всегда валидного объекта ConstBuffer [[nodiscard]] ConstBuffer asConstBuffer() const { return ConstBuffer(m_rawData.data(), m_rawData.size()); } private: // сырые данные объекта std::vector<std::byte> m_rawData; }; // Такой вариант функции работает корректно, никаких "висящих" указателей template <typename T> OwningBuffer ValueFrom(const T& value) { const auto serialized = Serialize(value); return OwningBuffer(serialized); }
Интерфейс класса аналогичен интерфейсу ConstBuffer, но содержит некоторые дополнения в виде новых конструкторов для создания объекта из сырых данных, а также вспомогательного метода для получения валидного объекта ConstBuffer. Владеющий подход полностью решает проблему с «висящими» указателями и use-after-free, и теперь можно перейти к сериализации и десериализации.
Сериализация и десериализация
Теперь, решив проблему с владением, можно перейти к самому главному — сериализации и десериализации данных. Несмотря на обилие библиотек, которые предоставляют такую функциональность, я не буду их использовать, чтобы не перегружать статью лишними зависимостями и усложнять материал разбором доступных средств для сериализации. Поэтому мы будем использовать старый добрый std::byte из C++17 в еще более старом std::vector и reinterpret_cast.
#include <cstring> #include <stdexcept> #include <type_traits> #include <vector> template <typename T> std::vector<std::byte> Serialize(const T& value) { static_assert(std::is_trivially_copyable_v<T>, "Type must be trivially copyable"); const auto begin = reinterpret_cast<const std::byte*>(&value); return {begin, begin + sizeof(T)}; } template <typename T> T Deserialize(const std::vector<std::byte>& serializedData) { static_assert(std::is_trivially_copyable_v<T>, "Type must be trivially copyable"); static_assert(std::is_default_constructible_v<T>, "Type must be default constructible"); // примечание: проверяется только размер объекта. // никто не мешает записать в int32_t содержимое std::string размером 5 байт if (serializedData.size() != sizeof(T)) { throw std::runtime_error("Serialized data size does not match target type size"); } T value; std::memcpy(&value, serializedData.data(), sizeof(T)); return value; } // специализация для std::string template <> inline std::vector<std::byte> Serialize<std::string>(const std::string& value) { auto begin = reinterpret_cast<const std::byte*>(value.data()); return {begin, begin + value.size()}; } // специализация для std::string template <> inline std::string Deserialize<std::string>( const std::vector<std::byte>& serializedData) { return {reinterpret_cast<const char*>(serializedData.data()), serializedData.size()}; }
В static_assert-ах, по большому счету, нет такой необходимости, ведь код и так и так не скомпилируется, если что-то пойдет не так, но всегда приятнее видеть красивые сообщения об ошибках, верно? Сеханизм сериализации банален: берем адрес объекта и прибавляем его размер, таким образом «охватывая» все содержимое объекта, на выходе получаем вектор байтов, то есть сырые данные объекта.
С десериализацией все немного сложнее (и не так гладко). Здесь мы используем функцию memcpy, чтобы заполнить созданный при помощи конструктора по умолчанию объект переданными сырыми байтами. Также нужно иметь в виду, что проверяются только размеры десериализуемого объекта, то есть можно записать объект, например, std::string в объект типа float и получить бессмыслицу.
Остается лишь создать обертки для сериализации и десериализации. В этом нам помогут функции ValueTo и ValueFrom:
// сериализация template <typename T> OwningBuffer ValueFrom(const T& value) { const auto serialized = Serialize(value); return OwningBuffer(serialized); } // десериализация template <typename T> T ValueTo(const Value& value) { if (!value.data()) { throw std::runtime_error("Received empty value"); } // преобразуем void* к const byte* и "захватываем" все значения const std::vector<std::byte> serializedData( static_cast<const std::byte*>(value.data()), static_cast<const std::byte*>(value.data()) + value.size()); // возвращаем объект, полученный при помощи десериализации return Deserialize<T>(serializedData); }
Демонстрация работы
В качестве примера рассмотрим классическую модель Single Producer — SIngle Consumer. Код Producer-а:
#include <kafka/KafkaProducer.h> #include <iostream> #include <string> #include "KafkaUtils.h" using namespace kafka::clients::producer; void SendValues(const kafka::Topic& topic, KafkaProducer& producer) { // Prepare delivery callback auto deliveryCb = [](const RecordMetadata& metadata, const kafka::Error& error) { if (!error) { std::cout << "Message delivered: " << metadata.toString() << std::endl; } else { std::cerr << "Message failed to be delivered: " << error.message() << std::endl; } }; { std::cout << "Sending messages with FLOAT type" << std::endl; constexpr float f = 2.34; kafka::extensions::SendValue(producer, topic, kafka::NullKey, kafka::extensions::ValueFrom(f).AsConstBuffer(), deliveryCb); } { std::cout << "Sending messages with STRINGS type" << std::endl; const auto values = std::vector<std::string>{"amogus", "breakpoint", "cappa", "delta", "extension", "final"}; for (const auto& value : values) { kafka::extensions::SendValue(producer, topic, kafka::NullKey, kafka::extensions::ValueFrom(value).AsConstBuffer(), deliveryCb); } } } void DoProducerWork() { // взято из конфигурации Docker Compose const std::string brokers = "localhost:29092"; const kafka::Topic topic = "test-topic"; // Prepare the configuration kafka::Properties props; props.put("bootstrap.servers", brokers); // Create a producer KafkaProducer producer(props); std::println(std::cout, "SendValues function"); SendValues(topic, producer); } int main() { try { DoProducerWork(); } catch (const std::exception& e) { std::cerr << "Error: " << e.what() << std::endl; return 1; } return 0; }
Код Consumer-а:
#include <KafkaUtils.h> #include <kafka/KafkaConsumer.h> #include <csignal> #include <iostream> #include <string> std::atomic_bool running = {true}; void StopRunning(int sig) { if (sig != SIGINT) return; if (running) { running = false; } else { // Restore the signal handler, -- to avoid stuck with this handler signal(SIGINT, SIG_IGN); // NOLINT } } void DoConsumerWork() { using namespace kafka; using namespace kafka::clients::consumer; const std::string brokers = "localhost:29092"; const Topic topic = "test-topic"; // Prepare the configuration Properties props; props.put("bootstrap.servers", brokers); // Create a consumer instance KafkaConsumer consumer(props); // Subscribe to topics consumer.subscribe({topic}); while (running) { // Poll messages from Kafka brokers for (const auto records = consumer.poll(std::chrono::milliseconds(100)); const auto& record : records) { if (record.error()) { std::cerr << record.toString() << std::endl; continue; } std::cout << "Got a new message..." << std::endl; std::cout << " Topic : " << record.topic() << std::endl; std::cout << " Partition: " << record.partition() << std::endl; std::cout << " Offset : " << record.offset() << std::endl; std::cout << " Timestamp: " << record.timestamp().toString() << std::endl; std::cout << " Headers : " << toString(record.headers()) << std::endl; try { const auto stringValue = kafka::extensions::ValueTo<std::string>(record.value()); std::cout << " STRING [" << stringValue << "]" << std::endl; } catch (const std::exception& e) { std::cerr << " Failed to deserialize as string: " << e.what() << std::endl; } try { const auto floatValue = kafka::extensions::ValueTo<float>(record.value()); std::cout << " FLOAT [" << floatValue << "]" << std::endl; } catch (const std::exception& e) { std::cerr << " Failed to deserialize as float: " << e.what() << std::endl; } } } } int main() { // Use Ctrl-C to terminate the program signal(SIGINT, StopRunning); // NOLINT DoConsumerWork(); return 0; }
Заключение
В этой статье я постарался совместить сразу несколько вещей: рассказать про Apache Kafka, про хранение данных внутри, про наличие определенных проблем и подходы к их решению. Написание текстов (помимо курсовых и дипломной работ) для меня в новинку, поэтому призываю каждого, кто прочитал, поделиться мыслями — с удовольствием почитаю и отвечу.
Демонстрацию сериализации и десериализации прямо в браузере можно посмотреть на сайте godbolt. Исходный код с настройкой Apache Kafka в Docker Compose находится в моем github-репозитории.
ссылка на оригинал статьи https://habr.com/ru/articles/880094/
Добавить комментарий