Worker DevKit
The Virtufin.Worker.DevKit NuGet package is the shared base library for all
Virtufin workers. It provides a single, idiomatic .NET API for writing workers
that run against the two managed engines:
| Engine | DevKit class to extend | Runs in |
|---|---|---|
| DotNetDllEngine | WorkerBase<TInput, TOutput> (or one of its subclasses) | In-process (JIT via hostfxr) |
| NativeDllEngine | WorkerBase<TInput, TOutput> (or one of its subclasses), wrapped in AotNative<W> | In-process (AOT-compiled, FlatBuffer wire format) |
The package is dependency-light: it only references CloudNative.CloudEvents
(plus, for the API helpers, Virtufin.Api.Client). The AOT native side also
references Google.FlatBuffers.
The package is published from this repository at the same LIBRARY_VERSION as
the WorkManager service. See the introduction for the package-publishing model.
Class hierarchy
The hierarchy is generic on the wire codec. The base
WorkerBase<TInput, TOutput> takes a codec that owns payload decode/encode;
the worker only deals with the typed in/out. The default JSON codec is
JsonCloudEventCodec; plug a different codec in for non-JSON formats.
classDiagram
class IWorker {
<<interface>>
+ProcessAsync(CloudEvent) Task~CloudEvent?~
}
class WorkerBase~TInput, TOutput~ {
<<abstract, generic>>
+Source Uri
+ResponseEventType string
+Codec ICloudEventCodec~TInput, TOutput~
+ProcessAsync(CloudEvent) Task~CloudEvent?~
+BuildResponse(CloudEvent, TOutput) CloudEvent
+WithCorrelationId(CloudEvent) CloudEvent
+WithTimestamp(CloudEvent) CloudEvent
}
IWorker <|.. WorkerBase~TInput, TOutput~
class CommandWorker~TInput, TOutput, TCommand~ {
<<abstract, generic>>
+HandleCommandAsync(CloudEvent, TCommand, TInput) Task~TOutput?~
+ExtractCommand(CloudEvent, TInput) TCommand
}
WorkerBase~TInput, TOutput~ <|-- CommandWorker~TInput, TOutput, TCommand~
class CommandWorker~T~ {
<<abstract, generic (JSON convenience)>>
-Codec JsonCloudEventCodec
}
CommandWorker~TInput, TOutput, TCommand~ <|-- CommandWorker~T~
class ApiWorker {
<<sealed (JSON)>>
+Api ApiClient?
+HandleAsync(CloudEvent, JsonNode) Task~JsonObject?~
}
WorkerBase~JsonNode, JsonObject~ <|-- ApiWorker
class ApiCommandWorker~T~ {
<<sealed (JSON)>>
+Api ApiClient?
+HandleCommandAsync(CloudEvent, T, JsonNode) Task~JsonObject?~
+EnsureClient(host, port)
+EnsureClientFromCommand(node)
}
CommandWorker~T~ <|-- ApiCommandWorker~T~
class ICloudEventCodec~TInput, TOutput~ {
<<interface>>
+ContentType string
+DecodePayload(CloudEvent) TInput
+EncodeResponse(CloudEvent, TOutput) (object?, string)
+EncodeError(CloudEvent, string) TOutput
}
class JsonCloudEventCodec {
+ContentType = "application/json"
}
ICloudEventCodec~TInput, TOutput~ <|.. JsonCloudEventCodec
class AotNative~W~ {
<<sealed, generic bridge>>
+CurrentContext WorkerContext?
+ProcessStatic(host, inBuf, inLen) int
+FreeResultStatic(result) void
}
note for AotNative~W~ "Wraps any IWorker; the same worker<br/>class works under DotNetDllEngine<br/>(JIT) and NativeDllEngine (AOT)."
CommandWorker<T> and the *CommandWorker* variants require the enum
identifier names to match the wire-level command strings (case-insensitive).
See the CommandWorker<T> section below for the data flow.
Public API (JIT side: DotNetDllEngine)
IWorker and WorkerBase<TInput, TOutput>
The minimal contract a worker must satisfy:
public interface IWorker
{
Task<CloudEvent?> ProcessAsync(CloudEvent input);
}
WorkerBase<TInput, TOutput> is the abstract, generic base class. The
codec — supplied in the constructor — owns the wire-format decode/encode
(ICloudEventCodec<TInput, TOutput>). The base class provides response-building
helpers, automatic correlation-id propagation, and the canonical CloudEvent
extension attribute names.
public abstract class WorkerBase<TInput, TOutput> : IWorker
{
public const string CorrelationIdAttribute = "correlationid";
public const string ReplyTopicAttribute = "replytopic";
public const string FieldError = "error";
public const string DataContentTypeJson = "application/json";
protected ICloudEventCodec<TInput, TOutput> Codec { get; }
protected WorkerBase(
Uri source,
string responseEventType,
ICloudEventCodec<TInput, TOutput> codec) { ... }
public Task<CloudEvent?> ProcessAsync(CloudEvent input); // not sealed
protected abstract Task<TOutput?> ProcessAsync(CloudEvent input, TInput data);
protected CloudEvent BuildResponse(CloudEvent input, TOutput payload);
protected CloudEvent BuildResponse(CloudEvent input, object? data, string contentType);
protected CloudEvent WithCorrelationId(CloudEvent input);
protected CloudEvent WithTimestamp(CloudEvent input);
}
Response helpers
BuildResponse builds a CloudEvent with the configured Source and
ResponseEventType. If the input has a correlationid extension, the reply
inherits it automatically (via WithCorrelationId); the timestamp is also
applied automatically (via WithTimestamp):
sequenceDiagram
autonumber
participant Sub as Pub/Sub
participant WM as WorkManager
participant W as WorkerBase.ProcessAsync
participant Codec as JsonCloudEventCodec
participant H as HandleAsync(CloudEvent, JsonNode)
Sub->>WM: input CE (with correlationid)
WM->>W: ProcessAsync(ce)
W->>Codec: DecodePayload(ce) → node
Codec-->>W: node
W->>H: HandleAsync(ce, node)
H-->>W: JsonObject payload
W->>W: WithCorrelationId(ce).WithTimestamp()<br/>Type = responseEventType
W-->>WM: CloudEvent envelope with payload
WM->>Sub: publish response
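The envelope step in the diagram above can be sketched with plain BCL types. This is only an illustration under stated assumptions: `Envelope` is a hypothetical stand-in for the real CloudEvent type, `ResponseEnvelopeSketch` is not part of the DevKit, and generating the reply Id from a GUID is an assumption rather than documented behaviour.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a CloudEvent envelope (not the DevKit type).
public sealed record Envelope(
    string Id, string Type, Uri Source, DateTimeOffset? Time,
    IReadOnlyDictionary<string, string> Extensions);

public static class ResponseEnvelopeSketch
{
    public const string CorrelationIdAttribute = "correlationid";

    // Builds a reply envelope: configured source and type, a timestamp,
    // and the input's correlation id when one is present.
    public static Envelope BuildResponse(
        Envelope input, Uri source, string responseEventType, DateTimeOffset now)
    {
        var extensions = new Dictionary<string, string>();

        // Inherit the correlation id only when the input carries one.
        if (input.Extensions.TryGetValue(CorrelationIdAttribute, out var correlationId))
            extensions[CorrelationIdAttribute] = correlationId;

        return new Envelope(
            Id: Guid.NewGuid().ToString(), // assumption: the real Id scheme is not documented here
            Type: responseEventType,
            Source: source,
            Time: now,
            Extensions: extensions);
    }
}
```

Passing `now` in as a parameter keeps the sketch deterministic in tests; the real helpers take only the input event.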
If the handler throws, the base class catches the exception and builds an
error envelope via codec.EncodeError(...), which returns the typed error
payload (for JSON, a JsonObject shaped like
{"command":"error","success":false,"message":"..."}).
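For reference, the JSON error shape quoted above can be reproduced with System.Text.Json. `ErrorPayloadSketch` is a hypothetical helper written for this page, not the codec's actual implementation.

```csharp
using System.Text.Json.Nodes;

// Hypothetical helper: mirrors the documented JSON error shape
// {"command":"error","success":false,"message":"..."}.
public static class ErrorPayloadSketch
{
    public static JsonObject Build(string message) => new JsonObject
    {
        ["command"] = "error",
        ["success"] = false,
        ["message"] = message,
    };
}
```

Properties are serialized in insertion order, so `Build("boom").ToJsonString()` yields the same field order as the shape shown in the text.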
replytopic extension
If the input CloudEvent carries a replytopic extension attribute, the
WorkManager routes the response to that topic instead of the worker's default
reply topic. WorkerBase does not need to do anything — the engine reads
this extension and overrides the publish topic.
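The routing rule can be pictured as a small lookup. The sketch below is an assumption about how such a rule might look, not the engine's code: the extension dictionary stands in for the CloudEvent's extension attributes, and treating an empty value as "not set" is a choice made here for illustration.

```csharp
using System.Collections.Generic;

public static class ReplyTopicSketch
{
    public const string ReplyTopicAttribute = "replytopic";

    // extensions: stand-in for the input CloudEvent's extension attributes.
    // Returns the per-message reply topic when present, else the default.
    public static string ResolveTopic(
        IReadOnlyDictionary<string, string> extensions, string defaultTopic)
        => extensions.TryGetValue(ReplyTopicAttribute, out var topic)
           && !string.IsNullOrWhiteSpace(topic)
            ? topic
            : defaultTopic;
}
```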
CommandWorker<T> — typed command dispatch
The CommandWorker<T> base class is for workers that handle a finite set of
named commands carried in the CloudEvent data payload. The wire format is:
{ "command": "create", "api_host": "...", "data": { ... } }
CommandWorker<T> reads the command field of the data payload, parses it with
Enum.TryParse<T>(command, ignoreCase: true, out var parsed), and forwards the
parsed enum value plus the full JsonNode payload to your override:
sequenceDiagram
autonumber
participant Sub as Pub/Sub
participant WM as WorkManager
participant CW as CommandWorker.ProcessAsync
participant Codec as JsonCloudEventCodec
participant H as HandleCommandAsync
Sub->>WM: data = {"command": "create", ...}
WM->>CW: ProcessAsync(ce)
CW->>Codec: DecodePayload(ce) → node
Codec-->>CW: node
CW->>CW: ExtractCommand(ce, node)<br/>node["command"] → "create"<br/>Enum.TryParse<T>("create") → MyCmd.create
CW->>H: HandleCommandAsync(ce, MyCmd.create, node)
alt Handler returns JsonObject
H-->>CW: payload JsonObject
CW->>Codec: EncodeResponse(ce, payload) → (data, contentType)
CW->>CW: envelope = WithCorrelationId(ce).WithTimestamp()<br/>Type = responseEventType
CW-->>WM: response CE
else Handler returns null
H-->>CW: null (no reply)
CW-->>WM: null
end
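The ExtractCommand step in the diagram can be approximated with BCL calls only. `DemoCommand` and `CommandParsingSketch` are hypothetical names, and the extra `Enum.IsDefined` guard is an addition made here (numeric strings such as "7" otherwise parse successfully); whether the DevKit applies the same guard is not stated in this page.

```csharp
using System;
using System.Text.Json.Nodes;

public enum DemoCommand { create, destroy, list }

public static class CommandParsingSketch
{
    // Returns true and the parsed command when the payload's "command"
    // field is a JSON string naming a member of TCommand (case-insensitive).
    public static bool TryExtract<TCommand>(JsonNode? payload, out TCommand command)
        where TCommand : struct, Enum
    {
        command = default;

        if (payload is not JsonObject obj) return false;
        if (!obj.TryGetPropertyValue("command", out var node)) return false;
        if (node is not JsonValue value || !value.TryGetValue<string>(out var name)) return false;

        // IsDefined rejects numeric strings that TryParse would accept.
        return Enum.TryParse(name, ignoreCase: true, out command)
               && Enum.IsDefined(command);
    }
}
```

For example, `TryExtract<DemoCommand>(JsonNode.Parse("{\"command\":\"CREATE\"}"), out var c)` succeeds with `DemoCommand.create`, while an unknown or missing command returns false.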
Usage:
public enum MyCommand { create, destroy, list }
public class MyWorker : CommandWorker<MyCommand>
{
// Default parameter values must be compile-time constants, so the Uri is
// passed from a parameterless constructor instead.
public MyWorker()
: base(new Uri("urn:com.example.myworker"), "com.example.myworker.response") { }
protected override Task<JsonObject?> HandleCommandAsync(
CloudEvent input, MyCommand command, JsonNode node)
=> command switch
{
MyCommand.create => HandleCreateAsync(input, node),
MyCommand.destroy => HandleDestroyAsync(input),
MyCommand.list => HandleListAsync(input),
_ => Task.FromResult<JsonObject?>(null), // completed task; a null Task would fail when awaited
};
}
The enum identifier names are the wire strings. The same enum drives both the gRPC service definition (proto) and the worker handler, so a rename in one place surfaces as a wire-protocol break.
The handler returns a typed payload (JsonObject), not a CloudEvent.
The base class wraps it in an envelope with Id, Type, Source, the
correlationid extension (copied from the input), and the time attribute
(set automatically). Use the Response(...) helper on CommandWorker<T> to build
the standard { "command", "success", "id", "message" } payload:
return Response(input, MyCommand.create.ToString(), true,
id: createdId, message: "Created");
For richer payloads, build a JsonObject directly.
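A richer payload might look like the sketch below. The `count` and `items` fields and the `RichPayloadSketch` helper are hypothetical; only `command` and `success` come from the standard payload described above.

```csharp
using System.Text.Json.Nodes;

public static class RichPayloadSketch
{
    // Builds a list-style reply with a nested array of ids.
    public static JsonObject ListResult(string command, params string[] ids)
    {
        var items = new JsonArray();
        foreach (var id in ids)
            items.Add(JsonValue.Create(id));

        return new JsonObject
        {
            ["command"] = command,
            ["success"] = true,
            ["count"] = ids.Length, // hypothetical field
            ["items"] = items,      // hypothetical field
        };
    }
}
```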
ApiWorker and ApiCommandWorker<T>
ApiWorker : WorkerBase<JsonNode, JsonObject> is the JSON-convenience base
for workers that call backend services through the API Gateway without a
typed command. Override HandleAsync(CloudEvent, JsonNode) → Task<JsonObject?>.
ApiCommandWorker<T> : CommandWorker<T> combines typed command dispatch with
the API client helper. Both inject a pre-configured
Virtufin.Api.Client.ApiClient for backend calls through the API Gateway.
public class MyApiWorker : ApiCommandWorker<MyCommand>
{
public MyApiWorker() : base(
source: new("urn:com.example.myworker"),
responseEventType: "com.example.myworker.response",
apiHost: "localhost",
apiPort: 5002) { }
protected override async Task<JsonObject?> HandleCommandAsync(
CloudEvent input, MyCommand command, JsonNode node)
{
if (command == MyCommand.create)
{
EnsureClientFromCommand(node); // or pass apiHost/apiPort in ctor
var created = await Api!.SomeService.CreateAsync(...);
return Response(input, command.ToString(), true,
id: created.Id, message: "Created");
}
return ErrorPayload("Unsupported command");
}
}
Two API-client construction modes are available:
- Eager: pass apiHost and apiPort to the base ctor; Api is ready in HandleCommandAsync.
- Lazy: pass only source and responseEventType. Api is null until EnsureClient(host, port) (or EnsureClientFromCommand(node), which reads the api_host field from the decoded JsonNode) is called. Use this when the API host comes from the command payload.
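The lazy mode can be pictured as a holder that creates the client on first use. Everything in this sketch is hypothetical: `FakeApiClient` stands in for Virtufin.Api.Client.ApiClient, the fallback port and the "first call wins" behaviour are assumptions, and only the api_host field name comes from the text above.

```csharp
using System;
using System.Text.Json.Nodes;

// Hypothetical stand-in for Virtufin.Api.Client.ApiClient.
public sealed record FakeApiClient(string Host, int Port);

public sealed class LazyClientSketch
{
    private readonly int _defaultPort; // assumption: the port is not read from the payload

    public FakeApiClient? Api { get; private set; }

    public LazyClientSketch(int defaultPort) => _defaultPort = defaultPort;

    // Creates the client once; later calls keep the existing instance.
    public void EnsureClient(string host, int port)
        => Api ??= new FakeApiClient(host, port);

    // Reads api_host from the decoded command payload.
    public void EnsureClientFromCommand(JsonNode node)
    {
        var host = (node as JsonObject)?["api_host"]?.GetValue<string>();
        if (string.IsNullOrWhiteSpace(host))
            throw new InvalidOperationException("Command payload has no api_host field.");
        EnsureClient(host, _defaultPort);
    }
}
```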
AOT native side (NativeDllEngine)
The AOT side is a thin bridge over the same IWorker contract — there
is no parallel class hierarchy. Any IWorker implementation (typically
extending WorkerBase<TInput, TOutput> or CommandWorker<T>) runs unchanged
under DotNetDllEngine (JIT, in-process ALC) and NativeDllEngine
(AOT, via the AotNative<W> wrapper).
AotNative<W>
AotNative<W> where W : IWorker, new() is the only AOT-side base — a
sealed generic bridge that provides the FlatBuffer / native C ABI plumbing
needed by NativeDllEngine. The worker's class declares two
[UnmanagedCallersOnly] exports (Process, FreeResult) that delegate
to AotNative<W>.ProcessStatic and AotNative<W>.FreeResultStatic.
public class MyWorker : CommandWorker<MyCommand>
{
public MyWorker() : base(new Uri("urn:com.example"), "com.example.response") { }
protected override async Task<JsonObject?> HandleCommandAsync(
CloudEvent input, MyCommand command, JsonNode node)
{
var context = AotNative<MyWorker>.CurrentContext;
if (context is null) return ErrorPayload("no AOT context");
var gateway = new NativeApiGateway(context);
// ... dispatch on command ...
return Response(input, command.ToString(), true, message: "ok");
}
[UnmanagedCallersOnly(EntryPoint = "Process")]
public static int Process(IntPtr host, IntPtr inBuf, int inLen,
IntPtr outBuf, IntPtr outLen)
{
unsafe
{
return AotNative<MyWorker>.ProcessStatic(
host, inBuf, inLen, (IntPtr*)outBuf, (int*)outLen);
}
}
[UnmanagedCallersOnly(EntryPoint = "FreeResult")]
public static void FreeResult(IntPtr result)
=> AotNative<MyWorker>.FreeResultStatic(result);
}
The same MyWorker class is also usable directly by DotNetDllEngine
(the JIT engine calls worker.ProcessAsync(CloudEvent) without going
through the AotNative bridge).
AotNative<W>.CurrentContext
A static AsyncLocal<WorkerContext?> set by ProcessStatic immediately
before the wrapped worker's ProcessAsync is called and cleared in a
finally block after it returns. Workers read this to obtain a
NativeApiGateway for backend calls under NativeDllEngine. It is
null outside a native invocation (i.e. under DotNetDllEngine).
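The set-then-clear pattern described above can be sketched with a plain `AsyncLocal<T>`. This is an illustration, not the bridge's code: a string stands in for WorkerContext, and `AmbientContextSketch` is a hypothetical name.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class AmbientContextSketch
{
    // A string stands in for WorkerContext in this sketch.
    private static readonly AsyncLocal<string?> Current = new();

    public static string? CurrentContext => Current.Value;

    // Sets the ambient value before invoking the body and clears it in a
    // finally block, so the value never leaks past the invocation.
    public static async Task<T> RunWithContextAsync<T>(string context, Func<Task<T>> body)
    {
        Current.Value = context;
        try
        {
            return await body();
        }
        finally
        {
            Current.Value = null;
        }
    }
}
```

Because the value is stored in an `AsyncLocal<T>`, it flows across `await` points inside the body, which is what lets a handler read the context after its first asynchronous call.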
Per-instance state
Each unique host pointer (one per LoadCodeAsync call) gets its own
wrapped W instance, looked up via a ConcurrentDictionary<IntPtr,
AotNative<W>>. Multiple workers sharing the same loaded library still
get independent state.
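A per-host lookup of this kind is commonly written with `ConcurrentDictionary.GetOrAdd`. The sketch below uses hypothetical names (`HostRegistrySketch`, `CounterWorker`) and is not the bridge's implementation.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical worker type with per-instance state.
public sealed class CounterWorker
{
    public int Calls;
}

public sealed class HostRegistrySketch<TWorker> where TWorker : new()
{
    private readonly ConcurrentDictionary<IntPtr, TWorker> _instances = new();

    // Returns the worker bound to this host pointer, creating it on first use.
    public TWorker GetFor(IntPtr host)
        => _instances.GetOrAdd(host, static _ => new TWorker());
}
```

Note that `GetOrAdd` may invoke the factory more than once under contention, although only one instance is ever stored, so the worker constructor should be free of side effects.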
sequenceDiagram
autonumber
participant WM as WorkManager<br/>(NativeDllEngine)
participant Lib as Native worker<br/>(lib*.so)
participant Bridge as AotNative<MyWorker>
participant Sub as MyWorker.HandleCommandAsync
participant Ctx as WorkerContext
WM->>Lib: Process(host, inBuf, inLen, &outBuf, &outLen)
Note over WM,Lib: inBuf is a CloudEvent FlatBuffer
Lib->>Bridge: ProcessStatic(host, inBuf, inLen, ...)
Bridge->>Bridge: CloudEventCodec.DecodeCloudEvent(inBuf) → ce
Bridge->>Bridge: _instances.GetOrAdd(host) → wrapper
Bridge->>Bridge: CurrentContext = BuildWorkerContext(host)
Bridge->>Sub: wrapper.ProcessAsync(ce)
Sub->>Ctx: ctx.Log("...", level)
Ctx->>Lib: VirtufinHost.log(host, level, msg)
Lib-->>Ctx: log goes back to WM via callback
Sub->>Ctx: gateway.InvokeAsync(...)
Ctx->>Lib: VirtufinHost.gateway_call(host, ...)
Lib-->>Ctx: gateway goes back to WM
Sub-->>Bridge: payload JsonObject (or null)
Bridge->>Bridge: codec.EncodeResponse(ce, payload) → envelope
Bridge->>Bridge: WithCorrelationId(ce).WithTimestamp()
Bridge->>Bridge: CloudEventCodec.EncodeWorkerResponse(envelope) → bytes
Bridge->>Bridge: Marshal.AllocHGlobal(bytes)<br/>*outBuf = ptr, *outLen = bytes.Length
Bridge-->>Lib: 0
Lib-->>WM: outBuf, outLen
WM->>WM: Marshal.Copy → span → DecodeWorkerResponse
WM->>Lib: FreeResult(outBuf)
WM->>WM: publish reply CE
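The buffer hand-off at the end of the diagram (allocate, publish pointer and length, copy back, free) can be sketched with `Marshal` helpers and no unsafe code. `NativeBufferSketch` is a hypothetical class; the return value 0 follows the diagram, and any other status codes are not documented here.

```csharp
using System;
using System.Runtime.InteropServices;

public static class NativeBufferSketch
{
    // Worker side: copy the encoded response into unmanaged memory and
    // publish the pointer and length through the caller-provided slots.
    public static int WriteResult(byte[] bytes, IntPtr outBuf, IntPtr outLen)
    {
        IntPtr ptr = Marshal.AllocHGlobal(bytes.Length);
        Marshal.Copy(bytes, 0, ptr, bytes.Length);
        Marshal.WriteIntPtr(outBuf, ptr);
        Marshal.WriteInt32(outLen, bytes.Length);
        return 0; // 0 as returned in the diagram above
    }

    // Host side: copy the bytes back into managed memory.
    public static byte[] ReadResult(IntPtr buffer, int length)
    {
        var managed = new byte[length];
        Marshal.Copy(buffer, managed, 0, length);
        return managed;
    }

    // Must release with the same allocator that WriteResult used.
    public static void FreeResult(IntPtr buffer) => Marshal.FreeHGlobal(buffer);
}
```

The free call has to go back through the worker library because the host cannot know which allocator produced the buffer.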
WorkerContext
WorkerContext wraps the native VirtufinHost* that the engine hands to the
worker. It exposes typed helpers for the three host callbacks:
- Log(LogLevel level, string message): write to the WorkManager log
- InvokeGateway(string service, string method, byte[] requestJson): invoke a backend service through the API Gateway via the host's gateway_call callback
- The host's free_response callback is invoked automatically by InvokeGateway after the worker reads the response bytes
Workers read WorkerContext via AotNative<W>.CurrentContext and wrap it
in a NativeApiGateway for typed gateway calls. Do not dereference
WorkerContext.Host; it is opaque.
NativeApiGateway
Thin wrapper over WorkerContext.InvokeGateway. Construct one from the
ambient CurrentContext:
var gateway = new NativeApiGateway(AotNative<MyWorker>.CurrentContext!);
var responseBytes = await gateway.InvokeAsync("websocketmanager", "Connect", requestBytes);
CloudEventCodec
CloudEventCodec is the single source of truth for the CloudEvent ↔ FlatBuffer
mapping shared by NativeDllEngine (engine side) and
AotNative<W> (AOT-compiled worker side). The wire format is the
FlatBuffer binary produced by FlatBufferBuilder.Finish(offset) without a
size prefix — ByteBuffer.Wrap(bytes) consumes the bytes.
| Method | Direction | Description |
|---|---|---|
| EncodeCloudEvent(CloudEvent) → byte[] | managed → wire | Encode a managed CloudEvent to the FlatBuffer wire format. |
| DecodeCloudEvent(byte[]) → CloudEvent | wire → managed | Decode a Process input buffer. |
| EncodeWorkerResponse(CloudEvent?) → byte[] | managed → wire | Encode the WorkerResponse returned from Process. |
| DecodeWorkerResponse(byte[]) → (CloudEvent?, string?) | wire → managed | Decode the WorkerResponse; (null, "...") on error. |
If the worker's WorkerResponse.error_message is set, the engine surfaces a
worker.error lifecycle event and (if no result_event is also set) throws
InvalidOperationException. If neither field is set, the worker declines to
publish a reply (the engine returns null).
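The three outcomes described above can be summarised in a small decision function. This is a sketch of the rule as stated in the text, not the engine's code: `T` stands in for the decoded CloudEvent, `onWorkerError` stands in for raising the worker.error lifecycle event, and returning the result event when both fields are set is an assumption.

```csharp
using System;

public static class WorkerResponseSketch
{
    // resultEvent: decoded result_event (null when unset).
    // errorMessage: decoded error_message (null when unset).
    public static T? Interpret<T>(T? resultEvent, string? errorMessage, Action<string> onWorkerError)
        where T : class
    {
        if (errorMessage is not null)
        {
            onWorkerError(errorMessage); // stands in for the worker.error lifecycle event
            if (resultEvent is null)
                throw new InvalidOperationException(errorMessage);
        }

        // null here means the worker declined to publish a reply.
        return resultEvent;
    }
}
```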
JIT vs AOT — what changes?
The JIT and AOT paths share WorkerBase<TInput, TOutput>,
CommandWorker<TInput, TOutput, TCommand>, CommandWorker<T>,
ApiWorker, and ApiCommandWorker<T>. Only the plumbing differs:
| Concern | DotNetDllEngine (JIT) | NativeDllEngine (AOT) |
|---|---|---|
| Worker class | Plain IWorker impl | Plain IWorker impl, wrapped in AotNative<W> |
| Bridge | (none; the engine calls ProcessAsync directly) | [UnmanagedCallersOnly] Process + FreeResult exports on the worker, delegating to AotNative<W>.ProcessStatic / FreeResultStatic |
| Native exports | None | Required on the worker class; need <AllowUnsafeBlocks>true</AllowUnsafeBlocks> (the bridge uses IntPtr*/int* because .NET 10 forbids out on [UnmanagedCallersOnly]) |
| Backend gateway | ApiClient (gRPC) injected into ApiWorker/ApiCommandWorker<T> | NativeApiGateway(context) constructed from AotNative<W>.CurrentContext |
| Per-instance state | One IWorker per load | One AotNative<W> (and one wrapped W) per host pointer |
Wire format (FlatBuffer)
The on-the-wire schema is the FlatBuffer defined in
src/Virtufin.Worker.DevKit/Schemas/worker_api.fbs. Three top-level tables:
- CloudEvent: the input to Process. Mirrors the CloudEvents 1.0 spec fields (id, type, source, specversion, datacontenttype, dataschema, subject, time, data: [ubyte]) plus an extensions: [Extension] vector carrying (name, value) string pairs. Non-string extension values are JSON-encoded by the engine before being placed in the wire buffer.
- WorkerResponse: the output of Process. Normally at most one of result_event: CloudEvent or error_message: string is set; if neither is set, the worker declines to publish a reply.
- Extension: the (name, value) pair for the extensions vector.
namespace Virtufin.Worker.FlatBuffers;
table CloudEvent { id, type, source, specversion, datacontenttype,
dataschema, subject, time, data: [ubyte],
extensions: [Extension]; }
table Extension { name, value: string; }
table WorkerResponse { result_event: CloudEvent; error_message: string; }
The matching generated C# types live in src/Virtufin.Worker.DevKit/Generated/
(CloudEvent.cs, Extension.cs, WorkerResponse.cs) and are checked in.
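The rule that non-string extension values are JSON-encoded before they enter the wire buffer could look like the following. `ExtensionValueSketch` is a hypothetical helper and uses System.Text.Json defaults; the engine's actual serializer settings are not documented here.

```csharp
using System.Text.Json;

public static class ExtensionValueSketch
{
    // Strings pass through unchanged; everything else becomes its JSON text.
    public static string ToWireValue(object value)
        => value is string s ? s : JsonSerializer.Serialize(value);
}
```

Under these defaults `ToWireValue(42)` gives `42` and `ToWireValue(true)` gives `true`, while a string such as `abc` is stored as-is without surrounding quotes.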
Regenerating the wire format
# Requires flatc (Google FlatBuffers compiler) on PATH.
flatc --csharp -o src/Virtufin.Worker.DevKit/Generated \
src/Virtufin.Worker.DevKit/Schemas/worker_api.fbs
Regenerate when the schema changes; do not hand-edit the files under
Generated/.
See also
- Engine System: how each engine loads, calls, and disposes the worker
- DotNetDll Workers: full guide for JIT workers, including dependency resolution and the Virtufin.Worker.DevKit package reference
- NativeDll Workers: full guide for in-process native workers, including the C ABI, host struct, and per-RID build instructions