Skip to content

Cancelation Sources

Cancelation sources provide different ways to trigger cancelation of async operations.

Base Source

CancelationSource

Abstract base class for all cancelation sources.

hother.cancelable.sources.base.CancelationSource

Bases: ABC

Abstract base class for cancelation sources.

A cancelation source monitors for a specific condition and triggers cancelation when that condition is met.

Source code in src/hother/cancelable/sources/base.py
class CancelationSource(ABC):
    """Abstract base class for cancelation sources.

    A cancelation source monitors for a specific condition and triggers
    cancelation when that condition is met.

    Attributes:
        reason: Cancelation reason reported when this source triggers.
        name: Source name; defaults to the concrete class name.
        scope: Cancel scope to cancel; ``None`` until monitoring starts.
        triggered: Initialized to ``False``; this base class never changes it.
    """

    def __init__(self, reason: CancelationReason, name: str | None = None):
        """Initialize cancelation source.

        Args:
            reason: The cancelation reason this source will use
            name: Optional name for the source (defaults to the class name)
        """
        self.reason = reason
        self.name = name or self.__class__.__name__
        self.scope: anyio.CancelScope | None = None
        self._cancel_callback: Callable[[CancelationReason, str], None | Awaitable[None]] | None = None
        self._monitoring_task: anyio.CancelScope | None = None
        self.triggered: bool = False

    @abstractmethod
    async def start_monitoring(self, scope: anyio.CancelScope) -> None:
        """Start monitoring for cancelation condition.

        Abstract: subclasses must override. This default body only records
        the scope on the instance.

        Args:
            scope: The cancel scope to trigger when condition is met
        """
        self.scope = scope

    @abstractmethod
    async def stop_monitoring(self) -> None:
        """Stop monitoring and clean up resources.

        Abstract: subclasses must override. This default body cancels and
        clears ``_monitoring_task`` when one is set.
        """
        if self._monitoring_task:
            self._monitoring_task.cancel()
            self._monitoring_task = None

    def set_cancel_callback(self, callback: Callable[[CancelationReason, str], None | Awaitable[None]]) -> None:
        """Set callback to be called when cancelation is triggered.

        Args:
            callback: Callback function that accepts reason and message (can be sync or async)
        """
        self._cancel_callback = callback

    async def trigger_cancelation(self, message: str | None = None) -> None:
        """Trigger cancelation with the configured reason.

        Does nothing when no scope has been recorded or the scope has already
        been cancelled. Otherwise logs the event, runs the callback (if any)
        and cancels the scope. Exceptions raised by the callback are logged
        and do not prevent the scope from being cancelled.

        Args:
            message: Optional cancelation message; the callback receives ``""``
                when it is ``None``
        """
        # Local import keeps this method self-contained.
        import inspect

        if self.scope and not self.scope.cancel_called:
            logger.info(
                "Cancelation triggered",
                extra={
                    "source": self.name,
                    "reason": self.reason.value,
                    "cancel_message": message,
                },
            )

            # Call callback if set
            if self._cancel_callback:
                try:
                    result = self._cancel_callback(self.reason, message or "")
                    # Await only genuine awaitables. A sync callback may return
                    # any non-None value; awaiting that would raise TypeError
                    # and be reported as a callback error.
                    if inspect.isawaitable(result):
                        await result
                except Exception as e:
                    logger.error(
                        "Error in cancelation callback",
                        extra={
                            "source": self.name,
                            "error": str(e),
                        },
                        exc_info=True,
                    )

            # Cancel the scope
            self.scope.cancel()

    def __str__(self) -> str:
        """Return ``"<name>(reason=<reason value>)"``."""
        return f"{self.name}(reason={self.reason.value})"

reason instance-attribute

reason = reason

name instance-attribute

name = name or __name__

scope instance-attribute

scope: CancelScope | None = None

triggered instance-attribute

triggered: bool = False

start_monitoring abstractmethod async

start_monitoring(scope: CancelScope) -> None

Start monitoring for cancelation condition.

Parameters:

Name Type Description Default
scope CancelScope

The cancel scope to trigger when condition is met

required
Source code in src/hother/cancelable/sources/base.py
@abstractmethod
async def start_monitoring(self, scope: anyio.CancelScope) -> None:
    """Start monitoring for cancelation condition.

    Abstract method: concrete sources must override it. This default body
    only records ``scope`` on the instance.

    Args:
        scope: The cancel scope to trigger when condition is met
    """
    self.scope = scope

stop_monitoring abstractmethod async

stop_monitoring() -> None

Stop monitoring and clean up resources.

Source code in src/hother/cancelable/sources/base.py
@abstractmethod
async def stop_monitoring(self) -> None:
    """Stop monitoring and release the monitoring task, if one is set.

    When ``_monitoring_task`` is set it is cancelled and the reference is
    cleared; otherwise nothing happens.
    """
    active = self._monitoring_task
    if not active:
        return

    active.cancel()
    self._monitoring_task = None

set_cancel_callback

set_cancel_callback(
    callback: Callable[
        [CancelationReason, str], None | Awaitable[None]
    ],
) -> None

Set callback to be called when cancelation is triggered.

Parameters:

Name Type Description Default
callback Callable[[CancelationReason, str], None | Awaitable[None]]

Callback function that accepts reason and message (can be sync or async)

