Skip to content

Python Client

The Python client library provides an async gRPC interface to the WorkManager service.

Installation

From Gitea PyPI Registry

Set up authentication with a Gitea personal access token (scope: read:packages):

export PIP_USERNAME=<your-gitea-username>
export PIP_PASSWORD=<your-gitea-token>

pip install virtufin-workmanager \
  --index-url https://pypi.haenerconsulting.com/api/packages/virtufin/pypi/simple/ \
  --extra-index-url https://pypi.org/simple

From Local Source

pip install grpcio grpcio-reflection
export PYTHONPATH=/path/to/virtufin-workmanager/src/python

Quick Start

import asyncio
from virtufin import WorkManagerClient

async def main():
    async with WorkManagerClient("localhost", 5102) as client:
        # Create a worker
        worker_id = await client.create_worker(
            code_source={"content": "def Process(event):\n    return {\"type\": \"com.example.output\", \"source\": \"test\", \"data\": event[\"data\"]}"},
            mime_type="text/x-python",
            topic="my-topic"
        )
        print(f"Created worker: {worker_id}")

        # List workers
        workers = await client.list_workers()
        for w in workers:
            print(f"  {w['id']}: {w['status']}")

        # Delete worker
        await client.delete_worker(worker_id)

asyncio.run(main())

WorkManagerClient

The main client class for interacting with the WorkManager service.

Constructor

def __init__(self, host: str = "localhost", port: int = 5002)
Parameter Type Default Description
host str "localhost" gRPC server hostname
port int 5102 gRPC server port

Context Manager

async with WorkManagerClient() as client:
    # client is connected
    pass
# Automatically closed

connect()

Manually establish connection.

await client.connect()

close()

Close the connection.

await client.close()

Worker Operations

create_worker

Creates a new worker.

worker_id: str = await client.create_worker(
    code_source: Dict[str, Any],
    mime_type: str,
    topic: str,
    group: str = ""
)
Parameter Type Description
code_source Dict Either {"url": "..."} or {"content": "base64_or_raw_string"}
mime_type str MIME type like "text/x-python"
topic str Pub/sub topic to subscribe to
group str Optional group for coordinated execution

Returns: Worker ID string.

Example:

# Create with inline code
worker_id = await client.create_worker(
    code_source={"content": "def Process(e): return {\"type\": \"output\", \"source\": \"w\", \"data\": e[\"data\"]}"},
    mime_type="text/x-python",
    topic="events"
)

# Create from URL
worker_id = await client.create_worker(
    code_source={"url": "https://example.com/worker.py"},
    mime_type="text/x-python",
    topic="events"
)

load_code_from_content

Update worker code with inline content.

await client.load_code_from_content(id: str, content: bytes)
Parameter Type Description
id str Worker ID
content bytes New code content

load_code_from_url

Update worker code from a URL.

await client.load_code_from_url(id: str, url: str)
Parameter Type Description
id str Worker ID
url str URL to fetch new code from

delete_worker

Delete a worker.

await client.delete_worker(id: str)
Parameter Type Description
id str Worker ID to delete

start_worker

Start a stopped worker.

await client.start_worker(id: str)
Parameter Type Description
id str Worker ID to start

stop_worker

Stop a running worker.

await client.stop_worker(id: str)
Parameter Type Description
id str Worker ID to stop

list_workers

List all workers.

workers: List[Dict[str, Any]] = await client.list_workers()

Returns: List of worker info dictionaries with keys: - id - Worker ID - code_source - {"url": ...} or {"content": ...} - mime_type - MIME type string - language - Language name - topic - Subscribed topic - group - Group name (or empty string) - created_at - ISO timestamp - status - "STOPPED" or "RUNNING"

get_worker_history

Get code change history for a worker.

history: List[Dict[str, Any]] = await client.get_worker_history(id: str)
Parameter Type Description
id str Worker ID

Returns: List of history entries with: - code_source - Code source at that point - created_at - ISO timestamp

recover_workers

Recover workers from persistent state.

count: int = await client.recover_workers()

Returns: Number of workers recovered.

create_worker_from_file

Convenience method to create a worker from a source file.

worker_id: Optional[str] = await client.create_worker_from_file(
    file_path: Path,
    mime_type: str = "text/x-python",
    topic: str = "time-worker-topic",
    group: str = None
)
Parameter Type Default Description
file_path Path - Path to worker source file
mime_type str "text/x-python" MIME type
topic str "time-worker-topic" Topic name
group str None Optional group

Via the API Gateway

The WorkManagerClient connects directly to the workmanager's gRPC port (5102 by default). Alternatively, you can invoke workmanager methods through the Virtufin API gateway using the gateway's dynamic-dispatch pattern:

from virtufin.api.client import ApiClient

async with ApiClient() as client:
    workmanager = client.gateway.workmanager

    # ListWorkers — no request fields
    workers = await workmanager.ListWorkers()
    for w in workers["workers"]:
        print(f"  {w['id']}: {w['status']} ({w['language']})")

    # CreateWorker — CodeSource oneof (url | content), mime_type, topic
    create_result = await workmanager.CreateWorker({
        "code_source": {"url": "https://example.com/worker.py"},
        "mime_type": "text/x-python",
        "topic": "worker-commands",
    })
    worker_id = create_result["id"]

    # StartWorker — id field
    await workmanager.StartWorker({"id": worker_id})

The gateway pattern is useful when: - The workmanager's gRPC port is not directly reachable (e.g., behind a service mesh, or only the gateway is exposed) - You want service discovery without configuring each backend's address explicitly - You're writing a tool that needs to talk to any of multiple backend services (workmanager, websocketmanager, custom) through one entry point

See the virtufin-api Python client docs for the full gateway client reference, and the proto-to-client-mapping spec §Layer 4 for the dynamic-dispatch call chain (gRPC reflection, JSON marshaling).

Complete Example

#!/usr/bin/env python3
"""Example: Complete worker lifecycle with the Python client."""

import asyncio
import base64
from pathlib import Path
from virtufin import WorkManagerClient

# Worker code as a string
WORKER_CODE = '''
from datetime import datetime

def Process(event):
    return {
        "type": "com.example.time-response",
        "source": "python-worker",
        "data": {
            "current_time": datetime.now().isoformat(),
            "received_event_id": event.get("id", "unknown")
        }
    }
'''

async def main():
    async with WorkManagerClient("localhost", 5102) as client:
        print("Creating worker...")
        worker_id = await client.create_worker(
            code_source={"content": WORKER_CODE},
            mime_type="text/x-python",
            topic="time-worker-topic"
        )
        print(f"Created: {worker_id}")

        print("\nListing workers...")
        workers = await client.list_workers()
        for w in workers:
            print(f"  {w['id']} - {w['status']} - topic={w['topic']}")

        print("\nUpdating worker code...")
        new_code = '''
from datetime import datetime

def Process(event):
    return {
        "type": "com.example.time-response-v2",
        "source": "python-worker-v2",
        "data": {
            "current_time": datetime.now().isoformat(),
            "message": "Updated worker!"
        }
    }
'''
        await client.load_code_from_content(worker_id, new_code.encode())
        print("Code updated")

        print("\nGetting history...")
        history = await client.get_worker_history(worker_id)
        for i, entry in enumerate(history):
            print(f"  Version {i}: {entry['created_at']}")

        print("\nStopping worker...")
        await client.stop_worker(worker_id)

        workers = await client.list_workers()
        for w in workers:
            if w['id'] == worker_id:
                print(f"  Status after stop: {w['status']}")

        print("\nStarting worker...")
        await client.start_worker(worker_id)

        print("\nDeleting worker...")
        await client.delete_worker(worker_id)

        workers = await client.list_workers()
        found = any(w['id'] == worker_id for w in workers)
        print(f"  Worker still listed: {found}")

if __name__ == "__main__":
    asyncio.run(main())

Error Handling

All methods raise grpc.aio.AioRpcError on connection or RPC errors:

try:
    await client.create_worker(...)
except grpc.aio.AioRpcError as e:
    print(f"gRPC error: {e.code()} - {e.details()}")

Common gRPC status codes: - StatusCode.NOT_FOUND - Worker doesn't exist - StatusCode.INVALID_ARGUMENT - Invalid parameters - StatusCode.UNAVAILABLE - Server not reachable

Direct gRPC Usage

If you need lower-level access, you can use grpcio-reflection directly. See examples/run_worker_grpc.py for a complete implementation using dynamic method invocation without generated stubs.

SDK Reference