Utilities

Utility modules providing helper functions, decorators, bridges, and testing tools.

Decorators

The @cancelable decorator for easily making functions cancelable.

hother.cancelable.utils.decorators

Decorators and convenience functions for async cancelation.

logger module-attribute

logger = get_logger(__name__)

P module-attribute

P = ParamSpec('P')

T module-attribute

T = TypeVar('T')

R module-attribute

R = TypeVar('R')

cancelable

cancelable(
    timeout: float | timedelta | None = None,
    operation_id: str | None = None,
    name: str | None = None,
    register_globally: bool = False,
    inject_param: str | None = "cancelable",
) -> Callable[
    [Callable[P, Awaitable[R]]],
    Callable[P, Awaitable[R]],
]

Decorator to make async function cancelable.

The decorator automatically creates a Cancelable context and injects it via the specified parameter name (default: 'cancelable'). The decorated function will ALWAYS receive a non-None Cancelable instance.

Parameters:

    timeout (float | timedelta | None, default None): Optional timeout for the operation
    operation_id (str | None, default None): Optional operation ID (auto-generated if not provided)
    name (str | None, default None): Optional operation name (defaults to function name)
    register_globally (bool, default False): Whether to register with global registry
    inject_param (str | None, default 'cancelable'): Parameter name to inject cancelable (None to disable)

Returns:

    Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]: Decorator function

Example

@cancelable(timeout=30.0, register_globally=True)
async def my_operation(data: str, cancelable: Cancelable = None):
    await cancelable.report_progress("Starting")
    # ... do work ...
    return result

Note

Type the injected parameter as Cancelable = None for type checker compatibility. The decorator ALWAYS provides a non-None instance, but the = None default signals to type checkers that callers don't need to provide this argument.

For strict type checking within the function, optionally add: assert cancelable is not None

Alternatively, disable injection with inject_param=None and use current_operation() instead.
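
A minimal sketch of that alternative, assuming current_operation is imported from hother.cancelable (as in the with_cancelable example further down):

@cancelable(timeout=30.0, inject_param=None)
async def my_operation(data: str):
    # Nothing is injected; read the active context instead
    op = current_operation()
    if op is not None:
        await op.report_progress("Starting")
    # ... do work ...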

Source code in src/hother/cancelable/utils/decorators.py
def cancelable(
    timeout: float | timedelta | None = None,
    operation_id: str | None = None,
    name: str | None = None,
    register_globally: bool = False,
    inject_param: str | None = "cancelable",
) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]:
    """Decorator to make async function cancelable.

    The decorator automatically creates a Cancelable context and injects it
    via the specified parameter name (default: 'cancelable'). The decorated
    function will ALWAYS receive a non-None Cancelable instance.

    Args:
        timeout: Optional timeout for the operation
        operation_id: Optional operation ID (auto-generated if not provided)
        name: Optional operation name (defaults to function name)
        register_globally: Whether to register with global registry
        inject_param: Parameter name to inject cancelable (None to disable)

    Returns:
        Decorator function

    Example:
        @cancelable(timeout=30.0, register_globally=True)
        async def my_operation(data: str, cancelable: Cancelable = None):
            await cancelable.report_progress("Starting")
            # ... do work ...
            return result

    Note:
        **Type the injected parameter as `Cancelable = None` for type checker
        compatibility.** The decorator ALWAYS provides a non-None instance,
        but the `= None` default signals to type checkers that callers don't
        need to provide this argument.

        For strict type checking within the function, optionally add:
        `assert cancelable is not None`

        Alternatively, disable injection with `inject_param=None` and use
        `current_operation()` instead.
    """

    def decorator(func: Callable[P, Awaitable[R]]) -> Callable[P, Awaitable[R]]:
        @wraps(func)
        async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            # Create cancelable
            cancel_kwargs: dict[str, Any] = {
                "operation_id": operation_id,
                "name": name or func.__name__,
                "register_globally": register_globally,
            }

            cancel = Cancelable.with_timeout(timeout, **cancel_kwargs) if timeout else Cancelable(**cancel_kwargs)

            async with cancel:
                # Inject cancelable if requested
                if inject_param:
                    sig = inspect.signature(func)
                    if inject_param in sig.parameters:
                        kwargs[inject_param] = cancel

                # Call the function
                return await func(*args, **kwargs)

            # Unreachable - async with block always completes above
            raise AssertionError("Unreachable")  # pragma: no cover

        # Add attribute to access decorator parameters (dynamic attribute, no type annotation needed)
        wrapper._cancelable_params = {  # type: ignore[attr-defined]
            "timeout": timeout,
            "operation_id": operation_id,
            "name": name or func.__name__,
            "register_globally": register_globally,
        }

        return wrapper

    return decorator

with_timeout async

with_timeout(
    timeout: float | timedelta,
    coro: Awaitable[T],
    operation_id: str | None = None,
    name: str | None = None,
) -> T

Run coroutine with timeout.

Parameters:

    timeout (float | timedelta, required): Timeout duration
    coro (Awaitable[T], required): Coroutine to run
    operation_id (str | None, default None): Optional operation ID
    name (str | None, default None): Optional operation name

Returns:

    T: Result from coroutine

Raises:

    CancelledError: If operation times out

Example

result = await with_timeout(5.0, fetch_data())
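
A slightly fuller sketch using a timedelta timeout; fetch_data is a placeholder coroutine, and the exception handling follows the docstring above (under a non-asyncio backend the cancelation exception may differ):

import asyncio
from datetime import timedelta

async def fetch_data() -> dict:
    ...  # placeholder coroutine for illustration

async def main() -> None:
    try:
        result = await with_timeout(timedelta(seconds=5), fetch_data(), name="fetch")
        print(result)
    except asyncio.CancelledError:
        # Per the docstring above, a timeout surfaces as CancelledError
        print("fetch timed out")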

Source code in src/hother/cancelable/utils/decorators.py
async def with_timeout(
    timeout: float | timedelta,
    coro: Awaitable[T],
    operation_id: str | None = None,
    name: str | None = None,
) -> T:
    """Run coroutine with timeout.

    Args:
        timeout: Timeout duration
        coro: Coroutine to run
        operation_id: Optional operation ID
        name: Optional operation name

    Returns:
        Result from coroutine

    Raises:
        CancelledError: If operation times out

    Example:
        result = await with_timeout(5.0, fetch_data())
    """
    cancelable = Cancelable.with_timeout(
        timeout,
        operation_id=operation_id,
        name=name,
    )

    async with cancelable:
        return await coro

    # Unreachable - async with block always completes above
    raise AssertionError("Unreachable")  # pragma: no cover

with_current_operation

with_current_operation() -> (
    Callable[
        [Callable[P, Awaitable[R]]],
        Callable[P, Awaitable[R]],
    ]
)

Decorator that injects current operation into function.

The function must have a parameter named 'operation'. The decorator will inject the current operation context if available (may be None if called outside a Cancelable context).

Example

@with_current_operation()
async def process_item(item: str, operation: Cancelable | None):
    if operation:
        await operation.report_progress(f"Processing {item}")
    return item.upper()

Note

Unlike @cancelable, this decorator injects the CURRENT operation (if one exists) rather than creating a new one. The operation parameter may be None if no cancelable context is active.
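
A sketch of the two decorators working together: the outer @cancelable creates the operation, and the helper picks it up as the current operation. The function names here are illustrative.

@with_current_operation()
async def validate(item: str, operation: Cancelable | None = None) -> str:
    if operation:
        await operation.report_progress(f"Validating {item}")
    return item.strip()

@cancelable(timeout=10.0, inject_param=None)
async def run_batch(items: list[str]) -> list[str]:
    # Inside the @cancelable scope, validate() receives that same operation
    return [await validate(item) for item in items]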

Source code in src/hother/cancelable/utils/decorators.py
def with_current_operation() -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]:
    """Decorator that injects current operation into function.

    The function must have a parameter named 'operation'. The decorator
    will inject the current operation context if available (may be None
    if called outside a Cancelable context).

    Example:
        @with_current_operation()
        async def process_item(item: str, operation: Cancelable | None):
            if operation:
                await operation.report_progress(f"Processing {item}")
            return item.upper()

    Note:
        Unlike @cancelable, this decorator injects the CURRENT operation
        (if one exists) rather than creating a new one. The operation
        parameter may be None if no cancelable context is active.
    """

    def decorator(func: Callable[P, Awaitable[R]]) -> Callable[P, Awaitable[R]]:
        @wraps(func)
        async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            operation = current_operation()

            # Inject operation if function accepts it
            sig = inspect.signature(func)
            if "operation" in sig.parameters and "operation" not in kwargs:
                kwargs["operation"] = operation

            return await func(*args, **kwargs)

        return wrapper

    return decorator

cancelable_method

cancelable_method(
    timeout: float | timedelta | None = None,
    name: str | None = None,
    register_globally: bool = False,
) -> Callable[
    [Callable[..., Awaitable[R]]],
    Callable[..., Awaitable[R]],
]

Decorator for async methods that should be cancelable.

Similar to @cancelable but designed for class methods. The decorator automatically creates a Cancelable context and injects it as a 'cancelable' parameter. The decorated method will ALWAYS receive a non-None Cancelable instance.

Parameters:

    timeout (float | timedelta | None, default None): Optional timeout for the operation
    name (str | None, default None): Optional operation name (defaults to ClassName.method_name)
    register_globally (bool, default False): Whether to register with global registry

Returns:

    Callable[[Callable[..., Awaitable[R]]], Callable[..., Awaitable[R]]]: Decorator function

Example

class DataProcessor:
    @cancelable_method(timeout=60.0)
    async def process(self, data: list, cancelable: Cancelable = None):
        for item in data:
            await self._process_item(item)
            await cancelable.report_progress(f"Processed {item}")

Note

Type the injected parameter as Cancelable = None for type checker compatibility. The decorator ALWAYS provides a non-None instance, but the = None default signals to type checkers that callers don't need to provide this argument.
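
Usage sketch, reusing the DataProcessor class from the example above: callers never pass cancelable themselves, and the decorator also records its settings on the wrapped method via the internal _cancelable_params attribute (see the source below).

processor = DataProcessor()
await processor.process(["a", "b", "c"])  # `cancelable` is injected automatically

# Settings recorded by the decorator (internal attribute, see the source below)
print(DataProcessor.process._cancelable_params["timeout"])  # 60.0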

Source code in src/hother/cancelable/utils/decorators.py
def cancelable_method(
    timeout: float | timedelta | None = None,
    name: str | None = None,
    register_globally: bool = False,
) -> Callable[[Callable[..., Awaitable[R]]], Callable[..., Awaitable[R]]]:
    """Decorator for async methods that should be cancelable.

    Similar to @cancelable but designed for class methods. The decorator
    automatically creates a Cancelable context and injects it as a
    'cancelable' parameter. The decorated method will ALWAYS receive a
    non-None Cancelable instance.

    Args:
        timeout: Optional timeout for the operation
        name: Optional operation name (defaults to ClassName.method_name)
        register_globally: Whether to register with global registry

    Returns:
        Decorator function

    Example:
        class DataProcessor:
            @cancelable_method(timeout=60.0)
            async def process(self, data: list, cancelable: Cancelable = None):
                for item in data:
                    await self._process_item(item)
                    await cancelable.report_progress(f"Processed {item}")

    Note:
        **Type the injected parameter as `Cancelable = None` for type checker
        compatibility.** The decorator ALWAYS provides a non-None instance,
        but the `= None` default signals to type checkers that callers don't
        need to provide this argument.
    """

    def decorator(func: Callable[..., Awaitable[R]]) -> Callable[..., Awaitable[R]]:
        @wraps(func)
        async def wrapper(self: Any, *args: Any, **kwargs: Any) -> R:
            # Get method name including class
            method_name = f"{self.__class__.__name__}.{func.__name__}"

            cancel_kwargs: dict[str, Any] = {
                "name": name or method_name,
                "register_globally": register_globally,
            }

            cancel = Cancelable.with_timeout(timeout, **cancel_kwargs) if timeout else Cancelable(**cancel_kwargs)

            async with cancel:
                # Inject cancelable
                sig = inspect.signature(func)
                if "cancelable" in sig.parameters:
                    kwargs["cancelable"] = cancel

                return await func(self, *args, **kwargs)

            # Unreachable - async with block always completes above
            raise AssertionError("Unreachable")  # pragma: no cover

        # Add attribute to access decorator parameters
        wrapper._cancelable_params = {  # type: ignore[attr-defined]
            "timeout": timeout,
            "name": name,
            "register_globally": register_globally,
        }

        return wrapper

    return decorator

cancelable_with_token

cancelable_with_token(
    token: CancelationToken,
    operation_id: str | None = None,
    name: str | None = None,
    register_globally: bool = False,
    inject_param: str | None = "cancelable",
) -> Callable[
    [Callable[P, Awaitable[R]]],
    Callable[P, Awaitable[R]],
]

Decorator for token-based cancelation.

Creates a cancelable operation that can be cancelled via the provided token. Useful for operations that need to be cancelled from other tasks or threads.

Parameters:

    token (CancelationToken, required): CancelationToken to use for cancelation
    operation_id (str | None, default None): Optional operation ID (auto-generated if not provided)
    name (str | None, default None): Optional operation name (defaults to function name)
    register_globally (bool, default False): Whether to register with global registry
    inject_param (str | None, default 'cancelable'): Parameter name to inject Cancelable (None to disable)

Returns:

    Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]: Decorator function

Example
token = CancelationToken()

@cancelable_with_token(token, name="fetch_data")
async def fetch_data(url: str, cancelable: Cancelable = None):
    await cancelable.report_progress("Fetching...")
    return await httpx.get(url)

# Cancel from another task
await token.cancel(CancelationReason.MANUAL, "User cancelled")
Note

Type the injected parameter as Cancelable = None for type checker compatibility.
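
A fuller sketch that cancels the operation from a sibling task. It uses anyio primitives because the library is anyio-based; exactly how the cancelation surfaces inside fetch_loop is left out here.

import anyio

token = CancelationToken()

@cancelable_with_token(token, name="fetch_loop")
async def fetch_loop(cancelable: Cancelable = None) -> None:
    while True:
        await cancelable.report_progress("fetching...")
        await anyio.sleep(0.5)

async def main() -> None:
    async with anyio.create_task_group() as tg:
        tg.start_soon(fetch_loop)
        await anyio.sleep(2)
        # Request cancelation from a sibling task
        await token.cancel(CancelationReason.MANUAL, "deadline reached")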

Source code in src/hother/cancelable/utils/decorators.py
def cancelable_with_token(
    token: "CancelationToken",
    operation_id: str | None = None,
    name: str | None = None,
    register_globally: bool = False,
    inject_param: str | None = "cancelable",
) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]:
    """Decorator for token-based cancelation.

    Creates a cancelable operation that can be cancelled via the provided token.
    Useful for operations that need to be cancelled from other tasks or threads.

    Args:
        token: CancelationToken to use for cancelation
        operation_id: Optional operation ID (auto-generated if not provided)
        name: Optional operation name (defaults to function name)
        register_globally: Whether to register with global registry
        inject_param: Parameter name to inject Cancelable (None to disable)

    Returns:
        Decorator function

    Example:
        ```python
        token = CancelationToken()

        @cancelable_with_token(token, name="fetch_data")
        async def fetch_data(url: str, cancelable: Cancelable = None):
            await cancelable.report_progress("Fetching...")
            return await httpx.get(url)

        # Cancel from another task
        await token.cancel(CancelationReason.MANUAL, "User cancelled")
        ```

    Note:
        Type the injected parameter as `Cancelable = None` for type checker compatibility.
    """

    def decorator(func: Callable[P, Awaitable[R]]) -> Callable[P, Awaitable[R]]:
        @wraps(func)
        async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            cancel = Cancelable.with_token(
                token, operation_id=operation_id, name=name or func.__name__, register_globally=register_globally
            )

            async with cancel:
                # Inject cancelable if requested
                if inject_param:
                    sig = inspect.signature(func)
                    if inject_param in sig.parameters:
                        kwargs[inject_param] = cancel

                return await func(*args, **kwargs)

            # Unreachable - async with block always completes above
            raise AssertionError("Unreachable")  # pragma: no cover

        # Add attribute to access decorator parameters
        wrapper._cancelable_params = {  # type: ignore[attr-defined]
            "token": token,
            "operation_id": operation_id,
            "name": name,
            "register_globally": register_globally,
            "inject_param": inject_param,
        }

        return wrapper

    return decorator

cancelable_with_signal

cancelable_with_signal(
    *signals: int,
    operation_id: str | None = None,
    name: str | None = None,
    register_globally: bool = False,
    inject_param: str | None = "cancelable",
) -> Callable[
    [Callable[P, Awaitable[R]]],
    Callable[P, Awaitable[R]],
]

Decorator for signal-based cancelation.

Creates a cancelable operation that responds to OS signals (Unix only). Useful for graceful shutdown of long-running services.

Parameters:

    *signals (int): Signal numbers to handle (e.g., signal.SIGTERM, signal.SIGINT)
    operation_id (str | None, default None): Optional operation ID (auto-generated if not provided)
    name (str | None, default None): Optional operation name (defaults to function name)
    register_globally (bool, default False): Whether to register with global registry
    inject_param (str | None, default 'cancelable'): Parameter name to inject Cancelable (None to disable)

Returns:

    Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]: Decorator function

Example
import signal

@cancelable_with_signal(signal.SIGTERM, signal.SIGINT, name="service")
async def long_running_service(cancelable: Cancelable = None):
    while True:
        await cancelable.report_progress("Processing...")
        await process_batch()
Note

Type the injected parameter as Cancelable = None for type checker compatibility.

Source code in src/hother/cancelable/utils/decorators.py
def cancelable_with_signal(
    *signals: int,
    operation_id: str | None = None,
    name: str | None = None,
    register_globally: bool = False,
    inject_param: str | None = "cancelable",
) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]:
    """Decorator for signal-based cancelation.

    Creates a cancelable operation that responds to OS signals (Unix only).
    Useful for graceful shutdown of long-running services.

    Args:
        *signals: Signal numbers to handle (e.g., signal.SIGTERM, signal.SIGINT)
        operation_id: Optional operation ID (auto-generated if not provided)
        name: Optional operation name (defaults to function name)
        register_globally: Whether to register with global registry
        inject_param: Parameter name to inject Cancelable (None to disable)

    Returns:
        Decorator function

    Example:
        ```python
        import signal

        @cancelable_with_signal(signal.SIGTERM, signal.SIGINT, name="service")
        async def long_running_service(cancelable: Cancelable = None):
            while True:
                await cancelable.report_progress("Processing...")
                await process_batch()
        ```

    Note:
        Type the injected parameter as `Cancelable = None` for type checker compatibility.
    """

    def decorator(func: Callable[P, Awaitable[R]]) -> Callable[P, Awaitable[R]]:
        @wraps(func)
        async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:  # pyright: ignore[reportReturnType]
            cancel = Cancelable.with_signal(
                *signals, operation_id=operation_id, name=name or func.__name__, register_globally=register_globally
            )

            async with cancel:
                # Inject cancelable if requested
                if inject_param:
                    sig = inspect.signature(func)
                    if inject_param in sig.parameters:
                        kwargs[inject_param] = cancel

                return await func(*args, **kwargs)

        # Add attribute to access decorator parameters
        wrapper._cancelable_params = {  # type: ignore[attr-defined]
            "signals": signals,
            "operation_id": operation_id,
            "name": name,
            "register_globally": register_globally,
            "inject_param": inject_param,
        }

        return wrapper

    return decorator

cancelable_with_condition

cancelable_with_condition(
    condition: Callable[[], bool | Awaitable[bool]],
    check_interval: float = 0.1,
    condition_name: str | None = None,
    operation_id: str | None = None,
    name: str | None = None,
    register_globally: bool = False,
    inject_param: str | None = "cancelable",
) -> Callable[
    [Callable[P, Awaitable[R]]],
    Callable[P, Awaitable[R]],
]

Decorator for condition-based cancelation.

Creates a cancelable operation that cancels when a condition becomes True. Useful for resource-based cancelation (disk full, memory limit, etc.).

Parameters:

    condition (Callable[[], bool | Awaitable[bool]], required): Callable that returns True when cancelation should occur
    check_interval (float, default 0.1): How often to check the condition (seconds)
    condition_name (str | None, default None): Name for the condition (for logging)
    operation_id (str | None, default None): Optional operation ID (auto-generated if not provided)
    name (str | None, default None): Optional operation name (defaults to function name)
    register_globally (bool, default False): Whether to register with global registry
    inject_param (str | None, default 'cancelable'): Parameter name to inject Cancelable (None to disable)

Returns:

    Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]: Decorator function

Example
@cancelable_with_condition(
    lambda: disk_full(),
    check_interval=1.0,
    condition_name="disk_space"
)
async def data_processing(cancelable: Cancelable = None):
    await process_large_dataset()
Note

Type the injected parameter as Cancelable = None for type checker compatibility.
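
The condition may also be an async callable. A sketch using the standard library's shutil.disk_usage as a synchronous condition; the path, threshold, and function names are illustrative.

import shutil

def disk_nearly_full(path: str = "/", threshold: float = 0.95) -> bool:
    # True once the filesystem at `path` is at least `threshold` full
    usage = shutil.disk_usage(path)
    return usage.used / usage.total >= threshold

@cancelable_with_condition(
    disk_nearly_full,
    check_interval=5.0,
    condition_name="disk_space",
)
async def export_archive(cancelable: Cancelable = None):
    await cancelable.report_progress("Exporting...")
    # ... write the archive ...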

Source code in src/hother/cancelable/utils/decorators.py
def cancelable_with_condition(
    condition: Callable[[], bool | Awaitable[bool]],
    check_interval: float = 0.1,
    condition_name: str | None = None,
    operation_id: str | None = None,
    name: str | None = None,
    register_globally: bool = False,
    inject_param: str | None = "cancelable",
) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]:
    """Decorator for condition-based cancelation.

    Creates a cancelable operation that cancels when a condition becomes True.
    Useful for resource-based cancelation (disk full, memory limit, etc.).

    Args:
        condition: Callable that returns True when cancelation should occur
        check_interval: How often to check the condition (seconds)
        condition_name: Name for the condition (for logging)
        operation_id: Optional operation ID (auto-generated if not provided)
        name: Optional operation name (defaults to function name)
        register_globally: Whether to register with global registry
        inject_param: Parameter name to inject Cancelable (None to disable)

    Returns:
        Decorator function

    Example:
        ```python
        @cancelable_with_condition(
            lambda: disk_full(),
            check_interval=1.0,
            condition_name="disk_space"
        )
        async def data_processing(cancelable: Cancelable = None):
            await process_large_dataset()
        ```

    Note:
        Type the injected parameter as `Cancelable = None` for type checker compatibility.
    """

    def decorator(func: Callable[P, Awaitable[R]]) -> Callable[P, Awaitable[R]]:
        @wraps(func)
        async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:  # pyright: ignore[reportReturnType]
            cancel = Cancelable.with_condition(
                condition,
                check_interval=check_interval,
                condition_name=condition_name,
                operation_id=operation_id,
                name=name or func.__name__,
                register_globally=register_globally,
            )

            async with cancel:
                # Inject cancelable if requested
                if inject_param:
                    sig = inspect.signature(func)
                    if inject_param in sig.parameters:
                        kwargs[inject_param] = cancel

                return await func(*args, **kwargs)

        # Add attribute to access decorator parameters
        wrapper._cancelable_params = {  # type: ignore[attr-defined]
            "condition": condition,
            "check_interval": check_interval,
            "condition_name": condition_name,
            "operation_id": operation_id,
            "name": name,
            "register_globally": register_globally,
            "inject_param": inject_param,
        }

        return wrapper

    return decorator

cancelable_combine

cancelable_combine(
    *cancelables: Cancelable,
    operation_id: str | None = None,
    name: str | None = None,
    register_globally: bool = False,
    inject_param: str | None = "cancelable",
) -> Callable[
    [Callable[P, Awaitable[R]]],
    Callable[P, Awaitable[R]],
]

Decorator for combining multiple cancelation sources.

Creates a cancelable operation that cancels when ANY of the provided cancelables trigger. Useful for operations with multiple cancelation conditions.

Parameters:

    *cancelables (Cancelable): Cancelables to combine
    operation_id (str | None, default None): Optional operation ID (auto-generated if not provided)
    name (str | None, default None): Optional operation name (defaults to function name)
    register_globally (bool, default False): Whether to register with global registry
    inject_param (str | None, default 'cancelable'): Parameter name to inject Cancelable (None to disable)

Returns:

    Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]: Decorator function

Example
token = CancelationToken()

@cancelable_combine(
    Cancelable.with_timeout(60),
    Cancelable.with_token(token),
    Cancelable.with_signal(signal.SIGTERM),
    name="resilient_op"
)
async def resilient_operation(cancelable: Cancelable = None):
    return await complex_task()
Note

Type the injected parameter as Cancelable = None for type checker compatibility.
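
One behavioural detail, grounded in the source below: at least one cancelation source is required, and the check runs when the wrapped function is awaited rather than at decoration time.

@cancelable_combine()  # no cancelation sources
async def misconfigured():
    ...

# await misconfigured()  # raises ValueError: at least one cancelable must be provided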

Source code in src/hother/cancelable/utils/decorators.py
def cancelable_combine(
    *cancelables: Cancelable,
    operation_id: str | None = None,
    name: str | None = None,
    register_globally: bool = False,
    inject_param: str | None = "cancelable",
) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]:
    """Decorator for combining multiple cancelation sources.

    Creates a cancelable operation that cancels when ANY of the provided
    cancelables trigger. Useful for operations with multiple cancelation conditions.

    Args:
        *cancelables: Cancelables to combine
        operation_id: Optional operation ID (auto-generated if not provided)
        name: Optional operation name (defaults to function name)
        register_globally: Whether to register with global registry
        inject_param: Parameter name to inject Cancelable (None to disable)

    Returns:
        Decorator function

    Example:
        ```python
        token = CancelationToken()

        @cancelable_combine(
            Cancelable.with_timeout(60),
            Cancelable.with_token(token),
            Cancelable.with_signal(signal.SIGTERM),
            name="resilient_op"
        )
        async def resilient_operation(cancelable: Cancelable = None):
            return await complex_task()
        ```

    Note:
        Type the injected parameter as `Cancelable = None` for type checker compatibility.
    """

    def decorator(func: Callable[P, Awaitable[R]]) -> Callable[P, Awaitable[R]]:
        @wraps(func)
        async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:  # pyright: ignore[reportReturnType]
            # Combine all cancelables
            if not cancelables:
                raise ValueError("At least one cancelable must be provided to cancelable_combine")

            # Get first cancelable and combine with rest
            # Note: We use the provided cancelables as-is since they may have
            # internal state and sources already configured
            first = cancelables[0]
            cancel = first.combine(*cancelables[1:]) if len(cancelables) > 1 else first

            # Determine the effective name
            # Always prefer explicit name, then function name (for decorator consistency)
            effective_name = name or func.__name__

            # Create a new cancelable with the desired name to avoid mutating shared state
            # We'll use the combined token from the original

            # Always wrap to apply decorator settings (name, operation_id, register_globally)
            final_cancel = Cancelable.with_token(
                cancel.token,
                operation_id=operation_id or cancel.context.id,
                name=effective_name,
                register_globally=register_globally,
            )

            async with final_cancel:
                # Inject cancelable if requested
                if inject_param:
                    sig = inspect.signature(func)
                    if inject_param in sig.parameters:
                        kwargs[inject_param] = final_cancel

                return await func(*args, **kwargs)

        return wrapper

    return decorator

with_cancelable

with_cancelable(
    cancel: Cancelable,
    inject: bool = False,
    inject_param: str = "cancelable",
) -> Callable[
    [Callable[P, Awaitable[R]]],
    Callable[P, Awaitable[R]],
]

Decorator that wraps a function with an existing Cancelable instance.

This decorator allows you to use a pre-configured Cancelable context with your async function. Unlike @cancelable which creates a new context, this decorator uses an existing one, enabling sharing of cancelation state across multiple functions.

Parameters:

    cancel (Cancelable, required): Existing Cancelable instance to use
    inject (bool, default False): Whether to inject the Cancelable into the function signature
    inject_param (str, default 'cancelable'): Parameter name to inject Cancelable as

Returns:

    Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]: Decorator function

Example
from hother.cancelable import Cancelable, with_cancelable, current_operation

# Create a shared cancelable context
cancel = Cancelable.with_timeout(30.0, name="data_pipeline")

@with_cancelable(cancel)
async def fetch_data():
    # No injection, access via current_operation()
    ctx = current_operation()
    await ctx.report_progress("Fetching data...")
    return await fetch()

@with_cancelable(cancel, inject=True)
async def process_data(cancelable: Cancelable = None):
    # With injection
    await cancelable.report_progress("Processing...")
    return await process()

# Both functions share the same cancelation context
async with cancel:
    data = await fetch_data()
    result = await process_data()
Note

When inject=False (default), use current_operation() to access the context from within the function if needed.

Source code in src/hother/cancelable/utils/decorators.py
def with_cancelable(
    cancel: Cancelable,
    inject: bool = False,
    inject_param: str = "cancelable",
) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]:
    """Decorator that wraps a function with an existing Cancelable instance.

    This decorator allows you to use a pre-configured Cancelable context
    with your async function. Unlike @cancelable which creates a new context,
    this decorator uses an existing one, enabling sharing of cancelation state
    across multiple functions.

    Args:
        cancel: Existing Cancelable instance to use
        inject: Whether to inject the Cancelable into the function signature (default: False)
        inject_param: Parameter name to inject Cancelable as (default: "cancelable")

    Returns:
        Decorator function

    Example:
        ```python
        from hother.cancelable import Cancelable, with_cancelable, current_operation

        # Create a shared cancelable context
        cancel = Cancelable.with_timeout(30.0, name="data_pipeline")

        @with_cancelable(cancel)
        async def fetch_data():
            # No injection, access via current_operation()
            ctx = current_operation()
            await ctx.report_progress("Fetching data...")
            return await fetch()

        @with_cancelable(cancel, inject=True)
        async def process_data(cancelable: Cancelable = None):
            # With injection
            await cancelable.report_progress("Processing...")
            return await process()

        # Both functions share the same cancelation context
        async with cancel:
            data = await fetch_data()
            result = await process_data()
        ```

    Note:
        When inject=False (default), use current_operation() to access the context
        from within the function if needed.
    """

    def decorator(func: Callable[P, Awaitable[R]]) -> Callable[P, Awaitable[R]]:
        @wraps(func)
        async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            # Note: We don't enter the cancel context here - that's the user's responsibility
            # This decorator just makes the cancel instance available to the function
            # The user must use: async with cancel: await decorated_function()

            # Inject cancelable if requested
            if inject:
                sig = inspect.signature(func)
                if inject_param in sig.parameters:
                    kwargs[inject_param] = cancel

            return await func(*args, **kwargs)

        return wrapper

    return decorator

Bridges

AnyIO Bridge

Bridge for integrating with anyio-based async code.

hother.cancelable.utils.anyio_bridge

Global bridge for thread-safe anyio operations.

Allows regular Python threads to schedule callbacks in anyio context, providing an equivalent to asyncio's loop.call_soon_threadsafe().
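
An end-to-end sketch, assuming the default asyncio backend: the bridge worker runs inside a task group, a plain worker thread schedules an async callback back into the event loop, and the bridge is stopped before the group is cancelled.

import anyio

async def main() -> None:
    bridge = AnyioBridge.get_instance()

    async with anyio.create_task_group() as tg:
        tg.start_soon(bridge.start)

        def blocking_work() -> None:
            # Runs in a worker thread; schedule an async callback into anyio
            async def notify() -> None:
                print("callback ran inside the anyio event loop")

            bridge.call_soon_threadsafe(notify)

        await anyio.to_thread.run_sync(blocking_work)
        await anyio.sleep(0.1)  # give the worker a moment to run the callback

        await bridge.stop()
        tg.cancel_scope.cancel()

anyio.run(main)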

logger module-attribute

logger = get_logger(__name__)

AnyioBridge

Singleton bridge for thread-to-anyio communication.

Provides call_soon_threadsafe equivalent for anyio by using memory object streams and a background worker task.

Parameters:

    buffer_size (int, default 1000): Maximum number of queued callbacks before blocking
Example
# Custom buffer size for high-throughput applications
bridge = AnyioBridge(buffer_size=5000)
await bridge.start()

# Or use default
bridge = AnyioBridge.get_instance()

async with anyio.create_task_group() as tg:
    tg.start_soon(bridge.start)

    # Now thread-safe calls work
    def from_thread():
        bridge.call_soon_threadsafe(some_callback)
Source code in src/hother/cancelable/utils/anyio_bridge.py
class AnyioBridge:
    """Singleton bridge for thread-to-anyio communication.

    Provides call_soon_threadsafe equivalent for anyio by using
    memory object streams and a background worker task.

    Args:
        buffer_size: Maximum number of queued callbacks before blocking (default: 1000)

    Example:
        ```python
        # Custom buffer size for high-throughput applications
        bridge = AnyioBridge(buffer_size=5000)
        await bridge.start()

        # Or use default
        bridge = AnyioBridge.get_instance()

        async with anyio.create_task_group() as tg:
            tg.start_soon(bridge.start)

            # Now thread-safe calls work
            def from_thread():
                bridge.call_soon_threadsafe(some_callback)
        ```
    """

    _instance: AnyioBridge | None = None
    _lock = threading.Lock()

    def __init__(self, buffer_size: int = 1000) -> None:
        """Initialize the AnyioBridge.

        Args:
            buffer_size: Maximum number of queued callbacks before blocking (default: 1000)
        """
        self._buffer_size = buffer_size
        self._send_stream: anyio.abc.ObjectSendStream | None = None  # type: ignore[attr-defined]
        self._receive_stream: anyio.abc.ObjectReceiveStream | None = None  # type: ignore[attr-defined]
        self._started: bool = False

        # Fallback queue for callbacks received before bridge starts
        self._pending_callbacks: deque[Callable[[], Any]] = deque()
        self._pending_lock = threading.Lock()

    @classmethod
    def get_instance(cls) -> Self:
        """Get singleton instance of the bridge.

        Thread-safe lazy initialization.

        Returns:
            The singleton AnyioBridge instance
        """
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = cls()
        return cls._instance  # type: ignore[return-value]

    async def start(self) -> None:
        """Start the bridge worker task.

        Should be called once at application startup from async context.
        Must be run in a task group as it blocks forever.

        Example:
            ```python
            async with anyio.create_task_group() as tg:
                tg.start_soon(bridge.start)
                # Bridge is now running
            ```
        """
        if self._started:
            logger.warning("Bridge already started, ignoring duplicate start")
            logger.info(f"Bridge worker alive check - stream is: {self._receive_stream}")  # type: ignore[attr-defined]
            return

        logger.debug("Starting anyio bridge")

        # Create communication streams
        self._send_stream, self._receive_stream = anyio.create_memory_object_stream(self._buffer_size)

        # Process any pending callbacks that arrived before bridge started
        with self._pending_lock:
            pending_count = len(self._pending_callbacks)
            if pending_count > 0:
                logger.info(f"Processing {pending_count} pending callbacks")
                while self._pending_callbacks:
                    callback = self._pending_callbacks.popleft()
                    try:
                        self._send_stream.send_nowait(callback)  # type: ignore[union-attr]
                    except anyio.WouldBlock:
                        logger.warning("Bridge queue full during startup, callback dropped")

        self._started = True
        logger.info("Anyio bridge started and ready")

        # Start worker loop (blocks forever)
        await self._worker()

    async def _worker(self) -> None:
        """Worker task that processes callbacks from threads.

        Runs forever until the receive stream is closed.
        """
        logger.info("Bridge worker started, waiting for callbacks...")
        try:
            while True:
                # Explicitly receive next callback (yields properly)
                logger.debug("Bridge worker waiting for next callback...")
                callback = await self._receive_stream.receive()  # type: ignore[union-attr]
                logger.debug(f"Bridge worker received callback: {callback}")

                try:
                    # Execute callback
                    logger.debug("Bridge worker executing callback...")
                    result = callback()  # type: ignore[var-annotated]
                    logger.debug(f"Callback result: {result}")

                    # If it's a coroutine, await it
                    if hasattr(result, "__await__"):  # type: ignore[arg-type]
                        logger.debug("Callback is coroutine, awaiting...")
                        await result
                        logger.debug("Coroutine completed")
                    else:
                        logger.debug("Callback completed (sync)")
                except Exception as e:
                    logger.error(f"Bridge callback error: {e}", exc_info=True)

                # Explicitly yield control to anyio scheduler
                await checkpoint()

        except anyio.EndOfStream:
            logger.info("Bridge stream closed, worker ending normally")
        except Exception as e:
            logger.error(f"Bridge worker error: {e}", exc_info=True)

        logger.warning("Bridge worker loop ended")

    def call_soon_threadsafe(self, callback: Callable[[], Any]) -> None:
        """Schedule callback to run in anyio context from any thread.

        This is the anyio equivalent of asyncio's loop.call_soon_threadsafe().
        The callback will be executed in the anyio event loop context.

        Args:
            callback: Function to call (can be sync or async)

        Note:
            If the bridge hasn't started yet, callbacks are queued
            and will be processed once the bridge starts.
        """
        if not self._started:
            # Queue for later processing
            with self._pending_lock:
                self._pending_callbacks.append(callback)
                logger.debug(f"Bridge not started, queuing callback (queue size: {len(self._pending_callbacks)})")
            return

        logger.debug(f"Queueing callback to bridge: {callback}")
        try:
            self._send_stream.send_nowait(callback)  # type: ignore[union-attr]
            logger.debug("Callback successfully queued to bridge stream")
        except anyio.WouldBlock:
            logger.warning(
                f"Bridge queue full ({self._buffer_size} callbacks), " "callback dropped - consider increasing buffer size"
            )
        except Exception as e:
            logger.error(f"Failed to schedule callback: {e}", exc_info=True)

    async def stop(self) -> None:
        """Stop the bridge and clean up resources.

        Properly closes the send and receive streams to avoid
        resource leak warnings during garbage collection.

        Should be called before cancelling the task group running the bridge.

        Example:
            ```python
            async with anyio.create_task_group() as tg:
                tg.start_soon(bridge.start)
                # ... use bridge ...
                await bridge.stop()
                tg.cancel_scope.cancel()
            ```
        """
        logger.debug("Stopping anyio bridge")

        # Close streams if they exist
        if self._send_stream is not None:  # type: ignore[attr-defined]
            try:
                await self._send_stream.aclose()  # type: ignore[union-attr]
                logger.debug("Send stream closed")
            except Exception as e:
                logger.warning(f"Error closing send stream: {e}")

        if self._receive_stream is not None:  # type: ignore[attr-defined]
            try:
                await self._receive_stream.aclose()  # type: ignore[union-attr]
                logger.debug("Receive stream closed")
            except Exception as e:
                logger.warning(f"Error closing receive stream: {e}")

        self._started = False
        self._send_stream = None
        self._receive_stream = None
        logger.info("Anyio bridge stopped and cleaned up")

    @property
    def is_started(self) -> bool:
        """Check if bridge is started and ready."""
        return self._started

is_started property

is_started: bool

Check if bridge is started and ready.

get_instance classmethod

get_instance() -> Self

Get singleton instance of the bridge.

Thread-safe lazy initialization.

Returns:

    Self: The singleton AnyioBridge instance

Source code in src/hother/cancelable/utils/anyio_bridge.py
@classmethod
def get_instance(cls) -> Self:
    """Get singleton instance of the bridge.

    Thread-safe lazy initialization.

    Returns:
        The singleton AnyioBridge instance
    """
    if cls._instance is None:
        with cls._lock:
            if cls._instance is None:
                cls._instance = cls()
    return cls._instance  # type: ignore[return-value]

start async

start() -> None

Start the bridge worker task.

Should be called once at application startup from async context. Must be run in a task group as it blocks forever.

Example
async with anyio.create_task_group() as tg:
    tg.start_soon(bridge.start)
    # Bridge is now running
Source code in src/hother/cancelable/utils/anyio_bridge.py
async def start(self) -> None:
    """Start the bridge worker task.

    Should be called once at application startup from async context.
    Must be run in a task group as it blocks forever.

    Example:
        ```python
        async with anyio.create_task_group() as tg:
            tg.start_soon(bridge.start)
            # Bridge is now running
        ```
    """
    if self._started:
        logger.warning("Bridge already started, ignoring duplicate start")
        logger.info(f"Bridge worker alive check - stream is: {self._receive_stream}")  # type: ignore[attr-defined]
        return

    logger.debug("Starting anyio bridge")

    # Create communication streams
    self._send_stream, self._receive_stream = anyio.create_memory_object_stream(self._buffer_size)

    # Process any pending callbacks that arrived before bridge started
    with self._pending_lock:
        pending_count = len(self._pending_callbacks)
        if pending_count > 0:
            logger.info(f"Processing {pending_count} pending callbacks")
            while self._pending_callbacks:
                callback = self._pending_callbacks.popleft()
                try:
                    self._send_stream.send_nowait(callback)  # type: ignore[union-attr]
                except anyio.WouldBlock:
                    logger.warning("Bridge queue full during startup, callback dropped")

    self._started = True
    logger.info("Anyio bridge started and ready")

    # Start worker loop (blocks forever)
    await self._worker()

call_soon_threadsafe

call_soon_threadsafe(callback: Callable[[], Any]) -> None

Schedule callback to run in anyio context from any thread.

This is the anyio equivalent of asyncio's loop.call_soon_threadsafe(). The callback will be executed in the anyio event loop context.

Parameters:

    callback (Callable[[], Any], required): Function to call (can be sync or async)
Note

If the bridge hasn't started yet, callbacks are queued and will be processed once the bridge starts.

Source code in src/hother/cancelable/utils/anyio_bridge.py
def call_soon_threadsafe(self, callback: Callable[[], Any]) -> None:
    """Schedule callback to run in anyio context from any thread.

    This is the anyio equivalent of asyncio's loop.call_soon_threadsafe().
    The callback will be executed in the anyio event loop context.

    Args:
        callback: Function to call (can be sync or async)

    Note:
        If the bridge hasn't started yet, callbacks are queued
        and will be processed once the bridge starts.
    """
    if not self._started:
        # Queue for later processing
        with self._pending_lock:
            self._pending_callbacks.append(callback)
            logger.debug(f"Bridge not started, queuing callback (queue size: {len(self._pending_callbacks)})")
        return

    logger.debug(f"Queueing callback to bridge: {callback}")
    try:
        self._send_stream.send_nowait(callback)  # type: ignore[union-attr]
        logger.debug("Callback successfully queued to bridge stream")
    except anyio.WouldBlock:
        logger.warning(
            f"Bridge queue full ({self._buffer_size} callbacks), " "callback dropped - consider increasing buffer size"
        )
    except Exception as e:
        logger.error(f"Failed to schedule callback: {e}", exc_info=True)

stop async

stop() -> None

Stop the bridge and clean up resources.

Properly closes the send and receive streams to avoid resource leak warnings during garbage collection.

Should be called before cancelling the task group running the bridge.

Example
async with anyio.create_task_group() as tg:
    tg.start_soon(bridge.start)
    # ... use bridge ...
    await bridge.stop()
    tg.cancel_scope.cancel()
Source code in src/hother/cancelable/utils/anyio_bridge.py
async def stop(self) -> None:
    """Stop the bridge and clean up resources.

    Properly closes the send and receive streams to avoid
    resource leak warnings during garbage collection.

    Should be called before cancelling the task group running the bridge.

    Example:
        ```python
        async with anyio.create_task_group() as tg:
            tg.start_soon(bridge.start)
            # ... use bridge ...
            await bridge.stop()
            tg.cancel_scope.cancel()
        ```
    """
    logger.debug("Stopping anyio bridge")

    # Close streams if they exist
    if self._send_stream is not None:  # type: ignore[attr-defined]
        try:
            await self._send_stream.aclose()  # type: ignore[union-attr]
            logger.debug("Send stream closed")
        except Exception as e:
            logger.warning(f"Error closing send stream: {e}")

    if self._receive_stream is not None:  # type: ignore[attr-defined]
        try:
            await self._receive_stream.aclose()  # type: ignore[union-attr]
            logger.debug("Receive stream closed")
        except Exception as e:
            logger.warning(f"Error closing receive stream: {e}")

    self._started = False
    self._send_stream = None
    self._receive_stream = None
    logger.info("Anyio bridge stopped and cleaned up")

call_soon_threadsafe

call_soon_threadsafe(callback: Callable[[], Any]) -> None

Convenience function for thread-safe anyio scheduling.

Equivalent to bridge.get_instance().call_soon_threadsafe(callback).

Parameters:

    callback (Callable[[], Any], required): Function to call in anyio context
Example
def on_signal(signum):
    # Called from signal handler thread
    async def cancel_operation():
        await token.cancel()

    call_soon_threadsafe(cancel_operation)
Source code in src/hother/cancelable/utils/anyio_bridge.py
def call_soon_threadsafe(callback: Callable[[], Any]) -> None:
    """Convenience function for thread-safe anyio scheduling.

    Equivalent to bridge.get_instance().call_soon_threadsafe(callback).

    Args:
        callback: Function to call in anyio context

    Example:
        ```python
        def on_signal(signum):
            # Called from signal handler thread
            async def cancel_operation():
                await token.cancel()

            call_soon_threadsafe(cancel_operation)
        ```
    """
    bridge = AnyioBridge.get_instance()
    bridge.call_soon_threadsafe(callback)

Threading Bridge

Bridge for canceling operations from threads.

hother.cancelable.utils.threading_bridge

Thread-safe wrapper for OperationRegistry.

Provides synchronous API for accessing the registry from threads.

ThreadSafeRegistry

Thread-safe wrapper for OperationRegistry.

Provides synchronous API for accessing the registry from threads. All methods are thread-safe and can be called from any thread.

This class wraps the OperationRegistry singleton and provides convenience methods without the _sync suffix.

Example
# From a thread (e.g., Flask/Django handler)
from hother.cancelable import ThreadSafeRegistry

registry = ThreadSafeRegistry()

# List running operations
operations = registry.list_operations(status=OperationStatus.RUNNING)

# Cancel a specific operation
registry.cancel_operation(op_id, reason=CancelationReason.MANUAL)

# Get statistics
stats = registry.get_statistics()
print(f"Active operations: {stats['active_operations']}")
Note
  • Read operations (get, list, statistics, history) return immediately with data
  • Write operations (cancel) schedule async work and return immediately
  • For async code, use OperationRegistry directly instead
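
A small sketch combining the read and write sides from a worker thread. Whether since expects an aware or naive datetime is not specified here, so the timezone handling below is an assumption.

from datetime import datetime, timedelta, timezone

registry = ThreadSafeRegistry.get_instance()

# Read side: operations that finished in the last hour
one_hour_ago = datetime.now(timezone.utc) - timedelta(hours=1)  # timezone choice is an assumption
recent = registry.get_history(limit=100, since=one_hour_ago)
print(f"{len(recent)} operations finished in the last hour")

# Write side: request cancelation of everything still running.
# Returns immediately; the actual cancelation is scheduled via AnyioBridge.
registry.cancel_all(status=OperationStatus.RUNNING, reason=CancelationReason.MANUAL)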
Source code in src/hother/cancelable/utils/threading_bridge.py
class ThreadSafeRegistry:
    """Thread-safe wrapper for OperationRegistry.

    Provides synchronous API for accessing the registry from threads.
    All methods are thread-safe and can be called from any thread.

    This class wraps the OperationRegistry singleton and provides
    convenience methods without the `_sync` suffix.

    Example:
        ```python
        # From a thread (e.g., Flask/Django handler)
        from hother.cancelable import ThreadSafeRegistry

        registry = ThreadSafeRegistry()

        # List running operations
        operations = registry.list_operations(status=OperationStatus.RUNNING)

        # Cancel a specific operation
        registry.cancel_operation(op_id, reason=CancelationReason.MANUAL)

        # Get statistics
        stats = registry.get_statistics()
        print(f"Active operations: {stats['active_operations']}")
        ```

    Note:
        - Read operations (get, list, statistics, history) return immediately with data
        - Write operations (cancel) schedule async work and return immediately
        - For async code, use OperationRegistry directly instead
    """

    def __init__(self):
        """Initialize thread-safe registry wrapper."""
        self._registry = OperationRegistry.get_instance()

    def get_operation(self, operation_id: str) -> Cancelable | None:
        """Get operation by ID.

        Args:
            operation_id: Operation ID to look up

        Returns:
            Cancelable operation or None if not found
        """
        return self._registry.get_operation_sync(operation_id)

    def list_operations(
        self,
        status: OperationStatus | None = None,
        parent_id: str | None = None,
        name_pattern: str | None = None,
    ) -> list[OperationContext]:
        """List operations with optional filtering.

        Args:
            status: Filter by operation status
            parent_id: Filter by parent operation ID
            name_pattern: Filter by name (substring match)

        Returns:
            List of matching operation contexts
        """
        return self._registry.list_operations_sync(status, parent_id, name_pattern)

    def get_statistics(self) -> dict[str, Any]:
        """Get registry statistics.

        Returns:
            Dictionary with operation statistics containing:
            - active_operations: Number of active operations
            - active_by_status: Active operations grouped by status
            - history_size: Number of operations in history
            - history_by_status: Historical operations grouped by status
            - average_duration_seconds: Average duration of completed operations
            - total_completed: Total number of completed operations
        """
        return self._registry.get_statistics_sync()

    def get_history(
        self,
        limit: int | None = None,
        status: OperationStatus | None = None,
        since: datetime | None = None,
    ) -> list[OperationContext]:
        """Get operation history.

        Args:
            limit: Maximum number of operations to return
            status: Filter by final status
            since: Only return operations completed after this time

        Returns:
            List of historical operation contexts
        """
        return self._registry.get_history_sync(limit, status, since)

    def cancel_operation(
        self,
        operation_id: str,
        reason: CancelationReason = CancelationReason.MANUAL,
        message: str | None = None,
    ) -> None:
        """Cancel a specific operation.

        Schedules cancelation to be executed asynchronously and returns immediately.

        Args:
            operation_id: ID of operation to cancel
            reason: Reason for cancelation
            message: Optional cancelation message

        Note:
            This method returns immediately. The cancelation is scheduled
            asynchronously via AnyioBridge.
        """
        self._registry.cancel_operation_sync(operation_id, reason, message)

    def cancel_all(
        self,
        status: OperationStatus | None = None,
        reason: CancelationReason = CancelationReason.MANUAL,
        message: str | None = None,
    ) -> None:
        """Cancel all operations with optional status filter.

        Schedules cancelation to be executed asynchronously and returns immediately.

        Args:
            status: Only cancel operations with this status
            reason: Reason for cancelation
            message: Optional cancelation message

        Note:
            This method returns immediately. The cancelation is scheduled
            asynchronously via AnyioBridge.
        """
        self._registry.cancel_all_sync(status, reason, message)

    # Singleton pattern (optional - users can create instances directly or use singleton)

    _instance: ThreadSafeRegistry | None = None
    _lock = threading.Lock()

    @classmethod
    def get_instance(cls) -> ThreadSafeRegistry:
        """Get singleton instance of thread-safe registry.

        Thread-safe lazy initialization.

        Returns:
            The singleton ThreadSafeRegistry instance

        Example:
            ```python
            registry = ThreadSafeRegistry.get_instance()
            stats = registry.get_statistics()
            ```
        """
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = cls()
        return cls._instance

get_operation

get_operation(operation_id: str) -> Cancelable | None

Get operation by ID.

Parameters:

Name Type Description Default
operation_id str

Operation ID to look up

required

Returns:

Type Description
Cancelable | None

Cancelable operation or None if not found

Source code in src/hother/cancelable/utils/threading_bridge.py
def get_operation(self, operation_id: str) -> Cancelable | None:
    """Get operation by ID.

    Args:
        operation_id: Operation ID to look up

    Returns:
        Cancelable operation or None if not found
    """
    return self._registry.get_operation_sync(operation_id)

list_operations

list_operations(
    status: OperationStatus | None = None,
    parent_id: str | None = None,
    name_pattern: str | None = None,
) -> list[OperationContext]

List operations with optional filtering.

Parameters:

Name Type Description Default
status OperationStatus | None

Filter by operation status

None
parent_id str | None

Filter by parent operation ID

None
name_pattern str | None

Filter by name (substring match)

None

Returns:

Type Description
list[OperationContext]

List of matching operation contexts

Source code in src/hother/cancelable/utils/threading_bridge.py
def list_operations(
    self,
    status: OperationStatus | None = None,
    parent_id: str | None = None,
    name_pattern: str | None = None,
) -> list[OperationContext]:
    """List operations with optional filtering.

    Args:
        status: Filter by operation status
        parent_id: Filter by parent operation ID
        name_pattern: Filter by name (substring match)

    Returns:
        List of matching operation contexts
    """
    return self._registry.list_operations_sync(status, parent_id, name_pattern)

get_statistics

get_statistics() -> dict[str, Any]

Get registry statistics.

Returns:

Type Description
dict[str, Any]

Dictionary with operation statistics containing:

  • active_operations: Number of active operations
  • active_by_status: Active operations grouped by status
  • history_size: Number of operations in history
  • history_by_status: Historical operations grouped by status
  • average_duration_seconds: Average duration of completed operations
  • total_completed: Total number of completed operations
Source code in src/hother/cancelable/utils/threading_bridge.py
def get_statistics(self) -> dict[str, Any]:
    """Get registry statistics.

    Returns:
        Dictionary with operation statistics containing:
        - active_operations: Number of active operations
        - active_by_status: Active operations grouped by status
        - history_size: Number of operations in history
        - history_by_status: Historical operations grouped by status
        - average_duration_seconds: Average duration of completed operations
        - total_completed: Total number of completed operations
    """
    return self._registry.get_statistics_sync()

get_history

get_history(
    limit: int | None = None,
    status: OperationStatus | None = None,
    since: datetime | None = None,
) -> list[OperationContext]

Get operation history.

Parameters:

Name Type Description Default
limit int | None

Maximum number of operations to return

None
status OperationStatus | None

Filter by final status

None
since datetime | None

Only return operations completed after this time

None

Returns:

Type Description
list[OperationContext]

List of historical operation contexts

Source code in src/hother/cancelable/utils/threading_bridge.py
def get_history(
    self,
    limit: int | None = None,
    status: OperationStatus | None = None,
    since: datetime | None = None,
) -> list[OperationContext]:
    """Get operation history.

    Args:
        limit: Maximum number of operations to return
        status: Filter by final status
        since: Only return operations completed after this time

    Returns:
        List of historical operation contexts
    """
    return self._registry.get_history_sync(limit, status, since)

cancel_operation

cancel_operation(
    operation_id: str,
    reason: CancelationReason = MANUAL,
    message: str | None = None,
) -> None

Cancel a specific operation.

Schedules cancelation to be executed asynchronously and returns immediately.

Parameters:

Name Type Description Default
operation_id str

ID of operation to cancel

required
reason CancelationReason

Reason for cancelation

MANUAL
message str | None

Optional cancelation message

None
Note

This method returns immediately. The cancelation is scheduled asynchronously via AnyioBridge.

Source code in src/hother/cancelable/utils/threading_bridge.py
def cancel_operation(
    self,
    operation_id: str,
    reason: CancelationReason = CancelationReason.MANUAL,
    message: str | None = None,
) -> None:
    """Cancel a specific operation.

    Schedules cancelation to be executed asynchronously and returns immediately.

    Args:
        operation_id: ID of operation to cancel
        reason: Reason for cancelation
        message: Optional cancelation message

    Note:
        This method returns immediately. The cancelation is scheduled
        asynchronously via AnyioBridge.
    """
    self._registry.cancel_operation_sync(operation_id, reason, message)

cancel_all

cancel_all(
    status: OperationStatus | None = None,
    reason: CancelationReason = MANUAL,
    message: str | None = None,
) -> None

Cancel all operations with optional status filter.

Schedules cancelation to be executed asynchronously and returns immediately.

Parameters:

Name Type Description Default
status OperationStatus | None

Only cancel operations with this status

None
reason CancelationReason

Reason for cancelation

MANUAL
message str | None

Optional cancelation message

None
Note

This method returns immediately. The cancelation is scheduled asynchronously via AnyioBridge.

Source code in src/hother/cancelable/utils/threading_bridge.py
def cancel_all(
    self,
    status: OperationStatus | None = None,
    reason: CancelationReason = CancelationReason.MANUAL,
    message: str | None = None,
) -> None:
    """Cancel all operations with optional status filter.

    Schedules cancelation to be executed asynchronously and returns immediately.

    Args:
        status: Only cancel operations with this status
        reason: Reason for cancelation
        message: Optional cancelation message

    Note:
        This method returns immediately. The cancelation is scheduled
        asynchronously via AnyioBridge.
    """
    self._registry.cancel_all_sync(status, reason, message)

get_instance classmethod

get_instance() -> ThreadSafeRegistry

Get singleton instance of thread-safe registry.

Thread-safe lazy initialization.

Returns:

Type Description
ThreadSafeRegistry

The singleton ThreadSafeRegistry instance

Example
registry = ThreadSafeRegistry.get_instance()
stats = registry.get_statistics()
Source code in src/hother/cancelable/utils/threading_bridge.py
@classmethod
def get_instance(cls) -> ThreadSafeRegistry:
    """Get singleton instance of thread-safe registry.

    Thread-safe lazy initialization.

    Returns:
        The singleton ThreadSafeRegistry instance

    Example:
        ```python
        registry = ThreadSafeRegistry.get_instance()
        stats = registry.get_statistics()
        ```
    """
    if cls._instance is None:
        with cls._lock:
            if cls._instance is None:
                cls._instance = cls()
    return cls._instance
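
Example

The registry is meant to be called from plain OS threads (for instance a monitoring or UI thread) while the async application runs elsewhere. A minimal polling sketch, assuming ThreadSafeRegistry is importable from hother.cancelable.utils.threading_bridge as the source path above suggests:

```python
import threading

from hother.cancelable.utils.threading_bridge import ThreadSafeRegistry


def monitor_loop(stop: threading.Event) -> None:
    """Poll the registry from a non-async thread."""
    registry = ThreadSafeRegistry.get_instance()
    while not stop.is_set():
        stats = registry.get_statistics()  # read-only call, returns immediately
        print(f"Active operations: {stats['active_operations']}")
        stop.wait(1.0)  # poll roughly once per second


stop_event = threading.Event()
threading.Thread(target=monitor_loop, args=(stop_event,), daemon=True).start()
```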

Context Bridge

Context propagation utilities.

hother.cancelable.utils.context_bridge

Context bridge utilities for thread-safe context variable propagation.

This module provides utilities to safely propagate context variables between async tasks and OS threads, where context variables set in a task are otherwise not visible inside worker threads.

T module-attribute

T = TypeVar('T')

ContextBridge

Thread-safe context variable bridge for async-to-thread communication.

This class solves the issue where context variables don't propagate to OS threads created by ThreadPoolExecutor, breaking operation tracking in multi-threaded applications.
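The copy/restore pair can also be used by hand when you manage the executor yourself. A minimal sketch of that pattern (the request_id variable and worker function are illustrative, not part of the library); run_in_thread_with_context below wraps the same steps for you:

```python
import contextvars
from concurrent.futures import ThreadPoolExecutor

from hother.cancelable.utils.context_bridge import ContextBridge

request_id: contextvars.ContextVar[str] = contextvars.ContextVar("request_id")


def worker() -> str:
    # Without restore_context, request_id.get() would raise LookupError here:
    # ThreadPoolExecutor threads do not see the submitting task's context variables.
    return request_id.get()


def main() -> None:
    request_id.set("req-42")
    snapshot = ContextBridge.copy_context()  # capture the caller's context variables

    def run_with_context() -> str:
        ContextBridge.restore_context(snapshot)  # re-apply them inside the worker thread
        return worker()

    with ThreadPoolExecutor() as pool:
        print(pool.submit(run_with_context).result())  # prints "req-42"
```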

Source code in src/hother/cancelable/utils/context_bridge.py
class ContextBridge:
    """Thread-safe context variable bridge for async-to-thread communication.

    This class solves the issue where context variables don't propagate
    to OS threads created by ThreadPoolExecutor, breaking operation tracking
    in multi-threaded applications.
    """

    @staticmethod
    def copy_context() -> dict[contextvars.ContextVar[Any], Any]:
        """Copy current context variables to a dict for thread transport.

        Returns:
            Dictionary mapping context variables to their current values
        """
        ctx = contextvars.copy_context()
        return dict(ctx)

    @staticmethod
    def restore_context(context_dict: dict[contextvars.ContextVar[Any], Any]) -> None:
        """Restore context variables from a dictionary.

        Args:
            context_dict: Dictionary mapping context variables to values
        """
        for var, value in context_dict.items():
            var.set(value)

    @staticmethod
    async def run_in_thread_with_context(
        func: Callable[..., T], *args: Any, executor: ThreadPoolExecutor | None = None, **kwargs: Any
    ) -> T:
        """Run function in thread with context variables propagated.

        This method safely copies context variables to the thread, runs the
        function, and returns the result.

        Args:
            func: Function to run in thread
            *args: Positional arguments for func
            executor: Optional thread pool executor (default: None for default executor)
            **kwargs: Keyword arguments for func

        Returns:
            Result of func execution

        Example:
            ```python
            async def async_func():
                result = await ContextBridge.run_in_thread_with_context(
                    expensive_computation, data, param=value
                )
                return result
            ```
        """
        # Copy current context
        ctx = ContextBridge.copy_context()

        def thread_func():
            # Restore context in thread
            ContextBridge.restore_context(ctx)
            return func(*args, **kwargs)

        # Run in thread
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(executor, thread_func)

copy_context staticmethod

copy_context() -> dict[ContextVar[Any], Any]

Copy current context variables to a dict for thread transport.

Returns:

Type Description
dict[ContextVar[Any], Any]

Dictionary mapping context variables to their current values

Source code in src/hother/cancelable/utils/context_bridge.py
@staticmethod
def copy_context() -> dict[contextvars.ContextVar[Any], Any]:
    """Copy current context variables to a dict for thread transport.

    Returns:
        Dictionary mapping context variables to their current values
    """
    ctx = contextvars.copy_context()
    return dict(ctx)

restore_context staticmethod

restore_context(
    context_dict: dict[ContextVar[Any], Any],
) -> None

Restore context variables from a dictionary.

Parameters:

Name Type Description Default
context_dict dict[ContextVar[Any], Any]

Dictionary mapping context variables to values

required
Source code in src/hother/cancelable/utils/context_bridge.py
@staticmethod
def restore_context(context_dict: dict[contextvars.ContextVar[Any], Any]) -> None:
    """Restore context variables from a dictionary.

    Args:
        context_dict: Dictionary mapping context variables to values
    """
    for var, value in context_dict.items():
        var.set(value)

run_in_thread_with_context async staticmethod

run_in_thread_with_context(
    func: Callable[..., T],
    *args: Any,
    executor: ThreadPoolExecutor | None = None,
    **kwargs: Any,
) -> T

Run function in thread with context variables propagated.

This method safely copies context variables to the thread, runs the function, and returns the result.

Parameters:

Name Type Description Default
func Callable[..., T]

Function to run in thread

required
*args Any

Positional arguments for func

()
executor ThreadPoolExecutor | None

Optional thread pool executor (default: None for default executor)

None
**kwargs Any

Keyword arguments for func

{}

Returns:

Type Description
T

Result of func execution

Example
async def async_func():
    result = await ContextBridge.run_in_thread_with_context(
        expensive_computation, data, param=value
    )
    return result
Source code in src/hother/cancelable/utils/context_bridge.py
@staticmethod
async def run_in_thread_with_context(
    func: Callable[..., T], *args: Any, executor: ThreadPoolExecutor | None = None, **kwargs: Any
) -> T:
    """Run function in thread with context variables propagated.

    This method safely copies context variables to the thread, runs the
    function, and returns the result.

    Args:
        func: Function to run in thread
        *args: Positional arguments for func
        executor: Optional thread pool executor (default: None for default executor)
        **kwargs: Keyword arguments for func

    Returns:
        Result of func execution

    Example:
        ```python
        async def async_func():
            result = await ContextBridge.run_in_thread_with_context(
                expensive_computation, data, param=value
            )
            return result
        ```
    """
    # Copy current context
    ctx = ContextBridge.copy_context()

    def thread_func():
        # Restore context in thread
        ContextBridge.restore_context(ctx)
        return func(*args, **kwargs)

    # Run in thread
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(executor, thread_func)

Stream Processing

Utilities for cancelable async stream processing.

hother.cancelable.utils.streams

Stream utilities for async cancelation.

logger module-attribute

logger = get_logger(__name__)

T module-attribute

T = TypeVar('T')

CancelableAsyncIterator

Bases: AsyncIterator[T]

Wrapper class that makes any async iterator cancelable.

This provides a class-based alternative to the cancelable_stream function.
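
There is no inline example for this class, so here is a minimal usage sketch; the top-level Cancelable import path and the fetch_items() generator are assumptions made for illustration:

```python
from hother.cancelable import Cancelable  # import path assumed
from hother.cancelable.utils.streams import CancelableAsyncIterator


async def fetch_items():
    # Stand-in source stream for the sketch.
    for i in range(200):
        yield i


async def consume() -> None:
    async with Cancelable.with_timeout(10.0, name="fetch") as cancel:
        stream = CancelableAsyncIterator(fetch_items(), cancel, report_interval=50)
        try:
            async for item in stream:
                ...  # process each item; progress is reported every 50 items
        finally:
            await stream.aclose()
```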

Source code in src/hother/cancelable/utils/streams.py
class CancelableAsyncIterator(AsyncIterator[T]):
    """Wrapper class that makes any async iterator cancelable.

    This provides a class-based alternative to the cancelable_stream function.
    """

    def __init__(
        self,
        iterator: AsyncIterator[T],
        cancelable: Cancelable,
        report_interval: int | None = None,
        buffer_partial: bool = False,
    ):
        """Initialize cancelable iterator.

        Args:
            iterator: Async iterator to wrap
            cancelable: Cancelable instance to use
            report_interval: Report progress every N items
            buffer_partial: Whether to buffer items
        """
        self._iterator: AsyncIterator[T] = iterator
        self._cancellable: Cancelable = cancelable
        self._report_interval = report_interval
        self._buffer_partial = buffer_partial
        self._count = 0
        self._buffer: list[T] | None = [] if buffer_partial else None
        self._stream_iter = None
        self._completed = False

    def __aiter__(self) -> "CancelableAsyncIterator[T]":
        """Return self as async iterator."""
        return self

    async def __anext__(self) -> T:
        """Get next item with cancelation checking."""
        # Check cancelation
        await self._cancellable.token.check_async()

        try:
            # Get next item
            item = await self._iterator.__anext__()

            # Update count and buffer
            self._count += 1
            if self._buffer is not None:
                self._buffer.append(item)
                if len(self._buffer) > _MAX_BUFFER_SIZE:
                    self._buffer = self._buffer[-_MAX_BUFFER_SIZE:]

            # Report progress if needed
            if self._report_interval and self._count % self._report_interval == 0:
                await self._cancellable.report_progress(
                    f"Processed {self._count} items", {"count": self._count, "latest_item": item}
                )

            return item

        except StopAsyncIteration:
            # Stream ended normally
            self._completed = True
            if self._buffer is not None:
                self._cancellable.context.partial_result = {
                    "count": self._count,
                    "buffer": self._buffer,
                    "completed": True,
                }
            raise

        except anyio.get_cancelled_exc_class():
            # Cancelled
            if self._buffer is not None:
                self._cancellable.context.partial_result = {
                    "count": self._count,
                    "buffer": self._buffer,
                    "completed": False,
                }
            raise

        except Exception:  # Intentionally broad to save partial results on any error
            # Error
            if self._buffer is not None:
                self._cancellable.context.partial_result = {
                    "count": self._count,
                    "buffer": self._buffer,
                    "completed": False,
                }
            raise

    async def aclose(self) -> None:
        """Close the iterator."""
        if hasattr(self._iterator, "aclose"):
            await self._iterator.aclose()  # type: ignore[union-attr]

aclose async

aclose() -> None

Close the iterator.

Source code in src/hother/cancelable/utils/streams.py
async def aclose(self) -> None:
    """Close the iterator."""
    if hasattr(self._iterator, "aclose"):
        await self._iterator.aclose()  # type: ignore[union-attr]

cancelable_stream async

cancelable_stream(
    stream: AsyncIterator[T],
    timeout: float | timedelta | None = None,
    token: Optional[CancelationToken] = None,
    report_interval: int | None = None,
    on_progress: Callable[[int, T], Any] | None = None,
    buffer_partial: bool = False,
    operation_id: str | None = None,
    name: str | None = None,
) -> AsyncIterator[T]

Make any async iterator cancelable with various options.

Parameters:

Name Type Description Default
stream AsyncIterator[T]

Async iterator to wrap

required
timeout float | timedelta | None

Optional timeout for the entire stream

None
token Optional[CancelationToken]

Optional cancelation token

None
report_interval int | None

Report progress every N items

None
on_progress Callable[[int, T], Any] | None

Optional progress callback (item_count, latest_item)

None
buffer_partial bool

Whether to buffer items for partial results

False
operation_id str | None

Optional operation ID

None
name str | None

Optional operation name

None

Yields:

Type Description
AsyncIterator[T]

Items from the wrapped stream

Example

async for item in cancelable_stream(
    fetch_items(),
    timeout=30.0,
    report_interval=100,
    on_progress=lambda n, item: print(f"Processed {n} items")
):
    process(item)

Source code in src/hother/cancelable/utils/streams.py
async def cancelable_stream(
    stream: AsyncIterator[T],
    timeout: float | timedelta | None = None,
    token: Optional["CancelationToken"] = None,
    report_interval: int | None = None,
    on_progress: Callable[[int, T], Any] | None = None,
    buffer_partial: bool = False,
    operation_id: str | None = None,
    name: str | None = None,
) -> AsyncIterator[T]:
    """Make any async iterator cancelable with various options.

    Args:
        stream: Async iterator to wrap
        timeout: Optional timeout for the entire stream
        token: Optional cancelation token
        report_interval: Report progress every N items
        on_progress: Optional progress callback (item_count, latest_item)
        buffer_partial: Whether to buffer items for partial results
        operation_id: Optional operation ID
        name: Optional operation name

    Yields:
        Items from the wrapped stream

    Example:
        async for item in cancelable_stream(
            fetch_items(),
            timeout=30.0,
            report_interval=100,
            on_progress=lambda n, item: print(f"Processed {n} items")
        ):
            process(item)
    """
    # Create appropriate cancelable
    if timeout and token:
        cancelable = Cancelable.with_timeout(timeout, operation_id=operation_id, name=name).combine(
            Cancelable.with_token(token)
        )
    elif timeout:
        cancelable = Cancelable.with_timeout(
            timeout,
            operation_id=operation_id,
            name=name or "stream_timeout",
        )
    elif token:
        cancelable = Cancelable.with_token(
            token,
            operation_id=operation_id,
            name=name or "stream_token",
        )
    else:
        cancelable = Cancelable(
            operation_id=operation_id,
            name=name or "stream",
        )

    # Add progress callback if provided
    if on_progress:

        async def report_wrapper(op_id: str, msg: Any, meta: dict[str, Any] | None):
            if meta and "count" in meta and "latest_item" in meta:
                result = on_progress(meta["count"], meta["latest_item"])
                if hasattr(result, "__await__"):
                    await result

        cancelable.on_progress(report_wrapper)

    # Process stream
    async with cancelable:
        async for item in cancelable.stream(
            stream,
            report_interval=report_interval,
            buffer_partial=buffer_partial,
        ):
            yield item

chunked_cancelable_stream async

chunked_cancelable_stream(
    stream: AsyncIterator[T],
    chunk_size: int,
    cancelable: Cancelable,
) -> AsyncIterator[list[T]]

Process stream in chunks with cancelation support.

Parameters:

Name Type Description Default
stream AsyncIterator[T]

Source async iterator

required
chunk_size int

Size of chunks to yield

required
cancelable Cancelable

Cancelable instance

required

Yields:

Type Description
AsyncIterator[list[T]]

Lists of items (chunks)

Example

async for chunk in chunked_cancelable_stream(items, 100, cancel):
    await process_batch(chunk)

Source code in src/hother/cancelable/utils/streams.py
async def chunked_cancelable_stream(
    stream: AsyncIterator[T],
    chunk_size: int,
    cancelable: Cancelable,
) -> AsyncIterator[list[T]]:
    """Process stream in chunks with cancelation support.

    Args:
        stream: Source async iterator
        chunk_size: Size of chunks to yield
        cancelable: Cancelable instance

    Yields:
        Lists of items (chunks)

    Example:
        async for chunk in chunked_cancelable_stream(items, 100, cancel):
            await process_batch(chunk)
    """
    chunk: list[T] = []

    async for item in cancelable.stream(stream):
        chunk.append(item)

        if len(chunk) >= chunk_size:
            yield chunk
            chunk = []

            # Report progress
            await cancelable.report_progress(f"Processed chunk of {chunk_size} items")

    # Yield remaining items
    if chunk:
        yield chunk
        await cancelable.report_progress(f"Processed final chunk of {len(chunk)} items")

Streaming Simulator

Stream cancellation simulator for testing and demonstration.

hother.cancelable.streaming.simulator.simulator

Core stream simulation functionality.

logger module-attribute

logger = getLogger(__name__)

simulate_stream async

simulate_stream(
    text: str,
    config: StreamConfig | None = None,
    cancelable: Cancelable | None = None,
) -> AsyncGenerator[dict[str, Any]]

Simulate a realistic network stream with variable timing and cancellation support.

This function simulates network streaming behavior including bursts, stalls, jitter, and variable chunk sizes. It's useful for testing cancellable stream processing and demonstrating async cancellation patterns.

Parameters:

Name Type Description Default
text str

The text content to stream

required
config StreamConfig | None

Optional StreamConfig to control simulation behavior. If None, uses default configuration.

None
cancelable Cancelable | None

Optional Cancelable instance for cancellation support. If provided, the stream will check for cancellation and report progress.

None

Yields:

Type Description
AsyncGenerator[dict[str, Any]]

Dictionary chunks with the following types:

  • {"type": "data", "chunk": str, "chunk_size": int, ...} - Data chunks
  • {"type": "stall", "duration": float, ...} - Network stalls
  • {"type": "complete", "total_chunks": int, ...} - Stream completion

Raises:

Type Description
CancelledError

If the associated Cancelable is cancelled during streaming

Example
async with Cancelable.with_timeout(5.0) as cancel:
    config = StreamConfig(base_delay=0.1, stall_probability=0.1)

    async for event in simulate_stream("Hello world", config, cancel):
        if event["type"] == "data":
            print(event["chunk"], end="", flush=True)
Source code in src/hother/cancelable/streaming/simulator/simulator.py
async def simulate_stream(
    text: str, config: StreamConfig | None = None, cancelable: Cancelable | None = None
) -> AsyncGenerator[dict[str, Any]]:
    """Simulate a realistic network stream with variable timing and cancellation support.

    This function simulates network streaming behavior including bursts, stalls,
    jitter, and variable chunk sizes. It's useful for testing cancellable stream
    processing and demonstrating async cancellation patterns.

    Args:
        text: The text content to stream
        config: Optional StreamConfig to control simulation behavior.
            If None, uses default configuration.
        cancelable: Optional Cancelable instance for cancellation support.
            If provided, the stream will check for cancellation and report progress.

    Yields:
        Dictionary chunks with the following types:
        - {"type": "data", "chunk": str, "chunk_size": int, ...} - Data chunks
        - {"type": "stall", "duration": float, ...} - Network stalls
        - {"type": "complete", "total_chunks": int, ...} - Stream completion

    Raises:
        CancelledError: If the associated Cancelable is cancelled during streaming

    Example:
        ```python
        async with Cancelable.with_timeout(5.0) as cancel:
            config = StreamConfig(base_delay=0.1, stall_probability=0.1)

            async for event in simulate_stream("Hello world", config, cancel):
                if event["type"] == "data":
                    print(event["chunk"], end="", flush=True)
        ```
    """
    if config is None:
        config = StreamConfig()

    start_time = time.time()
    chunk_count = 0

    i = 0
    while i < len(text):
        # Check for cancelation
        if cancelable:
            await cancelable.token.check_async()

        if random.random() < config.stall_probability:
            await anyio.sleep(config.stall_duration)

            if cancelable:
                await cancelable.report_progress(
                    f"Network stall: {config.stall_duration:.3f}s", {"type": "stall", "duration": config.stall_duration}
                )

            yield {"type": "stall", "duration": config.stall_duration, "timestamp": time.time() - start_time}

        if random.random() < config.burst_probability:
            for _ in range(config.burst_size):
                if i >= len(text):
                    break

                # Check for cancelation in burst
                if cancelable:
                    await cancelable.token.check_async()

                chunk_size = get_random_chunk_size(config)
                chunk = text[i : i + chunk_size]
                i += len(chunk)
                chunk_count += 1

                yield {
                    "type": "data",
                    "chunk": chunk,
                    "chunk_size": len(chunk),
                    "requested_chunk_size": chunk_size,
                    "position": i,
                    "total_length": len(text),
                    "timestamp": time.time() - start_time,
                    "burst": True,
                    "chunk_number": chunk_count,
                }

                await anyio.sleep(0.001)
        else:
            chunk_size = get_random_chunk_size(config)
            chunk = text[i : i + chunk_size]
            i += len(chunk)
            chunk_count += 1

            delay = config.base_delay
            if random.random() < config.jitter_probability:
                delay += random.uniform(-config.jitter, config.jitter)
            delay = max(0, delay)

            await anyio.sleep(delay)

            yield {
                "type": "data",
                "chunk": chunk,
                "chunk_size": len(chunk),
                "requested_chunk_size": chunk_size,
                "position": i,
                "total_length": len(text),
                "timestamp": time.time() - start_time,
                "burst": False,
                "chunk_number": chunk_count,
            }

            # Report progress periodically
            if cancelable and chunk_count % 10 == 0:
                progress = (i / len(text)) * 100
                await cancelable.report_progress(
                    f"Stream progress: {progress:.1f}%",
                    {
                        "chunks_sent": chunk_count,
                        "bytes_sent": i,
                        "total_bytes": len(text),
                        "progress_percent": progress,
                    },
                )

    yield {"type": "complete", "timestamp": time.time() - start_time, "total_chunks": chunk_count}

Logging

Structured logging utilities for cancellation events.

hother.cancelable.utils.logging

Logging utilities for the cancelable library.

Following Python library best practices, this module provides logger access but does not configure logging. Applications using cancelable should configure their own logging as needed.

get_logger

get_logger(name: str | None = None) -> Logger

Get a standard library logger instance.

Parameters:

Name Type Description Default
name str | None

Logger name. If None, uses the calling module's name

None

Returns:

Type Description
Logger

A configured standard library logger

Note

This function does not configure logging handlers or formatters. Applications should configure logging using logging.basicConfig() or their preferred logging configuration method.

Example

In your application code:

import logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

from hother.cancelable.utils.logging import get_logger
logger = get_logger(__name__)
logger.info("Application started")

Source code in src/hother/cancelable/utils/logging.py
def get_logger(name: str | None = None) -> logging.Logger:
    """Get a standard library logger instance.

    Args:
        name: Logger name. If None, uses the calling module's name

    Returns:
        A configured standard library logger

    Note:
        This function does not configure logging handlers or formatters.
        Applications should configure logging using logging.basicConfig()
        or their preferred logging configuration method.

    Example:
        In your application code:
        ```python
        import logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )

        from hother.cancelable.utils.logging import get_logger
        logger = get_logger(__name__)
        logger.info("Application started")
        ```
    """
    if name is None:
        import inspect

        frame = inspect.currentframe()
        name = frame.f_back.f_globals.get("__name__", "cancelable") if frame and frame.f_back else "cancelable"

    return logging.getLogger(name)

Testing

Test utilities and fixtures for cancelable operations.

hother.cancelable.utils.testing

Testing utilities for async cancelation.

logger module-attribute

logger = get_logger(__name__)

T module-attribute

T = TypeVar('T')

MockCancelationToken

Bases: CancelationToken

Mock cancelation token for testing.

Provides additional testing capabilities like scheduled cancelation.
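
A minimal test sketch using only the documented API (assuming pytest with an async-capable runner, and that MockCancelationToken is importable from hother.cancelable.utils.testing as the source path below suggests):

```python
from hother.cancelable.utils.testing import MockCancelationToken


async def test_cancel_is_recorded() -> None:  # run with pytest-anyio or pytest-asyncio
    token = MockCancelationToken()

    # Cancel directly and verify the call was recorded in cancel_history.
    await token.cancel(message="stop for test")

    assert token.get_cancel_count() == 1
    assert token.cancel_history[0]["message"] == "stop for test"
```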

Source code in src/hother/cancelable/utils/testing.py
class MockCancelationToken(CancelationToken):
    """Mock cancelation token for testing.

    Provides additional testing capabilities like scheduled cancelation.
    """

    # Additional fields for testing
    cancel_history: list[dict[str, Any]] = []
    _scheduled_cancelation: Any = PrivateAttr(default=None)

    async def cancel(
        self,
        reason: CancelationReason = CancelationReason.MANUAL,
        message: str | None = None,
    ) -> bool:
        """Cancel and record in history."""
        self.cancel_history.append(
            {
                "time": datetime.now(UTC),
                "reason": reason,
                "message": message,
            }
        )
        return await super().cancel(reason, message)

    async def schedule_cancel(
        self,
        delay: float,
        reason: CancelationReason = CancelationReason.MANUAL,
        message: str | None = None,
    ) -> None:
        """Schedule cancelation after a delay.

        Args:
            delay: Delay in seconds before cancelation
            reason: Cancelation reason
            message: Cancelation message
        """

        async def delayed_cancel():
            await anyio.sleep(delay)
            await self.cancel(reason, message)

        self._scheduled_cancelation = anyio.create_task_group()
        await self._scheduled_cancelation.__aenter__()
        self._scheduled_cancelation.start_soon(delayed_cancel)

    def get_cancel_count(self) -> int:
        """Get number of times cancel was called."""
        return len(self.cancel_history)

cancel_history class-attribute instance-attribute

cancel_history: list[dict[str, Any]] = []

cancel async

cancel(
    reason: CancelationReason = MANUAL,
    message: str | None = None,
) -> bool

Cancel and record in history.

Source code in src/hother/cancelable/utils/testing.py
async def cancel(
    self,
    reason: CancelationReason = CancelationReason.MANUAL,
    message: str | None = None,
) -> bool:
    """Cancel and record in history."""
    self.cancel_history.append(
        {
            "time": datetime.now(UTC),
            "reason": reason,
            "message": message,
        }
    )
    return await super().cancel(reason, message)

schedule_cancel async

schedule_cancel(
    delay: float,
    reason: CancelationReason = MANUAL,
    message: str | None = None,
) -> None

Schedule cancelation after a delay.

Parameters:

Name Type Description Default
delay float

Delay in seconds before cancelation

required
reason CancelationReason

Cancelation reason

MANUAL
message str | None

Cancelation message

None
Source code in src/hother/cancelable/utils/testing.py
async def schedule_cancel(
    self,
    delay: float,
    reason: CancelationReason = CancelationReason.MANUAL,
    message: str | None = None,
) -> None:
    """Schedule cancelation after a delay.

    Args:
        delay: Delay in seconds before cancelation
        reason: Cancelation reason
        message: Cancelation message
    """

    async def delayed_cancel():
        await anyio.sleep(delay)
        await self.cancel(reason, message)

    self._scheduled_cancelation = anyio.create_task_group()
    await self._scheduled_cancelation.__aenter__()
    self._scheduled_cancelation.start_soon(delayed_cancel)

get_cancel_count

get_cancel_count() -> int

Get number of times cancel was called.

Source code in src/hother/cancelable/utils/testing.py
def get_cancel_count(self) -> int:
    """Get number of times cancel was called."""
    return len(self.cancel_history)

OperationRecorder

Records operation events for testing assertions.
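
A minimal sketch of attaching the recorder to an operation and asserting on its events (the top-level Cancelable import path is an assumption):

```python
from hother.cancelable import Cancelable  # import path assumed
from hother.cancelable.utils.testing import OperationRecorder


async def test_progress_is_recorded() -> None:  # run with an async-capable test runner
    recorder = OperationRecorder()
    cancel = recorder.attach_to_cancellable(Cancelable(name="demo"))

    async with cancel:
        await cancel.report_progress("halfway there")

    # assert_event_occurred raises AssertionError if the event was never recorded.
    recorder.assert_event_occurred(cancel.context.id, "progress")
```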

Source code in src/hother/cancelable/utils/testing.py
class OperationRecorder:
    """Records operation events for testing assertions."""

    def __init__(self):
        self.events: list[dict[str, Any]] = []
        self.operations: dict[str, OperationContext] = {}
        self._lock = anyio.Lock()

    async def record_event(
        self,
        operation_id: str,
        event_type: str,
        data: dict[str, Any] | None = None,
    ) -> None:
        """Record an operation event."""
        async with self._lock:
            self.events.append(
                {
                    "time": datetime.now(UTC),
                    "operation_id": operation_id,
                    "event_type": event_type,
                    "data": data or {},
                }
            )

    def attach_to_cancellable(self, cancelable: Cancelable) -> Cancelable:
        """Attach recorder to a cancelable to track its events.

        Args:
            cancelable: Cancelable to track

        Returns:
            The cancelable (for chaining)
        """
        op_id = cancelable.context.id
        self.operations[op_id] = cancelable.context

        # Record all events
        async def record_progress(op_id: str, msg: str, meta: dict[str, Any] | None):
            await self.record_event(op_id, "progress", {"message": msg, "meta": meta})

        async def record_status(ctx: OperationContext):
            await self.record_event(ctx.id, f"status_{ctx.status.value}", ctx.log_context())

        async def record_error(ctx: OperationContext, error: Exception):
            await self.record_event(ctx.id, "error", {"error_type": type(error).__name__, "error_message": str(error)})

        return (
            cancelable.on_progress(record_progress)
            .on_start(record_status)
            .on_complete(record_status)
            .on_cancel(record_status)
            .on_error(record_error)
        )

    def get_events_for_operation(self, operation_id: str) -> list[dict[str, Any]]:
        """Get all events for a specific operation."""
        return [e for e in self.events if e["operation_id"] == operation_id]

    def get_events_by_type(self, event_type: str) -> list[dict[str, Any]]:
        """Get all events of a specific type."""
        return [e for e in self.events if e["event_type"] == event_type]

    def assert_event_occurred(
        self,
        operation_id: str,
        event_type: str,
        timeout: float = 1.0,
    ) -> dict[str, Any]:
        """Assert that an event occurred (synchronous check).

        Args:
            operation_id: Operation ID to check
            event_type: Event type to look for
            timeout: Not used in sync version

        Returns:
            The event data

        Raises:
            AssertionError: If event not found
        """
        events = [e for e in self.events if e["operation_id"] == operation_id and e["event_type"] == event_type]

        if not events:
            raise AssertionError(f"Event '{event_type}' not found for operation {operation_id}")

        return events[-1]  # Return most recent

    def assert_final_status(
        self,
        operation_id: str,
        expected_status: OperationStatus,
    ) -> None:
        """Assert the final status of an operation.

        Args:
            operation_id: Operation ID to check
            expected_status: Expected final status

        Raises:
            AssertionError: If status doesn't match
        """
        if operation_id not in self.operations:
            raise AssertionError(f"Operation {operation_id} not found")

        actual_status = self.operations[operation_id].status
        if actual_status != expected_status:
            raise AssertionError(f"Expected status {expected_status.value}, got {actual_status.value}")

events instance-attribute

events: list[dict[str, Any]] = []

operations instance-attribute

operations: dict[str, OperationContext] = {}

record_event async

record_event(
    operation_id: str,
    event_type: str,
    data: dict[str, Any] | None = None,
) -> None

Record an operation event.

Source code in src/hother/cancelable/utils/testing.py
async def record_event(
    self,
    operation_id: str,
    event_type: str,
    data: dict[str, Any] | None = None,
) -> None:
    """Record an operation event."""
    async with self._lock:
        self.events.append(
            {
                "time": datetime.now(UTC),
                "operation_id": operation_id,
                "event_type": event_type,
                "data": data or {},
            }
        )

attach_to_cancellable

attach_to_cancellable(cancelable: Cancelable) -> Cancelable

Attach recorder to a cancelable to track its events.

Parameters:

Name Type Description Default
cancelable Cancelable

Cancelable to track

required

Returns:

Type Description
Cancelable

The cancelable (for chaining)

Source code in src/hother/cancelable/utils/testing.py
def attach_to_cancellable(self, cancelable: Cancelable) -> Cancelable:
    """Attach recorder to a cancelable to track its events.

    Args:
        cancelable: Cancelable to track

    Returns:
        The cancelable (for chaining)
    """
    op_id = cancelable.context.id
    self.operations[op_id] = cancelable.context

    # Record all events
    async def record_progress(op_id: str, msg: str, meta: dict[str, Any] | None):
        await self.record_event(op_id, "progress", {"message": msg, "meta": meta})

    async def record_status(ctx: OperationContext):
        await self.record_event(ctx.id, f"status_{ctx.status.value}", ctx.log_context())

    async def record_error(ctx: OperationContext, error: Exception):
        await self.record_event(ctx.id, "error", {"error_type": type(error).__name__, "error_message": str(error)})

    return (
        cancelable.on_progress(record_progress)
        .on_start(record_status)
        .on_complete(record_status)
        .on_cancel(record_status)
        .on_error(record_error)
    )

get_events_for_operation

get_events_for_operation(
    operation_id: str,
) -> list[dict[str, Any]]

Get all events for a specific operation.

Source code in src/hother/cancelable/utils/testing.py
def get_events_for_operation(self, operation_id: str) -> list[dict[str, Any]]:
    """Get all events for a specific operation."""
    return [e for e in self.events if e["operation_id"] == operation_id]

get_events_by_type

get_events_by_type(event_type: str) -> list[dict[str, Any]]

Get all events of a specific type.

Source code in src/hother/cancelable/utils/testing.py
def get_events_by_type(self, event_type: str) -> list[dict[str, Any]]:
    """Get all events of a specific type."""
    return [e for e in self.events if e["event_type"] == event_type]

assert_event_occurred

assert_event_occurred(
    operation_id: str, event_type: str, timeout: float = 1.0
) -> dict[str, Any]

Assert that an event occurred (synchronous check).

Parameters:

Name Type Description Default
operation_id str

Operation ID to check

required
event_type str

Event type to look for

required
timeout float

Not used in sync version

1.0

Returns:

Type Description
dict[str, Any]

The event data

Raises:

Type Description
AssertionError

If event not found

Source code in src/hother/cancelable/utils/testing.py
def assert_event_occurred(
    self,
    operation_id: str,
    event_type: str,
    timeout: float = 1.0,
) -> dict[str, Any]:
    """Assert that an event occurred (synchronous check).

    Args:
        operation_id: Operation ID to check
        event_type: Event type to look for
        timeout: Not used in sync version

    Returns:
        The event data

    Raises:
        AssertionError: If event not found
    """
    events = [e for e in self.events if e["operation_id"] == operation_id and e["event_type"] == event_type]

    if not events:
        raise AssertionError(f"Event '{event_type}' not found for operation {operation_id}")

    return events[-1]  # Return most recent

assert_final_status

assert_final_status(
    operation_id: str, expected_status: OperationStatus
) -> None

Assert the final status of an operation.

Parameters:

Name Type Description Default
operation_id str

Operation ID to check

required
expected_status OperationStatus

Expected final status

required

Raises:

Type Description
AssertionError

If status doesn't match

Source code in src/hother/cancelable/utils/testing.py
def assert_final_status(
    self,
    operation_id: str,
    expected_status: OperationStatus,
) -> None:
    """Assert the final status of an operation.

    Args:
        operation_id: Operation ID to check
        expected_status: Expected final status

    Raises:
        AssertionError: If status doesn't match
    """
    if operation_id not in self.operations:
        raise AssertionError(f"Operation {operation_id} not found")

    actual_status = self.operations[operation_id].status
    if actual_status != expected_status:
        raise AssertionError(f"Expected status {expected_status.value}, got {actual_status.value}")

CancelationScenario

Test scenario builder for cancelation testing.
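
A minimal sketch of the builder in use, driving sample_async_operation from this module. Note that run() invokes the operation without injecting the cancelable, so the operation itself does not need to accept one; the scenario cancels via its own mock token.

```python
from hother.cancelable.utils.testing import CancelationScenario, sample_async_operation


async def test_cancel_after_short_delay() -> None:  # run with an async-capable test runner
    scenario = (
        CancelationScenario("cancel-midway")
        .add_delay(0.1)                     # let the operation get going
        .add_cancelation(message="enough")  # then cancel via the scenario's mock token
    )

    recorder = await scenario.run(sample_async_operation, duration=5.0)

    # The recorder captured the operation's lifecycle events for further assertions.
    assert recorder.events
```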

Source code in src/hother/cancelable/utils/testing.py
class CancelationScenario:
    """Test scenario builder for cancelation testing."""

    def __init__(self, name: str):
        self.name = name
        self.steps: list[dict[str, Any]] = []
        self.assertions: list[dict[str, Any]] = []

    def add_delay(self, duration: float) -> "CancelationScenario":
        """Add a delay step."""
        self.steps.append({"type": "delay", "duration": duration})
        return self

    def add_cancelation(
        self,
        reason: CancelationReason = CancelationReason.MANUAL,
        message: str | None = None,
    ) -> "CancelationScenario":
        """Add a cancelation step."""
        self.steps.append(
            {
                "type": "cancel",
                "reason": reason,
                "message": message,
            }
        )
        return self

    def add_progress_check(
        self,
        expected_message: str,
        timeout: float = 1.0,
    ) -> "CancelationScenario":
        """Add assertion for progress message."""
        self.assertions.append(
            {
                "type": "progress",
                "message": expected_message,
                "timeout": timeout,
            }
        )
        return self

    def add_status_check(
        self,
        expected_status: OperationStatus,
    ) -> "CancelationScenario":
        """Add assertion for operation status."""
        self.assertions.append(
            {
                "type": "status",
                "status": expected_status,
            }
        )
        return self

    async def run(
        self,
        operation: Callable[..., Any],
        *args: Any,
        **kwargs: Any,
    ) -> OperationRecorder:
        """Run the scenario.

        Args:
            operation: Async callable to test
            *args: Positional arguments for operation
            **kwargs: Keyword arguments for operation

        Returns:
            Operation recorder with results
        """
        recorder = OperationRecorder()
        token = MockCancelationToken()

        # Create cancelable
        cancelable = Cancelable.with_token(token, name=f"scenario_{self.name}")
        recorder.attach_to_cancellable(cancelable)

        # Schedule steps
        async def run_steps():
            for step in self.steps:
                if step["type"] == "delay":
                    await anyio.sleep(step["duration"])
                elif step["type"] == "cancel":
                    await token.cancel(step["reason"], step["message"])

        # Run operation and steps concurrently
        async with anyio.create_task_group() as tg:
            tg.start_soon(run_steps)

            # Run operation with cancelable
            async with cancelable:
                try:
                    await operation(*args, **kwargs)
                except anyio.get_cancelled_exc_class():
                    pass  # Expected

        # Run assertions
        for assertion in self.assertions:
            if assertion["type"] == "progress":
                events = recorder.get_events_by_type("progress")
                messages = [e["data"]["message"] for e in events]
                if assertion["message"] not in messages:
                    raise AssertionError(f"Expected progress message '{assertion['message']}' not found")
            elif assertion["type"] == "status":
                recorder.assert_final_status(cancelable.context.id, assertion["status"])

        return recorder

name instance-attribute

name = name

steps instance-attribute

steps: list[dict[str, Any]] = []

assertions instance-attribute

assertions: list[dict[str, Any]] = []

add_delay

add_delay(duration: float) -> CancelationScenario

Add a delay step.

Source code in src/hother/cancelable/utils/testing.py
def add_delay(self, duration: float) -> "CancelationScenario":
    """Add a delay step."""
    self.steps.append({"type": "delay", "duration": duration})
    return self

add_cancelation

add_cancelation(
    reason: CancelationReason = MANUAL,
    message: str | None = None,
) -> CancelationScenario

Add a cancelation step.

Source code in src/hother/cancelable/utils/testing.py
def add_cancelation(
    self,
    reason: CancelationReason = CancelationReason.MANUAL,
    message: str | None = None,
) -> "CancelationScenario":
    """Add a cancelation step."""
    self.steps.append(
        {
            "type": "cancel",
            "reason": reason,
            "message": message,
        }
    )
    return self

add_progress_check

add_progress_check(
    expected_message: str, timeout: float = 1.0
) -> CancelationScenario

Add assertion for progress message.

Source code in src/hother/cancelable/utils/testing.py
def add_progress_check(
    self,
    expected_message: str,
    timeout: float = 1.0,
) -> "CancelationScenario":
    """Add assertion for progress message."""
    self.assertions.append(
        {
            "type": "progress",
            "message": expected_message,
            "timeout": timeout,
        }
    )
    return self

add_status_check

add_status_check(
    expected_status: OperationStatus,
) -> CancelationScenario

Add assertion for operation status.

Source code in src/hother/cancelable/utils/testing.py
def add_status_check(
    self,
    expected_status: OperationStatus,
) -> "CancelationScenario":
    """Add assertion for operation status."""
    self.assertions.append(
        {
            "type": "status",
            "status": expected_status,
        }
    )
    return self

run async

run(
    operation: Callable[..., Any], *args: Any, **kwargs: Any
) -> OperationRecorder

Run the scenario.

Parameters:

Name Type Description Default
operation Callable[..., Any]

Async callable to test

required
*args Any

Positional arguments for operation

()
**kwargs Any

Keyword arguments for operation

{}

Returns:

Type Description
OperationRecorder

Operation recorder with results

Source code in src/hother/cancelable/utils/testing.py
async def run(
    self,
    operation: Callable[..., Any],
    *args: Any,
    **kwargs: Any,
) -> OperationRecorder:
    """Run the scenario.

    Args:
        operation: Async callable to test
        *args: Positional arguments for operation
        **kwargs: Keyword arguments for operation

    Returns:
        Operation recorder with results
    """
    recorder = OperationRecorder()
    token = MockCancelationToken()

    # Create cancelable
    cancelable = Cancelable.with_token(token, name=f"scenario_{self.name}")
    recorder.attach_to_cancellable(cancelable)

    # Schedule steps
    async def run_steps():
        for step in self.steps:
            if step["type"] == "delay":
                await anyio.sleep(step["duration"])
            elif step["type"] == "cancel":
                await token.cancel(step["reason"], step["message"])

    # Run operation and steps concurrently
    async with anyio.create_task_group() as tg:
        tg.start_soon(run_steps)

        # Run operation with cancelable
        async with cancelable:
            try:
                await operation(*args, **kwargs)
            except anyio.get_cancelled_exc_class():
                pass  # Expected

    # Run assertions
    for assertion in self.assertions:
        if assertion["type"] == "progress":
            events = recorder.get_events_by_type("progress")
            messages = [e["data"]["message"] for e in events]
            if assertion["message"] not in messages:
                raise AssertionError(f"Expected progress message '{assertion['message']}' not found")
        elif assertion["type"] == "status":
            recorder.assert_final_status(cancelable.context.id, assertion["status"])

    return recorder

create_slow_stream async

create_slow_stream(
    items: list[T],
    delay: float = 0.1,
    cancelable: Cancelable | None = None,
) -> AsyncIterator[T]

Create a slow async stream for testing cancelation.

Parameters:

Name Type Description Default
items list[T]

Items to yield

required
delay float

Delay between items (seconds)

0.1
cancelable Cancelable | None

Optional cancelable to check

None

Yields:

Type Description
AsyncIterator[T]

Items with delays
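
For instance, paired with a timeout (a sketch; the import path and the exact point at which the timeout interrupts the stream are assumptions):

```python
from hother.cancelable import Cancelable  # import path assumed
from hother.cancelable.utils.testing import create_slow_stream


async def consume_until_timeout() -> None:
    async with Cancelable.with_timeout(0.5) as cancel:
        async for item in create_slow_stream(list(range(100)), delay=0.05, cancelable=cancel):
            print(item)  # the 0.5 s timeout is expected to interrupt the stream partway through
```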

Source code in src/hother/cancelable/utils/testing.py
async def create_slow_stream(
    items: list[T],
    delay: float = 0.1,
    cancelable: Cancelable | None = None,
) -> AsyncIterator[T]:
    """Create a slow async stream for testing cancelation.

    Args:
        items: Items to yield
        delay: Delay between items (seconds)
        cancelable: Optional cancelable to check

    Yields:
        Items with delays
    """
    for i, item in enumerate(items):
        if i > 0:  # No delay before first item
            await anyio.sleep(delay)

        if cancelable:
            await cancelable.token.check_async()

        yield item

run_with_timeout_test async

run_with_timeout_test(
    coro: Any,
    expected_timeout: float,
    tolerance: float = 0.1,
) -> None

Test that a coroutine times out within expected duration.

Parameters:

Name Type Description Default
coro Any

Coroutine to run

required
expected_timeout float

Expected timeout duration

required
tolerance float

Acceptable deviation from expected timeout

0.1

Raises:

Type Description
AssertionError

If timeout doesn't occur or timing is wrong
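
A minimal sketch; any mechanism that raises TimeoutError or a cancellation works as the coroutine under test, so plain anyio.fail_after is used here for illustration:

```python
import anyio

from hother.cancelable.utils.testing import run_with_timeout_test


async def test_timeout_fires_on_schedule() -> None:  # run with an async-capable test runner
    async def slow_with_timeout() -> None:
        with anyio.fail_after(0.2):  # raises TimeoutError after 0.2 s
            await anyio.sleep(5.0)

    # Passes only if slow_with_timeout() raises roughly 0.2 s ± 0.1 s after starting.
    await run_with_timeout_test(slow_with_timeout(), expected_timeout=0.2, tolerance=0.1)
```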

Source code in src/hother/cancelable/utils/testing.py
async def run_with_timeout_test(
    coro: Any,
    expected_timeout: float,
    tolerance: float = 0.1,
) -> None:
    """Test that a coroutine times out within expected duration.

    Args:
        coro: Coroutine to run
        expected_timeout: Expected timeout duration
        tolerance: Acceptable deviation from expected timeout

    Raises:
        AssertionError: If timeout doesn't occur or timing is wrong
    """
    start_time = anyio.current_time()

    try:
        await coro
        raise AssertionError("Expected timeout but operation completed")
    except (anyio.get_cancelled_exc_class(), TimeoutError):
        # Expected cancelation
        duration = anyio.current_time() - start_time

        if abs(duration - expected_timeout) > tolerance:
            raise AssertionError(f"Timeout occurred after {duration:.2f}s, expected {expected_timeout:.2f}s ± {tolerance:.2f}s")

assert_cancelation_within async

assert_cancelation_within(
    min_time: float, max_time: float
) -> AsyncIterator[MockCancelationToken]

Context manager that asserts cancelation occurs within a time range.

Parameters:

Name Type Description Default
min_time float

Minimum time before cancelation

required
max_time float

Maximum time before cancelation

required

Yields:

Type Description
AsyncIterator[MockCancelationToken]

Mock cancelation token

Raises:

Type Description
AssertionError

If cancelation timing is wrong
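
A minimal sketch; the work being simulated is just a sleep, and the window bounds are arbitrary:

```python
import anyio

from hother.cancelable.utils.testing import assert_cancelation_within


async def test_cancel_lands_in_window() -> None:  # run with an async-capable test runner
    async with assert_cancelation_within(0.05, 0.5) as token:
        await anyio.sleep(0.1)                       # simulate some work
        await token.cancel(message="done waiting")   # cancel inside the allowed window
    # Exiting the block asserts the cancel happened between 0.05 s and 0.5 s.
```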

Source code in src/hother/cancelable/utils/testing.py
@asynccontextmanager
async def assert_cancelation_within(
    min_time: float,
    max_time: float,
) -> AsyncIterator[MockCancelationToken]:
    """Context manager that asserts cancelation occurs within a time range.

    Args:
        min_time: Minimum time before cancelation
        max_time: Maximum time before cancelation

    Yields:
        Mock cancelation token

    Raises:
        AssertionError: If cancelation timing is wrong
    """
    token = MockCancelationToken()
    start_time = anyio.current_time()

    try:
        yield token
    finally:
        if token.is_cancelled:
            duration = anyio.current_time() - start_time
            if duration < min_time:
                raise AssertionError(f"Cancelation occurred too early: {duration:.2f}s < {min_time:.2f}s")
            if duration > max_time:
                raise AssertionError(f"Cancelation occurred too late: {duration:.2f}s > {max_time:.2f}s")
        else:
            raise AssertionError("Expected cancelation but none occurred")

sample_async_operation async

sample_async_operation(
    duration: float = 1.0,
    cancelable: Cancelable | None = None,
) -> str

Sample async operation for testing.

Source code in src/hother/cancelable/utils/testing.py
async def sample_async_operation(
    duration: float = 1.0,
    cancelable: Cancelable | None = None,
) -> str:
    """Sample async operation for testing."""
    if cancelable:
        await cancelable.report_progress("Operation started")

    await anyio.sleep(duration / 2)

    if cancelable:
        await cancelable.report_progress("Operation 50% complete")

    await anyio.sleep(duration / 2)

    if cancelable:
        await cancelable.report_progress("Operation completed")

    return "success"