required
Source code in src/hother/cancelable/sources/base.py
def set_cancel_callback(self, callback: Callable[[CancelationReason, str], None | Awaitable[None]]) -> None:
    """Set callback to be called when cancelation is triggered.

    The callback is stored on the instance, replacing any callback set earlier.

    Args:
        callback: Callback function that accepts reason and message (can be sync or async)
    """
    self._cancel_callback = callback

trigger_cancelation async

trigger_cancelation(message: str | None = None) -> None

Trigger cancelation with the configured reason.

Parameters:

Name Type Description Default
message str | None

Optional cancelation message

None
Source code in src/hother/cancelable/sources/base.py
async def trigger_cancelation(self, message: str | None = None) -> None:
    """Trigger cancelation with the configured reason.

    Does nothing when no scope has been recorded or the scope has already
    been cancelled. Otherwise logs the event, runs the callback (if any) and
    cancels the scope. Exceptions raised by the callback are logged and do
    not prevent the scope from being cancelled.

    Args:
        message: Optional cancelation message; the callback receives ``""``
            when it is ``None``
    """
    # Local import keeps this method self-contained.
    import inspect

    if self.scope and not self.scope.cancel_called:
        logger.info(
            "Cancelation triggered",
            extra={
                "source": self.name,
                "reason": self.reason.value,
                "cancel_message": message,
            },
        )

        # Call callback if set
        if self._cancel_callback:
            try:
                result = self._cancel_callback(self.reason, message or "")
                # Await only genuine awaitables. A sync callback may return
                # any non-None value; awaiting that would raise TypeError and
                # be reported as a callback error.
                if inspect.isawaitable(result):
                    await result
            except Exception as e:
                logger.error(
                    "Error in cancelation callback",
                    extra={
                        "source": self.name,
                        "error": str(e),
                    },
                    exc_info=True,
                )

        # Cancel the scope
        self.scope.cancel()

Built-in Sources

TimeoutSource

Cancels operations after a specified time period.

hother.cancelable.sources.timeout.TimeoutSource

Bases: CancelationSource

Cancelation source that triggers after a specified timeout.

Source code in src/hother/cancelable/sources/timeout.py
class TimeoutSource(CancelationSource):
    """Cancelation source that triggers after a specified timeout.

    The timeout is enforced by setting the deadline of the monitored cancel
    scope; no background task is started.
    """

    def __init__(self, timeout: float | timedelta, name: str | None = None):
        """Initialize timeout source.

        Args:
            timeout: Timeout duration in seconds or as timedelta
            name: Optional name for the source

        Raises:
            ValueError: If the timeout is not a positive number (zero,
                negative and NaN values are rejected)
        """
        super().__init__(CancelationReason.TIMEOUT, name)

        if isinstance(timeout, timedelta):
            timeout = timeout.total_seconds()

        # "not timeout > 0" rather than "timeout <= 0": every comparison with
        # NaN is False, so the latter would let a NaN timeout through.
        if not timeout > 0:
            raise ValueError(f"Timeout must be positive, got {timeout}")

        self.timeout = timeout
        self.triggered = False
        # Absolute deadline on the anyio clock; set by start_monitoring().
        self._deadline_time: float | None = None

    async def start_monitoring(self, scope: anyio.CancelScope) -> None:
        """Set scope deadline for timeout.

        The deadline is ``anyio.current_time() + timeout`` and overwrites any
        deadline already present on the scope.

        Args:
            scope: Cancel scope to configure
        """
        self.scope = scope
        self._deadline_time = anyio.current_time() + self.timeout
        scope.deadline = self._deadline_time

        logger.debug(
            "Timeout source activated",
            extra={
                "source": self.name,
                "timeout_seconds": self.timeout,
                "deadline": scope.deadline,
            },
        )

    async def stop_monitoring(self) -> None:
        """Stop timeout monitoring.

        Marks the source as triggered when the recorded deadline has already
        passed at the time this method is called.
        """
        # Explicit None check: "no deadline recorded" is the only state that
        # should skip the comparison.
        if self._deadline_time is not None and anyio.current_time() >= self._deadline_time:
            self.triggered = True

        logger.debug(
            "Timeout source stopped",
            extra={
                "source": self.name,
                "triggered": self.triggered,
            },
        )

timeout instance-attribute

timeout = timeout

triggered instance-attribute

triggered = False

start_monitoring async

start_monitoring(scope: CancelScope) -> None

Set scope deadline for timeout.

Parameters:

Name Type Description Default
scope CancelScope

Cancel scope to configure

required
Source code in src/hother/cancelable/sources/timeout.py
async def start_monitoring(self, scope: anyio.CancelScope) -> None:
    """Arm the timeout by putting a deadline on ``scope``.

    The deadline is the current anyio time plus the configured timeout; it is
    also remembered on the instance.

    Args:
        scope: Cancel scope to configure
    """
    self.scope = scope

    deadline = anyio.current_time() + self.timeout
    self._deadline_time = deadline
    scope.deadline = deadline

    # Log the deadline as reported back by the scope itself.
    details = {
        "source": self.name,
        "timeout_seconds": self.timeout,
        "deadline": scope.deadline,
    }
    logger.debug("Timeout source activated", extra=details)

stop_monitoring async

stop_monitoring() -> None

Stop timeout monitoring.

Source code in src/hother/cancelable/sources/timeout.py
async def stop_monitoring(self) -> None:
    """Stop timeout monitoring and record whether the deadline has passed."""
    deadline = self._deadline_time
    # A deadline that is already in the past means the timeout elapsed.
    if deadline and anyio.current_time() >= deadline:
        self.triggered = True

    details = {
        "source": self.name,
        "triggered": self.triggered,
    }
    logger.debug("Timeout source stopped", extra=details)

