Прежде чем начать я бы хотел отметить, что это всего лишь небольшой туториал по быстрому старту для тех кто, как и я, ни разу не использовал Kafka на практике.
И так приступим!
Единственный брокер Kafka и необходимый для его работы ZooKeeper я буду запускать в Docker.
Сперва создам отдельную сеть kafkanet
docker network create kafkanet
Запуск контейнера с ZooKeeper
docker run -d --network=kafkanet --name=zookeeper -e ZOOKEEPER_CLIENT_PORT=2181 -e ZOOKEEPER_TICK_TIME=2000 -p 2181:2181 confluentinc/cp-zookeeper
Запуск контейнера с Kafka
docker run -d --network=kafkanet --name=kafka -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 -p 9092:9092 confluentinc/cp-kafka
Для того чтобы убедиться в отсутствии ошибок, можно вывести лог docker logs kafka
Далее проверю функционирование брокера Kafka, выполнив простые операции, включающие создание тестовой темы, генерацию сообщений и их потребление
Для этого сценария подключусь к контейнеру kafka
docker exec -it kafka bash
Создам топик demo-topic
/bin/kafka-topics --create --topic demo-topic --bootstrap-server kafka:9092
Выведу список всех топиков
/bin/kafka-topics --list --zookeeper zookeeper:2181
И выведу описание созданного топика
/bin/kafka-topics --describe --topic demo-topic --bootstrap-server kafka:9092
Сгенерирую несколько сообщений
/bin/kafka-console-producer --topic demo-topic --bootstrap-server kafka:9092
И после прочитаю эти сообщения
/bin/kafka-console-consumer --topic demo-topic --from-beginning --bootstrap-server kafka:9092
Далее я создам два небольших .NET приложения: KafkaProducer
, которое будет генерировать сообщения, и KafkaConsumer
, которое будет потреблять сообщения. Для реализации мне понадобятся пакеты Confluent.Kafka и Microsoft.Extensions.Hosting.
В проект KafkaProducer
добавлю класс KafkaProducerService
using Confluent.Kafka; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using System.Threading; using System.Threading.Tasks; namespace KafkaProducer { public class KafkaProducerService : IHostedService { private readonly ILogger<KafkaProducerService> _logger; private readonly IProducer<Null, string> _producer; public KafkaProducerService(ILogger<KafkaProducerService> logger) { _logger = logger; var config = new ProducerConfig { BootstrapServers = "localhost:9092" }; _producer = new ProducerBuilder<Null, string>(config).Build(); } public async Task StartAsync(CancellationToken cancellationToken) { for (var i = 0; i < 5; i++) { var value = $"Event N {i}"; _logger.LogInformation($"Sending >> {value}"); await _producer.ProduceAsync( "demo-topic", new Message<Null, string> { Value = value }, cancellationToken); } } public Task StopAsync(CancellationToken cancellationToken) { _producer?.Dispose(); _logger.LogInformation($"{nameof(KafkaProducerService)} stopped"); return Task.CompletedTask; } } }
Изменю файл Program.cs
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using System; namespace KafkaProducer { class Program { static void Main(string[] args) { CreateHostBuilder(args).Build().Run(); Console.ReadKey(); } private static IHostBuilder CreateHostBuilder(string[] args) => Host .CreateDefaultBuilder(args) .ConfigureServices((context, collection) => collection.AddHostedService<KafkaProducerService>()); } }
В проект KafkaConsumer
добавлю класс KafkaConsumerService
using Confluent.Kafka; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using System.Threading; using System.Threading.Tasks; namespace KafkaConsumer { public class KafkaConsumerService : IHostedService { private readonly ILogger<KafkaConsumerService> _logger; private readonly IConsumer<Ignore, string> _consumer; public KafkaConsumerService(ILogger<KafkaConsumerService> logger) { _logger = logger; var config = new ConsumerConfig { BootstrapServers = "localhost:9092", GroupId = "demo-group", AutoOffsetReset = AutoOffsetReset.Earliest }; _consumer = new ConsumerBuilder<Ignore, string>(config).Build(); } public Task StartAsync(CancellationToken cancellationToken) { _consumer.Subscribe("demo-topic"); while (!cancellationToken.IsCancellationRequested) { var consumeResult = _consumer.Consume(cancellationToken); _logger.LogInformation($"Received >> {consumeResult.Message.Value}"); } return Task.CompletedTask; } public Task StopAsync(CancellationToken cancellationToken) { _consumer?.Dispose(); _logger.LogInformation($"{nameof(KafkaConsumerService)} stopped"); return Task.CompletedTask; } } }
Изменю файл Program.cs
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using System; namespace KafkaConsumer { class Program { static void Main(string[] args) { CreateHostBuilder(args).Build().Run(); Console.ReadKey(); } private static IHostBuilder CreateHostBuilder(string[] args) => Host .CreateDefaultBuilder(args) .ConfigureServices((context, collection) => collection.AddHostedService<KafkaConsumerService>()); } }
Результат работы приложений (ссылка на репозиторий)
ссылка на оригинал статьи https://habr.com/ru/post/543732/
Добавить комментарий