Skip to content

Basic Patterns

Fundamental cancelation patterns for getting started.

Examples followed by a "Run it" command are excerpts from complete, runnable scripts. Find the sources in examples/01_basics/ and examples/02_advanced/.

Timeout Cancelation

Cancel operations that exceed a time limit.

Simple Timeout

    # Example: Timeout-based cancelation
    cancel = None
    try:
        async with Cancelable.with_timeout(2.0, name="timeout_example") as cancel:
            cancel.on_progress(lambda op_id, msg, meta: print(f"  Progress: {msg}"))

            await cancel.report_progress("Starting operation")

            # This will timeout
            await anyio.sleep(5.0)

            await cancel.report_progress("This won't be reached")

    except anyio.get_cancelled_exc_class():
        if cancel:
            print(
                f"  Operation timed out after {(cancel.context.duration.total_seconds() if cancel.context.duration else 0.0):.2f}s"
            )
            print(f"  Final status: {cancel.context.status.value}")
            print(f"  Cancel reason: {cancel.context.cancel_reason.value if cancel.context.cancel_reason else 'unknown'}")

This example shows timeout-based cancelation with error handling.

Run it:

python examples/01_basics/02_timeout_cancelation.py

Timeout with Cleanup

async def with_cleanup():
    """Process data under a 10-second timeout and always release resources.

    The cleanup step runs from the ``finally`` clause inside ``shield()``,
    which protects it from cancelation.
    """
    async with Cancelable.with_timeout(10.0) as operation:
        try:
            await process_data()
        finally:
            # Reached on success, failure, and cancelation alike
            async with operation.shield():
                await cleanup_resources()

Manual Cancelation

Cancel operations programmatically using tokens.

Basic Token Usage

Here's a complete example showing manual cancelation with tokens:

    # Example: Manual cancelation with token

    # Create a cancelation token
    token = CancelationToken()

    async def background_task() -> None:
        """Simulate a long-running task."""
        try:
            async with Cancelable.with_token(token, name="background_task") as cancel:
                cancel.on_progress(lambda op_id, msg, meta: print(f"  Progress: {msg}"))
                for i in range(10):
                    await cancel.report_progress(f"Step {i + 1}/10")
                    await anyio.sleep(0.5)
        except anyio.get_cancelled_exc_class():
            print("  Background task was cancelled!")

    # Run task and cancel after 2 seconds
    try:
        async with anyio.create_task_group() as tg:
            # Start background task
            tg.start_soon(background_task)

            # Cancel after 2 seconds
            await anyio.sleep(2.0)
            print("Cancelling task...")
            await token.cancel(message="User requested cancelation")
    except* anyio.get_cancelled_exc_class():
        # Handle the cancelation from task group
        print("  Task group cancelled due to operation cancelation")

Run it:

python examples/01_basics/03_manual_cancelation.py

Thread-Safe Cancelation

Cancel from synchronous code or threads:

import threading
import time

import anyio
from hother.cancelable import Cancelable, CancelationToken

# Shared between the event loop and the canceller thread
token = CancelationToken()


async def async_worker():
    """Run the long operation under the shared token."""
    async with Cancelable.with_token(token) as cancel:
        await long_async_operation()


def sync_canceller():
    """Runs in a separate thread."""
    time.sleep(5)
    # Thread-safe cancelation
    token.cancel_sync(message="Cancelled from thread")


async def main():
    """Start the canceller thread, run the async work, then wait for the thread."""
    # Start thread
    thread = threading.Thread(target=sync_canceller)
    thread.start()

    # Run async work
    try:
        await async_worker()
    finally:
        # Join even if the worker exits via cancelation
        thread.join()


# `await` is only valid inside a coroutine, so the work is driven by a runner
anyio.run(main)

Signal Handling

Cancel on OS signals for graceful shutdown.

Graceful Shutdown

import signal
from hother.cancelable import Cancelable

async def main():
    """Run the server until SIGTERM or SIGINT arrives, then shut down cleanly."""
    shutdown_signals = (
        signal.SIGTERM,  # Graceful shutdown
        signal.SIGINT,   # Ctrl+C
    )
    async with Cancelable.with_signal(*shutdown_signals, name="application") as cancel:
        try:
            print("Application running... (Ctrl+C to stop)")
            await run_server()
        finally:
            # Cleanup runs however the server stopped; the shield protects it
            # from the cancelation that triggered the shutdown
            async with cancel.shield():
                print("Shutting down gracefully...")
                await save_state()
                await close_connections()
                print("Shutdown complete")

anyio.run(main)

Run it:

python examples/02_advanced/08_signal_handling.py
# Press Ctrl+C to trigger graceful shutdown

Platform Support

Full signal handling is available on Unix-like systems (Linux, macOS). On Windows, only SIGINT (Ctrl+C) is supported.

Condition-Based Cancelation

Cancel when custom conditions are met.

Resource Monitoring

import shutil
from hother.cancelable import ConditionSource, Cancelable

def disk_full(path="/", threshold=0.95):
    """Check if disk usage exceeds a threshold (95% of the root filesystem by default).

    Args:
        path: Any path on the filesystem to inspect.
        threshold: Used fraction of total capacity above which the disk
            counts as full.

    Returns:
        True when used/total is strictly greater than ``threshold``.
    """
    usage = shutil.disk_usage(path)
    if usage.total == 0:
        # A filesystem reporting zero capacity cannot be "full" in any useful
        # sense; return False instead of dividing by zero.
        return False
    return (usage.used / usage.total) > threshold

async def main():
    """Process large files under a disk-space condition source."""
    # The predicate is checked every 5 seconds
    disk_monitor = ConditionSource(
        predicate=disk_full,
        check_interval=5.0,
        description="Disk space monitor"
    )
    async with Cancelable(sources=[disk_monitor], name="file_processor") as cancel:
        await process_large_files()

anyio.run(main)

Custom Business Logic

class JobController:
    """Holds a stop flag that a running job can poll."""

    def __init__(self) -> None:
        # False until something asks the job to stop
        self.should_stop: bool = False

    def check_stop_flag(self) -> bool:
        """Return the current value of the stop flag."""
        return self.should_stop


# Module-level instance shared by the job and by whatever requests the stop
controller = JobController()

async def run_job():
    """Run the job under a condition source that polls the controller's stop flag."""
    # The flag is checked once per second
    stop_condition = ConditionSource(
        predicate=controller.check_stop_flag,
        check_interval=1.0
    )
    async with Cancelable(sources=[stop_condition], name="background_job") as cancel:
        await process_job()

# API endpoint that asks the running job to stop
@app.post("/jobs/stop")
async def stop_job():
    """Set the controller's stop flag and acknowledge the request."""
    controller.should_stop = True
    response = {"status": "stopping"}
    return response

Run it:

python examples/02_advanced/07_condition_cancelation.py

Combined Cancelation

Compose multiple sources; the operation is cancelled as soon as the first one triggers.

Timeout + Manual + Signal

    # Example: Multiple cancelation sources combined

    # Create multiple cancelation sources
    token = CancelationToken()
    print(f"Created manual token: {token.id}")

    # Create individual cancelables with logging
    timeout_cancellable = Cancelable.with_timeout(10.0)
    print(f"Created timeout cancelable: {timeout_cancellable.context.id} with token {timeout_cancellable.token.id}")

    token_cancellable = Cancelable.with_token(token)
    print(f"Created token cancelable: {token_cancellable.context.id} with token {token_cancellable.token.id}")

    signal_cancellable = Cancelable.with_signal(signal.SIGINT)
    print(f"Created signal cancelable: {signal_cancellable.context.id} with token {signal_cancellable.token.id}")

    # Combine them step by step with logging
    print("=== COMBINING STEP 1: timeout + token ===")
    first_combine = timeout_cancellable.combine(token_cancellable)
    print(f"First combine result: {first_combine.context.id} with token {first_combine.token.id}")

    print("=== COMBINING STEP 2: (timeout+token) + signal ===")
    final_cancellable = first_combine.combine(signal_cancellable)
    print(f"Final combine result: {final_cancellable.context.id} with token {final_cancellable.token.id}")

    print(f"Final combined cancelable: {final_cancellable.context.id}")
    print(f"Final combined cancelable token: {final_cancellable.token.id}")

    final_cancellable.on_cancel(
        lambda ctx: print(
            f"  Cancelled: {ctx.cancel_reason.value if ctx.cancel_reason else 'unknown'} - {ctx.cancel_message or 'no message'}"
        )
    )

    print("  Press Ctrl+C to cancel, or wait for timeout/manual cancel...")
    try:
        async with final_cancellable:
            # Simulate work
            for i in range(20):
                await asyncio.sleep(SLEEP_DURATION)
                print(f"  Working... {i + 1}/20")

                # Manual cancel after 3 seconds
                if i == 6 * SLEEP_DURATION:
                    print("  Triggering manual cancelation...")
                    print(f"About to cancel token: {token.id}")
                    await token.cancel(message="Demonstration cancel")
                    print("Token cancel call completed")
    except asyncio.CancelledError:
        print("  Operation was cancelled")
        print(
            f"  Reason: {final_cancellable.context.cancel_reason.value if final_cancellable.context.cancel_reason else 'unknown'}"
        )
        print(f"  Message: {final_cancellable.context.cancel_message or 'no message'}")

Run it:

python examples/02_advanced/01_combined_cancelation.py

All Sources Combined

from hother.cancelable import TimeoutSource, SignalSource, ConditionSource

async def comprehensive_example():
    """Run one operation guarded by timeout, signal, manual, and condition sources."""
    token = CancelationToken()

    cancel_sources = [
        TimeoutSource(600.0),                    # 10 min timeout
        SignalSource(signal.SIGTERM, signal.SIGINT),
        token,                                    # Manual
        ConditionSource(disk_full, 5.0)          # Disk space
    ]

    # NOTE(review): elsewhere on this page `combine` is called on an instance
    # with another Cancelable; confirm the class-level list form is supported.
    async with Cancelable.combine(cancel_sources, name="robust_operation") as cancel:
        # Cancels on FIRST trigger from ANY source
        await operation()

Hierarchical Operations

Parent-child relationships with automatic propagation.

Basic Hierarchy

async def parent_task():
    """Run two child operations one after the other under a shared parent."""
    children = (
        ("child_1", "Child 1 working"),
        ("child_2", "Child 2 working"),
    )
    async with Cancelable(name="parent") as parent:
        print("Parent started")

        # Every child is linked to the parent: if the parent cancels,
        # ALL children auto-cancel
        for child_name, banner in children:
            async with Cancelable(name=child_name, parent=parent) as child:
                print(banner)
                await anyio.sleep(1)

        print("Parent complete")

Timeout Propagation

# Parent has 10s timeout
async with Cancelable.with_timeout(10.0, name="parent") as parent:

    # Children inherit cancelation (but can have their own)
    async with Cancelable(name="quick_task", parent=parent) as child1:
        await quick_operation()  # Must finish before parent timeout

    async with Cancelable(name="slow_task", parent=parent) as child2:
        await slow_operation()  # Cancelled if parent times out

Decorated Functions

Apply cancelation to functions with decorators.

Complete Example

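The @cancelable decorator makes it easy to add cancelation to any function. The snippet below shows only the call site; the decorated `slow_operation` is defined in the example file listed under "Run it":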

    # Example: Using decorators

    try:
        # This will complete
        result = await slow_operation(1.5)
        print(f"  Result: {result}")

        # This will timeout
        result = await slow_operation(3.0)
        print(f"  Result: {result}")

    except anyio.get_cancelled_exc_class():
        print("  Operation was cancelled!")

Run it:

python examples/01_basics/04_decorated_functions.py

Best Practices

✅ Do

  • Name your operations for better debugging
  • Handle cancelation exceptions gracefully
  • Use shields for critical cleanup
  • Combine sources for robust cancelation
  • Start simple - timeout only, then add more

❌ Don't

  • Don't ignore cancelation - it indicates important state
  • Don't shield entire operations - only cleanup sections
  • Don't use very short check intervals (< 0.1s) for ConditionSource
  • Don't forget platform limits for signal handling

Common Patterns

API Call with Timeout

async def fetch_data(url: str):
    """GET ``url`` under a 30-second timeout and return ``response.json()``."""
    operation_name = f"fetch_{url}"
    async with Cancelable.with_timeout(30.0, name=operation_name) as cancel:
        return (await http_client.get(url)).json()

Background Task with Cancel Button

# Global token for UI cancel button
job_token = CancelationToken()

@app.post("/cancel")
async def cancel_job():
    """Cancel the background job on behalf of the user."""
    # Pass the text as `message=`, as every other cancel call on this page
    # does, so it cannot be bound to a different leading parameter.
    await job_token.cancel(message="User clicked cancel")

async def background_job():
    """Process the job under the global cancel-button token."""
    async with Cancelable.with_token(job_token) as job_scope:
        await process_job()

CLI Tool with Ctrl+C

import signal

async def cli_tool():
    """Process files until done or interrupted by SIGINT, then save progress."""
    async with Cancelable.with_signal(signal.SIGINT) as interrupt_scope:
        try:
            await process_files()
        finally:
            # Progress is saved on every exit path, shielded from cancelation
            async with interrupt_scope.shield():
                await save_progress()

Next Steps