SignalSource

Cancels operations when Unix signals are received (e.g., SIGTERM, SIGINT).

hother.cancelable.sources.signal.SignalSource

Bases: CancelationSource

Cancelation source that monitors OS signals.

Uses anyio's native signal handling for clean integration. Supports graceful shutdown via SIGINT, SIGTERM, etc.

Note: Signal handlers can only be installed in the main thread.

Source code in src/hother/cancelable/sources/signal.py
class SignalSource(CancelationSource):
    """Cancelation source driven by OS signals.

    Signals are received through anyio's signal receiver in a background
    task. The first signal received triggers cancelation.

    Note: Signal handlers can only be installed in the main thread.
    """

    def __init__(self, *signals: int, name: str | None = None) -> None:
        """Create a signal source.

        Args:
            *signals: Signal numbers to monitor (e.g., signal.SIGINT); when
                none are given, SIGINT and SIGTERM are monitored
            name: Optional name for the source

        Raises:
            TypeError: If any given signal is not an integer
        """
        super().__init__(CancelationReason.SIGNAL, name)

        # Every requested signal must be an integer signal number.
        for candidate in signals:
            if not isinstance(candidate, int):
                raise TypeError(f"Signal must be an integer, got {type(candidate)}")

        # No explicit signals: fall back to SIGINT and SIGTERM.
        self.signals = tuple(signals) if signals else (signal.SIGINT, signal.SIGTERM)

        self.triggered = False
        self._signal_received: int | None = None
        self._task_group: anyio.abc.TaskGroup | None = None

    def _configured_signal_names(self) -> list[str]:
        """Return the names of the configured signals that ``signal.Signals`` knows."""
        known = signal.Signals._value2member_map_
        return [signal.Signals(number).name for number in self.signals if number in known]

    async def start_monitoring(self, scope: anyio.CancelScope) -> None:
        """Start the background task that waits for signals.

        Args:
            scope: Cancel scope to trigger when signal is received
        """
        self.scope = scope

        # The task group is entered manually; stop_monitoring() exits it.
        group = anyio.create_task_group()
        self._task_group = group
        await group.__aenter__()

        group.start_soon(self._monitor_signals)

        logger.debug(
            "Signal source activated",
            extra={
                "source": self.name,
                "signals": self._configured_signal_names(),
            },
        )

    async def stop_monitoring(self) -> None:
        """Cancel the background task group and log the final state."""
        group = self._task_group
        if group:
            try:
                # Cancelling the group's scope stops the monitoring task.
                group.cancel_scope.cancel()
                await group.__aexit__(None, None, None)
            except BaseException as exc:
                # Cleanup is best-effort: whatever the exit raises is only logged.
                logger.debug(f"Task group exit: {type(exc).__name__}: {exc}")
            finally:
                self._task_group = None

        logger.debug(
            "Signal source stopped",
            extra={
                "source": self.name,
                "triggered": self.triggered,
                "signal_received": self._signal_received,
            },
        )

    async def _monitor_signals(self) -> None:
        """Wait for one of the configured signals and trigger cancelation.

        Runs as a background task. The first signal received while the source
        is not yet triggered is recorded, cancelation is triggered, and the
        task ends.
        """
        try:
            # The receiver is a synchronous context manager yielding an async iterator.
            with anyio.open_signal_receiver(*self.signals) as receiver:  # type: ignore[arg-type]
                logger.debug(
                    "Signal source monitoring started",
                    extra={
                        "source": self.name,
                        "signals": self._configured_signal_names(),
                    },
                )

                # Signal delivery is exercised by the integration examples
                # (examples/02_advanced/08_signal_handling.py).
                async for signum in receiver:  # pragma: no cover
                    if self.triggered:
                        # Already triggered: ignore this signal and keep waiting.
                        continue

                    self.triggered = True
                    self._signal_received = signum

                    # Resolve a readable name when the number is a known signal.
                    is_known = signum in signal.Signals._value2member_map_
                    signal_name = signal.Signals(signum).name if is_known else "UNKNOWN"

                    message = f"Received signal {signal_name} ({signum})"

                    logger.info(
                        "Signal received, triggering cancelation",
                        extra={
                            "source": self.name,
                            "signal": signal_name,
                            "signum": signum,
                        },
                    )

                    await self.trigger_cancelation(message)
                    break

        # Defensive handling of unexpected errors; logged and re-raised.
        except Exception as exc:  # pragma: no cover
            logger.error(
                "Signal monitoring error",
                extra={
                    "source": self.name,
                    "error": str(exc),
                },
                exc_info=True,
            )
            raise
        finally:
            logger.debug(
                "Signal source monitoring stopped",
                extra={
                    "source": self.name,
                    "triggered": self.triggered,
                    "signal_received": self._signal_received,
                },
            )

signals instance-attribute

signals = (SIGINT, SIGTERM)

triggered instance-attribute

triggered = False

start_monitoring async

start_monitoring(scope: CancelScope) -> None

Start monitoring for signals.

Parameters:

Name Type Description Default
scope CancelScope

Cancel scope to trigger when signal is received

