Engine System
The WorkManager uses a pluggable engine architecture to support multiple programming languages. Each engine handles code execution for a specific MIME type.
IEngine Interface
All engines implement the IEngine interface:
public interface IEngine
{
    Task LoadCodeAsync(byte[] code, CancellationToken cancellationToken = default);
    Task<CloudEvent?> ProcessAsync(CloudEvent input, CancellationToken cancellationToken = default);
}
| Method | Description |
|---|---|
| LoadCodeAsync | Loads the worker code into the engine. Called once at worker creation or when code is updated. |
| ProcessAsync | Processes an incoming CloudEvent and returns a response CloudEvent, or null to publish nothing. |
Architecture
flowchart TD
subgraph ER["EngineRegistry"]
P["text/x-python"] --> PE[PythonEngine Factory]
C["text/x-csharp"] --> CS[CSharpSourceEngine Factory]
D["application/x-dotnet-dll"] --> CD[DotNetDllEngine Factory]
N["application/x-native-dll"] --> CN[NativeDllEngine Factory]
end
ER --> EF[Engine Factory<br/>per worker]
EF --> E1[PythonEngine Instance]
EF --> E2[CSharpSourceEngine Instance]
EF --> E3[DotNetDllEngine Instance]
EF --> E4[NativeDllEngine Instance]
Engine Instance Lifecycle:
flowchart LR
subgraph IE["IEngine Instance"]
LC[LoadCodeAsync] --> CL[Compiled / Loaded Code]
P[ProcessAsync] --> CE[Process CloudEvent]
end
Engine Resolution
When a worker is created:
- WorkManager looks up the engine factory by MIME type in EngineRegistry
- A new engine instance is created via the factory function
- Code is loaded into the engine via LoadCodeAsync()
- The engine instance is associated with the worker
This design ensures:
- Each worker has its own engine instance (no shared state)
- Engines can be stateless or maintain internal state
- New engines can be added without modifying existing code
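The resolution steps can be modeled in a few lines. The sketch below is a Python illustration of the pattern only, not the WorkManager's C# implementation; `EngineRegistry.register`, `EngineRegistry.create`, and `EchoEngine` are hypothetical names chosen for this example.

```python
from typing import Callable, Dict, Optional


class EchoEngine:
    """Hypothetical engine: stores the loaded code and echoes events back."""

    def __init__(self) -> None:
        self.code: Optional[bytes] = None

    def load_code(self, code: bytes) -> None:
        self.code = code

    def process(self, event: dict) -> Optional[dict]:
        return dict(event)


class EngineRegistry:
    """Maps a MIME type to a factory that builds a fresh engine."""

    def __init__(self) -> None:
        self._factories: Dict[str, Callable[[], EchoEngine]] = {}

    def register(self, mime_type: str, factory: Callable[[], EchoEngine]) -> None:
        self._factories[mime_type] = factory

    def create(self, mime_type: str, code: bytes) -> EchoEngine:
        factory = self._factories.get(mime_type)
        if factory is None:
            raise KeyError(f"no engine registered for {mime_type}")
        engine = factory()       # a new instance per worker, never shared
        engine.load_code(code)   # code is loaded once, at creation
        return engine


registry = EngineRegistry()
registry.register("text/x-echo", EchoEngine)
worker_a = registry.create("text/x-echo", b"code A")
worker_b = registry.create("text/x-echo", b"code B")
```

Because the registry stores factories rather than instances, `worker_a` and `worker_b` end up with separate engines, which is what keeps state from leaking between workers.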
Built-in Engines
PythonEngine
MIME Type: text/x-python
Executes Python code in a persistent sandboxed python3 subprocess. Code is injected directly via stdin at startup — no temporary files are written. The subprocess stays alive for the worker's lifetime, communicating via a JSON-line message loop over stdin/stdout.
Data flow:
sequenceDiagram
autonumber
participant Sub as Pub/Sub
participant WM as WorkManager<br/>(PythonEngine)
participant Py as python3 subprocess
participant Code as Worker's Process()
Sub->>WM: input CloudEvent
WM->>Py: start python3, inject worker code via stdin
Py->>WM: ready frame (JSON line over stdout)
Note over WM,Py: engine waits for ready. timeout triggers relaunch.
WM->>Py: process frame (cloudEvent)
Py->>Code: Process(ce_dict)
Code-->>Py: result dict (or None)
Py->>WM: result frame
WM->>WM: parse CE → publish reply<br/>(or null → no reply)
WM->>Sub: reply CloudEvent
Note over WM,Py: 10s health-check ping (empty heartbeat).<br/>Unresponsive subprocess is auto-restarted.
Code Requirements:
def Process(event):
    # event is a dict with CloudEvent fields
    # Return a dict representing the output CloudEvent, or None
    return {
        "type": "com.example.output",
        "source": "my-worker",
        "data": {"result": "processed"}
    }
CloudEvent Mapping:
Input CloudEvents are serialized to JSON and passed via stdin:
{
"id": "event-123",
"type": "com.example.input",
"source": "urn:source",
"specversion": "1.0",
"datacontenttype": "application/json",
"data": { "key": "value" }
}
The Process function should return a dict that gets deserialized back to a CloudEvent.
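As a slightly fuller illustration of the contract, here is a minimal sketch of a worker that replies to one event type and stays silent for everything else. The event type names and the shape of `data` are made up for the example; whether the engine fills in fields such as `id` or `specversion` on the reply is not stated here, so the sketch returns only the fields used in the example above.

```python
from typing import Optional


def Process(event: dict) -> Optional[dict]:
    # Returning None means no reply CloudEvent is published.
    if event.get("type") != "com.example.input":
        return None

    # "data" may be missing or null on the incoming event.
    data = event.get("data") or {}
    return {
        "type": "com.example.output",
        "source": "my-worker",
        "data": {"keys": sorted(data.keys())},
    }
```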
Error Handling:
- Python stdout is captured and parsed as JSON
- If the parsed output contains an __error__ key, an exception is thrown
- stderr output is captured as warning-level log entries (not treated as errors)
- Execution timeout is 30 seconds by default
- The subprocess is health-checked every 10 seconds; if unresponsive, it is automatically restarted
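The first two rules can be sketched from the host's point of view as follows. This is an illustration in Python rather than the engine's C# code; `WorkerError` and `parse_reply` are hypothetical names, and representing "no reply" as JSON `null` is an assumption.

```python
import json
from typing import Optional


class WorkerError(Exception):
    """Hypothetical exception type for errors reported by the worker."""


def parse_reply(stdout_line: str) -> Optional[dict]:
    reply = json.loads(stdout_line)
    if reply is None:
        # Assumption: the worker signals "no reply" with JSON null.
        return None
    if isinstance(reply, dict) and "__error__" in reply:
        raise WorkerError(str(reply["__error__"]))
    return reply
```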
CSharpSourceEngine
MIME Type: text/x-csharp
Compiles and executes C# source code at runtime using Roslyn. The code is compiled into an in-memory assembly.
Data flow:
sequenceDiagram
autonumber
participant Sub as Pub/Sub
participant WM as WorkManager<br/>(CSharpSourceEngine)
participant Ros as Roslyn compiler
participant W as Compiled worker<br/>(in-memory assembly)
Note over WM,Ros: === LoadCodeAsync (once per worker) ===
WM->>WM: Encoding.UTF8.GetString(code)
WM->>Ros: CSharpSyntaxTree.ParseText(source)
Ros-->>WM: syntaxTree
WM->>Ros: CSharpCompilation.Create(...)<br/>.Emit(MemoryStream)
Ros-->>WM: EmitResult
WM->>WM: Assembly.Load(ilBytes)
WM->>W: find static/instance "Process" via reflection
WM->>WM: cache Func<CloudEvent, CloudEvent?> delegate
Note over WM,W: --- one-time, ~2s startup ---
Note over Sub,W: === Per-message ProcessAsync ===
Sub->>WM: input CloudEvent
WM->>W: invoke cached delegate(ce)
W-->>WM: result CloudEvent (or null)
WM->>Sub: reply CloudEvent
Code Requirements:
using System;
using CloudNative.CloudEvents;
// Must have a method named Process that returns a CloudEvent, or null to publish nothing
public static CloudEvent? Process(CloudEvent input)
{
    return new CloudEvent
    {
        Type = "com.example.output",
        Source = new Uri("urn:my-worker"),
        Data = input.Data
    };
}
Requirements:
- Method must be named Process
- Must accept a single CloudEvent parameter
- Must return CloudEvent or null
- Can be static or instance method
- Assembly is compiled with references to:
- System.*
- netstandard
- CloudNative.CloudEvents
Error Handling:
- Compilation errors throw InvalidOperationException with full error messages
- Runtime exceptions propagate to the caller
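The compile-once, invoke-many shape of this engine can be shown with a small Python analogy. It uses Python's built-in `compile` and `exec` in place of Roslyn, so it illustrates only the pattern (decode, compile, locate `Process`, cache the callable), not the engine's actual code; `load_code` is a hypothetical name, and `ValueError` stands in for the `InvalidOperationException` mentioned above.

```python
from typing import Callable, Optional


def load_code(code: bytes) -> Callable[[dict], Optional[dict]]:
    source = code.decode("utf-8")
    try:
        compiled = compile(source, "<worker>", "exec")
    except SyntaxError as exc:
        # Compilation errors surface once, at load time.
        raise ValueError(f"compilation failed: {exc}") from exc

    namespace: dict = {}
    exec(compiled, namespace)
    process = namespace.get("Process")
    if not callable(process):
        raise ValueError("worker code must define a callable named Process")
    return process  # the caller caches this and reuses it for every message


handler = load_code(
    b"def Process(event):\n"
    b"    return {'type': 'com.example.output', 'data': event.get('data')}\n"
)
reply = handler({"data": 1})
```

The expensive step runs once in `load_code`; each later message only pays for a direct call to `handler`.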
NativeDllEngine
MIME Type: application/x-native-dll
Loads per-architecture native shared libraries (.so / .dylib / .dll) via NativeLibrary.Load in-process inside the AOT-compiled WorkManager. Exchanges FlatBuffers-encoded CloudEvents with the worker's Process export and supports host callbacks (log, gateway_call, free_response) on the VirtufinHost struct.
Data flow:
sequenceDiagram
autonumber
participant Sub as Pub/Sub
participant WM as WorkManager<br/>(NativeDllEngine,<br/>AOT)
participant Lib as Native worker<br/>(lib*.so)
participant Base as "AotNative<W>"
participant Ctx as WorkerContext
Note over WM,Lib: === LoadCodeAsync (once per worker) ===
WM->>WM: extract zip to temp dir
WM->>WM: find runtimes/<rid>/native/lib*.so
WM->>Lib: NativeLibrary.Load(path)
WM->>Lib: GetExport("Process") → fn-ptr
WM->>Lib: GetExport("FreeResult") → fn-ptr
WM->>WM: build VirtufinHost vtable<br/>(log, gateway_call, free_response)
Note over WM,Lib: --- one-time, <10ms startup ---
Note over Sub,Lib: === Per-message ProcessAsync ===
Sub->>WM: input CloudEvent
WM->>WM: CloudEventCodec.EncodeCloudEvent(ce) → inBytes
WM->>WM: inBuf = Marshal.AllocHGlobal(inBytes)
WM->>Lib: Process(host, inBuf, inLen, &outBuf, &outLen)
Lib->>Base: ProcessStatic<T>(host, inBuf, inLen, ...)
Base->>Base: CloudEventCodec.DecodeCloudEvent(inBuf) → ce
Base->>Base: _instances.GetOrAdd(host) → worker
Base->>Ctx: worker.HandleAsync(ce, ctx)
Ctx-->>Base: result CloudEvent (or null)
Base->>Base: CloudEventCodec.EncodeWorkerResponse(result) → outBytes
Base->>WM: outBuf = AllocHGlobal(outBytes), outLen = outBytes.Length
WM->>WM: Marshal.Copy(outBuf, span)
WM->>Lib: FreeResult(outBuf)
WM->>WM: CloudEventCodec.DecodeWorkerResponse(span)
WM->>Sub: publish reply CloudEvent
Isolation caveat: the engine runs in-process; an unrecoverable native error terminates the entire WorkManager. See NativeDllEngine Workers for the full author guide, C ABI, and defensive-coding requirements.
Performance: low-latency (~50-200 µs/message) without a subprocess boundary. Suitable for trusted, pre-compiled native code.
Code requirements:
- Worker is a per-architecture shared library at runtimes/<rid>/native/lib<library>.so (or .dylib / .dll), packaged in a standard NuGet .nupkg.
- The worker's .nuspec declares virtufin* extension elements (virtufinAbiVersion, virtufinEntryPoint, virtufinFreeResult, virtufinLibrary) inside <metadata>.
- Two exported C functions: Process(const VirtufinHost*, const uint8_t* in, int32_t in_len, uint8_t** out, int32_t* out_len) and FreeResult(uint8_t*).
- See NativeDllEngine Workers for a full example and the vendored virtufin_worker_api_c.h.
Error handling:
- ABI mismatch (NotSupportedException).
- Missing / malformed .nuspec, invalid library basename, missing RID entry (InvalidWorkerNuspecException / UnsupportedArchitectureException).
- Process returns non-zero → engine returns null (caller publishes worker.error).
- Process returns error_message in WorkerResponse → engine throws InvalidOperationException; lifecycle publisher surfaces worker.error.
DotNetDllEngine (in-process, default)
MIME Type: application/x-dotnet-dll
Default engine as of LIBRARY_VERSION 0.0.59. The engine is
AOT-compiled into the WorkManager binary and embeds the .NET
runtime (CoreCLR) on first use via libhostfxr. Worker DLLs
are loaded into a per-worker, collectible
AssemblyLoadContext. Per-call latency: sub-microsecond (direct
in-process method dispatch, no socket, no JSON).
See In-Process DotNet DLL Workers for the full architecture, setup contract, and trade-offs.
Data flow:
sequenceDiagram
autonumber
participant Sub as Pub/Sub
participant WM as WorkManager<br/>(DotNetDllEngine,<br/>AOT + CoreCLR)
participant Ctx as AssemblyLoadContext
participant W as IWorker impl
Note over WM: === First LoadCodeAsync (one-time) ===
WM->>WM: HostFxrBootstrap<br/>ensure CoreCLR loaded
Note over WM,Ctx: === Per-worker LoadCodeAsync ===
WM->>WM: WorkerNupkgLoader.Extract
WM->>Ctx: new AssemblyLoadContext
WM->>Ctx: LoadFromStream(worker.dll)
WM->>Ctx: scan for IWorker impl
WM->>W: Activator.CreateInstance
Note over Sub,W: === Per-message ProcessAsync ===
Sub->>WM: input CloudEvent
WM->>W: worker.ProcessAsync(ce)<br/>(direct call, <1 µs)
W-->>WM: result CloudEvent (or null)
WM->>Sub: reply CloudEvent
Code Requirements: Workers implement Virtufin.Worker.DevKit.IWorker (or extend WorkerBase). The worker DLL is published as a NuGet .nupkg with a lib/<tfm>/<id>.dll entry and a <virtufinLibrary> extension in the nuspec. See DotNet DLL Workers for the full author guide.
Discovery: The engine scans the loaded assembly for the
first non-abstract, non-interface type implementing IWorker.
If no implementation is found, an InvalidOperationException
is thrown.
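The discovery rule (first concrete type implementing the interface, error if none) can be mirrored in Python with abstract base classes. This is an analogy, not the engine's reflection code: the `IWorker` and `WorkerBase` classes below are stand-ins with an illustrative `process` method, `discover_worker` is a hypothetical name, and `LookupError` plays the role of `InvalidOperationException`.

```python
import abc
import inspect
from typing import Iterable, Optional, Type


class IWorker(abc.ABC):
    """Stand-in for the DevKit interface; the method name is illustrative."""

    @abc.abstractmethod
    def process(self, event: dict) -> Optional[dict]:
        ...


class WorkerBase(IWorker):
    """Still abstract: it does not implement process()."""


class EchoWorker(WorkerBase):
    def process(self, event: dict) -> Optional[dict]:
        return event


def discover_worker(candidates: Iterable[object]) -> Type[IWorker]:
    for candidate in candidates:
        if (inspect.isclass(candidate)
                and issubclass(candidate, IWorker)
                and not inspect.isabstract(candidate)):
            return candidate  # first concrete implementation wins
    raise LookupError("no IWorker implementation found")


found = discover_worker([int, IWorker, WorkerBase, EchoWorker])
```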
Virtufin.Worker.DevKit is a lightweight NuGet package (dependency: CloudNative.CloudEvents only) that provides the IWorker interface and WorkerBase abstract class. These two types are specific to the in-process DotNet DLL engine.
The same package also provides the AOT-native bridge
(AotNative<W>), the CloudEventCodec, and WorkerContext (plus
its typed wrapper NativeApiGateway) used by the
NativeDllEngine. See Worker DevKit for the full
class hierarchy, the AOT data flow, and the FlatBuffer wire
format.
Engine Registration
Engines are registered in Program.cs:
var engines = new (ContentType, string, Func<IEngine>)[]
{
    (new ContentType("text/x-csharp"), "C#", () =>
        new CSharpSourceEngine(logger)),
    (new ContentType("text/x-python"), "Python", () =>
        new PythonEngine(TimeSpan.FromSeconds(30), logger)),
};
foreach (var e in engines)
{
    engineRegistry.Register(e.Item1, e.Item2, e.Item3);
}
Adding a Custom Engine
- Create a new project referencing Virtufin.WorkManager.Engines
- Implement IEngine:
    using System.Threading;
    using System.Threading.Tasks;
    using CloudNative.CloudEvents;
    using Virtufin.WorkManager.Engines;

    public sealed class MyEngine : IEngine
    {
        public Task LoadCodeAsync(byte[] code, CancellationToken cancellationToken = default)
        {
            // Load and possibly compile the code
            return Task.CompletedTask;
        }

        public Task<CloudEvent?> ProcessAsync(CloudEvent input, CancellationToken cancellationToken = default)
        {
            // Execute the code with the input
            // Return a response CloudEvent, or null to publish nothing
            return Task.FromResult<CloudEvent?>(null);
        }
    }
- Register in Program.cs:
    engineRegistry.Register(
        new ContentType("text/x-my-lang"),
        "MyLanguage",
        () => new MyEngine()
    );
Engine Lifecycle
Engines are created fresh for each worker via the factory function. This means:
- Engines should be stateless (or handle their own state initialization)
- Engine instances are not reused across workers
- When a worker is recovered, a new engine is created and LoadCodeAsync is called again
Thread Safety
Engine instances are specific to a single worker and accessed sequentially (one message at a time). Disposal differs by engine:
- PythonEngine implements IDisposable; dispose is called when the engine is replaced
- CSharpSourceEngine and DotNetDllEngine do not require disposal
- NativeDllEngine implements IAsyncDisposable; dispose frees the loaded library and releases the host struct
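How the WorkManager enforces one-message-at-a-time access is not described in this section. As one possible way to picture that guarantee, the sketch below serializes calls to a per-worker handler with an `asyncio.Lock`; `SerialWorker` and `slow_echo` are hypothetical names used only for this illustration.

```python
import asyncio


class SerialWorker:
    """Hypothetical wrapper: only one message reaches the handler at a time."""

    def __init__(self, handler):
        self._handler = handler
        self._lock = asyncio.Lock()
        self.in_flight = 0
        self.max_in_flight = 0

    async def process(self, event):
        async with self._lock:
            self.in_flight += 1
            self.max_in_flight = max(self.max_in_flight, self.in_flight)
            try:
                return await self._handler(event)
            finally:
                self.in_flight -= 1


async def slow_echo(event):
    await asyncio.sleep(0.01)  # simulate work so calls would overlap without the lock
    return event


async def main():
    worker = SerialWorker(slow_echo)
    replies = await asyncio.gather(*(worker.process({"n": n}) for n in range(5)))
    return worker.max_in_flight, replies


max_in_flight, replies = asyncio.run(main())
```

Since access is sequential, an engine does not need internal locking for its own per-worker state.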
Performance Characteristics
| Engine | Startup | Per-Message | Throughput | Recommended For |
|---|---|---|---|---|
| CSharpSourceEngine | ~2s (Roslyn compilation) | < 1ms | High | High-throughput topics, complex logic |
| DotNetDllEngine | < 10ms | < 1 µs (direct in-process call) | Highest | Pre-compiled .NET assemblies |
| PythonEngine | ~500ms (subprocess start) | 1-5ms | Medium | Prototyping, scripting, Python-specific libraries |
| NativeDllEngine | < 10ms (per-RID resolve + dlopen) | ~50-200 µs | High | Pre-compiled native libraries (C/C++/Rust/Zig) where latency matters and the worker is trusted |
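Because each worker handles one message at a time, the per-message figures in the table translate directly into an upper bound on a single worker's throughput. The helper names below are made up for this example, and the calculation ignores pub/sub and serialization overhead outside the engine, so it gives ceilings rather than expected rates.

```python
def max_rate_per_second(per_message_us: float) -> float:
    """Upper bound on messages per second for one sequential worker."""
    return 1_000_000 / per_message_us


def total_seconds(startup_ms: float, per_message_us: float, messages: int) -> float:
    """Startup cost plus sequential processing time, in seconds."""
    return startup_ms / 1_000 + per_message_us * messages / 1_000_000


# NativeDllEngine: ~50-200 µs per message
native_rates = (max_rate_per_second(200), max_rate_per_second(50))
# PythonEngine: 1-5 ms per message
python_rates = (max_rate_per_second(5_000), max_rate_per_second(1_000))

# Time to handle 10,000 messages, taking 1 ms per message for both engines
# (the table's upper bound for CSharpSourceEngine, the best case for PythonEngine).
csharp_10k = total_seconds(2_000, 1_000, 10_000)
python_10k = total_seconds(500, 1_000, 10_000)
```

With the table's figures, the bound is 5,000 to 20,000 messages per second for NativeDllEngine and 200 to 1,000 for PythonEngine. At equal per-message cost the ~2 s Roslyn startup only matters for short-lived workers: over 10,000 messages it adds 1.5 s to a total of roughly 10 s.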
Python Engine Internals
The Python engine uses a persistent subprocess that communicates via stdin/stdout:
- The subprocess is started once at LoadCodeAsync and stays alive for the worker's lifetime
- CloudEvents are exchanged as JSON lines: the host writes the input event to stdin and reads the response from stdout
- A health check pings the subprocess every 10 seconds with an empty heartbeat; if the subprocess fails to respond, it is automatically restarted
- Stderr is captured as a Warning-level log channel, not treated as an error
- No temporary files are written — code is injected directly via stdin at startup
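The subprocess side of such a JSON-line loop might look like the sketch below. The actual bootstrap script and frame layout are not shown in this document, so treat the details as assumptions: `serve` is a hypothetical name, the heartbeat is modeled as an empty line answered with an empty line, and errors are reported under the `__error__` key described earlier. In-memory streams stand in for stdin and stdout so the example runs on its own.

```python
import io
import json


def serve(process, stdin, stdout) -> None:
    for line in stdin:
        line = line.strip()
        if not line:
            # Assumption: an empty line is the heartbeat; answer in kind.
            stdout.write("\n")
            stdout.flush()
            continue
        try:
            reply = process(json.loads(line))
        except Exception as exc:
            reply = {"__error__": f"{type(exc).__name__}: {exc}"}
        # One JSON document per line; None is written as JSON null.
        stdout.write(json.dumps(reply) + "\n")
        stdout.flush()


def Process(event):
    return {"type": "com.example.output", "data": event["data"]}


fake_stdin = io.StringIO('{"data": 1}\n\n{"no_data": true}\n')
fake_stdout = io.StringIO()
serve(Process, fake_stdin, fake_stdout)
lines = fake_stdout.getvalue().split("\n")
```

Flushing after every reply matters in a persistent process: without it the host could wait on a response that is still sitting in the subprocess's output buffer.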
Recommendations
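- Use DotNetDllEngine (the default) for pre-compiled .NET assemblies; it has the lowest per-message overhead
- Use CSharpSourceEngine for high-throughput topics where the worker is supplied as C# source
- Use NativeDllEngine for trusted, pre-compiled native libraries where latency matters
- Use PythonEngine for rapid prototyping, scripting, or when Python-specific packages are needed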
- The first Roslyn compilation in CSharpSourceEngine incurs a ~2s startup cost; subsequent messages are sub-millisecond
- Python worker subprocess overhead is ~500ms at startup and 1-5ms per message