Architecture

The WorkManager is an event-driven worker system built on Dapr that enables you to deploy and manage workers written in different programming languages. Workers subscribe to pub/sub topics, process incoming CloudEvents, and optionally publish responses.

System Overview

flowchart TB
    subgraph WM["WorkManager Service"]
        W[WorkManager<br/>Create/Start/Stop/<br/>Delete/Recover]
        ER[EngineRegistry<br/>text/x-python<br/>text/x-csharp<br/>...]
        WR[WorkerRegistry<br/>W1 W2 ...]
        W <--> ER
        ER <--> WR
        W --> DC[Dapr Client<br/>State / Pub/Sub]
        DC <--> DS[Dapr Subscriptions]
        DS --> WR
    end
    DC --> DR[Dapr Runtime]
    DR --> SS[(State Store<br/>statestore)]
    DR --> PS[(Pub/Sub<br/>pubsub)]

Core Components

WorkManager (Components/WorkManager.cs)

The central orchestrator that manages the entire worker lifecycle. It:

  • Creates, starts, stops, and deletes workers
  • Manages Dapr subscriptions to pub/sub topics
  • Persists worker configurations to the state store for recovery
  • Handles message processing and group-based locking for coordinated execution
  • Publishes worker output back to pub/sub topics

Worker (Runtime/Worker.cs)

Represents a single worker instance with:

  • Id: Unique identifier (GUID)
  • CodeSource: Where the code comes from (URL or inline content)
  • MimeType: The content type identifying the language/engine
  • Topic: The pub/sub topic to subscribe to
  • Group: Optional coordination group for mutually exclusive execution
  • Status: Either Stopped or Running
  • History: A record of every code change over time
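The field list above maps naturally onto a small record type. The Python sketch below is illustrative only. The class names, the dict-based code source, and the shape of a history entry (a timestamp plus the code source) are assumptions, not the actual model in Worker.cs.

```python
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Dict, List, Optional


class WorkerStatus(Enum):
    STOPPED = "Stopped"
    RUNNING = "Running"


@dataclass
class CodeChange:
    """Hypothetical history entry: when the code was set and where it came from."""
    changed_at: datetime
    code_source: Dict[str, str]


@dataclass
class Worker:
    code_source: Dict[str, str]          # {"url": ...} or {"content": ...}
    mime_type: str                       # selects the engine, e.g. "text/x-python"
    topic: str                           # pub/sub topic to subscribe to
    group: Optional[str] = None          # optional coordination group
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    status: WorkerStatus = WorkerStatus.STOPPED
    history: List[CodeChange] = field(default_factory=list)

    def __post_init__(self) -> None:
        # Record the initial code source as the first history entry.
        self.history.append(CodeChange(datetime.now(timezone.utc), self.code_source))

    def update_code(self, new_source: Dict[str, str]) -> None:
        # Replace the code source and append the change to the history.
        self.code_source = new_source
        self.history.append(CodeChange(datetime.now(timezone.utc), new_source))
```

Recording the initial source as the first entry is a design choice of the sketch, so the last history entry always matches the current code source.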

EngineRegistry (Runtime/EngineRegistry.cs)

Maps MIME types to engine implementations. Each engine is registered with a factory that creates a fresh engine instance for every worker.
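A minimal Python sketch of the registry idea, assuming a hypothetical EngineRegistry class whose register/create methods loosely mirror the C# Register call shown under Extension Points. The point it illustrates is that the registry stores factories rather than engine objects, so every create call yields a separate instance.

```python
from typing import Callable, Dict, Tuple


class EngineRegistry:
    """Hypothetical mapping from MIME type to (display name, engine factory)."""

    def __init__(self) -> None:
        self._factories: Dict[str, Tuple[str, Callable[[], object]]] = {}

    def register(self, mime_type: str, name: str, factory: Callable[[], object]) -> None:
        # MIME type names are case-insensitive, so normalize the key.
        self._factories[mime_type.lower()] = (name, factory)

    def is_supported(self, mime_type: str) -> bool:
        return mime_type.lower() in self._factories

    def create(self, mime_type: str) -> object:
        try:
            _, factory = self._factories[mime_type.lower()]
        except KeyError:
            raise ValueError(f"no engine registered for {mime_type!r}") from None
        # Call the factory every time so each worker gets its own engine instance.
        return factory()
```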

Engine Implementations (Engines/)

All engines implement the IEngine interface:

public interface IEngine
{
    Task LoadCodeAsync(byte[] code, CancellationToken cancellationToken = default);
    Task<CloudEvent?> ProcessAsync(CloudEvent input, CancellationToken cancellationToken = default);
}
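The same contract can be sketched in Python as an abstract base class. This is an illustration under assumptions: events are modeled as plain dicts, cancellation is left to asyncio task cancellation instead of a CancellationToken, and EchoEngine is a hypothetical engine, not one of the engines listed below.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional

CloudEvent = Dict[str, Any]  # assumption: events modeled as plain dicts


class Engine(ABC):
    @abstractmethod
    async def load_code(self, code: bytes) -> None:
        """Prepare the engine to run the given code."""

    @abstractmethod
    async def process(self, event: CloudEvent) -> Optional[CloudEvent]:
        """Handle one input event; return a response event or None."""


class EchoEngine(Engine):
    """Hypothetical engine that echoes the input data back."""

    def __init__(self) -> None:
        self.code: Optional[bytes] = None

    async def load_code(self, code: bytes) -> None:
        self.code = code

    async def process(self, event: CloudEvent) -> Optional[CloudEvent]:
        if self.code is None:
            raise RuntimeError("load_code must be called before process")
        if event.get("data") is None:
            return None  # no response, so nothing would be published
        return {"type": "com.example.echo", "source": "echo-engine", "data": event["data"]}
```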

Available Engines:

| Engine | MIME Type | Description |
|---|---|---|
| PythonEngine | text/x-python | Executes Python code via a python3 subprocess |
| CSharpSourceEngine | text/x-csharp | Compiles and executes C# source code at runtime |
| DotNetDllEngine | application/x-dotnet-dll | Loads pre-compiled .NET assemblies |
| NativeDllEngine | application/x-native-dll | Loads per-architecture native shared libraries in-process |

WorkerRegistry (Runtime/WorkerRegistry.cs)

In-memory storage for active worker instances. Workers are recovered from persistent state on restart.

Data Flow

Worker Creation Flow

  1. Client calls CreateWorkerAsync() via gRPC or directly on an injected WorkManager instance
  2. WorkManager validates the MIME type has a registered engine
  3. Code is fetched from the URL or decoded from the Base64 inline content
  4. Engine instance is created and code is loaded
  5. Worker configuration is persisted to Dapr state store
  6. Worker is added to the in-memory registry in Stopped status
  7. Client calls StartWorkerAsync() to subscribe to Dapr topic and set status to Running
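The creation steps can be sketched as two functions. Everything here is hypothetical: a dict stands in for the Dapr state store and for the in-memory registry, engine factories are passed in as a mapping, the code is assumed to be already fetched (step 3), and subscribe is a caller-supplied callback rather than a real Dapr API. Only the two state-store keys come from the Persistence section.

```python
import uuid
from dataclasses import dataclass
from typing import Any, Callable, Dict

INDEX_KEY = "workmanager/workers/index"


@dataclass
class RegisteredWorker:
    id: str
    mime_type: str
    topic: str
    engine: Any
    status: str = "Stopped"


def create_worker(mime_type: str, topic: str, code: bytes,
                  engines: Dict[str, Callable[[], Any]],
                  state: Dict[str, Any],
                  registry: Dict[str, RegisteredWorker]) -> RegisteredWorker:
    # Step 2: reject MIME types that have no registered engine.
    if mime_type not in engines:
        raise ValueError(f"no engine registered for {mime_type!r}")
    # Step 4: create a fresh engine and load the (already fetched) code.
    engine = engines[mime_type]()
    engine.load_code(code)
    # Step 5: persist the configuration and append the ID to the index.
    worker_id = str(uuid.uuid4())
    state[f"workmanager/workers/{worker_id}"] = {
        "id": worker_id, "mimeType": mime_type, "topic": topic}
    state[INDEX_KEY] = state.get(INDEX_KEY, []) + [worker_id]
    # Step 6: register in memory; the worker stays Stopped.
    worker = RegisteredWorker(worker_id, mime_type, topic, engine)
    registry[worker_id] = worker
    return worker


def start_worker(worker_id: str, registry: Dict[str, RegisteredWorker],
                 subscribe: Callable[[str, str], None]) -> None:
    # Step 7: subscribe to the topic, then mark the worker Running.
    worker = registry[worker_id]
    subscribe(worker.topic, worker_id)
    worker.status = "Running"
```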

Message Processing Flow

  1. Dapr delivers a CloudEvent to the subscription handler
  2. If the worker belongs to a group, the handler tries to acquire the group's distributed lock
  3. The worker's engine processes the input CloudEvent
  4. If the engine returns a response CloudEvent:
      • Without a group: it is published directly to the output topic
      • With a group: it is published while the group lock is still held
  5. The group lock is released (group-based workers only)
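The processing steps can be condensed into a single handler. In this sketch the engine, publisher, and lock operations are passed in as plain callables (all hypothetical names). What happens when the group lock is busy is an assumption: the handler simply reports False so the caller can decide whether to let Dapr redeliver.

```python
from typing import Any, Callable, Dict, Optional

Event = Dict[str, Any]


def handle_event(event: Event,
                 process: Callable[[Event], Optional[Event]],
                 publish: Callable[[Event], None],
                 group: Optional[str] = None,
                 try_acquire: Optional[Callable[[str], bool]] = None,
                 release: Optional[Callable[[str], None]] = None) -> bool:
    """Return True if the event was processed, False if the group lock was busy."""
    if group is not None:
        # Grouped workers must hold the group lock before processing.
        if not try_acquire(group):
            return False  # assumption: the caller lets Dapr redeliver later
    try:
        # Run the engine.
        response = process(event)
        # Publish only when the engine produced a response.
        if response is not None:
            publish(response)
        return True
    finally:
        # Release the lock even if processing raised.
        if group is not None:
            release(group)
```

Releasing the lock in a finally block is a design choice that keeps a failing engine from holding the group's lock until it goes stale.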

Worker Recovery Flow

On service restart:

  1. The WorkManagerRecoveryHostedService (a BackgroundService) automatically calls RecoverWorkersAsync() during startup; no operator action or gRPC call is required. The hosted service acquires a leader lease via the Dapr state store so that, when several instances start concurrently, a single recovery coordinator is elected (see Worker Lifecycle: Recovery for the lease semantics).
  2. All worker configurations are loaded from the state store
  3. For each worker:
      • The engine is created and the code is reloaded
      • The Dapr subscription is re-established
      • The worker status is set to Running
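The recovery loop (loading configurations and restoring each worker) can be sketched as follows. This is a hypothetical illustration: a dict stands in for the Dapr state store, fetch_code and subscribe are caller-supplied callbacks, the config field names (mimeType, topic) are assumptions, and the leader-lease step is deliberately left out. Skipping an index entry whose configuration is missing is a choice of the sketch, not documented behavior.

```python
from typing import Any, Callable, Dict

INDEX_KEY = "workmanager/workers/index"


def recover_workers(state: Dict[str, Any],
                    engines: Dict[str, Callable[[], Any]],
                    fetch_code: Callable[[Dict[str, Any]], bytes],
                    subscribe: Callable[[str, str], None]) -> Dict[str, Dict[str, Any]]:
    registry: Dict[str, Dict[str, Any]] = {}
    # Load every worker ID recorded in the index.
    for worker_id in state.get(INDEX_KEY, []):
        config = state.get(f"workmanager/workers/{worker_id}")
        if config is None:
            continue  # assumption: dangling index entries are skipped
        # Create a fresh engine and reload the worker's code.
        engine = engines[config["mimeType"]]()
        engine.load_code(fetch_code(config))
        # Re-establish the subscription, then mark the worker Running.
        subscribe(config["topic"], worker_id)
        registry[worker_id] = {"config": config, "engine": engine, "status": "Running"}
    return registry
```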

Persistence

Worker configurations are stored in the Dapr state store with these keys:

  • workmanager/workers/index - List of all worker IDs
  • workmanager/workers/{workerId} - Individual worker configuration

Group locks use:

  • workmanager/groups/{groupId}/lock - Lock data (owner instance ID and acquisition time)
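A sketch of keeping the index and the per-worker keys in step, using a dict in place of the Dapr state store. The key formats come from the lists above; save_worker, delete_worker, and the helper names are hypothetical. Against a real store these read-modify-write updates of the index could race between instances; Dapr's ETag-based optimistic concurrency or transactional state operations (on stores that support them) are the usual ways to guard against that.

```python
from typing import Any, Dict

INDEX_KEY = "workmanager/workers/index"


def worker_key(worker_id: str) -> str:
    return f"workmanager/workers/{worker_id}"


def group_lock_key(group_id: str) -> str:
    return f"workmanager/groups/{group_id}/lock"


def save_worker(state: Dict[str, Any], worker_id: str, config: Dict[str, Any]) -> None:
    # Write (or overwrite) the configuration, then make sure the ID is indexed once.
    state[worker_key(worker_id)] = config
    index = list(state.get(INDEX_KEY, []))
    if worker_id not in index:
        index.append(worker_id)
        state[INDEX_KEY] = index


def delete_worker(state: Dict[str, Any], worker_id: str) -> None:
    # Remove the configuration and drop the ID from the index.
    state.pop(worker_key(worker_id), None)
    state[INDEX_KEY] = [w for w in state.get(INDEX_KEY, []) if w != worker_id]
```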

Group Coordination

Workers can be assigned to a group for mutually exclusive execution. When multiple workers share a group:

  • Only one worker in the group processes a message at a time
  • A distributed lock, stored in the state store, prevents concurrent processing
  • A lock older than 30 seconds is treated as stale and can be taken over ("stolen")
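The stale-lock rule can be sketched as below, assuming the lock record holds the owner instance ID and an acquisition timestamp in seconds, as described under Persistence. The function names and record field names are hypothetical, and the injectable now clock exists only to make the sketch testable. The check-then-write here is not atomic; a real implementation over the Dapr state store would need an ETag or equivalent compare-and-set so two instances cannot acquire the same lock.

```python
import time
from typing import Any, Callable, Dict

MAX_LOCK_AGE_SECONDS = 30.0


def _lock_key(group_id: str) -> str:
    return f"workmanager/groups/{group_id}/lock"


def try_acquire_group_lock(state: Dict[str, Any], group_id: str, instance_id: str,
                           now: Callable[[], float] = time.time) -> bool:
    key = _lock_key(group_id)
    current = state.get(key)
    t = now()
    # A fresh lock blocks everyone, including its owner (no re-entry).
    if current is not None and t - current["acquiredAt"] < MAX_LOCK_AGE_SECONDS:
        return False
    # No lock, or a stale one: take it over ("steal" it).
    state[key] = {"owner": instance_id, "acquiredAt": t}
    return True


def release_group_lock(state: Dict[str, Any], group_id: str, instance_id: str) -> None:
    key = _lock_key(group_id)
    current = state.get(key)
    # Only the current owner may release; a lock that was stolen is left alone.
    if current is not None and current["owner"] == instance_id:
        del state[key]
```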

Code Source Types

Workers support two code source types:

CodeSourceUrl: Code fetched from an HTTP(S) URL at worker creation time

{ "url": "https://example.com/worker.py" }

CodeSourceContent: Base64-encoded inline code

{ "content": "aW1wb3J0IGpzb24..." }
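One possible way to turn either code source into bytes, sketched with the Python standard library. resolve_code is a hypothetical name, and the rule that exactly one of url or content must be present is an assumption the sketch enforces; the service may validate differently.

```python
import base64
import urllib.request
from typing import Any, Dict


def resolve_code(source: Dict[str, Any], timeout: float = 30.0) -> bytes:
    has_url = "url" in source
    has_content = "content" in source
    # Assumption: a code source carries exactly one of the two fields.
    if has_url == has_content:
        raise ValueError("code source must specify exactly one of 'url' or 'content'")
    if has_content:
        # Inline code: strict Base64 decoding (invalid input raises ValueError).
        return base64.b64decode(source["content"], validate=True)
    url = source["url"]
    if not url.lower().startswith(("http://", "https://")):
        raise ValueError("only HTTP(S) URLs are supported")
    # Remote code: fetch the body once, at worker creation time.
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return resp.read()
```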

Extension Points

Adding a New Engine

  1. Create a class implementing IEngine
  2. Register it in Program.cs:
engineRegistry.Register(
    new ContentType("text/x-my-language"),
    "MyLanguage",
    () => new MyLanguageEngine()
);

Custom Worker Logic

Workers must implement an entry-point function that receives the input CloudEvent and returns the output event. The name and signature depend on the language:

Python:

def Process(event):
    return {
        "type": "com.example.output",
        "source": "my-worker",
        "data": { "result": event["data"] }
    }
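Because the Python entry point is a plain function, it can be exercised locally before deployment. The sketch below copies the example above and calls it with a hand-built event. make_test_event is a hypothetical helper, and the sketch assumes the engine passes the CloudEvent to Process as a dict keyed by CloudEvents attribute names, which the event["data"] access in the example suggests.

```python
def Process(event):
    # Same logic as the worker example above.
    return {
        "type": "com.example.output",
        "source": "my-worker",
        "data": {"result": event["data"]},
    }


def make_test_event(data):
    # Hypothetical helper: a minimal CloudEvents 1.0-style event as a dict.
    return {
        "specversion": "1.0",
        "id": "test-1",
        "type": "com.example.input",
        "source": "local-test",
        "data": data,
    }


if __name__ == "__main__":
    print(Process(make_test_event({"n": 42})))
```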

C#:

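using System;
using CloudNative.CloudEvents; // assumption: the CloudNative.CloudEvents SDK's CloudEvent type

public static CloudEvent ProcessAsync(CloudEvent input)
{
    return new CloudEvent
    {
        Id = Guid.NewGuid().ToString(), // CloudEvents 1.0 requires an id on every event
        Type = "com.example.output",
        Source = new Uri("urn:my-worker"),
        Data = input.Data // pass the input payload through unchanged
    };
}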