required
Source code in src/hother/cancelable/sources/signal.py
async def start_monitoring(self, scope: anyio.CancelScope) -> None:
    """Start the background task that waits for signals.

    Args:
        scope: Cancel scope to trigger when signal is received
    """
    self.scope = scope

    # The task group is entered manually and stays open after this returns.
    group = anyio.create_task_group()
    self._task_group = group
    await group.__aenter__()

    group.start_soon(self._monitor_signals)

    # Only signal numbers known to signal.Signals are named in the log.
    known = signal.Signals._value2member_map_
    signal_names = [signal.Signals(number).name for number in self.signals if number in known]
    logger.debug(
        "Signal source activated",
        extra={"source": self.name, "signals": signal_names},
    )

stop_monitoring async

stop_monitoring() -> None

Stop monitoring signals and clean up resources.

Source code in src/hother/cancelable/sources/signal.py
async def stop_monitoring(self) -> None:
    """Cancel the background task group and log the final state."""
    group = self._task_group
    if group:
        try:
            # Cancelling the group's scope stops the monitoring task.
            group.cancel_scope.cancel()
            await group.__aexit__(None, None, None)
        except BaseException as exc:
            # Cleanup is best-effort: whatever the exit raises is only logged.
            logger.debug(f"Task group exit: {type(exc).__name__}: {exc}")
        finally:
            self._task_group = None

    final_state = {
        "source": self.name,
        "triggered": self.triggered,
        "signal_received": self._signal_received,
    }
    logger.debug("Signal source stopped", extra=final_state)

ConditionSource

Cancels operations when a predicate function returns True.

hother.cancelable.sources.condition.ConditionSource

Bases: CancelationSource

Cancelation source that monitors a condition function.

Cancels when the condition function returns True.

Source code in src/hother/cancelable/sources/condition.py
class ConditionSource(CancelationSource):
    """Cancelation source that monitors a condition function.

    Cancels when the condition function returns True. The condition is polled
    by a background task every ``check_interval`` seconds: coroutine functions
    are awaited directly, any other callable is run in a worker thread.
    """

    def __init__(
        self,
        condition: Callable[[], bool | Awaitable[bool]],
        check_interval: float = 0.1,
        condition_name: str | None = None,
        name: str | None = None,
    ):
        """Initialize condition source.

        Args:
            condition: Function that returns True when cancelation should occur
            check_interval: How often to check condition (seconds)
            condition_name: Name for the condition (for logging); defaults to
                the callable's ``__name__`` or ``"condition"``
            name: Optional name for the source

        Raises:
            ValueError: If ``check_interval`` is not positive
        """
        super().__init__(CancelationReason.CONDITION, name)

        self.condition = condition
        self.check_interval = check_interval
        self.condition_name = condition_name or getattr(condition, "__name__", "condition")
        self.triggered = False
        self._task_group: anyio.abc.TaskGroup | None = None
        self._stop_event: anyio.Event | None = None

        # Validate check interval
        if check_interval <= 0:
            raise ValueError(f"Check interval must be positive, got {check_interval}")

        # Determine if condition is async
        self._is_async = inspect.iscoroutinefunction(condition)

    async def start_monitoring(self, scope: anyio.CancelScope) -> None:
        """Start monitoring the condition.

        Args:
            scope: Cancel scope to trigger when condition is met
        """
        self.scope = scope

        # Create stop event for graceful shutdown
        self._stop_event = anyio.Event()

        # Create task group for background monitoring; it is entered manually
        # and exited again in stop_monitoring().
        self._task_group = anyio.create_task_group()
        await self._task_group.__aenter__()

        # Start monitoring task
        self._task_group.start_soon(self._monitor_condition)

        logger.debug(
            "Condition source activated",
            extra={
                "source": self.name,
                "condition_name": self.condition_name,
                "check_interval": self.check_interval,
            },
        )

    async def stop_monitoring(self) -> None:
        """Stop monitoring the condition.

        Sets the stop event so the polling loop ends, then exits the task
        group. Exceptions raised by the exit are logged at debug level.
        """
        if self._task_group and self._stop_event:
            # Signal the monitoring task to stop gracefully
            self._stop_event.set()

            # Exit the task group normally (monitoring task will complete)
            try:
                await self._task_group.__aexit__(None, None, None)
            except Exception as e:
                logger.debug(f"Task group exit error: {type(e).__name__}: {e}")
            finally:
                self._task_group = None
                self._stop_event = None

        logger.debug(
            "Condition source stopped",
            extra={
                "source": self.name,
                "condition_name": self.condition_name,
                "triggered": self.triggered,
            },
        )

    async def _monitor_condition(self) -> None:
        """Poll the condition until it is met or monitoring is stopped.

        Errors raised by the condition are logged and polling continues.
        Cancellation of the task is logged and re-raised.
        """
        # Ensure stop event is set (should be guaranteed by start_monitoring)
        assert self._stop_event is not None, "stop_event must be set before monitoring"

        check_count = 0

        try:
            while not self.triggered and not self._stop_event.is_set():
                check_count += 1
                logger.debug(f"Condition check #{check_count} for {self.condition_name}")

                # Check condition
                try:
                    result: bool
                    if self._is_async:
                        result = await self.condition()  # type: ignore[misc]
                    else:
                        # Run sync condition in thread pool
                        result = await anyio.to_thread.run_sync(self.condition)  # type: ignore[arg-type]
                        # iscoroutinefunction() does not recognise every
                        # callable that returns an awaitable (partials,
                        # lambdas, callable objects). An un-awaited coroutine
                        # is always truthy and would trigger cancelation on
                        # the first check, so resolve it first.
                        if inspect.isawaitable(result):
                            result = await result

                    logger.debug(f"Condition check #{check_count} returned: {result}")

                    if result:
                        self.triggered = True
                        logger.debug(f"Condition '{self.condition_name}' met after {check_count} checks")

                        # Trigger cancelation through the base class method
                        await self.trigger_cancelation(f"Condition '{self.condition_name}' met after {check_count} checks")
                        break

                except Exception as e:
                    logger.error(
                        "Error checking condition",
                        extra={
                            "source": self.name,
                            "condition_name": self.condition_name,
                            "error": str(e),
                        },
                        exc_info=True,
                    )
                    # Continue monitoring despite errors

                # Wait before next check, but break early if stop event is set
                with anyio.move_on_after(self.check_interval):
                    await self._stop_event.wait()

        except anyio.get_cancelled_exc_class():
            # Task was cancelled
            logger.debug("Condition monitoring task cancelled")
            raise
        except Exception as e:
            logger.error(
                "Unexpected error in condition monitor",
                extra={
                    "source": self.name,
                    "error": str(e),
                },
                exc_info=True,
            )

