Confluent.Kafka.DependencyInjection 3.0.0-test2

Kafka Dependency Injection

An extension of Confluent's Kafka client for use with Microsoft.Extensions.DependencyInjection (and friends).

Features

  • Configure Kafka producers/consumers using Microsoft.Extensions.DependencyInjection.IServiceCollection.
  • Default logging of asynchronous Kafka events through Microsoft.Extensions.Logging.ILogger.

Installation

Add the NuGet package to your project:

$ dotnet add package Confluent.Kafka.DependencyInjection

Usage

Add a global Kafka client:

services.AddKafkaClient(new Dictionary<string, string>
{
    { "bootstrap.servers", "localhost:9092" },
    { "enable.idempotence", "true" },
    { "group.id", "group1" }
});

Alternatively, add typed clients with distinct configurations:

services.AddKafkaClient<MyService>(new ProducerConfig
{
    BootstrapServers = "localhost:9092",
    EnableIdempotence = true
});

services.AddKafkaClient<MyOtherService>(new ConsumerConfig
{
    BootstrapServers = "somewhere.else:9092",
    GroupId = "group1"
});

Optionally, configure message serialization:

// Use open generics to apply to all keys and values.
services.AddSingleton(typeof(IAsyncDeserializer<>), typeof(AvroDeserializer<>));

// Use closed generics to select type-specific serializers.
services.AddSingleton<IAsyncSerializer<MyType>, JsonSerializer<MyType>>();

// Synchronous serializers take precedence, if present.
services.AddSingleton(sp => sp.GetRequiredService<IAsyncSerializer<MyType>>().AsSyncOverAsync());

// Configure schema registry (required by some serializers).
services.AddSingleton<ISchemaRegistryClient>(sp =>
    new CachedSchemaRegistryClient(new SchemaRegistryConfig
    {
        Url = "localhost:8081"
    }));

Optionally, configure custom handlers for Kafka events:

services.AddTransient<IErrorHandler, MyHandler>()
    .AddTransient<IStatisticsHandler, MyHandler>()
    .AddTransient<ILogHandler, MyHandler>()
    .AddTransient<IPartitionsAssignedHandler, MyHandler>()
    .AddTransient<IPartitionsRevokedHandler, MyHandler>()
    .AddTransient<IOffsetsCommittedHandler, MyHandler>();

Inject producers/consumers via constructor:

public MyService(IProducer<Null, string> producer)
{
    // Producer is a singleton managed by the container.
    this.producer = producer;
}
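
Once injected, the producer can be used like any other Confluent.Kafka producer. The following is a minimal sketch rather than part of the package: the SendAsync method and the topic name "my-topic" are assumptions made for illustration.

```csharp
using System.Threading.Tasks;
using Confluent.Kafka;

public class MyService
{
    private readonly IProducer<Null, string> producer;

    public MyService(IProducer<Null, string> producer)
    {
        // The producer is a singleton managed by the container,
        // so this class keeps a reference and does not dispose it.
        this.producer = producer;
    }

    // Hypothetical helper: publishes one message and reports where it landed.
    public async Task<TopicPartitionOffset> SendAsync(string text)
    {
        // "my-topic" is an assumed topic name for illustration.
        DeliveryResult<Null, string> result = await producer.ProduceAsync(
            "my-topic",
            new Message<Null, string> { Value = text });

        return result.TopicPartitionOffset;
    }
}
```

Awaiting ProduceAsync surfaces delivery failures as exceptions at the call site, which tends to be the simplest choice when throughput is not critical.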

Alternatively, inject IKafkaFactory to override configuration and control the client's lifetime yourself:

using var consumer = factory.CreateConsumer<MyType, MyOtherType>(new ConsumerConfig
{
    GroupId = "group2"
});

// ...
// Remember to close manually created consumers.
consumer.Close();
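
A manually created consumer is typically driven by a polling loop between creation and Close(). The sketch below shows one way to structure that loop using only standard Confluent.Kafka calls. The ConsumeLoop class, the string key/value types, and the topic name "my-topic" are hypothetical and chosen only to keep the example short.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

public static class ConsumeLoop
{
    // Hypothetical helper; "my-topic" is an assumed topic name.
    public static void Run(IConsumer<string, string> consumer, CancellationToken token)
    {
        consumer.Subscribe("my-topic");
        try
        {
            while (!token.IsCancellationRequested)
            {
                // Blocks until a message arrives or the token is cancelled.
                ConsumeResult<string, string> result = consumer.Consume(token);
                Console.WriteLine($"{result.Message.Key}: {result.Message.Value}");
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when the token is cancelled while waiting in Consume.
        }
        finally
        {
            // Leave the consumer group cleanly before the consumer is disposed.
            consumer.Close();
        }
    }
}
```

Placing Close() in a finally block ensures the consumer leaves its group even when the loop exits through an exception.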
