Skip to content

Worker DevKit

The Virtufin.Worker.DevKit NuGet package is the shared base library for all Virtufin workers. It provides a single, idiomatic .NET API for writing workers that runs against the two managed engines:

Engine DevKit class to extend Runs in
DotNetDllEngine WorkerBase<TInput, TOutput> (or one of its subclasses) In-process (JIT via hostfxr)
NativeDllEngine WorkerBase<TInput, TOutput> (or one of its subclasses), wrapped in AotNative<W> In-process (AOT-compiled, FlatBuffer wire format)

The package is dependency-light: it only references CloudNative.CloudEvents (plus, for the API helpers, Virtufin.Api.Client). The AOT native side also references Google.FlatBuffers.

The package is published from this repository at the same LIBRARY_VERSION as the WorkManager service. See the introduction for the package-publishing model.

Class hierarchy

The hierarchy is generic on the wire codec. The base WorkerBase<TInput, TOutput> takes a codec that owns payload decode/encode; the worker only deals with the typed in/out. The default JSON codec is JsonCloudEventCodec; plug a different codec in for non-JSON formats.

classDiagram
    class IWorker {
        <<interface>>
        +ProcessAsync(CloudEvent) Task~CloudEvent?~
    }

    class WorkerBase~TInput, TOutput~ {
        <<abstract, generic>>
        +Source Uri
        +ResponseEventType string
        +Codec ICloudEventCodec~TInput, TOutput~
        +ProcessAsync(CloudEvent) Task~CloudEvent?~
        +BuildResponse(CloudEvent, TOutput) CloudEvent
        +WithCorrelationId(CloudEvent) CloudEvent
        +WithTimestamp(CloudEvent) CloudEvent
    }
    IWorker <|.. WorkerBase~TInput, TOutput~

    class CommandWorker~TInput, TOutput, TCommand~ {
        <<abstract, generic>>
        +HandleCommandAsync(CloudEvent, TCommand, TInput) Task~TOutput?~
        +ExtractCommand(CloudEvent, TInput) TCommand
    }
    WorkerBase~TInput, TOutput~ <|-- CommandWorker~TInput, TOutput, TCommand~

    class CommandWorker~T~ {
        <<abstract, generic (JSON convenience)>>
        -Codec JsonCloudEventCodec
    }
    CommandWorker~TInput, TOutput, TCommand~ <|-- CommandWorker~T~

    class ApiWorker {
        <<sealed (JSON)>>
        +Api ApiClient?
        +HandleAsync(CloudEvent, JsonNode) Task~JsonObject?~
    }
    WorkerBase~JsonNode, JsonObject~ <|-- ApiWorker

    class ApiCommandWorker~T~ {
        <<sealed (JSON)>>
        +Api ApiClient?
        +HandleCommandAsync(CloudEvent, T, JsonNode) Task~JsonObject?~
        +EnsureClient(host, port)
        +EnsureClientFromCommand(node)
    }
    CommandWorker~T~ <|-- ApiCommandWorker~T~

    class ICloudEventCodec~TInput, TOutput~ {
        <<interface>>
        +ContentType string
        +DecodePayload(CloudEvent) TInput
        +EncodeResponse(CloudEvent, TOutput) (object?, string)
        +EncodeError(CloudEvent, string) TOutput
    }

    class JsonCloudEventCodec {
        +ContentType = "application/json"
    }
    ICloudEventCodec~TInput, TOutput~ <|.. JsonCloudEventCodec

    class AotNative~W~ {
        <<sealed, generic bridge>>
        +CurrentContext WorkerContext?
        +ProcessStatic(host, inBuf, inLen) int
        +FreeResultStatic(result) void
    }
    note for AotNative~W~ "Wraps any IWorker; the same worker<br/>class works under DotNetDllEngine<br/>(JIT) and NativeDllEngine (AOT)."

CommandWorker<T> and the *CommandWorker* variants require the enum identifier names to match the wire-level command strings (case-insensitive). See the CommandWorker<T> section below for the data flow.

Public API (JIT side: DotNetDllEngine)

IWorker and WorkerBase<TInput, TOutput>

The minimal contract a worker must satisfy:

