First touch of Kafka

от автора

Прежде чем начать я бы хотел отметить, что это всего лишь небольшой туториал по быстрому старту для тех кто, как и я, ни разу не использовал Kafka на практике.

И так приступим!

Единственный брокер Kafka и необходимый для его работы ZooKeeper я буду запускать в Docker.

Сперва создам отдельную сеть kafkanet

docker network create kafkanet

Запуск контейнера с ZooKeeper

docker run -d --network=kafkanet --name=zookeeper -e ZOOKEEPER_CLIENT_PORT=2181 -e ZOOKEEPER_TICK_TIME=2000 -p 2181:2181 confluentinc/cp-zookeeper

Запуск контейнера с Kafka

docker run -d --network=kafkanet --name=kafka -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 -p 9092:9092 confluentinc/cp-kafka

Для того чтобы убедиться в отсутствии ошибок, можно вывести лог docker logs kafka

Далее проверю функционирование брокера Kafka, выполнив простые операции, включающие создание тестовой темы, генерацию сообщений и их потребление

Для этого сценария подключусь к контейнеру kafka

docker exec -it kafka bash

Создам топик demo-topic

/bin/kafka-topics --create --topic demo-topic --bootstrap-server kafka:9092

Выведу список всех топиков

/bin/kafka-topics --list --zookeeper zookeeper:2181

И выведу описание созданного топика

/bin/kafka-topics --describe --topic demo-topic --bootstrap-server kafka:9092

Сгенерирую несколько сообщений

/bin/kafka-console-producer --topic demo-topic --bootstrap-server kafka:9092

И после прочитаю эти сообщения

/bin/kafka-console-consumer --topic demo-topic --from-beginning --bootstrap-server kafka:9092

Далее я создам два небольших .NET приложения: KafkaProducer, которое будет генерировать сообщения, и KafkaConsumer, которое будет потреблять сообщения. Для реализации мне понадобятся пакеты Confluent.Kafka и Microsoft.Extensions.Hosting.

В проект KafkaProducer добавлю класс KafkaProducerService

using Confluent.Kafka; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using System.Threading; using System.Threading.Tasks;  namespace KafkaProducer {     public class KafkaProducerService : IHostedService     {         private readonly ILogger<KafkaProducerService> _logger;         private readonly IProducer<Null, string> _producer;          public KafkaProducerService(ILogger<KafkaProducerService> logger)         {             _logger = logger;             var config = new ProducerConfig             {                 BootstrapServers = "localhost:9092"             };             _producer = new ProducerBuilder<Null, string>(config).Build();         }          public async Task StartAsync(CancellationToken cancellationToken)         {             for (var i = 0; i < 5; i++)             {                 var value = $"Event N {i}";                 _logger.LogInformation($"Sending >> {value}");                 await _producer.ProduceAsync(                     "demo-topic",                     new Message<Null, string> { Value = value },                     cancellationToken);             }         }          public Task StopAsync(CancellationToken cancellationToken)         {             _producer?.Dispose();             _logger.LogInformation($"{nameof(KafkaProducerService)} stopped");             return Task.CompletedTask;         }     } }

Изменю файл Program.cs

using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using System;  namespace KafkaProducer {     class Program     {         static void Main(string[] args)         {             CreateHostBuilder(args).Build().Run();             Console.ReadKey();         }          private static IHostBuilder CreateHostBuilder(string[] args) =>             Host                 .CreateDefaultBuilder(args)                 .ConfigureServices((context, collection) =>                     collection.AddHostedService<KafkaProducerService>());     } }

В проект KafkaConsumer добавлю класс KafkaConsumerService

using Confluent.Kafka; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using System.Threading; using System.Threading.Tasks;  namespace KafkaConsumer {     public class KafkaConsumerService : IHostedService     {         private readonly ILogger<KafkaConsumerService> _logger;         private readonly IConsumer<Ignore, string> _consumer;          public KafkaConsumerService(ILogger<KafkaConsumerService> logger)         {             _logger = logger;             var config = new ConsumerConfig             {                 BootstrapServers = "localhost:9092",                 GroupId = "demo-group",                 AutoOffsetReset = AutoOffsetReset.Earliest             };             _consumer = new ConsumerBuilder<Ignore, string>(config).Build();         }          public Task StartAsync(CancellationToken cancellationToken)         {             _consumer.Subscribe("demo-topic");             while (!cancellationToken.IsCancellationRequested)             {                 var consumeResult = _consumer.Consume(cancellationToken);                 _logger.LogInformation($"Received >> {consumeResult.Message.Value}");             }             return Task.CompletedTask;         }          public Task StopAsync(CancellationToken cancellationToken)         {             _consumer?.Dispose();             _logger.LogInformation($"{nameof(KafkaConsumerService)} stopped");             return Task.CompletedTask;         }     } }

Изменю файл Program.cs

using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using System;  namespace KafkaConsumer {     class Program     {         static void Main(string[] args)         {             CreateHostBuilder(args).Build().Run();             Console.ReadKey();         }          private static IHostBuilder CreateHostBuilder(string[] args) =>             Host                 .CreateDefaultBuilder(args)                 .ConfigureServices((context, collection) =>                     collection.AddHostedService<KafkaConsumerService>());     } }

Результат работы приложений (ссылка на репозиторий)

ссылка на оригинал статьи https://habr.com/ru/post/543732/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *