Confluent.Kafka.DependencyInjection 3.4.0

Kafka Dependency Injection

An extension of Confluent's Kafka client for use with Microsoft.Extensions.DependencyInjection (and friends).

Features

  • Configure/resolve Kafka clients using the service container.
  • Load client config properties using Microsoft.Extensions.Configuration.
  • Automatically log client events using Microsoft.Extensions.Logging.

Installation

Add the NuGet package to your project:

$ dotnet add package Confluent.Kafka.DependencyInjection

Usage

Resolving clients

Kafka DI works out of the box once its services are registered with an IServiceCollection.

services.AddKafkaClient();
services.AddTransient<MyService>();

Inject Kafka clients through the constructor.

public MyService(IProducer<string, byte[]> producer, IConsumer<Ignore, MyType> consumer, IAdminClient adminClient)
{
    // Clients are singletons managed by the container.
    Producer = producer;
    Consumer = consumer;
    AdminClient = adminClient;
}
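
As a rough sketch of what an injected client can then do, the class below sends one message with the standard Confluent.Kafka ProduceAsync call. The class name GreetingSender and the topic "example-topic" are hypothetical. The sketch also assumes the producer is not transactional; a producer configured with transactional.id must produce inside a transaction instead.

```csharp
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;

// Hypothetical service; the producer is supplied by the container.
class GreetingSender(IProducer<string, byte[]> producer)
{
    public async Task<Offset> SendAsync(string key, string text)
    {
        var message = new Message<string, byte[]>
        {
            Key = key,
            Value = Encoding.UTF8.GetBytes(text),
        };

        // "example-topic" is a placeholder topic name.
        // ProduceAsync completes when the broker acknowledges the message
        // and throws ProduceException<string, byte[]> if delivery fails.
        DeliveryResult<string, byte[]> result =
            await producer.ProduceAsync("example-topic", message);

        return result.Offset;
    }
}
```

Because the container owns the client, the class does not dispose the producer itself.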

Configuring clients

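Client configuration properties are read from the Kafka section of .NET configuration, whichever provider supplies it (for example, appsettings.json). Each client type has its own subsection, and property names are the standard librdkafka keys.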

{
  "Kafka": {
    "Producer": {
      "bootstrap.servers": "localhost:9092",
      "transactional.id": "example"
    },
    "Consumer": {
      "bootstrap.servers": "localhost:9092",
      "group.id": "example"
    },
    "Admin": {
      "bootstrap.servers": "localhost:9092"
    }
  }
}
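
To illustrate how such a section maps onto Confluent's config classes, here is a minimal sketch that copies the Kafka:Producer keys into a ProducerConfig by hand. It is not the library's actual binding code, only an approximation of the idea. It uses an in-memory provider (from the Microsoft.Extensions.Configuration package) in place of appsettings.json so that it runs on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;

// In-memory stand-in for the "Kafka:Producer" section of appsettings.json.
IConfiguration configuration = new ConfigurationBuilder()
    .AddInMemoryCollection(new Dictionary<string, string?>
    {
        ["Kafka:Producer:bootstrap.servers"] = "localhost:9092",
        ["Kafka:Producer:transactional.id"] = "example",
    })
    .Build();

// Flatten the section into the key/value pairs that librdkafka expects.
Dictionary<string, string> properties = configuration
    .GetSection("Kafka:Producer")
    .GetChildren()
    .Where(x => x.Value is not null)
    .ToDictionary(x => x.Key, x => x.Value!);

// ProducerConfig exposes the same properties through typed accessors.
var config = new ProducerConfig(properties);

Console.WriteLine(config.BootstrapServers); // localhost:9092
Console.WriteLine(config.TransactionalId);  // example
```

The typed properties (BootstrapServers, TransactionalId) and the dotted keys refer to the same underlying values, so either style can be used.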

You can also specify configuration properties using the options pattern.

// Prepare consumers for manual offset storage.
services.Configure<ConsumerConfig>(x => x.EnableAutoOffsetStore = false);
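
With EnableAutoOffsetStore disabled, the application decides when an offset becomes eligible for commit. The following sketch shows one way to do that with the standard Confluent.Kafka Consume and StoreOffset calls. The class name ManualOffsetLoop, the string key/value types, and the topic "example-topic" are hypothetical, and auto-commit is assumed to remain enabled (the client default) so that stored offsets are committed in the background.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

// Hypothetical consumer loop; the consumer is supplied by the container.
class ManualOffsetLoop(IConsumer<string, string> consumer)
{
    public void Run(CancellationToken cancellationToken)
    {
        // "example-topic" is a placeholder topic name.
        consumer.Subscribe("example-topic");

        while (!cancellationToken.IsCancellationRequested)
        {
            // Blocks until a message arrives; throws OperationCanceledException
            // when the token is cancelled.
            ConsumeResult<string, string> result = consumer.Consume(cancellationToken);

            Console.WriteLine($"Processing {result.TopicPartitionOffset}");

            // Mark the offset as safe to commit only after processing succeeded.
            consumer.StoreOffset(result);
        }
    }
}
```

If processing throws before StoreOffset is reached, the offset is not stored, so the message can be redelivered after a restart or rebalance.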

Configure serialization by registering the appropriate interface.

// "Open" generic registrations apply to all key/value types (except built-in types).
services.AddTransient(typeof(IAsyncDeserializer<>), typeof(JsonDeserializer<>));

// Configure schema registry (required by Confluent serializers).
services.AddSingleton<ISchemaRegistryClient>(
    x => new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = "localhost:8081" }));
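
If the schema registry is not needed, a custom deserializer can be registered the same way. The sketch below implements Confluent's IAsyncDeserializer<T> with System.Text.Json; the class name SystemTextJsonDeserializer is hypothetical, and this is a schema-less alternative to the Confluent JsonDeserializer shown above, not a drop-in replacement for it.

```csharp
using System;
using System.Text.Json;
using System.Threading.Tasks;
using Confluent.Kafka;

// Hypothetical schema-less JSON deserializer built on System.Text.Json.
class SystemTextJsonDeserializer<T> : IAsyncDeserializer<T>
{
    public Task<T> DeserializeAsync(
        ReadOnlyMemory<byte> data, bool isNull, SerializationContext context)
    {
        // A null payload (for example, a tombstone) maps to default(T).
        if (isNull)
        {
            return Task.FromResult(default(T)!);
        }

        // Parse the UTF-8 payload; malformed input throws JsonException.
        T? value = JsonSerializer.Deserialize<T>(data.Span);
        return Task.FromResult(value!);
    }
}

// Registered as an open generic, mirroring the registration above:
// services.AddTransient(typeof(IAsyncDeserializer<>), typeof(SystemTextJsonDeserializer<>));
```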

For advanced scenarios, implement IClientBuilderSetup to customize clients further.

class MyClientSetup : IClientBuilderSetup
{
    public void Apply<TKey, TValue>(ProducerBuilder<TKey, TValue> builder)
    {
        builder.SetStatisticsHandler(OnStatistics);
    }

    public void Apply<TKey, TValue>(ConsumerBuilder<TKey, TValue> builder)
    {
        builder.SetStatisticsHandler(OnStatistics);
    }

    public void Apply(AdminClientBuilder builder)
    {
        builder.SetStatisticsHandler(OnStatistics);
    }

    void OnStatistics(IClient client, string statistics)
    {
        Console.WriteLine($"New statistics available for {client.Name}");
    }
}

Register the custom setup with the service collection.

services.AddTransient<IClientBuilderSetup, MyClientSetup>();

Consumer hosting

Once the consumer is configured, a common way to consume messages is to implement a .NET BackgroundService.

class MyWorker(IConsumer<Ignore, MyType> consumer) : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
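        Console.WriteLine("Consumer service started.");

        // Subscribe before consuming; "my-topic" is a placeholder topic name.
        consumer.Subscribe("my-topic");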

        // ConsumeAllAsync() is an extension provided by this library.
        await foreach (var result in consumer.ConsumeAllAsync().WithCancellation(stoppingToken))
        {
            Console.WriteLine($"Message {result.TopicPartitionOffset} processed.");
        }
    }
}

Register the service with the container.

services.AddHostedService<MyWorker>();
