Worker Lifecycle

Workers go through several state transitions during their lifetime. This guide covers creation, state management, and cleanup.

Worker States

stateDiagram-v2
    [*] --> Running : CreateWorkerAsync()
    Running --> Stopped : StopWorkerAsync()
    Stopped --> Running : StartWorkerAsync()
    Running --> Deleted : DeleteWorkerAsync()
    Stopped --> Deleted : DeleteWorkerAsync()
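
The start, stop, and delete transitions in the diagram can be modeled as a small lookup table. The following is a minimal sketch, not the WorkManager implementation: WorkerState, WorkerCommand, and WorkerStateMachine are hypothetical names introduced for illustration. The two self-transitions reflect the idempotent behavior of StartWorkerAsync and StopWorkerAsync described later in this guide.

```csharp
using System;
using System.Collections.Generic;

// Usage: walk a stopped worker through the remaining transitions.
var state = WorkerState.Stopped;
state = WorkerStateMachine.Next(state, WorkerCommand.Start);   // Stopped -> Running
state = WorkerStateMachine.Next(state, WorkerCommand.Start);   // no-op: already Running
state = WorkerStateMachine.Next(state, WorkerCommand.Delete);  // Running -> Deleted
Console.WriteLine(state);

// Hypothetical types for illustration only.
public enum WorkerState { Running, Stopped, Deleted }
public enum WorkerCommand { Start, Stop, Delete }

public static class WorkerStateMachine
{
    private static readonly Dictionary<(WorkerState, WorkerCommand), WorkerState> Transitions = new()
    {
        [(WorkerState.Stopped, WorkerCommand.Start)] = WorkerState.Running,
        [(WorkerState.Running, WorkerCommand.Stop)] = WorkerState.Stopped,
        [(WorkerState.Running, WorkerCommand.Delete)] = WorkerState.Deleted,
        [(WorkerState.Stopped, WorkerCommand.Delete)] = WorkerState.Deleted,
        // Idempotent calls leave the state unchanged.
        [(WorkerState.Running, WorkerCommand.Start)] = WorkerState.Running,
        [(WorkerState.Stopped, WorkerCommand.Stop)] = WorkerState.Stopped,
    };

    // Returns the next state, or throws if the diagram has no such edge
    // (for example, any command issued against a deleted worker).
    public static WorkerState Next(WorkerState current, WorkerCommand command) =>
        Transitions.TryGetValue((current, command), out var next)
            ? next
            : throw new InvalidOperationException($"Cannot {command} a worker in state {current}.");
}
```

Keeping the edges in one table makes it easy to compare the code against the diagram line by line.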

Creation

CreateWorkerAsync

var workerId = await workManager.CreateWorkerAsync(new CreateWorkerRequest(
    CodeSource: new CodeSourceContent(Encoding.UTF8.GetBytes(pythonCode)),
    MimeType: "text/x-python",
    Topic: "my-topic",
    Group: null  // or "my-group" for coordination
));

What happens:

  1. Engine lookup: Validates that an engine exists for the MIME type
  2. Code fetch: Fetches the code from the URL, or uses the inline content as-is
  3. Code loading: Calls engine.LoadCodeAsync(code)
  4. Persistence: Saves worker config to Dapr state store
  5. Subscription: Creates Dapr pub/sub subscription
  6. Registry: Adds worker to in-memory registry
  7. Status: Sets worker to Running

CodeSource Types

Inline Content:

CodeSource: new CodeSourceContent(Encoding.UTF8.GetBytes(code))

From URL:

CodeSource: new CodeSourceUrl(new Uri("https://example.com/worker.py"))
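
To illustrate how the two source types might be resolved into code bytes during the "Code fetch" step, here is a hedged sketch using pattern matching. InlineSource, UrlSource, CodeSourceSketch, and CodeResolver are hypothetical stand-ins, not the real CodeSourceContent and CodeSourceUrl types, whose shapes are assumed here. The fetch delegate is also an assumption; it is injected so the example runs without network access.

```csharp
using System;
using System.Text;
using System.Threading.Tasks;

// Usage with a stubbed fetcher (no real HTTP call is made).
Func<Uri, Task<byte[]>> fakeFetch =
    uri => Task.FromResult(Encoding.UTF8.GetBytes($"# fetched from {uri}"));

var inline = await CodeResolver.ResolveAsync(
    new InlineSource(Encoding.UTF8.GetBytes("print('hi')")), fakeFetch);
var remote = await CodeResolver.ResolveAsync(
    new UrlSource(new Uri("https://example.com/worker.py")), fakeFetch);

Console.WriteLine(Encoding.UTF8.GetString(inline));
Console.WriteLine(Encoding.UTF8.GetString(remote));

// Hypothetical stand-ins for the two code source kinds.
public abstract record CodeSourceSketch;
public sealed record InlineSource(byte[] Content) : CodeSourceSketch;
public sealed record UrlSource(Uri Url) : CodeSourceSketch;

public static class CodeResolver
{
    // Inline content is returned as-is; URL sources go through the fetch delegate.
    public static Task<byte[]> ResolveAsync(CodeSourceSketch source, Func<Uri, Task<byte[]>> fetch) =>
        source switch
        {
            InlineSource s => Task.FromResult(s.Content),
            UrlSource s => fetch(s.Url),
            _ => throw new ArgumentException($"Unknown code source: {source.GetType().Name}", nameof(source)),
        };
}
```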

State Transitions

StartWorkerAsync

Resumes a stopped worker:

await workManager.StartWorkerAsync(workerId);

What happens:

  1. Re-establishes Dapr topic subscription
  2. Sets status to Running

Notes:

  • Idempotent: If already running, does nothing
  • Worker continues processing from where it left off

StopWorkerAsync

Pauses a running worker:

await workManager.StopWorkerAsync(workerId);

What happens:

  1. Removes Dapr topic subscription
  2. Sets status to Stopped

Notes:

  • Idempotent: If already stopped, does nothing
  • Worker can be restarted with StartWorkerAsync

Code Updates

LoadCodeFromContent

Updates worker code while running:

await workManager.LoadCodeFromContent(workerId, Encoding.UTF8.GetBytes(newCode));

What happens:

  1. Calls worker.engine.LoadCodeAsync(newCode)
  2. Adds history entry with new code source
  3. Updates persisted configuration

LoadCodeFromUrl

Updates worker code from a URL:

await workManager.LoadCodeFromUrl(workerId, new Uri("https://example.com/new-worker.py"));

What happens:

  1. Fetches code from URL
  2. Calls worker.engine.LoadCodeAsync(fetchedCode)
  3. Adds history entry with new URL
  4. Updates persisted configuration

Deletion

DeleteWorkerAsync

Permanently removes a worker:

await workManager.DeleteWorkerAsync(workerId);

What happens:

  1. Removes worker from in-memory registry
  2. Deletes worker config from state store
  3. Removes Dapr topic subscription

Note: The worker is not stopped first. The subscription is removed regardless of the worker's current state.

Recovery

Worker recovery is automatic on service startup.

The WorkManagerRecoveryHostedService calls RecoverWorkersAsync() inside a BackgroundService before Kestrel begins serving traffic. The health check reports Degraded until recovery completes, and Unhealthy if recovery fails.

How recovery works

  1. On startup, the hosted service automatically restores all workers from the Dapr state store
  2. For each worker:
       • Creates a new engine instance via the engine factory
       • Loads the original code into the engine
       • Re-establishes the Dapr pub/sub subscription
       • Sets status to Running
  3. After recovery, the health check reports Healthy and Kestrel starts serving traffic

Multi-instance safety (leader election)

When multiple WorkManager instances start concurrently (rolling updates, HPA scaling, pod rescheduling), only one instance performs recovery. Without this guard, every instance would load every worker's engine into RAM, wasting memory and CPU (see WM #4, the N×W engine-load problem).

The RecoveryLeaderElector acquires a CAS lease on the Dapr state store key workmanager/recovery/leader (TTL WORKER_RECOVERY_LEASE_TTL_SECONDS, default 5 minutes). The instance that wins the lease performs recovery; the others mark recovery complete locally and rely on the normal message-flow path. If the leader crashes mid-recovery, the lease auto-expires and another instance can take over after WORKER_RECOVERY_ACQUIRE_TIMEOUT_SECONDS.

The lease is released in a finally block when the leader finishes (success or failure), so the next scale-up cycle acquires it promptly rather than waiting for the TTL.
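
The acquire, expire, and release behavior of a CAS lease can be sketched with an in-memory store. This is an illustration under stated assumptions, not the RecoveryLeaderElector itself: InMemoryLeaseStore and Lease are hypothetical, and ConcurrentDictionary's compare-and-swap methods stand in for the conditional writes a real state store would provide.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

const string LeaderKey = "workmanager/recovery/leader";
var store = new InMemoryLeaseStore();
var now = DateTimeOffset.UtcNow;
var ttl = TimeSpan.FromMinutes(5);

var aLeads = store.TryAcquire(LeaderKey, "instance-a", now, ttl);
var bLeads = store.TryAcquire(LeaderKey, "instance-b", now, ttl);
Console.WriteLine($"a leads: {aLeads}, b leads: {bLeads}");

try
{
    // The leader would run recovery here.
}
finally
{
    // Released on success or failure so the next instance need not wait for the TTL.
    store.Release(LeaderKey, "instance-a");
}
Console.WriteLine($"b leads after release: {store.TryAcquire(LeaderKey, "instance-b", now, ttl)}");

// Hypothetical lease record and store.
public sealed record Lease(string Owner, DateTimeOffset ExpiresAt);

public sealed class InMemoryLeaseStore
{
    private readonly ConcurrentDictionary<string, Lease> _leases = new();

    public bool TryAcquire(string key, string owner, DateTimeOffset now, TimeSpan ttl)
    {
        var candidate = new Lease(owner, now + ttl);
        if (_leases.TryAdd(key, candidate)) return true;          // no lease existed
        if (_leases.TryGetValue(key, out var current) && current.ExpiresAt <= now)
            return _leases.TryUpdate(key, candidate, current);    // CAS: only if unchanged since read
        return false;                                             // held and not expired
    }

    // Removes the lease only if it is still held by the given owner.
    public bool Release(string key, string owner) =>
        _leases.TryGetValue(key, out var current)
        && current.Owner == owner
        && _leases.TryRemove(new KeyValuePair<string, Lease>(key, current));
}
```

Passing the current time in as a parameter keeps the expiry path testable without sleeping.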

Disabling automatic recovery

Set WORKER_RECOVERY_ON_START=false to opt out. The health check still reports Healthy (operators have explicitly chosen this mode), and workers must be restored manually via the RecoverWorkers gRPC endpoint.

Error handling

  • Recovery failures are surfaced as structured Error-level log events with per-worker detail
  • If recovery fails entirely, the health check reports Unhealthy and the pod will not become ready
  • Individual worker load failures are logged but don't stop recovery of other workers
  • Workers can still be recovered manually via the RecoverWorkers gRPC endpoint if needed
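
The per-worker isolation described above (one failed load does not stop the others) can be sketched as a loop that catches and records each failure. RecoverySketch, RecoveryResult, and the recoverOne delegate are hypothetical names for illustration; the real service logs through its own structured logger rather than Console.Error.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Usage: worker "w2" fails to load, the other two still recover.
var ids = new[] { "w1", "w2", "w3" };
var result = await RecoverySketch.RecoverAllAsync(ids, id =>
    id == "w2"
        ? Task.FromException(new InvalidOperationException("engine load failed"))
        : Task.CompletedTask);
Console.WriteLine($"Recovered {result.Recovered.Count}, failed {result.Failed.Count}");

public sealed record RecoveryResult(
    IReadOnlyList<string> Recovered,
    IReadOnlyDictionary<string, Exception> Failed);

public static class RecoverySketch
{
    public static async Task<RecoveryResult> RecoverAllAsync(
        IEnumerable<string> workerIds, Func<string, Task> recoverOne)
    {
        var recovered = new List<string>();
        var failed = new Dictionary<string, Exception>();
        foreach (var id in workerIds)
        {
            try
            {
                await recoverOne(id);
                recovered.Add(id);
            }
            catch (Exception ex)
            {
                // Record per-worker detail and continue with the next worker.
                Console.Error.WriteLine($"Recovery failed for {id}: {ex.Message}");
                failed[id] = ex;
            }
        }
        return new RecoveryResult(recovered, failed);
    }
}
```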

Monitoring

The /health endpoint includes a recovery health check that reports:

  • Healthy: all workers recovered successfully (or recovery disabled by config, or another instance is the recovery leader)
  • Degraded: recovery still in progress (during startup)
  • Unhealthy: recovery failed (pod will not become ready)
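
The mapping from recovery outcome to health status can be written as a single switch. This is a sketch only: RecoveryPhase, HealthSketch, and RecoveryHealth are hypothetical names, and a real health check would return the framework's own health result type rather than this enum.

```csharp
using System;

foreach (var phase in Enum.GetValues<RecoveryPhase>())
    Console.WriteLine($"{phase} -> {RecoveryHealth.For(phase)}");

// Hypothetical enums for illustration.
public enum RecoveryPhase { Disabled, NotLeader, InProgress, Succeeded, Failed }
public enum HealthSketch { Healthy, Degraded, Unhealthy }

public static class RecoveryHealth
{
    public static HealthSketch For(RecoveryPhase phase) => phase switch
    {
        RecoveryPhase.InProgress => HealthSketch.Degraded,
        RecoveryPhase.Failed => HealthSketch.Unhealthy,
        // Disabled by config, another instance leads, or recovery finished.
        RecoveryPhase.Disabled or RecoveryPhase.NotLeader or RecoveryPhase.Succeeded
            => HealthSketch.Healthy,
        _ => throw new ArgumentOutOfRangeException(nameof(phase), phase, "Unknown recovery phase"),
    };
}
```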

History Tracking

Each worker maintains a history of code changes:

var history = workManager.GetWorkerHistory(workerId);
foreach (var entry in history)
{
    Console.WriteLine($"Code loaded at {entry.CreatedAt}");
    Console.WriteLine($"  Type: {entry.CodeSource.GetType().Name}");
}

What's tracked:

  • Initial creation with original code source
  • Each LoadCodeFromContent call
  • Each LoadCodeFromUrl call

What's not tracked:

  • Runtime execution errors (these go to logs)
  • Message processing counts

Group Coordination

Workers can belong to a group for mutually exclusive execution:

// Worker 1
await workManager.CreateWorkerAsync(new CreateWorkerRequest(
    CodeSource: code1,
    MimeType: "text/x-python",
    Topic: "shared-topic",
    Group: "group-a"  // Same group as Worker 2
));

// Worker 2
await workManager.CreateWorkerAsync(new CreateWorkerRequest(
    CodeSource: code2,
    MimeType: "text/x-python",
    Topic: "shared-topic",
    Group: "group-a"  // Same group as Worker 1
));

Behavior:

  • Only one worker in a group processes a message at a time
  • Distributed lock acquired before processing with exponential backoff retry (default 500ms base, default max 3 attempts; both tunable via GROUP_LOCK_RETRY_BASE_DELAY_MS and GROUP_LOCK_MAX_RETRIES, see Configuration)
  • Each lock acquisition increments a monotonic fencing token, preventing lock theft races
  • Lock released after processing (or on error)
  • Lock max age is 30 seconds (stale locks can be stolen if the original holder hasn't made writes)
  • If lock cannot be acquired after all retries, a LockContentionException is thrown, which causes Dapr to nack the message and redeliver it later

Retry semantics (audit, June 2026):

  • The subscribe handler returns TopicResponseAction.Retry on LockContentionException (see Components/WorkManager.cs:SubscribeToTopicAsync). The Dapr SDK sends a negative acknowledgement with retry=true, which causes Dapr to redeliver the message after backoff. No message is silently dropped on lock contention.
  • Dapr redelivers the message with exponential backoff (configured in the pubsub component)
  • After exhausting all Dapr redeliveries (maxRedeliveryCount: 3), messages are routed to the dead-letter topic
  • Monitor the dead-letter topic for alerting on persistent lock contention
  • The acquire-loop retries are independent of the Dapr redelivery retries: the acquire loop first exhausts its local in-process backoff (default 3 attempts with 500ms → 1s → 2s backoff) and throws; only then does Dapr handle redelivery at the broker level
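
The local acquire loop (base × 2^attempt, then throw) can be sketched as follows. GroupLockRetry and LockContentionSketchException are hypothetical stand-ins for the real acquire logic and LockContentionException. The tryAcquire and delay delegates are assumptions, injected so the example runs instantly instead of sleeping.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Usage: the lock is never available, so all three attempts fail.
var waits = new List<TimeSpan>();
try
{
    await GroupLockRetry.AcquireAsync(
        tryAcquire: () => Task.FromResult(false),
        maxRetries: 3,
        baseDelay: TimeSpan.FromMilliseconds(500),
        delay: d => { waits.Add(d); return Task.CompletedTask; });
}
catch (LockContentionSketchException ex)
{
    // waits now holds 500 ms, 1 s, and 2 s.
    Console.WriteLine($"{ex.Message} Waited: {string.Join(", ", waits)}");
}

// Hypothetical stand-in for LockContentionException.
public sealed class LockContentionSketchException : Exception
{
    public LockContentionSketchException(string message) : base(message) { }
}

public static class GroupLockRetry
{
    // Exponential backoff: base × 2^attempt, with attempt starting at 0.
    public static TimeSpan DelayFor(int attempt, TimeSpan baseDelay) =>
        TimeSpan.FromMilliseconds(baseDelay.TotalMilliseconds * Math.Pow(2, attempt));

    public static async Task AcquireAsync(
        Func<Task<bool>> tryAcquire, int maxRetries, TimeSpan baseDelay, Func<TimeSpan, Task> delay)
    {
        for (var attempt = 0; attempt < maxRetries; attempt++)
        {
            if (await tryAcquire()) return;
            await delay(DelayFor(attempt, baseDelay));
        }
        // The caller would translate this into a retry response so the broker redelivers.
        throw new LockContentionSketchException($"Lock not acquired after {maxRetries} attempts.");
    }
}
```

In production code the delay delegate would typically be Task.Delay.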

Lock lifecycle:

Message received
     │
     ▼
AcquireGroupLock() ── retry (base × 2^attempt, max GROUP_LOCK_MAX_RETRIES)
     │
     ├── Lock acquired (fencing token incremented) ──► Process message ──► ReleaseLock()
     │
     ├── Lock held by another ──► retry with backoff
     │
     ├── Lock stolen from stale holder ──► fence check: if owner wrote, abort steal
     │
     └── Exhausted retries ──► LockContentionException ──► TopicResponseAction.Retry ──► Dapr nack(retry=true) ──► redeliver ──► dead-letter topic (after Dapr maxRedeliveryCount)

Fencing token: Each lock includes a monotonically increasing integer. Before a stale lock is stolen, the new owner checks whether the original owner made writes under the old fence token. If writes were made, the lock theft is aborted, preventing dual-processing of the same message. This protects slow-but-valid workers (e.g., first-time Roslyn compilation) from having their lock stolen mid-processing.
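
The steal decision can be sketched as a pure function over the current lock record. This is an illustration under assumptions, not the actual lock implementation: GroupLock and FencingSketch are hypothetical, and the HolderHasWritten flag stands in for however the real system detects writes made under the current fence token, which this guide does not specify.

```csharp
using System;

var t0 = DateTimeOffset.UtcNow;
var idle = new GroupLock("worker-1", FenceToken: 7, AcquiredAt: t0, HolderHasWritten: false);
var busy = idle with { HolderHasWritten = true };
var later = t0 + TimeSpan.FromSeconds(31);   // past the 30-second max age

Console.WriteLine(FencingSketch.TrySteal(idle, "worker-2", later, out var stolen)); // True
Console.WriteLine(stolen.FenceToken);                                               // 8
Console.WriteLine(FencingSketch.TrySteal(busy, "worker-2", later, out _));          // False

// Hypothetical lock record.
public sealed record GroupLock(
    string Owner, long FenceToken, DateTimeOffset AcquiredAt, bool HolderHasWritten);

public static class FencingSketch
{
    public static readonly TimeSpan MaxAge = TimeSpan.FromSeconds(30);

    // A lock may be stolen only if it is stale AND its holder made no writes.
    // On success the fence token is incremented; on failure the lock is returned unchanged.
    public static bool TrySteal(GroupLock current, string newOwner, DateTimeOffset now, out GroupLock result)
    {
        var stale = now - current.AcquiredAt > MaxAge;
        if (!stale || current.HolderHasWritten)
        {
            result = current;
            return false;
        }
        result = new GroupLock(newOwner, current.FenceToken + 1, now, HolderHasWritten: false);
        return true;
    }
}
```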

Cleanup on Disposal

When WorkManager.DisposeAsync() is called:

  1. All Dapr subscriptions are disposed
  2. All topic-specific subscriptions are disposed
  3. In-memory registry is cleared

Note: State store data is NOT deleted. Workers can be recovered on restart.

Best Practices

  1. Use groups for related workers: If multiple workers should handle the same topic without duplicate processing, use a group.

  2. Recovery is automatic: Worker recovery runs automatically on startup via the WorkManagerRecoveryHostedService. No manual gRPC call is required. The service readiness probe ensures traffic is not routed until recovery completes.

  3. Track history for debugging: History entries help audit code changes:

    var history = workManager.GetWorkerHistory(id);
    

  4. Stop before delete is optional: DeleteWorkerAsync cleans up subscriptions regardless of state, so explicit StopWorkerAsync is not required before deletion.

  5. Code updates don't restart subscriptions: LoadCodeFromContent and LoadCodeFromUrl update code in-place without disrupting processing.