Operation Registry

The global registry tracks all active cancelable operations for monitoring and debugging.

Overview

The OperationRegistry is a singleton that automatically tracks all Cancelable operations. It provides visibility into:

  • Currently active operations
  • Operation status and progress
  • Hierarchical relationships
  • Historical operation data

Accessing the Registry

from hother.cancelable import OperationRegistry

# Get singleton instance
registry = OperationRegistry.get_instance()
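
Because the registry is a singleton, every call to get_instance() returns the same shared object, which is why state can leak between tests unless the registry is cleared. The sketch below illustrates the general pattern only; the `Registry` class is a hypothetical stand-in and is not the library's implementation.

```python
import threading


class Registry:
    """Hypothetical stand-in showing a lazily created, shared instance."""

    _instance = None
    _lock = threading.Lock()

    def __init__(self):
        self.operations = {}

    @classmethod
    def get_instance(cls):
        # Double-checked locking: only the first caller creates the instance.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = cls()
        return cls._instance


first = Registry.get_instance()
second = Registry.get_instance()
first.operations["op-1"] = "running"
# Both names refer to the same object, so the entry is visible through either.
```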

Querying Operations

Get All Active Operations

from hother.cancelable import OperationRegistry

registry = OperationRegistry.get_instance()

# Get all currently running operations
active_ops = registry.get_active_operations()

for op in active_ops:
    print(f"Operation: {op.name}")
    print(f"  ID: {op.id}")
    print(f"  Status: {op.status}")
    print(f"  Elapsed: {op.elapsed_time:.2f}s")
    print(f"  Parent: {op.parent_id}")

Get Operation by ID

op_id = "operation-123"
operation = registry.get_operation(op_id)

if operation:
    print(f"Found: {operation.name}")
    print(f"Status: {operation.status}")

Get Operation by Name

operation = registry.get_operation_by_name("data_processor")

if operation:
    print(f"ID: {operation.id}")
    print(f"Running for: {operation.elapsed_time:.1f}s")

Get Child Operations

# Get all children of a parent operation
parent_op = registry.get_operation_by_name("parent_task")

if parent_op:  # Guard against a name that matches no operation
    children = registry.get_children(parent_op.id)

    for child in children:
        print(f"Child: {child.name} ({child.status})")

Operation Information

Each operation in the registry contains:

class OperationContext:
    id: str                    # Unique operation ID
    name: str                  # Human-readable name
    status: OperationStatus    # PENDING, RUNNING, CANCELLED, COMPLETED, FAILED
    start_time: float          # Timestamp when started
    end_time: Optional[float]  # Timestamp when finished
    parent_id: Optional[str]   # Parent operation ID
    cancel_reason: Optional[CancelationReason]  # Why cancelled
    cancel_message: str        # Cancelation message
    progress_callbacks: List   # Registered progress handlers

Status Values

from hother.cancelable import OperationStatus

# Possible statuses
OperationStatus.PENDING    # Created but not started
OperationStatus.RUNNING    # Currently executing
OperationStatus.CANCELLED  # Cancelled by source
OperationStatus.COMPLETED  # Finished successfully
OperationStatus.FAILED     # Finished with error

Cancelation Reasons

from hother.cancelable import CancelationReason

# Why operations cancel
CancelationReason.TIMEOUT     # TimeoutSource triggered
CancelationReason.MANUAL      # TokenSource triggered
CancelationReason.SIGNAL      # SignalSource triggered
CancelationReason.CONDITION   # ConditionSource triggered

Monitoring Dashboard Example

Build a real-time monitoring dashboard:

import anyio
from hother.cancelable import OperationRegistry, Cancelable

async def monitor_operations():
    """Print operation status every second."""
    registry = OperationRegistry.get_instance()

    while True:
        active = registry.get_active_operations()

        print("\n" + "="*60)
        print(f"Active Operations: {len(active)}")
        print("="*60)

        for op in active:
            elapsed = op.elapsed_time
            print(f"📊 {op.name}")
            print(f"   ID: {op.id[:8]}...")
            print(f"   Status: {op.status}")
            print(f"   Elapsed: {elapsed:.1f}s")

            if op.parent_id:
                parent = registry.get_operation(op.parent_id)
                if parent:
                    print(f"   Parent: {parent.name}")

        await anyio.sleep(1.0)

async def main():
    async with anyio.create_task_group() as tg:
        # Start monitoring in background
        tg.start_soon(monitor_operations)

        # Run some operations
        async with Cancelable(name="task_1") as cancel:
            await anyio.sleep(5)

        # monitor_operations() loops forever, so stop it once the work is done
        tg.cancel_scope.cancel()

anyio.run(main)

Operation Lifecycle

Understanding operation registration and cleanup:

stateDiagram-v2
    [*] --> Registered: Cancelable created
    Registered --> Running: __aenter__
    Running --> Completed: operation succeeds
    Running --> Cancelled: source triggers
    Running --> Failed: exception raised
    Completed --> Unregistered
    Cancelled --> Unregistered
    Failed --> Unregistered
    Unregistered --> [*]
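
The diagram can also be read as a transition table. The following sketch is illustrative and does not use the library: the `State` enum and the `ALLOWED` map are hypothetical names that simply mirror the edges drawn above.

```python
from enum import Enum


class State(Enum):
    """Hypothetical states mirroring the diagram (not the library's enum)."""

    REGISTERED = "registered"
    RUNNING = "running"
    COMPLETED = "completed"
    CANCELLED = "cancelled"
    FAILED = "failed"
    UNREGISTERED = "unregistered"


# Each state maps to the set of states the diagram allows it to move to.
ALLOWED = {
    State.REGISTERED: {State.RUNNING},
    State.RUNNING: {State.COMPLETED, State.CANCELLED, State.FAILED},
    State.COMPLETED: {State.UNREGISTERED},
    State.CANCELLED: {State.UNREGISTERED},
    State.FAILED: {State.UNREGISTERED},
    State.UNREGISTERED: set(),
}


def can_transition(current: State, target: State) -> bool:
    """Return True if the diagram has an edge from current to target."""
    return target in ALLOWED[current]


def is_valid_path(path: list[State]) -> bool:
    """Return True if every consecutive pair in path is an allowed edge."""
    return all(can_transition(a, b) for a, b in zip(path, path[1:]))
```

Every terminal outcome (completed, cancelled, failed) leads to unregistration, which is the property the registry relies on to avoid accumulating finished operations.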

Automatic Registration

Operations are automatically registered when created:

# Auto-registered (default)
async with Cancelable(name="auto_registered") as cancel:
    await operation()
    # Automatically unregistered after context exits

# Disable auto-registration (not recommended)
async with Cancelable(name="manual", auto_register=False) as cancel:
    await operation()
    # NOT in registry

Manual Cleanup

The registry automatically cleans up completed operations, but you can clear it manually if needed:

registry = OperationRegistry.get_instance()

# Clear all operations (use with caution!)
registry.clear()

# In tests, use clean registry
registry.clear()
# ... run test ...
registry.clear()  # Cleanup after

Hierarchical Tracking

The registry tracks parent-child relationships:

registry = OperationRegistry.get_instance()

async with Cancelable(name="parent") as parent:
    parent_id = parent.context.id

    async with Cancelable(name="child_1", parent=parent) as child1:
        # Query hierarchy
        children = registry.get_children(parent_id)
        assert len(children) == 1
        assert children[0].name == "child_1"

    async with Cancelable(name="child_2", parent=parent) as child2:
        children = registry.get_children(parent_id)
        assert len(children) == 1  # child_1 already completed

Examples

Export Metrics

from hother.cancelable import OperationRegistry, OperationStatus

def export_operation_metrics():
    """Export metrics for Prometheus/Datadog."""
    registry = OperationRegistry.get_instance()
    active = registry.get_active_operations()

    metrics = {
        "active_operations": len(active),
        "operations_by_status": {
            "running": sum(1 for op in active if op.status == OperationStatus.RUNNING),
            "pending": sum(1 for op in active if op.status == OperationStatus.PENDING),
        },
        "longest_running": max(
            (op.elapsed_time for op in active),
            default=0
        )
    }

    return metrics
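
The dictionary returned by export_operation_metrics() still has to be serialized for a metrics backend. As a minimal sketch, the function below renders a dictionary of that shape in the Prometheus text exposition format (`name{label="value"} number`). The function name and the `cancelable_*` metric names are hypothetical choices for illustration, not part of the library.

```python
def to_prometheus_text(metrics: dict) -> str:
    """Render the metrics dict as Prometheus text exposition lines."""
    lines = [f"cancelable_active_operations {metrics['active_operations']}"]

    # One labelled sample per status, sorted for stable output.
    for status, count in sorted(metrics["operations_by_status"].items()):
        lines.append(f'cancelable_operations{{status="{status}"}} {count}')

    lines.append(
        f"cancelable_longest_running_seconds {metrics['longest_running']}"
    )
    # The exposition format expects a trailing newline after the last sample.
    return "\n".join(lines) + "\n"


sample = {
    "active_operations": 3,
    "operations_by_status": {"running": 2, "pending": 1},
    "longest_running": 12.5,
}
text = to_prometheus_text(sample)
```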

Health Check Endpoint

from fastapi import FastAPI
from hother.cancelable import OperationRegistry

app = FastAPI()

@app.get("/health/operations")
async def operation_health():
    """Health check showing active operations."""
    registry = OperationRegistry.get_instance()
    active = registry.get_active_operations()

    return {
        "status": "healthy",
        "active_count": len(active),
        "operations": [
            {
                "name": op.name,
                "status": op.status.value,
                "elapsed": round(op.elapsed_time, 2)
            }
            for op in active
        ]
    }

Alert on Long-Running Operations

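import anyio
from hother.cancelable import OperationRegistry

# send_alert() is a placeholder for your own async notification function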
async def monitor_long_running():
    """Alert if operations run too long."""
    registry = OperationRegistry.get_instance()

    while True:
        active = registry.get_active_operations()

        for op in active:
            if op.elapsed_time > 300:  # 5 minutes
                await send_alert(
                    f"Long-running operation: {op.name} "
                    f"({op.elapsed_time:.0f}s)"
                )

        await anyio.sleep(60)  # Check every minute
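
A loop like this sends a new alert on every pass for as long as an operation stays over the threshold. One way to alert only once per operation is to remember which IDs have already been reported. The helper below is a sketch of that idea using plain dictionaries and sets; `new_alerts` and its parameters are hypothetical names, and the elapsed times would come from the registry in practice.

```python
def new_alerts(
    elapsed_by_id: dict[str, float],
    already_alerted: set[str],
    threshold: float = 300.0,
) -> list[str]:
    """Return IDs that crossed the threshold and have not been reported yet."""
    fresh = sorted(
        op_id
        for op_id, elapsed in elapsed_by_id.items()
        if elapsed > threshold and op_id not in already_alerted
    )
    already_alerted.update(fresh)
    # Forget IDs that are no longer active so the set does not grow forever.
    already_alerted.intersection_update(elapsed_by_id)
    return fresh


seen: set[str] = set()
first_pass = new_alerts({"a": 400.0, "b": 10.0}, seen)    # "a" is over the limit
second_pass = new_alerts({"a": 460.0, "b": 70.0}, seen)   # "a" already reported
third_pass = new_alerts({"a": 520.0, "b": 310.0}, seen)   # "b" now over the limit
```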

Common Patterns

Find Stuck Operations

from hother.cancelable import OperationRegistry

def find_stuck_operations(threshold_seconds: float = 300):
    """Find operations running longer than threshold."""
    registry = OperationRegistry.get_instance()
    active = registry.get_active_operations()

    stuck = [
        op for op in active
        if op.elapsed_time > threshold_seconds
    ]

    return stuck

Operation Tree Visualization

from hother.cancelable import OperationRegistry

def print_operation_tree():
    """Print hierarchical operation tree."""
    registry = OperationRegistry.get_instance()
    active = registry.get_active_operations()

    # Find root operations (no parent)
    roots = [op for op in active if not op.parent_id]

    def print_node(op, indent=0):
        prefix = "  " * indent
        print(f"{prefix}├─ {op.name} ({op.status}, {op.elapsed_time:.1f}s)")

        children = registry.get_children(op.id)
        for child in children:
            print_node(child, indent + 1)

    for root in roots:
        print_node(root)
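
The same tree can be rebuilt from a flat snapshot (for example, records written to a log) by grouping on parent_id, without querying the live registry. This is a sketch under the assumption that each record is a plain dict with `id`, `name`, and `parent_id` keys, matching the fields listed under Operation Information; `render_tree` is a hypothetical helper.

```python
from collections import defaultdict


def render_tree(records: list[dict]) -> list[str]:
    """Return indented lines for a flat list of operation records."""
    # Group records by their parent so each lookup is a dictionary access.
    children = defaultdict(list)
    for rec in records:
        children[rec.get("parent_id")].append(rec)

    lines: list[str] = []

    def walk(parent_id, depth):
        for rec in children.get(parent_id, []):
            lines.append("  " * depth + "├─ " + rec["name"])
            walk(rec["id"], depth + 1)

    # Roots are the records whose parent_id is None.
    walk(None, 0)
    return lines


records = [
    {"id": "a", "name": "parent", "parent_id": None},
    {"id": "b", "name": "child_1", "parent_id": "a"},
    {"id": "c", "name": "child_2", "parent_id": "a"},
    {"id": "d", "name": "grandchild", "parent_id": "b"},
]
tree_lines = render_tree(records)
```

Records whose parent is missing from the snapshot are not reachable from a root and are therefore omitted; handle them separately if orphans matter.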

Progress Aggregation

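from hother.cancelable import OperationRegistry, OperationStatus

def aggregate_progress():
    """Calculate overall progress across all operations."""
    registry = OperationRegistry.get_instance()
    active = registry.get_active_operations()

    total_weight = len(active)
    # Operations are unregistered once their context exits, so the active
    # set may contain few or no COMPLETED entries at any given moment
    completed_weight = sum(
        1 for op in active
        if op.status == OperationStatus.COMPLETED
    )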

    if total_weight == 0:
        return 100.0

    return (completed_weight / total_weight) * 100

Maintenance

cleanup_completed()

Remove completed operations from the registry to prevent memory growth:

from hother.cancelable import OperationRegistry

registry = OperationRegistry.get_instance()

# Remove operations completed more than 1 hour ago
await registry.cleanup_completed(max_age_seconds=3600)

# Or cleanup all completed
await registry.cleanup_completed()

Use cases:

  • Long-running services
  • Preventing memory leaks
  • Periodic maintenance tasks

Historical Queries

get_history()

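Retrieve historical operations, including completed and cancelled ones:

from hother.cancelable import OperationRegistry, OperationStatus

registry = OperationRegistry.get_instance()

# Get last 100 operations
history = await registry.get_history(limit=100)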

for op in history:
    print(f"{op.name}: {op.status} - {op.elapsed_time:.1f}s")

# Filter by status
completed = await registry.get_history(
    status=OperationStatus.COMPLETED,
    limit=50
)

Parameters:

  • limit - Maximum operations to return
  • status - Filter by status (COMPLETED, CANCELLED, etc.)
  • since - Operations after timestamp
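
To make the interaction of the three parameters concrete, the sketch below applies them to an in-memory list. It is not the library's implementation: the `Record` class and `filter_history` function are hypothetical, and two behaviors are assumptions, namely that `since` is compared against the finish timestamp and that results come back newest first.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Record:
    """Hypothetical stand-in for a finished operation."""

    name: str
    status: str
    end_time: float


def filter_history(
    records: list[Record],
    limit: Optional[int] = None,
    status: Optional[str] = None,
    since: Optional[float] = None,
) -> list[Record]:
    """Apply status and since filters, then keep the newest `limit` records."""
    selected = [
        r
        for r in records
        if (status is None or r.status == status)
        and (since is None or r.end_time >= since)  # assumed comparison
    ]
    # Assumed ordering: most recently finished first.
    selected.sort(key=lambda r: r.end_time, reverse=True)
    return selected if limit is None else selected[:limit]


records = [
    Record("a", "completed", 100.0),
    Record("b", "cancelled", 200.0),
    Record("c", "completed", 300.0),
]
```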

Bulk Cancelation

cancel_all() with Filters

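Cancel many operations at once, selecting them by name pattern, age, or a custom filter:

from hother.cancelable import CancelationReason, OperationRegistry

registry = OperationRegistry.get_instance()

# Cancel all operations for a user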
await registry.cancel_all(
    name_pattern="user_123_*",
    reason=CancelationReason.MANUAL
)

# Cancel all long-running operations
await registry.cancel_all(
    min_age_seconds=300,  # Older than 5 minutes
    reason=CancelationReason.TIMEOUT
)

# Cancel operations matching complex criteria
await registry.cancel_all(
    filter_func=lambda op: op.context.metadata.get('priority') == 'low',
    reason=CancelationReason.MANUAL
)

Parameters:

  • name_pattern - Glob pattern for operation names
  • min_age_seconds - Minimum age in seconds
  • max_age_seconds - Maximum age in seconds
  • filter_func - Custom filter function
  • reason - Cancelation reason to record
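
Before running a bulk cancelation, it can help to check which operations a set of filters would select. The sketch below approximates the selection logic with the standard library. It rests on two assumptions that the page does not confirm: that name_pattern follows `fnmatch`-style glob rules, and that multiple filters combine with AND. The `matches` function is a hypothetical helper, not a library call.

```python
from fnmatch import fnmatchcase
from typing import Optional


def matches(
    name: str,
    age_seconds: float,
    name_pattern: Optional[str] = None,
    min_age_seconds: Optional[float] = None,
    max_age_seconds: Optional[float] = None,
) -> bool:
    """Return True if an operation passes every filter that was supplied."""
    # Assumption: glob semantics as implemented by fnmatch (case-sensitive).
    if name_pattern is not None and not fnmatchcase(name, name_pattern):
        return False
    if min_age_seconds is not None and age_seconds < min_age_seconds:
        return False
    if max_age_seconds is not None and age_seconds > max_age_seconds:
        return False
    return True


# Dry run over (name, age) pairs before calling cancel_all() for real.
candidates = [("user_123_upload", 20.0), ("user_456_upload", 20.0), ("user_123_sync", 900.0)]
selected = [name for name, age in candidates if matches(name, age, name_pattern="user_123_*")]
```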