Confluent.Kafka.DependencyInjection 3.4.0
Kafka Dependency Injection
An extension of Confluent's Kafka client for use with Microsoft.Extensions.DependencyInjection
(and friends).
Features
- Configure/resolve Kafka clients using the service container.
- Load client config properties using
Microsoft.Extensions.Configuration
. - Automatically log client events using
Microsoft.Extensions.Logging
.
Installation
Add the NuGet package to your project:
$ dotnet add package Confluent.Kafka.DependencyInjection
Usage
Resolving clients
Kafka DI works out-of-the-box after registering services with an IServiceCollection
.
services.AddKafkaClient();
services.AddTransient<MyService>();
Inject Kafka clients via constructor.
public MyService(IProducer<string, byte[]> producer, IConsumer<Ignore, MyType> consumer, IAdminClient adminClient)
{
// Clients are singletons managed by the container.
Producer = producer;
Consumer = consumer;
AdminClient = adminClient;
}
Configuring clients
Client configuration properties are bound to the Kafka
section of .NET configuration providers, such as appsettings.json
.
{
"Kafka": {
"Producer": {
"bootstrap.servers": "localhost:9092",
"transactional.id": "example"
},
"Consumer": {
"bootstrap.servers": "localhost:9092",
"group.id": "example"
},
"Admin": {
"bootstrap.servers": "localhost:9092"
}
}
}
You can also specify configuration properties using the options pattern.
// Prepare consumers for manual offset storage.
services.Configure<ConsumerConfig>(x => x.EnableAutoOffsetStore = false);
Configure serialization by registering the appropriate interface.
// "Open" generic registrations apply to all key/value types (except built-in types).
services.AddTransient(typeof(IAsyncDeserializer<>), typeof(JsonDeserializer<>));
// Configure schema registry (required by Confluent serializers).
services.AddSingleton<ISchemaRegistryClient>(
x => new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = "localhost:8081" }));
For advanced scenarios, implement IClientBuilderSetup
to customize clients further.
class MyClientSetup : IClientBuilderSetup
{
public void Apply<TKey, TValue>(ProducerBuilder<TKey, TValue> builder)
{
builder.SetStatisticsHandler(OnStatistics);
}
public void Apply<TKey, TValue>(ConsumerBuilder<TKey, TValue> builder)
{
builder.SetStatisticsHandler(OnStatistics);
}
public void Apply(AdminClientBuilder builder)
{
builder.SetStatisticsHandler(OnStatistics);
}
void OnStatistics(IClient client, string statistics)
{
Console.WriteLine($"New statistics available for {client.Name}");
}
}
Register custom setup with services.
services.AddTransient<IClientBuilderSetup, MyClientSetup>();
Consumer hosting
Once the client is configured, a common pattern for consuming is to implement a .NET BackgroundService
.
class MyWorker(IConsumer<Ignore, MyType> consumer) : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
Console.WriteLine("Consumer service started.");
// ConsumeAllAsync() is an extension provided by this library.
await foreach (var result in consumer.ConsumeAllAsync().WithCancellation(stoppingToken))
{
Console.WriteLine($"Message {result.TopicPartitionOffset} processed.");
}
}
}
Register the service with the container.
services.AddHostedService<MyWorker>();
No packages depend on Confluent.Kafka.DependencyInjection.
.NET 6.0
- Confluent.Kafka (>= 2.11.0)
- Microsoft.Extensions.Configuration.Abstractions (>= 8.0.0)
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 8.0.2)
- Microsoft.Extensions.Logging.Abstractions (>= 8.0.3)
- Microsoft.Extensions.Options (>= 8.0.2)
.NET 8.0
- Confluent.Kafka (>= 2.11.0)
- Microsoft.Extensions.Configuration.Abstractions (>= 8.0.0)
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 8.0.2)
- Microsoft.Extensions.Logging.Abstractions (>= 8.0.3)
- Microsoft.Extensions.Options (>= 8.0.2)
.NET Standard 2.0
- Confluent.Kafka (>= 2.11.0)
- Microsoft.Extensions.Configuration.Abstractions (>= 8.0.0)
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 8.0.2)
- Microsoft.Extensions.Logging.Abstractions (>= 8.0.3)
- Microsoft.Extensions.Options (>= 8.0.2)
Version | Downloads | Last updated |
---|---|---|
3.4.0 | 1 | 09/15/2025 |
3.3.0 | 0 | 06/20/2025 |
3.2.0 | 0 | 03/15/2025 |
3.1.0 | 0 | 07/21/2023 |
3.0.2 | 0 | 07/21/2023 |
3.0.1 | 0 | 04/16/2023 |
3.0.0 | 0 | 03/28/2023 |
3.0.0-test3 | 0 | 03/27/2023 |
3.0.0-test2 | 0 | 03/21/2023 |
3.0.0-test | 0 | 03/10/2023 |
2.2.0 | 0 | 02/17/2023 |
2.1.2 | 0 | 02/02/2023 |
2.1.1 | 0 | 01/03/2023 |
2.1.0 | 0 | 11/03/2022 |
2.0.1 | 0 | 11/02/2022 |
2.0.1-test | 0 | 11/02/2022 |
2.0.0 | 0 | 06/21/2021 |
1.1.0 | 0 | 02/28/2021 |
1.0.0 | 0 | 01/08/2021 |
0.1.0 | 0 | 06/15/2020 |