Confluent.Kafka.DependencyInjection 3.0.0-test3

Kafka Dependency Injection

An extension of Confluent's Kafka client for use with Microsoft.Extensions.DependencyInjection (and friends).

Features

  • Inject/resolve Kafka clients using the service container.
  • Configure Kafka clients using the options pattern.
  • Load client config properties using Microsoft.Extensions.Configuration.
  • Automatically log client events using Microsoft.Extensions.Logging.

Installation

Add the NuGet package to your project:

$ dotnet add package Confluent.Kafka.DependencyInjection

Usage

Resolving clients

Kafka DI works out of the box once its services are registered with an IServiceCollection.

services.AddKafkaClient();
services.AddSingleton<MyService>();

Inject Kafka clients via the constructor.

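public class MyService
{
    public MyService(IProducer<string, byte[]> producer, IConsumer<Ignore, MyType> consumer, IAdminClient adminClient)
    {
        // Clients are singletons managed by the container.
        Producer = producer;
        Consumer = consumer;
        AdminClient = adminClient;
    }

    public IProducer<string, byte[]> Producer { get; }
    public IConsumer<Ignore, MyType> Consumer { get; }
    public IAdminClient AdminClient { get; }
}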
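
Once injected, the clients are used the same way as in plain Confluent.Kafka. The following is a minimal sketch, not part of this package: the OrderPublisher class and the "orders" topic are hypothetical names, and it assumes a non-transactional producer. Disposal is left to the container, since the clients are container-managed singletons.

```csharp
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;

// Hypothetical service; only the IProducer API comes from Confluent.Kafka.
public class OrderPublisher
{
    private const string Topic = "orders"; // assumed topic name

    private readonly IProducer<string, byte[]> producer;

    public OrderPublisher(IProducer<string, byte[]> producer)
    {
        // Resolved from the container; do not dispose it here.
        this.producer = producer;
    }

    public async Task<Offset> PublishAsync(string key, byte[] payload, CancellationToken cancellationToken = default)
    {
        var message = new Message<string, byte[]> { Key = key, Value = payload };

        // Completes once the delivery report arrives; throws ProduceException on failure.
        DeliveryResult<string, byte[]> result = await producer.ProduceAsync(Topic, message, cancellationToken);

        return result.Offset;
    }
}
```

Such a class would be registered like any other service, for example with services.AddSingleton<OrderPublisher>().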

Configuring clients

Client config properties are bound to the Kafka section of .NET configuration providers, such as appsettings.json.

{
  "Kafka": {
    "Producer": {
      "bootstrap.servers": "localhost:9092",
      "transactional.id": "example"
    },
    "Consumer": {
      "bootstrap.servers": "localhost:9092",
      "group.id": "example"
    },
    "Admin": {
      "bootstrap.servers": "localhost:9092"
    }
  }
}
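
Because these properties are read through Microsoft.Extensions.Configuration, the same keys can in principle come from any configuration provider, with ':' separating hierarchy levels. The sketch below uses the in-memory provider only to show how the JSON above flattens into keys. It illustrates .NET configuration itself; how a particular host passes this configuration to Kafka DI is an assumption and is not shown.

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Extensions.Configuration;

// Same consumer and producer settings as the JSON above, as flattened keys.
var settings = new Dictionary<string, string?>
{
    ["Kafka:Producer:bootstrap.servers"] = "localhost:9092",
    ["Kafka:Consumer:bootstrap.servers"] = "localhost:9092",
    ["Kafka:Consumer:group.id"] = "example",
};

IConfiguration configuration = new ConfigurationBuilder()
    .AddInMemoryCollection(settings)
    .Build();

// Dots are part of the property name; only ':' separates levels.
foreach (var property in configuration.GetSection("Kafka:Consumer").GetChildren())
{
    Console.WriteLine($"{property.Key} = {property.Value}");
}
```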

You can also configure KafkaClientOptions directly, including serialization and event handlers.

var builder = services.AddKafkaClient();

builder.Configure(
    options =>
    {
        // Config properties apply to all clients with a matching type (consumers, in this case).
        options.Configure(new ConsumerConfig { StatisticsIntervalMs = 5000 });

        // Optionally, configure handlers for asynchronous client events.
        options.OnStatistics((x, y) => Console.WriteLine(y));
    });

// Optionally, configure serialization for specific types.
builder.Configure<JsonDeserializer<MyType>>((x, y) => x.Deserialize(y));
services.AddSingleton(typeof(JsonDeserializer<>));

// Configure schema registry (required by some serializers).
services.AddSingleton<ISchemaRegistryClient>(
    x => new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = "localhost:8081" }));