public interface IWorker
{
    Task<CloudEvent?> ProcessAsync(CloudEvent input);
}

WorkerBase<TInput, TOutput> is the abstract, generic base class. The codec — supplied in the constructor — owns the wire-format decode/encode (ICloudEventCodec<TInput, TOutput>). The base class provides response-building helpers, automatic correlation-id propagation, and the canonical CloudEvent extension attribute names.

public abstract class WorkerBase<TInput, TOutput> : IWorker
{
    public const string CorrelationIdAttribute = "correlationid";
    public const string ReplyTopicAttribute     = "replytopic";
    public const string FieldError               = "error";
    public const string DataContentTypeJson      = "application/json";

    protected ICloudEventCodec<TInput, TOutput> Codec { get; }

    protected WorkerBase(
        Uri source,
        string responseEventType,
        ICloudEventCodec<TInput, TOutput> codec) { ... }

    public Task<CloudEvent?> ProcessAsync(CloudEvent input);   // not sealed
    protected abstract Task<TOutput?> ProcessAsync(CloudEvent input, TInput data);

    protected CloudEvent BuildResponse(CloudEvent input, TOutput payload);
    protected CloudEvent BuildResponse(CloudEvent input, object? data, string contentType);
    protected CloudEvent WithCorrelationId(CloudEvent input);
    protected CloudEvent WithTimestamp(CloudEvent input);
}

Response helpers

BuildResponse builds a CloudEvent with the configured Source and ResponseEventType. If the input has a correlationid extension, the reply inherits it automatically (via WithCorrelationId); the WithTimestamp extension is also applied automatically:

sequenceDiagram
    autonumber
    participant Sub as Pub/Sub
    participant WM as WorkManager
    participant W as WorkerBase.ProcessAsync
    participant Codec as JsonCloudEventCodec
    participant H as HandleAsync(CloudEvent, JsonNode)

    Sub->>WM: input CE (with correlationid)
    WM->>W: ProcessAsync(ce)
    W->>Codec: DecodePayload(ce) → node
    Codec-->>W: node
    W->>H: HandleAsync(ce, node)
    H-->>W: JsonObject payload
    W->>W: WithCorrelationId(ce).WithTimestamp()<br/>Type = responseEventType
    W-->>WM: CloudEvent envelope with payload
    WM->>Sub: publish response

If the handler throws, the base class catches and builds an error envelope via codec.EncodeError(...) — the typed error payload (JsonObject shaped like {"command":"error","success":false,"message":"..."} for JSON).

replytopic extension

If the input CloudEvent carries a replytopic extension attribute, the WorkManager routes the response to that topic instead of the worker's default reply topic. WorkerBase does not need to do anything — the engine reads this extension and overrides the publish topic.

CommandWorker<T> — typed command dispatch

The CommandWorker<T> base class is for workers that handle a finite set of named commands carried in the CloudEvent data payload. The wire format is:

{ "command": "create", "api_host": "...", "data": { ... } }

CommandWorker<T> parses data.command, calls Enum.TryParse<T>(command, ignoreCase: true, out _), and forwards the parsed enum value plus the full JsonNode payload to your override:

sequenceDiagram
    autonumber
    participant Sub as Pub/Sub
    participant WM as WorkManager
    participant CW as CommandWorker.ProcessAsync
    participant Codec as JsonCloudEventCodec
    participant H as HandleCommandAsync

    Sub->>WM: data = {"command": "create", ...}
    WM->>CW: ProcessAsync(ce)
    CW->>Codec: DecodePayload(ce) → node
    Codec-->>CW: node
    CW->>CW: ExtractCommand(ce, node)<br/>node["command"] → "create"<br/>Enum.TryParse<T>("create") → MyCmd.create
    CW->>H: HandleCommandAsync(ce, MyCmd.create, node)
    alt Handler returns JsonObject
        H-->>CW: payload JsonObject
        CW->>Codec: EncodeResponse(ce, payload) → (data, contentType)
        CW->>CW: envelope = WithCorrelationId(ce).WithTimestamp()<br/>Type = responseEventType
        CW-->>WM: response CE
    else Handler returns null
        H-->>CW: null (no reply)
        CW-->>WM: null
    end