condition instance-attribute

condition = condition

check_interval instance-attribute

check_interval = check_interval

condition_name instance-attribute

condition_name = condition_name or getattr(
    condition, "__name__", "condition"
)

triggered instance-attribute

triggered = False

start_monitoring async

start_monitoring(scope: CancelScope) -> None

Start monitoring the condition.

Parameters:

Name Type Description Default
scope CancelScope

Cancel scope to trigger when condition is met

required
Source code in src/hother/cancelable/sources/condition.py
async def start_monitoring(self, scope: anyio.CancelScope) -> None:
    """Start the background task that polls the condition.

    Args:
        scope: Cancel scope to trigger when condition is met
    """
    self.scope = scope

    # Event used by stop_monitoring() to end the polling loop gracefully.
    self._stop_event = anyio.Event()

    # The task group is entered manually and stays open after this returns.
    group = anyio.create_task_group()
    self._task_group = group
    await group.__aenter__()

    group.start_soon(self._monitor_condition)

    details = {
        "source": self.name,
        "condition_name": self.condition_name,
        "check_interval": self.check_interval,
    }
    logger.debug("Condition source activated", extra=details)

stop_monitoring async

stop_monitoring() -> None

Stop monitoring the condition.

Source code in src/hother/cancelable/sources/condition.py
async def stop_monitoring(self) -> None:
    """Ask the polling task to finish, exit its task group and log the outcome."""
    group, stop_event = self._task_group, self._stop_event
    if group and stop_event:
        # Setting the event lets the polling loop end on its own.
        stop_event.set()

        try:
            await group.__aexit__(None, None, None)
        except Exception as exc:
            logger.debug(f"Task group exit error: {type(exc).__name__}: {exc}")
        finally:
            self._task_group = None
            self._stop_event = None

    final_state = {
        "source": self.name,
        "condition_name": self.condition_name,
        "triggered": self.triggered,
    }
    logger.debug("Condition source stopped", extra=final_state)

ResourceConditionSource

Specialized condition source for monitoring system resources (CPU, memory, and disk usage).

hother.cancelable.sources.condition.ResourceConditionSource

Bases: ConditionSource

Specialized condition source for monitoring system resources.

Useful for cancelling operations when resources are constrained.

Source code in src/hother/cancelable/sources/condition.py
class ResourceConditionSource(ConditionSource):
    """Specialized condition source for monitoring system resources.

    Useful for cancelling operations when resources are constrained.
    Memory, CPU and disk usage are read through ``psutil``; when ``psutil``
    cannot be imported the check always reports that no threshold is exceeded.
    """

    def __init__(
        self,
        memory_threshold: float | None = None,
        cpu_threshold: float | None = None,
        disk_threshold: float | None = None,
        check_interval: float = 5.0,
        name: str | None = None,
    ):
        """Initialize resource condition source.

        A threshold of ``None`` (or any other falsy value, such as ``0``)
        disables the corresponding check.

        Args:
            memory_threshold: Cancel if memory usage exceeds this percentage
            cpu_threshold: Cancel if CPU usage exceeds this percentage
            disk_threshold: Cancel if disk usage exceeds this percentage
            check_interval: How often to check resources (seconds)
            name: Optional name for the source (defaults to ``"resource_monitor"``)
        """
        self.memory_threshold = memory_threshold
        self.cpu_threshold = cpu_threshold
        self.disk_threshold = disk_threshold

        # Build condition name
        conditions: list[str] = []
        if memory_threshold:
            conditions.append(f"memory>{memory_threshold}%")
        if cpu_threshold:
            conditions.append(f"cpu>{cpu_threshold}%")
        if disk_threshold:
            conditions.append(f"disk>{disk_threshold}%")

        condition_name = f"resource_check({', '.join(conditions)})"

        super().__init__(
            condition=self._check_resources,
            check_interval=check_interval,
            condition_name=condition_name,
            name=name or "resource_monitor",
        )

    async def _check_resources(self) -> bool:
        """Check if any resource threshold is exceeded.

        Checks run in the order memory, CPU, disk and stop at the first
        threshold that is exceeded.

        Returns:
            True if an enabled threshold is exceeded, False otherwise (also
            when ``psutil`` is not installed)
        """
        try:
            import psutil
        except ImportError:
            logger.warning("psutil not available, resource monitoring disabled")
            return False

        # Check memory
        if self.memory_threshold:
            memory_percent = psutil.virtual_memory().percent
            if memory_percent > self.memory_threshold:
                logger.info(
                    "Memory threshold exceeded",
                    extra={
                        "current": memory_percent,
                        "threshold": self.memory_threshold,
                    },
                )
                return True

        # Check CPU
        if self.cpu_threshold:
            # cpu_percent() with an interval blocks for that long while it
            # samples. Run it in a worker thread so the event loop stays
            # responsive. The interval (0.1 s) is passed positionally because
            # run_sync() forwards positional arguments only.
            cpu_percent = await anyio.to_thread.run_sync(psutil.cpu_percent, 0.1)
            if cpu_percent > self.cpu_threshold:
                logger.info(
                    "CPU threshold exceeded",
                    extra={
                        "current": cpu_percent,
                        "threshold": self.cpu_threshold,
                    },
                )
                return True

        # Check disk (usage of the root filesystem)
        if self.disk_threshold:
            disk_usage = psutil.disk_usage("/").percent
            if disk_usage > self.disk_threshold:
                logger.info(
                    "Disk threshold exceeded",
                    extra={
                        "current": disk_usage,
                        "threshold": self.disk_threshold,
                    },
                )
                return True

        return False

