{"id":300167,"date":"2020-03-15T21:00:17","date_gmt":"2020-03-15T21:00:17","guid":{"rendered":"http:\/\/savepearlharbor.com\/?p=300167"},"modified":"-0001-11-30T00:00:00","modified_gmt":"-0001-11-29T21:00:00","slug":"","status":"publish","type":"post","link":"https:\/\/savepearlharbor.com\/?p=300167","title":{"rendered":"Avro serialization \u0432 Kafka"},"content":{"rendered":"\n<div class=\"post__text post__text-html post__text_v1\" id=\"post-content-body\" data-io-article-url=\"https:\/\/habr.com\/ru\/post\/492312\/\">\n<p>\u0417\u0434\u0435\u0441\u044c \u043e\u043f\u0438\u0448\u0443 \u043f\u0440\u0438\u043c\u0435\u0440 \u0441\u0435\u0440\u0438\u0430\u043b\u0438\u0437\u0446\u0438\u0438 \u0434\u0430\u043d\u043d\u044b\u0445 \u0447\u0435\u0440\u0435\u0437 Avro \u0438 \u043f\u0435\u0440\u0435\u0434\u0430\u0447\u0430 \u0432 Kafka. \u0414\u043b\u044f Avro \u0435\u0441\u0442\u044c \u0441\u0435\u0440\u0438\u0430\u043b\u0438\u0437\u0430\u0442\u043e\u0440 \u0434\u0430\u043d\u043d\u044b\u0445 \u0434\u043b\u044f Kafka, \u043e\u043d \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u0443\u0435\u0442 \u0432 \u0441\u0432\u043e\u0435\u0439 \u0440\u0430\u0431\u043e\u0442\u0435 \u0440\u0435\u0435\u0441\u0442\u0440 (registry) \u0441\u0445\u0435\u043c \u0438 \u043f\u043e\u0434\u0434\u0435\u0440\u0436\u0438\u0432\u0430\u0435\u0442 \u0432\u0435\u0440\u0441\u0438\u043e\u043d\u043d\u043e\u0441\u0442\u044c \u043d\u0430 \u043e\u0442\u0434\u0435\u043b\u044c\u043d\u043e\u043c \u0440\u0430\u0437\u0432\u0435\u0440\u043d\u0443\u0442\u043e\u043c \u0441\u0435\u0440\u0432\u0435\u0440\u0435. \u0417\u0434\u0435\u0441\u044c \u0431\u0443\u0434\u0435\u0442 \u0442\u043e\u043b\u044c\u043a\u043e \u0441\u0435\u0440\u0438\u0430\u043b\u0438\u0437\u0430\u0442\u043e\u0440, \u0430 \u0432\u0435\u0440\u0441\u0438\u043e\u043d\u043d\u043e\u0441\u0442\u044c \u0435\u0441\u043b\u0438 \u043f\u043e\u0442\u0440\u0435\u0431\u0443\u0435\u0442\u0441\u044f, \u0442\u043e \u043d\u0430\u043f\u0440\u0438\u043c\u0435\u0440 \u043c\u043e\u0436\u0435\u0442 \u0431\u044b\u0442\u044c \u0440\u0435\u0430\u043b\u0438\u0437\u043e\u0432\u0430\u043d\u0430 \u0441\u0432\u043e\u044f, \u043d\u0430\u043f\u0440\u0438\u043c\u0435\u0440 \u0432 \u0411\u0414. <\/p>\n<p><a name=\"habracut\"><\/a>  <\/p>\n<p><a href=\"https:\/\/github.com\/avrylkov\/avro-kafka\" rel=\"nofollow\">\u041f\u0440\u043e\u0435\u043a\u0442 \u043d\u0430 Github<\/a><\/p>\n<p>  <img decoding=\"async\" src=\"https:\/\/habrastorage.org\/webt\/wq\/qx\/-h\/wqqx-hr2x3qaa1ncobvewatfley.jpeg\">   <\/p>\n<p>\u0412\u043e\u0442 \u0442\u0430\u043a \u043c\u043e\u0436\u0435\u0442 \u0432\u044b\u0433\u043b\u044f\u0434\u0435\u0442\u044c \u0441\u0435\u0440\u0438\u0430\u043b\u0438\u0437\u043e\u0432\u0430\u043d\u043d\u044b\u0435 \u0434\u0430\u043d\u043d\u044b\u0435 \u043f\u043e\u0434\u0433\u043e\u0442\u043e\u0432\u043b\u0435\u043d\u043d\u044b\u0435 Avro. \u0415\u0441\u0442\u044c \u0437\u0430\u0433\u043e\u043b\u043e\u0432\u043e\u043a-\u043e\u043f\u0438\u0441\u0430\u043d\u0438\u0435 \u0434\u0430\u043d\u043d\u044b\u0445 \u0438 \u0434\u0430\u043b\u0435\u0435 \u0441\u0430\u043c\u0438 \u0434\u0430\u043d\u043d\u044b\u0435. \u041f\u043e\u043b\u0443\u0447\u0430\u0435\u0442\u0441\u044f \u043a\u043e\u043c\u043f\u0430\u043a\u0442\u043d\u043e \u0438 \u0431\u044b\u0441\u0442\u0440\u043e, \u043e\u0442\u0441\u0443\u0442\u0441\u0442\u0432\u0443\u044e\u0442 \u043f\u043e\u0432\u0442\u043e\u0440\u044f\u044e\u0449\u0438\u0435\u0441\u044f \u043d\u0430\u0437\u0432\u0430\u043d\u0438\u044f \u043f\u043e\u043b\u0435\u0439, \u0444\u043e\u0440\u043c\u0430\u0442 \u0434\u0430\u043d\u043d\u044b\u0445 \u0431\u0438\u043d\u0430\u0440\u043d\u044b\u0439. \u0414\u0430\u043d\u043d\u044b\u0435 \u043f\u0440\u043e\u0432\u0435\u0440\u044f\u044e\u0442\u0441\u044f \u043f\u0440\u0438 \u0434\u043e\u0431\u0430\u0432\u043b\u0435\u043d\u0438\u0438 \u043f\u043e \u0441\u0445\u0435\u043c\u0435 \u0434\u0430\u043d\u043d\u044b\u0445.<\/p>\n<p>  <\/p>\n<p>\u041f\u0440\u0438\u043c\u0435\u0440 \u0441\u0445\u0435\u043c\u044b:<\/p>\n<p>  <\/p>\n<pre><code class=\"plaintext\">{&quot;namespace&quot;: &quot;avro&quot;, &quot;type&quot;: &quot;record&quot;, &quot;name&quot;: &quot;Person&quot;, &quot;fields&quot;: [      {&quot;name&quot;: &quot;name&quot;, &quot;type&quot;: &quot;string&quot;},      {&quot;name&quot;: &quot;age&quot;,  &quot;type&quot;: [&quot;int&quot;, &quot;null&quot;]} ] }<\/code><\/pre>\n<p>  <\/p>\n<p>\u0418\u0441\u043f\u043e\u043b\u044c\u0437\u0443\u044f Spring Shell, \u0432 \u043f\u0435\u0440\u0432\u043e\u0439 \u043a\u043e\u043c\u0430\u043d\u0434\u0435 \u0434\u043e\u0431\u0430\u0432\u043b\u044f\u044e \u0432 \u0441\u043f\u0438\u0441\u043e\u043a \u043b\u0438\u0446, \u043f\u0440\u043e\u0432\u0435\u0440\u044f\u044f \u043f\u043e \u0441\u0445\u0435\u043c\u0435 Avro:<\/p>\n<p>  <\/p>\n<pre><code class=\"java\">@ShellComponent public class Commands {      private List&lt;GenericRecord&gt; records = new ArrayList&lt;&gt;();      @ShellMethod(&quot;add user to list for send&quot;)     public void add(String name, int age) {         GenericRecord record = new GenericData.Record(SchemaRepository.instance().getSchemaObject());         record.put(&quot;name&quot;, name);         record.put(&quot;age&quot;, age);          records.add(record);     }<\/code><\/pre>\n<p>  <\/p>\n<p>GenericRecord \u2014 \u044d\u0442\u043e Avro \u0437\u0430\u043f\u0438\u0441\u044c \u043a\u043e\u0442\u043e\u0440\u0430\u044f \u0444\u043e\u0440\u043c\u0438\u0440\u0443\u0435\u0442\u0441\u044f \u043d\u0430 \u043e\u0441\u043d\u043e\u0432\u0435 \u0441\u0445\u0435\u043c\u044b<\/p>\n<p>  <\/p>\n<pre><code class=\"java\">public class SchemaRepository {      private static final String SCHEMA = &quot;{\\&quot;namespace\\&quot;: \\&quot;avro\\&quot;,\\n&quot; +             &quot;\\&quot;type\\&quot;: \\&quot;record\\&quot;,\\n&quot; +             &quot;\\&quot;name\\&quot;: \\&quot;Person\\&quot;,\\n&quot; +             &quot;\\&quot;fields\\&quot;: [\\n&quot; +             &quot;     {\\&quot;name\\&quot;: \\&quot;name\\&quot;, \\&quot;type\\&quot;: \\&quot;string\\&quot;},\\n&quot; +             &quot;     {\\&quot;name\\&quot;: \\&quot;age\\&quot;,  \\&quot;type\\&quot;: [\\&quot;int\\&quot;, \\&quot;null\\&quot;]}\\n&quot; +             &quot;]\\n&quot; +             &quot;}\\n&quot;;      private static final Schema SCHEMA_OBJECT = new Schema.Parser().parse(SCHEMA);      private static SchemaRepository INSTANCE = new SchemaRepository();      public static SchemaRepository instance() {       return INSTANCE;     }      public Schema getSchemaObject() {         return SCHEMA_OBJECT;     }  }<\/code><\/pre>\n<p>  <\/p>\n<p>\u0414\u043e\u0431\u0430\u0432\u043b\u0435\u043d\u0438\u0435 \u0432 \u043a\u043e\u043d\u0441\u043e\u043b\u0438 shell \u043b\u0438\u0446, \u0438 \u043e\u0442\u043f\u0440\u0430\u0432\u043a\u0430 \u0432 Kafka \u0442\u043e\u043f\u0438\u043a:<\/p>\n<p>  <img decoding=\"async\" src=\"https:\/\/habrastorage.org\/webt\/yy\/zc\/zs\/yyzczslpgepbwrtvcnvwjvlxvgk.jpeg\">  <\/p>\n<pre><code class=\"java\">@ShellComponent public class Commands {      private List&lt;GenericRecord&gt; records = new ArrayList&lt;&gt;();      final private KafkaTemplate template;      public Commands(KafkaTemplate template) {       this.template = template;     }      @ShellMethod(&quot;send list users to Kafka&quot;)     public void send() {         template.setDefaultTopic(&quot;test&quot;);         template.sendDefault(&quot;1&quot;, records);         template.flush();         records.clear();     }<\/code><\/pre>\n<p>  <\/p>\n<p>\u0412\u043e\u0442 \u0441\u0430\u043c Avro \u0441\u0435\u0440\u0438\u0430\u043b\u0438\u0437\u0430\u0442\u043e\u0440 \u0434\u043b\u044f Kafka:<\/p>\n<p>  <\/p>\n<pre><code class=\"java\">public class AvroGenericRecordSerializer implements Serializer&lt;List&lt;GenericRecord&gt;&gt; {      private Schema schema = null;      @Override public void configure(Map&lt;String, ?&gt; map, boolean b) {         schema = (Schema) map.get(&quot;SCHEMA&quot;);     }      @Override public byte[] serialize(String arg0, List&lt;GenericRecord&gt; records) {         byte[] retVal = null;          ByteArrayOutputStream outputStream = new ByteArrayOutputStream();         GenericDatumWriter&lt;GenericRecord&gt; datumWriter = new GenericDatumWriter&lt;&gt;(schema);          DataFileWriter dataFileWriter = new DataFileWriter&lt;&gt;(datumWriter);         try {             dataFileWriter.create(schema, outputStream);             for (GenericRecord record : records) {                 dataFileWriter.append(record);             }             dataFileWriter.flush();             dataFileWriter.close();             retVal = outputStream.toByteArray();         } catch (IOException e) {             e.printStackTrace();         }         return retVal;     }      @Override public void close() {     }  }<\/code><\/pre>\n<p>  <\/p>\n<p>\u041a\u043e\u043d\u0444\u0438\u0433\u0443\u0440\u0430\u0446\u0438\u044f Kafka producer:<\/p>\n<p>  <\/p>\n<pre><code class=\"java\">    @Bean     public Map&lt;String, Object&gt; producerConfigs() {         Map&lt;String, Object&gt; props = new HashMap&lt;&gt;();         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers().get(0));         props.put(ProducerConfig.RETRIES_CONFIG, 0);         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, &quot;com.example.model.AvroGenericRecordSerializer&quot;);         props.put(&quot;SCHEMA&quot;, SchemaRepository.instance().getSchemaObject());         return props;     }<\/code><\/pre>\n<p>  <\/p>\n<p>\u0417\u0434\u0435\u0441\u044c \u0443\u043a\u0430\u0437\u0430\u043d \u043a\u043b\u0430\u0441\u0441 \u0441\u0435\u0440\u0438\u0430\u043b\u0438\u0437\u0430\u0446\u0438\u0438 \u2014 &quot;com.example.model.AvroGenericRecordSerializer&quot;<br \/>  \u0438 \u043d\u043e\u0432\u044b\u0439 \u043f\u0430\u0440\u0430\u043c\u0435\u0442\u0440 &quot;SCHEMA&quot; \u2014 \u044d\u0442\u043e \u043e\u0431\u044a\u0435\u043a\u0442 \u0441\u0445\u0435\u043c\u0430, \u043e\u043d \u043d\u0443\u0436\u0435\u043d \u0432 AvroGenericRecordSerializer \u0432 \u043f\u043e\u0434\u0433\u043e\u0442\u043e\u0432\u043a\u0435 \u0431\u0438\u043d\u0430\u0440\u043d\u044b\u0445 \u0434\u0430\u043d\u043d\u044b\u0445<\/p>\n<p>  <\/p>\n<p>\u041d\u0430 \u043f\u0440\u0438\u043d\u0438\u043c\u0430\u044e\u0449\u0435\u0439 \u0441\u0442\u043e\u0440\u043e\u043d\u0435 \u0432 \u043a\u043e\u043d\u0441\u043e\u043b\u0438 \u0432\u0438\u0434\u0438\u043c \u043f\u0440\u0438\u043d\u044f\u0442\u044b\u0435 \u0434\u0430\u043d\u043d\u044b\u0435:<\/p>\n<p>  <img decoding=\"async\" src=\"https:\/\/habrastorage.org\/webt\/qu\/l8\/il\/qul8ilf8_nekkiexezjjjbdyph4.jpeg\">  <\/p>\n<p>Avro Deserializer<\/p>\n<p>  <\/p>\n<pre><code class=\"java\">public class AvroGenericRecordDeserializer implements Deserializer {      private Schema schema = null;      @Override     public void configure(Map configs, boolean isKey) {         schema = (Schema) configs.get(&quot;SCHEMA&quot;);     }      @Override     public Object deserialize(String s, byte[] bytes) {         DatumReader&lt;GenericRecord&gt; datumReader = new GenericDatumReader&lt;&gt;(schema);         SeekableByteArrayInput arrayInput = new SeekableByteArrayInput(bytes);         List&lt;GenericRecord&gt; records = new ArrayList&lt;&gt;();          DataFileReader&lt;GenericRecord&gt; dataFileReader = null;         try {             dataFileReader = new DataFileReader&lt;&gt;(arrayInput, datumReader);             while (dataFileReader.hasNext()) {                 GenericRecord record = dataFileReader.next();                 records.add(record);             }         } catch (IOException e) {             e.printStackTrace();         }         return records;      }  }<\/code><\/pre>\n<p>  <\/p>\n<p>\u0418 \u0430\u043d\u0430\u043b\u043e\u0433\u0438\u0447\u043d\u044b\u0439 Kafka consumer:<\/p>\n<p>  <\/p>\n<pre><code class=\"java\">    @Bean     public Map&lt;String, Object&gt; consumerConfigs() {         Map&lt;String, Object&gt; props = new HashMap&lt;&gt;();         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers().get(0));         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, &quot;com.example.model.AvroGenericRecordDeserializer&quot;);         props.put(&quot;SCHEMA&quot;, SchemaRepository.instance().getSchemaObject());         return props;     }<\/code><\/pre>\n<p>  <\/p>\n<p>Kafka \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u043e\u0432\u0430\u043b \u0438\u0437 Docker <a href=\"https:\/\/github.com\/wurstmeister\/kafka-docker\" rel=\"nofollow\">wurstmeister\/kafka-docker<\/a>, \u043c\u043e\u0436\u043d\u043e \u043b\u044e\u0431\u0443\u044e \u0434\u0440\u0443\u0433\u0443\u044e<\/p>\n<p>  <\/p>\n<p><a href=\"https:\/\/github.com\/avrylkov\/avro-kafka\" rel=\"nofollow\">\u041f\u0440\u043e\u0435\u043a\u0442 \u043d\u0430 Github<\/a><\/p>\n<p>  <\/p>\n<p><a href=\"https:\/\/avro.apache.org\/docs\/current\/gettingstartedjava.html\" rel=\"nofollow\">avro.apache<\/a><\/p>\n<\/div>\n<p> \u0441\u0441\u044b\u043b\u043a\u0430 \u043d\u0430 \u043e\u0440\u0438\u0433\u0438\u043d\u0430\u043b \u0441\u0442\u0430\u0442\u044c\u0438 <a href=\"https:\/\/habr.com\/ru\/post\/492312\/\"> https:\/\/habr.com\/ru\/post\/492312\/<\/a><\/p>\n","protected":false},"excerpt":{"rendered":"\n<div class=\"post__text post__text-html post__text_v1\" id=\"post-content-body\" data-io-article-url=\"https:\/\/habr.com\/ru\/post\/492312\/\">\n<p>\u0417\u0434\u0435\u0441\u044c \u043e\u043f\u0438\u0448\u0443 \u043f\u0440\u0438\u043c\u0435\u0440 \u0441\u0435\u0440\u0438\u0430\u043b\u0438\u0437\u0446\u0438\u0438 \u0434\u0430\u043d\u043d\u044b\u0445 \u0447\u0435\u0440\u0435\u0437 Avro \u0438 \u043f\u0435\u0440\u0435\u0434\u0430\u0447\u0430 \u0432 Kafka. \u0414\u043b\u044f Avro \u0435\u0441\u0442\u044c \u0441\u0435\u0440\u0438\u0430\u043b\u0438\u0437\u0430\u0442\u043e\u0440 \u0434\u0430\u043d\u043d\u044b\u0445 \u0434\u043b\u044f Kafka, \u043e\u043d \u0438\u0441\u043f\u043e\u043b\u044c\u0437\u0443\u0435\u0442 \u0432 \u0441\u0432\u043e\u0435\u0439 \u0440\u0430\u0431\u043e\u0442\u0435 \u0440\u0435\u0435\u0441\u0442\u0440 (registry) \u0441\u0445\u0435\u043c \u0438 \u043f\u043e\u0434\u0434\u0435\u0440\u0436\u0438\u0432\u0430\u0435\u0442 \u0432\u0435\u0440\u0441\u0438\u043e\u043d\u043d\u043e\u0441\u0442\u044c \u043d\u0430 \u043e\u0442\u0434\u0435\u043b\u044c\u043d\u043e\u043c \u0440\u0430\u0437\u0432\u0435\u0440\u043d\u0443\u0442\u043e\u043c \u0441\u0435\u0440\u0432\u0435\u0440\u0435. \u0417\u0434\u0435\u0441\u044c \u0431\u0443\u0434\u0435\u0442 \u0442\u043e\u043b\u044c\u043a\u043e \u0441\u0435\u0440\u0438\u0430\u043b\u0438\u0437\u0430\u0442\u043e\u0440, \u0430 \u0432\u0435\u0440\u0441\u0438\u043e\u043d\u043d\u043e\u0441\u0442\u044c \u0435\u0441\u043b\u0438 \u043f\u043e\u0442\u0440\u0435\u0431\u0443\u0435\u0442\u0441\u044f, \u0442\u043e \u043d\u0430\u043f\u0440\u0438\u043c\u0435\u0440 \u043c\u043e\u0436\u0435\u0442 \u0431\u044b\u0442\u044c \u0440\u0435\u0430\u043b\u0438\u0437\u043e\u0432\u0430\u043d\u0430 \u0441\u0432\u043e\u044f, \u043d\u0430\u043f\u0440\u0438\u043c\u0435\u0440 \u0432 \u0411\u0414. <\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[],"tags":[],"class_list":["post-300167","post","type-post","status-publish","format-standard","hentry"],"_links":{"self":[{"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=\/wp\/v2\/posts\/300167","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=300167"}],"version-history":[{"count":0,"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=\/wp\/v2\/posts\/300167\/revisions"}],"wp:attachment":[{"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=300167"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=300167"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/savepearlharbor.com\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=300167"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}