Usage:

public enum MyCommand { create, destroy, list }

public class MyWorker : CommandWorker<MyCommand>
{
    public MyWorker(Uri source = new("urn:com.example.myworker"),
                    string responseEventType = "com.example.myworker.response")
        : base(source, responseEventType) { }

    protected override Task<JsonObject?> HandleCommandAsync(
        CloudEvent input, MyCommand command, JsonNode node)
        => command switch
        {
            MyCommand.create  => HandleCreateAsync(input, node),
            MyCommand.destroy => HandleDestroyAsync(input),
            MyCommand.list    => HandleListAsync(input),
            _ => null,
        };
}

The enum identifier names are the wire strings. The same enum drives both the gRPC service definition (proto) and the worker handler, so a rename in one place surfaces as a wire-protocol break.

The handler returns a typed payload (JsonObject), not a CloudEvent. The base class wraps it in the envelope with Id, Type, Source, correlationid extension (auto-applied from input), and time extension (auto-applied). Use the Response(...) helper on CommandWorker<T> to build the standard { "command", "success", "id", "message" } payload:

return Response(input, MyCommand.create.ToString(), true,
                 id: createdId, message: "Created");

For richer payloads, build a JsonObject directly.

ApiWorker and ApiCommandWorker<T>

ApiWorker : WorkerBase<JsonNode, JsonObject> is the JSON-convenience base for workers that call backend services through the API Gateway without a typed command. Override HandleAsync(CloudEvent, JsonNode) → Task<JsonObject?>.

ApiCommandWorker<T> : CommandWorker<T> combines typed command dispatch with the API client helper. Both inject a pre-configured Virtufin.Api.Client.ApiClient for backend calls through the API Gateway.

public class MyApiWorker : ApiCommandWorker<MyCommand>
{
    public MyApiWorker() : base(
        source: new("urn:com.example.myworker"),
        responseEventType: "com.example.myworker.response",
        apiHost: "localhost",
        apiPort: 5002) { }

    protected override async Task<JsonObject?> HandleCommandAsync(
        CloudEvent input, MyCommand command, JsonNode node)
    {
        if (command == MyCommand.create)
        {
            EnsureClientFromCommand(node);   // or pass apiHost/apiPort in ctor
            var created = await Api!.SomeService.CreateAsync(...);
            return Response(input, command.ToString(), true,
                            id: created.Id, message: "Created");
        }
        return ErrorPayload("Unsupported command");
    }
}

Two API-client construction modes are available:

  • Eager: pass apiHost and apiPort to the base ctor; Api is ready in HandleCommandAsync.
  • Lazy: pass only source and responseEventType. Api is null until EnsureClient(host, port) (or EnsureClientFromCommand(node), which reads the api_host field from the decoded JsonNode) is called. Use this when the API host comes from the command payload.

AOT native side (NativeDllEngine)

The AOT side is a thin bridge over the same IWorker contract — there is no parallel class hierarchy. Any IWorker implementation (typically extending WorkerBase<TInput, TOutput> or CommandWorker<T>) runs unchanged under DotNetDllEngine (JIT, in-process ALC) and NativeDllEngine (AOT, via the AotNative<W> wrapper).

AotNative<W>

AotNative<W> where W : IWorker, new() is the only AOT-side base — a sealed generic bridge that provides the FlatBuffer / native C ABI plumbing needed by NativeDllEngine. The worker's class declares two [UnmanagedCallersOnly] exports (Process, FreeResult) that delegate to AotNative<W>.ProcessStatic and AotNative<W>.FreeResultStatic.

public class MyWorker : CommandWorker<MyCommand>
{
    public MyWorker() : base(new Uri("urn:com.example"), "com.example.response") { }

    protected override async Task<JsonObject?> HandleCommandAsync(
        CloudEvent input, MyCommand command, JsonNode node)
    {
        var context = AotNative<MyWorker>.CurrentContext;
        if (context is null) return ErrorPayload("no AOT context");
        var gateway = new NativeApiGateway(context);
        // ... dispatch on command ...
        return Response(input, command.ToString(), true, message: "ok");
    }