memory_threshold instance-attribute

memory_threshold = memory_threshold

cpu_threshold instance-attribute

cpu_threshold = cpu_threshold

disk_threshold instance-attribute

disk_threshold = disk_threshold

Composite Sources

Composite sources allow combining multiple cancelation sources with different logic:

OR vs AND Logic Comparison

| Feature | CompositeSource / AnyOfSource | AllOfSource |
| --- | --- | --- |
| Trigger Logic | Cancels when ANY source triggers | Cancels when ALL sources trigger |
| Use Case | Safety nets, failsafes, OR conditions | Requirements, gates, AND conditions |
| Example | Timeout OR manual cancel OR signal | Minimum time AND target count |
| Thread Safety | ✅ Yes | ✅ Yes (with anyio.Lock()) |
| Typical Usage | Cancelable.combine() | Manual construction with AllOfSource([...]) |

Usage Examples

OR Logic (Any-Of) - Default:

import signal

from hother.cancelable.sources.composite import CompositeSource
from hother.cancelable.sources.timeout import TimeoutSource
from hother.cancelable.sources.signal import SignalSource

# Cancels when timeout OR signal (whichever comes first).
# The standard-library ``signal`` module supplies the SIGTERM constant.
or_source = CompositeSource([
    TimeoutSource(timeout=60.0),
    SignalSource(signal.SIGTERM)
])

AND Logic (All-Of) - Require All:

from hother.cancelable.sources.composite import AllOfSource
from hother.cancelable.sources.timeout import TimeoutSource
from hother.cancelable.sources.condition import ConditionSource

# Cancels only when BOTH timeout AND condition are met.
# ``items_processed`` is a counter maintained by the surrounding application code.
minimum_time = TimeoutSource(timeout=60.0)
target_count = ConditionSource(lambda: items_processed >= 100, check_interval=1.0)
and_source = AllOfSource([minimum_time, target_count])

CompositeSource

Combines multiple cancelation sources into a single source (any-of logic).

hother.cancelable.sources.composite.CompositeSource

Bases: CancelationSource

Cancelation source that combines multiple other sources.

Triggers when any of the component sources trigger.

Source code in src/hother/cancelable/sources/composite.py
class CompositeSource(CancelationSource):
    """Cancelation source that combines multiple other sources.

    Triggers when any of the component sources trigger. Each component is
    started in its own background task; the component's
    ``trigger_cancelation`` is wrapped so that the composite records which
    component fired, adopts that component's reason, and then triggers its
    own cancelation.
    """

    def __init__(
        self,
        sources: list[CancelationSource],
        name: str | None = None,
    ):
        """Initialize composite source.

        Args:
            sources: List of cancelation sources to combine
            name: Optional name for the source

        Raises:
            ValueError: If ``sources`` is empty.
        """
        # Use MANUAL as default reason (will be overridden by actual source)
        super().__init__(CancelationReason.MANUAL, name or "composite")

        if not sources:
            raise ValueError("At least one source is required")

        self.sources = sources
        self.triggered_source: CancelationSource | None = None
        # Declared up front so stop_monitoring() can run safely even when
        # start_monitoring() was never called.
        self._task_group: anyio.abc.TaskGroup | None = None

    async def start_monitoring(self, scope: anyio.CancelScope) -> None:
        """Start monitoring all component sources.

        Args:
            scope: Cancel scope to trigger when any source triggers
        """
        self.scope = scope

        # The task group is entered manually (not via ``async with``) so it
        # stays open after this method returns; stop_monitoring() exits it.
        self._task_group = anyio.create_task_group()
        await self._task_group.__aenter__()

        # Start each source with a wrapper
        for source in self.sources:
            self._task_group.start_soon(self._monitor_source, source)

        logger.debug(
            "Composite source activated: %s with %d sources (%s)",
            self.name,
            len(self.sources),
            [type(s).__name__ for s in self.sources],
        )

    async def stop_monitoring(self) -> None:
        """Stop monitoring all component sources.

        Cancels and exits the background task group (best effort), then asks
        every component source to stop. A failure while stopping one
        component is logged and does not prevent the others from stopping.
        """
        # getattr keeps this tolerant of instances built without __init__.
        task_group = getattr(self, "_task_group", None)
        if task_group is not None:
            task_group.cancel_scope.cancel()

            # Try to properly exit the task group, but shield from cancelation
            # and handle errors if we're in a different context
            try:
                with anyio.CancelScope(shield=True):
                    await task_group.__aexit__(None, None, None)
            except (anyio.get_cancelled_exc_class(), Exception) as e:
                # Task group exit failed, likely due to context mismatch
                # This is acceptable as the cancel scope was already cancelled
                logger.debug("Task group cleanup skipped: %s", type(e).__name__)
            finally:
                self._task_group = None

        # Stop each source
        for source in self.sources:
            try:
                await source.stop_monitoring()
            except Exception as e:
                logger.error(
                    "Error stopping source %s: %s",
                    str(source),
                    str(e),
                    exc_info=True,
                )

        logger.debug(
            "Composite source stopped: %s (triggered by %s)",
            self.name,
            str(self.triggered_source) if self.triggered_source else None,
        )

    async def _monitor_source(self, source: CancelationSource) -> None:
        """Monitor a single source and propagate its cancelation.

        Replaces ``source.trigger_cancelation`` on the instance with a wrapper
        and then starts the source. Errors raised while starting the source
        are logged rather than propagated.

        Args:
            source: Source to monitor
        """
        # Override the source's trigger method to capture which source triggered
        original_trigger = source.trigger_cancelation

        async def wrapped_trigger(message: str | None = None):
            self.triggered_source = source
            self.reason = source.reason  # Use the source's reason
            await original_trigger(message)

            # Trigger our own cancelation, unless our scope was already cancelled
            if self.scope and not self.scope.cancel_called:
                await self.trigger_cancelation(f"Composite source triggered by {source.name}: {message}")

        source.trigger_cancelation = wrapped_trigger

        try:
            # The component gets its own fresh scope; the composite's scope is
            # only cancelled through wrapped_trigger above.
            await source.start_monitoring(anyio.CancelScope())
        except Exception as e:
            logger.error(
                "Error in component source %s of composite %s: %s",
                str(source),
                self.name,
                str(e),
                exc_info=True,
            )

