Architecture
The WorkManager is an event-driven worker system built on Dapr that enables you to deploy and manage workers written in different programming languages. Workers subscribe to pub/sub topics, process incoming CloudEvents, and optionally publish responses.
System Overview
flowchart TB
subgraph WM["WorkManager Service"]
W[WorkManager<br/>Create/Start/Stop/<br/>Delete/Recover]
ER[EngineRegistry<br/>text/x-python<br/>text/x-csharp<br/>...]
WR[WorkerRegistry<br/>W1 W2 ...]
W <--> ER
ER <--> WR
W --> DC[Dapr Client<br/>State / Pub/Sub]
DC <--> DS[Dapr Subscriptions]
DS --> WR
end
DC --> DR[Dapr Runtime]
DR --> SS[(State Store<br/>statestore)]
DR --> PS[(Pub/Sub<br/>pubsub)]
Core Components
WorkManager (Components/WorkManager.cs)
The central orchestrator that manages the entire worker lifecycle. It:
- Creates, starts, stops, and deletes workers
- Manages Dapr subscriptions to pub/sub topics
- Persists worker configurations to the state store for recovery
- Handles message processing and group-based locking for coordinated execution
- Publishes worker output back to pub/sub topics
Worker (Runtime/Worker.cs)
Represents a single worker instance with:
- Id: Unique identifier (GUID)
- CodeSource: Where the code comes from (URL or inline content)
- MimeType: The content type identifying the language/engine
- Topic: The pub/sub topic to subscribe to
- Group: Optional coordination group for mutually exclusive execution
- Status: Either Stopped or Running
- History: Record of all code changes over time
EngineRegistry (Runtime/EngineRegistry.cs)
Maps MIME types to engine implementations. Each engine factory creates new engine instances per worker.
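To illustrate the factory-per-MIME-type idea, here is a minimal Python sketch of a registry that returns a fresh engine for every worker. The names used here (`EngineRegistry.register`, `create`, `EchoEngine`, the `text/x-echo` type) and the case-insensitive lookup are assumptions made for illustration; the real implementation lives in Runtime/EngineRegistry.cs and may differ.

```python
from typing import Callable, Dict


class EchoEngine:
    """Hypothetical stand-in engine that returns its input unchanged."""

    def load_code(self, code: bytes) -> None:
        self.code = code

    def process(self, event: dict) -> dict:
        return event


class EngineRegistry:
    """Maps a MIME type to a factory that builds a new engine instance."""

    def __init__(self) -> None:
        self._factories: Dict[str, Callable[[], object]] = {}

    def register(self, mime_type: str, factory: Callable[[], object]) -> None:
        # Assumption: MIME types are compared case-insensitively.
        self._factories[mime_type.lower()] = factory

    def is_registered(self, mime_type: str) -> bool:
        return mime_type.lower() in self._factories

    def create(self, mime_type: str) -> object:
        # Each call invokes the factory, so workers never share an engine.
        try:
            return self._factories[mime_type.lower()]()
        except KeyError:
            raise ValueError(f"no engine registered for {mime_type!r}") from None


registry = EngineRegistry()
registry.register("text/x-echo", EchoEngine)
first = registry.create("text/x-echo")
second = registry.create("text/x-echo")
```

Storing factories rather than instances is what keeps per-worker state (such as loaded code) isolated between workers.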
Engine Implementations (Engines/)
All engines implement the IEngine interface:
public interface IEngine
{
    Task LoadCodeAsync(byte[] code, CancellationToken cancellationToken = default);
    Task<CloudEvent?> ProcessAsync(CloudEvent input, CancellationToken cancellationToken = default);
}
Available Engines:
| Engine | MIME Type | Description |
|---|---|---|
| PythonEngine | text/x-python | Executes Python code via a python3 subprocess |
| CSharpSourceEngine | text/x-csharp | Compiles and executes C# source code at runtime |
| DotNetDllEngine | application/x-dotnet-dll | Loads pre-compiled .NET assemblies |
| NativeDllEngine | application/x-native-dll | Loads per-architecture native shared libraries in-process |
WorkerRegistry (Runtime/WorkerRegistry.cs)
In-memory storage for active worker instances. Workers are recovered from persistent state on restart.
Data Flow
Worker Creation Flow
- Client calls CreateWorkerAsync() via gRPC or direct injection
- WorkManager validates that the MIME type has a registered engine
- Code is fetched (from URL or extracted from content)
- Engine instance is created and code is loaded
- Worker configuration is persisted to Dapr state store
- Worker is added to the in-memory registry in Stopped status
- Client calls StartWorkerAsync() to subscribe to the Dapr topic and set the status to Running
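The ordering of the creation steps can be sketched in Python as follows. This is only an illustration under stated assumptions: plain dictionaries stand in for the Dapr state store and the in-memory registry, `StubEngine` and the function names are hypothetical, and the code is passed in already fetched.

```python
import uuid


class StubEngine:
    """Hypothetical engine that only remembers the code it was given."""

    def load_code(self, code: bytes) -> None:
        self.code = code


ENGINES = {"text/x-python": StubEngine}  # MIME type -> engine factory
state_store = {}       # stand-in for the Dapr state store
workers = {}           # stand-in for the in-memory WorkerRegistry
subscriptions = set()  # stand-in for Dapr topic subscriptions


def create_worker(mime_type: str, code: bytes, topic: str) -> str:
    # Validate first, so nothing is persisted for an unsupported MIME type.
    if mime_type not in ENGINES:
        raise ValueError(f"unsupported MIME type: {mime_type}")
    # Create the engine and load the (already fetched) code into it.
    engine = ENGINES[mime_type]()
    engine.load_code(code)
    # Persist the configuration so the worker can be recovered later.
    worker_id = str(uuid.uuid4())
    state_store[f"workmanager/workers/{worker_id}"] = {
        "mimeType": mime_type,
        "topic": topic,
    }
    # Register in memory; a new worker always begins in the Stopped status.
    workers[worker_id] = {"engine": engine, "topic": topic, "status": "Stopped"}
    return worker_id


def start_worker(worker_id: str) -> None:
    # Subscribing and switching to Running happen together on start.
    worker = workers[worker_id]
    subscriptions.add(worker["topic"])
    worker["status"] = "Running"


worker_id = create_worker("text/x-python", b"def Process(event): return event", "orders")
status_after_create = workers[worker_id]["status"]
start_worker(worker_id)
```

Keeping creation and start as separate calls means a worker can be validated and persisted without receiving any messages yet.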
Message Processing Flow
- Dapr delivers a CloudEvent to the subscription handler
- For workers in a group: Try to acquire a distributed lock
- Worker engine processes the input CloudEvent
- If engine returns a response CloudEvent:
  - If no group: Publish directly to output topic
  - If group: Publish after lock acquisition
- Lock is released (if group-based)
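A minimal Python sketch of this flow is shown below. It rests on several assumptions: a `threading.Lock` per group stands in for the distributed lock, the worker is a plain dictionary with hypothetical `group`, `output_topic`, and `process` entries, and a failed lock attempt simply returns `False`. The text above does not say what WorkManager does when the lock cannot be acquired.

```python
import threading


def handle_event(worker, event, publish, group_locks):
    """Process one CloudEvent; returns True if the worker handled it."""
    group = worker.get("group")
    lock = group_locks.setdefault(group, threading.Lock()) if group else None

    # Grouped workers must hold the group lock before processing.
    if lock is not None and not lock.acquire(blocking=False):
        return False  # assumption: the caller decides whether to retry

    try:
        response = worker["process"](event)
        # Engines may return nothing, in which case nothing is published.
        if response is not None:
            publish(worker["output_topic"], response)
        return True
    finally:
        # The lock is released only if this call acquired it.
        if lock is not None:
            lock.release()
```

Because publishing happens inside the `try` block, a grouped worker publishes its output while it still holds the lock, which matches the "publish after lock acquisition" step.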
Worker Recovery Flow
On service restart:
- The WorkManagerRecoveryHostedService (a BackgroundService) automatically calls RecoverWorkersAsync() during startup; no operator action or gRPC call is required. The hosted service acquires a leader lease via the Dapr state store so that concurrent instances elect a single recovery coordinator (see Worker Lifecycle: Recovery for the lease semantics).
- All worker configurations are loaded from the state store
- For each worker:
  - Engine is created and code is re-loaded
  - Dapr subscription is re-established
  - Worker status is set to Running
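The per-worker part of recovery can be sketched in Python as below. The leader lease is left out, a dictionary stands in for the state store, and the configuration fields (`mimeType`, `code`, `topic`) are assumptions; in particular, the sketch assumes the persisted configuration carries the code bytes directly, which the text above does not confirm.

```python
class RecordingEngine:
    """Hypothetical engine that only records the code it was asked to load."""

    def __init__(self):
        self.code = None

    def load_code(self, code):
        self.code = code


def recover_workers(state_store, engine_factories, subscribe):
    """Rebuild the in-memory registry from persisted configurations."""
    recovered = {}
    for worker_id in state_store.get("workmanager/workers/index", []):
        config = state_store[f"workmanager/workers/{worker_id}"]
        # Create a fresh engine and load the code into it again.
        engine = engine_factories[config["mimeType"]]()
        engine.load_code(config["code"])
        # Re-establish the subscription, then mark the worker as Running.
        subscribe(config["topic"])
        recovered[worker_id] = {"engine": engine, "status": "Running"}
    return recovered


store = {
    "workmanager/workers/index": ["w1"],
    "workmanager/workers/w1": {
        "mimeType": "text/x-python",
        "code": b"def Process(event): return event",
        "topic": "orders",
    },
}
topics = []
registry = recover_workers(store, {"text/x-python": RecordingEngine}, topics.append)
```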
Persistence
Worker configurations are stored in the Dapr state store with these keys:
workmanager/workers/index - List of all worker IDs
workmanager/workers/{workerId} - Individual worker configuration
Group locks use:
workmanager/groups/{groupId}/lock- Lock data (owner instance ID and acquisition time)
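As a small illustration of this key layout, the Python helpers below build the keys and keep the index in step with the individual entries. The helper names and the dictionary used as a store are hypothetical; only the key formats come from the list above.

```python
INDEX_KEY = "workmanager/workers/index"


def worker_key(worker_id: str) -> str:
    return f"workmanager/workers/{worker_id}"


def group_lock_key(group_id: str) -> str:
    return f"workmanager/groups/{group_id}/lock"


def save_worker(store: dict, worker_id: str, config: dict) -> None:
    # Write the configuration, then make sure the index lists the worker once.
    store[worker_key(worker_id)] = config
    index = store.get(INDEX_KEY, [])
    if worker_id not in index:
        store[INDEX_KEY] = index + [worker_id]


def delete_worker(store: dict, worker_id: str) -> None:
    # Remove both the entry and its index reference.
    store.pop(worker_key(worker_id), None)
    store[INDEX_KEY] = [w for w in store.get(INDEX_KEY, []) if w != worker_id]
```

The index exists because a key/value store cannot be assumed to support listing keys by prefix, so recovery needs one well-known key to start from.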
Group Coordination
Workers can be assigned to a group for mutually exclusive execution. When multiple workers share a group:
- Only one worker processes a message at a time
- A distributed lock prevents concurrent processing
- Locks have a 30-second max age and can be "stolen" if stale
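The stale-lock rule can be sketched in Python as follows. This is a simplified model, not the service's code: a dictionary stands in for the state store, the field names `owner` and `acquired_at` are assumptions based on the lock data described earlier, and treating a lock as stale only when it is strictly older than 30 seconds is also an assumption. A real distributed lock additionally needs an atomic compare-and-set (for example via state store ETags), which this sketch omits.

```python
import time

LOCK_MAX_AGE_SECONDS = 30


def lock_key(group_id: str) -> str:
    return f"workmanager/groups/{group_id}/lock"


def try_acquire(store: dict, group_id: str, instance_id: str, now=None) -> bool:
    """Take the group lock if it is free, already ours, or stale."""
    now = time.time() if now is None else now
    current = store.get(lock_key(group_id))
    if current is not None and current["owner"] != instance_id:
        age = now - current["acquired_at"]
        if age <= LOCK_MAX_AGE_SECONDS:
            return False  # held by another instance and still fresh
        # Otherwise the lock is stale and may be stolen.
    store[lock_key(group_id)] = {"owner": instance_id, "acquired_at": now}
    return True


def release(store: dict, group_id: str, instance_id: str) -> None:
    # Only the current owner may release; a stolen lock is left untouched.
    current = store.get(lock_key(group_id))
    if current is not None and current["owner"] == instance_id:
        del store[lock_key(group_id)]
```

The ownership check in `release` matters once stealing is allowed: an instance that lost its lock to a steal must not delete the new owner's entry.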
Code Source Types
Workers support two code source types:
CodeSourceUrl: Code fetched from an HTTP(S) URL at worker creation time
{ "url": "https://example.com/worker.py" }
CodeSourceContent: Base64-encoded inline code
{ "content": "aW1wb3J0IGpzb24..." }
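A minimal Python sketch of resolving either source type into raw code bytes is given below. The function name `resolve_code` and the 10-second timeout are assumptions; only the `url` and `content` field names come from the examples above.

```python
import base64
import urllib.request


def resolve_code(source: dict) -> bytes:
    """Return the worker code as bytes for either code source type."""
    if "content" in source:
        # Inline code arrives Base64-encoded; reject malformed input early.
        return base64.b64decode(source["content"], validate=True)
    if "url" in source:
        # URL sources are fetched once, at worker creation time.
        with urllib.request.urlopen(source["url"], timeout=10) as response:
            return response.read()
    raise ValueError("code source needs either 'url' or 'content'")


inline = resolve_code({"content": "aW1wb3J0IGpzb24="})
```

Here `inline` holds the bytes `b"import json"`, which is what the Base64 string `aW1wb3J0IGpzb24=` decodes to.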
Extension Points
Adding a New Engine
- Create a class implementing IEngine
- Register it in Program.cs:
engineRegistry.Register(
    new ContentType("text/x-my-language"),
    "MyLanguage",
    () => new MyLanguageEngine()
);
Custom Worker Logic
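Workers must implement an entry-point function that accepts the input CloudEvent and optionally returns an output CloudEvent. In the examples below it is named Process(event) in Python and ProcessAsync(CloudEvent input) in C#: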
Python:
def Process(event):
    return {
        "type": "com.example.output",
        "source": "my-worker",
        "data": { "result": event["data"] }
    }
C#:
public static CloudEvent ProcessAsync(CloudEvent input)
{
    return new CloudEvent
    {
        Type = "com.example.output",
        Source = new Uri("urn:my-worker"),
        Data = input.Data
    };
}
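Before deploying a Python worker, it can help to call its entry point locally with a sample event. The harness below is a sketch under assumptions: the exact shape of the event that WorkManager passes to `Process` is not specified here beyond `event["data"]`, so the envelope uses the standard CloudEvents 1.0 attribute names, and `run_locally` is a hypothetical helper rather than part of WorkManager.

```python
# The two fields that the Python example above sets on its output.
EXPECTED_OUTPUT_FIELDS = ("type", "source")


def Process(event):
    return {
        "type": "com.example.output",
        "source": "my-worker",
        "data": {"result": event["data"]},
    }


def run_locally(process, data):
    """Call a worker entry point with a sample event and check its output."""
    event = {
        "specversion": "1.0",
        "id": "local-1",
        "type": "com.example.input",
        "source": "local-test",
        "data": data,
    }
    result = process(event)
    if result is not None:
        missing = [f for f in EXPECTED_OUTPUT_FIELDS if f not in result]
        if missing:
            raise ValueError(f"output is missing fields: {missing}")
    return result


output = run_locally(Process, {"order": 42})
```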