Class WorkManager
- Namespace
- Virtufin.WorkManager.Components
- Assembly
- Virtufin.WorkManager.dll
Manages worker lifecycle, subscriptions, and processing.
public sealed class WorkManager : IAsyncDisposable, IWorkerRecoveryExecutor
- Inheritance
- object
WorkManager
- Implements
- IAsyncDisposable
IWorkerRecoveryExecutor
Constructors
WorkManager(DaprPublishSubscribeClient, DaprClient, IHttpClientFactory, IEngineRegistry, ILogger<WorkManager>, ILoggerFactory, DaprResiliencePipeline, ResilientDaprPublisher, string?, string?, string?, GroupLockOptions?)
Creates a new WorkManager instance.
public WorkManager(DaprPublishSubscribeClient pubsubClient, DaprClient daprClient, IHttpClientFactory httpClientFactory, IEngineRegistry engineRegistry, ILogger<WorkManager> logger, ILoggerFactory loggerFactory, DaprResiliencePipeline resilience, ResilientDaprPublisher publisher, string? stateStoreName = null, string? pubsubName = null, string? allowedCodeSourceHosts = null, GroupLockOptions? lockOptions = null)
Parameters
- pubsubClient DaprPublishSubscribeClient
- daprClient DaprClient
- httpClientFactory IHttpClientFactory
- engineRegistry IEngineRegistry
- logger ILogger<WorkManager>
- loggerFactory ILoggerFactory
- resilience DaprResiliencePipeline
- publisher ResilientDaprPublisher
- stateStoreName string
- pubsubName string
- allowedCodeSourceHosts string
- lockOptions GroupLockOptions
Fields
CodeFetcherHttpClientName
public const string CodeFetcherHttpClientName = "CodeFetcher"
Field Value
- string
DeadLetterTopicSuffix
Suffix appended to the topic name for dead-letter messages that have exhausted retries.
public const string DeadLetterTopicSuffix = "-dead"
Field Value
- string
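The description above suggests that a dead-letter topic name is formed by appending the suffix to the source topic name. The following is a minimal sketch under that assumption; the helper `ToDeadLetterTopic` and the topic name `orders` are hypothetical, and the constant is redeclared locally so the snippet runs without the assembly.

```csharp
using System;

// Local copy of WorkManager.DeadLetterTopicSuffix so the sketch is self-contained.
const string DeadLetterTopicSuffix = "-dead";

// Hypothetical helper: assumes the dead-letter topic is the source topic plus the suffix.
string ToDeadLetterTopic(string topic)
{
    if (string.IsNullOrWhiteSpace(topic))
        throw new ArgumentException("Topic name must not be empty.", nameof(topic));
    return topic + DeadLetterTopicSuffix;
}

Console.WriteLine(ToDeadLetterTopic("orders")); // prints "orders-dead"
```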
DefaultCodeFetchTimeoutSeconds
public const int DefaultCodeFetchTimeoutSeconds = 30
Field Value
- int
UnknownCloudEventSourceUri
public static readonly Uri UnknownCloudEventSourceUri
Field Value
- Uri
UnknownCloudEventType
public const string UnknownCloudEventType = "unknown"
Field Value
- string
Methods
CreateWorkerAsync(CreateWorkerRequest, CancellationToken)
Creates a new worker and starts processing.
public Task<Guid> CreateWorkerAsync(CreateWorkerRequest request, CancellationToken cancellationToken = default)
Parameters
- request CreateWorkerRequest
The worker creation request.
- cancellationToken CancellationToken
Cancellation token.
Returns
- Task<Guid>
Exceptions
- EngineNotFoundException
Thrown if no engine is registered for the requested MIME type.
DeleteWorkerAsync(Guid, CancellationToken)
Deletes a worker and its subscriptions.
public Task DeleteWorkerAsync(Guid id, CancellationToken cancellationToken = default)
Parameters
- id Guid
The worker ID.
- cancellationToken CancellationToken
Cancellation token.
Returns
- Task
Exceptions
- WorkerNotFoundException
Thrown if the worker is not found.
DisposeAsync()
Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources asynchronously.
public ValueTask DisposeAsync()
Returns
- ValueTask
A task that represents the asynchronous dispose operation.
GetWorkerHistory(Guid)
Gets the code change history for a worker.
public IReadOnlyList<HistoryEntry> GetWorkerHistory(Guid id)
Parameters
- id Guid
The worker ID.
Returns
- IReadOnlyList<HistoryEntry>
A read-only list of history entries.
Exceptions
- WorkerNotFoundException
Thrown if the worker is not found.
ListWorkers()
Lists all workers with their current status.
public IReadOnlyList<WorkerInfo> ListWorkers()
Returns
- IReadOnlyList<WorkerInfo>
A read-only list of worker information.
LoadCodeFromContent(Guid, byte[], CancellationToken)
Loads new code from content into an existing worker.
public Task LoadCodeFromContent(Guid id, byte[] content, CancellationToken cancellationToken = default)
Parameters
- id Guid
The worker ID.
- content byte[]
The code content.
- cancellationToken CancellationToken
Cancellation token.
Returns
- Task
Exceptions
- WorkerNotFoundException
Thrown if the worker is not found.
- ArgumentException
Thrown if content is empty.
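Because empty content is rejected with an ArgumentException, a caller may want to guard against it before invoking LoadCodeFromContent. The sketch below shows one way to turn source text into a byte array; the helper `ToCodeContent` is hypothetical, and UTF-8 is an assumption, since the expected encoding is not stated here.

```csharp
using System;
using System.Text;

// Hypothetical helper: UTF-8 is an assumption; the expected encoding is not documented here.
byte[] ToCodeContent(string source)
{
    // Fail early on the case that LoadCodeFromContent is documented to reject.
    if (string.IsNullOrEmpty(source))
        throw new ArgumentException("Code content must not be empty.", nameof(source));
    return Encoding.UTF8.GetBytes(source);
}

byte[] content = ToCodeContent("print('hello')");
Console.WriteLine(content.Length); // prints 14
```

The resulting array would then be passed as the `content` argument.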
LoadCodeFromUrl(Guid, Uri, CancellationToken)
Loads new code from a URL into an existing worker.
public Task LoadCodeFromUrl(Guid id, Uri url, CancellationToken cancellationToken = default)
Parameters
- id Guid
The worker ID.
- url Uri
The URL to fetch code from.
- cancellationToken CancellationToken
Cancellation token.
Returns
- Task
Exceptions
- WorkerNotFoundException
Thrown if the worker is not found.
- ArgumentException
Thrown if the URL is not absolute or has an invalid scheme.
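The URL constraints above can be checked on the caller side before the request is made. This is a rough sketch only: the helper `IsLikelyValidCodeUrl` is hypothetical, and treating http and https as the accepted schemes is an assumption, since the set of valid schemes is not listed here.

```csharp
using System;

// Hypothetical pre-check; the http/https restriction is an assumption, not documented behavior.
// IsAbsoluteUri is tested first because Uri.Scheme throws on relative URIs.
bool IsLikelyValidCodeUrl(Uri url) =>
    url.IsAbsoluteUri &&
    (url.Scheme == Uri.UriSchemeHttp || url.Scheme == Uri.UriSchemeHttps);

var absolute = new Uri("https://example.com/worker.py");
var relative = new Uri("scripts/worker.py", UriKind.Relative);
var fileUrl = new Uri("file:///tmp/worker.py");

Console.WriteLine(IsLikelyValidCodeUrl(absolute)); // True
Console.WriteLine(IsLikelyValidCodeUrl(relative)); // False
Console.WriteLine(IsLikelyValidCodeUrl(fileUrl));  // False
```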
RecoverWorkersAsync(CancellationToken)
Recovers all workers from persistent state and resumes processing.
public Task RecoverWorkersAsync(CancellationToken cancellationToken = default)
Parameters
- cancellationToken CancellationToken
Cancellation token.
Returns
- Task
Exceptions
- EngineNotFoundException
Thrown if an engine is missing for a recovered worker.
RegisterEngine(ContentType, string, Func<IEngine>)
Registers an engine to handle code execution for a specific content type.
public void RegisterEngine(ContentType contentType, string languageName, Func<IEngine> engineFactory)
Parameters
- contentType ContentType
The content type handled by the engine (e.g., "text/x-python").
- languageName string
The programming language name (e.g., "Python").
- engineFactory Func<IEngine>
Factory function that creates new engine instances.
Exceptions
- InvalidOperationException
Thrown if an engine is already registered for the content type.
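The one-engine-per-content-type rule can be illustrated with a stand-in registry. This is not the WorkManager implementation: the local `Register` function, the string keys, and the `Func<object>` factories are all hypothetical substitutes for `ContentType` and `Func<IEngine>`, chosen so the snippet runs without the assembly.

```csharp
using System;
using System.Collections.Generic;

// Stand-in registry keyed by MIME type string; the real method takes ContentType and Func<IEngine>.
var engines = new Dictionary<string, Func<object>>();

void Register(string contentType, Func<object> engineFactory)
{
    // TryAdd returns false when the key already exists, mirroring the documented failure mode.
    if (!engines.TryAdd(contentType, engineFactory))
        throw new InvalidOperationException($"An engine is already registered for '{contentType}'.");
}

Register("text/x-python", () => new object());

bool duplicateRejected = false;
try
{
    // A second registration for the same content type is expected to fail.
    Register("text/x-python", () => new object());
}
catch (InvalidOperationException)
{
    duplicateRejected = true;
}

Console.WriteLine(duplicateRejected); // True
```

Under this contract, a caller that may run registration more than once would need to catch InvalidOperationException or register each content type exactly once at startup.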
StartWorkerAsync(Guid, CancellationToken)
Starts a stopped worker, resuming topic subscriptions and processing.
public Task StartWorkerAsync(Guid id, CancellationToken cancellationToken = default)
Parameters
- id Guid
The worker ID.
- cancellationToken CancellationToken
Cancellation token.
Returns
- Task
Exceptions
- WorkerNotFoundException
Thrown if the worker is not found.
StopWorkerAsync(Guid, CancellationToken)
Stops a running worker, pausing topic subscriptions and processing.
public Task StopWorkerAsync(Guid id, CancellationToken cancellationToken = default)
Parameters
- id Guid
The worker ID.
- cancellationToken CancellationToken
Cancellation token.
Returns
- Task
Exceptions
- WorkerNotFoundException
Thrown if the worker is not found.