sources instance-attribute

sources = sources

triggered_source instance-attribute

triggered_source: CancelationSource | None = None

start_monitoring async

start_monitoring(scope: CancelScope) -> None

Start monitoring all component sources.

Parameters:

Name Type Description Default
scope CancelScope

Cancel scope to trigger when any source triggers

required
Source code in src/hother/cancelable/sources/composite.py
async def start_monitoring(self, scope: anyio.CancelScope) -> None:
    """Begin watching every component source in the background.

    Args:
        scope: Cancel scope to trigger when any source triggers
    """
    self.scope = scope

    # Entered manually (no ``async with``) so the group stays open after
    # this method returns.
    self._task_group = anyio.create_task_group()
    await self._task_group.__aenter__()

    # One background task per component source.
    for component in self.sources:
        self._task_group.start_soon(self._monitor_source, component)

    component_count = len(self.sources)
    component_types = [type(component).__name__ for component in self.sources]
    logger.debug(
        "Composite source activated: %s with %d sources (%s)",
        self.name,
        component_count,
        component_types,
    )

stop_monitoring async

stop_monitoring() -> None

Stop monitoring all component sources.

Source code in src/hother/cancelable/sources/composite.py
async def stop_monitoring(self) -> None:
    """Shut down the background task group and every component source."""
    task_group = getattr(self, "_task_group", None)
    if task_group:
        # Ask the background monitoring tasks to stop.
        task_group.cancel_scope.cancel()

        # Best-effort exit of the task group: shielded from outer
        # cancelation, and tolerant of failures (e.g. when called from a
        # different context than the one that entered the group).
        try:
            with anyio.CancelScope(shield=True):
                await task_group.__aexit__(None, None, None)
        except (anyio.get_cancelled_exc_class(), Exception) as exc:
            # The cancel scope was already cancelled, so a failed exit is
            # tolerated here.
            logger.debug(f"Task group cleanup skipped: {type(exc).__name__}")
        finally:
            self._task_group = None

    # Stop the components one by one; a failure in one is logged and does
    # not keep the rest from being stopped.
    for component in self.sources:
        try:
            await component.stop_monitoring()
        except Exception as exc:
            logger.error(
                "Error stopping source %s: %s",
                str(component),
                str(exc),
                exc_info=True,
            )

    trigger_label = str(self.triggered_source) if self.triggered_source else None
    logger.debug(
        "Composite source stopped: %s (triggered by %s)",
        self.name,
        trigger_label,
    )

AnyOfSource

Alias for CompositeSource - cancels when any source triggers.

hother.cancelable.sources.composite.AnyOfSource

Bases: CompositeSource

Alias for CompositeSource - triggers when ANY source triggers.

Source code in src/hother/cancelable/sources/composite.py
class AnyOfSource(CompositeSource):
    """Alias for CompositeSource - triggers when ANY source triggers.

    This subclass adds no attributes or methods of its own; all behavior is
    inherited unchanged from ``CompositeSource``.
    """

AllOfSource

Cancels when all sources have triggered (all-of logic).

hother.cancelable.sources.composite.AllOfSource

Bases: CancelationSource

Cancelation source that requires ALL component sources to trigger.

Only cancels when all component sources have triggered.

