Basic Patterns
Fundamental cancelation patterns for getting started.
All examples on this page are complete and runnable. Find the source in examples/01_basics/.
Timeout Cancelation
Cancel operations that exceed a time limit.
Simple Timeout
# Example: Timeout-based cancelation
cancel = None
try:
async with Cancelable.with_timeout(2.0, name="timeout_example") as cancel:
cancel.on_progress(lambda op_id, msg, meta: print(f" Progress: {msg}"))
await cancel.report_progress("Starting operation")
# This will timeout
await anyio.sleep(5.0)
await cancel.report_progress("This won't be reached")
except anyio.get_cancelled_exc_class():
if cancel:
print(
f" Operation timed out after {(cancel.context.duration.total_seconds() if cancel.context.duration else 0.0):.2f}s"
)
print(f" Final status: {cancel.context.status.value}")
print(f" Cancel reason: {cancel.context.cancel_reason.value if cancel.context.cancel_reason else 'unknown'}")
This example shows timeout-based cancelation with error handling.
Run it:
Timeout with Cleanup
async def with_cleanup():
async with Cancelable.with_timeout(10.0) as cancel:
try:
await process_data()
finally:
# Shield cleanup from cancelation
async with cancel.shield():
await cleanup_resources()
Manual Cancelation
Cancel operations programmatically using tokens.
Basic Token Usage
Here's a complete example showing manual cancelation with tokens:
# Example: Manual cancelation with token
# Create a cancelation token
token = CancelationToken()
async def background_task() -> None:
"""Simulate a long-running task."""
try:
async with Cancelable.with_token(token, name="background_task") as cancel:
cancel.on_progress(lambda op_id, msg, meta: print(f" Progress: {msg}"))
for i in range(10):
await cancel.report_progress(f"Step {i + 1}/10")
await anyio.sleep(0.5)
except anyio.get_cancelled_exc_class():
print(" Background task was cancelled!")
# Run task and cancel after 2 seconds
try:
async with anyio.create_task_group() as tg:
# Start background task
tg.start_soon(background_task)
# Cancel after 2 seconds
await anyio.sleep(2.0)
print("Cancelling task...")
await token.cancel(message="User requested cancelation")
except* anyio.get_cancelled_exc_class():
# Handle the cancelation from task group
print(" Task group cancelled due to operation cancelation")
Run it:
Thread-Safe Cancelation
Cancel from synchronous code or threads:
import threading
import time
from hother.cancelable import CancelationToken
token = CancelationToken()
async def async_worker():
async with Cancelable.with_token(token) as cancel:
await long_async_operation()
def sync_canceller():
"""Runs in a separate thread."""
time.sleep(5)
# Thread-safe cancelation
token.cancel_sync(message="Cancelled from thread")
# Start thread
thread = threading.Thread(target=sync_canceller)
thread.start()
# Run async work
await async_worker()
thread.join()
Signal Handling
Cancel on OS signals for graceful shutdown.
Graceful Shutdown
import signal
from hother.cancelable import Cancelable
async def main():
async with Cancelable.with_signal(
signal.SIGTERM, # Graceful shutdown
signal.SIGINT, # Ctrl+C
name="application"
) as cancel:
try:
print("Application running... (Ctrl+C to stop)")
await run_server()
finally:
# Always run cleanup
async with cancel.shield():
print("Shutting down gracefully...")
await save_state()
await close_connections()
print("Shutdown complete")
anyio.run(main)
Run it:
Platform Support
Signal handling works on Unix-like systems (Linux, macOS). Windows supports SIGINT (Ctrl+C) only.
Condition-Based Cancelation
Cancel when custom conditions are met.
Resource Monitoring
import shutil
from hother.cancelable import ConditionSource, Cancelable
def disk_full():
"""Check if disk usage exceeds 95%."""
usage = shutil.disk_usage("/")
return (usage.used / usage.total) > 0.95
async def main():
# Check disk every 5 seconds
async with Cancelable(
sources=[ConditionSource(
predicate=disk_full,
check_interval=5.0,
description="Disk space monitor"
)],
name="file_processor"
) as cancel:
await process_large_files()
anyio.run(main)
Custom Business Logic
class JobController:
def __init__(self):
self.should_stop = False
def check_stop_flag(self):
return self.should_stop
controller = JobController()
async def run_job():
async with Cancelable(
sources=[ConditionSource(
predicate=controller.check_stop_flag,
check_interval=1.0
)],
name="background_job"
) as cancel:
await process_job()
# Stop job from API endpoint
@app.post("/jobs/stop")
async def stop_job():
controller.should_stop = True
return {"status": "stopping"}
Run it:
Combined Cancelation
Compose multiple sources - cancels on FIRST trigger.
Timeout + Manual + Signal
# Example: Multiple cancelation sources combined
# Create multiple cancelation sources
token = CancelationToken()
print(f"Created manual token: {token.id}")
# Create individual cancelables with logging
timeout_cancellable = Cancelable.with_timeout(10.0)
print(f"Created timeout cancelable: {timeout_cancellable.context.id} with token {timeout_cancellable.token.id}")
token_cancellable = Cancelable.with_token(token)
print(f"Created token cancelable: {token_cancellable.context.id} with token {token_cancellable.token.id}")
signal_cancellable = Cancelable.with_signal(signal.SIGINT)
print(f"Created signal cancelable: {signal_cancellable.context.id} with token {signal_cancellable.token.id}")
# Combine them step by step with logging
print("=== COMBINING STEP 1: timeout + token ===")
first_combine = timeout_cancellable.combine(token_cancellable)
print(f"First combine result: {first_combine.context.id} with token {first_combine.token.id}")
print("=== COMBINING STEP 2: (timeout+token) + signal ===")
final_cancellable = first_combine.combine(signal_cancellable)
print(f"Final combine result: {final_cancellable.context.id} with token {final_cancellable.token.id}")
print(f"Final combined cancelable: {final_cancellable.context.id}")
print(f"Final combined cancelable token: {final_cancellable.token.id}")
final_cancellable.on_cancel(
lambda ctx: print(
f" Cancelled: {ctx.cancel_reason.value if ctx.cancel_reason else 'unknown'} - {ctx.cancel_message or 'no message'}"
)
)
print(" Press Ctrl+C to cancel, or wait for timeout/manual cancel...")
try:
async with final_cancellable:
# Simulate work
for i in range(20):
await asyncio.sleep(SLEEP_DURATION)
print(f" Working... {i + 1}/20")
# Manual cancel after 3 seconds
if i == 6 * SLEEP_DURATION:
print(" Triggering manual cancelation...")
print(f"About to cancel token: {token.id}")
await token.cancel(message="Demonstration cancel")
print("Token cancel call completed")
except asyncio.CancelledError:
print(" Operation was cancelled")
print(
f" Reason: {final_cancellable.context.cancel_reason.value if final_cancellable.context.cancel_reason else 'unknown'}"
)
print(f" Message: {final_cancellable.context.cancel_message or 'no message'}")
Run it:
All Sources Combined
from hother.cancelable import TimeoutSource, SignalSource, ConditionSource
async def comprehensive_example():
token = CancelationToken()
async with Cancelable.combine([
TimeoutSource(600.0), # 10 min timeout
SignalSource(signal.SIGTERM, signal.SIGINT),
token, # Manual
ConditionSource(disk_full, 5.0) # Disk space
], name="robust_operation") as cancel:
# Cancels on FIRST trigger from ANY source
await operation()
Hierarchical Operations
Parent-child relationships with automatic propagation.
Basic Hierarchy
async def parent_task():
async with Cancelable(name="parent") as parent:
print("Parent started")
# Child 1
async with Cancelable(name="child_1", parent=parent) as child1:
print("Child 1 working")
await anyio.sleep(1)
# Child 2
async with Cancelable(name="child_2", parent=parent) as child2:
print("Child 2 working")
await anyio.sleep(1)
# If parent cancels, ALL children auto-cancel!
print("Parent complete")
Timeout Propagation
# Parent has 10s timeout
async with Cancelable.with_timeout(10.0, name="parent") as parent:
# Children inherit cancelation (but can have their own)
async with Cancelable(name="quick_task", parent=parent) as child1:
await quick_operation() # Must finish before parent timeout
async with Cancelable(name="slow_task", parent=parent) as child2:
await slow_operation() # Cancelled if parent times out
Decorated Functions
Apply cancelation to functions with decorators.
Complete Example
The @cancelable decorator makes it easy to add cancelation to any function:
# Example: Using decorators
try:
# This will complete
result = await slow_operation(1.5)
print(f" Result: {result}")
# This will timeout
result = await slow_operation(3.0)
print(f" Result: {result}")
except anyio.get_cancelled_exc_class():
print(" Operation was cancelled!")
Run it:
Best Practices
✅ Do
- Name your operations for better debugging
- Handle cancelation exceptions gracefully
- Use shields for critical cleanup
- Combine sources for robust cancelation
- Start simple - timeout only, then add more
❌ Don't
- Don't ignore cancelation - it indicates important state
- Don't shield entire operations - only cleanup sections
- Don't use very short check intervals (< 0.1s) for ConditionSource
- Don't forget platform limits for signal handling
Common Patterns
API Call with Timeout
async def fetch_data(url: str):
async with Cancelable.with_timeout(30.0, name=f"fetch_{url}") as cancel:
response = await http_client.get(url)
return response.json()
Background Task with Cancel Button
# Global token for UI cancel button
job_token = CancelationToken()
@app.post("/cancel")
async def cancel_job():
await job_token.cancel("User clicked cancel")
async def background_job():
async with Cancelable.with_token(job_token) as cancel:
await process_job()
CLI Tool with Ctrl+C
import signal
async def cli_tool():
async with Cancelable.with_signal(signal.SIGINT) as cancel:
try:
await process_files()
finally:
async with cancel.shield():
await save_progress()
Next Steps
- Explore Stream Processing - Handle async streams
- Learn Web Applications - FastAPI integration
- Read Core Concepts - Understand the fundamentals
- Try Advanced Patterns - Production patterns