Confluent.Kafka.DependencyInjection 3.0.0-test2
Kafka Dependency Injection
An extension of Confluent's Kafka client for use with Microsoft.Extensions.DependencyInjection (and friends).
Features
- Configure Kafka producers/consumers using Microsoft.Extensions.DependencyInjection.IServiceCollection.
- Default logging of asynchronous Kafka events through Microsoft.Extensions.Logging.ILogger.
Installation
Add the NuGet package to your project:
$ dotnet add package Confluent.Kafka.DependencyInjection
Usage
Add a global Kafka client:
services.AddKafkaClient(new Dictionary<string, string>
{
    { "bootstrap.servers", "localhost:9092" },
    { "enable.idempotence", "true" },
    { "group.id", "group1" }
});
Alternatively, add typed clients with distinct configurations:
services.AddKafkaClient<MyService>(new ProducerConfig
{
    BootstrapServers = "localhost:9092",
    EnableIdempotence = true
});
services.AddKafkaClient<MyOtherService>(new ConsumerConfig
{
    BootstrapServers = "somewhere.else:9092",
    GroupId = "group1"
});
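The string keys in the global configuration and the typed properties on `ProducerConfig`/`ConsumerConfig` name the same underlying settings. The sketch below illustrates that equivalence using only Confluent.Kafka's own config classes; it does not show how this package consumes the dictionary internally, and `ConfigSketch` and `FromShared` are hypothetical names used for illustration.

```csharp
using System.Collections.Generic;
using Confluent.Kafka;

// Hypothetical helper: reads one shared dictionary through the typed config classes.
public static class ConfigSketch
{
    public static (ProducerConfig Producer, ConsumerConfig Consumer) FromShared()
    {
        var shared = new Dictionary<string, string>
        {
            { "bootstrap.servers", "localhost:9092" },
            { "enable.idempotence", "true" },
            { "group.id", "group1" }
        };

        // Each config wraps its own copy, so later changes to one do not leak into the other.
        var producer = new ProducerConfig(new Dictionary<string, string>(shared));
        var consumer = new ConsumerConfig(new Dictionary<string, string>(shared));
        return (producer, consumer);
    }
}
```

Here `Producer.EnableIdempotence` reads the `enable.idempotence` key and `Consumer.GroupId` reads `group.id`, which is why a single dictionary can carry settings for both kinds of client.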
Optionally, configure message serialization:
// Use open generics to apply to all keys and values.
services.AddSingleton(typeof(IAsyncDeserializer<>), typeof(AvroDeserializer<>));
// Use closed generics to select type-specific serializers.
services.AddSingleton<IAsyncSerializer<MyType>, JsonSerializer<MyType>>();
// Synchronous serializers take precedence, if present.
services.AddSingleton(sp => sp.GetRequiredService<IAsyncSerializer<MyType>>().AsSyncOverAsync());
// Configure schema registry (required by some serializers).
services.AddSingleton<ISchemaRegistryClient>(sp =>
    new CachedSchemaRegistryClient(new SchemaRegistryConfig
    {
        Url = "localhost:8081"
    }));
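If none of the stock serializers fit, a type-specific serializer can also be hand-written. The following is a minimal sketch of an `IAsyncSerializer<T>` built on System.Text.Json, with no schema registry involved; `Utf8JsonSerializer<T>` is a hypothetical name and is not part of this package or of Confluent.Kafka.

```csharp
using System.Text.Json;
using System.Threading.Tasks;
using Confluent.Kafka;

// Hypothetical serializer: writes any value as UTF-8 encoded JSON.
public sealed class Utf8JsonSerializer<T> : IAsyncSerializer<T>
{
    public Task<byte[]> SerializeAsync(T data, SerializationContext context)
    {
        // Serialization is CPU-bound here, so the result is wrapped in a completed task.
        byte[] payload = JsonSerializer.SerializeToUtf8Bytes(data);
        return Task.FromResult(payload);
    }
}
```

Assuming registration works as in the closed-generic example above, it would be added with `services.AddSingleton<IAsyncSerializer<MyType>, Utf8JsonSerializer<MyType>>()`.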
Optionally, configure custom handlers for Kafka events:
services.AddTransient<IErrorHandler, MyHandler>()
    .AddTransient<IStatisticsHandler, MyHandler>()
    .AddTransient<ILogHandler, MyHandler>()
    .AddTransient<IPartitionsAssignedHandler, MyHandler>()
    .AddTransient<IPartitionsRevokedHandler, MyHandler>()
    .AddTransient<IOffsetsCommittedHandler, MyHandler>();
Inject producers/consumers via constructor:
public MyService(IProducer<Null, string> producer)
{
    // Producer is a singleton managed by the container.
    this.producer = producer;
}
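A minimal sketch of what such a service might do with the injected producer, using only the standard Confluent.Kafka `ProduceAsync` call. The `PublishAsync` method and the topic name `my-topic` are hypothetical and chosen for illustration.

```csharp
using System.Threading.Tasks;
using Confluent.Kafka;

public class MyService
{
    private readonly IProducer<Null, string> producer;

    public MyService(IProducer<Null, string> producer)
    {
        // The container owns the producer, so this class never disposes it.
        this.producer = producer;
    }

    // Hypothetical method: sends one value-only message and reports where it landed.
    public async Task<TopicPartitionOffset> PublishAsync(string text)
    {
        var message = new Message<Null, string> { Value = text };

        // "my-topic" is an assumed topic name; ProduceAsync completes once delivery is reported.
        DeliveryResult<Null, string> result = await producer.ProduceAsync("my-topic", message);
        return result.TopicPartitionOffset;
    }
}
```

Because the producer is shared, the service only sends messages and leaves flushing and disposal to the container at shutdown.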
Alternatively, inject IKafkaFactory to override configuration and control lifespan:
using var consumer = factory.CreateConsumer<MyType, MyOtherType>(new ConsumerConfig
{
    GroupId = "group2"
});
// ...
// Remember to close manually created consumers.
consumer.Close();
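The elided part is typically a poll loop. The sketch below shows one way to structure it so that `Close()` runs even if consuming throws; it relies only on the standard Confluent.Kafka `IConsumer` API, and `ConsumeLoop`, `Run`, and the key/value types are hypothetical choices for illustration.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

// Hypothetical helper: polls a consumer until cancellation, then closes it.
public static class ConsumeLoop
{
    public static void Run(IConsumer<string, string> consumer, string topic, CancellationToken token)
    {
        consumer.Subscribe(topic);
        try
        {
            while (!token.IsCancellationRequested)
            {
                // Consume returns null when no message arrives within the timeout.
                ConsumeResult<string, string> result = consumer.Consume(TimeSpan.FromSeconds(1));
                if (result == null)
                {
                    continue;
                }

                Console.WriteLine($"{result.TopicPartitionOffset}: {result.Message.Value}");
            }
        }
        finally
        {
            // Leaves the consumer group cleanly so partitions are reassigned promptly.
            consumer.Close();
        }
    }
}
```

The short poll timeout is a design choice: it lets the loop check the cancellation token regularly without relying on an exception to break out.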
Dependencies
.NET 6.0
- Confluent.Kafka (>= 2.0.2)
- Microsoft.Extensions.Configuration.Abstractions (>= 7.0.0)
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 7.0.0)
- Microsoft.Extensions.Logging.Abstractions (>= 7.0.0)
- Microsoft.Extensions.Options (>= 7.0.1)
.NET Standard 2.0
- Confluent.Kafka (>= 2.0.2)
- Microsoft.Extensions.Configuration.Abstractions (>= 7.0.0)
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 7.0.0)
- Microsoft.Extensions.Logging.Abstractions (>= 7.0.0)
- Microsoft.Extensions.Options (>= 7.0.1)
Versions
| Version | Downloads | Last updated |
|---|---|---|
| 4.1.0 | 0 | 01/10/2026 |
| 4.0.0 | 0 | 12/10/2025 |
| 3.4.0 | 0 | 07/19/2025 |
| 3.3.0 | 0 | 06/20/2025 |
| 3.2.0 | 0 | 03/15/2025 |
| 3.1.0 | 0 | 07/21/2023 |
| 3.0.2 | 0 | 07/21/2023 |
| 3.0.1 | 0 | 04/16/2023 |
| 3.0.0 | 0 | 03/28/2023 |
| 3.0.0-test3 | 0 | 03/27/2023 |
| 3.0.0-test2 | 1 | 06/07/2026 |
| 3.0.0-test | 0 | 03/10/2023 |
| 2.2.0 | 0 | 02/17/2023 |
| 2.1.2 | 0 | 02/02/2023 |
| 2.1.1 | 0 | 01/03/2023 |
| 2.1.0 | 0 | 11/03/2022 |
| 2.0.1 | 0 | 11/02/2022 |
| 2.0.1-test | 0 | 11/02/2022 |
| 2.0.0 | 0 | 06/21/2021 |
| 1.1.0 | 0 | 02/28/2021 |
| 1.0.0 | 0 | 01/08/2021 |
| 0.1.0 | 0 | 06/15/2020 |