Source code in src/hother/cancelable/sources/composite.py
class AllOfSource(CancelationSource):
    """Cancelation source that requires ALL component sources to trigger.

    Only cancels when all component sources have triggered. Each component is
    started in its own background task; its ``trigger_cancelation`` is wrapped
    so the component is added to ``triggered_sources`` when it fires.
    """

    def __init__(
        self,
        sources: list[CancelationSource],
        name: str | None = None,
    ):
        """Initialize all-of source.

        Args:
            sources: List of cancelation sources that must all trigger
            name: Optional name for the source

        Raises:
            ValueError: If ``sources`` is empty.
        """
        super().__init__(CancelationReason.MANUAL, name or "all_of")

        if not sources:
            raise ValueError("At least one source is required")

        self.sources = sources
        self.triggered_sources: set[CancelationSource] = set()
        # Serializes updates to triggered_sources across concurrent triggers.
        self._lock = anyio.Lock()
        # Declared up front so stop_monitoring() can run safely even when
        # start_monitoring() was never called.
        self._task_group: anyio.abc.TaskGroup | None = None

    async def start_monitoring(self, scope: anyio.CancelScope) -> None:
        """Start monitoring all component sources.

        Args:
            scope: Cancel scope to trigger once every source has triggered
        """
        self.scope = scope

        # The task group is entered manually (not via ``async with``) so it
        # stays open after this method returns; stop_monitoring() exits it.
        self._task_group = anyio.create_task_group()
        await self._task_group.__aenter__()

        # Start each source with a wrapper
        for source in self.sources:
            self._task_group.start_soon(self._monitor_source, source)

        logger.debug(
            "All-of source activated: %s with %d sources",
            self.name,
            len(self.sources),
        )

    async def stop_monitoring(self) -> None:
        """Stop monitoring all component sources.

        Cancels and exits the background task group (best effort), then asks
        every component source to stop. Calling this more than once is safe:
        the task group is exited at most once. A failure while exiting the
        task group or while stopping one component is logged and does not
        prevent the remaining components from being stopped.
        """
        task_group = getattr(self, "_task_group", None)
        if task_group is not None:
            # Drop the reference first so a repeated call cannot re-exit a
            # task group that has already been exited.
            self._task_group = None
            task_group.cancel_scope.cancel()
            try:
                await task_group.__aexit__(None, None, None)
            except Exception as e:
                # Same handling as CompositeSource: the scope was already
                # cancelled, so log and carry on to stop the components.
                logger.debug("Task group cleanup skipped: %s", type(e).__name__)

        # Stop each source
        for source in self.sources:
            try:
                await source.stop_monitoring()
            except Exception as e:
                logger.error(
                    "Error stopping source %s: %s",
                    str(source),
                    str(e),
                    exc_info=True,
                )

    async def _monitor_source(self, source: CancelationSource) -> None:
        """Monitor a single source and check if all have triggered.

        Replaces ``source.trigger_cancelation`` on the instance with a wrapper
        and then starts the source. Errors raised while starting the source
        are logged rather than propagated.

        Args:
            source: Source to monitor
        """
        # Override the source's trigger method
        original_trigger = source.trigger_cancelation

        async def wrapped_trigger(message: str | None = None):
            async with self._lock:
                self.triggered_sources.add(source)

                # Check if all sources have triggered
                if len(self.triggered_sources) == len(self.sources):
                    # All sources triggered, cancel
                    await self.trigger_cancelation(f"All {len(self.sources)} sources have triggered")

            # Still call original trigger for logging
            await original_trigger(message)

        source.trigger_cancelation = wrapped_trigger

        try:
            # Start the source with a dummy scope
            await source.start_monitoring(anyio.CancelScope())
        except Exception as e:
            logger.error(
                "Error in component source %s of all-of %s: %s",
                str(source),
                self.name,
                str(e),
                exc_info=True,
            )

sources instance-attribute

sources = sources

triggered_sources instance-attribute

triggered_sources: set[CancelationSource] = set()

start_monitoring async

start_monitoring(scope: CancelScope) -> None

Start monitoring all component sources.

Source code in src/hother/cancelable/sources/composite.py
async def start_monitoring(self, scope: anyio.CancelScope) -> None:
    """Begin watching every component source in the background."""
    self.scope = scope

    # Entered manually (no ``async with``) so the group stays open after
    # this method returns.
    self._task_group = anyio.create_task_group()
    await self._task_group.__aenter__()

    # One background task per component source.
    for component in self.sources:
        self._task_group.start_soon(self._monitor_source, component)

    component_count = len(self.sources)
    logger.debug(
        "All-of source activated: %s with %d sources",
        self.name,
        component_count,
    )

stop_monitoring async

stop_monitoring() -> None

Stop monitoring all component sources.

Source code in src/hother/cancelable/sources/composite.py
async def stop_monitoring(self) -> None:
    """Stop monitoring all component sources.

    Cancels and exits the background task group (best effort), then asks
    every component source to stop. Calling this more than once is safe: the
    task group is exited at most once. A failure while exiting the task group
    or while stopping one component is logged and does not prevent the
    remaining components from being stopped.
    """
    task_group = getattr(self, "_task_group", None)
    if task_group is not None:
        # Drop the reference first so a repeated call cannot re-exit a task
        # group that has already been exited.
        self._task_group = None
        task_group.cancel_scope.cancel()
        try:
            await task_group.__aexit__(None, None, None)
        except Exception as e:
            # The scope was already cancelled; log and carry on so the
            # component sources below are still stopped.
            logger.debug("Task group cleanup skipped: %s", type(e).__name__)

    # Stop each source
    for source in self.sources:
        try:
            await source.stop_monitoring()
        except Exception as e:
            logger.error(
                "Error stopping source %s: %s",
                str(source),
                str(e),
                exc_info=True,
            )