Skip to content

Threading & Cross-Context Cancelation

Cancel async operations from threads and bridge between sync/async worlds.

Overview

Cross-thread cancelation is a core feature that allows you to cancel async operations from synchronous code or different threads. This solves the fundamental problem of coordinating between Python's threading and async ecosystems.

Cancelable solves this with thread-safe cancelation:

token = CancelationToken()

def button_click_handler():  # Runs in GUI thread
    token.cancel_sync("User clicked cancel")  # Thread-safe!

async def long_async_operation():
    async with Cancelable.with_token(token) as cancel:
        await process_data()  # Respects cancelation from thread

Thread-Safe Cancelation

Cancel async operations from synchronous threads:

import threading
import time
from hother.cancelable import CancelationToken, Cancelable

token = CancelationToken()

async def async_worker():
    """Long-running async operation."""
    async with Cancelable.with_token(token, name="worker") as cancel:
        for i in range(100):
            await process_item(i)
            await anyio.sleep(0.1)

def sync_canceller():
    """Runs in separate thread."""
    time.sleep(5)  # Wait 5 seconds
    # Thread-safe cancelation
    token.cancel_sync(message="Cancelled from thread")  # (1)!

# Start thread
thread = threading.Thread(target=sync_canceller)
thread.start()

# Run async work
await async_worker()  # Will be cancelled after 5 seconds
thread.join()
  1. cancel_sync() is thread-safe and works from any thread

Thread-Safe Registry Operations

ThreadSafeRegistry

Synchronous API for the operation registry, designed for thread-based web frameworks (Flask, Django).

Why you need this: OperationRegistry is async by default, but many Python web frameworks run in synchronous threads.

from hother.cancelable.utils.threading_bridge import ThreadSafeRegistry

# In Flask/Django view
registry = ThreadSafeRegistry()

# Cancel an operation from a synchronous endpoint
@app.post("/jobs/<job_id>/cancel")
def cancel_job(job_id):
    registry.cancel_operation(job_id, "User requested cancel")
    return {"status": "cancelled"}

# Get statistics synchronously
@app.get("/stats")
def get_stats():
    stats = registry.get_statistics()
    return {
        "total": stats.total_operations,
        "running": stats.running_operations,
        "cancelled": stats.cancelled_operations
    }

# Cancel all operations matching a pattern
@app.post("/jobs/cancel-all")
def cancel_all_user_jobs(user_id):
    registry.cancel_all(name_pattern=f"user_{user_id}_*")
    return {"status": "all_cancelled"}

Methods: - cancel_operation(operation_id, reason) - Cancel specific operation - cancel_all(name_pattern=None) - Cancel all or filtered operations - get_statistics() - Get registry statistics - get_operation(operation_id) - Retrieve operation context

Context Propagation to Threads

ContextBridge

Propagate context variables (like current_operation()) to threads safely.

The problem: Python's contextvars don't automatically propagate to threads, so current_operation() returns None in threads.

The solution:

from hother.cancelable.utils.context_bridge import ContextBridge

async def main():
    async with Cancelable.with_timeout(30.0, name="parent") as cancel:
        # Run blocking operation in thread with context preserved
        bridge = ContextBridge()

        def sync_work():
            # current_operation() works here!
            ctx = current_operation()
            print(f"Operation: {ctx.context.name}")  # Prints "parent"

        result = await bridge.run_in_thread_with_context(sync_work)

Methods: - run_in_thread_with_context(func, *args, **kwargs) - Run function in thread with context - copy_context() - Capture current context - restore_context(context) - Restore captured context

Running Blocking Operations in Threads

run_in_thread() Method

Run synchronous functions in threads while preserving cancelation context.

async with Cancelable.with_timeout(30.0) as cancel:
    # Run blocking I/O in thread
    def blocking_database_query():
        # Simulates blocking call that can't be async
        return database.execute_slow_query()

    result = await cancel.run_in_thread(blocking_database_query)
    # Respects cancelation even while running in thread

Use case: Integrating blocking libraries (database drivers, file operations) that don't have async support.

Thread-to-Async Communication

AnyioBridge

Thread-to-anyio communication bridge for high-throughput scenarios.

from hother.cancelable.utils.anyio_bridge import AnyioBridge

bridge = AnyioBridge(buffer_size=1000)  # Configure buffer

async with bridge:
    # From another thread, send data to async code
    def thread_producer():
        for item in generate_data():
            bridge.call_soon_threadsafe(process_item, item)

    # Start thread
    thread = threading.Thread(target=thread_producer)
    thread.start()

    # Process in async context
    await process_all_items()

Parameters: - buffer_size - Queue size for thread-to-async communication - max_workers - Thread pool size

Use cases: - High-throughput data ingestion - Integrating synchronous libraries - Thread pool management