Microsoft.Orleans.DurableJobs 10.0.0-rc.2.alpha.1
Microsoft Orleans Durable Jobs
Introduction
Microsoft Orleans Durable Jobs provides a distributed, scalable system for scheduling one-time jobs that execute at a specific time. Unlike Orleans Reminders which are designed for recurring tasks, Durable Jobs are ideal for one-time future events such as appointment notifications, delayed processing, scheduled workflow steps, and time-based triggers.
Key Features:
- At Least One-time Execution: Jobs are scheduled to run at least once
- Persistent: Jobs survive grain deactivation and silo restarts
- Distributed: Jobs are automatically distributed and rebalanced across silos
- Reliable: Failed jobs can be automatically retried with configurable policies
- Rich Metadata: Associate custom metadata with each job
- Cancellable: Jobs can be canceled before execution
Getting Started
Installation
To use this package, install it via NuGet:
dotnet add package Microsoft.Orleans.DurableJobs
For production scenarios with persistence, also install a storage provider:
dotnet add package Microsoft.Orleans.DurableJobs.AzureStorage
Configuration
Using In-Memory Storage (Development/Testing)
using Microsoft.Extensions.Hosting;
using Orleans.Hosting;
var builder = Host.CreateApplicationBuilder(args);
builder.UseOrleans(siloBuilder =>
{
siloBuilder
.UseLocalhostClustering()
// Configure in-memory Durable Jobs (no persistence)
.UseInMemoryDurableJobs();
});
await builder.Build().RunAsync();
Using Azure Storage (Production)
using Microsoft.Extensions.Hosting;
using Orleans.Hosting;
var builder = Host.CreateApplicationBuilder(args);
builder.UseOrleans(siloBuilder =>
{
siloBuilder
.UseLocalhostClustering()
// Configure Azure Storage Durable Jobs
.UseAzureStorageDurableJobs(options =>
{
options.Configure(o =>
{
o.BlobServiceClient = new Azure.Storage.Blobs.BlobServiceClient("YOUR_CONNECTION_STRING");
o.ContainerName = "durable-jobs";
});
});
});
await builder.Build().RunAsync();
Advanced Configuration
builder.UseOrleans(siloBuilder =>
{
siloBuilder
.UseLocalhostClustering()
.UseInMemoryDurableJobs()
.ConfigureServices(services =>
{
services.Configure<DurableJobsOptions>(options =>
{
// Duration of each job shard (jobs are partitioned by time)
options.ShardDuration = TimeSpan.FromMinutes(5);
// Maximum number of jobs that can execute concurrently on each silo
options.MaxConcurrentJobsPerSilo = 100;
// Custom retry policy
options.ShouldRetry = (context, exception) =>
{
// Retry up to 3 times with exponential backoff
if (context.DequeueCount < 3)
{
var delay = TimeSpan.FromSeconds(Math.Pow(2, context.DequeueCount));
return DateTimeOffset.UtcNow.Add(delay);
}
return null; // Don't retry
};
});
});
});
Usage Examples
Basic Job Scheduling
1. Implement the IDurableJobHandler Interface
using Orleans;
using Orleans.DurableJobs;
public interface INotificationGrain : IGrainWithStringKey
{
Task ScheduleNotification(string message, DateTimeOffset sendTime);
Task CancelScheduledNotification();
}
public class NotificationGrain : Grain, INotificationGrain, IDurableJobHandler
{
private readonly ILocalDurableJobManager _jobManager;
private readonly ILogger<NotificationGrain> _logger;
private IDurableJob? _durableJob;
public NotificationGrain(
ILocalDurableJobManager jobManager,
ILogger<NotificationGrain> logger)
{
_jobManager = jobManager;
_logger = logger;
}
public async Task ScheduleNotification(string message, DateTimeOffset sendTime)
{
var userId = this.GetPrimaryKeyString();
var metadata = new Dictionary<string, string>
{
["Message"] = message
};
_durableJob = await _jobManager.ScheduleJobAsync(
this.GetGrainId(),
"SendNotification",
sendTime,
metadata);
_logger.LogInformation(
"Scheduled notification for user {UserId} at {SendTime} (JobId: {JobId})",
userId, sendTime, _durableJob.Id);
}
public async Task CancelScheduledNotification()
{
if (_durableJob is null)
{
_logger.LogWarning("No scheduled notification to cancel");
return;
}
var canceled = await _jobManager.TryCancelDurableJobAsync(_durableJob);
_logger.LogInformation("Notification {JobId} canceled: {Canceled}", _durableJob.Id, canceled);
if (canceled)
{
_durableJob = null;
}
}
// This method is called when the durable job executes
public Task ExecuteJobAsync(IDurableJobContext context, CancellationToken cancellationToken)
{
var userId = this.GetPrimaryKeyString();
var message = context.Job.Metadata?["Message"];
_logger.LogInformation(
"Sending notification to user {UserId}: {Message} (Job: {JobId}, Run: {RunId}, Attempt: {DequeueCount})",
userId, message, context.Job.Id, context.RunId, context.DequeueCount);
// Send the notification here
// If this throws an exception, the job can be retried based on your retry policy
_durableJob = null;
return Task.CompletedTask;
}
}
2. Order Workflow with Multiple Jobs
public interface IOrderGrain : IGrainWithGuidKey
{
Task PlaceOrder(OrderDetails details);
Task CancelOrder();
}
public class OrderGrain : Grain, IOrderGrain, IDurableJobHandler
{
private readonly ILocalDurableJobManager _jobManager;
private readonly IOrderService _orderService;
private readonly IGrainFactory _grainFactory;
private readonly ILogger<OrderGrain> _logger;
public OrderGrain(
ILocalDurableJobManager jobManager,
IOrderService orderService,
IGrainFactory grainFactory,
ILogger<OrderGrain> logger)
{
_jobManager = jobManager;
_orderService = orderService;
_grainFactory = grainFactory;
_logger = logger;
}
public async Task PlaceOrder(OrderDetails details)
{
var orderId = this.GetPrimaryKey();
// Create the order
await _orderService.CreateOrderAsync(orderId, details);
// Schedule delivery reminder for 24 hours before delivery
var reminderTime = details.DeliveryDate.AddHours(-24);
await _jobManager.ScheduleJobAsync(
this.GetGrainId(),
"DeliveryReminder",
reminderTime,
new Dictionary<string, string>
{
["Step"] = "DeliveryReminder",
["CustomerId"] = details.CustomerId,
["OrderNumber"] = details.OrderNumber
});
// Schedule order expiration if payment not received
var expirationTime = DateTimeOffset.UtcNow.AddHours(24);
await _jobManager.ScheduleJobAsync(
this.GetGrainId(),
"OrderExpiration",
expirationTime,
new Dictionary<string, string>
{
["Step"] = "OrderExpiration"
});
}
public async Task CancelOrder()
{
var orderId = this.GetPrimaryKey();
await _orderService.CancelOrderAsync(orderId);
}
public async Task ExecuteJobAsync(IDurableJobContext context, CancellationToken cancellationToken)
{
var step = context.Job.Metadata!["Step"];
var orderId = this.GetPrimaryKey();
switch (step)
{
case "DeliveryReminder":
await HandleDeliveryReminder(context, cancellationToken);
break;
case "OrderExpiration":
await HandleOrderExpiration(cancellationToken);
break;
}
}
private async Task HandleDeliveryReminder(IDurableJobContext context, CancellationToken ct)
{
var customerId = context.Job.Metadata!["CustomerId"];
var orderNumber = context.Job.Metadata["OrderNumber"];
var notificationGrain = _grainFactory.GetGrain<INotificationGrain>(customerId);
await notificationGrain.ScheduleNotification(
$"Your order #{orderNumber} will be delivered tomorrow!",
DateTimeOffset.UtcNow);
}
private async Task HandleOrderExpiration(CancellationToken ct)
{
var orderId = this.GetPrimaryKey();
var order = await _orderService.GetOrderAsync(orderId, ct);
if (order?.Status == OrderStatus.Pending)
{
await _orderService.CancelOrderAsync(orderId, ct);
_logger.LogInformation("Order {OrderId} expired and canceled", orderId);
}
}
}
Advanced Scenarios
Job with Retry Logic
public class PaymentProcessorGrain : Grain, IDurableJobHandler
{
private readonly IPaymentService _paymentService;
private readonly ILogger<PaymentProcessorGrain> _logger;
public Task ExecuteJobAsync(IDurableJobContext context, CancellationToken cancellationToken)
{
var paymentId = context.Job.Metadata?["PaymentId"];
_logger.LogInformation(
"Processing payment {PaymentId} (Attempt {Attempt})",
paymentId, context.DequeueCount);
try
{
await _paymentService.ProcessPaymentAsync(paymentId, cancellationToken);
return Task.CompletedTask;
}
catch (TransientException ex)
{
_logger.LogWarning(ex, "Payment processing failed with transient error, will retry");
throw; // Let the retry policy handle it
}
catch (Exception ex)
{
_logger.LogError(ex, "Payment processing failed with permanent error");
throw; // This will not be retried if the retry policy returns null
}
}
}
Tracking Job Completion
public class WorkflowGrain : Grain, IDurableJobHandler
{
private readonly Dictionary<string, TaskCompletionSource> _pendingJobs = new();
public async Task<IDurableJob> ScheduleWorkflowStep(string stepName, DateTimeOffset executeAt)
{
var job = await _jobManager.ScheduleJobAsync(
this.GetGrainId(),
stepName,
executeAt);
_pendingJobs[job.Id] = new TaskCompletionSource();
return job;
}
public async Task WaitForJobCompletion(string jobId, TimeSpan timeout)
{
if (_pendingJobs.TryGetValue(jobId, out var tcs))
{
using var cts = new CancellationTokenSource(timeout);
await tcs.Task.WaitAsync(cts.Token);
}
}
public Task ExecuteJobAsync(IDurableJobContext context, CancellationToken cancellationToken)
{
// Execute the workflow step...
// Mark as complete
if (_pendingJobs.TryRemove(context.Job.Id, out var tcs))
{
tcs.SetResult();
}
return Task.CompletedTask;
}
}
How It Works
Architecture Overview
- Job Sharding: Jobs are partitioned into time-based shards (default: 1-minute windows)
- Shard Ownership: Each shard is owned by a single silo for execution
- Automatic Rebalancing: When a silo fails, its shards are automatically reassigned to healthy silos
- Ordered Execution: Within a shard, jobs are processed in order of their due time
- Concurrency Control: The
MaxConcurrentJobsPerSilosetting limits concurrent job execution
Job Lifecycle
┌─────────────┐
│ Scheduled │ ──▶ Job is created and added to appropriate shard
└─────────────┘
│
▼
┌─────────────┐
│ Waiting │ ──▶ Job waits in queue until due time
└─────────────┘
│
▼
┌─────────────┐
│ Executing │ ──▶ Job handler is invoked on target grain
└─────────────┘
│
├──▶ Success ──▶ Job is removed
│
└──▶ Failure ──▶ Retry policy decides:
• Retry: Job is re-queued with new due time
• No Retry: Job is removed
Configuration Reference
DurableJobsOptions
| Property | Type | Default | Description |
|---|---|---|---|
ShardDuration |
TimeSpan |
1 minute | Duration of each job shard. Smaller values reduce latency but increase overhead. |
MaxConcurrentJobsPerSilo |
int |
100 | Maximum number of jobs that can execute simultaneously on a silo. |
ShouldRetry |
Func<IDurableJobContext, Exception, DateTimeOffset?> |
3 retries with exp. backoff | Determines if a failed job should be retried. Return the new due time or null to not retry. |
Best Practices
Set Reasonable Concurrency Limits: Prevent resource exhaustion
options.MaxConcurrentJobsPerSilo = 100; // Adjust based on your workloadImplement Idempotent Job Handlers: Jobs may be retried, ensure handlers are idempotent
public async Task ExecuteJobAsync(IDurableJobContext context, CancellationToken ct) { var jobId = context.Job.Id; // Check if already processed if (await _state.IsProcessed(jobId)) return; // Process job... await _state.MarkProcessed(jobId); }Use Metadata Wisely: Keep metadata lightweight
// Good: Store IDs var metadata = new Dictionary<string, string> { ["OrderId"] = "12345" }; // Bad: Store large objects var metadata = new Dictionary<string, string> { ["Order"] = JsonSerializer.Serialize(largeOrder) };Handle Cancellation: Respect the cancellation token
public async Task ExecuteJobAsync(IDurableJobContext context, CancellationToken ct) { await SomeLongRunningOperation(ct); }
Documentation
For more comprehensive documentation, please refer to:
Feedback & Contributing
- If you have any issues or would like to provide feedback, please open an issue on GitHub
- Join our community on Discord
- Follow the @msftorleans Twitter account for Orleans announcements
- Contributions are welcome! Please review our contribution guidelines
- This project is licensed under the MIT license
Showing the top 20 packages that depend on Microsoft.Orleans.DurableJobs.
| Packages | Downloads |
|---|---|
|
Microsoft.Orleans.TestingHost
Microsoft Orleans library for hosting a silo in a testing project.
|
1 |
.NET 8.0
- Microsoft.Orleans.Analyzers (>= 10.0.0-rc.2)
- Microsoft.Orleans.CodeGenerator (>= 10.0.0-rc.2)
- Microsoft.Orleans.Core.Abstractions (>= 10.0.0-rc.2)
- Microsoft.Orleans.Runtime (>= 10.0.0-rc.2)
- Microsoft.Orleans.Sdk (>= 10.0.0-rc.2)
- Microsoft.AspNetCore.Connections.Abstractions (>= 8.0.11)
- Microsoft.CodeAnalysis.Analyzers (>= 3.11.0)
- Microsoft.CodeAnalysis.Common (>= 4.5.0)
- Microsoft.CodeAnalysis.Workspaces.Common (>= 4.5.0)
- Microsoft.Extensions.Configuration (>= 8.0.0)
- Microsoft.Extensions.Configuration.Abstractions (>= 8.0.0)
- Microsoft.Extensions.Configuration.Binder (>= 8.0.2)
- Microsoft.Extensions.Configuration.Json (>= 8.0.1)
- Microsoft.Extensions.DependencyInjection (>= 8.0.1)
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 8.0.2)
- Microsoft.Extensions.DependencyModel (>= 8.0.2)
- Microsoft.Extensions.Hosting (>= 8.0.1)
- Microsoft.Extensions.Hosting.Abstractions (>= 8.0.1)
- Microsoft.Extensions.Logging (>= 8.0.1)
- Microsoft.Extensions.Logging.Abstractions (>= 8.0.3)
- Microsoft.Extensions.Logging.Console (>= 8.0.1)
- Microsoft.Extensions.Logging.Debug (>= 8.0.1)
- Microsoft.Extensions.ObjectPool (>= 8.0.22)
- Microsoft.Extensions.Options (>= 8.0.2)
- Microsoft.Extensions.Options.ConfigurationExtensions (>= 8.0.0)
- Newtonsoft.Json (>= 13.0.4)
- System.IO.Hashing (>= 10.0.0)
- System.IO.Pipelines (>= 8.0.0)
- System.Memory.Data (>= 8.0.1)
.NET 10.0
- Microsoft.Orleans.Analyzers (>= 10.0.0-rc.2)
- Microsoft.Orleans.CodeGenerator (>= 10.0.0-rc.2)
- Microsoft.Orleans.Core.Abstractions (>= 10.0.0-rc.2)
- Microsoft.Orleans.Runtime (>= 10.0.0-rc.2)
- Microsoft.Orleans.Sdk (>= 10.0.0-rc.2)
- Microsoft.AspNetCore.Connections.Abstractions (>= 10.0.0)
- Microsoft.CodeAnalysis.Analyzers (>= 3.11.0)
- Microsoft.CodeAnalysis.Common (>= 5.0.0)
- Microsoft.CodeAnalysis.Workspaces.Common (>= 5.0.0)
- Microsoft.Extensions.Configuration (>= 10.0.0)
- Microsoft.Extensions.Configuration.Abstractions (>= 10.0.0)
- Microsoft.Extensions.Configuration.Binder (>= 10.0.0)
- Microsoft.Extensions.Configuration.Json (>= 10.0.0)
- Microsoft.Extensions.DependencyInjection (>= 10.0.0)
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 10.0.0)
- Microsoft.Extensions.DependencyModel (>= 10.0.0)
- Microsoft.Extensions.Hosting (>= 10.0.0)
- Microsoft.Extensions.Hosting.Abstractions (>= 10.0.0)
- Microsoft.Extensions.Logging (>= 10.0.0)
- Microsoft.Extensions.Logging.Abstractions (>= 10.0.0)
- Microsoft.Extensions.Logging.Console (>= 10.0.0)
- Microsoft.Extensions.Logging.Debug (>= 10.0.0)
- Microsoft.Extensions.ObjectPool (>= 10.0.0)
- Microsoft.Extensions.Options (>= 10.0.0)
- Microsoft.Extensions.Options.ConfigurationExtensions (>= 10.0.0)
- Newtonsoft.Json (>= 13.0.4)
- System.IO.Hashing (>= 10.0.0)
- System.Memory.Data (>= 10.0.0)
| Version | Downloads | Last updated |
|---|---|---|
| 10.0.0-rc.2.alpha.1 | 1 | 01/09/2026 |