    [UnmanagedCallersOnly(EntryPoint = "Process")]
    public static int Process(IntPtr host, IntPtr inBuf, int inLen,
                              IntPtr outBuf, IntPtr outLen)
    {
        unsafe
        {
            return AotNative<MyWorker>.ProcessStatic(
                host, inBuf, inLen, (IntPtr*)outBuf, (int*)outLen);
        }
    }

    [UnmanagedCallersOnly(EntryPoint = "FreeResult")]
    public static void FreeResult(IntPtr result)
        => AotNative<MyWorker>.FreeResultStatic(result);
}

The same MyWorker class is also usable directly by DotNetDllEngine (the JIT engine calls worker.ProcessAsync(CloudEvent) without going through the AotNative bridge).

AotNative<W>.CurrentContext

A static AsyncLocal<WorkerContext?> set by ProcessStatic immediately before the wrapped worker's ProcessAsync is called and cleared in a finally block after it returns. Workers read this to obtain a NativeApiGateway for backend calls under NativeDllEngine. It is null outside a native invocation (i.e. under DotNetDllEngine).

Per-instance state

Each unique host pointer (one per LoadCodeAsync call) gets its own wrapped W instance, looked up via a ConcurrentDictionary<IntPtr, AotNative<W>>. Multiple workers sharing the same loaded library still get independent state.

sequenceDiagram
    autonumber
    participant WM as WorkManager<br/>(NativeDllEngine)
    participant Lib as Native worker<br/>(lib*.so)
    participant Bridge as AotNative<MyWorker>
    participant Sub as MyWorker.HandleCommandAsync
    participant Ctx as WorkerContext

    WM->>Lib: Process(host, inBuf, inLen, &outBuf, &outLen)
    Note over WM,Lib: inBuf is a CloudEvent FlatBuffer
    Lib->>Bridge: ProcessStatic(host, inBuf, inLen, ...)
    Bridge->>Bridge: CloudEventCodec.DecodeCloudEvent(inBuf) → ce
    Bridge->>Bridge: _instances.GetOrAdd(host) → wrapper
    Bridge->>Bridge: CurrentContext = BuildWorkerContext(host)
    Bridge->>Sub: wrapper.ProcessAsync(ce)
    Sub->>Ctx: ctx.Log("...", level)
    Ctx->>Lib: VirtufinHost.log(host, level, msg)
    Lib-->>Ctx: log goes back to WM via callback
    Sub->>Ctx: gateway.InvokeAsync(...)
    Ctx->>Lib: VirtufinHost.gateway_call(host, ...)
    Lib-->>Ctx: gateway goes back to WM
    Sub-->>Bridge: payload JsonObject (or null)
    Bridge->>Bridge: codec.EncodeResponse(ce, payload) → envelope
    Bridge->>Bridge: WithCorrelationId(ce).WithTimestamp()
    Bridge->>Bridge: CloudEventCodec.EncodeWorkerResponse(envelope) → bytes
    Bridge->>Bridge: Marshal.AllocHGlobal(bytes)<br/>*outBuf = ptr, *outLen = bytes.Length
    Bridge-->>Lib: 0
    Lib-->>WM: outBuf, outLen
    WM->>WM: Marshal.Copy → span → DecodeWorkerResponse
    WM->>Lib: FreeResult(outBuf)
    WM->>WM: publish reply CE

WorkerContext

WorkerContext wraps the native VirtufinHost* that the engine hands to the worker. It exposes typed helpers for the three host callbacks:

  • Log(LogLevel level, string message) — write to the WorkManager log
  • InvokeGateway(string service, string method, byte[] requestJson) — invoke a backend service through the API Gateway via the host's gateway_call callback
  • The host's free_response callback is invoked automatically by InvokeGateway after the worker reads the response bytes

Workers read WorkerContext via AotNative<W>.CurrentContext and wrap it in a NativeApiGateway for typed gateway calls. Do not dereference WorkerContext.Host; it is opaque.

NativeApiGateway

Thin wrapper over WorkerContext.InvokeGateway. Construct one from the ambient CurrentContext:

var gateway = new NativeApiGateway(AotNative<MyWorker>.CurrentContext!);
var responseBytes = gateway.InvokeAsync("websocketmanager", "Connect", requestBytes);

CloudEventCodec

CloudEventCodec is the single source of truth for the CloudEvent ↔ FlatBuffer mapping shared by NativeDllEngine (engine side) and AotNative<W> (AOT-compiled worker side). The wire format is the FlatBuffer binary produced by FlatBufferBuilder.Finish(offset) without a size prefix — ByteBuffer.Wrap(bytes) consumes the bytes.

Method Direction Description
EncodeCloudEvent(CloudEvent) → byte[] managed → wire Encode a managed CloudEvent to the FlatBuffer wire format.
DecodeCloudEvent(byte[]) → CloudEvent wire → managed Decode a Process input buffer.
EncodeWorkerResponse(CloudEvent?) → byte[] managed → wire Encode the WorkerResponse returned from Process.
DecodeWorkerResponse(byte[]) → (CloudEvent?, string?) wire → managed Decode the WorkerResponse; (null, "...") on error.

If the worker's WorkerResponse.error_message is set, the engine surfaces a worker.error lifecycle event and (if no result_event is also set) throws InvalidOperationException. If neither field is set, the worker declines to publish a reply (the engine returns null).

JIT vs AOT — what changes?

The JIT and AOT paths share WorkerBase<TInput, TOutput>, CommandWorker<TInput, TOutput, TCommand>, CommandWorker<T>, ApiWorker, and ApiCommandWorker<T>. Only the plumbing differs:

Concern DotNetDllEngine (JIT) NativeDllEngine (AOT)
Worker class Plain IWorker impl Plain IWorker impl, wrapped in AotNative<W>
Bridge (none — engine calls ProcessAsync directly) [UnmanagedCallersOnly] Process + FreeResult exports on the worker, delegating to AotNative<W>.ProcessStatic / FreeResultStatic
Native exports None Required on the worker class; need <AllowUnsafeBlocks>true</AllowUnsafeBlocks> (the bridge uses IntPtr*/int* because .NET 10 forbids out on [UnmanagedCallersOnly])
Backend gateway ApiClient (gRPC) injected into ApiWorker/ApiCommandWorker<T> NativeApiGateway(context) constructed from AotNative<W>.CurrentContext
Per-instance state One IWorker per load One AotNative<W> (and one wrapped W) per host pointer

Wire format (FlatBuffer)

The on-the-wire schema is the FlatBuffer defined in src/Virtufin.Worker.DevKit/Schemas/worker_api.fbs. Three top-level tables:

  • CloudEvent — the input to Process. Mirrors the CloudEvents 1.0 spec fields (id, type, source, specversion, datacontenttype, dataschema, subject, time, data: [ubyte]) plus an extensions: [Extension] vector carrying (name, value) string pairs. Non-string extension values are JSON-encoded by the engine before being placed in the wire buffer.
  • WorkerResponse — the output of Process. Exactly one of result_event: CloudEvent or error_message: string is set.
  • Extension — the (name, value) pair for the extensions vector.
namespace Virtufin.Worker.FlatBuffers;

table CloudEvent { id, type, source, specversion, datacontenttype,
                   dataschema, subject, time, data: [ubyte],
                   extensions: [Extension]; }
table Extension  { name, value: string; }
table WorkerResponse { result_event: CloudEvent; error_message: string; }

The matching generated C# types live in src/Virtufin.Worker.DevKit/Generated/ (CloudEvent.cs, Extension.cs, WorkerResponse.cs) and are checked in.

Regenerating the wire format

# Requires flatc (Google FlatBuffers compiler) on PATH.
flatc --csharp -o src/Virtufin.Worker.DevKit/Generated \
    src/Virtufin.Worker.DevKit/Schemas/worker_api.fbs

Regenerate when the schema changes; do not hand-edit the files under Generated/.

See also

  • Engine System — how each engine loads, calls, and disposes the worker
  • DotNetDll Workers — full guide for JIT workers, including dependency resolution and the Virtufin.Worker.DevKit package reference
  • NativeDll Workers — full guide for in-process native workers, including the C ABI, host struct, and per-RID build instructions