Utilities
Utility modules providing helper functions, decorators, bridges, and testing tools.
Decorators
The @cancelable decorator for easily making functions cancelable.
hother.cancelable.utils.decorators
Decorators and convenience functions for async cancelation.
cancelable
cancelable(
timeout: float | timedelta | None = None,
operation_id: str | None = None,
name: str | None = None,
register_globally: bool = False,
inject_param: str | None = "cancelable",
) -> Callable[
[Callable[P, Awaitable[R]]],
Callable[P, Awaitable[R]],
]
Decorator to make an async function cancelable.
The decorator automatically creates a Cancelable context and injects it via the specified parameter name (default: 'cancelable'). The decorated function will ALWAYS receive a non-None Cancelable instance.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| timeout | float \| timedelta \| None | Optional timeout for the operation | None |
| operation_id | str \| None | Optional operation ID (auto-generated if not provided) | None |
| name | str \| None | Optional operation name (defaults to function name) | None |
| register_globally | bool | Whether to register with global registry | False |
| inject_param | str \| None | Parameter name to inject cancelable (None to disable) | 'cancelable' |
Returns:
| Type | Description |
|---|---|
| Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]] | Decorator function |
Example
@cancelable(timeout=30.0, register_globally=True)
async def my_operation(data: str, cancelable: Cancelable = None):
    await cancelable.report_progress("Starting")
    # ... do work ...
    return result
Note
Type the injected parameter as Cancelable = None for type checker
compatibility. The decorator ALWAYS provides a non-None instance,
but the = None default signals to type checkers that callers don't
need to provide this argument.
For strict type checking within the function, optionally add:
assert cancelable is not None
Alternatively, disable injection with inject_param=None and use
current_operation() instead.
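The injection mechanism described above can be sketched with the standard library alone. This is a minimal illustration, not the library's implementation: `DemoContext` and `inject_context` are hypothetical names standing in for `Cancelable` and `@cancelable`, and the sketch only shows how a wrapper can create a fresh context per call and pass it under a configurable parameter name.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable, Optional


class DemoContext:
    """Hypothetical stand-in for a Cancelable context (illustration only)."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.messages: list = []

    async def report_progress(self, message: str) -> None:
        self.messages.append(message)


def inject_context(inject_param: Optional[str] = "cancelable"):
    """Create a fresh DemoContext per call and pass it as a keyword argument."""

    def decorator(func: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            ctx = DemoContext(name=func.__name__)
            if inject_param is not None:
                # Overwrite whatever the caller passed: the callee always
                # receives a non-None context under this name.
                kwargs[inject_param] = ctx
            return await func(*args, **kwargs)

        return wrapper

    return decorator


@inject_context()
async def my_operation(data: str, cancelable: Optional[DemoContext] = None) -> str:
    assert cancelable is not None  # narrows the type for strict checkers
    await cancelable.report_progress("Starting")
    return f"{data} handled by {cancelable.name}"


if __name__ == "__main__":
    print(asyncio.run(my_operation("payload")))
```

Callers invoke `my_operation("payload")` without the extra argument, which is why the `= None` default is convenient for type checkers.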
Source code in src/hother/cancelable/utils/decorators.py
with_timeout
async
with_timeout(
timeout: float | timedelta,
coro: Awaitable[T],
operation_id: str | None = None,
name: str | None = None,
) -> T
Run coroutine with timeout.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| timeout | float \| timedelta | Timeout duration | required |
| coro | Awaitable[T] | Coroutine to run | required |
| operation_id | str \| None | Optional operation ID | None |
| name | str \| None | Optional operation name | None |
Returns:
| Type | Description |
|---|---|
| T | Result from coroutine |
Raises:
| Type | Description |
|---|---|
| CancelledError | If operation times out |
Example
result = await with_timeout(5.0, fetch_data())
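As a rough standard-library analogue, a `float | timedelta` timeout can be normalized to seconds and applied with `asyncio.wait_for`. The helper names `to_seconds` and `run_with_deadline` are hypothetical. Note one difference from the documented behavior: `asyncio.wait_for` raises `asyncio.TimeoutError`, whereas `with_timeout` is documented to raise `CancelledError`.

```python
import asyncio
from datetime import timedelta
from typing import Awaitable, TypeVar, Union

T = TypeVar("T")


def to_seconds(timeout: Union[float, timedelta]) -> float:
    """Normalize a float-or-timedelta timeout to seconds."""
    if isinstance(timeout, timedelta):
        return timeout.total_seconds()
    return float(timeout)


async def run_with_deadline(timeout: Union[float, timedelta], coro: Awaitable[T]) -> T:
    """Await `coro`, giving up after `timeout` (raises asyncio.TimeoutError)."""
    return await asyncio.wait_for(coro, timeout=to_seconds(timeout))


async def demo() -> tuple:
    # Finishes well inside the one-second budget.
    fast = await run_with_deadline(timedelta(seconds=1), asyncio.sleep(0.01, result="done"))
    try:
        # Needs one second but only gets 10 ms.
        await run_with_deadline(0.01, asyncio.sleep(1))
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return fast, timed_out


if __name__ == "__main__":
    print(asyncio.run(demo()))
```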
Source code in src/hother/cancelable/utils/decorators.py
with_current_operation
Decorator that injects the current operation into a function.
The function must have a parameter named 'operation'. The decorator will inject the current operation context if available (may be None if called outside a Cancelable context).
Example
@with_current_operation()
async def process_item(item: str, operation: Cancelable | None):
    if operation:
        await operation.report_progress(f"Processing {item}")
    return item.upper()
Note
Unlike @cancelable, this decorator injects the CURRENT operation (if one exists) rather than creating a new one. The operation parameter may be None if no cancelable context is active.
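The difference between creating a context and picking up the current one can be illustrated with `contextvars`. This is a sketch under assumptions: `_current_op` and `inject_current` are hypothetical names, and the "operation" is just a string rather than a `Cancelable`.

```python
import asyncio
import contextvars
import functools
from typing import Any, Awaitable, Callable, Optional

# Hypothetical context variable holding the active operation's name.
_current_op: contextvars.ContextVar = contextvars.ContextVar("current_op", default=None)


def inject_current(func: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
    """Pass whatever operation is active (possibly None) as `operation`."""

    @functools.wraps(func)
    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        kwargs["operation"] = _current_op.get()
        return await func(*args, **kwargs)

    return wrapper


@inject_current
async def process_item(item: str, operation: Optional[str] = None) -> str:
    prefix = operation if operation is not None else "no-op"
    return f"{prefix}:{item.upper()}"


async def demo() -> tuple:
    outside = await process_item("a")   # no active operation: None is injected
    token = _current_op.set("batch-1")  # enter an "operation"
    try:
        inside = await process_item("b")
    finally:
        _current_op.reset(token)        # leave it again
    return outside, inside


if __name__ == "__main__":
    print(asyncio.run(demo()))
```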
Source code in src/hother/cancelable/utils/decorators.py
cancelable_method
cancelable_method(
timeout: float | timedelta | None = None,
name: str | None = None,
register_globally: bool = False,
) -> Callable[
[Callable[..., Awaitable[R]]],
Callable[..., Awaitable[R]],
]
Decorator for async methods that should be cancelable.
Similar to @cancelable but designed for class methods. The decorator automatically creates a Cancelable context and injects it as a 'cancelable' parameter. The decorated method will ALWAYS receive a non-None Cancelable instance.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| timeout | float \| timedelta \| None | Optional timeout for the operation | None |
| name | str \| None | Optional operation name (defaults to ClassName.method_name) | None |
| register_globally | bool | Whether to register with global registry | False |
Returns:
| Type | Description |
|---|---|
| Callable[[Callable[..., Awaitable[R]]], Callable[..., Awaitable[R]]] | Decorator function |
Example
class DataProcessor:
    @cancelable_method(timeout=60.0)
    async def process(self, data: list, cancelable: Cancelable = None):
        for item in data:
            await self._process_item(item)
            await cancelable.report_progress(f"Processed {item}")
Note
Type the injected parameter as Cancelable = None for type checker
compatibility. The decorator ALWAYS provides a non-None instance,
but the = None default signals to type checkers that callers don't
need to provide this argument.
Source code in src/hother/cancelable/utils/decorators.py
cancelable_with_token
cancelable_with_token(
token: CancelationToken,
operation_id: str | None = None,
name: str | None = None,
register_globally: bool = False,
inject_param: str | None = "cancelable",
) -> Callable[
[Callable[P, Awaitable[R]]],
Callable[P, Awaitable[R]],
]
Decorator for token-based cancelation.
Creates a cancelable operation that can be cancelled via the provided token. Useful for operations that need to be cancelled from other tasks or threads.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| token | CancelationToken | CancelationToken to use for cancelation | required |
| operation_id | str \| None | Optional operation ID (auto-generated if not provided) | None |
| name | str \| None | Optional operation name (defaults to function name) | None |
| register_globally | bool | Whether to register with global registry | False |
| inject_param | str \| None | Parameter name to inject Cancelable (None to disable) | 'cancelable' |
Returns:
| Type | Description |
|---|---|
| Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]] | Decorator function |
Example
token = CancelationToken()

@cancelable_with_token(token, name="fetch_data")
async def fetch_data(url: str, cancelable: Cancelable = None):
    await cancelable.report_progress("Fetching...")
    # httpx.get() is synchronous; use AsyncClient for an awaitable request
    async with httpx.AsyncClient() as client:
        return await client.get(url)

# Cancel from another task
await token.cancel(CancelationReason.MANUAL, "User cancelled")
Note
Type the injected parameter as Cancelable = None for type checker compatibility.
Source code in src/hother/cancelable/utils/decorators.py
cancelable_with_signal
cancelable_with_signal(
*signals: int,
operation_id: str | None = None,
name: str | None = None,
register_globally: bool = False,
inject_param: str | None = "cancelable",
) -> Callable[
[Callable[P, Awaitable[R]]],
Callable[P, Awaitable[R]],
]
Decorator for signal-based cancelation.
Creates a cancelable operation that responds to OS signals (Unix only). Useful for graceful shutdown of long-running services.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| *signals | int | Signal numbers to handle (e.g., signal.SIGTERM, signal.SIGINT) | () |
| operation_id | str \| None | Optional operation ID (auto-generated if not provided) | None |
| name | str \| None | Optional operation name (defaults to function name) | None |
| register_globally | bool | Whether to register with global registry | False |
| inject_param | str \| None | Parameter name to inject Cancelable (None to disable) | 'cancelable' |
Returns:
| Type | Description |
|---|---|
| Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]] | Decorator function |
Note
Type the injected parameter as Cancelable = None for type checker compatibility.
Source code in src/hother/cancelable/utils/decorators.py
cancelable_with_condition
cancelable_with_condition(
condition: Callable[[], bool | Awaitable[bool]],
check_interval: float = 0.1,
condition_name: str | None = None,
operation_id: str | None = None,
name: str | None = None,
register_globally: bool = False,
inject_param: str | None = "cancelable",
) -> Callable[
[Callable[P, Awaitable[R]]],
Callable[P, Awaitable[R]],
]
Decorator for condition-based cancelation.
Creates a cancelable operation that cancels when a condition becomes True. Useful for resource-based cancelation (disk full, memory limit, etc.).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| condition | Callable[[], bool \| Awaitable[bool]] | Callable that returns True when cancelation should occur | required |
| check_interval | float | How often to check the condition (seconds) | 0.1 |
| condition_name | str \| None | Name for the condition (for logging) | None |
| operation_id | str \| None | Optional operation ID (auto-generated if not provided) | None |
| name | str \| None | Optional operation name (defaults to function name) | None |
| register_globally | bool | Whether to register with global registry | False |
| inject_param | str \| None | Parameter name to inject Cancelable (None to disable) | 'cancelable' |
Returns:
| Type | Description |
|---|---|
| Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]] | Decorator function |
Example
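A minimal standard-library sketch of condition polling, not this decorator's implementation: a watcher task evaluates the condition every `check_interval` seconds and cancels the work once it returns True. `run_until_condition` is a hypothetical helper; it accepts both sync and async conditions, mirroring the `bool | Awaitable[bool]` signature above.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable, Union

Condition = Callable[[], Union[bool, Awaitable[bool]]]


async def run_until_condition(
    work: Awaitable[Any], condition: Condition, check_interval: float = 0.1
) -> Any:
    """Run `work`, cancelling it once `condition()` becomes true."""
    task = asyncio.ensure_future(work)

    async def watch() -> None:
        while True:
            outcome = condition()
            if inspect.isawaitable(outcome):  # support async conditions too
                outcome = await outcome
            if outcome:
                task.cancel()
                return
            await asyncio.sleep(check_interval)

    watcher = asyncio.ensure_future(watch())
    try:
        return await task
    finally:
        watcher.cancel()  # stop polling once the work has finished


async def demo() -> str:
    state = {"disk_full": False}

    async def fill_disk() -> None:
        await asyncio.sleep(0.05)
        state["disk_full"] = True

    filler = asyncio.ensure_future(fill_disk())
    try:
        await run_until_condition(asyncio.sleep(5), lambda: state["disk_full"], 0.01)
        return "completed"
    except asyncio.CancelledError:
        return "cancelled"
    finally:
        await filler


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A shorter `check_interval` reacts faster at the cost of more frequent condition evaluations.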
Note
Type the injected parameter as Cancelable = None for type checker compatibility.
Source code in src/hother/cancelable/utils/decorators.py
cancelable_combine
cancelable_combine(
*cancelables: Cancelable,
operation_id: str | None = None,
name: str | None = None,
register_globally: bool = False,
inject_param: str | None = "cancelable",
) -> Callable[
[Callable[P, Awaitable[R]]],
Callable[P, Awaitable[R]],
]
Decorator for combining multiple cancelation sources.
Creates a cancelable operation that cancels when ANY of the provided cancelables trigger. Useful for operations with multiple cancelation conditions.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| *cancelables | Cancelable | Cancelables to combine | () |
| operation_id | str \| None | Optional operation ID (auto-generated if not provided) | None |
| name | str \| None | Optional operation name (defaults to function name) | None |
| register_globally | bool | Whether to register with global registry | False |
| inject_param | str \| None | Parameter name to inject Cancelable (None to disable) | 'cancelable' |
Returns:
| Type | Description |
|---|---|
| Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]] | Decorator function |
Example
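A minimal sketch of "cancel when ANY source triggers", using plain `asyncio.Event` objects in place of `Cancelable` instances. `run_until_any` is a hypothetical helper and does not reflect how the library combines sources internally.

```python
import asyncio
from typing import Any, Awaitable


async def run_until_any(work: Awaitable[Any], *triggers: asyncio.Event) -> Any:
    """Run `work`, cancelling it as soon as ANY trigger event is set."""
    task = asyncio.ensure_future(work)
    waiters = [asyncio.ensure_future(t.wait()) for t in triggers]
    try:
        done, _ = await asyncio.wait(
            [task, *waiters], return_when=asyncio.FIRST_COMPLETED
        )
        if task not in done:
            task.cancel()  # a trigger fired before the work finished
        return await task
    finally:
        for waiter in waiters:
            waiter.cancel()  # release the remaining trigger waiters


async def demo() -> str:
    user_cancel, shutdown = asyncio.Event(), asyncio.Event()
    # Only one of the two sources fires; that is enough to cancel the work.
    asyncio.get_running_loop().call_later(0.05, shutdown.set)
    try:
        await run_until_any(asyncio.sleep(5), user_cancel, shutdown)
        return "completed"
    except asyncio.CancelledError:
        return "cancelled"


if __name__ == "__main__":
    print(asyncio.run(demo()))
```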
Note
Type the injected parameter as Cancelable = None for type checker compatibility.
Source code in src/hother/cancelable/utils/decorators.py
with_cancelable
with_cancelable(
cancel: Cancelable,
inject: bool = False,
inject_param: str = "cancelable",
) -> Callable[
[Callable[P, Awaitable[R]]],
Callable[P, Awaitable[R]],
]
Decorator that wraps a function with an existing Cancelable instance.
This decorator allows you to use a pre-configured Cancelable context with your async function. Unlike @cancelable which creates a new context, this decorator uses an existing one, enabling sharing of cancelation state across multiple functions.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| cancel | Cancelable | Existing Cancelable instance to use | required |
| inject | bool | Whether to inject the Cancelable into the function signature (default: False) | False |
| inject_param | str | Parameter name to inject Cancelable as (default: "cancelable") | 'cancelable' |
Returns:
| Type | Description |
|---|---|
| Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]] | Decorator function |
Example
from hother.cancelable import Cancelable, with_cancelable, current_operation

# Create a shared cancelable context
cancel = Cancelable.with_timeout(30.0, name="data_pipeline")

@with_cancelable(cancel)
async def fetch_data():
    # No injection, access via current_operation()
    ctx = current_operation()
    await ctx.report_progress("Fetching data...")
    return await fetch()

@with_cancelable(cancel, inject=True)
async def process_data(cancelable: Cancelable = None):
    # With injection
    await cancelable.report_progress("Processing...")
    return await process()

# Both functions share the same cancelation context
async with cancel:
    data = await fetch_data()
    result = await process_data()
Note
When inject=False (default), use current_operation() to access the context from within the function if needed.
Source code in src/hother/cancelable/utils/decorators.py
Bridges
AnyIO Bridge
Bridge for integrating with anyio-based async code.
hother.cancelable.utils.anyio_bridge
Global bridge for thread-safe anyio operations.
Allows regular Python threads to schedule callbacks in anyio context, providing an equivalent to asyncio's loop.call_soon_threadsafe().
AnyioBridge
Singleton bridge for thread-to-anyio communication.
Provides call_soon_threadsafe equivalent for anyio by using memory object streams and a background worker task.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| buffer_size | int | Maximum number of queued callbacks before blocking (default: 1000) | 1000 |
Example
# Custom buffer size for high-throughput applications
bridge = AnyioBridge(buffer_size=5000)
await bridge.start()

# Or use default
bridge = AnyioBridge.get_instance()
async with anyio.create_task_group() as tg:
    tg.start_soon(bridge.start)

    # Now thread-safe calls work
    def from_thread():
        bridge.call_soon_threadsafe(some_callback)
Source code in src/hother/cancelable/utils/anyio_bridge.py
get_instance
classmethod
get_instance() -> Self
Get singleton instance of the bridge.
Thread-safe lazy initialization.
Returns:
| Type | Description |
|---|---|
| Self | The singleton AnyioBridge instance |
Source code in src/hother/cancelable/utils/anyio_bridge.py
start
async
Start the bridge worker task.
Should be called once at application startup from async context. Must be run in a task group as it blocks forever.
Source code in src/hother/cancelable/utils/anyio_bridge.py
call_soon_threadsafe
Schedule callback to run in anyio context from any thread.
This is the anyio equivalent of asyncio's loop.call_soon_threadsafe(). The callback will be executed in the anyio event loop context.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| callback | Callable[[], Any] | Function to call (can be sync or async) | required |
Note
If the bridge hasn't started yet, callbacks are queued and will be processed once the bridge starts.
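For comparison, this is the asyncio call that the bridge is described as mirroring. The sketch uses only `asyncio` and `threading`; it does not use `AnyioBridge`, and the function names are illustrative.

```python
import asyncio
import threading
from typing import List


async def demo() -> List[str]:
    loop = asyncio.get_running_loop()
    received: List[str] = []
    done = asyncio.Event()

    def on_loop_thread(message: str) -> None:
        # Runs inside the event loop, so touching loop-bound objects is safe.
        received.append(message)
        done.set()

    def from_thread() -> None:
        # Runs in a plain OS thread; hand the callback over to the loop.
        loop.call_soon_threadsafe(on_loop_thread, "hello from thread")

    worker = threading.Thread(target=from_thread)
    worker.start()
    await asyncio.wait_for(done.wait(), timeout=1.0)
    worker.join()
    return received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Calling `done.set()` directly from the worker thread would not be safe, which is the reason for routing the callback through the loop.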
Source code in src/hother/cancelable/utils/anyio_bridge.py
stop
async
Stop the bridge and clean up resources.
Properly closes the send and receive streams to avoid resource leak warnings during garbage collection.
Should be called before cancelling the task group running the bridge.
Source code in src/hother/cancelable/utils/anyio_bridge.py
call_soon_threadsafe
Convenience function for thread-safe anyio scheduling.
Equivalent to bridge.get_instance().call_soon_threadsafe(callback).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| callback | Callable[[], Any] | Function to call in anyio context | required |
Source code in src/hother/cancelable/utils/anyio_bridge.py
Threading Bridge
Bridge for canceling operations from threads.
hother.cancelable.utils.threading_bridge
Thread-safe wrapper for OperationRegistry.
Provides synchronous API for accessing the registry from threads.
ThreadSafeRegistry
Thread-safe wrapper for OperationRegistry.
Provides synchronous API for accessing the registry from threads. All methods are thread-safe and can be called from any thread.
This class wraps the OperationRegistry singleton and provides
convenience methods without the _sync suffix.
Example
# From a thread (e.g., Flask/Django handler)
from hother.cancelable import ThreadSafeRegistry
registry = ThreadSafeRegistry()
# List running operations
operations = registry.list_operations(status=OperationStatus.RUNNING)
# Cancel a specific operation
registry.cancel_operation(op_id, reason=CancelationReason.MANUAL)
# Get statistics
stats = registry.get_statistics()
print(f"Active operations: {stats['active_operations']}")
Note
- Read operations (get, list, statistics, history) return immediately with data
- Write operations (cancel) schedule async work and return immediately
- For async code, use OperationRegistry directly instead
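The read/write split in the note can be sketched as follows. This is an illustration under assumptions, not the wrapper's actual code: `DemoRegistry` is hypothetical, reads are answered under a lock from the calling thread, and the cancel is only scheduled onto the event loop (here via asyncio's `call_soon_threadsafe` rather than `AnyioBridge`).

```python
import asyncio
import threading
from typing import Dict, List


class DemoRegistry:
    """Hypothetical registry: lock-guarded reads, loop-scheduled writes."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._lock = threading.Lock()
        self._tasks: Dict[str, asyncio.Task] = {}

    def register(self, op_id: str, task: asyncio.Task) -> None:
        with self._lock:
            self._tasks[op_id] = task

    def list_operations(self) -> List[str]:
        # Read: answered immediately from the calling thread.
        with self._lock:
            return sorted(self._tasks)

    def cancel_operation(self, op_id: str) -> None:
        # Write: only scheduled here; the loop performs the cancel later.
        with self._lock:
            task = self._tasks.get(op_id)
        if task is not None:
            self._loop.call_soon_threadsafe(task.cancel)


async def demo() -> tuple:
    registry = DemoRegistry(asyncio.get_running_loop())
    task = asyncio.ensure_future(asyncio.sleep(5))
    registry.register("op-1", task)
    seen: List[str] = []

    def handler() -> None:  # e.g. a request handler running in a thread
        seen.extend(registry.list_operations())
        registry.cancel_operation("op-1")

    thread = threading.Thread(target=handler)
    thread.start()
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected: the handler thread requested cancelation
    thread.join()
    return seen, task.cancelled()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```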
Source code in src/hother/cancelable/utils/threading_bridge.py
get_operation
get_operation(operation_id: str) -> Cancelable | None
Get operation by ID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| operation_id | str | Operation ID to look up | required |
Returns:
| Type | Description |
|---|---|
| Cancelable \| None | Cancelable operation or None if not found |
Source code in src/hother/cancelable/utils/threading_bridge.py
list_operations
list_operations(
status: OperationStatus | None = None,
parent_id: str | None = None,
name_pattern: str | None = None,
) -> list[OperationContext]
List operations with optional filtering.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| status | OperationStatus \| None | Filter by operation status | None |
| parent_id | str \| None | Filter by parent operation ID | None |
| name_pattern | str \| None | Filter by name (substring match) | None |
Returns:
| Type | Description |
|---|---|
| list[OperationContext] | List of matching operation contexts |
Source code in src/hother/cancelable/utils/threading_bridge.py
get_statistics
Get registry statistics.
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Dictionary with operation statistics |
Source code in src/hother/cancelable/utils/threading_bridge.py
get_history
get_history(
limit: int | None = None,
status: OperationStatus | None = None,
since: datetime | None = None,
) -> list[OperationContext]
Get operation history.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| limit | int \| None | Maximum number of operations to return | None |
| status | OperationStatus \| None | Filter by final status | None |
| since | datetime \| None | Only return operations completed after this time | None |
Returns:
| Type | Description |
|---|---|
| list[OperationContext] | List of historical operation contexts |
Source code in src/hother/cancelable/utils/threading_bridge.py
cancel_operation
cancel_operation(
operation_id: str,
reason: CancelationReason = MANUAL,
message: str | None = None,
) -> None
Cancel a specific operation.
Schedules cancelation to be executed asynchronously and returns immediately.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| operation_id | str | ID of operation to cancel | required |
| reason | CancelationReason | Reason for cancelation | MANUAL |
| message | str \| None | Optional cancelation message | None |
Note
This method returns immediately. The cancelation is scheduled asynchronously via AnyioBridge.
Source code in src/hother/cancelable/utils/threading_bridge.py
cancel_all
cancel_all(
status: OperationStatus | None = None,
reason: CancelationReason = MANUAL,
message: str | None = None,
) -> None
Cancel all operations with optional status filter.
Schedules cancelation to be executed asynchronously and returns immediately.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| status | OperationStatus \| None | Only cancel operations with this status | None |
| reason | CancelationReason | Reason for cancelation | MANUAL |
| message | str \| None | Optional cancelation message | None |
Note
This method returns immediately. The cancelation is scheduled asynchronously via AnyioBridge.
Source code in src/hother/cancelable/utils/threading_bridge.py
get_instance
classmethod
get_instance() -> ThreadSafeRegistry
Get singleton instance of thread-safe registry.
Thread-safe lazy initialization.
Returns:
| Type | Description |
|---|---|
| ThreadSafeRegistry | The singleton ThreadSafeRegistry instance |
Source code in src/hother/cancelable/utils/threading_bridge.py
Context Bridge
Context propagation utilities.
hother.cancelable.utils.context_bridge
Context bridge utilities for thread-safe context variable propagation.
This module provides utilities to safely propagate context variables between async tasks and OS threads, solving the context variable thread safety issue.
ContextBridge
Thread-safe context variable bridge for async-to-thread communication.
This class solves the issue where context variables don't propagate to OS threads created by ThreadPoolExecutor, breaking operation tracking in multi-threaded applications.
Source code in src/hother/cancelable/utils/context_bridge.py
copy_context
staticmethod
copy_context() -> dict[ContextVar[Any], Any]
Copy current context variables to a dict for thread transport.
Returns:
| Type | Description |
|---|---|
| dict[ContextVar[Any], Any] | Dictionary mapping context variables to their current values |
Source code in src/hother/cancelable/utils/context_bridge.py
restore_context
staticmethod
restore_context(
context_dict: dict[ContextVar[Any], Any],
) -> None
Restore context variables from a dictionary.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| context_dict | dict[ContextVar[Any], Any] | Dictionary mapping context variables to values | required |
Source code in src/hother/cancelable/utils/context_bridge.py
run_in_thread_with_context
async
staticmethod
run_in_thread_with_context(
func: Callable[..., T],
*args: Any,
executor: ThreadPoolExecutor | None = None,
**kwargs: Any,
) -> T
Run function in thread with context variables propagated.
This method safely copies context variables to the thread, runs the function, and returns the result.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| func | Callable[..., T] | Function to run in thread | required |
| *args | Any | Positional arguments for func | () |
| executor | ThreadPoolExecutor \| None | Optional thread pool executor (default: None for default executor) | None |
| **kwargs | Any | Keyword arguments for func | {} |
Returns:
| Type | Description |
|---|---|
| T | Result of func execution |
Example
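A minimal standard-library sketch of the idea, not this class's implementation: snapshot the caller's context with `contextvars.copy_context()` and run the function inside that snapshot in the worker thread. `request_id` and `run_in_thread_with_ctx` are hypothetical names.

```python
import asyncio
import contextvars
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Optional

request_id: contextvars.ContextVar = contextvars.ContextVar("request_id", default="unset")


async def run_in_thread_with_ctx(
    func: Callable[..., Any], *args: Any, executor: Optional[ThreadPoolExecutor] = None
) -> Any:
    """Run `func(*args)` in a worker thread inside a copy of the caller's context."""
    ctx = contextvars.copy_context()  # snapshot taken in the async task
    loop = asyncio.get_running_loop()
    # ctx.run executes func with the snapshot active in the worker thread.
    return await loop.run_in_executor(executor, ctx.run, func, *args)


def read_request_id() -> str:
    return request_id.get()


async def demo() -> tuple:
    request_id.set("req-42")
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=1) as pool:
        # Plain executor call: the worker thread usually does not see the value.
        plain = await loop.run_in_executor(pool, read_request_id)
        # Bridged call: the value travels with the copied context.
        bridged = await run_in_thread_with_ctx(read_request_id, executor=pool)
    return plain, bridged


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Changes the function makes to context variables stay inside the copied context and do not flow back to the calling task.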
Source code in src/hother/cancelable/utils/context_bridge.py
Stream Processing
Utilities for cancelable async stream processing.
hother.cancelable.utils.streams
Stream utilities for async cancelation.
CancelableAsyncIterator
Bases: AsyncIterator[T]
Wrapper class that makes any async iterator cancelable.
This provides a class-based alternative to the cancelable_stream function.
Source code in src/hother/cancelable/utils/streams.py
cancelable_stream
async
cancelable_stream(
stream: AsyncIterator[T],
timeout: float | timedelta | None = None,
token: Optional[CancelationToken] = None,
report_interval: int | None = None,
on_progress: Callable[[int, T], Any] | None = None,
buffer_partial: bool = False,
operation_id: str | None = None,
name: str | None = None,
) -> AsyncIterator[T]
Make any async iterator cancelable with various options.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| stream | AsyncIterator[T] | Async iterator to wrap | required |
| timeout | float \| timedelta \| None | Optional timeout for the entire stream | None |
| token | Optional[CancelationToken] | Optional cancelation token | None |
| report_interval | int \| None | Report progress every N items | None |
| on_progress | Callable[[int, T], Any] \| None | Optional progress callback (item_count, latest_item) | None |
| buffer_partial | bool | Whether to buffer items for partial results | False |
| operation_id | str \| None | Optional operation ID | None |
| name | str \| None | Optional operation name | None |
Yields:
| Type | Description |
|---|---|
| AsyncIterator[T] | Items from the wrapped stream |
Example
async for item in cancelable_stream(
    fetch_items(),
    timeout=30.0,
    report_interval=100,
    on_progress=lambda n, item: print(f"Processed {n} items"),
):
    process(item)
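How `report_interval` and `on_progress` might interact can be sketched with a plain async generator. `with_progress` is a hypothetical wrapper that covers only the progress-reporting part; timeouts, tokens, and partial-result buffering are left out.

```python
import asyncio
from typing import Any, AsyncIterator, Callable, List, Optional, TypeVar

T = TypeVar("T")


async def with_progress(
    stream: AsyncIterator[T],
    report_interval: Optional[int] = None,
    on_progress: Optional[Callable[[int, T], Any]] = None,
) -> AsyncIterator[T]:
    """Yield items unchanged, calling on_progress every `report_interval` items."""
    count = 0
    async for item in stream:
        count += 1
        if report_interval and on_progress and count % report_interval == 0:
            on_progress(count, item)
        yield item


async def numbers(n: int) -> AsyncIterator[int]:
    for i in range(n):
        await asyncio.sleep(0)  # give other tasks a chance to run
        yield i


async def demo() -> tuple:
    reports: List[int] = []
    items = [
        x async for x in with_progress(
            numbers(7), report_interval=3, on_progress=lambda n, _item: reports.append(n)
        )
    ]
    return items, reports


if __name__ == "__main__":
    print(asyncio.run(demo()))
```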
Source code in src/hother/cancelable/utils/streams.py
chunked_cancelable_stream
async
chunked_cancelable_stream(
stream: AsyncIterator[T],
chunk_size: int,
cancelable: Cancelable,
) -> AsyncIterator[list[T]]
Process stream in chunks with cancelation support.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| stream | AsyncIterator[T] | Source async iterator | required |
| chunk_size | int | Size of chunks to yield | required |
| cancelable | Cancelable | Cancelable instance | required |
Yields:
| Type | Description |
|---|---|
| AsyncIterator[list[T]] | Lists of items (chunks) |
Example
async for chunk in chunked_cancelable_stream(items, 100, cancel):
    await process_batch(chunk)
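The chunking itself can be sketched without any cancelation logic. `chunked` is a hypothetical helper. Emitting a final, shorter chunk is an assumption of this sketch and may differ from the library's behavior.

```python
import asyncio
from typing import AsyncIterator, List, TypeVar

T = TypeVar("T")


async def chunked(stream: AsyncIterator[T], chunk_size: int) -> AsyncIterator[List[T]]:
    """Group items from `stream` into lists of at most `chunk_size` items."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    chunk: List[T] = []
    async for item in stream:
        chunk.append(item)
        if len(chunk) == chunk_size:
            yield chunk
            chunk = []
    if chunk:  # flush the final, possibly shorter chunk
        yield chunk


async def numbers(n: int) -> AsyncIterator[int]:
    for i in range(n):
        yield i


async def demo() -> List[List[int]]:
    return [c async for c in chunked(numbers(7), 3)]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```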
Source code in src/hother/cancelable/utils/streams.py
Streaming Simulator
Stream cancelation simulator for testing and demonstration.
hother.cancelable.streaming.simulator.simulator
Core stream simulation functionality.
simulate_stream
async
simulate_stream(
text: str,
config: StreamConfig | None = None,
cancelable: Cancelable | None = None,
) -> AsyncGenerator[dict[str, Any]]
Simulate a realistic network stream with variable timing and cancelation support.
This function simulates network streaming behavior including bursts, stalls, jitter, and variable chunk sizes. It's useful for testing cancelable stream processing and demonstrating async cancelation patterns.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| text | str | The text content to stream | required |
| config | StreamConfig \| None | Optional StreamConfig to control simulation behavior. If None, uses default configuration. | None |
| cancelable | Cancelable \| None | Optional Cancelable instance for cancelation support. If provided, the stream will check for cancelation and report progress. | None |
Yields:
| Type | Description |
|---|---|
| AsyncGenerator[dict[str, Any]] | Dictionary chunks of several types |
Raises:
| Type | Description |
|---|---|
| CancelledError | If the associated Cancelable is cancelled during streaming |
Example
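A minimal standard-library sketch of a simulated stream, not this function's implementation. `fake_stream` is hypothetical, and the `"type"`, `"data"`, and `"total"` keys are illustrative rather than the library's chunk schema. It shows only variable chunk sizes and jitter.

```python
import asyncio
import random
from typing import Any, AsyncIterator, Dict, Optional


async def fake_stream(
    text: str, max_chunk: int = 5, max_delay: float = 0.005, seed: Optional[int] = None
) -> AsyncIterator[Dict[str, Any]]:
    """Yield `text` in random-sized pieces with random pauses between them."""
    rng = random.Random(seed)  # seeded for reproducible tests
    position = 0
    while position < len(text):
        size = rng.randint(1, max_chunk)                # variable chunk size
        await asyncio.sleep(rng.uniform(0, max_delay))  # jitter between chunks
        piece = text[position:position + size]
        position += len(piece)
        # The "type"/"data" keys are illustrative, not the library's schema.
        yield {"type": "data", "data": piece}
    yield {"type": "complete", "total": len(text)}


async def demo(text: str) -> str:
    parts = []
    async for chunk in fake_stream(text, seed=1):
        if chunk["type"] == "data":
            parts.append(chunk["data"])
    return "".join(parts)


if __name__ == "__main__":
    print(asyncio.run(demo("hello, simulated world")))
```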
Source code in src/hother/cancelable/streaming/simulator/simulator.py
Logging
Structured logging utilities for cancelation events.
hother.cancelable.utils.logging
Logging utilities for the cancelable library.
Following Python library best practices, this module provides logger access but does not configure logging. Applications using cancelable should configure their own logging as needed.
get_logger
Get a standard library logger instance.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str \| None | Logger name. If None, uses the calling module's name | None |
Returns:
| Type | Description |
|---|---|
| Logger | A standard library logger instance |
Note
This function does not configure logging handlers or formatters. Applications should configure logging using logging.basicConfig() or their preferred logging configuration method.
Example
In your application code:
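A minimal sketch of application-side configuration with the standard `logging` module. The logger name `"hother.cancelable"` is an assumption derived from the package path, and `"myapp"` is a placeholder.

```python
import logging

# Configure handlers and levels once, in the application entry point.
logging.basicConfig(
    level=logging.WARNING,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)

# Assumed logger name, derived from the package path; adjust if it differs.
library_logger = logging.getLogger("hother.cancelable")
library_logger.setLevel(logging.DEBUG)  # more detail from this library only

# Placeholder application logger.
app_logger = logging.getLogger("myapp")
app_logger.warning("application logging configured")
```

Child loggers such as `hother.cancelable.utils` inherit the level set on the parent unless they set their own.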
Source code in src/hother/cancelable/utils/logging.py
Testing
Test utilities and fixtures for cancelable operations.
hother.cancelable.utils.testing
Testing utilities for async cancelation.
MockCancelationToken
Bases: CancelationToken
Mock cancelation token for testing.
Provides additional testing capabilities like scheduled cancelation.
Source code in src/hother/cancelable/utils/testing.py
cancel
async
cancel(
reason: CancelationReason = MANUAL,
message: str | None = None,
) -> bool
Cancel and record in history.
Source code in src/hother/cancelable/utils/testing.py
schedule_cancel
async
schedule_cancel(
delay: float,
reason: CancelationReason = MANUAL,
message: str | None = None,
) -> None
Schedule cancelation after a delay.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| delay | float | Delay in seconds before cancelation | required |
| reason | CancelationReason | Cancelation reason | MANUAL |
| message | str \| None | Cancelation message | None |
Source code in src/hother/cancelable/utils/testing.py
OperationRecorder
Records operation events for testing assertions.
Source code in src/hother/cancelable/utils/testing.py
record_event
async
Record an operation event.
Source code in src/hother/cancelable/utils/testing.py
attach_to_cancellable
attach_to_cancellable(cancelable: Cancelable) -> Cancelable
Attach recorder to a cancelable to track its events.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| cancelable | Cancelable | Cancelable to track | required |
Returns:
| Type | Description |
|---|---|
| Cancelable | The cancelable (for chaining) |
Source code in src/hother/cancelable/utils/testing.py
get_events_for_operation
Get all events for a specific operation.
get_events_by_type
assert_event_occurred
Assert that an event occurred (synchronous check).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| operation_id | str | Operation ID to check | required |
| event_type | str | Event type to look for | required |
| timeout | float | Not used in sync version | 1.0 |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | The event data |
Raises:
| Type | Description |
|---|---|
| AssertionError | If event not found |
assert_final_status
assert_final_status(
operation_id: str, expected_status: OperationStatus
) -> None
Assert the final status of an operation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `operation_id` | `str` | Operation ID to check | required |
| `expected_status` | `OperationStatus` | Expected final status | required |
Raises:
| Type | Description |
|---|---|
| `AssertionError` | If status doesn't match |
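The two assertion helpers above follow a common record-then-assert pattern, sketched below in a self-contained form. `SketchRecorder`, its `record` method, the dict layout of an event, and the `"status_change"` event name are all assumptions made for illustration; the real `OperationRecorder` may store and match events differently.

```python
from typing import Any, Dict, List


class SketchRecorder:
    """Hypothetical stand-in for an event recorder (not the library class)."""

    def __init__(self) -> None:
        self.events: List[Dict[str, Any]] = []

    def record(self, operation_id: str, event_type: str, **data: Any) -> None:
        # Store each event as a plain dict so assertions can inspect it later.
        self.events.append(
            {"operation_id": operation_id, "event_type": event_type, "data": data}
        )

    def assert_event_occurred(self, operation_id: str, event_type: str) -> Dict[str, Any]:
        # Return the first matching event, or fail the test.
        for event in self.events:
            if event["operation_id"] == operation_id and event["event_type"] == event_type:
                return event
        raise AssertionError(f"no {event_type!r} event for operation {operation_id!r}")

    def assert_final_status(self, operation_id: str, expected_status: str) -> None:
        # Assumption: the final status is the last recorded "status_change" event.
        statuses = [
            e["data"]["status"]
            for e in self.events
            if e["operation_id"] == operation_id and e["event_type"] == "status_change"
        ]
        assert statuses, f"no status recorded for operation {operation_id!r}"
        assert statuses[-1] == expected_status, (
            f"expected {expected_status!r}, got {statuses[-1]!r}"
        )


recorder = SketchRecorder()
recorder.record("op-1", "status_change", status="running")
recorder.record("op-1", "progress", message="halfway")
recorder.record("op-1", "status_change", status="cancelled")

first_progress = recorder.assert_event_occurred("op-1", "progress")
recorder.assert_final_status("op-1", "cancelled")
```

Returning the matched event from the lookup, as the documented return type suggests, lets a test make further assertions on the event payload.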
CancelationScenario
Test scenario builder for cancelation testing.
add_delay
add_delay(duration: float) -> CancelationScenario
add_cancelation
add_cancelation(
reason: CancelationReason = MANUAL,
message: str | None = None,
) -> CancelationScenario
Add a cancelation step.
add_progress_check
add_progress_check(
expected_message: str, timeout: float = 1.0
) -> CancelationScenario
Add assertion for progress message.
add_status_check
add_status_check(
expected_status: OperationStatus,
) -> CancelationScenario
Add assertion for operation status.
run
async
run(
operation: Callable[..., Any], *args: Any, **kwargs: Any
) -> OperationRecorder
Run the scenario.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `operation` | `Callable[..., Any]` | Async callable to test | required |
| `*args` | `Any` | Positional arguments for operation | `()` |
| `**kwargs` | `Any` | Keyword arguments for operation | `{}` |
Returns:
| Type | Description |
|---|---|
| `OperationRecorder` | Operation recorder with results |
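The builder methods above each return the scenario, which suggests a fluent style where steps are queued and then replayed by `run`. The sketch below illustrates that pattern with asyncio only. `SketchScenario` is hypothetical, it cancels the underlying task directly rather than going through a `Cancelable`, and it returns a plain string instead of an `OperationRecorder`.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Tuple


class SketchScenario:
    """Hypothetical fluent builder; each add_* method returns self for chaining."""

    def __init__(self) -> None:
        self._steps: List[Tuple[str, float]] = []

    def add_delay(self, duration: float) -> "SketchScenario":
        self._steps.append(("delay", duration))
        return self

    def add_cancelation(self) -> "SketchScenario":
        self._steps.append(("cancel", 0.0))
        return self

    async def run(self, operation: Callable[..., Awaitable[Any]], *args: Any) -> str:
        # Start the operation in the background, then replay the queued steps.
        task = asyncio.ensure_future(operation(*args))
        for kind, value in self._steps:
            if kind == "delay":
                await asyncio.sleep(value)
            elif kind == "cancel":
                task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            return "cancelled"
        return "completed"


async def slow_operation(duration: float) -> str:
    await asyncio.sleep(duration)
    return "done"


async def demo() -> Tuple[str, str]:
    cancelled = await SketchScenario().add_delay(0.05).add_cancelation().run(slow_operation, 1.0)
    completed = await SketchScenario().add_delay(0.05).run(slow_operation, 0.01)
    return cancelled, completed


scenario_results = asyncio.run(demo())
```

Queuing steps as data and interpreting them in `run` keeps the builder methods trivial and makes the order of delays and cancelations explicit in the test.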
create_slow_stream
async
create_slow_stream(
items: list[T],
delay: float = 0.1,
cancelable: Cancelable | None = None,
) -> AsyncIterator[T]
Create a slow async stream for testing cancelation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `items` | `list[T]` | Items to yield | required |
| `delay` | `float` | Delay between items (seconds) | `0.1` |
| `cancelable` | `Cancelable \| None` | Optional cancelable to check | `None` |
Yields:
| Type | Description |
|---|---|
| `AsyncIterator[T]` | Items with delays |
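A slow stream of this kind can be approximated with an async generator. The following sketch is an assumption about the general shape, not the library's code: `slow_stream` is a hypothetical name, an `asyncio.Event` stands in for the optional cancelable check, and the delay is applied before each item.

```python
import asyncio
from typing import AsyncIterator, List, Optional, TypeVar

T = TypeVar("T")


async def slow_stream(
    items: List[T], delay: float = 0.1, stop: Optional[asyncio.Event] = None
) -> AsyncIterator[T]:
    """Yield items one at a time, pausing `delay` seconds before each."""
    for item in items:
        # `stop` is a hypothetical stand-in for the optional cancelable check.
        if stop is not None and stop.is_set():
            return
        await asyncio.sleep(delay)
        yield item


async def demo() -> List[int]:
    stop = asyncio.Event()
    received: List[int] = []
    async for value in slow_stream([1, 2, 3, 4], delay=0.01, stop=stop):
        received.append(value)
        if value == 2:
            stop.set()  # simulate cancelation after the second item
    return received


stream_items = asyncio.run(demo())
```

Because the stop flag is checked before each item, the consumer in this sketch receives only the items produced before cancelation was requested.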
run_with_timeout_test
async
Test that a coroutine times out within expected duration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `coro` | `Any` | Coroutine to run | required |
| `expected_timeout` | `float` | Expected timeout duration | required |
| `tolerance` | `float` | Acceptable deviation from expected timeout | `0.1` |
Raises:
| Type | Description |
|---|---|
| `AssertionError` | If timeout doesn't occur or timing is wrong |
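The timing check described above can be sketched as measuring how long a coroutine runs before it raises a timeout and comparing that to the expected value. `check_timeout` and `times_out_after` are hypothetical names, and the sketch assumes the coroutine under test raises `asyncio.TimeoutError` itself; the library helper may detect timeouts differently.

```python
import asyncio
import time
from typing import Any, Awaitable


async def check_timeout(
    coro: Awaitable[Any], expected_timeout: float, tolerance: float = 0.1
) -> float:
    """Await `coro`, require a timeout, and check how long it took."""
    start = time.perf_counter()
    try:
        await coro
    except asyncio.TimeoutError:
        elapsed = time.perf_counter() - start
    else:
        raise AssertionError("coroutine finished without timing out")
    # The measured duration must be within `tolerance` of the expectation.
    assert abs(elapsed - expected_timeout) <= tolerance, (
        f"timed out after {elapsed:.3f}s, expected {expected_timeout}s"
    )
    return elapsed


async def times_out_after(seconds: float) -> None:
    # A long sleep wrapped in wait_for, so it raises asyncio.TimeoutError.
    await asyncio.wait_for(asyncio.sleep(10), timeout=seconds)


elapsed = asyncio.run(
    check_timeout(times_out_after(0.2), expected_timeout=0.2, tolerance=0.2)
)
```

A generous tolerance keeps such a check stable on loaded machines, at the cost of a weaker lower bound on the measured duration.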
assert_cancelation_within
async
assert_cancelation_within(
min_time: float, max_time: float
) -> AsyncIterator[MockCancelationToken]
Context manager that asserts cancelation occurs within a time range.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `min_time` | `float` | Minimum time before cancelation | required |
| `max_time` | `float` | Maximum time before cancelation | required |
Yields:
| Type | Description |
|---|---|
| `AsyncIterator[MockCancelationToken]` | Mock cancelation token |
Raises:
| Type | Description |
|---|---|
| `AssertionError` | If cancelation timing is wrong |
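A context manager with this contract can be sketched using `contextlib.asynccontextmanager`. The names `TimedToken` and `cancelled_within` are hypothetical, and the sketch assumes the elapsed time is measured from entering the block to the moment the yielded token is cancelled; the library helper may measure or detect cancelation differently.

```python
import asyncio
import time
from contextlib import asynccontextmanager
from typing import AsyncIterator, Optional


class TimedToken:
    """Hypothetical token that remembers when it was first cancelled."""

    def __init__(self) -> None:
        self.cancelled_at: Optional[float] = None

    def cancel(self) -> None:
        if self.cancelled_at is None:
            self.cancelled_at = time.perf_counter()


@asynccontextmanager
async def cancelled_within(min_time: float, max_time: float) -> AsyncIterator[TimedToken]:
    token = TimedToken()
    start = time.perf_counter()
    yield token
    # Checks run on exit, after the body of the `async with` block has finished.
    assert token.cancelled_at is not None, "token was never cancelled"
    elapsed = token.cancelled_at - start
    assert min_time <= elapsed <= max_time, (
        f"cancelled after {elapsed:.3f}s, expected {min_time}s to {max_time}s"
    )


async def demo() -> bool:
    async with cancelled_within(0.05, 0.5) as token:
        await asyncio.sleep(0.1)
        token.cancel()
    return True


within_ok = asyncio.run(demo())
```

Running the assertions after the `yield` means a block that never cancels the token fails on exit rather than passing silently.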
sample_async_operation
async
sample_async_operation(
duration: float = 1.0,
cancelable: Cancelable | None = None,
) -> str
Sample async operation for testing.