Cancelation Sources
Cancelation sources provide different ways to trigger cancelation of async operations.
Base Source
CancelationSource
Abstract base class for all cancelation sources.
hother.cancelable.sources.base.CancelationSource
Bases: ABC
Abstract base class for cancelation sources.
A cancelation source monitors for a specific condition and triggers
cancelation when that condition is met.
Source code in src/hother/cancelable/sources/base.py
| class CancelationSource(ABC):
"""Abstract base class for cancelation sources.
A cancelation source monitors for a specific condition and triggers
cancelation when that condition is met.
"""
def __init__(self, reason: CancelationReason, name: str | None = None):
"""Initialize cancelation source.
Args:
reason: The cancelation reason this source will use
name: Optional name for the source
"""
self.reason = reason
self.name = name or self.__class__.__name__
self.scope: anyio.CancelScope | None = None
self._cancel_callback: Callable[[CancelationReason, str], None | Awaitable[None]] | None = None
self._monitoring_task: anyio.CancelScope | None = None
self.triggered: bool = False
@abstractmethod
async def start_monitoring(self, scope: anyio.CancelScope) -> None:
"""Start monitoring for cancelation condition.
Args:
scope: The cancel scope to trigger when condition is met
"""
self.scope = scope
@abstractmethod
async def stop_monitoring(self) -> None:
"""Stop monitoring and clean up resources."""
if self._monitoring_task:
self._monitoring_task.cancel()
self._monitoring_task = None
def set_cancel_callback(self, callback: Callable[[CancelationReason, str], None | Awaitable[None]]) -> None:
"""Set callback to be called when cancelation is triggered.
Args:
callback: Callback function that accepts reason and message (can be sync or async)
"""
self._cancel_callback = callback
async def trigger_cancelation(self, message: str | None = None) -> None:
"""Trigger cancelation with the configured reason.
Args:
message: Optional cancelation message
"""
if self.scope and not self.scope.cancel_called:
logger.info(
"Cancelation triggered",
extra={
"source": self.name,
"reason": self.reason.value,
"cancel_message": message,
},
)
# Call callback if set
if self._cancel_callback:
try:
result = self._cancel_callback(self.reason, message or "")
# If result is an Awaitable, await it
if result is not None:
await result
except Exception as e:
logger.error(
"Error in cancelation callback",
extra={
"source": self.name,
"error": str(e),
},
exc_info=True,
)
# Cancel the scope
self.scope.cancel()
def __str__(self) -> str:
"""String representation."""
return f"{self.name}(reason={self.reason.value})"
|
reason
instance-attribute
triggered
instance-attribute
start_monitoring
abstractmethod
async
Start monitoring for cancelation condition.
Parameters:
| Name |
Type |
Description |
Default |
scope
|
CancelScope
|
The cancel scope to trigger when condition is met
|
required
|
Source code in src/hother/cancelable/sources/base.py
| @abstractmethod
async def start_monitoring(self, scope: anyio.CancelScope) -> None:
"""Start monitoring for cancelation condition.
Args:
scope: The cancel scope to trigger when condition is met
"""
self.scope = scope
|
stop_monitoring
abstractmethod
async
stop_monitoring() -> None
Stop monitoring and clean up resources.
Source code in src/hother/cancelable/sources/base.py
| @abstractmethod
async def stop_monitoring(self) -> None:
"""Stop monitoring and clean up resources."""
if self._monitoring_task:
self._monitoring_task.cancel()
self._monitoring_task = None
|
set_cancel_callback
Set callback to be called when cancelation is triggered.
Parameters:
Source code in src/hother/cancelable/sources/base.py
| def set_cancel_callback(self, callback: Callable[[CancelationReason, str], None | Awaitable[None]]) -> None:
"""Set callback to be called when cancelation is triggered.
Args:
callback: Callback function that accepts reason and message (can be sync or async)
"""
self._cancel_callback = callback
|
trigger_cancelation
async
trigger_cancelation(message: str | None = None) -> None
Trigger cancelation with the configured reason.
Parameters:
| Name |
Type |
Description |
Default |
message
|
str | None
|
Optional cancelation message
|
None
|
Source code in src/hother/cancelable/sources/base.py
| async def trigger_cancelation(self, message: str | None = None) -> None:
"""Trigger cancelation with the configured reason.
Args:
message: Optional cancelation message
"""
if self.scope and not self.scope.cancel_called:
logger.info(
"Cancelation triggered",
extra={
"source": self.name,
"reason": self.reason.value,
"cancel_message": message,
},
)
# Call callback if set
if self._cancel_callback:
try:
result = self._cancel_callback(self.reason, message or "")
# If result is an Awaitable, await it
if result is not None:
await result
except Exception as e:
logger.error(
"Error in cancelation callback",
extra={
"source": self.name,
"error": str(e),
},
exc_info=True,
)
# Cancel the scope
self.scope.cancel()
|
Built-in Sources
TimeoutSource
Cancels operations after a specified time period.
hother.cancelable.sources.timeout.TimeoutSource
Bases: CancelationSource
Cancelation source that triggers after a specified timeout.
Source code in src/hother/cancelable/sources/timeout.py
| class TimeoutSource(CancelationSource):
"""Cancelation source that triggers after a specified timeout."""
def __init__(self, timeout: float | timedelta, name: str | None = None):
"""Initialize timeout source.
Args:
timeout: Timeout duration in seconds or as timedelta
name: Optional name for the source
"""
super().__init__(CancelationReason.TIMEOUT, name)
if isinstance(timeout, timedelta):
timeout = timeout.total_seconds()
if timeout <= 0:
raise ValueError(f"Timeout must be positive, got {timeout}")
self.timeout = timeout
self.triggered = False
self._deadline_time: float | None = None
async def start_monitoring(self, scope: anyio.CancelScope) -> None:
"""Set scope deadline for timeout.
Args:
scope: Cancel scope to configure
"""
self.scope = scope
self._deadline_time = anyio.current_time() + self.timeout
scope.deadline = self._deadline_time
logger.debug(
"Timeout source activated",
extra={
"source": self.name,
"timeout_seconds": self.timeout,
"deadline": scope.deadline,
},
)
async def stop_monitoring(self) -> None:
"""Stop timeout monitoring."""
# Check if timeout occurred by comparing current time with deadline
if self._deadline_time and anyio.current_time() >= self._deadline_time:
self.triggered = True
logger.debug(
"Timeout source stopped",
extra={
"source": self.name,
"triggered": self.triggered,
},
)
|
timeout
instance-attribute
triggered
instance-attribute
start_monitoring
async
Set scope deadline for timeout.
Parameters:
| Name |
Type |
Description |
Default |
scope
|
CancelScope
|
Cancel scope to configure
|
required
|
Source code in src/hother/cancelable/sources/timeout.py
| async def start_monitoring(self, scope: anyio.CancelScope) -> None:
"""Set scope deadline for timeout.
Args:
scope: Cancel scope to configure
"""
self.scope = scope
self._deadline_time = anyio.current_time() + self.timeout
scope.deadline = self._deadline_time
logger.debug(
"Timeout source activated",
extra={
"source": self.name,
"timeout_seconds": self.timeout,
"deadline": scope.deadline,
},
)
|
stop_monitoring
async
stop_monitoring() -> None
Stop timeout monitoring.
Source code in src/hother/cancelable/sources/timeout.py
| async def stop_monitoring(self) -> None:
"""Stop timeout monitoring."""
# Check if timeout occurred by comparing current time with deadline
if self._deadline_time and anyio.current_time() >= self._deadline_time:
self.triggered = True
logger.debug(
"Timeout source stopped",
extra={
"source": self.name,
"triggered": self.triggered,
},
)
|
SignalSource
Cancels operations when Unix signals are received (e.g., SIGTERM, SIGINT).
hother.cancelable.sources.signal.SignalSource
Bases: CancelationSource
Cancelation source that monitors OS signals.
Uses anyio's native signal handling for clean integration.
Supports graceful shutdown via SIGINT, SIGTERM, etc.
Note: Signal handlers can only be installed in the main thread.
Source code in src/hother/cancelable/sources/signal.py
| class SignalSource(CancelationSource):
"""Cancelation source that monitors OS signals.
Uses anyio's native signal handling for clean integration.
Supports graceful shutdown via SIGINT, SIGTERM, etc.
Note: Signal handlers can only be installed in the main thread.
"""
def __init__(self, *signals: int, name: str | None = None) -> None:
"""Initialize signal source.
Args:
*signals: Signal numbers to monitor (e.g., signal.SIGINT)
name: Optional name for the source
"""
super().__init__(CancelationReason.SIGNAL, name)
# Validate signals
for sig in signals:
if not isinstance(sig, int):
raise TypeError(f"Signal must be an integer, got {type(sig)}")
if not signals:
# Default to SIGINT and SIGTERM
self.signals = (signal.SIGINT, signal.SIGTERM)
else:
self.signals = tuple(signals)
self.triggered = False
self._signal_received: int | None = None
self._task_group: anyio.abc.TaskGroup | None = None
async def start_monitoring(self, scope: anyio.CancelScope) -> None:
"""Start monitoring for signals.
Args:
scope: Cancel scope to trigger when signal is received
"""
self.scope = scope
# Create task group for background monitoring
self._task_group = anyio.create_task_group()
await self._task_group.__aenter__()
# Start signal monitoring task
self._task_group.start_soon(self._monitor_signals)
logger.debug(
"Signal source activated",
extra={
"source": self.name,
"signals": [signal.Signals(s).name for s in self.signals if s in signal.Signals._value2member_map_],
},
)
async def stop_monitoring(self) -> None:
"""Stop monitoring signals and clean up resources."""
if self._task_group:
# Cancel the task group to stop the monitoring task
try:
self._task_group.cancel_scope.cancel()
await self._task_group.__aexit__(None, None, None)
except BaseException as e:
# Suppress CancelledError and other exceptions during cleanup
logger.debug(f"Task group exit: {type(e).__name__}: {e}")
finally:
self._task_group = None
logger.debug(
"Signal source stopped",
extra={
"source": self.name,
"triggered": self.triggered,
"signal_received": self._signal_received,
},
)
async def _monitor_signals(self) -> None:
"""Monitor for signals using anyio's native signal handling.
This runs in a background task and waits for any of the configured signals.
When a signal is received, it triggers cancelation and exits.
"""
try:
# Open signal receiver (sync context manager)
with anyio.open_signal_receiver(*self.signals) as signals: # type: ignore[arg-type]
logger.debug(
"Signal source monitoring started",
extra={
"source": self.name,
"signals": [signal.Signals(s).name for s in self.signals if s in signal.Signals._value2member_map_],
},
)
# Wait for signals
# Signal reception happens via anyio's native receiver
# Tested through integration examples (examples/02_advanced/08_signal_handling.py)
async for signum in signals: # pragma: no cover
if not self.triggered:
self.triggered = True
self._signal_received = signum
# Get signal name
signal_name = "UNKNOWN"
if signum in signal.Signals._value2member_map_:
signal_name = signal.Signals(signum).name
message = f"Received signal {signal_name} ({signum})"
logger.info(
"Signal received, triggering cancelation",
extra={
"source": self.name,
"signal": signal_name,
"signum": signum,
},
)
# Trigger cancelation
await self.trigger_cancelation(message)
break
# Exception handling for unexpected errors during signal monitoring
# Defensive code - difficult to trigger without breaking anyio internals
except Exception as e: # pragma: no cover
logger.error(
"Signal monitoring error",
extra={
"source": self.name,
"error": str(e),
},
exc_info=True,
)
raise
finally:
logger.debug(
"Signal source monitoring stopped",
extra={
"source": self.name,
"triggered": self.triggered,
"signal_received": self._signal_received,
},
)
|
signals
instance-attribute
triggered
instance-attribute
start_monitoring
async
Start monitoring for signals.
Parameters:
| Name |
Type |
Description |
Default |
scope
|
CancelScope
|
Cancel scope to trigger when signal is received
|
required
|
Source code in src/hother/cancelable/sources/signal.py
| async def start_monitoring(self, scope: anyio.CancelScope) -> None:
"""Start monitoring for signals.
Args:
scope: Cancel scope to trigger when signal is received
"""
self.scope = scope
# Create task group for background monitoring
self._task_group = anyio.create_task_group()
await self._task_group.__aenter__()
# Start signal monitoring task
self._task_group.start_soon(self._monitor_signals)
logger.debug(
"Signal source activated",
extra={
"source": self.name,
"signals": [signal.Signals(s).name for s in self.signals if s in signal.Signals._value2member_map_],
},
)
|
stop_monitoring
async
stop_monitoring() -> None
Stop monitoring signals and clean up resources.
Source code in src/hother/cancelable/sources/signal.py
| async def stop_monitoring(self) -> None:
"""Stop monitoring signals and clean up resources."""
if self._task_group:
# Cancel the task group to stop the monitoring task
try:
self._task_group.cancel_scope.cancel()
await self._task_group.__aexit__(None, None, None)
except BaseException as e:
# Suppress CancelledError and other exceptions during cleanup
logger.debug(f"Task group exit: {type(e).__name__}: {e}")
finally:
self._task_group = None
logger.debug(
"Signal source stopped",
extra={
"source": self.name,
"triggered": self.triggered,
"signal_received": self._signal_received,
},
)
|
ConditionSource
Cancels operations when a predicate function returns True.
hother.cancelable.sources.condition.ConditionSource
Bases: CancelationSource
Cancelation source that monitors a condition function.
Cancels when the condition function returns True.
Source code in src/hother/cancelable/sources/condition.py
| class ConditionSource(CancelationSource):
"""Cancelation source that monitors a condition function.
Cancels when the condition function returns True.
"""
def __init__(
self,
condition: Callable[[], bool | Awaitable[bool]],
check_interval: float = 0.1,
condition_name: str | None = None,
name: str | None = None,
):
"""Initialize condition source.
Args:
condition: Function that returns True when cancelation should occur
check_interval: How often to check condition (seconds)
condition_name: Name for the condition (for logging)
name: Optional name for the source
"""
super().__init__(CancelationReason.CONDITION, name)
self.condition = condition
self.check_interval = check_interval
self.condition_name = condition_name or getattr(condition, "__name__", "condition")
self.triggered = False
self._task_group: anyio.abc.TaskGroup | None = None
self._stop_event: anyio.Event | None = None
# Validate check interval
if check_interval <= 0:
raise ValueError(f"Check interval must be positive, got {check_interval}")
# Determine if condition is async
self._is_async = inspect.iscoroutinefunction(condition)
async def start_monitoring(self, scope: anyio.CancelScope) -> None:
"""Start monitoring the condition.
Args:
scope: Cancel scope to trigger when condition is met
"""
self.scope = scope
# Create stop event for graceful shutdown
self._stop_event = anyio.Event()
# Create task group for background monitoring
self._task_group = anyio.create_task_group()
await self._task_group.__aenter__()
# Start monitoring task
self._task_group.start_soon(self._monitor_condition)
logger.debug(
"Condition source activated",
extra={
"source": self.name,
"condition_name": self.condition_name,
"check_interval": self.check_interval,
},
)
async def stop_monitoring(self) -> None:
"""Stop monitoring the condition."""
if self._task_group and self._stop_event:
# Signal the monitoring task to stop gracefully
self._stop_event.set()
# Exit the task group normally (monitoring task will complete)
try:
await self._task_group.__aexit__(None, None, None)
except Exception as e:
logger.debug(f"Task group exit error: {type(e).__name__}: {e}")
finally:
self._task_group = None
self._stop_event = None
logger.debug(
"Condition source stopped",
extra={
"source": self.name,
"condition_name": self.condition_name,
"triggered": self.triggered,
},
)
async def _monitor_condition(self) -> None:
"""Monitor the condition in a loop."""
# Ensure stop event is set (should be guaranteed by start_monitoring)
assert self._stop_event is not None, "stop_event must be set before monitoring"
check_count = 0
try:
while not self.triggered and not self._stop_event.is_set():
check_count += 1
logger.debug(f"Condition check #{check_count} for {self.condition_name}")
# Check condition
try:
result: bool
if self._is_async:
result = await self.condition() # type: ignore[misc]
else:
# Run sync condition in thread pool
result = await anyio.to_thread.run_sync(self.condition) # type: ignore[arg-type]
logger.debug(f"Condition check #{check_count} returned: {result}")
if result:
self.triggered = True
logger.debug(f"Condition '{self.condition_name}' met after {check_count} checks")
# Trigger cancelation through the base class method
await self.trigger_cancelation(f"Condition '{self.condition_name}' met after {check_count} checks")
break
except Exception as e:
logger.error(
"Error checking condition",
extra={
"source": self.name,
"condition_name": self.condition_name,
"error": str(e),
},
exc_info=True,
)
# Continue monitoring despite errors
# Wait before next check, but break early if stop event is set
with anyio.move_on_after(self.check_interval):
await self._stop_event.wait()
except anyio.get_cancelled_exc_class():
# Task was cancelled
logger.debug("Condition monitoring task cancelled")
raise
except Exception as e:
logger.error(
"Unexpected error in condition monitor",
extra={
"source": self.name,
"error": str(e),
},
exc_info=True,
)
|
condition
instance-attribute
check_interval
instance-attribute
check_interval = check_interval
condition_name
instance-attribute
condition_name = condition_name or getattr(
condition, "__name__", "condition"
)
triggered
instance-attribute
start_monitoring
async
Start monitoring the condition.
Parameters:
| Name |
Type |
Description |
Default |
scope
|
CancelScope
|
Cancel scope to trigger when condition is met
|
required
|
Source code in src/hother/cancelable/sources/condition.py
| async def start_monitoring(self, scope: anyio.CancelScope) -> None:
"""Start monitoring the condition.
Args:
scope: Cancel scope to trigger when condition is met
"""
self.scope = scope
# Create stop event for graceful shutdown
self._stop_event = anyio.Event()
# Create task group for background monitoring
self._task_group = anyio.create_task_group()
await self._task_group.__aenter__()
# Start monitoring task
self._task_group.start_soon(self._monitor_condition)
logger.debug(
"Condition source activated",
extra={
"source": self.name,
"condition_name": self.condition_name,
"check_interval": self.check_interval,
},
)
|
stop_monitoring
async
stop_monitoring() -> None
Stop monitoring the condition.
Source code in src/hother/cancelable/sources/condition.py
| async def stop_monitoring(self) -> None:
"""Stop monitoring the condition."""
if self._task_group and self._stop_event:
# Signal the monitoring task to stop gracefully
self._stop_event.set()
# Exit the task group normally (monitoring task will complete)
try:
await self._task_group.__aexit__(None, None, None)
except Exception as e:
logger.debug(f"Task group exit error: {type(e).__name__}: {e}")
finally:
self._task_group = None
self._stop_event = None
logger.debug(
"Condition source stopped",
extra={
"source": self.name,
"condition_name": self.condition_name,
"triggered": self.triggered,
},
)
|
ResourceConditionSource
Specialized condition source for monitoring system resources (CPU, memory).
hother.cancelable.sources.condition.ResourceConditionSource
Bases: ConditionSource
Specialized condition source for monitoring system resources.
Useful for cancelling operations when resources are constrained.
Source code in src/hother/cancelable/sources/condition.py
| class ResourceConditionSource(ConditionSource):
"""Specialized condition source for monitoring system resources.
Useful for cancelling operations when resources are constrained.
"""
def __init__(
self,
memory_threshold: float | None = None,
cpu_threshold: float | None = None,
disk_threshold: float | None = None,
check_interval: float = 5.0,
name: str | None = None,
):
"""Initialize resource condition source.
Args:
memory_threshold: Cancel if memory usage exceeds this percentage
cpu_threshold: Cancel if CPU usage exceeds this percentage
disk_threshold: Cancel if disk usage exceeds this percentage
check_interval: How often to check resources (seconds)
name: Optional name for the source
"""
self.memory_threshold = memory_threshold
self.cpu_threshold = cpu_threshold
self.disk_threshold = disk_threshold
# Build condition name
conditions: list[str] = []
if memory_threshold:
conditions.append(f"memory>{memory_threshold}%")
if cpu_threshold:
conditions.append(f"cpu>{cpu_threshold}%")
if disk_threshold:
conditions.append(f"disk>{disk_threshold}%")
condition_name = f"resource_check({', '.join(conditions)})"
super().__init__(
condition=self._check_resources,
check_interval=check_interval,
condition_name=condition_name,
name=name or "resource_monitor",
)
async def _check_resources(self) -> bool:
"""Check if any resource threshold is exceeded."""
try:
import psutil
except ImportError:
logger.warning("psutil not available, resource monitoring disabled")
return False
# Check memory
if self.memory_threshold:
memory_percent = psutil.virtual_memory().percent
if memory_percent > self.memory_threshold:
logger.info(
"Memory threshold exceeded",
extra={
"current": memory_percent,
"threshold": self.memory_threshold,
},
)
return True
# Check CPU
if self.cpu_threshold:
cpu_percent = psutil.cpu_percent(interval=0.1)
if cpu_percent > self.cpu_threshold:
logger.info(
"CPU threshold exceeded",
extra={
"current": cpu_percent,
"threshold": self.cpu_threshold,
},
)
return True
# Check disk
if self.disk_threshold:
disk_usage = psutil.disk_usage("/").percent
if disk_usage > self.disk_threshold:
logger.info(
"Disk threshold exceeded",
extra={
"current": disk_usage,
"threshold": self.disk_threshold,
},
)
return True
return False
|
memory_threshold
instance-attribute
memory_threshold = memory_threshold
cpu_threshold
instance-attribute
cpu_threshold = cpu_threshold
disk_threshold
instance-attribute
disk_threshold = disk_threshold
Composite Sources
Composite sources allow combining multiple cancelation sources with different logic:
OR vs AND Logic Comparison
| Feature |
CompositeSource / AnyOfSource |
AllOfSource |
| Trigger Logic |
Cancels when ANY source triggers |
Cancels when ALL sources trigger |
| Use Case |
Safety nets, failsafes, OR conditions |
Requirements, gates, AND conditions |
| Example |
Timeout OR manual cancel OR signal |
Minimum time AND target count |
| Thread Safety |
✅ Yes |
✅ Yes (with anyio.Lock()) |
| Typical Usage |
Cancelable.combine() |
Manual construction with AllOfSource([...]) |
Usage Examples
OR Logic (Any-Of) - Default:
from hother.cancelable.sources.composite import CompositeSource
from hother.cancelable.sources.timeout import TimeoutSource
from hother.cancelable.sources.signal import SignalSource
# Cancels when timeout OR signal (whichever comes first)
or_source = CompositeSource([
TimeoutSource(timeout=60.0),
SignalSource(signal.SIGTERM)
])
AND Logic (All-Of) - Require All:
from hother.cancelable.sources.composite import AllOfSource
from hother.cancelable.sources.timeout import TimeoutSource
from hother.cancelable.sources.condition import ConditionSource
# Cancels only when BOTH timeout AND condition are met
and_source = AllOfSource([
TimeoutSource(timeout=60.0),
ConditionSource(lambda: items_processed >= 100, 1.0)
])
CompositeSource
Combines multiple cancelation sources into a single source (any-of logic).
hother.cancelable.sources.composite.CompositeSource
Bases: CancelationSource
Cancelation source that combines multiple other sources.
Triggers when any of the component sources trigger.
Source code in src/hother/cancelable/sources/composite.py
| class CompositeSource(CancelationSource):
"""Cancelation source that combines multiple other sources.
Triggers when any of the component sources trigger.
"""
def __init__(
self,
sources: list[CancelationSource],
name: str | None = None,
):
"""Initialize composite source.
Args:
sources: List of cancelation sources to combine
name: Optional name for the source
"""
# Use MANUAL as default reason (will be overridden by actual source)
super().__init__(CancelationReason.MANUAL, name or "composite")
if not sources:
raise ValueError("At least one source is required")
self.sources = sources
self.triggered_source: CancelationSource | None = None
async def start_monitoring(self, scope: anyio.CancelScope) -> None:
"""Start monitoring all component sources.
Args:
scope: Cancel scope to trigger when any source triggers
"""
self.scope = scope
# Create task group for background monitoring
self._task_group = anyio.create_task_group()
await self._task_group.__aenter__()
# Start each source with a wrapper
for source in self.sources:
self._task_group.start_soon(self._monitor_source, source)
logger.debug(
"Composite source activated: %s with %d sources (%s)",
self.name,
len(self.sources),
[type(s).__name__ for s in self.sources],
)
async def stop_monitoring(self) -> None:
"""Stop monitoring all component sources."""
# Cancel monitoring task group
if hasattr(self, "_task_group") and self._task_group:
self._task_group.cancel_scope.cancel()
# Try to properly exit the task group, but shield from cancelation
# and handle errors if we're in a different context
try:
with anyio.CancelScope(shield=True):
await self._task_group.__aexit__(None, None, None)
except (anyio.get_cancelled_exc_class(), RuntimeError, Exception) as e:
# Task group exit failed, likely due to context mismatch
# This is acceptable as the cancel scope was already cancelled
logger.debug(f"Task group cleanup skipped: {type(e).__name__}")
finally:
self._task_group = None
# Stop each source
for source in self.sources:
try:
await source.stop_monitoring()
except Exception as e:
logger.error(
"Error stopping source %s: %s",
str(source),
str(e),
exc_info=True,
)
logger.debug(
"Composite source stopped: %s (triggered by %s)",
self.name,
str(self.triggered_source) if self.triggered_source else None,
)
async def _monitor_source(self, source: CancelationSource) -> None:
"""Monitor a single source and propagate its cancelation.
Args:
source: Source to monitor
"""
# Override the source's trigger method to capture which source triggered
original_trigger = source.trigger_cancelation
async def wrapped_trigger(message: str | None = None):
self.triggered_source = source
self.reason = source.reason # Use the source's reason
await original_trigger(message)
# Trigger our own cancelation
if self.scope and not self.scope.cancel_called:
await self.trigger_cancelation(f"Composite source triggered by {source.name}: {message}")
source.trigger_cancelation = wrapped_trigger
try:
# Start the source
await source.start_monitoring(anyio.CancelScope())
except Exception as e:
logger.error(
"Error in component source %s of composite %s: %s",
str(source),
self.name,
str(e),
exc_info=True,
)
|
sources
instance-attribute
triggered_source
instance-attribute
start_monitoring
async
Start monitoring all component sources.
Parameters:
| Name |
Type |
Description |
Default |
scope
|
CancelScope
|
Cancel scope to trigger when any source triggers
|
required
|
Source code in src/hother/cancelable/sources/composite.py
| async def start_monitoring(self, scope: anyio.CancelScope) -> None:
"""Start monitoring all component sources.
Args:
scope: Cancel scope to trigger when any source triggers
"""
self.scope = scope
# Create task group for background monitoring
self._task_group = anyio.create_task_group()
await self._task_group.__aenter__()
# Start each source with a wrapper
for source in self.sources:
self._task_group.start_soon(self._monitor_source, source)
logger.debug(
"Composite source activated: %s with %d sources (%s)",
self.name,
len(self.sources),
[type(s).__name__ for s in self.sources],
)
|
stop_monitoring
async
stop_monitoring() -> None
Stop monitoring all component sources.
Source code in src/hother/cancelable/sources/composite.py
| async def stop_monitoring(self) -> None:
"""Stop monitoring all component sources."""
# Cancel monitoring task group
if hasattr(self, "_task_group") and self._task_group:
self._task_group.cancel_scope.cancel()
# Try to properly exit the task group, but shield from cancelation
# and handle errors if we're in a different context
try:
with anyio.CancelScope(shield=True):
await self._task_group.__aexit__(None, None, None)
except (anyio.get_cancelled_exc_class(), RuntimeError, Exception) as e:
# Task group exit failed, likely due to context mismatch
# This is acceptable as the cancel scope was already cancelled
logger.debug(f"Task group cleanup skipped: {type(e).__name__}")
finally:
self._task_group = None
# Stop each source
for source in self.sources:
try:
await source.stop_monitoring()
except Exception as e:
logger.error(
"Error stopping source %s: %s",
str(source),
str(e),
exc_info=True,
)
logger.debug(
"Composite source stopped: %s (triggered by %s)",
self.name,
str(self.triggered_source) if self.triggered_source else None,
)
|
AnyOfSource
Alias for CompositeSource - cancels when any source triggers.
hother.cancelable.sources.composite.AnyOfSource
Bases: CompositeSource
Alias for CompositeSource - triggers when ANY source triggers.
Source code in src/hother/cancelable/sources/composite.py
| class AnyOfSource(CompositeSource):
"""Alias for CompositeSource - triggers when ANY source triggers."""
|
AllOfSource
Cancels when all sources have triggered (all-of logic).
hother.cancelable.sources.composite.AllOfSource
Bases: CancelationSource
Cancelation source that requires ALL component sources to trigger.
Only cancels when all component sources have triggered.
Source code in src/hother/cancelable/sources/composite.py
| class AllOfSource(CancelationSource):
"""Cancelation source that requires ALL component sources to trigger.
Only cancels when all component sources have triggered.
"""
def __init__(
self,
sources: list[CancelationSource],
name: str | None = None,
):
"""Initialize all-of source.
Args:
sources: List of cancelation sources that must all trigger
name: Optional name for the source
"""
super().__init__(CancelationReason.MANUAL, name or "all_of")
if not sources:
raise ValueError("At least one source is required")
self.sources = sources
self.triggered_sources: set[CancelationSource] = set()
self._lock = anyio.Lock()
async def start_monitoring(self, scope: anyio.CancelScope) -> None:
"""Start monitoring all component sources."""
self.scope = scope
# Create a task group for all sources
self._task_group = anyio.create_task_group()
await self._task_group.__aenter__()
# Start each source with a wrapper
for source in self.sources:
self._task_group.start_soon(self._monitor_source, source)
logger.debug(
"All-of source activated: %s with %d sources",
self.name,
len(self.sources),
)
async def stop_monitoring(self) -> None:
"""Stop monitoring all component sources."""
# Cancel monitoring task group
if hasattr(self, "_task_group") and self._task_group:
self._task_group.cancel_scope.cancel()
await self._task_group.__aexit__(None, None, None)
# Stop each source
for source in self.sources:
try:
await source.stop_monitoring()
except Exception as e:
logger.error(
"Error stopping source %s: %s",
str(source),
str(e),
exc_info=True,
)
async def _monitor_source(self, source: CancelationSource) -> None:
"""Monitor a single source and check if all have triggered."""
# Override the source's trigger method
original_trigger = source.trigger_cancelation
async def wrapped_trigger(message: str | None = None):
async with self._lock:
self.triggered_sources.add(source)
# Check if all sources have triggered
if len(self.triggered_sources) == len(self.sources):
# All sources triggered, cancel
await self.trigger_cancelation(f"All {len(self.sources)} sources have triggered")
# Still call original trigger for logging
await original_trigger(message)
source.trigger_cancelation = wrapped_trigger
try:
# Start the source with a dummy scope
await source.start_monitoring(anyio.CancelScope())
except Exception as e:
logger.error(
"Error in component source %s of all-of %s: %s",
str(source),
self.name,
str(e),
exc_info=True,
)
|
sources
instance-attribute
triggered_sources
instance-attribute
start_monitoring
async
Start monitoring all component sources.
Source code in src/hother/cancelable/sources/composite.py
| async def start_monitoring(self, scope: anyio.CancelScope) -> None:
"""Start monitoring all component sources."""
self.scope = scope
# Create a task group for all sources
self._task_group = anyio.create_task_group()
await self._task_group.__aenter__()
# Start each source with a wrapper
for source in self.sources:
self._task_group.start_soon(self._monitor_source, source)
logger.debug(
"All-of source activated: %s with %d sources",
self.name,
len(self.sources),
)
|
stop_monitoring
async
stop_monitoring() -> None
Stop monitoring all component sources.
Source code in src/hother/cancelable/sources/composite.py
| async def stop_monitoring(self) -> None:
"""Stop monitoring all component sources."""
# Cancel monitoring task group
if hasattr(self, "_task_group") and self._task_group:
self._task_group.cancel_scope.cancel()
await self._task_group.__aexit__(None, None, None)
# Stop each source
for source in self.sources:
try:
await source.stop_monitoring()
except Exception as e:
logger.error(
"Error stopping source %s: %s",
str(source),
str(e),
exc_info=True,
)
|