
Engine System

The WorkManager uses a pluggable engine architecture to support multiple programming languages. Each engine handles code execution for a specific MIME type.

IEngine Interface

All engines implement the IEngine interface:

public interface IEngine
{
    Task LoadCodeAsync(byte[] code, CancellationToken cancellationToken = default);
    Task<CloudEvent?> ProcessAsync(CloudEvent input, CancellationToken cancellationToken = default);
}
| Method | Description |
|---|---|
| LoadCodeAsync | Loads the worker code into the engine. Called once at worker creation or when the code is updated. |
| ProcessAsync | Processes an incoming CloudEvent and returns a response CloudEvent, or null to publish nothing. |

Architecture

flowchart TD
    subgraph ER["EngineRegistry"]
        P["text/x-python"] --> PE[PythonEngine Factory]
        C["text/x-csharp"] --> CS[CSharpSourceEngine Factory]
        D["application/x-dotnet-dll"] --> CD[DotNetDllEngine Factory]
        N["application/x-native-dll"] --> CN[NativeDllEngine Factory]
    end
    ER --> EF[Engine Factory<br/>per worker]
    EF --> E1[PythonEngine Instance]
    EF --> E2[CSharpSourceEngine Instance]
    EF --> E3[DotNetDllEngine Instance]
    EF --> E4[NativeDllEngine Instance]

Engine Instance Lifecycle:

flowchart LR
    subgraph IE["IEngine Instance"]
        LC[LoadCodeAsync] --> CL[Compiled / Loaded Code]
        P[ProcessAsync] --> CE[Process CloudEvent]
    end

Engine Resolution

When a worker is created:

  1. WorkManager looks up the engine factory by MIME type in EngineRegistry
  2. A new engine instance is created via the factory function
  3. Code is loaded into the engine via LoadCodeAsync()
  4. The engine instance is associated with the worker

This design ensures that:

  • Each worker has its own engine instance (no shared state)
  • Engines can be stateless or maintain internal state
  • New engines can be added without modifying existing code
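The four resolution steps above can be sketched in Python as follows. This is a minimal illustration under stated assumptions, not the WorkManager's C# implementation: `Engine`, `EngineRegistry.create_for_worker`, and the toy `SuffixEngine` are hypothetical names, and MIME types are plain strings rather than ContentType objects.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Callable, Dict, Optional, Tuple


class Engine(ABC):
    """Hypothetical Python stand-in for the C# IEngine interface."""

    @abstractmethod
    async def load_code(self, code: bytes) -> None: ...

    @abstractmethod
    async def process(self, event: dict) -> Optional[dict]: ...


class EngineRegistry:
    """Maps a MIME type to a display name and a factory that builds engines."""

    def __init__(self) -> None:
        self._factories: Dict[str, Tuple[str, Callable[[], Engine]]] = {}

    def register(self, content_type: str, name: str,
                 factory: Callable[[], Engine]) -> None:
        self._factories[content_type] = (name, factory)

    async def create_for_worker(self, content_type: str, code: bytes) -> Engine:
        # 1. Look up the engine factory by MIME type.
        entry = self._factories.get(content_type)
        if entry is None:
            raise LookupError(f"no engine registered for {content_type!r}")
        _, factory = entry
        # 2. Create a new instance, so no state is shared with other workers.
        engine = factory()
        # 3. Load the worker's code before any message is processed.
        await engine.load_code(code)
        # 4. The caller associates the returned instance with the worker.
        return engine


class SuffixEngine(Engine):
    """Toy engine (hypothetical) whose 'code' is a suffix appended to the event type."""

    def __init__(self) -> None:
        self.suffix = ""

    async def load_code(self, code: bytes) -> None:
        self.suffix = code.decode("utf-8")

    async def process(self, event: dict) -> Optional[dict]:
        return {**event, "type": event["type"] + self.suffix}


async def main() -> None:
    registry = EngineRegistry()
    registry.register("text/x-demo", "Demo", SuffixEngine)
    a = await registry.create_for_worker("text/x-demo", b".a")
    b = await registry.create_for_worker("text/x-demo", b".b")
    print(await a.process({"type": "evt"}), await b.process({"type": "evt"}))


if __name__ == "__main__":
    asyncio.run(main())
```

Because the registry stores factories rather than instances, two workers with the same MIME type get separate engines, each holding its own loaded code.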

Built-in Engines

PythonEngine

MIME Type: text/x-python

Executes Python code in a persistent sandboxed python3 subprocess. Code is injected directly via stdin at startup — no temporary files are written. The subprocess stays alive for the worker's lifetime, communicating via a JSON-line message loop over stdin/stdout.

Data flow:

sequenceDiagram
    autonumber
    participant Sub as Pub/Sub
    participant WM as WorkManager<br/>(PythonEngine)
    participant Py as python3 subprocess
    participant Code as Worker's Process()

    Note over WM,Py: === LoadCodeAsync (once per worker) ===
    WM->>Py: start python3, inject worker code via stdin
    Py->>WM: ready frame (JSON line on stdout)
    Note over WM,Py: engine waits for ready. timeout triggers relaunch.
    Note over Sub,Code: === Per-message ProcessAsync ===
    Sub->>WM: input CloudEvent
    WM->>Py: input event (JSON line on stdin)
    Py->>Code: Process(ce_dict)
    Code-->>Py: result dict (or None)
    Py->>WM: result (JSON line on stdout)
    WM->>WM: parse CE → publish reply<br/>(or null → no reply)
    WM->>Sub: reply CloudEvent
    Note over WM,Py: 10s health-check ping (empty heartbeat).<br/>Unresponsive subprocess is auto-restarted.

Code Requirements:

def Process(event):
    # event is a dict with CloudEvent fields
    # Return a dict representing the output CloudEvent, or None
    return {
        "type": "com.example.output",
        "source": "my-worker",
        "data": { "result": "processed" }
    }

CloudEvent Mapping:

Input CloudEvents are serialized to JSON and passed via stdin:

{
    "id": "event-123",
    "type": "com.example.input",
    "source": "urn:source",
    "specversion": "1.0",
    "datacontenttype": "application/json",
    "data": { "key": "value" }
}

The Process function should return a dict, which is sent back as JSON and deserialized into the output CloudEvent, or None to publish nothing.
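Because a worker's Process function is plain Python operating on dicts, it can be exercised locally by calling it with the dict form of an event. The worker below is a hypothetical example: the rule of replying only to `com.example.input` events and the `echo` field are illustrative choices, not engine requirements.

```python
def Process(event):
    """Example worker: reply to com.example.input events, ignore everything else."""
    if event.get("type") != "com.example.input":
        return None  # None: no reply CloudEvent is published
    return {
        "type": "com.example.output",
        "source": "my-worker",
        "data": {"echo": event.get("data")},
    }


if __name__ == "__main__":
    # The dict form of the input event from the mapping example above.
    sample = {
        "id": "event-123",
        "type": "com.example.input",
        "source": "urn:source",
        "specversion": "1.0",
        "datacontenttype": "application/json",
        "data": {"key": "value"},
    }
    print(Process(sample))
```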

Error Handling:

  • Python stdout is captured and parsed as JSON
  • If the parsed response contains an __error__ key, an exception is thrown
  • stderr output is captured as warning-level log entries (not treated as errors)
  • Execution timeout is 30 seconds by default
  • The subprocess is health-checked every 10 seconds; if it is unresponsive, it is automatically restarted
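The parsing, `__error__`, and timeout rules can be illustrated with a short asyncio sketch of the host side. It assumes one JSON value per line on the subprocess's stdout and a None result written as JSON `null`, and it uses a hypothetical `WorkerError` in place of the engine's actual exception type; health checks and restarts are omitted.

```python
import asyncio
import json
from typing import Optional


class WorkerError(Exception):
    """Hypothetical stand-in for the exception the engine throws."""


def parse_response(line: str) -> Optional[dict]:
    """Parse one JSON line written by the Python subprocess to stdout."""
    payload = json.loads(line)
    if payload is None:
        return None  # Process returned None: publish nothing
    if isinstance(payload, dict) and "__error__" in payload:
        raise WorkerError(str(payload["__error__"]))
    return payload


async def read_response(reader: asyncio.StreamReader,
                        timeout: float = 30.0) -> Optional[dict]:
    """Wait for one response line, enforcing the per-call execution timeout."""
    raw = await asyncio.wait_for(reader.readline(), timeout)
    if not raw:
        raise WorkerError("subprocess closed stdout")
    return parse_response(raw.decode("utf-8"))
```

A timeout surfaces as `asyncio.TimeoutError`, which a real host would treat as a signal to relaunch the subprocess.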

CSharpSourceEngine

MIME Type: text/x-csharp

Compiles and executes C# source code at runtime using Roslyn. The code is compiled into an in-memory assembly.

Data flow:

sequenceDiagram
    autonumber
    participant Sub as Pub/Sub
    participant WM as WorkManager<br/>(CSharpSourceEngine)
    participant Ros as Roslyn compiler
    participant W as Compiled worker<br/>(in-memory assembly)

    Note over WM,Ros: === LoadCodeAsync (once per worker) ===
    WM->>WM: Encoding.UTF8.GetString(code)
    WM->>Ros: CSharpSyntaxTree.ParseText(source)
    Ros-->>WM: syntaxTree
    WM->>Ros: CSharpCompilation.Create(...)<br/>.Emit(MemoryStream)
    Ros-->>WM: EmitResult
    WM->>WM: Assembly.Load(ilBytes)
    WM->>W: find static/instance "Process" via reflection
    WM->>WM: cache Func<CloudEvent, CloudEvent?> delegate
    Note over WM,W: --- one-time, ~2s startup ---

    Note over Sub,W: === Per-message ProcessAsync ===
    Sub->>WM: input CloudEvent
    WM->>W: invoke cached delegate(ce)
    W-->>WM: result CloudEvent (or null)
    WM->>Sub: reply CloudEvent

Code Requirements:

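using System;
using CloudNative.CloudEvents;

public class MyWorker
{
    // Must have a method named Process that takes one CloudEvent and returns a CloudEvent or null
    public static CloudEvent Process(CloudEvent input)
    {
        return new CloudEvent
        {
            Id = Guid.NewGuid().ToString(), // id is a required CloudEvents attribute
            Type = "com.example.output",
            Source = new Uri("urn:my-worker"),
            Data = input.Data
        };
    }
}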

Requirements:

  • Method must be named Process
  • Must accept a single CloudEvent parameter
  • Must return CloudEvent or null
  • Can be a static or instance method
  • The assembly is compiled with references to:
      • System.*
      • netstandard
      • CloudNative.CloudEvents

Error Handling:

  • Compilation errors throw InvalidOperationException with the full compiler error messages
  • Runtime exceptions propagate to the caller
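The load-once, invoke-many pattern in the diagram above (compile, locate Process by reflection, cache a delegate) has a close analog in Python: compile the source with the built-in `compile()`, look up `Process` in the resulting namespace, check its arity with `inspect`, and keep the function object. This is only an analogy to illustrate the pattern; the real engine uses Roslyn and .NET reflection, `SourceWorker` is a hypothetical name, and `ValueError` stands in for InvalidOperationException.

```python
import inspect
from typing import Callable, Optional


class SourceWorker:
    """Compile once at load time, then invoke the cached callable per message."""

    def __init__(self) -> None:
        self._process: Optional[Callable[[dict], Optional[dict]]] = None

    def load_code(self, code: bytes) -> None:
        source = code.decode("utf-8")
        try:
            compiled = compile(source, "<worker>", "exec")
        except SyntaxError as exc:
            # Analogous to surfacing compiler diagnostics as an exception.
            raise ValueError(f"compilation failed: {exc}") from exc
        namespace: dict = {}
        exec(compiled, namespace)
        fn = namespace.get("Process")
        if not callable(fn):
            raise ValueError("worker must define a callable named Process")
        if len(inspect.signature(fn).parameters) != 1:
            raise ValueError("Process must take exactly one event parameter")
        self._process = fn  # the cached "delegate"

    def process(self, event: dict) -> Optional[dict]:
        if self._process is None:
            raise RuntimeError("load_code has not been called")
        # Runtime exceptions raised by the worker propagate to the caller unchanged.
        return self._process(event)
```

The expensive work (parsing and compiling) happens once in `load_code`; each message then costs only a direct function call, which is why the per-message cost stays far below the one-time startup cost.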

NativeDllEngine

MIME Type: application/x-native-dll

Loads per-architecture native shared libraries (.so / .dylib / .dll) in-process via NativeLibrary.Load inside the AOT-compiled WorkManager. It exchanges FlatBuffers-encoded CloudEvents with the worker's Process export and exposes host callbacks (log, gateway_call, free_response) through the VirtufinHost struct.

Data flow:

sequenceDiagram
    autonumber
    participant Sub as Pub/Sub
    participant WM as WorkManager<br/>(NativeDllEngine,<br/>AOT)
    participant Lib as Native worker<br/>(lib*.so)
    participant Base as "AotNative<W>"
    participant Ctx as WorkerContext

    Note over WM,Lib: === LoadCodeAsync (once per worker) ===
    WM->>WM: extract zip to temp dir
    WM->>WM: find runtimes/<rid>/native/lib*.so
    WM->>Lib: NativeLibrary.Load(path)
    WM->>Lib: GetExport("Process") → fn-ptr
    WM->>Lib: GetExport("FreeResult") → fn-ptr
    WM->>WM: build VirtufinHost vtable<br/>(log, gateway_call, free_response)
    Note over WM,Lib: --- one-time, <10ms startup ---

    Note over Sub,Lib: === Per-message ProcessAsync ===
    Sub->>WM: input CloudEvent
    WM->>WM: CloudEventCodec.EncodeCloudEvent(ce) → inBytes
    WM->>WM: inBuf = Marshal.AllocHGlobal(inBytes)
    WM->>Lib: Process(host, inBuf, inLen, &outBuf, &outLen)
    Lib->>Base: ProcessStatic<T>(host, inBuf, inLen, ...)
    Base->>Base: CloudEventCodec.DecodeCloudEvent(inBuf) → ce
    Base->>Base: _instances.GetOrAdd(host) → worker
    Base->>Ctx: worker.HandleAsync(ce, ctx)
    Ctx-->>Base: result CloudEvent (or null)
    Base->>Base: CloudEventCodec.EncodeWorkerResponse(result) → outBytes
    Base->>WM: outBuf = AllocHGlobal(outBytes), outLen = outBytes.Length
    WM->>WM: Marshal.Copy(outBuf, span)
    WM->>Lib: FreeResult(outBuf)
    WM->>WM: CloudEventCodec.DecodeWorkerResponse(span)
    WM->>Sub: publish reply CloudEvent

Isolation caveat: the engine runs in-process; an unrecoverable native error terminates the entire WorkManager. See NativeDllEngine Workers for the full author guide, C ABI, and defensive-coding requirements.

Performance: low-latency (~50-200 µs/message) without a subprocess boundary. Suitable for trusted, pre-compiled native code.

Code requirements:

  • The worker is a per-architecture shared library at runtimes/<rid>/native/lib<library>.so (or .dylib / .dll), packaged in a standard NuGet .nupkg.
  • The worker's .nuspec declares virtufin* extension elements (virtufinAbiVersion, virtufinEntryPoint, virtufinFreeResult, virtufinLibrary) inside <metadata>.
  • Two exported C functions: Process(const VirtufinHost*, const uint8_t* in, int32_t in_len, uint8_t** out, int32_t* out_len) and FreeResult(uint8_t*).
  • See NativeDllEngine Workers for a full example and the vendored virtufin_worker_api_c.h.

Error handling:

  • ABI mismatch: NotSupportedException.
  • Missing or malformed .nuspec, invalid library basename, or missing RID entry: InvalidWorkerNuspecException / UnsupportedArchitectureException.
  • Process returns non-zero: the engine returns null (the caller publishes worker.error).
  • Process returns an error_message in the WorkerResponse: the engine throws InvalidOperationException, and the lifecycle publisher surfaces worker.error.

DotNetDllEngine (in-process, default)

MIME Type: application/x-dotnet-dll

Default engine as of LIBRARY_VERSION 0.0.59. The engine is AOT-compiled into the WorkManager binary and loads the .NET runtime (CoreCLR) into the process on first use via libhostfxr. Worker DLLs are loaded into a per-worker, collectible AssemblyLoadContext. Per-call latency: sub-microsecond (direct in-process method dispatch, no socket, no JSON).

See In-Process DotNet DLL Workers for the full architecture, setup contract, and trade-offs.

Data flow:

sequenceDiagram
    autonumber
    participant Sub as Pub/Sub
    participant WM as WorkManager<br/>(DotNetDllEngine,<br/>AOT + CoreCLR)
    participant Ctx as AssemblyLoadContext
    participant W as IWorker impl

    Note over WM: === First LoadCodeAsync (one-time) ===
    WM->>WM: HostFxrBootstrap<br/>ensure CoreCLR loaded
    Note over WM,Ctx: === Per-worker LoadCodeAsync ===
    WM->>WM: WorkerNupkgLoader.Extract
    WM->>Ctx: new AssemblyLoadContext
    WM->>Ctx: LoadFromStream(worker.dll)
    WM->>Ctx: scan for IWorker impl
    WM->>W: Activator.CreateInstance

    Note over Sub,W: === Per-message ProcessAsync ===
    Sub->>WM: input CloudEvent
    WM->>W: worker.ProcessAsync(ce)<br/>(direct call, <1 µs)
    W-->>WM: result CloudEvent (or null)
    WM->>Sub: reply CloudEvent

Code Requirements: Workers implement Virtufin.Worker.DevKit.IWorker (or extend WorkerBase). The worker DLL is published as a NuGet .nupkg with a lib/<tfm>/<id>.dll entry and a <virtufinLibrary> extension in the nuspec. See DotNet DLL Workers for the full author guide.

Discovery: The engine scans the loaded assembly for the first non-abstract, non-interface type implementing IWorker. If no implementation is found, an InvalidOperationException is thrown.
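The discovery rule maps naturally onto Python's `abc` and `inspect` modules. The sketch below is an analogy, not the engine's code: `IWorker` here is a hypothetical Python ABC, a module plays the role of the loaded assembly, and `RuntimeError` stands in for InvalidOperationException. Like the rule above, it picks the first concrete implementation, using the module's definition order.

```python
import inspect
import types
from abc import ABC, abstractmethod
from typing import Optional


class IWorker(ABC):
    """Hypothetical Python mirror of the DevKit IWorker contract."""

    @abstractmethod
    def process(self, event: dict) -> Optional[dict]: ...


def discover_worker(module: types.ModuleType) -> IWorker:
    """Instantiate the first concrete IWorker implementation in the module."""
    for obj in vars(module).values():  # dicts preserve definition order
        if (
            inspect.isclass(obj)
            and issubclass(obj, IWorker)
            and obj is not IWorker
            and not inspect.isabstract(obj)  # skips abstract bases like WorkerBase
        ):
            return obj()  # analogous to Activator.CreateInstance
    raise RuntimeError(f"no IWorker implementation found in {module.__name__}")
```

An abstract intermediate class (the role WorkerBase plays) is skipped automatically because it still has unimplemented abstract methods.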

Virtufin.Worker.DevKit is a lightweight NuGet package (dependency: CloudNative.CloudEvents only) that provides the IWorker interface and WorkerBase abstract class consumed by the in-process DotNet DLL engine. The PythonEngine and CSharpSourceEngine do not use it.

The same package also provides the AOT-native bridge (AotNative<W>), the CloudEventCodec, and WorkerContext (plus its typed wrapper NativeApiGateway) used by the NativeDllEngine. See Worker DevKit for the full class hierarchy, the AOT data flow, and the FlatBuffer wire format.

Engine Registration

Engines are registered in Program.cs. The example below registers the C# and Python engines:

var engines = new (ContentType Type, string Name, Func<IEngine> Factory)[]
{
    (new ContentType("text/x-csharp"), "C#", () => new CSharpSourceEngine(logger)),
    (new ContentType("text/x-python"), "Python", () => new PythonEngine(TimeSpan.FromSeconds(30), logger)),
};

foreach (var (type, name, factory) in engines)
{
    engineRegistry.Register(type, name, factory);
}

Adding a Custom Engine

  1. Create a new project referencing Virtufin.WorkManager.Engines

  2. Implement IEngine:

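    using System.Threading;
    using System.Threading.Tasks;
    using CloudNative.CloudEvents;
    using Virtufin.WorkManager.Engines;
    
    public sealed class MyEngine : IEngine
    {
        public Task LoadCodeAsync(byte[] code, CancellationToken cancellationToken = default)
        {
            // Load and possibly compile the code
            return Task.CompletedTask;
        }
    
        public Task<CloudEvent?> ProcessAsync(CloudEvent input, CancellationToken cancellationToken = default)
        {
            // Execute the loaded code with the input and
            // return a response CloudEvent, or null to publish nothing
            return Task.FromResult<CloudEvent?>(null);
        }
    }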
    

  3. Register in Program.cs:

    engineRegistry.Register(
        new ContentType("text/x-my-lang"),
        "MyLanguage",
        () => new MyEngine()
    );
    

Engine Lifecycle

Engines are created fresh for each worker via the factory function. This means:

  • Engines should be stateless (or handle their own state initialization)
  • Engine instances are not reused across workers
  • When a worker is recovered, a new engine is created and LoadCodeAsync is called again

Thread Safety

Engine instances are specific to a single worker and are accessed sequentially (one message at a time), so an engine does not need to guard against concurrent ProcessAsync calls. Disposal differs by engine:

  • PythonEngine implements IDisposable; Dispose is called when the engine is replaced
  • CSharpSourceEngine and DotNetDllEngine do not require disposal
  • NativeDllEngine implements IAsyncDisposable; disposal frees the loaded library and releases the host struct
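One way a host can guarantee one-message-at-a-time access, together with the fresh-engine-on-recovery rule from Engine Lifecycle, is a per-worker queue drained by a single consumer task. This sketch assumes asyncio and uses hypothetical names (`WorkerHost`, `recover`, and an async `close()` method standing in for Dispose/DisposeAsync); it is not the WorkManager's actual scheduling code, and `recover()` assumes no message is in flight.

```python
import asyncio
from typing import Any, Callable, List, Optional


class WorkerHost:
    """Feeds one worker's engine one message at a time (hypothetical helper).

    Create it inside a running event loop.
    """

    def __init__(self, factory: Callable[[], Any], code: bytes) -> None:
        self._factory = factory
        self._code = code
        self._engine: Any = None
        self._queue: "asyncio.Queue[Optional[dict]]" = asyncio.Queue()
        self.replies: List[dict] = []

    async def start(self) -> None:
        # Fresh engine from the factory, then load the worker's code.
        self._engine = self._factory()
        await self._engine.load_code(self._code)

    async def recover(self) -> None:
        # Release the old engine (Dispose/DisposeAsync in the C# engines),
        # then create a new instance and load the code again.
        old, self._engine = self._engine, None
        close = getattr(old, "close", None)
        if close is not None:
            await close()  # assumes close() is a coroutine
        await self.start()

    def submit(self, event: Optional[dict]) -> None:
        # None is a shutdown sentinel for run().
        self._queue.put_nowait(event)

    async def run(self) -> None:
        # A single consumer task, so the engine never sees two messages at once.
        while True:
            event = await self._queue.get()
            try:
                if event is None:
                    return
                reply = await self._engine.process(event)
                if reply is not None:
                    self.replies.append(reply)
            finally:
                self._queue.task_done()
```

Serializing access in the host keeps every engine implementation free of locks, at the cost of limiting each worker to one in-flight message.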

Performance Characteristics

| Engine | Startup | Per-Message | Throughput | Recommended For |
|---|---|---|---|---|
| CSharpSourceEngine | ~2s (Roslyn compilation) | < 1ms | High | High-throughput topics, complex logic |
| DotNetDllEngine | < 10ms | < 1ms | Highest | Pre-compiled .NET assemblies |
| PythonEngine | ~500ms (subprocess start) | 1-5ms | Medium | Prototyping, scripting, Python-specific libraries |
| NativeDllEngine | < 10ms (per-RID resolve + dlopen) | ~50-200 µs | High | Pre-compiled native libraries (C/C++/Rust/Zig) where latency matters and the worker is trusted |

Python Engine Internals

The Python engine uses a persistent subprocess that communicates via stdin/stdout:

  • The subprocess is started once in LoadCodeAsync and stays alive for the worker's lifetime
  • CloudEvents are exchanged as JSON lines — the host writes the input event to stdin and reads the response from stdout
  • A health check pings the subprocess every 10 seconds with an empty heartbeat; if the subprocess fails to respond, it is automatically restarted
  • Stderr is captured as a Warning-level log channel, not treated as an error
  • No temporary files are written — code is injected directly via stdin at startup
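A minimal sketch of the subprocess side of such a loop follows. The framing details are assumptions, since this section does not specify them: each input is one JSON object per line, an empty line is the heartbeat and is answered with an empty line, a None result is written as JSON `null`, and exceptions are reported as an object with an `__error__` key. How the worker code arrives over stdin is not modeled; `serve` (a hypothetical name) receives the Process callable directly and takes its streams as parameters so it can run without a real subprocess.

```python
import json
import sys
from typing import Callable, Optional, TextIO


def serve(process: Callable[[dict], Optional[dict]],
          stdin: TextIO = sys.stdin, stdout: TextIO = sys.stdout) -> None:
    """Read one JSON event per line and write one JSON response per line."""
    for line in stdin:
        line = line.rstrip("\n")
        if not line:
            # Heartbeat: answer with an empty line so the host knows we are alive.
            stdout.write("\n")
        else:
            try:
                result = process(json.loads(line))
                reply = json.dumps(result)  # None is written as "null"
            except Exception as exc:  # report worker failures instead of crashing
                reply = json.dumps({"__error__": f"{type(exc).__name__}: {exc}"})
            stdout.write(reply + "\n")
        stdout.flush()  # the host reads line by line, so never leave a reply buffered
```

Catching exceptions inside the loop keeps one failing message from killing the long-lived subprocess; only a hang or crash triggers the health-check restart.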

Recommendations

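  • Use DotNetDllEngine (the default) for pre-compiled .NET workers; it has the highest throughput
  • Use NativeDllEngine for trusted, pre-compiled native libraries where per-message latency matters
  • Use CSharpSourceEngine for high-throughput or latency-sensitive topics when workers are provided as C# source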
  • Use PythonEngine for rapid prototyping, scripting, or when Python-specific packages are needed
  • The first Roslyn compilation in CSharpSourceEngine incurs a ~2s startup cost; subsequent messages are sub-millisecond
  • Python worker subprocess overhead is ~500ms at startup and 1-5ms per message