Python Client
The Python client library provides an async gRPC interface to the WorkManager service.
Installation
From Gitea PyPI Registry
Set up authentication with a Gitea personal access token (scope: read:packages):
export PIP_USERNAME=<your-gitea-username>
export PIP_PASSWORD=<your-gitea-token>
pip install virtufin-workmanager \
    --index-url https://pypi.haenerconsulting.com/api/packages/virtufin/pypi/simple/ \
    --extra-index-url https://pypi.org/simple
From Local Source
pip install grpcio grpcio-reflection
export PYTHONPATH=/path/to/virtufin-workmanager/src/python
Quick Start
import asyncio

from virtufin import WorkManagerClient


async def main():
    async with WorkManagerClient("localhost", 5102) as client:
        # Create a worker
        worker_id = await client.create_worker(
            code_source={"content": "def Process(event):\n return {\"type\": \"com.example.output\", \"source\": \"test\", \"data\": event[\"data\"]}"},
            mime_type="text/x-python",
            topic="my-topic"
        )
        print(f"Created worker: {worker_id}")

        # List workers
        workers = await client.list_workers()
        for w in workers:
            print(f" {w['id']}: {w['status']}")

        # Delete worker
        await client.delete_worker(worker_id)


asyncio.run(main())
WorkManagerClient
The main client class for interacting with the WorkManager service.
Constructor
def __init__(self, host: str = "localhost", port: int = 5102)
| Parameter | Type | Default | Description |
|---|---|---|---|
| host | str | "localhost" | gRPC server hostname |
| port | int | 5102 | gRPC server port |
Context Manager
async with WorkManagerClient() as client:
    # client is connected
    pass
# Automatically closed
connect()
Manually establish connection.
await client.connect()
close()
Close the connection.
await client.close()
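When the context manager does not fit (for example, a client held for the lifetime of a long-running service), connect() and close() can be paired with try/finally so the connection is released even if a call fails. The following is a minimal sketch, not library code: `FakeClient` and `with_manual_connection` are hypothetical names, and `FakeClient` only mimics the coroutines documented here so the snippet runs without a server.

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in for WorkManagerClient (not part of the library)."""

    def __init__(self):
        self.connected = False

    async def connect(self):
        self.connected = True

    async def close(self):
        self.connected = False

    async def list_workers(self):
        if not self.connected:
            raise RuntimeError("not connected")
        return []


async def with_manual_connection(client, action):
    """Connect, await action(client), and always close, even on error."""
    await client.connect()
    try:
        return await action(client)
    finally:
        await client.close()


async def demo():
    client = FakeClient()
    workers = await with_manual_connection(client, lambda c: c.list_workers())
    return client, workers
```

With the real client, the same helper would presumably be called with a WorkManagerClient instance in place of `FakeClient`.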
Worker Operations
create_worker
Creates a new worker.
worker_id: str = await client.create_worker(
    code_source: Dict[str, Any],
    mime_type: str,
    topic: str,
    group: str = ""
)
| Parameter | Type | Description |
|---|---|---|
| code_source | Dict | Either {"url": "..."} or {"content": "base64_or_raw_string"} |
| mime_type | str | MIME type like "text/x-python" |
| topic | str | Pub/sub topic to subscribe to |
| group | str | Optional group for coordinated execution |
Returns: Worker ID string.
Example:
# Create with inline code
worker_id = await client.create_worker(
    code_source={"content": "def Process(e): return {\"type\": \"output\", \"source\": \"w\", \"data\": e[\"data\"]}"},
    mime_type="text/x-python",
    topic="events"
)

# Create from URL
worker_id = await client.create_worker(
    code_source={"url": "https://example.com/worker.py"},
    mime_type="text/x-python",
    topic="events"
)
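Because code_source takes either a "url" key or a "content" key, it can help to build the dictionary through a small function that rejects ambiguous input before any RPC is made. The sketch below is illustrative only: `make_code_source` is a hypothetical helper, not part of the client, and it assumes the two keys are mutually exclusive.

```python
from typing import Any, Dict, Optional


def make_code_source(url: Optional[str] = None,
                     content: Optional[str] = None) -> Dict[str, Any]:
    """Build a code_source dict holding exactly one of `url` or `content`."""
    # Reject both-missing and both-present in a single check.
    if (url is None) == (content is None):
        raise ValueError("provide exactly one of url or content")
    if url is not None:
        return {"url": url}
    return {"content": content}
```

The result can then be passed as the code_source argument of create_worker.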
load_code_from_content
Update worker code with inline content.
await client.load_code_from_content(id: str, content: bytes)
| Parameter | Type | Description |
|---|---|---|
| id | str | Worker ID |
| content | bytes | New code content |
load_code_from_url
Update worker code from a URL.
await client.load_code_from_url(id: str, url: str)
| Parameter | Type | Description |
|---|---|---|
| id | str | Worker ID |
| url | str | URL to fetch new code from |
delete_worker
Delete a worker.
await client.delete_worker(id: str)
| Parameter | Type | Description |
|---|---|---|
| id | str | Worker ID to delete |
start_worker
Start a stopped worker.
await client.start_worker(id: str)
| Parameter | Type | Description |
|---|---|---|
| id | str | Worker ID to start |
stop_worker
Stop a running worker.
await client.stop_worker(id: str)
| Parameter | Type | Description |
|---|---|---|
| id | str | Worker ID to stop |
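A restart can be composed from stop_worker and start_worker. This page does not say whether the reported status changes before those calls return, so the sketch below polls list_workers (documented in the next section) until the expected status appears. `wait_for_status`, `restart_worker`, and `FakeClient` are hypothetical names; `FakeClient` is an in-memory stand-in so the snippet runs without a server.

```python
import asyncio


class FakeClient:
    """Hypothetical in-memory stand-in for WorkManagerClient."""

    def __init__(self):
        self._status = {"w1": "RUNNING"}

    async def stop_worker(self, id):
        self._status[id] = "STOPPED"

    async def start_worker(self, id):
        self._status[id] = "RUNNING"

    async def list_workers(self):
        return [{"id": i, "status": s} for i, s in self._status.items()]


async def wait_for_status(client, worker_id, expected,
                          timeout=5.0, interval=0.1):
    """Poll list_workers until the worker reports `expected` or time runs out."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        for w in await client.list_workers():
            if w["id"] == worker_id and w["status"] == expected:
                return True
        if loop.time() >= deadline:
            return False
        await asyncio.sleep(interval)


async def restart_worker(client, worker_id):
    """Stop, wait for STOPPED, start, then wait for RUNNING."""
    await client.stop_worker(worker_id)
    if not await wait_for_status(client, worker_id, "STOPPED"):
        raise TimeoutError(f"worker {worker_id} did not stop")
    await client.start_worker(worker_id)
    return await wait_for_status(client, worker_id, "RUNNING")
```

If the service updates status synchronously, the polling returns on the first pass and adds only one extra list_workers call per step.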
list_workers
List all workers.
workers: List[Dict[str, Any]] = await client.list_workers()
Returns: List of worker info dictionaries with keys:
- id - Worker ID
- code_source - {"url": ...} or {"content": ...}
- mime_type - MIME type string
- language - Language name
- topic - Subscribed topic
- group - Group name (or empty string)
- created_at - ISO timestamp
- status - "STOPPED" or "RUNNING"
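Since list_workers returns plain dictionaries, ordinary Python is enough to filter and group the result. The sketch below groups worker IDs by topic, optionally keeping only one status. `workers_by_topic` is a hypothetical helper and the sample data is invented for illustration; only the keys listed above are assumed.

```python
from collections import defaultdict
from typing import Any, Dict, List, Optional


def workers_by_topic(workers: List[Dict[str, Any]],
                     status: Optional[str] = None) -> Dict[str, List[str]]:
    """Map each topic to the IDs of its workers, optionally filtered by status."""
    grouped = defaultdict(list)
    for w in workers:
        if status is None or w["status"] == status:
            grouped[w["topic"]].append(w["id"])
    return dict(grouped)


# Invented sample in the shape described above (only the keys used here).
sample = [
    {"id": "a", "status": "RUNNING", "topic": "events"},
    {"id": "b", "status": "STOPPED", "topic": "events"},
    {"id": "c", "status": "RUNNING", "topic": "time-worker-topic"},
]
running = workers_by_topic(sample, status="RUNNING")
```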
get_worker_history
Get code change history for a worker.
history: List[Dict[str, Any]] = await client.get_worker_history(id: str)
| Parameter | Type | Description |
|---|---|---|
| id | str | Worker ID |
Returns: List of history entries with:
- code_source - Code source at that point
- created_at - ISO timestamp
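The ordering of the returned history is not stated here, so code that needs the most recent entry may prefer to compare timestamps explicitly. A minimal sketch, assuming created_at is in a form `datetime.fromisoformat` accepts (a trailing "Z" is only parsed on Python 3.11 and later) and that all entries are consistently naive or timezone-aware; `latest_entry` and the sample data are hypothetical.

```python
from datetime import datetime
from typing import Any, Dict, List, Optional


def latest_entry(history: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    """Return the entry with the newest created_at, or None if history is empty."""
    if not history:
        return None
    return max(history, key=lambda e: datetime.fromisoformat(e["created_at"]))


# Invented sample in the shape described above.
sample_history = [
    {"code_source": {"content": "v1"}, "created_at": "2024-01-01T10:00:00"},
    {"code_source": {"url": "https://example.com/worker.py"},
     "created_at": "2024-03-05T08:30:00"},
    {"code_source": {"content": "v2"}, "created_at": "2024-02-01T12:00:00"},
]
newest = latest_entry(sample_history)
```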
recover_workers
Recover workers from persistent state.
count: int = await client.recover_workers()
Returns: Number of workers recovered.
create_worker_from_file
Convenience method to create a worker from a source file.
worker_id: Optional[str] = await client.create_worker_from_file(
    file_path: Path,
    mime_type: str = "text/x-python",
    topic: str = "time-worker-topic",
    group: str = None
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| file_path | Path | - | Path to worker source file |
| mime_type | str | "text/x-python" | MIME type |
| topic | str | "time-worker-topic" | Topic name |
| group | str | None | Optional group |
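The library's implementation of this method is not shown on this page. As a rough illustration of the idea, the same effect can be approximated on top of create_worker by reading the file and sending its text as inline content. Everything in the sketch is an assumption: `create_worker_from_path` and `RecordingClient` are hypothetical names, and returning None for a missing file is only one possible reading of the Optional[str] return type.

```python
import asyncio
import tempfile
from pathlib import Path
from typing import Optional


class RecordingClient:
    """Hypothetical stand-in that records create_worker calls."""

    def __init__(self):
        self.calls = []

    async def create_worker(self, code_source, mime_type, topic, group=""):
        self.calls.append({"code_source": code_source, "mime_type": mime_type,
                           "topic": topic, "group": group})
        return f"worker-{len(self.calls)}"


async def create_worker_from_path(client, file_path: Path,
                                  mime_type: str = "text/x-python",
                                  topic: str = "time-worker-topic",
                                  group: str = "") -> Optional[str]:
    """Read a source file and create a worker from its text content."""
    path = Path(file_path)
    if not path.is_file():
        return None  # assumption: a missing file yields None rather than raising
    source = path.read_text(encoding="utf-8")
    return await client.create_worker(
        code_source={"content": source},
        mime_type=mime_type,
        topic=topic,
        group=group,
    )
```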
Via the API Gateway
The WorkManagerClient connects directly to the workmanager's gRPC port
(5102 by default). Alternatively, you can invoke workmanager methods
through the Virtufin API gateway
using the gateway's dynamic-dispatch pattern:
from virtufin.api.client import ApiClient

async with ApiClient() as client:
    workmanager = client.gateway.workmanager

    # ListWorkers: no request fields
    workers = await workmanager.ListWorkers()
    for w in workers["workers"]:
        print(f" {w['id']}: {w['status']} ({w['language']})")

    # CreateWorker: CodeSource oneof (url | content), mime_type, topic
    create_result = await workmanager.CreateWorker({
        "code_source": {"url": "https://example.com/worker.py"},
        "mime_type": "text/x-python",
        "topic": "worker-commands",
    })
    worker_id = create_result["id"]

    # StartWorker: id field
    await workmanager.StartWorker({"id": worker_id})
The gateway pattern is useful when:
- The workmanager's gRPC port is not directly reachable (e.g., behind a service mesh, or only the gateway is exposed)
- You want service discovery without configuring each backend's address explicitly
- You're writing a tool that needs to talk to any of multiple backend services (workmanager, websocketmanager, custom) through one entry point
See the virtufin-api Python client docs for the full gateway client reference, and the proto-to-client-mapping spec §Layer 4 for the dynamic-dispatch call chain (gRPC reflection, JSON marshaling).
Complete Example
#!/usr/bin/env python3
"""Example: Complete worker lifecycle with the Python client."""
import asyncio

from virtufin import WorkManagerClient

# Worker code as a string
WORKER_CODE = '''
from datetime import datetime

def Process(event):
    return {
        "type": "com.example.time-response",
        "source": "python-worker",
        "data": {
            "current_time": datetime.now().isoformat(),
            "received_event_id": event.get("id", "unknown")
        }
    }
'''

# Replacement code used in the update step
UPDATED_WORKER_CODE = '''
from datetime import datetime

def Process(event):
    return {
        "type": "com.example.time-response-v2",
        "source": "python-worker-v2",
        "data": {
            "current_time": datetime.now().isoformat(),
            "message": "Updated worker!"
        }
    }
'''


async def main():
    async with WorkManagerClient("localhost", 5102) as client:
        print("Creating worker...")
        worker_id = await client.create_worker(
            code_source={"content": WORKER_CODE},
            mime_type="text/x-python",
            topic="time-worker-topic"
        )
        print(f"Created: {worker_id}")

        print("\nListing workers...")
        workers = await client.list_workers()
        for w in workers:
            print(f" {w['id']} - {w['status']} - topic={w['topic']}")

        print("\nUpdating worker code...")
        # load_code_from_content expects bytes, so encode the source string
        await client.load_code_from_content(worker_id, UPDATED_WORKER_CODE.encode())
        print("Code updated")

        print("\nGetting history...")
        history = await client.get_worker_history(worker_id)
        for i, entry in enumerate(history):
            print(f" Version {i}: {entry['created_at']}")

        print("\nStopping worker...")
        await client.stop_worker(worker_id)
        workers = await client.list_workers()
        for w in workers:
            if w['id'] == worker_id:
                print(f" Status after stop: {w['status']}")

        print("\nStarting worker...")
        await client.start_worker(worker_id)

        print("\nDeleting worker...")
        await client.delete_worker(worker_id)
        workers = await client.list_workers()
        found = any(w['id'] == worker_id for w in workers)
        print(f" Worker still listed: {found}")


if __name__ == "__main__":
    asyncio.run(main())
Error Handling
All methods raise grpc.aio.AioRpcError on connection or RPC errors:
import grpc

try:
    await client.create_worker(...)
except grpc.aio.AioRpcError as e:
    print(f"gRPC error: {e.code()} - {e.details()}")
Common gRPC status codes:
- StatusCode.NOT_FOUND - Worker doesn't exist
- StatusCode.INVALID_ARGUMENT - Invalid parameters
- StatusCode.UNAVAILABLE - Server not reachable
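Of these codes, UNAVAILABLE is the one that is usually worth retrying, since the server may simply not be up yet. The sketch below retries an awaitable call with exponential backoff. It is an illustration rather than library behavior: `call_with_retry`, `is_unavailable`, and `FakeRpcError` are hypothetical names, and the status check is duck-typed on `exc.code().name` so the snippet runs without grpcio installed (grpc.aio.AioRpcError exposes code(), which returns a grpc.StatusCode enum member).

```python
import asyncio
from types import SimpleNamespace


class FakeRpcError(Exception):
    """Hypothetical stand-in for grpc.aio.AioRpcError, for demonstration only."""

    def __init__(self, code_name):
        super().__init__(code_name)
        self._code_name = code_name

    def code(self):
        return SimpleNamespace(name=self._code_name)


def is_unavailable(exc):
    """True if exc has a callable code() whose result is named UNAVAILABLE."""
    code = getattr(exc, "code", None)
    return callable(code) and getattr(code(), "name", None) == "UNAVAILABLE"


async def call_with_retry(make_call, attempts=3, base_delay=0.2):
    """Await make_call(), retrying UNAVAILABLE errors with exponential backoff."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return await make_call()
        except Exception as exc:
            # Re-raise anything that is not UNAVAILABLE, or the final failure.
            if not is_unavailable(exc) or attempt == attempts - 1:
                raise
            await asyncio.sleep(base_delay * 2 ** attempt)
```

A call such as `await call_with_retry(client.list_workers)` would fit this pattern. Retrying is safest for read-only calls; retrying create_worker could create duplicates if the first attempt actually reached the server.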
Direct gRPC Usage
If you need lower-level access, you can use grpcio-reflection directly. See examples/run_worker_grpc.py for a complete implementation using dynamic method invocation without generated stubs.