Common Patterns and Best Practices
Here is a collection of common patterns that leverage Cancelable.
Graceful Shutdown
Handle application shutdown gracefully:
import signal
from hother.cancelable import Cancelable
async def main():
# Handle SIGINT and SIGTERM
async with Cancelable.with_signal(signal.SIGINT, signal.SIGTERM) as cancel:
cancel.on_cancel(lambda ctx: print("Shutting down gracefully..."))
# Your application logic
await run_application()
Or using the decorator:
import signal
from hother.cancelable import cancelable_with_signal
@cancelable_with_signal(signal.SIGINT, signal.SIGTERM)
async def main(cancelable=None):
"""Application with graceful shutdown."""
cancelable.on_cancel(lambda ctx: print("Shutting down gracefully..."))
await run_application()
Resource Cleanup
Ensure resources are cleaned up even on cancelation:
async def process_with_cleanup():
resource = None
async with Cancelable.with_timeout(30) as cancel:
try:
# Acquire resource
resource = await acquire_resource()
# Process
result = await process(resource)
return result
finally:
# Shield cleanup from cancelation
if resource:
async with cancel.shield():
await resource.cleanup()
Batch Processing with Progress
Process data in batches with progress reporting:
async def process_large_dataset(data: List[Any], batch_size: int = 100):
async with Cancelable(name="batch_processing") as cancel:
cancel.on_progress(lambda op_id, msg, meta: logger.info(msg, **meta))
total = len(data)
processed = 0
for i in range(0, total, batch_size):
batch = data[i:i + batch_size]
# Process batch
await process_batch(batch)
processed += len(batch)
# Report progress
progress = (processed / total) * 100
await cancel.report_progress(
f"Processed {processed}/{total} items",
{"progress_percent": progress, "batch_number": i // batch_size + 1}
)
Retry with Cancelation
Implement retry logic with cancelation support:
async def retry_with_cancelation(
operation: Callable,
max_retries: int = 3,
delay: float = 1.0,
backoff: float = 2.0,
):
async with Cancelable(name="retry_operation") as cancel:
last_error = None
# Wrap operation to automatically check cancelation
wrapped_op = cancel.wrap(operation)
for attempt in range(max_retries):
try:
# Cancelation checked automatically by wrap()
result = await wrapped_op()
return result
except Exception as e:
last_error = e
if attempt < max_retries - 1:
await cancel.report_progress(
f"Attempt {attempt + 1} failed, retrying...",
{"error": str(e)}
)
# Wait with exponential backoff
await anyio.sleep(delay)
delay *= backoff
raise last_error
Concurrent Operations with Shared Cancelation
Run multiple operations with shared cancelation:
async def parallel_operations():
async with Cancelable(name="parallel_work") as cancel:
# Wrap process_item to automatically check cancelation
wrapped_process = cancel.wrap(process_item)
async def worker(worker_id: int, items: List[Any]):
for item in items:
# Cancelation checked automatically
await wrapped_process(item)
# Report progress
await cancel.report_progress(
f"Worker {worker_id} processed item",
{"worker_id": worker_id, "item": item}
)
# Split work among workers
work_items = split_into_chunks(all_items, worker_count=4)
# Run workers concurrently
async with anyio.create_task_group() as tg:
for i, items in enumerate(work_items):
tg.start_soon(worker, i, items)
Hierarchical Cancelation
Create operation hierarchies with parent-child relationships:
async def hierarchical_operations():
async with Cancelable(name="parent_operation") as parent:
# Create child operations
async def child_operation(child_id: int):
child = Cancelable(
name=f"child_{child_id}",
parent=parent
)
async with child:
# Child will be cancelled if parent is cancelled
await do_child_work()
# Run children
async with anyio.create_task_group() as tg:
for i in range(5):
tg.start_soon(child_operation, i)
Conditional Cancelation
Cancel based on system resources:
import psutil
def check_memory_usage():
"""Cancel if memory usage is too high."""
return psutil.virtual_memory().percent > 90
async def memory_aware_operation():
async with Cancelable.with_condition(
check_memory_usage,
check_interval=5.0,
condition_name="memory_check"
) as cancel:
cancel.on_cancel(
lambda ctx: logger.warning("Operation cancelled due to high memory usage")
)
await memory_intensive_operation()