Avro serialization в Kafka

от автора

Здесь опишу пример сериализции данных через Avro и передача в Kafka. Для Avro есть сериализатор данных для Kafka, он использует в своей работе реестр (registry) схем и поддерживает версионность на отдельном развернутом сервере. Здесь будет только сериализатор, а версионность если потребуется, то например может быть реализована своя, например в БД.

Проект на Github

Вот так может выглядеть сериализованные данные подготовленные Avro. Есть заголовок-описание данных и далее сами данные. Получается компактно и быстро, отсутствуют повторяющиеся названия полей, формат данных бинарный. Данные проверяются при добавлении по схеме данных.

Пример схемы:

{"namespace": "avro", "type": "record", "name": "Person", "fields": [      {"name": "name", "type": "string"},      {"name": "age",  "type": ["int", "null"]} ] }

Используя Spring Shell, в первой команде добавляю в список лиц, проверяя по схеме Avro:

@ShellComponent public class Commands {      private List<GenericRecord> records = new ArrayList<>();      @ShellMethod("add user to list for send")     public void add(String name, int age) {         GenericRecord record = new GenericData.Record(SchemaRepository.instance().getSchemaObject());         record.put("name", name);         record.put("age", age);          records.add(record);     }

GenericRecord — это Avro запись которая формируется на основе схемы

public class SchemaRepository {      private static final String SCHEMA = "{\"namespace\": \"avro\",\n" +             "\"type\": \"record\",\n" +             "\"name\": \"Person\",\n" +             "\"fields\": [\n" +             "     {\"name\": \"name\", \"type\": \"string\"},\n" +             "     {\"name\": \"age\",  \"type\": [\"int\", \"null\"]}\n" +             "]\n" +             "}\n";      private static final Schema SCHEMA_OBJECT = new Schema.Parser().parse(SCHEMA);      private static SchemaRepository INSTANCE = new SchemaRepository();      public static SchemaRepository instance() {       return INSTANCE;     }      public Schema getSchemaObject() {         return SCHEMA_OBJECT;     }  }

Добавление в консоли shell лиц, и отправка в Kafka топик:

@ShellComponent public class Commands {      private List<GenericRecord> records = new ArrayList<>();      final private KafkaTemplate template;      public Commands(KafkaTemplate template) {       this.template = template;     }      @ShellMethod("send list users to Kafka")     public void send() {         template.setDefaultTopic("test");         template.sendDefault("1", records);         template.flush();         records.clear();     }

Вот сам Avro сериализатор для Kafka:

public class AvroGenericRecordSerializer implements Serializer<List<GenericRecord>> {      private Schema schema = null;      @Override public void configure(Map<String, ?> map, boolean b) {         schema = (Schema) map.get("SCHEMA");     }      @Override public byte[] serialize(String arg0, List<GenericRecord> records) {         byte[] retVal = null;          ByteArrayOutputStream outputStream = new ByteArrayOutputStream();         GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);          DataFileWriter dataFileWriter = new DataFileWriter<>(datumWriter);         try {             dataFileWriter.create(schema, outputStream);             for (GenericRecord record : records) {                 dataFileWriter.append(record);             }             dataFileWriter.flush();             dataFileWriter.close();             retVal = outputStream.toByteArray();         } catch (IOException e) {             e.printStackTrace();         }         return retVal;     }      @Override public void close() {     }  }

Конфигурация Kafka producer:

    @Bean     public Map<String, Object> producerConfigs() {         Map<String, Object> props = new HashMap<>();         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers().get(0));         props.put(ProducerConfig.RETRIES_CONFIG, 0);         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.example.model.AvroGenericRecordSerializer");         props.put("SCHEMA", SchemaRepository.instance().getSchemaObject());         return props;     }

Здесь указан класс сериализации — "com.example.model.AvroGenericRecordSerializer"
и новый параметр "SCHEMA" — это объект схема, он нужен в AvroGenericRecordSerializer в подготовке бинарных данных

На принимающей стороне в консоли видим принятые данные:

Avro Deserializer

public class AvroGenericRecordDeserializer implements Deserializer {      private Schema schema = null;      @Override     public void configure(Map configs, boolean isKey) {         schema = (Schema) configs.get("SCHEMA");     }      @Override     public Object deserialize(String s, byte[] bytes) {         DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);         SeekableByteArrayInput arrayInput = new SeekableByteArrayInput(bytes);         List<GenericRecord> records = new ArrayList<>();          DataFileReader<GenericRecord> dataFileReader = null;         try {             dataFileReader = new DataFileReader<>(arrayInput, datumReader);             while (dataFileReader.hasNext()) {                 GenericRecord record = dataFileReader.next();                 records.add(record);             }         } catch (IOException e) {             e.printStackTrace();         }         return records;      }  }

И аналогичный Kafka consumer:

    @Bean     public Map<String, Object> consumerConfigs() {         Map<String, Object> props = new HashMap<>();         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers().get(0));         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.example.model.AvroGenericRecordDeserializer");         props.put("SCHEMA", SchemaRepository.instance().getSchemaObject());         return props;     }

Kafka использовал из Docker wurstmeister/kafka-docker, можно любую другую

Проект на Github

avro.apache

ссылка на оригинал статьи https://habr.com/ru/post/492312/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *