Worker Lifecycle
Workers go through several state transitions during their lifetime. This guide covers creation, state transitions, code updates, deletion, recovery, group coordination, and cleanup.
Worker States
stateDiagram-v2
[*] --> Running : CreateWorkerAsync()
Running --> Stopped : StopWorkerAsync()
Stopped --> Running : StartWorkerAsync()
Running --> Deleted : DeleteWorkerAsync()
Stopped --> Deleted : DeleteWorkerAsync()
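The diagram can be read as a small transition table. The sketch below is hypothetical: states and operations are plain strings, and none of the names come from the WorkManager API. It encodes the edges above, plus the idempotent Start/Stop self-transitions described under State Transitions. Any other operation is rejected. Deleted has no outgoing transitions.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch: (current state, operation) -> next state.
var transitions = new Dictionary<(string State, string Operation), string>
{
    [("Running", "Stop")] = "Stopped",
    [("Stopped", "Start")] = "Running",
    [("Running", "Delete")] = "Deleted",
    [("Stopped", "Delete")] = "Deleted",
    // Start/Stop are idempotent: repeating them leaves the state unchanged.
    [("Running", "Start")] = "Running",
    [("Stopped", "Stop")] = "Stopped",
};

// Returns the next state, or throws if the operation is not allowed from here.
string Apply(string state, string operation) =>
    transitions.TryGetValue((state, operation), out var next)
        ? next
        : throw new InvalidOperationException($"{operation} is not valid from {state}");

var current = "Running";            // CreateWorkerAsync leaves the worker Running
current = Apply(current, "Stop");   // Stopped
current = Apply(current, "Start");  // Running
current = Apply(current, "Delete"); // Deleted
Console.WriteLine(current);         // prints "Deleted"
```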
Creation
CreateWorkerAsync
var workerId = await workManager.CreateWorkerAsync(new CreateWorkerRequest(
CodeSource: new CodeSourceContent(Encoding.UTF8.GetBytes(pythonCode)),
MimeType: "text/x-python",
Topic: "my-topic",
Group: null // or "my-group" for coordination
));
What happens:
- Engine lookup: Validates that an engine exists for the MIME type
- Code fetch: Fetches the code from the URL, or uses the inline content
- Code loading: Calls engine.LoadCodeAsync(code)
- Persistence: Saves the worker config to the Dapr state store
- Subscription: Creates a Dapr pub/sub subscription
- Registry: Adds the worker to the in-memory registry
- Status: Sets the worker status to Running
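Engine lookup is the first step listed, so an unsupported MIME type can be rejected before any code is fetched, persisted, or subscribed. The sketch below is a minimal, hypothetical registry that maps MIME types to engine names. The real engine registry and engine types are not shown in this guide. Matching is case-insensitive here because MIME type names are case-insensitive; that choice is an assumption of the sketch.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical engine registry: MIME type -> engine name (illustrative only).
var engines = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase)
{
    ["text/x-python"] = "PythonEngine",
};

// Fail fast when no engine is registered for the requested MIME type.
string ResolveEngine(string mimeType) =>
    engines.TryGetValue(mimeType, out var engine)
        ? engine
        : throw new NotSupportedException($"No engine registered for MIME type '{mimeType}'");

Console.WriteLine(ResolveEngine("text/x-python")); // prints "PythonEngine"
```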
CodeSource Types
Inline Content:
CodeSource: new CodeSourceContent(Encoding.UTF8.GetBytes(code))
From URL:
CodeSource: new CodeSourceUrl(new Uri("https://example.com/worker.py"))
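The two source kinds resolve to the same thing: the bytes handed to the engine. In the hypothetical sketch below, a byte[] stands in for CodeSourceContent and a Uri for CodeSourceUrl; the real types are not reproduced. Inline content is used as-is, and a URL is fetched with HttpClient.

```csharp
using System;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;

using var http = new HttpClient();

// Hypothetical resolver: byte[] = inline content, Uri = code hosted at a URL.
async Task<byte[]> ResolveCodeAsync(object source) => source switch
{
    byte[] inline => inline,                       // inline content: use as-is
    Uri url => await http.GetByteArrayAsync(url),  // URL: fetch over HTTP
    _ => throw new ArgumentException($"Unsupported code source: {source?.GetType().Name}"),
};

var code = await ResolveCodeAsync(Encoding.UTF8.GetBytes("print('hello')"));
Console.WriteLine(Encoding.UTF8.GetString(code)); // prints "print('hello')"
```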
State Transitions
StartWorkerAsync
Resumes a stopped worker:
await workManager.StartWorkerAsync(workerId);
What happens:
1. Re-establishes Dapr topic subscription
2. Sets status to Running
Notes:
- Idempotent: If already running, does nothing
- Worker continues processing from where it left off
StopWorkerAsync
Pauses a running worker:
await workManager.StopWorkerAsync(workerId);
What happens:
1. Removes Dapr topic subscription
2. Sets status to Stopped
Notes:
- Idempotent: If already stopped, does nothing
- Worker can be restarted with StartWorkerAsync
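The idempotency notes mean the subscription is only touched when the status actually changes. The counters in the hypothetical sketch below stand in for the Dapr subscribe and unsubscribe calls. The order (subscription first, then status) follows the step lists above.

```csharp
using System;

// Hypothetical sketch of idempotent Start/Stop; counters stand in for Dapr calls.
var status = "Running";
var subscribeCalls = 0;
var unsubscribeCalls = 0;

void Start()
{
    if (status == "Running") return;  // already running: no-op
    subscribeCalls++;                 // 1. re-establish the topic subscription
    status = "Running";               // 2. set status to Running
}

void Stop()
{
    if (status == "Stopped") return;  // already stopped: no-op
    unsubscribeCalls++;               // 1. remove the topic subscription
    status = "Stopped";               // 2. set status to Stopped
}

Stop(); Stop();    // the second Stop does nothing
Start(); Start();  // the second Start does nothing
Console.WriteLine($"{status} subscribe={subscribeCalls} unsubscribe={unsubscribeCalls}");
// prints "Running subscribe=1 unsubscribe=1"
```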
Code Updates
LoadCodeFromContent
Updates worker code while running:
await workManager.LoadCodeFromContent(workerId, Encoding.UTF8.GetBytes(newCode));
What happens:
1. Calls worker.engine.LoadCodeAsync(newCode)
2. Adds history entry with new code source
3. Updates persisted configuration
LoadCodeFromUrl
Updates worker code from a URL:
await workManager.LoadCodeFromUrl(workerId, new Uri("https://example.com/new-worker.py"));
What happens:
1. Fetches code from URL
2. Calls worker.engine.LoadCodeAsync(fetchedCode)
3. Adds history entry with new URL
4. Updates persisted configuration
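Both update paths load the code into the engine before they record history and persist. The hypothetical sketch below keeps that order. With this ordering, a load that throws leaves both the history and the persisted source unchanged. The delegates and collections are stand-ins, not the WorkManager API. Whether the real implementation rolls back partial updates is not covered here.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-ins for the history list and persisted configuration.
var history = new List<string> { "inline (v1)" };  // entry from creation
var persistedSource = "inline (v1)";

async Task UpdateCodeAsync(string sourceDescription, Func<Task> loadIntoEngine)
{
    await loadIntoEngine();               // 1. engine.LoadCodeAsync(...)
    history.Add(sourceDescription);       // 2. add a history entry
    persistedSource = sourceDescription;  // 3. update persisted configuration
}

await UpdateCodeAsync("inline (v2)", () => Task.CompletedTask);

try
{
    await UpdateCodeAsync("https://example.com/broken.py",
        () => Task.FromException(new InvalidOperationException("syntax error")));
}
catch (InvalidOperationException) { /* load failed: nothing recorded or persisted */ }

Console.WriteLine($"{history.Count} {persistedSource}"); // prints "2 inline (v2)"
```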
Deletion
DeleteWorkerAsync
Permanently removes a worker:
await workManager.DeleteWorkerAsync(workerId);
What happens:
1. Removes worker from in-memory registry
2. Deletes worker config from state store
3. Removes Dapr topic subscription

Note: The worker is not stopped first; the subscription is removed regardless of its state.
Recovery
Worker recovery is automatic on service startup.
The WorkManagerRecoveryHostedService, a BackgroundService, calls RecoverWorkersAsync() at startup. The health check reports Degraded until recovery completes and Unhealthy if recovery fails, so the readiness probe keeps traffic away from the pod until recovery has finished.
How recovery works
- On startup, the hosted service automatically restores all workers from the Dapr state store
- For each worker:
  - Creates a new engine instance via the engine factory
  - Loads the worker's persisted code into the engine
  - Re-establishes the Dapr pub/sub subscription
  - Sets status to Running
- After recovery, the health check reports Healthy and the pod starts receiving traffic
Multi-instance safety (leader election)
When multiple WorkManager instances start concurrently (rolling updates, HPA scaling, pod rescheduling), only one instance performs recovery. Otherwise every instance would load every worker's engine into RAM, wasting memory and CPU (see WM #4, the N×W engine-load problem).
The RecoveryLeaderElector acquires a CAS lease on the Dapr state store key workmanager/recovery/leader (TTL WORKER_RECOVERY_LEASE_TTL_SECONDS, default 5 minutes). The instance that wins the lease performs recovery; the others mark recovery complete locally and rely on the normal message-flow path. If the leader crashes mid-recovery, its lease expires once the TTL elapses, and another instance can take over after WORKER_RECOVERY_ACQUIRE_TIMEOUT_SECONDS.

The lease is released in a finally block when the leader finishes (success or failure), so the next scale-up cycle acquires it promptly rather than waiting for the TTL.
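The lease logic can be modeled with an ETag-style compare-and-swap. The sketch below uses a hypothetical in-memory dictionary, not the Dapr client API. It shows three behaviors from the text above. A live lease blocks other instances. An expired lease can be taken over after a leader crash. A release in finally frees the lease before its TTL runs out.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory store with ETag-style CAS (NOT the Dapr client API).
const string LeaderKey = "workmanager/recovery/leader";
var store = new Dictionary<string, (string Owner, DateTimeOffset ExpiresAt, int ETag)>();

// Write only if the record's ETag still matches what the caller read.
bool CompareAndSet(int expectedETag, string owner, DateTimeOffset expiresAt)
{
    var currentETag = store.TryGetValue(LeaderKey, out var current) ? current.ETag : 0;
    if (currentETag != expectedETag) return false;     // someone else wrote first
    store[LeaderKey] = (owner, expiresAt, currentETag + 1);
    return true;
}

bool TryAcquire(string owner, TimeSpan ttl, DateTimeOffset now)
{
    var found = store.TryGetValue(LeaderKey, out var lease);
    if (found && lease.ExpiresAt > now) return false;  // a live lease is held
    return CompareAndSet(found ? lease.ETag : 0, owner, now + ttl);
}

// Called from a finally block by the leader: mark the lease expired at once.
void Release(string owner)
{
    if (store.TryGetValue(LeaderKey, out var lease) && lease.Owner == owner)
        CompareAndSet(lease.ETag, owner, DateTimeOffset.MinValue);
}

var t0 = DateTimeOffset.UnixEpoch;
var leaseTtl = TimeSpan.FromMinutes(5);  // WORKER_RECOVERY_LEASE_TTL_SECONDS default

Console.WriteLine(TryAcquire("pod-a", leaseTtl, t0));               // True: pod-a recovers
Console.WriteLine(TryAcquire("pod-b", leaseTtl, t0.AddMinutes(1))); // False: lease still live
Console.WriteLine(TryAcquire("pod-b", leaseTtl, t0.AddMinutes(6))); // True: pod-a's lease expired

if (TryAcquire("pod-c", leaseTtl, t0.AddMinutes(20)))
{
    try { /* RecoverWorkersAsync() would run here */ }
    finally { Release("pod-c"); }
}
Console.WriteLine(TryAcquire("pod-d", leaseTtl, t0.AddMinutes(21))); // True: released before TTL
```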
Disabling automatic recovery
Set WORKER_RECOVERY_ON_START=false to opt out. The health check still reports Healthy (operators have explicitly chosen this mode), and workers must be restored manually via the RecoverWorkers gRPC endpoint.
Error handling
- Recovery failures are surfaced as structured Error-level log events with per-worker detail
- If recovery fails entirely, the health check reports Unhealthy and the pod will not become ready
- Individual worker load failures are logged but don't stop recovery of other workers
- Workers can still be recovered manually via the RecoverWorkers gRPC endpoint if needed
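Per-worker isolation means that one bad worker does not block the rest. In the hypothetical sketch below, each worker is restored inside its own try/catch. Failures are collected and written to stderr, standing in for the Error-level log events. This guide does not define when recovery as a whole counts as failed, so the sketch only returns the failure list.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical recovery loop; restoreAsync stands in for
// "create engine, load code, resubscribe, set Running".
async Task<List<(string WorkerId, Exception Error)>> RecoverAllAsync(
    IEnumerable<string> workerIds, Func<string, Task> restoreAsync)
{
    var failures = new List<(string WorkerId, Exception Error)>();
    foreach (var id in workerIds)
    {
        try
        {
            await restoreAsync(id);
        }
        catch (Exception ex)
        {
            failures.Add((id, ex));  // record the failure and continue with the next worker
            Console.Error.WriteLine($"Recovery failed for worker {id}: {ex.Message}");
        }
    }
    return failures;
}

var failed = await RecoverAllAsync(
    new[] { "w1", "w2", "w3" },
    w => w == "w2" ? Task.FromException(new InvalidOperationException("bad code")) : Task.CompletedTask);

Console.WriteLine($"{failed.Count} failed: {failed[0].WorkerId}"); // prints "1 failed: w2"
```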
Monitoring
The /health endpoint includes a recovery health check that reports:
- Healthy — all workers recovered successfully (or recovery disabled by config, or another instance is the recovery leader)
- Degraded — recovery still in progress (during startup)
- Unhealthy — recovery failed (pod will not become ready)
History Tracking
Each worker maintains a history of code changes:
var history = workManager.GetWorkerHistory(workerId);
foreach (var entry in history)
{
Console.WriteLine($"Code loaded at {entry.CreatedAt}");
Console.WriteLine($" Type: {entry.CodeSource.GetType().Name}");
}
What's tracked:
- Initial creation with original code source
- Each LoadCodeFromContent call
- Each LoadCodeFromUrl call
What's not tracked:
- Runtime execution errors (these go to logs)
- Message processing counts
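The history is append-only, and only code changes add entries to it. The hypothetical sketch below models an entry as a tuple (timestamp, source kind, source). That shape is illustrative and is not the real history entry type returned by GetWorkerHistory. The last entry identifies the code the worker is currently running.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical append-only history; the tuple shape is illustrative only.
var history = new List<(DateTimeOffset CreatedAt, string SourceKind, string Source)>();

void RecordCodeChange(string sourceKind, string source) =>
    history.Add((DateTimeOffset.UtcNow, sourceKind, source));

RecordCodeChange("Content", "inline bytes");                  // CreateWorkerAsync
RecordCodeChange("Url", "https://example.com/new-worker.py"); // LoadCodeFromUrl
RecordCodeChange("Content", "inline bytes (v3)");             // LoadCodeFromContent

// The most recent entry is the code the worker is currently running.
var latest = history[^1];
Console.WriteLine($"{history.Count} entries, latest: {latest.SourceKind} {latest.Source}");
// prints "3 entries, latest: Content inline bytes (v3)"
```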
Group Coordination
Workers can belong to a group for mutually exclusive execution:
// Worker 1
await workManager.CreateWorkerAsync(new CreateWorkerRequest(
CodeSource: code1,
MimeType: "text/x-python",
Topic: "shared-topic",
Group: "group-a" // Same group as Worker 2
));
// Worker 2
await workManager.CreateWorkerAsync(new CreateWorkerRequest(
CodeSource: code2,
MimeType: "text/x-python",
Topic: "shared-topic",
Group: "group-a" // Same group as Worker 1
));
Behavior:
- Only one worker in a group processes a message at a time
- Distributed lock acquired before processing with exponential backoff retry (default 500ms base, default max 3 attempts — both tunable via GROUP_LOCK_RETRY_BASE_DELAY_MS and GROUP_LOCK_MAX_RETRIES, see Configuration)
- Each lock acquisition increments a monotonic fencing token, preventing lock theft races
- Lock released after processing (or on error)
- Lock max age is 30 seconds (stale locks can be stolen if the original holder hasn't made writes)
- If lock cannot be acquired after all retries, a LockContentionException is thrown, which causes Dapr to nack the message and redeliver it later
Retry semantics (audit, June 2026):
- The subscribe handler returns TopicResponseAction.Retry on LockContentionException (see Components/WorkManager.cs:SubscribeToTopicAsync). The Dapr SDK sends a negative acknowledgement with retry=true, which causes Dapr to redeliver the message after backoff. No message is silently dropped on lock contention.
- Dapr redelivers the message with exponential backoff (configured in the pubsub component)
- After exhausting all Dapr redeliveries (maxRedeliveryCount: 3), messages are routed to the dead-letter topic
- Monitor the dead-letter topic for alerting on persistent lock contention
- The acquire-loop retries are independent of the Dapr redelivery retries: the former exhausts the local in-process backoff (default 3 attempts with 500ms → 1s → 2s backoff), then throws, then Dapr handles redelivery at the broker level
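The in-process acquire loop follows base × 2^attempt. This guide describes the limit both as "3 attempts" and as GROUP_LOCK_MAX_RETRIES. The hypothetical sketch below reads it as "retries after the first attempt", which reproduces the 500 ms -> 1 s -> 2 s schedule; the real implementation may count differently. Here tryAcquire stands in for the distributed-lock call, and InvalidOperationException stands in for LockContentionException.

```csharp
using System;
using System.Threading.Tasks;

// base x 2^retry: 500 ms, 1 s, 2 s for the default base of 500 ms.
TimeSpan BackoffDelay(TimeSpan baseDelay, int retry) =>
    TimeSpan.FromMilliseconds(baseDelay.TotalMilliseconds * Math.Pow(2, retry));

async Task AcquireWithRetryAsync(Func<Task<bool>> tryAcquire, TimeSpan baseDelay, int maxRetries)
{
    if (await tryAcquire()) return;                        // first attempt
    for (var retry = 0; retry < maxRetries; retry++)
    {
        await Task.Delay(BackoffDelay(baseDelay, retry));  // back off before retrying
        if (await tryAcquire()) return;                    // lock acquired
    }
    // Local retries exhausted: the subscribe handler would map this to
    // TopicResponseAction.Retry so Dapr redelivers the message later.
    throw new InvalidOperationException("Group lock still held after all retries");
}

var defaultBase = TimeSpan.FromMilliseconds(500);  // GROUP_LOCK_RETRY_BASE_DELAY_MS default
for (var r = 0; r < 3; r++)                         // GROUP_LOCK_MAX_RETRIES default
    Console.WriteLine(BackoffDelay(defaultBase, r).TotalMilliseconds); // 500, 1000, 2000
```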
Lock lifecycle:
Message received
│
▼
AcquireGroupLock() ── retry (base × 2^attempt, max GROUP_LOCK_MAX_RETRIES)
│
├── Lock acquired (fencing token incremented) ──► Process message ──► ReleaseLock()
│
├── Lock held by another ──► retry with backoff
│
├── Lock stolen from stale holder ──► fence check: if owner wrote, abort steal
│
└── Exhausted retries ──► LockContentionException ──► TopicResponseAction.Retry ──► Dapr nack(retry=true) ──► redeliver ──► dead-letter topic (after Dapr maxRedeliveryCount)
Fencing token: Each lock includes a monotonically increasing integer. Before a stale lock is stolen, the new owner checks whether the original owner made writes under the old fence token. If writes were made, the lock theft is aborted, preventing dual processing of the same message. This protects slow-but-valid workers (e.g., those doing a first-time Roslyn compilation) from having their lock stolen mid-processing.
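The fencing check can be modeled as two small operations. In the hypothetical sketch below, lockRecord holds the holder, acquisition time and fence of the stored lock. lastWriteFence is the fence under which the most recent write was accepted. Both names are illustrative. A stale lock is stolen only if no write was made under its fence. Writes that carry an outdated fence are rejected. The 30-second max age comes from the Behavior list above.

```csharp
using System;

// Hypothetical fencing-token model; names are illustrative only.
var maxAge = TimeSpan.FromSeconds(30);
(string Holder, DateTimeOffset AcquiredAt, long Fence) lockRecord =
    ("worker-1", DateTimeOffset.UnixEpoch, 7);
long lastWriteFence = 6;  // no write has been made under fence 7 yet

// Writes carry the writer's fence; a writer whose lock was stolen is rejected.
bool TryWrite(long writerFence)
{
    if (writerFence != lockRecord.Fence) return false;
    lastWriteFence = writerFence;
    return true;
}

// A stale lock may be stolen only if its holder made no writes under its fence.
bool TrySteal(string newHolder, DateTimeOffset now)
{
    if (now - lockRecord.AcquiredAt < maxAge) return false;  // not stale yet
    if (lastWriteFence >= lockRecord.Fence) return false;     // holder wrote: abort steal
    lockRecord = (newHolder, now, lockRecord.Fence + 1);      // new, higher fence
    return true;
}

var t0 = DateTimeOffset.UnixEpoch;
Console.WriteLine(TrySteal("worker-2", t0.AddSeconds(45)));  // True: stale, no writes under fence 7
Console.WriteLine(TryWrite(7));                              // False: worker-1's fence is outdated
Console.WriteLine(TryWrite(8));                              // True: worker-2 writes under fence 8
Console.WriteLine(TrySteal("worker-3", t0.AddSeconds(100))); // False: worker-2 wrote, steal aborted
```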
Cleanup on Disposal
When WorkManager.DisposeAsync() is called:
- All Dapr subscriptions are disposed
- All topic-specific subscriptions are disposed
- In-memory registry is cleared
Note: State store data is NOT deleted. Workers can be recovered on restart.
Best Practices
- Use groups for related workers: If multiple workers should handle the same topic without duplicate processing, use a group.
- Recovery is automatic: Worker recovery runs on startup via the WorkManagerRecoveryHostedService, so no manual gRPC call is required (unless WORKER_RECOVERY_ON_START=false). The readiness probe ensures traffic is not routed to the pod until recovery completes.
- Track history for debugging: History entries help audit code changes:
  var history = workManager.GetWorkerHistory(id);
- Stop before delete is optional: DeleteWorkerAsync cleans up subscriptions regardless of state, so an explicit StopWorkerAsync is not required before deletion.
- Code updates don't restart subscriptions: LoadCodeFromContent and LoadCodeFromUrl update code in place without disrupting processing.