Skip to content

Core Components

The core components provide the fundamental building blocks for async cancelation in Python.

Cancelable

The main context manager for cancelable operations.

hother.cancelable.core.cancelable.Cancelable

Main cancelation helper with composable cancelation sources.

Provides a unified interface for handling cancelation from multiple sources including timeouts, tokens, signals, and conditions.

Source code in src/hother/cancelable/core/cancelable.py
  46
  47
  48
  49
  50
  51
  52
  53
  54
  55
  56
  57
  58
  59
  60
  61
  62
  63
  64
  65
  66
  67
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
class Cancelable:
    """Main cancelation helper with composable cancelation sources.

    Provides a unified interface for handling cancelation from multiple sources
    including timeouts, tokens, signals, and conditions.
    """

    def __init__(
        self,
        operation_id: str | None = None,
        name: str | None = None,
        parent: Cancelable | None = None,
        metadata: dict[str, Any] | None = None,
        register_globally: bool = False,
    ):
        """Initialize a new cancelable operation.

        Args:
            operation_id: Unique operation identifier (auto-generated if not provided)
            name: Human-readable operation name
            parent: Parent cancelable for hierarchical cancelation
            metadata: Additional operation metadata
            register_globally: Whether to register with global registry
        """
        # Create context with conditional ID
        context_kwargs = {
            "name": name,
            "metadata": metadata or {},
        }
        if operation_id is not None:
            context_kwargs["id"] = operation_id

        self.context = OperationContext(**context_kwargs)  # type: ignore[arg-type]

        # Set parent relationship after context creation
        if parent:
            self.context.parent_id = parent.context.id

        self._scope: anyio.CancelScope | None = None
        self._token = LinkedCancelationToken()

        # Use weak references to break circular reference cycles
        self._parent_ref = weakref.ref(parent) if parent else None
        self._children: weakref.WeakSet[Cancelable] = weakref.WeakSet()

        # Register with parent if provided (parent holds strong refs to children)
        if parent:
            parent._children.add(self)
        self._sources: list[CancelationSource] = []
        self._shields: list[anyio.CancelScope] = []
        self._cancellables_to_link: list[Cancelable] | None = None
        self._register_globally = register_globally

        # Token linking state management
        self._link_state = LinkState.NOT_LINKED
        self._link_lock = anyio.Lock()

        # Callbacks
        self._progress_callbacks: list[ProgressCallbackType] = []
        self._status_callbacks: dict[str, list[StatusCallbackType | ErrorCallbackType]] = {
            "start": [],
            "complete": [],
            "cancel": [],
            "error": [],
        }

        # Register with parent
        if parent:
            parent._children.add(self)

        logger.info(
            "Cancelable created",
            extra=self.context.log_context(),
        )

    @property
    def token(self) -> LinkedCancelationToken:
        """Get the cancellation token for this operation.

        Returns:
            The LinkedCancelationToken managing this operation's cancellation state.
        """
        return self._token

    def add_source(self, source: CancelationSource) -> Cancelable:
        """Add a cancelation source to this operation.

        This allows adding custom or composite sources (like AllOfSource) to an existing
        Cancelable instance.

        Args:
            source: The cancelation source to add

        Returns:
            Self for method chaining

        Example:
            ```python
            from hother.cancelable.sources.composite import AllOfSource

            cancelable = Cancelable(name="my_op")
            all_of = AllOfSource([timeout_source, condition_source])
            cancelable.add_source(all_of)
            ```
        """
        self._sources.append(source)
        return self

    # Factory methods
    @classmethod
    def with_timeout(
        cls, timeout: float | timedelta, operation_id: str | None = None, name: str | None = None, **kwargs: Any
    ) -> Cancelable:
        """Create cancelable with timeout.

        Args:
            timeout: Timeout duration in seconds or timedelta
            operation_id: Optional operation ID
            name: Optional operation name
            **kwargs: Additional arguments for Cancelable

        Returns:
            Configured Cancelable instance
        """
        from hother.cancelable.sources.timeout import TimeoutSource

        if isinstance(timeout, timedelta):
            timeout = timeout.total_seconds()

        instance = cls(operation_id=operation_id, name=name or f"timeout_{timeout}s", **kwargs)
        instance._sources.append(TimeoutSource(timeout))
        return instance

    @classmethod
    def with_token(
        cls, token: CancelationToken, operation_id: str | None = None, name: str | None = None, **kwargs: Any
    ) -> Cancelable:
        """Create a Cancelable operation using an existing cancellation token.

        This factory method allows you to create a cancellable operation that shares
        a cancellation token with other operations, enabling coordinated cancellation.

        Args:
            token: The CancelationToken to use for this operation
            operation_id: Optional custom operation identifier
            name: Optional operation name (defaults to "token_based")
            **kwargs: Additional arguments passed to Cancelable constructor

        Returns:
            A configured Cancelable instance using the provided token

        Example:
            ```python
            # Share a token between multiple operations
            shared_token = CancelationToken()

            async with Cancelable.with_token(shared_token, name="task1") as cancel1:
                # ... operation 1 ...

            async with Cancelable.with_token(shared_token, name="task2") as cancel2:
                # ... operation 2 ...

            # Cancel both operations via the shared token
            await shared_token.cancel()
            ```
        """
        instance = cls(operation_id=operation_id, name=name or "token_based", **kwargs)
        # Replace default token with provided one
        logger.debug(f"with_token: Replacing default token {instance._token.id} with user token {token.id}")
        instance._token = token
        logger.debug(f"with_token: Created cancelable {instance.context.id} with user token {token.id}")
        return instance

    @classmethod
    def with_signal(cls, *signals: int, operation_id: str | None = None, name: str | None = None, **kwargs: Any) -> Cancelable:
        """Create cancelable with signal handling.

        Args:
            *signals: Signal numbers to handle
            operation_id: Optional operation ID
            name: Optional operation name
            **kwargs: Additional arguments for Cancelable

        Returns:
            Configured Cancelable instance
        """
        from hother.cancelable.sources.signal import SignalSource

        instance = cls(operation_id=operation_id, name=name or "signal_based", **kwargs)
        instance._sources.append(SignalSource(*signals))
        return instance

    @classmethod
    def with_condition(
        cls,
        condition: Callable[[], bool | Awaitable[bool]],
        check_interval: float = 0.1,
        condition_name: str | None = None,
        operation_id: str | None = None,
        name: str | None = None,
        **kwargs: Any,
    ) -> Cancelable:
        """Create cancelable with condition checking.

        Args:
            condition: Callable that returns True when cancelation should occur
            check_interval: How often to check the condition (seconds)
            condition_name: Name for the condition (for logging)
            operation_id: Optional operation ID
            name: Optional operation name
            **kwargs: Additional arguments for Cancelable

        Returns:
            Configured Cancelable instance
        """
        from hother.cancelable.sources.condition import ConditionSource

        instance = cls(operation_id=operation_id, name=name or "condition_based", **kwargs)
        instance._sources.append(ConditionSource(condition, check_interval, condition_name))
        return instance

    # Composition
    def combine(self, *others: Cancelable) -> Cancelable:
        """Combine multiple Cancelable operations into a single coordinated operation.

        Creates a new Cancelable that will be cancelled if ANY of the combined
        operations is cancelled. All cancellation sources from the combined
        operations are merged together.

        Args:
            *others: One or more Cancelable instances to combine with this one

        Returns:
            A new Cancelable instance that coordinates cancellation across all
            combined operations. When entered, all operations' tokens are linked.

        Example:
            ```python
            # Combine timeout and signal handling
            timeout_cancel = Cancelable.with_timeout(30.0)
            signal_cancel = Cancelable.with_signal(signal.SIGTERM)

            async with timeout_cancel.combine(signal_cancel) as cancel:
                # Operation will cancel on either timeout OR signal
                await long_running_operation()
            ```

        Note:
            The combined Cancelable preserves the cancellation reason from
            whichever source triggers first.
        """
        logger.debug("=== COMBINE CALLED ===")
        logger.debug(f"Self: {self.context.id} ({self.context.name}) with token {self._token.id}")
        for i, other in enumerate(others):
            logger.debug(f"Other {i}: {other.context.id} ({other.context.name}) with token {other._token.id}")

        combined = Cancelable(
            name=f"combined_{self.context.name}",
            metadata={
                "sources": [self.context.id] + [o.context.id for o in others],
                "combined": True,
                "preserve_reason": True,  # Add this flag
            },
        )

        logger.debug(f"Created combined cancelable: {combined.context.id} with default token {combined._token.id}")

        # Store the actual cancelables to link their tokens later
        combined._cancellables_to_link = [self] + list(others)
        logger.debug(f"Will link to {len(combined._cancellables_to_link)} cancelables:")
        for i, c in enumerate(combined._cancellables_to_link):
            logger.debug(f"  {i}: {c.context.id} with token {c._token.id}")

        # Combine all sources
        combined._sources.extend(self._sources)
        for other in others:
            combined._sources.extend(other._sources)

        logger.debug(
            "Created combined cancelable",
            extra={
                "operation_id": combined.context.id,
                "source_count": len(combined._sources),
            },
        )

        return combined

    # Callback registration
    def on_progress(
        self,
        callback: ProgressCallbackType,
    ) -> Cancelable:
        """Register a callback to be invoked when progress is reported.

        The callback will be called whenever `report_progress()` is invoked
        on this operation. Both sync and async callbacks are supported.

        Args:
            callback: Function to call on progress updates. Receives:
                - operation_id (str): The ID of the operation
                - message (Any): The progress message
                - metadata (dict[str, Any] | None): Optional metadata

        Returns:
            Self for method chaining

        Example:
            ```python
            async with Cancelable(name="download") as cancel:
                cancel.on_progress(lambda id, msg, meta: print(f"Progress: {msg}"))

                for i in range(100):
                    await cancel.report_progress(f"{i}% complete")
                    await asyncio.sleep(0.1)
            ```
        """
        self._progress_callbacks.append(callback)
        return self

    def on_start(self, callback: StatusCallbackType) -> Cancelable:
        """Register a callback to be invoked when the operation starts.

        The callback is triggered when entering the async context (on `__aenter__`).

        Args:
            callback: Function receiving the OperationContext. Can be sync or async.

        Returns:
            Self for method chaining
        """
        self._status_callbacks["start"].append(callback)
        return self

    def on_complete(self, callback: StatusCallbackType) -> Cancelable:
        """Register a callback to be invoked when the operation completes successfully.

        The callback is triggered when exiting the context without cancellation or error.

        Args:
            callback: Function receiving the OperationContext. Can be sync or async.

        Returns:
            Self for method chaining
        """
        self._status_callbacks["complete"].append(callback)
        return self

    def on_cancel(self, callback: StatusCallbackType) -> Cancelable:
        """Register a callback to be invoked when the operation is cancelled.

        The callback is triggered when the operation is cancelled by any source
        (timeout, signal, token, condition, or parent cancellation).

        Args:
            callback: Function receiving the OperationContext. Can be sync or async.

        Returns:
            Self for method chaining
        """
        self._status_callbacks["cancel"].append(callback)
        return self

    def on_error(
        self,
        callback: ErrorCallbackType,
    ) -> Cancelable:
        """Register a callback to be invoked when the operation encounters an error.

        The callback is triggered when an exception (other than CancelledError)
        is raised within the operation context.

        Args:
            callback: Function receiving the OperationContext and Exception.
                Can be sync or async.

        Returns:
            Self for method chaining
        """
        self._status_callbacks["error"].append(callback)
        return self

    # Progress reporting
    async def report_progress(self, message: Any, metadata: dict[str, Any] | None = None) -> None:
        """Report progress to all registered callbacks.

        Args:
            message: Progress message
            metadata: Optional metadata dict
        """
        for callback in self._progress_callbacks:
            try:
                result = callback(self.context.id, message, metadata)
                if inspect.iscoroutine(result):
                    await result
            except Exception as e:
                logger.error(
                    "Progress callback error for operation %s: %s",
                    self.context.id,
                    str(e),
                    exc_info=True,
                )

    async def check_cancelation(self) -> None:
        """Check if operation is cancelled and raise if so.

        This is a public API for checking cancellation state.
        Use this instead of accessing `_token` directly.

        Raises:
            anyio.CancelledError: If operation is cancelled
        """
        await self._token.check_async()  # pragma: no cover

    # Context manager
    async def __aenter__(self) -> Cancelable:
        """Enter cancelation context."""
        logger.debug(f"=== ENTERING cancelation context for {self.context.id} ({self.context.name}) ===")

        # Set as current operation
        self._context_token = _current_operation.set(self)

        # Safely link all required tokens with race condition protection
        await self._safe_link_tokens()

        # Update status
        self.context.update_status(OperationStatus.RUNNING)

        # Register with global registry if requested
        if self._register_globally:
            from .registry import OperationRegistry

            registry = OperationRegistry.get_instance()
            await registry.register(self)

        # Create cancel scope
        self._scope = anyio.CancelScope()

        # Set up simple token monitoring via callback
        async def on_token_cancel(token: CancelationToken) -> None:
            """Callback when token is cancelled."""
            logger.error(f"🚨 TOKEN CALLBACK TRIGGERED! Token {token.id} cancelled, cancelling scope for {self.context.id}")
            if self._scope and not self._scope.cancel_called:
                logger.error(f"🚨 CANCELLING SCOPE for {self.context.id}")
                self._scope.cancel()
            else:
                scope_info = f"scope={self._scope}, cancel_called={self._scope.cancel_called if self._scope else 'N/A'}"
                logger.error(f"🚨 SCOPE ALREADY CANCELLED OR NONE for {self.context.id} ({scope_info})")

        logger.debug(f"Registering token callback for token {self._token.id}")
        await self._token.register_callback(on_token_cancel)
        logger.debug("Token callback registered successfully")

        # Start monitoring
        await self._setup_monitoring()

        # Trigger start callbacks
        await self._trigger_callbacks("start")

        # Enter scope - sync operation
        self._scope_exit = self._scope.__enter__()

        logger.debug(f"=== COMPLETED ENTER for {self.context.id} ===")
        return self

    @property
    def parent(self) -> Cancelable | None:
        """Get parent cancelable, returning None if garbage collected."""
        return self._parent_ref() if self._parent_ref else None

    async def run_in_thread(self, func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
        """Run function in thread with proper context propagation.

        This method solves the context variable thread safety issue by ensuring
        that context variables (including _current_operation) are properly
        propagated to OS threads.

        Args:
            func: Function to run in thread
            *args: Positional arguments for func
            **kwargs: Keyword arguments for func

        Returns:
            Result of func execution

        Example:
            ```python
            async with Cancelable(name="main") as cancel:
                # Context is propagated to thread
                result = await cancel.run_in_thread(expensive_computation, data)
            ```
        """
        # Store current context for thread propagation
        ctx = ContextBridge.copy_context()

        def thread_func():
            # Restore context in thread
            ContextBridge.restore_context(ctx)
            # Set current operation in thread
            _current_operation.set(self)
            return func(*args, **kwargs)

        # Run in thread with context
        return await ContextBridge.run_in_thread_with_context(thread_func)

    def __del__(self):
        """Cleanup when cancelable is garbage collected."""
        # Remove from parent's children set (if parent still exists)
        if self._parent_ref:
            parent = self._parent_ref()
            if parent and self in parent._children:
                parent._children.remove(self)

        # Clear references to help GC
        self._parent_ref = None
        self._children.clear()

    def _handle_scope_exit(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: Any | None,
    ) -> bool:
        """Handle anyio scope exit.

        Returns:
            True if scope handled the exception, False otherwise.
        """
        _scope_handled = False
        if self._scope:
            try:
                # scope.__exit__ returns True if it handled the exception
                _scope_handled = self._scope.__exit__(exc_type, exc_val, exc_tb)
            except Exception as e:
                logger.debug(f"Scope exit raised: {e}")
                # Re-raise the exception from scope exit
                raise
        return _scope_handled

    async def _determine_final_status(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
    ) -> None:
        """Determine final operation status based on exception."""
        # Determine final status based on the exception
        # We need to update status even if scope handled it, because the exception might still propagate
        if exc_type is not None:
            logger.debug(f"Exception type: {exc_type}")
            if issubclass(exc_type, anyio.get_cancelled_exc_class()):
                logger.debug("Handling CancelledError")
                # Handle cancelation
                # First check if we already have a cancel reason set by a source
                if self.context.cancel_reason:
                    # A source already set the reason (like condition, timeout, etc.)
                    logger.debug(f"Cancel reason already set: {self.context.cancel_reason}")
                elif self._token.is_cancelled:
                    # Token was cancelled
                    self.context.cancel_reason = self._token.reason
                    self.context.cancel_message = self._token.message
                    logger.debug(f"Cancel reason from token: {self._token.reason}")
                elif self._scope and self._scope.cancel_called:
                    # Scope was cancelled - check why
                    # Check if deadline was exceeded (timeout)
                    # Note: anyio CancelScope always has deadline attribute (defaults to inf)
                    if anyio.current_time() >= self._scope.deadline:
                        self.context.cancel_reason = CancelationReason.TIMEOUT
                        self.context.cancel_message = "Operation timed out"
                        logger.debug("Detected timeout from deadline")
                    else:
                        # Check sources
                        for source in self._sources:
                            if hasattr(source, "triggered") and source.triggered:
                                self.context.cancel_reason = source.reason
                                break

                    if not self.context.cancel_reason:
                        self.context.cancel_reason = CancelationReason.MANUAL
                else:
                    self.context.cancel_reason = CancelationReason.MANUAL

                # Always update status to CANCELLED for any CancelledError
                logger.debug(f"Updating status to CANCELLED (was {self.context.status})")
                self.context.update_status(OperationStatus.CANCELLED)
                logger.debug(f"Status after update: {self.context.status}")
                await self._trigger_callbacks("cancel")

            elif issubclass(exc_type, CancelationError) and isinstance(exc_val, CancelationError):
                # Our custom cancelation errors
                self.context.cancel_reason = exc_val.reason
                self.context.cancel_message = exc_val.message
                self.context.update_status(OperationStatus.CANCELLED)
                await self._trigger_callbacks("cancel")
            else:
                # Other errors
                self.context.error = str(exc_val)
                self.context.update_status(OperationStatus.FAILED)

                # Only trigger error callbacks for Exception instances, not BaseException
                # (e.g., skip KeyboardInterrupt, SystemExit, GeneratorExit)
                if isinstance(exc_val, Exception):
                    await self._trigger_error_callbacks(exc_val)
        else:
            # Successful completion
            self.context.update_status(OperationStatus.COMPLETED)
            await self._trigger_callbacks("complete")

    async def _cleanup_context(self) -> None:
        """Cleanup monitoring, shields, registry, and context vars."""
        logger.debug(f"=== __aexit__ finally block for {self.context.id} ===")

        # Stop monitoring
        await self._stop_monitoring()

        # Cleanup shields
        for shield in self._shields:
            shield.cancel()

        # Unregister from global registry
        if self._register_globally:
            from .registry import OperationRegistry

            registry = OperationRegistry.get_instance()
            await registry.unregister(self.context.id)

        # Reset context variable
        if hasattr(self, "_context_token"):
            _current_operation.reset(self._context_token)

        logger.debug(
            f"Exited cancelation context - final status: {self.context.status}",
            extra=self.context.log_context(),
        )

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: Any | None,
    ) -> bool:
        """Exit cancelation context."""
        logger.debug(f"=== ENTERING __aexit__ for {self.context.id} ===")
        logger.debug(f"exc_type: {exc_type}, exc_val: {exc_val}")
        logger.debug(f"Current status: {self.context.status}")
        logger.debug(f"Current cancel_reason: {self.context.cancel_reason}")

        try:
            # Handle scope exit
            _scope_handled = self._handle_scope_exit(exc_type, exc_val, exc_tb)
            # Determine final status based on exception
            await self._determine_final_status(exc_type, exc_val)
        except Exception as e:
            logger.error(f"Error in __aexit__ status handling: {e}", exc_info=True)
        finally:
            # Cleanup context resources
            await self._cleanup_context()

        # Always propagate exceptions - cancelation context should not suppress them
        # The anyio.CancelScope handles cancelation propagation appropriately
        return False

    async def _collect_all_tokens(self, cancelables: list[Cancelable], result: list[CancelationToken]) -> None:
        """Recursively collect all tokens from cancelables and their children."""
        for cancelable in cancelables:
            # Add this cancelable's token
            if cancelable._token not in result:
                result.append(cancelable._token)

            # Recursively add tokens from nested cancelables
            if cancelable._cancellables_to_link is not None:
                await self._collect_all_tokens(cancelable._cancellables_to_link, result)

    async def _setup_monitoring(self) -> None:
        """Setup all cancelation sources."""
        # Setup source monitoring
        for source in self._sources:
            source.set_cancel_callback(self._on_source_cancelled)
            await source.start_monitoring(cast(anyio.CancelScope, self._scope))

    async def _stop_monitoring(self) -> None:
        """Stop all monitoring tasks."""
        # Stop source monitoring
        for source in self._sources:
            try:
                await source.stop_monitoring()
            except Exception as e:
                logger.error(
                    "Error stopping source monitoring for %s: %s",
                    str(source),
                    str(e),
                    exc_info=True,
                )

    async def _safe_link_tokens(self) -> None:
        """Safely link all required tokens with race condition protection."""
        async with self._link_lock:
            if self._link_state != LinkState.NOT_LINKED:
                return  # Already processed

            self._link_state = LinkState.LINKING

            try:
                # Check if token supports linking (only LinkedCancelationToken has link method)
                if not hasattr(self._token, "link"):
                    # Log warnings for test expectations
                    parent = self.parent
                    if parent:
                        logger.warning(
                            f"Cannot link to parent: token {type(self._token).__name__} "
                            "does not support linking (not a LinkedCancelationToken)"
                        )
                    if self._cancellables_to_link is not None:
                        logger.warning(
                            f"Cannot link to combined sources: token {type(self._token).__name__} "
                            "does not support linking (not a LinkedCancelationToken)"
                        )
                    self._link_state = LinkState.CANCELLED
                    return

                # Link to parent token if we have a parent
                parent = self.parent
                if parent:
                    logger.debug(f"Linking to parent token: {parent._token.id}")
                    await self._token.link(parent._token)

                # Recursively link to ALL underlying tokens from combined cancelables
                if self._cancellables_to_link is not None:
                    logger.debug(f"Linking to {len(self._cancellables_to_link)} combined cancelables")
                    all_tokens: list[CancelationToken] = []
                    await self._collect_all_tokens(self._cancellables_to_link, all_tokens)

                    # Check if we should preserve cancelation reasons
                    preserve_reason = self.context.metadata.get("preserve_reason", False)

                    logger.debug(f"Found {len(all_tokens)} total tokens to link:")
                    for i, token in enumerate(all_tokens):
                        logger.debug(f"  Token {i}: {token.id}")
                        await self._token.link(token, preserve_reason=preserve_reason)

                self._link_state = LinkState.LINKED

            except Exception as e:
                self._link_state = LinkState.CANCELLED
                logger.error(f"Token linking failed: {e}")
                raise

    async def _on_source_cancelled(self, reason: CancelationReason, message: str) -> None:
        """Handle cancelation from a source."""
        self.context.cancel_reason = reason
        self.context.cancel_message = message
        # Also update the status immediately when a source cancels
        self.context.update_status(OperationStatus.CANCELLED)

    # Stream wrapper
    async def stream(
        self,
        async_iter: AsyncIterator[T],
        report_interval: int | None = None,
        buffer_partial: bool = True,
    ) -> AsyncIterator[T]:
        """Wrap async iterator with cancelation support.

        Args:
            async_iter: Async iterator to wrap
            report_interval: Report progress every N items
            buffer_partial: Whether to buffer items for partial results

        Yields:
            Items from the wrapped iterator
        """
        count = 0
        buffer: list[T] = []

        try:
            async for item in async_iter:
                # Check cancelation
                await self._token.check_async()

                yield item
                count += 1

                if buffer_partial:
                    buffer.append(item)
                    # Limit buffer size
                    if len(buffer) > _MAX_BUFFER_SIZE:
                        buffer = buffer[-_MAX_BUFFER_SIZE:]

                if report_interval and count % report_interval == 0:
                    await self.report_progress(f"Processed {count} items", {"count": count, "latest_item": item})

        except anyio.get_cancelled_exc_class():
            # Save partial results
            self.context.partial_result = {
                "count": count,
                "buffer": buffer if buffer_partial else None,
            }
            raise
        except Exception:  # Intentionally broad to save partial results on any error
            # Also save partial results on other exceptions
            self.context.partial_result = {
                "count": count,
                "buffer": buffer if buffer_partial else None,
                "completed": False,
            }
            raise
        else:
            # Save final results if completed normally
            if buffer_partial or count > 0:
                self.context.partial_result = {
                    "count": count,
                    "buffer": buffer if buffer_partial else None,
                    "completed": True,
                }
        finally:
            logger.debug(
                "Stream processing completed for operation %s with %d items",
                self.context.id,
                count,
            )

    # Function wrapper
    def wrap(self, operation: Callable[..., Awaitable[R]]) -> Callable[..., Awaitable[R]]:
        """Wrap an async operation to automatically check for cancelation before execution.

        This is useful for retry loops and other patterns where you want automatic
        cancelation checking without manually accessing the token.

        Note: This assumes the cancelable context is already active (you're inside
        an `async with` block). It does NOT create a new context.

        Args:
            operation: Async callable to wrap

        Returns:
            Wrapped callable that checks cancelation before executing

        Example:
            ```python
            async with Cancelable.with_timeout(30) as cancel:
                wrapped_fetch = cancel.wrap(fetch_data)

                # In a retry loop
                for attempt in range(3):
                    try:
                        result = await wrapped_fetch(url)
                        break
                    except Exception:
                        await anyio.sleep(1)
            ```
        """

        @wraps(operation)
        async def wrapped(*args: Any, **kwargs: Any) -> R:
            # Check cancelation before executing
            await self._token.check_async()
            return await operation(*args, **kwargs)

        return wrapped

    @asynccontextmanager
    async def wrapping(self) -> AsyncIterator[Callable[..., Awaitable[R]]]:
        """Async context manager that yields a wrap function for scoped operation wrapping.

        The yielded wrap function checks cancelation before executing any operation.
        This is useful for retry loops where you want all operations in a scope to
        be automatically wrapped with cancelation checking.

        Yields:
            A wrap function that checks cancelation before executing operations

        Example:
            ```python
            from tenacity import AsyncRetrying, stop_after_attempt

            async with Cancelable.with_timeout(30) as cancel:
                async for attempt in AsyncRetrying(stop=stop_after_attempt(3)):
                    with attempt:
                        async with cancel.wrapping() as wrap:
                            result = await wrap(fetch_data, url)
            ```
        """

        async def wrap_fn(fn: Callable[..., Awaitable[R]], *args: Any, **kwargs: Any) -> R:
            await self._token.check_async()
            return await fn(*args, **kwargs)

        yield wrap_fn

    # Shielding
    @asynccontextmanager
    async def shield(self) -> AsyncIterator[Cancelable]:
        """Shield a section from cancelation.

        Creates a child operation that is protected from cancelation but still
        participates in the operation hierarchy for monitoring and tracking.

        Yields:
            A new Cancelable for the shielded section
        """
        # Create properly integrated child cancelable
        shielded = Cancelable(name=f"{self.context.name}_shielded", metadata={"shielded": True})
        # Manually set parent relationship for hierarchy tracking but don't add to _children
        # to prevent automatic cancelation propagation
        shielded.context.parent_id = self.context.id

        # Override token linking to prevent cancelation propagation
        # The shielded operation should not be cancelled by parent token
        shielded._token = LinkedCancelationToken()  # Fresh token, no parent linking

        # Use anyio's CancelScope with shield=True
        with anyio.CancelScope(shield=True) as shield_scope:
            self._shields.append(shield_scope)
            try:
                shielded.context.update_status(OperationStatus.SHIELDED)
                yield shielded
            finally:
                # Shield is always in list at this point (added at line 783)
                self._shields.remove(shield_scope)

        # Force a checkpoint after shield to allow cancelation to propagate
        # We need to be in an async context for this to work properly
        try:
            await anyio.lowlevel.checkpoint()  # type: ignore[attr-defined]
        except:
            # Re-raise any exception including CancelledError
            raise

    # Cancelation
    async def cancel(
        self,
        reason: CancelationReason = CancelationReason.MANUAL,
        message: str | None = None,
        propagate_to_children: bool = True,
    ) -> None:
        """Cancel the operation.

        Args:
            reason: Reason for cancelation
            message: Optional cancelation message
            propagate_to_children: Whether to cancel child operations
        """
        # Cancel our token
        await self._token.cancel(reason, message)

        # Cancel children if requested
        if propagate_to_children:
            children_to_cancel = list(self._children)  # Snapshot to avoid modification during iteration
            for child in children_to_cancel:
                if child and not child.is_cancelled:
                    await child.cancel(
                        CancelationReason.PARENT,
                        f"Parent operation {self.context.id[:8]} cancelled",
                        propagate_to_children=True,
                    )

        # Clear references to help GC after cancelation
        self._children.clear()
        self._parent_ref = None

        # Log without duplicating cancel_reason
        log_ctx = self.context.log_context()
        # Remove cancel_reason from log_context if it exists to avoid duplication
        log_ctx.pop("cancel_reason", None)

        logger.info(
            "Operation cancelled",
            extra={
                **log_ctx,
                "cancel_reason": reason.value,
                "cancel_message": message,
            },
        )

    # Status helpers
    @property
    def is_cancelled(self) -> bool:
        """Check if operation is cancelled."""
        return self.context.is_cancelled

    @property
    def is_running(self) -> bool:
        """Check if operation is running."""
        return self.context.status == OperationStatus.RUNNING

    @property
    def is_completed(self) -> bool:
        """Check if operation completed successfully."""
        return self.context.is_success

    @property
    def operation_id(self) -> str:
        """Get operation ID."""
        return self.context.id

    # Callback helpers
    async def _trigger_callbacks(self, callback_type: str) -> None:
        """Trigger callbacks of a specific type."""
        callbacks = self._status_callbacks.get(callback_type, [])
        for callback in callbacks:
            try:
                result = callback(self.context)  # type: ignore[misc]
                if inspect.iscoroutine(result):  # type: ignore[arg-type]
                    await result
            except Exception as e:
                logger.error(
                    "%s callback error for operation %s: %s",
                    callback_type.capitalize(),
                    self.context.id,
                    str(e),
                    exc_info=True,
                )

    async def _trigger_error_callbacks(self, error: Exception) -> None:
        """Trigger error callbacks."""
        callbacks = self._status_callbacks.get("error", [])
        for callback in callbacks:
            try:
                result = callback(self.context, error)  # type: ignore[misc]
                if inspect.iscoroutine(result):  # type: ignore[arg-type]
                    await result
            except Exception as e:
                logger.error(
                    "Error callback error for operation %s: %s",
                    self.context.id,
                    str(e),
                    exc_info=True,
                )

context instance-attribute

context = OperationContext(**context_kwargs)

token property

Get the cancellation token for this operation.

Returns:

Type Description
LinkedCancelationToken

The LinkedCancelationToken managing this operation's cancellation state.

parent property

parent: Cancelable | None

Get parent cancelable, returning None if garbage collected.

is_cancelled property

is_cancelled: bool

Check if operation is cancelled.

is_running property

is_running: bool

Check if operation is running.

is_completed property

is_completed: bool

Check if operation completed successfully.

operation_id property

operation_id: str

Get operation ID.

add_source

add_source(source: CancelationSource) -> Cancelable

Add a cancelation source to this operation.

This allows adding custom or composite sources (like AllOfSource) to an existing Cancelable instance.

Parameters:

Name Type Description Default
source CancelationSource

The cancelation source to add

required

Returns:

Type Description
Cancelable

Self for method chaining

Example
from hother.cancelable.sources.composite import AllOfSource

cancelable = Cancelable(name="my_op")
all_of = AllOfSource([timeout_source, condition_source])
cancelable.add_source(all_of)
Source code in src/hother/cancelable/core/cancelable.py
def add_source(self, source: CancelationSource) -> Cancelable:
    """Add a cancelation source to this operation.

    This allows adding custom or composite sources (like AllOfSource) to an existing
    Cancelable instance.

    Args:
        source: The cancelation source to add

    Returns:
        Self for method chaining

    Example:
        ```python
        from hother.cancelable.sources.composite import AllOfSource

        cancelable = Cancelable(name="my_op")
        all_of = AllOfSource([timeout_source, condition_source])
        cancelable.add_source(all_of)
        ```
    """
    self._sources.append(source)
    return self

with_timeout classmethod

with_timeout(
    timeout: float | timedelta,
    operation_id: str | None = None,
    name: str | None = None,
    **kwargs: Any,
) -> Cancelable

Create cancelable with timeout.

Parameters:

Name Type Description Default
timeout float | timedelta

Timeout duration in seconds or timedelta

required
operation_id str | None

Optional operation ID

None
name str | None

Optional operation name

None
**kwargs Any

Additional arguments for Cancelable

{}

Returns:

Type Description
Cancelable

Configured Cancelable instance

Source code in src/hother/cancelable/core/cancelable.py
@classmethod
def with_timeout(
    cls, timeout: float | timedelta, operation_id: str | None = None, name: str | None = None, **kwargs: Any
) -> Cancelable:
    """Create cancelable with timeout.

    Args:
        timeout: Timeout duration in seconds or timedelta
        operation_id: Optional operation ID
        name: Optional operation name
        **kwargs: Additional arguments for Cancelable

    Returns:
        Configured Cancelable instance
    """
    from hother.cancelable.sources.timeout import TimeoutSource

    if isinstance(timeout, timedelta):
        timeout = timeout.total_seconds()

    instance = cls(operation_id=operation_id, name=name or f"timeout_{timeout}s", **kwargs)
    instance._sources.append(TimeoutSource(timeout))
    return instance

with_token classmethod

with_token(
    token: CancelationToken,
    operation_id: str | None = None,
    name: str | None = None,
    **kwargs: Any,
) -> Cancelable

Create a Cancelable operation using an existing cancellation token.

This factory method allows you to create a cancellable operation that shares a cancellation token with other operations, enabling coordinated cancellation.

Parameters:

Name Type Description Default
token CancelationToken

The CancelationToken to use for this operation

required
operation_id str | None

Optional custom operation identifier

None
name str | None

Optional operation name (defaults to "token_based")

None
**kwargs Any

Additional arguments passed to Cancelable constructor

{}

Returns:

Type Description
Cancelable

A configured Cancelable instance using the provided token

Example
# Share a token between multiple operations
shared_token = CancelationToken()

async with Cancelable.with_token(shared_token, name="task1") as cancel1:
    # ... operation 1 ...

async with Cancelable.with_token(shared_token, name="task2") as cancel2:
    # ... operation 2 ...

# Cancel both operations via the shared token
await shared_token.cancel()
Source code in src/hother/cancelable/core/cancelable.py
@classmethod
def with_token(
    cls, token: CancelationToken, operation_id: str | None = None, name: str | None = None, **kwargs: Any
) -> Cancelable:
    """Create a Cancelable operation using an existing cancellation token.

    This factory method allows you to create a cancellable operation that shares
    a cancellation token with other operations, enabling coordinated cancellation.

    Args:
        token: The CancelationToken to use for this operation
        operation_id: Optional custom operation identifier
        name: Optional operation name (defaults to "token_based")
        **kwargs: Additional arguments passed to Cancelable constructor

    Returns:
        A configured Cancelable instance using the provided token

    Example:
        ```python
        # Share a token between multiple operations
        shared_token = CancelationToken()

        async with Cancelable.with_token(shared_token, name="task1") as cancel1:
            # ... operation 1 ...

        async with Cancelable.with_token(shared_token, name="task2") as cancel2:
            # ... operation 2 ...

        # Cancel both operations via the shared token
        await shared_token.cancel()
        ```
    """
    instance = cls(operation_id=operation_id, name=name or "token_based", **kwargs)
    # Replace default token with provided one
    logger.debug(f"with_token: Replacing default token {instance._token.id} with user token {token.id}")
    instance._token = token
    logger.debug(f"with_token: Created cancelable {instance.context.id} with user token {token.id}")
    return instance

with_signal classmethod

with_signal(
    *signals: int,
    operation_id: str | None = None,
    name: str | None = None,
    **kwargs: Any,
) -> Cancelable

Create cancelable with signal handling.

Parameters:

Name Type Description Default
*signals int

Signal numbers to handle

()
operation_id str | None

Optional operation ID

None
name str | None

Optional operation name

None
**kwargs Any

Additional arguments for Cancelable

{}

Returns:

Type Description
Cancelable

Configured Cancelable instance

Source code in src/hother/cancelable/core/cancelable.py
@classmethod
def with_signal(cls, *signals: int, operation_id: str | None = None, name: str | None = None, **kwargs: Any) -> Cancelable:
    """Create cancelable with signal handling.

    Args:
        *signals: Signal numbers to handle
        operation_id: Optional operation ID
        name: Optional operation name
        **kwargs: Additional arguments for Cancelable

    Returns:
        Configured Cancelable instance
    """
    from hother.cancelable.sources.signal import SignalSource

    instance = cls(operation_id=operation_id, name=name or "signal_based", **kwargs)
    instance._sources.append(SignalSource(*signals))
    return instance

with_condition classmethod

with_condition(
    condition: Callable[[], bool | Awaitable[bool]],
    check_interval: float = 0.1,
    condition_name: str | None = None,
    operation_id: str | None = None,
    name: str | None = None,
    **kwargs: Any,
) -> Cancelable

Create cancelable with condition checking.

Parameters:

Name Type Description Default
condition Callable[[], bool | Awaitable[bool]]

Callable that returns True when cancelation should occur

required
check_interval float

How often to check the condition (seconds)

0.1
condition_name str | None

Name for the condition (for logging)

None
operation_id str | None

Optional operation ID

None
name str | None

Optional operation name

None
**kwargs Any

Additional arguments for Cancelable

{}

Returns:

Type Description
Cancelable

Configured Cancelable instance

Source code in src/hother/cancelable/core/cancelable.py
@classmethod
def with_condition(
    cls,
    condition: Callable[[], bool | Awaitable[bool]],
    check_interval: float = 0.1,
    condition_name: str | None = None,
    operation_id: str | None = None,
    name: str | None = None,
    **kwargs: Any,
) -> Cancelable:
    """Create cancelable with condition checking.

    Args:
        condition: Callable that returns True when cancelation should occur
        check_interval: How often to check the condition (seconds)
        condition_name: Name for the condition (for logging)
        operation_id: Optional operation ID
        name: Optional operation name
        **kwargs: Additional arguments for Cancelable

    Returns:
        Configured Cancelable instance
    """
    from hother.cancelable.sources.condition import ConditionSource

    instance = cls(operation_id=operation_id, name=name or "condition_based", **kwargs)
    instance._sources.append(ConditionSource(condition, check_interval, condition_name))
    return instance

combine

combine(*others: Cancelable) -> Cancelable

Combine multiple Cancelable operations into a single coordinated operation.

Creates a new Cancelable that will be cancelled if ANY of the combined operations is cancelled. All cancellation sources from the combined operations are merged together.

Parameters:

Name Type Description Default
*others Cancelable

One or more Cancelable instances to combine with this one

()

Returns:

Type Description
Cancelable

A new Cancelable instance that coordinates cancellation across all

Cancelable

combined operations. When entered, all operations' tokens are linked.

Example
# Combine timeout and signal handling
timeout_cancel = Cancelable.with_timeout(30.0)
signal_cancel = Cancelable.with_signal(signal.SIGTERM)

async with timeout_cancel.combine(signal_cancel) as cancel:
    # Operation will cancel on either timeout OR signal
    await long_running_operation()
Note

The combined Cancelable preserves the cancellation reason from whichever source triggers first.

Source code in src/hother/cancelable/core/cancelable.py
def combine(self, *others: Cancelable) -> Cancelable:
    """Combine multiple Cancelable operations into a single coordinated operation.

    Creates a new Cancelable that will be cancelled if ANY of the combined
    operations is cancelled. All cancellation sources from the combined
    operations are merged together.

    Args:
        *others: One or more Cancelable instances to combine with this one

    Returns:
        A new Cancelable instance that coordinates cancellation across all
        combined operations. When entered, all operations' tokens are linked.

    Example:
        ```python
        # Combine timeout and signal handling
        timeout_cancel = Cancelable.with_timeout(30.0)
        signal_cancel = Cancelable.with_signal(signal.SIGTERM)

        async with timeout_cancel.combine(signal_cancel) as cancel:
            # Operation will cancel on either timeout OR signal
            await long_running_operation()
        ```

    Note:
        The combined Cancelable preserves the cancellation reason from
        whichever source triggers first.
    """
    logger.debug("=== COMBINE CALLED ===")
    logger.debug(f"Self: {self.context.id} ({self.context.name}) with token {self._token.id}")
    for i, other in enumerate(others):
        logger.debug(f"Other {i}: {other.context.id} ({other.context.name}) with token {other._token.id}")

    combined = Cancelable(
        name=f"combined_{self.context.name}",
        metadata={
            "sources": [self.context.id] + [o.context.id for o in others],
            "combined": True,
            "preserve_reason": True,  # Add this flag
        },
    )

    logger.debug(f"Created combined cancelable: {combined.context.id} with default token {combined._token.id}")

    # Store the actual cancelables to link their tokens later
    combined._cancellables_to_link = [self] + list(others)
    logger.debug(f"Will link to {len(combined._cancellables_to_link)} cancelables:")
    for i, c in enumerate(combined._cancellables_to_link):
        logger.debug(f"  {i}: {c.context.id} with token {c._token.id}")

    # Combine all sources
    combined._sources.extend(self._sources)
    for other in others:
        combined._sources.extend(other._sources)

    logger.debug(
        "Created combined cancelable",
        extra={
            "operation_id": combined.context.id,
            "source_count": len(combined._sources),
        },
    )

    return combined

on_progress

on_progress(callback: ProgressCallbackType) -> Cancelable

Register a callback to be invoked when progress is reported.

The callback will be called whenever report_progress() is invoked on this operation. Both sync and async callbacks are supported.

Parameters:

Name Type Description Default
callback ProgressCallbackType

Function to call on progress updates. Receives: - operation_id (str): The ID of the operation - message (Any): The progress message - metadata (dict[str, Any] | None): Optional metadata

required

Returns:

Type Description
Cancelable

Self for method chaining

Example
async with Cancelable(name="download") as cancel:
    cancel.on_progress(lambda id, msg, meta: print(f"Progress: {msg}"))

    for i in range(100):
        await cancel.report_progress(f"{i}% complete")
        await asyncio.sleep(0.1)
Source code in src/hother/cancelable/core/cancelable.py
def on_progress(
    self,
    callback: ProgressCallbackType,
) -> Cancelable:
    """Register a callback to be invoked when progress is reported.

    The callback will be called whenever `report_progress()` is invoked
    on this operation. Both sync and async callbacks are supported.

    Args:
        callback: Function to call on progress updates. Receives:
            - operation_id (str): The ID of the operation
            - message (Any): The progress message
            - metadata (dict[str, Any] | None): Optional metadata

    Returns:
        Self for method chaining

    Example:
        ```python
        async with Cancelable(name="download") as cancel:
            cancel.on_progress(lambda id, msg, meta: print(f"Progress: {msg}"))

            for i in range(100):
                await cancel.report_progress(f"{i}% complete")
                await asyncio.sleep(0.1)
        ```
    """
    self._progress_callbacks.append(callback)
    return self

on_start

on_start(callback: StatusCallbackType) -> Cancelable

Register a callback to be invoked when the operation starts.

The callback is triggered when entering the async context (on __aenter__).

Parameters:

Name Type Description Default
callback StatusCallbackType

Function receiving the OperationContext. Can be sync or async.

required

Returns:

Type Description
Cancelable

Self for method chaining

Source code in src/hother/cancelable/core/cancelable.py
def on_start(self, callback: StatusCallbackType) -> Cancelable:
    """Register a callback to be invoked when the operation starts.

    The callback is triggered when entering the async context (on `__aenter__`).

    Args:
        callback: Function receiving the OperationContext. Can be sync or async.

    Returns:
        Self for method chaining
    """
    self._status_callbacks["start"].append(callback)
    return self

on_complete

on_complete(callback: StatusCallbackType) -> Cancelable

Register a callback to be invoked when the operation completes successfully.

The callback is triggered when exiting the context without cancellation or error.

Parameters:

Name Type Description Default
callback StatusCallbackType

Function receiving the OperationContext. Can be sync or async.

required

Returns:

Type Description
Cancelable

Self for method chaining

Source code in src/hother/cancelable/core/cancelable.py
def on_complete(self, callback: StatusCallbackType) -> Cancelable:
    """Register a callback to be invoked when the operation completes successfully.

    The callback is triggered when exiting the context without cancellation or error.

    Args:
        callback: Function receiving the OperationContext. Can be sync or async.

    Returns:
        Self for method chaining
    """
    self._status_callbacks["complete"].append(callback)
    return self

on_cancel

on_cancel(callback: StatusCallbackType) -> Cancelable

Register a callback to be invoked when the operation is cancelled.

The callback is triggered when the operation is cancelled by any source (timeout, signal, token, condition, or parent cancellation).

Parameters:

Name Type Description Default
callback StatusCallbackType

Function receiving the OperationContext. Can be sync or async.

required

Returns:

Type Description
Cancelable

Self for method chaining

Source code in src/hother/cancelable/core/cancelable.py
def on_cancel(self, callback: StatusCallbackType) -> Cancelable:
    """Register a callback to be invoked when the operation is cancelled.

    The callback is triggered when the operation is cancelled by any source
    (timeout, signal, token, condition, or parent cancellation).

    Args:
        callback: Function receiving the OperationContext. Can be sync or async.

    Returns:
        Self for method chaining
    """
    self._status_callbacks["cancel"].append(callback)
    return self

on_error

on_error(callback: ErrorCallbackType) -> Cancelable

Register a callback to be invoked when the operation encounters an error.

The callback is triggered when an exception (other than CancelledError) is raised within the operation context.

Parameters:

Name Type Description Default
callback ErrorCallbackType

Function receiving the OperationContext and Exception. Can be sync or async.

required

Returns:

Type Description
Cancelable

Self for method chaining

Source code in src/hother/cancelable/core/cancelable.py
def on_error(
    self,
    callback: ErrorCallbackType,
) -> Cancelable:
    """Register a callback to be invoked when the operation encounters an error.

    The callback is triggered when an exception (other than CancelledError)
    is raised within the operation context.

    Args:
        callback: Function receiving the OperationContext and Exception.
            Can be sync or async.

    Returns:
        Self for method chaining
    """
    self._status_callbacks["error"].append(callback)
    return self

report_progress async

report_progress(
    message: Any, metadata: dict[str, Any] | None = None
) -> None

Report progress to all registered callbacks.

Parameters:

Name Type Description Default
message Any

Progress message

required
metadata dict[str, Any] | None

Optional metadata dict

None
Source code in src/hother/cancelable/core/cancelable.py
async def report_progress(self, message: Any, metadata: dict[str, Any] | None = None) -> None:
    """Report progress to all registered callbacks.

    Args:
        message: Progress message
        metadata: Optional metadata dict
    """
    for callback in self._progress_callbacks:
        try:
            result = callback(self.context.id, message, metadata)
            if inspect.iscoroutine(result):
                await result
        except Exception as e:
            logger.error(
                "Progress callback error for operation %s: %s",
                self.context.id,
                str(e),
                exc_info=True,
            )

check_cancelation async

check_cancelation() -> None

Check if operation is cancelled and raise if so.

This is a public API for checking cancellation state. Use this instead of accessing _token directly.

Raises:

Type Description
CancelledError

If operation is cancelled

Source code in src/hother/cancelable/core/cancelable.py
async def check_cancelation(self) -> None:
    """Check if operation is cancelled and raise if so.

    This is a public API for checking cancellation state.
    Use this instead of accessing `_token` directly.

    Raises:
        anyio.CancelledError: If operation is cancelled
    """
    await self._token.check_async()  # pragma: no cover

run_in_thread async

run_in_thread(
    func: Callable[..., T], *args: Any, **kwargs: Any
) -> T

Run function in thread with proper context propagation.

This method solves the context variable thread safety issue by ensuring that context variables (including _current_operation) are properly propagated to OS threads.

Parameters:

Name Type Description Default
func Callable[..., T]

Function to run in thread

required
*args Any

Positional arguments for func

()
**kwargs Any

Keyword arguments for func

{}

Returns:

Type Description
T

Result of func execution

Example
async with Cancelable(name="main") as cancel:
    # Context is propagated to thread
    result = await cancel.run_in_thread(expensive_computation, data)
Source code in src/hother/cancelable/core/cancelable.py
async def run_in_thread(self, func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
    """Run function in thread with proper context propagation.

    This method solves the context variable thread safety issue by ensuring
    that context variables (including _current_operation) are properly
    propagated to OS threads.

    Args:
        func: Function to run in thread
        *args: Positional arguments for func
        **kwargs: Keyword arguments for func

    Returns:
        Result of func execution

    Example:
        ```python
        async with Cancelable(name="main") as cancel:
            # Context is propagated to thread
            result = await cancel.run_in_thread(expensive_computation, data)
        ```
    """
    # Store current context for thread propagation
    ctx = ContextBridge.copy_context()

    def thread_func():
        # Restore context in thread
        ContextBridge.restore_context(ctx)
        # Set current operation in thread
        _current_operation.set(self)
        return func(*args, **kwargs)

    # Run in thread with context
    return await ContextBridge.run_in_thread_with_context(thread_func)

stream async

stream(
    async_iter: AsyncIterator[T],
    report_interval: int | None = None,
    buffer_partial: bool = True,
) -> AsyncIterator[T]

Wrap async iterator with cancelation support.

Parameters:

Name Type Description Default
async_iter AsyncIterator[T]

Async iterator to wrap

required
report_interval int | None

Report progress every N items

None
buffer_partial bool

Whether to buffer items for partial results

True

Yields:

Type Description
AsyncIterator[T]

Items from the wrapped iterator

Source code in src/hother/cancelable/core/cancelable.py
async def stream(
    self,
    async_iter: AsyncIterator[T],
    report_interval: int | None = None,
    buffer_partial: bool = True,
) -> AsyncIterator[T]:
    """Wrap async iterator with cancelation support.

    Args:
        async_iter: Async iterator to wrap
        report_interval: Report progress every N items
        buffer_partial: Whether to buffer items for partial results

    Yields:
        Items from the wrapped iterator
    """
    count = 0
    buffer: list[T] = []

    try:
        async for item in async_iter:
            # Check cancelation
            await self._token.check_async()

            yield item
            count += 1

            if buffer_partial:
                buffer.append(item)
                # Limit buffer size
                if len(buffer) > _MAX_BUFFER_SIZE:
                    buffer = buffer[-_MAX_BUFFER_SIZE:]

            if report_interval and count % report_interval == 0:
                await self.report_progress(f"Processed {count} items", {"count": count, "latest_item": item})

    except anyio.get_cancelled_exc_class():
        # Save partial results
        self.context.partial_result = {
            "count": count,
            "buffer": buffer if buffer_partial else None,
        }
        raise
    except Exception:  # Intentionally broad to save partial results on any error
        # Also save partial results on other exceptions
        self.context.partial_result = {
            "count": count,
            "buffer": buffer if buffer_partial else None,
            "completed": False,
        }
        raise
    else:
        # Save final results if completed normally
        if buffer_partial or count > 0:
            self.context.partial_result = {
                "count": count,
                "buffer": buffer if buffer_partial else None,
                "completed": True,
            }
    finally:
        logger.debug(
            "Stream processing completed for operation %s with %d items",
            self.context.id,
            count,
        )

wrap

wrap(
    operation: Callable[..., Awaitable[R]],
) -> Callable[..., Awaitable[R]]

Wrap an async operation to automatically check for cancelation before execution.

This is useful for retry loops and other patterns where you want automatic cancelation checking without manually accessing the token.

Note: This assumes the cancelable context is already active (you're inside an async with block). It does NOT create a new context.

Parameters:

Name Type Description Default
operation Callable[..., Awaitable[R]]

Async callable to wrap

required

Returns:

Type Description
Callable[..., Awaitable[R]]

Wrapped callable that checks cancelation before executing

Example
async with Cancelable.with_timeout(30) as cancel:
    wrapped_fetch = cancel.wrap(fetch_data)

    # In a retry loop
    for attempt in range(3):
        try:
            result = await wrapped_fetch(url)
            break
        except Exception:
            await anyio.sleep(1)
Source code in src/hother/cancelable/core/cancelable.py
def wrap(self, operation: Callable[..., Awaitable[R]]) -> Callable[..., Awaitable[R]]:
    """Wrap an async operation to automatically check for cancelation before execution.

    This is useful for retry loops and other patterns where you want automatic
    cancelation checking without manually accessing the token.

    Note: This assumes the cancelable context is already active (you're inside
    an `async with` block). It does NOT create a new context.

    Args:
        operation: Async callable to wrap

    Returns:
        Wrapped callable that checks cancelation before executing

    Example:
        ```python
        async with Cancelable.with_timeout(30) as cancel:
            wrapped_fetch = cancel.wrap(fetch_data)

            # In a retry loop
            for attempt in range(3):
                try:
                    result = await wrapped_fetch(url)
                    break
                except Exception:
                    await anyio.sleep(1)
        ```
    """

    @wraps(operation)
    async def wrapped(*args: Any, **kwargs: Any) -> R:
        # Check cancelation before executing
        await self._token.check_async()
        return await operation(*args, **kwargs)

    return wrapped

wrapping async

wrapping() -> AsyncIterator[Callable[..., Awaitable[R]]]

Async context manager that yields a wrap function for scoped operation wrapping.

The yielded wrap function checks cancelation before executing any operation. This is useful for retry loops where you want all operations in a scope to be automatically wrapped with cancelation checking.

Yields:

Type Description
AsyncIterator[Callable[..., Awaitable[R]]]

A wrap function that checks cancelation before executing operations

Example
from tenacity import AsyncRetrying, stop_after_attempt

async with Cancelable.with_timeout(30) as cancel:
    async for attempt in AsyncRetrying(stop=stop_after_attempt(3)):
        with attempt:
            async with cancel.wrapping() as wrap:
                result = await wrap(fetch_data, url)
Source code in src/hother/cancelable/core/cancelable.py
@asynccontextmanager
async def wrapping(self) -> AsyncIterator[Callable[..., Awaitable[R]]]:
    """Async context manager that yields a wrap function for scoped operation wrapping.

    The yielded wrap function checks cancelation before executing any operation.
    This is useful for retry loops where you want all operations in a scope to
    be automatically wrapped with cancelation checking.

    Yields:
        A wrap function that checks cancelation before executing operations

    Example:
        ```python
        from tenacity import AsyncRetrying, stop_after_attempt

        async with Cancelable.with_timeout(30) as cancel:
            async for attempt in AsyncRetrying(stop=stop_after_attempt(3)):
                with attempt:
                    async with cancel.wrapping() as wrap:
                        result = await wrap(fetch_data, url)
        ```
    """

    async def wrap_fn(fn: Callable[..., Awaitable[R]], *args: Any, **kwargs: Any) -> R:
        await self._token.check_async()
        return await fn(*args, **kwargs)

    yield wrap_fn

shield async

Shield a section from cancelation.

Creates a child operation that is protected from cancelation but still participates in the operation hierarchy for monitoring and tracking.

Yields:

Type Description
AsyncIterator[Cancelable]

A new Cancelable for the shielded section

Source code in src/hother/cancelable/core/cancelable.py
@asynccontextmanager
async def shield(self) -> AsyncIterator[Cancelable]:
    """Shield a section from cancelation.

    Creates a child operation that is protected from cancelation but still
    participates in the operation hierarchy for monitoring and tracking.

    Yields:
        A new Cancelable for the shielded section
    """
    # Create properly integrated child cancelable
    shielded = Cancelable(name=f"{self.context.name}_shielded", metadata={"shielded": True})
    # Manually set parent relationship for hierarchy tracking but don't add to _children
    # to prevent automatic cancelation propagation
    shielded.context.parent_id = self.context.id

    # Override token linking to prevent cancelation propagation
    # The shielded operation should not be cancelled by parent token
    shielded._token = LinkedCancelationToken()  # Fresh token, no parent linking

    # Use anyio's CancelScope with shield=True
    with anyio.CancelScope(shield=True) as shield_scope:
        self._shields.append(shield_scope)
        try:
            shielded.context.update_status(OperationStatus.SHIELDED)
            yield shielded
        finally:
            # Shield is always in list at this point (added at line 783)
            self._shields.remove(shield_scope)

    # Force a checkpoint after shield to allow cancelation to propagate
    # We need to be in an async context for this to work properly
    try:
        await anyio.lowlevel.checkpoint()  # type: ignore[attr-defined]
    except:
        # Re-raise any exception including CancelledError
        raise

cancel async

cancel(
    reason: CancelationReason = MANUAL,
    message: str | None = None,
    propagate_to_children: bool = True,
) -> None

Cancel the operation.

Parameters:

Name Type Description Default
reason CancelationReason

Reason for cancelation

MANUAL
message str | None

Optional cancelation message

None
propagate_to_children bool

Whether to cancel child operations

True
Source code in src/hother/cancelable/core/cancelable.py
async def cancel(
    self,
    reason: CancelationReason = CancelationReason.MANUAL,
    message: str | None = None,
    propagate_to_children: bool = True,
) -> None:
    """Cancel the operation.

    Args:
        reason: Reason for cancelation
        message: Optional cancelation message
        propagate_to_children: Whether to cancel child operations
    """
    # Cancel our token
    await self._token.cancel(reason, message)

    # Cancel children if requested
    if propagate_to_children:
        children_to_cancel = list(self._children)  # Snapshot to avoid modification during iteration
        for child in children_to_cancel:
            if child and not child.is_cancelled:
                await child.cancel(
                    CancelationReason.PARENT,
                    f"Parent operation {self.context.id[:8]} cancelled",
                    propagate_to_children=True,
                )

    # Clear references to help GC after cancelation
    self._children.clear()
    self._parent_ref = None

    # Log without duplicating cancel_reason
    log_ctx = self.context.log_context()
    # Remove cancel_reason from log_context if it exists to avoid duplication
    log_ctx.pop("cancel_reason", None)

    logger.info(
        "Operation cancelled",
        extra={
            **log_ctx,
            "cancel_reason": reason.value,
            "cancel_message": message,
        },
    )

Cancelation Tokens

CancelationToken

Thread-safe token for manual cancelation.

hother.cancelable.core.token.CancelationToken

Bases: BaseModel

Thread-safe cancelation token that can be shared across tasks.

Attributes:

Name Type Description
id str

Unique token identifier

is_cancelled bool

Whether the token has been cancelled

reason CancelationReason | None

Reason for cancelation

message str | None

Optional cancelation message

cancelled_at datetime | None

When the token was cancelled

Source code in src/hother/cancelable/core/token.py
class CancelationToken(BaseModel):
    """Thread-safe cancelation token that can be shared across tasks.

    Attributes:
        id: Unique token identifier
        is_cancelled: Whether the token has been cancelled
        reason: Reason for cancelation
        message: Optional cancelation message
        cancelled_at: When the token was cancelled
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    is_cancelled: bool = False
    reason: CancelationReason | None = None
    message: str | None = None
    cancelled_at: datetime | None = None

    # Private fields using PrivateAttr
    _event: Any = PrivateAttr(default=None)
    _lock: Any = PrivateAttr(default=None)
    _state_lock: Any = PrivateAttr(default=None)  # Thread-safe lock for state updates
    _callbacks: Any = PrivateAttr(default=None)

    def __init__(self, **data: Any) -> None:
        super().__init__(**data)
        self._event = anyio.Event()
        self._lock = anyio.Lock()
        self._state_lock = threading.Lock()  # Thread-safe lock for state updates
        self._callbacks = []

        logger.debug(f"Created cancelation token {self.id}")

    def __hash__(self) -> int:
        """Make token hashable based on ID."""
        return hash(self.id)

    def __eq__(self, other: object) -> bool:
        """Check equality based on ID."""
        if not isinstance(other, CancelationToken):
            return False
        return self.id == other.id

    async def cancel(
        self,
        reason: CancelationReason = CancelationReason.MANUAL,
        message: str | None = None,
    ) -> bool:
        """Cancel the token.

        Args:
            reason: Reason for cancelation
            message: Optional cancelation message

        Returns:
            True if token was cancelled, False if already cancelled
        """
        logger.info(f"=== CANCEL CALLED on token {self.id} ===")
        async with self._lock:
            if self.is_cancelled:
                logger.debug(
                    "Token already cancelled",
                    extra={
                        "token_id": self.id,
                        "original_reason": self.reason.value if self.reason else None,
                    },
                )
                return False

            self.is_cancelled = True
            self.reason = reason
            self.message = message
            self.cancelled_at = datetime.now(UTC)
            self._event.set()

            logger.info(
                f"Token {self.id} cancelled - calling {len(self._callbacks)} callbacks",
                extra={
                    "token_id": self.id,
                    "reason": reason.value,
                    "cancel_message": message,
                    "callback_count": len(self._callbacks),
                },
            )

            # Notify callbacks
            for i, callback in enumerate(list(self._callbacks)):
                try:
                    logger.debug(f"Calling callback {i} for token {self.id}")
                    await callback(self)
                    logger.debug(f"Callback {i} completed successfully")
                except Exception as e:
                    logger.error(
                        "Error in cancelation callback",
                        extra={
                            "token_id": self.id,
                            "callback_index": i,
                            "error": str(e),
                        },
                        exc_info=True,
                    )

            logger.info(f"=== CANCEL COMPLETED for token {self.id} ===")
            return True

    def cancel_sync(
        self,
        reason: CancelationReason = CancelationReason.MANUAL,
        message: str | None = None,
    ) -> bool:
        """Thread-safe synchronous cancelation from any thread.

        This method can be called from regular Python threads (pynput, signal handlers, etc.)
        and will safely cancel the token and notify async waiters via the anyio bridge.

        Args:
            reason: Reason for cancelation
            message: Optional cancelation message

        Returns:
            True if token was cancelled, False if already cancelled

        Example:
            ```python
            def on_signal(signum):
                # Called from signal handler thread
                token.cancel_sync(CancelationReason.SIGNAL)
            ```
        """
        logger.info(f"=== CANCEL_SYNC CALLED on token {self.id} from thread ===")

        # Update state with thread-safe lock
        with self._state_lock:
            if self.is_cancelled:
                logger.debug(
                    "Token already cancelled",
                    extra={
                        "token_id": self.id,
                        "original_reason": self.reason.value if self.reason else None,
                    },
                )
                return False

            self.is_cancelled = True
            self.reason = reason
            self.message = message
            self.cancelled_at = datetime.now(UTC)

        logger.debug(
            f"Token {self.id} cancelled (sync) - notifying async waiters",
            extra={
                "token_id": self.id,
                "reason": reason.value,
                "cancel_message": message,
            },
        )

        # Notify async waiters (thread-safe)
        self._notify_async_waiters()

        # Schedule callbacks (thread-safe)
        self._schedule_callbacks()

        logger.debug(f"=== CANCEL_SYNC COMPLETED for token {self.id} ===")
        return True

    def _notify_async_waiters(self) -> None:
        """Set the anyio event from a thread.

        Uses the anyio bridge to safely set the event in the anyio context.
        """

        def set_event() -> None:
            self._event.set()

        call_soon_threadsafe(set_event)

    def _schedule_callbacks(self) -> None:
        """Schedule callbacks to run in the anyio context.

        Uses the anyio bridge to safely execute callbacks from a thread.
        """
        # Take snapshot of callbacks with thread-safe lock
        with self._state_lock:
            callbacks_to_call = list(self._callbacks)

        logger.debug(
            f"Scheduling {len(callbacks_to_call)} callbacks for token {self.id}",
            extra={
                "token_id": self.id,
                "callback_count": len(callbacks_to_call),
            },
        )

        # Schedule each callback via bridge
        for i, callback in enumerate(callbacks_to_call):

            async def run_callback(idx: int = i, cb: Any = callback) -> None:  # Capture loop variables
                try:
                    logger.debug(f"Calling callback {idx} for token {self.id}")
                    await cb(self)
                    logger.debug(f"Callback {idx} completed successfully")
                except Exception as e:
                    logger.error(
                        "Error in cancelation callback",
                        exc_info=True,
                        extra={"token_id": self.id, "callback_index": idx, "error": str(e)},
                    )

            call_soon_threadsafe(run_callback)

    async def wait_for_cancel(self) -> None:
        """Wait until token is cancelled."""
        await self._event.wait()

    def check(self) -> None:
        """Check if cancelled and raise exception if so.

        Raises:
            ManualCancelation: If token is cancelled
        """
        if self.is_cancelled:
            logger.debug("Token check triggered cancelation", extra={"token_id": self.id})
            raise ManualCancelation(
                message=self.message or "Operation cancelled via token",
            )

    async def check_async(self) -> None:
        """Async version of check that allows for proper async cancelation.

        Raises:
            anyio.CancelledError: If token is cancelled
        """
        if self.is_cancelled:
            logger.debug("Token async check triggered cancelation", extra={"token_id": self.id})
            raise anyio.get_cancelled_exc_class()(self.message or "Operation cancelled via token")

    def is_cancelation_requested(self) -> bool:
        """Non-throwing check for cancelation.

        Returns:
            True if cancelation has been requested
        """
        return self.is_cancelled

    async def register_callback(self, callback: Callable[[CancelationToken], Awaitable[None]]) -> None:
        """Register a callback to be called on cancelation.

        The callback should accept the token as its only argument.

        Args:
            callback: Async callable that accepts the token
        """
        logger.info(f"Registering callback for token {self.id} (currently {len(self._callbacks)} callbacks)")
        async with self._lock:
            self._callbacks.append(callback)
            logger.info(f"Callback registered. Now {len(self._callbacks)} callbacks for token {self.id}")

            # If already cancelled, call immediately
            if self.is_cancelled:
                logger.info(f"Token {self.id} already cancelled, calling callback immediately")
                try:
                    await callback(self)
                except Exception as e:
                    logger.error(
                        "Error in immediate cancelation callback",
                        extra={
                            "token_id": self.id,
                            "error": str(e),
                        },
                        exc_info=True,
                    )

    def __str__(self) -> str:
        """String representation of token."""
        if self.is_cancelled:
            return f"CancelationToken(id={self.id[:8]}, cancelled={self.reason.value if self.reason else 'unknown'})"
        return f"CancelationToken(id={self.id[:8]}, active)"

    def __repr__(self) -> str:
        """Detailed representation of token."""
        return (
            f"CancelationToken(id='{self.id}', is_cancelled={self.is_cancelled}, "
            f"reason={self.reason}, message='{self.message}')"
        )

model_config class-attribute instance-attribute

model_config = ConfigDict(arbitrary_types_allowed=True)

id class-attribute instance-attribute

id: str = Field(default_factory=lambda: str(uuid4()))

is_cancelled class-attribute instance-attribute

is_cancelled: bool = False

reason class-attribute instance-attribute

reason: CancelationReason | None = None

message class-attribute instance-attribute

message: str | None = None

cancelled_at class-attribute instance-attribute

cancelled_at: datetime | None = None

cancel async

cancel(
    reason: CancelationReason = MANUAL,
    message: str | None = None,
) -> bool

Cancel the token.

Parameters:

Name Type Description Default
reason CancelationReason

Reason for cancelation

MANUAL
message str | None

Optional cancelation message

None

Returns:

Type Description
bool

True if token was cancelled, False if already cancelled

Source code in src/hother/cancelable/core/token.py
async def cancel(
    self,
    reason: CancelationReason = CancelationReason.MANUAL,
    message: str | None = None,
) -> bool:
    """Cancel the token.

    Args:
        reason: Reason for cancelation
        message: Optional cancelation message

    Returns:
        True if token was cancelled, False if already cancelled
    """
    logger.info(f"=== CANCEL CALLED on token {self.id} ===")
    async with self._lock:
        if self.is_cancelled:
            logger.debug(
                "Token already cancelled",
                extra={
                    "token_id": self.id,
                    "original_reason": self.reason.value if self.reason else None,
                },
            )
            return False

        self.is_cancelled = True
        self.reason = reason
        self.message = message
        self.cancelled_at = datetime.now(UTC)
        self._event.set()

        logger.info(
            f"Token {self.id} cancelled - calling {len(self._callbacks)} callbacks",
            extra={
                "token_id": self.id,
                "reason": reason.value,
                "cancel_message": message,
                "callback_count": len(self._callbacks),
            },
        )

        # Notify callbacks
        for i, callback in enumerate(list(self._callbacks)):
            try:
                logger.debug(f"Calling callback {i} for token {self.id}")
                await callback(self)
                logger.debug(f"Callback {i} completed successfully")
            except Exception as e:
                logger.error(
                    "Error in cancelation callback",
                    extra={
                        "token_id": self.id,
                        "callback_index": i,
                        "error": str(e),
                    },
                    exc_info=True,
                )

        logger.info(f"=== CANCEL COMPLETED for token {self.id} ===")
        return True

cancel_sync

cancel_sync(
    reason: CancelationReason = MANUAL,
    message: str | None = None,
) -> bool

Thread-safe synchronous cancelation from any thread.

This method can be called from regular Python threads (pynput, signal handlers, etc.) and will safely cancel the token and notify async waiters via the anyio bridge.

Parameters:

Name Type Description Default
reason CancelationReason

Reason for cancelation

MANUAL
message str | None

Optional cancelation message

None

Returns:

Type Description
bool

True if token was cancelled, False if already cancelled

Example
def on_signal(signum):
    # Called from signal handler thread
    token.cancel_sync(CancelationReason.SIGNAL)
Source code in src/hother/cancelable/core/token.py
def cancel_sync(
    self,
    reason: CancelationReason = CancelationReason.MANUAL,
    message: str | None = None,
) -> bool:
    """Thread-safe synchronous cancelation from any thread.

    This method can be called from regular Python threads (pynput, signal handlers, etc.)
    and will safely cancel the token and notify async waiters via the anyio bridge.

    Args:
        reason: Reason for cancelation
        message: Optional cancelation message

    Returns:
        True if token was cancelled, False if already cancelled

    Example:
        ```python
        def on_signal(signum):
            # Called from signal handler thread
            token.cancel_sync(CancelationReason.SIGNAL)
        ```
    """
    logger.info(f"=== CANCEL_SYNC CALLED on token {self.id} from thread ===")

    # Update state with thread-safe lock
    with self._state_lock:
        if self.is_cancelled:
            logger.debug(
                "Token already cancelled",
                extra={
                    "token_id": self.id,
                    "original_reason": self.reason.value if self.reason else None,
                },
            )
            return False

        self.is_cancelled = True
        self.reason = reason
        self.message = message
        self.cancelled_at = datetime.now(UTC)

    logger.debug(
        f"Token {self.id} cancelled (sync) - notifying async waiters",
        extra={
            "token_id": self.id,
            "reason": reason.value,
            "cancel_message": message,
        },
    )

    # Notify async waiters (thread-safe)
    self._notify_async_waiters()

    # Schedule callbacks (thread-safe)
    self._schedule_callbacks()

    logger.debug(f"=== CANCEL_SYNC COMPLETED for token {self.id} ===")
    return True

wait_for_cancel async

wait_for_cancel() -> None

Wait until token is cancelled.

Source code in src/hother/cancelable/core/token.py
async def wait_for_cancel(self) -> None:
    """Wait until token is cancelled."""
    await self._event.wait()

check

check() -> None

Check if cancelled and raise exception if so.

Raises:

Type Description
ManualCancelation

If token is cancelled

Source code in src/hother/cancelable/core/token.py
def check(self) -> None:
    """Check if cancelled and raise exception if so.

    Raises:
        ManualCancelation: If token is cancelled
    """
    if self.is_cancelled:
        logger.debug("Token check triggered cancelation", extra={"token_id": self.id})
        raise ManualCancelation(
            message=self.message or "Operation cancelled via token",
        )

check_async async

check_async() -> None

Async version of check that allows for proper async cancelation.

Raises:

Type Description
CancelledError

If token is cancelled

Source code in src/hother/cancelable/core/token.py
async def check_async(self) -> None:
    """Async version of check that allows for proper async cancelation.

    Raises:
        anyio.CancelledError: If token is cancelled
    """
    if self.is_cancelled:
        logger.debug("Token async check triggered cancelation", extra={"token_id": self.id})
        raise anyio.get_cancelled_exc_class()(self.message or "Operation cancelled via token")

is_cancelation_requested

is_cancelation_requested() -> bool

Non-throwing check for cancelation.

Returns:

Type Description
bool

True if cancelation has been requested

Source code in src/hother/cancelable/core/token.py
def is_cancelation_requested(self) -> bool:
    """Non-throwing check for cancelation.

    Returns:
        True if cancelation has been requested
    """
    return self.is_cancelled

register_callback async

register_callback(
    callback: Callable[[CancelationToken], Awaitable[None]],
) -> None

Register a callback to be called on cancelation.

The callback should accept the token as its only argument.

Parameters:

Name Type Description Default
callback Callable[[CancelationToken], Awaitable[None]]

Async callable that accepts the token

required
Source code in src/hother/cancelable/core/token.py
async def register_callback(self, callback: Callable[[CancelationToken], Awaitable[None]]) -> None:
    """Register a callback to be called on cancelation.

    The callback should accept the token as its only argument.

    Args:
        callback: Async callable that accepts the token
    """
    logger.info(f"Registering callback for token {self.id} (currently {len(self._callbacks)} callbacks)")
    async with self._lock:
        self._callbacks.append(callback)
        logger.info(f"Callback registered. Now {len(self._callbacks)} callbacks for token {self.id}")

        # If already cancelled, call immediately
        if self.is_cancelled:
            logger.info(f"Token {self.id} already cancelled, calling callback immediately")
            try:
                await callback(self)
            except Exception as e:
                logger.error(
                    "Error in immediate cancelation callback",
                    extra={
                        "token_id": self.id,
                        "error": str(e),
                    },
                    exc_info=True,
                )

LinkedCancelationToken

Token that combines multiple cancelation tokens.

hother.cancelable.core.token.LinkedCancelationToken

Bases: CancelationToken

Cancelation token that can be linked to other tokens.

When any linked token is cancelled, this token is also cancelled.

Source code in src/hother/cancelable/core/token.py
class LinkedCancelationToken(CancelationToken):
    """Cancelation token that can be linked to other tokens.

    When any linked token is cancelled, this token is also cancelled.
    """

    def __init__(self, **data: Any) -> None:
        super().__init__(**data)
        self._linked_tokens: list[CancelationToken] = []  # Use regular list instead of WeakSet for now

    async def link(self, token: CancelationToken, preserve_reason: bool = False) -> None:
        """Link this token to another token.

        When the linked token is cancelled, this token will also be cancelled.

        Args:
            token: Token to link to
            preserve_reason: Whether to preserve the original cancelation reason
        """

        async def on_linked_cancel(linked_token: CancelationToken):
            if preserve_reason and linked_token.reason:
                # Preserve the original reason for combined cancelables
                await self.cancel(
                    reason=linked_token.reason,
                    message=linked_token.message or f"Linked token {linked_token.id[:8]} was cancelled",
                )
            else:
                # Use PARENT for true parent-child relationships
                await self.cancel(
                    reason=CancelationReason.PARENT,
                    message=f"Linked token {linked_token.id[:8]} was cancelled",
                )

        await token.register_callback(on_linked_cancel)
        self._linked_tokens.append(token)

        logger.debug(
            "Linked cancelation tokens",
            extra={"token_id": self.id, "linked_token_id": token.id, "preserve_reason": preserve_reason},
        )
link(
    token: CancelationToken, preserve_reason: bool = False
) -> None

Link this token to another token.

When the linked token is cancelled, this token will also be cancelled.

Parameters:

Name Type Description Default
token CancelationToken

Token to link to

required
preserve_reason bool

Whether to preserve the original cancelation reason

False
Source code in src/hother/cancelable/core/token.py
async def link(self, token: CancelationToken, preserve_reason: bool = False) -> None:
    """Link this token to another token.

    When the linked token is cancelled, this token will also be cancelled.

    Args:
        token: Token to link to
        preserve_reason: Whether to preserve the original cancelation reason
    """

    async def on_linked_cancel(linked_token: CancelationToken):
        if preserve_reason and linked_token.reason:
            # Preserve the original reason for combined cancelables
            await self.cancel(
                reason=linked_token.reason,
                message=linked_token.message or f"Linked token {linked_token.id[:8]} was cancelled",
            )
        else:
            # Use PARENT for true parent-child relationships
            await self.cancel(
                reason=CancelationReason.PARENT,
                message=f"Linked token {linked_token.id[:8]} was cancelled",
            )

    await token.register_callback(on_linked_cancel)
    self._linked_tokens.append(token)

    logger.debug(
        "Linked cancelation tokens",
        extra={"token_id": self.id, "linked_token_id": token.id, "preserve_reason": preserve_reason},
    )

Models and Status

OperationContext

Tracks the state and metadata of a cancelable operation.

hother.cancelable.core.models.OperationContext

Bases: BaseModel

Complete operation context with metadata and status tracking.

Attributes:

Name Type Description
id str

Unique operation identifier

name str | None

Human-readable operation name

status OperationStatus

Current operation status

start_time datetime

When the operation started

end_time datetime | None

When the operation ended (if applicable)

cancel_reason CancelationReason | None

Reason for cancelation (if cancelled)

cancel_message str | None

Additional cancelation message

error str | None

Error message (if failed)

partial_result Any | None

Any partial results before cancelation

metadata dict[str, Any]

Additional operation metadata

parent_id str | None

Parent operation ID (for nested operations)

child_ids set[str]

Set of child operation IDs

Source code in src/hother/cancelable/core/models.py
class OperationContext(BaseModel):
    """Complete operation context with metadata and status tracking.

    Attributes:
        id: Unique operation identifier
        name: Human-readable operation name
        status: Current operation status
        start_time: When the operation started
        end_time: When the operation ended (if applicable)
        cancel_reason: Reason for cancelation (if cancelled)
        cancel_message: Additional cancelation message
        error: Error message (if failed)
        partial_result: Any partial results before cancelation
        metadata: Additional operation metadata
        parent_id: Parent operation ID (for nested operations)
        child_ids: Set of child operation IDs
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    name: str | None = None
    status: OperationStatus = OperationStatus.PENDING
    start_time: datetime = Field(default_factory=lambda: datetime.now(UTC))
    end_time: datetime | None = None
    cancel_reason: CancelationReason | None = None
    cancel_message: str | None = None
    error: str | None = None
    partial_result: Any | None = None
    metadata: dict[str, Any] = Field(default_factory=dict)
    parent_id: str | None = None
    child_ids: set[str] = Field(default_factory=set)

    @property
    def duration(self) -> timedelta | None:
        """Calculate operation duration."""
        if self.end_time:
            return self.end_time - self.start_time
        if self.status == OperationStatus.RUNNING:
            return datetime.now(UTC) - self.start_time
        return None

    @property
    def duration_seconds(self) -> float | None:
        """Get duration in seconds."""
        duration = self.duration
        return duration.total_seconds() if duration else None

    @property
    def is_terminal(self) -> bool:
        """Check if operation is in terminal state."""
        return self.status in {
            OperationStatus.COMPLETED,
            OperationStatus.CANCELLED,
            OperationStatus.FAILED,
            OperationStatus.TIMEOUT,
        }

    @property
    def is_success(self) -> bool:
        """Check if operation completed successfully."""
        return self.status == OperationStatus.COMPLETED

    @property
    def is_cancelled(self) -> bool:
        """Check if operation was cancelled."""
        return self.status in {
            OperationStatus.CANCELLED,
            OperationStatus.TIMEOUT,
        }

    def log_context(self) -> dict[str, Any]:
        """Get context dict for structured logging."""
        return {
            "operation_id": self.id,
            "operation_name": self.name,
            "status": self.status.value,
            "duration_seconds": self.duration_seconds,
            "parent_id": self.parent_id,
            "child_count": len(self.child_ids),
            "has_error": bool(self.error),
            "cancel_reason": self.cancel_reason.value if self.cancel_reason else None,
        }

    def update_status(self, status: OperationStatus) -> None:
        """Update operation status with appropriate logging.

        Args:
            status: New operation status
        """
        old_status = self.status
        self.status = status

        if status in {OperationStatus.COMPLETED, OperationStatus.CANCELLED, OperationStatus.FAILED, OperationStatus.TIMEOUT}:
            self.end_time = datetime.now(UTC)

        logger.info(
            "Operation status changed",
            extra={
                "old_status": old_status.value,
                "new_status": status.value,
                **self.log_context(),
            },
        )

model_config class-attribute instance-attribute

model_config = ConfigDict(arbitrary_types_allowed=True)

id class-attribute instance-attribute

id: str = Field(default_factory=lambda: str(uuid4()))

name class-attribute instance-attribute

name: str | None = None

status class-attribute instance-attribute

start_time class-attribute instance-attribute

start_time: datetime = Field(
    default_factory=lambda: now(UTC)
)

end_time class-attribute instance-attribute

end_time: datetime | None = None

cancel_reason class-attribute instance-attribute

cancel_reason: CancelationReason | None = None

cancel_message class-attribute instance-attribute

cancel_message: str | None = None

error class-attribute instance-attribute

error: str | None = None

partial_result class-attribute instance-attribute

partial_result: Any | None = None

metadata class-attribute instance-attribute

metadata: dict[str, Any] = Field(default_factory=dict)

parent_id class-attribute instance-attribute

parent_id: str | None = None

child_ids class-attribute instance-attribute

child_ids: set[str] = Field(default_factory=set)

duration property

duration: timedelta | None

Calculate operation duration.

duration_seconds property

duration_seconds: float | None

Get duration in seconds.

is_terminal property

is_terminal: bool

Check if operation is in terminal state.

is_success property

is_success: bool

Check if operation completed successfully.

is_cancelled property

is_cancelled: bool

Check if operation was cancelled.

log_context

log_context() -> dict[str, Any]

Get context dict for structured logging.

Source code in src/hother/cancelable/core/models.py
def log_context(self) -> dict[str, Any]:
    """Get context dict for structured logging."""
    return {
        "operation_id": self.id,
        "operation_name": self.name,
        "status": self.status.value,
        "duration_seconds": self.duration_seconds,
        "parent_id": self.parent_id,
        "child_count": len(self.child_ids),
        "has_error": bool(self.error),
        "cancel_reason": self.cancel_reason.value if self.cancel_reason else None,
    }

update_status

update_status(status: OperationStatus) -> None

Update operation status with appropriate logging.

Parameters:

Name Type Description Default
status OperationStatus

New operation status

required
Source code in src/hother/cancelable/core/models.py
def update_status(self, status: OperationStatus) -> None:
    """Update operation status with appropriate logging.

    Args:
        status: New operation status
    """
    old_status = self.status
    self.status = status

    if status in {OperationStatus.COMPLETED, OperationStatus.CANCELLED, OperationStatus.FAILED, OperationStatus.TIMEOUT}:
        self.end_time = datetime.now(UTC)

    logger.info(
        "Operation status changed",
        extra={
            "old_status": old_status.value,
            "new_status": status.value,
            **self.log_context(),
        },
    )

OperationStatus

Enumeration of possible operation states.

hother.cancelable.core.models.OperationStatus

Bases: str, Enum

Operation lifecycle status.

Source code in src/hother/cancelable/core/models.py
class OperationStatus(str, Enum):
    """Operation lifecycle status."""

    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    CANCELLED = "cancelled"
    FAILED = "failed"
    TIMEOUT = "timeout"
    SHIELDED = "shielded"

PENDING class-attribute instance-attribute

PENDING = 'pending'

RUNNING class-attribute instance-attribute

RUNNING = 'running'

COMPLETED class-attribute instance-attribute

COMPLETED = 'completed'

CANCELLED class-attribute instance-attribute

CANCELLED = 'cancelled'

FAILED class-attribute instance-attribute

FAILED = 'failed'

TIMEOUT class-attribute instance-attribute

TIMEOUT = 'timeout'

SHIELDED class-attribute instance-attribute

SHIELDED = 'shielded'

CancelationReason

Enumeration of cancelation reason categories.

hother.cancelable.core.models.CancelationReason

Bases: str, Enum

Reason for cancelation.

Source code in src/hother/cancelable/core/models.py
class CancelationReason(str, Enum):
    """Reason for cancelation."""

    TIMEOUT = "timeout"
    MANUAL = "manual"
    SIGNAL = "signal"
    CONDITION = "condition"
    PARENT = "parent"
    ERROR = "error"

TIMEOUT class-attribute instance-attribute

TIMEOUT = 'timeout'

MANUAL class-attribute instance-attribute

MANUAL = 'manual'

SIGNAL class-attribute instance-attribute

SIGNAL = 'signal'

CONDITION class-attribute instance-attribute

CONDITION = 'condition'

PARENT class-attribute instance-attribute

PARENT = 'parent'

ERROR class-attribute instance-attribute

ERROR = 'error'

Registry

OperationRegistry

Global registry for tracking active cancelable operations.

hother.cancelable.core.registry.OperationRegistry

Singleton registry for tracking all cancelable operations.

Provides centralized management and monitoring of operations across the application.

Source code in src/hother/cancelable/core/registry.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
class OperationRegistry:
    """Singleton registry for tracking all cancelable operations.

    Provides centralized management and monitoring of operations across
    the application.
    """

    _instance: Optional["OperationRegistry"] = None

    def __new__(cls) -> "OperationRegistry":
        """Ensure singleton instance."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        """Initialize registry (only once)."""
        if self._initialized:
            return

        self._operations: dict[str, Cancelable] = {}
        self._history: list[OperationContext] = []
        self._history_limit = 1000
        self._lock: anyio.Lock = anyio.Lock()
        self._data_lock = threading.Lock()  # Thread-safe lock for data access
        self._initialized = True

        logger.info("Operation registry initialized")

    @classmethod
    def get_instance(cls) -> "OperationRegistry":
        """Get singleton instance of the registry.

        Returns:
            The global OperationRegistry instance
        """
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    async def register(self, operation: "Cancelable") -> None:
        """Register an operation with the registry.

        Args:
            operation: Cancelable operation to register
        """
        async with self._lock:
            with self._data_lock:
                self._operations[operation.context.id] = operation
                total = len(self._operations)

            logger.info(
                "Operation registered",
                extra={
                    "operation_id": operation.context.id,
                    "operation_name": operation.context.name,
                    "total_operations": total,
                },
            )

    async def unregister(self, operation_id: str) -> None:
        """Unregister an operation and add to history.

        Args:
            operation_id: ID of operation to unregister
        """
        async with self._lock:
            with self._data_lock:
                operation = self._operations.pop(operation_id, None)
                if operation:
                    # Add to history
                    self._history.append(operation.context.model_copy(deep=True))

                    # Maintain history limit
                    if len(self._history) > self._history_limit:
                        self._history = self._history[-self._history_limit :]

            if operation:
                logger.debug(
                    "Operation unregistered",
                    extra={
                        "operation_id": operation_id,
                        "final_status": operation.context.status.value,
                        "duration": operation.context.duration_seconds,
                    },
                )

    async def get_operation(self, operation_id: str) -> Optional["Cancelable"]:
        """Get operation by ID.

        Args:
            operation_id: Operation ID to look up

        Returns:
            Cancelable operation or None if not found
        """
        async with self._lock:
            with self._data_lock:
                return self._operations.get(operation_id)

    async def list_operations(
        self,
        status: OperationStatus | None = None,
        parent_id: str | None = None,
        name_pattern: str | None = None,
    ) -> list[OperationContext]:
        """List operations with optional filtering.

        Args:
            status: Filter by operation status
            parent_id: Filter by parent operation ID
            name_pattern: Filter by name (substring match)

        Returns:
            List of matching operation contexts
        """
        async with self._lock:
            with self._data_lock:
                operations = [op.context for op in self._operations.values()]

            # Apply filters (outside lock - operating on copied list)
            if status:
                operations = [op for op in operations if op.status == status]

            if parent_id:
                operations = [op for op in operations if op.parent_id == parent_id]

            if name_pattern:
                operations = [op for op in operations if op.name and name_pattern.lower() in op.name.lower()]

            return operations

    async def cancel_operation(
        self,
        operation_id: str,
        reason: CancelationReason = CancelationReason.MANUAL,
        message: str | None = None,
    ) -> bool:
        """Cancel a specific operation.

        Args:
            operation_id: ID of operation to cancel
            reason: Reason for cancelation
            message: Optional cancelation message

        Returns:
            True if operation was cancelled, False if not found
        """
        if operation := await self.get_operation(operation_id):
            await operation.cancel(reason, message)
            return True

        logger.warning(
            "Attempted to cancel non-existent operation",
            extra={"operation_id": operation_id},
        )
        return False

    async def cancel_all(
        self,
        status: OperationStatus | None = None,
        reason: CancelationReason = CancelationReason.MANUAL,
        message: str | None = None,
    ) -> int:
        """Cancel all operations with optional status filter.

        Args:
            status: Only cancel operations with this status
            reason: Reason for cancelation
            message: Optional cancelation message

        Returns:
            Number of operations cancelled
        """
        async with self._lock:
            with self._data_lock:
                to_cancel = list(self._operations.values())

            if status:
                to_cancel = [op for op in to_cancel if op.context.status == status]

        # Cancel outside lock to avoid deadlock
        count = 0
        for operation in to_cancel:
            try:
                await operation.cancel(reason, message or "Bulk cancelation")
                count += 1
            except Exception as e:
                logger.error(
                    "Error cancelling operation",
                    extra={
                        "operation_id": operation.context.id,
                        "error": str(e),
                    },
                    exc_info=True,
                )

        logger.info(
            "Bulk cancelation completed",
            extra={
                "cancelled_count": count,
                "filter_status": status.value if status else None,
            },
        )

        return count

    async def get_history(
        self,
        limit: int | None = None,
        status: OperationStatus | None = None,
        since: datetime | None = None,
    ) -> list[OperationContext]:
        """Get operation history.

        Args:
            limit: Maximum number of operations to return
            status: Filter by final status
            since: Only return operations completed after this time

        Returns:
            List of historical operation contexts
        """
        async with self._lock:
            with self._data_lock:
                history = self._history.copy()

            # Apply filters (outside lock - operating on copied list)
            if status:
                history = [op for op in history if op.status == status]

            if since:
                history = [op for op in history if op.end_time and op.end_time >= since]

            # Apply limit
            if limit:
                history = history[-limit:]

            return history

    async def cleanup_completed(
        self,
        older_than: timedelta | None = None,
        keep_failed: bool = True,
    ) -> int:
        """Clean up completed operations from active tracking.

        Args:
            older_than: Only cleanup operations older than this
            keep_failed: Whether to keep failed operations

        Returns:
            Number of operations cleaned up
        """
        async with self._lock:
            with self._data_lock:
                now = datetime.now(UTC)
                to_remove: list[str] = []

                for op_id, operation in self._operations.items():
                    context = operation.context

                    # Skip non-terminal operations
                    if not context.is_terminal:
                        continue

                    # Skip failed operations if requested
                    if keep_failed and context.status == OperationStatus.FAILED:
                        continue

                    # Check age if specified
                    if older_than and context.end_time:
                        age = now - context.end_time
                        if age < older_than:
                            continue

                    to_remove.append(op_id)

                # Remove operations
                for op_id in to_remove:
                    if operation := self._operations.pop(op_id, None):
                        self._history.append(operation.context.model_copy(deep=True))

                # Maintain history limit
                if len(self._history) > self._history_limit:
                    self._history = self._history[-self._history_limit :]

        logger.info(
            "Cleaned up completed operations",
            extra={
                "cleaned_count": len(to_remove),
                "older_than_seconds": older_than.total_seconds() if older_than else None,
            },
        )

        return len(to_remove)

    async def get_statistics(self) -> dict[str, Any]:
        """Get registry statistics.

        Returns:
            Dictionary with operation statistics
        """
        async with self._lock:
            with self._data_lock:
                active_by_status = {}
                for operation in self._operations.values():
                    status = operation.context.status.value
                    active_by_status[status] = active_by_status.get(status, 0) + 1  # type: ignore[attr-defined]

                history_by_status = {}
                total_duration = 0.0
                completed_count = 0

                for context in self._history:
                    status = context.status.value
                    history_by_status[status] = history_by_status.get(status, 0) + 1  # type: ignore[attr-defined]

                    if context.duration_seconds and context.is_success:
                        total_duration += context.duration_seconds
                        completed_count += 1

                avg_duration = total_duration / completed_count if completed_count > 0 else 0

                return {
                    "active_operations": len(self._operations),
                    "active_by_status": active_by_status,
                    "history_size": len(self._history),
                    "history_by_status": history_by_status,
                    "average_duration_seconds": avg_duration,
                    "total_completed": completed_count,
                }

    async def clear_all(self) -> None:
        """Clear all operations and history (for testing)."""
        async with self._lock:
            with self._data_lock:
                self._operations.clear()
                self._history.clear()
            logger.warning("Registry cleared - all operations removed")

    # Thread-safe synchronous methods

    def get_operation_sync(self, operation_id: str) -> Optional["Cancelable"]:
        """Get operation by ID (thread-safe, synchronous).

        This method can be called from any thread.

        Args:
            operation_id: Operation ID to look up

        Returns:
            Cancelable operation or None if not found
        """
        with self._data_lock:
            return self._operations.get(operation_id)

    def list_operations_sync(
        self,
        status: OperationStatus | None = None,
        parent_id: str | None = None,
        name_pattern: str | None = None,
    ) -> list[OperationContext]:
        """List operations with optional filtering (thread-safe, synchronous).

        This method can be called from any thread.

        Args:
            status: Filter by operation status
            parent_id: Filter by parent operation ID
            name_pattern: Filter by name (substring match)

        Returns:
            List of matching operation contexts
        """
        with self._data_lock:
            # Create copies to avoid holding lock during filtering
            operations = [op.context.model_copy() for op in self._operations.values()]

        # Apply filters outside lock
        if status:
            operations = [op for op in operations if op.status == status]

        if parent_id:
            operations = [op for op in operations if op.parent_id == parent_id]

        if name_pattern:
            operations = [op for op in operations if op.name and name_pattern.lower() in op.name.lower()]

        return operations

    def get_statistics_sync(self) -> dict[str, Any]:
        """Get registry statistics (thread-safe, synchronous).

        This method can be called from any thread.

        Returns:
            Dictionary with operation statistics
        """
        with self._data_lock:
            active_by_status = {}
            for operation in self._operations.values():
                status = operation.context.status.value
                active_by_status[status] = active_by_status.get(status, 0) + 1  # type: ignore[attr-defined]

            history_by_status = {}
            total_duration = 0.0
            completed_count = 0

            for context in self._history:
                status = context.status.value
                history_by_status[status] = history_by_status.get(status, 0) + 1  # type: ignore[attr-defined]

                if context.duration_seconds and context.is_success:
                    total_duration += context.duration_seconds
                    completed_count += 1

            avg_duration = total_duration / completed_count if completed_count > 0 else 0

            return {
                "active_operations": len(self._operations),
                "active_by_status": active_by_status,
                "history_size": len(self._history),
                "history_by_status": history_by_status,
                "average_duration_seconds": avg_duration,
                "total_completed": completed_count,
            }

    def get_history_sync(
        self,
        limit: int | None = None,
        status: OperationStatus | None = None,
        since: datetime | None = None,
    ) -> list[OperationContext]:
        """Get operation history (thread-safe, synchronous).

        This method can be called from any thread.

        Args:
            limit: Maximum number of operations to return
            status: Filter by final status
            since: Only return operations completed after this time

        Returns:
            List of historical operation contexts
        """
        with self._data_lock:
            history = self._history.copy()

        # Apply filters outside lock
        if status:
            history = [op for op in history if op.status == status]

        if since:
            history = [op for op in history if op.end_time and op.end_time >= since]

        # Apply limit
        if limit:
            history = history[-limit:]

        return history

    def cancel_operation_sync(
        self,
        operation_id: str,
        reason: CancelationReason = CancelationReason.MANUAL,
        message: str | None = None,
    ) -> None:
        """Cancel a specific operation (thread-safe, asynchronous execution).

        This method can be called from any thread. It schedules the cancelation
        to be executed asynchronously and returns immediately.

        Args:
            operation_id: ID of operation to cancel
            reason: Reason for cancelation
            message: Optional cancelation message

        Note:
            The cancelation is scheduled via AnyioBridge and executes asynchronously.
            This method returns immediately without waiting for completion.
        """
        from hother.cancelable.utils.anyio_bridge import call_soon_threadsafe

        async def do_cancel():
            await self.cancel_operation(operation_id, reason, message)

        call_soon_threadsafe(do_cancel)

    def cancel_all_sync(
        self,
        status: OperationStatus | None = None,
        reason: CancelationReason = CancelationReason.MANUAL,
        message: str | None = None,
    ) -> None:
        """Cancel all operations (thread-safe, asynchronous execution).

        This method can be called from any thread. It schedules the cancelation
        to be executed asynchronously and returns immediately.

        Args:
            status: Only cancel operations with this status
            reason: Reason for cancelation
            message: Optional cancelation message

        Note:
            The cancelation is scheduled via AnyioBridge and executes asynchronously.
            This method returns immediately without waiting for completion.
        """
        from hother.cancelable.utils.anyio_bridge import call_soon_threadsafe

        async def do_cancel():
            await self.cancel_all(status, reason, message)

        call_soon_threadsafe(do_cancel)

get_instance classmethod

get_instance() -> OperationRegistry

Get singleton instance of the registry.

Returns:

Type Description
OperationRegistry

The global OperationRegistry instance

Source code in src/hother/cancelable/core/registry.py
@classmethod
def get_instance(cls) -> "OperationRegistry":
    """Get singleton instance of the registry.

    Returns:
        The global OperationRegistry instance
    """
    if cls._instance is None:
        cls._instance = cls()
    return cls._instance

register async

register(operation: Cancelable) -> None

Register an operation with the registry.

Parameters:

Name Type Description Default
operation Cancelable

Cancelable operation to register

required
Source code in src/hother/cancelable/core/registry.py
async def register(self, operation: "Cancelable") -> None:
    """Register an operation with the registry.

    Args:
        operation: Cancelable operation to register
    """
    async with self._lock:
        with self._data_lock:
            self._operations[operation.context.id] = operation
            total = len(self._operations)

        logger.info(
            "Operation registered",
            extra={
                "operation_id": operation.context.id,
                "operation_name": operation.context.name,
                "total_operations": total,
            },
        )

unregister async

unregister(operation_id: str) -> None

Unregister an operation and add to history.

Parameters:

Name Type Description Default
operation_id str

ID of operation to unregister

required
Source code in src/hother/cancelable/core/registry.py
async def unregister(self, operation_id: str) -> None:
    """Unregister an operation and add to history.

    Args:
        operation_id: ID of operation to unregister
    """
    async with self._lock:
        with self._data_lock:
            operation = self._operations.pop(operation_id, None)
            if operation:
                # Add to history
                self._history.append(operation.context.model_copy(deep=True))

                # Maintain history limit
                if len(self._history) > self._history_limit:
                    self._history = self._history[-self._history_limit :]

        if operation:
            logger.debug(
                "Operation unregistered",
                extra={
                    "operation_id": operation_id,
                    "final_status": operation.context.status.value,
                    "duration": operation.context.duration_seconds,
                },
            )

get_operation async

get_operation(operation_id: str) -> Optional[Cancelable]

Get operation by ID.

Parameters:

Name Type Description Default
operation_id str

Operation ID to look up

required

Returns:

Type Description
Optional[Cancelable]

Cancelable operation or None if not found

Source code in src/hother/cancelable/core/registry.py
async def get_operation(self, operation_id: str) -> Optional["Cancelable"]:
    """Get operation by ID.

    Args:
        operation_id: Operation ID to look up

    Returns:
        Cancelable operation or None if not found
    """
    async with self._lock:
        with self._data_lock:
            return self._operations.get(operation_id)

list_operations async

list_operations(
    status: OperationStatus | None = None,
    parent_id: str | None = None,
    name_pattern: str | None = None,
) -> list[OperationContext]

List operations with optional filtering.

Parameters:

Name Type Description Default
status OperationStatus | None

Filter by operation status

None
parent_id str | None

Filter by parent operation ID

None
name_pattern str | None

Filter by name (substring match)

None

Returns:

Type Description
list[OperationContext]

List of matching operation contexts

Source code in src/hother/cancelable/core/registry.py
async def list_operations(
    self,
    status: OperationStatus | None = None,
    parent_id: str | None = None,
    name_pattern: str | None = None,
) -> list[OperationContext]:
    """List operations with optional filtering.

    Args:
        status: Filter by operation status
        parent_id: Filter by parent operation ID
        name_pattern: Filter by name (substring match)

    Returns:
        List of matching operation contexts
    """
    async with self._lock:
        with self._data_lock:
            operations = [op.context for op in self._operations.values()]

        # Apply filters (outside lock - operating on copied list)
        if status:
            operations = [op for op in operations if op.status == status]

        if parent_id:
            operations = [op for op in operations if op.parent_id == parent_id]

        if name_pattern:
            operations = [op for op in operations if op.name and name_pattern.lower() in op.name.lower()]

        return operations

cancel_operation async

cancel_operation(
    operation_id: str,
    reason: CancelationReason = MANUAL,
    message: str | None = None,
) -> bool

Cancel a specific operation.

Parameters:

Name Type Description Default
operation_id str

ID of operation to cancel

required
reason CancelationReason

Reason for cancelation

MANUAL
message str | None

Optional cancelation message

None

Returns:

Type Description
bool

True if operation was cancelled, False if not found

Source code in src/hother/cancelable/core/registry.py
async def cancel_operation(
    self,
    operation_id: str,
    reason: CancelationReason = CancelationReason.MANUAL,
    message: str | None = None,
) -> bool:
    """Cancel a specific operation.

    Args:
        operation_id: ID of operation to cancel
        reason: Reason for cancelation
        message: Optional cancelation message

    Returns:
        True if operation was cancelled, False if not found
    """
    if operation := await self.get_operation(operation_id):
        await operation.cancel(reason, message)
        return True

    logger.warning(
        "Attempted to cancel non-existent operation",
        extra={"operation_id": operation_id},
    )
    return False

cancel_all async

cancel_all(
    status: OperationStatus | None = None,
    reason: CancelationReason = MANUAL,
    message: str | None = None,
) -> int

Cancel all operations with optional status filter.

Parameters:

Name Type Description Default
status OperationStatus | None

Only cancel operations with this status

None
reason CancelationReason

Reason for cancelation

MANUAL
message str | None

Optional cancelation message

None

Returns:

Type Description
int

Number of operations cancelled

Source code in src/hother/cancelable/core/registry.py
async def cancel_all(
    self,
    status: OperationStatus | None = None,
    reason: CancelationReason = CancelationReason.MANUAL,
    message: str | None = None,
) -> int:
    """Cancel all operations with optional status filter.

    Args:
        status: Only cancel operations with this status
        reason: Reason for cancelation
        message: Optional cancelation message

    Returns:
        Number of operations cancelled
    """
    async with self._lock:
        with self._data_lock:
            to_cancel = list(self._operations.values())

        if status:
            to_cancel = [op for op in to_cancel if op.context.status == status]

    # Cancel outside lock to avoid deadlock
    count = 0
    for operation in to_cancel:
        try:
            await operation.cancel(reason, message or "Bulk cancelation")
            count += 1
        except Exception as e:
            logger.error(
                "Error cancelling operation",
                extra={
                    "operation_id": operation.context.id,
                    "error": str(e),
                },
                exc_info=True,
            )

    logger.info(
        "Bulk cancelation completed",
        extra={
            "cancelled_count": count,
            "filter_status": status.value if status else None,
        },
    )

    return count

get_history async

get_history(
    limit: int | None = None,
    status: OperationStatus | None = None,
    since: datetime | None = None,
) -> list[OperationContext]

Get operation history.

Parameters:

Name Type Description Default
limit int | None

Maximum number of operations to return

None
status OperationStatus | None

Filter by final status

None
since datetime | None

Only return operations completed after this time

None

Returns:

Type Description
list[OperationContext]

List of historical operation contexts

Source code in src/hother/cancelable/core/registry.py
async def get_history(
    self,
    limit: int | None = None,
    status: OperationStatus | None = None,
    since: datetime | None = None,
) -> list[OperationContext]:
    """Get operation history.

    Args:
        limit: Maximum number of operations to return
        status: Filter by final status
        since: Only return operations completed after this time

    Returns:
        List of historical operation contexts
    """
    async with self._lock:
        with self._data_lock:
            history = self._history.copy()

        # Apply filters (outside lock - operating on copied list)
        if status:
            history = [op for op in history if op.status == status]

        if since:
            history = [op for op in history if op.end_time and op.end_time >= since]

        # Apply limit
        if limit:
            history = history[-limit:]

        return history

cleanup_completed async

cleanup_completed(
    older_than: timedelta | None = None,
    keep_failed: bool = True,
) -> int

Clean up completed operations from active tracking.

Parameters:

Name Type Description Default
older_than timedelta | None

Only cleanup operations older than this

None
keep_failed bool

Whether to keep failed operations

True

Returns:

Type Description
int

Number of operations cleaned up

Source code in src/hother/cancelable/core/registry.py
async def cleanup_completed(
    self,
    older_than: timedelta | None = None,
    keep_failed: bool = True,
) -> int:
    """Clean up completed operations from active tracking.

    Args:
        older_than: Only cleanup operations older than this
        keep_failed: Whether to keep failed operations

    Returns:
        Number of operations cleaned up
    """
    async with self._lock:
        with self._data_lock:
            now = datetime.now(UTC)
            to_remove: list[str] = []

            for op_id, operation in self._operations.items():
                context = operation.context

                # Skip non-terminal operations
                if not context.is_terminal:
                    continue

                # Skip failed operations if requested
                if keep_failed and context.status == OperationStatus.FAILED:
                    continue

                # Check age if specified
                if older_than and context.end_time:
                    age = now - context.end_time
                    if age < older_than:
                        continue

                to_remove.append(op_id)

            # Remove operations
            for op_id in to_remove:
                if operation := self._operations.pop(op_id, None):
                    self._history.append(operation.context.model_copy(deep=True))

            # Maintain history limit
            if len(self._history) > self._history_limit:
                self._history = self._history[-self._history_limit :]

    logger.info(
        "Cleaned up completed operations",
        extra={
            "cleaned_count": len(to_remove),
            "older_than_seconds": older_than.total_seconds() if older_than else None,
        },
    )

    return len(to_remove)

get_statistics async

get_statistics() -> dict[str, Any]

Get registry statistics.

Returns:

Type Description
dict[str, Any]

Dictionary with operation statistics

Source code in src/hother/cancelable/core/registry.py
async def get_statistics(self) -> dict[str, Any]:
    """Get registry statistics.

    Returns:
        Dictionary with operation statistics
    """
    async with self._lock:
        with self._data_lock:
            active_by_status = {}
            for operation in self._operations.values():
                status = operation.context.status.value
                active_by_status[status] = active_by_status.get(status, 0) + 1  # type: ignore[attr-defined]

            history_by_status = {}
            total_duration = 0.0
            completed_count = 0

            for context in self._history:
                status = context.status.value
                history_by_status[status] = history_by_status.get(status, 0) + 1  # type: ignore[attr-defined]

                if context.duration_seconds and context.is_success:
                    total_duration += context.duration_seconds
                    completed_count += 1

            avg_duration = total_duration / completed_count if completed_count > 0 else 0

            return {
                "active_operations": len(self._operations),
                "active_by_status": active_by_status,
                "history_size": len(self._history),
                "history_by_status": history_by_status,
                "average_duration_seconds": avg_duration,
                "total_completed": completed_count,
            }

clear_all async

clear_all() -> None

Clear all operations and history (for testing).

Source code in src/hother/cancelable/core/registry.py
async def clear_all(self) -> None:
    """Clear all operations and history (for testing)."""
    async with self._lock:
        with self._data_lock:
            self._operations.clear()
            self._history.clear()
        logger.warning("Registry cleared - all operations removed")

get_operation_sync

get_operation_sync(
    operation_id: str,
) -> Optional[Cancelable]

Get operation by ID (thread-safe, synchronous).

This method can be called from any thread.

Parameters:

Name Type Description Default
operation_id str

Operation ID to look up

required

Returns:

Type Description
Optional[Cancelable]

Cancelable operation or None if not found

Source code in src/hother/cancelable/core/registry.py
def get_operation_sync(self, operation_id: str) -> Optional["Cancelable"]:
    """Get operation by ID (thread-safe, synchronous).

    This method can be called from any thread.

    Args:
        operation_id: Operation ID to look up

    Returns:
        Cancelable operation or None if not found
    """
    with self._data_lock:
        return self._operations.get(operation_id)

list_operations_sync

list_operations_sync(
    status: OperationStatus | None = None,
    parent_id: str | None = None,
    name_pattern: str | None = None,
) -> list[OperationContext]

List operations with optional filtering (thread-safe, synchronous).

This method can be called from any thread.

Parameters:

Name Type Description Default
status OperationStatus | None

Filter by operation status

None
parent_id str | None

Filter by parent operation ID

None
name_pattern str | None

Filter by name (substring match)

None

Returns:

Type Description
list[OperationContext]

List of matching operation contexts

Source code in src/hother/cancelable/core/registry.py
def list_operations_sync(
    self,
    status: OperationStatus | None = None,
    parent_id: str | None = None,
    name_pattern: str | None = None,
) -> list[OperationContext]:
    """List operations with optional filtering (thread-safe, synchronous).

    This method can be called from any thread.

    Args:
        status: Filter by operation status
        parent_id: Filter by parent operation ID
        name_pattern: Filter by name (substring match)

    Returns:
        List of matching operation contexts
    """
    with self._data_lock:
        # Create copies to avoid holding lock during filtering
        operations = [op.context.model_copy() for op in self._operations.values()]

    # Apply filters outside lock
    if status:
        operations = [op for op in operations if op.status == status]

    if parent_id:
        operations = [op for op in operations if op.parent_id == parent_id]

    if name_pattern:
        operations = [op for op in operations if op.name and name_pattern.lower() in op.name.lower()]

    return operations

get_statistics_sync

get_statistics_sync() -> dict[str, Any]

Get registry statistics (thread-safe, synchronous).

This method can be called from any thread.

Returns:

Type Description
dict[str, Any]

Dictionary with operation statistics

Source code in src/hother/cancelable/core/registry.py
def get_statistics_sync(self) -> dict[str, Any]:
    """Get registry statistics (thread-safe, synchronous).

    This method can be called from any thread.

    Returns:
        Dictionary with operation statistics
    """
    with self._data_lock:
        active_by_status = {}
        for operation in self._operations.values():
            status = operation.context.status.value
            active_by_status[status] = active_by_status.get(status, 0) + 1  # type: ignore[attr-defined]

        history_by_status = {}
        total_duration = 0.0
        completed_count = 0

        for context in self._history:
            status = context.status.value
            history_by_status[status] = history_by_status.get(status, 0) + 1  # type: ignore[attr-defined]

            if context.duration_seconds and context.is_success:
                total_duration += context.duration_seconds
                completed_count += 1

        avg_duration = total_duration / completed_count if completed_count > 0 else 0

        return {
            "active_operations": len(self._operations),
            "active_by_status": active_by_status,
            "history_size": len(self._history),
            "history_by_status": history_by_status,
            "average_duration_seconds": avg_duration,
            "total_completed": completed_count,
        }

get_history_sync

get_history_sync(
    limit: int | None = None,
    status: OperationStatus | None = None,
    since: datetime | None = None,
) -> list[OperationContext]

Get operation history (thread-safe, synchronous).

This method can be called from any thread.

Parameters:

Name Type Description Default
limit int | None

Maximum number of operations to return

None
status OperationStatus | None

Filter by final status

None
since datetime | None

Only return operations completed after this time

None

Returns:

Type Description
list[OperationContext]

List of historical operation contexts

Source code in src/hother/cancelable/core/registry.py
def get_history_sync(
    self,
    limit: int | None = None,
    status: OperationStatus | None = None,
    since: datetime | None = None,
) -> list[OperationContext]:
    """Get operation history (thread-safe, synchronous).

    This method can be called from any thread.

    Args:
        limit: Maximum number of operations to return
        status: Filter by final status
        since: Only return operations completed after this time

    Returns:
        List of historical operation contexts
    """
    with self._data_lock:
        history = self._history.copy()

    # Apply filters outside lock
    if status:
        history = [op for op in history if op.status == status]

    if since:
        history = [op for op in history if op.end_time and op.end_time >= since]

    # Apply limit
    if limit:
        history = history[-limit:]

    return history

cancel_operation_sync

cancel_operation_sync(
    operation_id: str,
    reason: CancelationReason = MANUAL,
    message: str | None = None,
) -> None

Cancel a specific operation (thread-safe, asynchronous execution).

This method can be called from any thread. It schedules the cancelation to be executed asynchronously and returns immediately.

Parameters:

Name Type Description Default
operation_id str

ID of operation to cancel

required
reason CancelationReason

Reason for cancelation

MANUAL
message str | None

Optional cancelation message

None
Note

The cancelation is scheduled via AnyioBridge and executes asynchronously. This method returns immediately without waiting for completion.

Source code in src/hother/cancelable/core/registry.py
def cancel_operation_sync(
    self,
    operation_id: str,
    reason: CancelationReason = CancelationReason.MANUAL,
    message: str | None = None,
) -> None:
    """Cancel a specific operation (thread-safe, asynchronous execution).

    This method can be called from any thread. It schedules the cancelation
    to be executed asynchronously and returns immediately.

    Args:
        operation_id: ID of operation to cancel
        reason: Reason for cancelation
        message: Optional cancelation message

    Note:
        The cancelation is scheduled via AnyioBridge and executes asynchronously.
        This method returns immediately without waiting for completion.
    """
    from hother.cancelable.utils.anyio_bridge import call_soon_threadsafe

    async def do_cancel():
        await self.cancel_operation(operation_id, reason, message)

    call_soon_threadsafe(do_cancel)

cancel_all_sync

cancel_all_sync(
    status: OperationStatus | None = None,
    reason: CancelationReason = MANUAL,
    message: str | None = None,
) -> None

Cancel all operations (thread-safe, asynchronous execution).

This method can be called from any thread. It schedules the cancelation to be executed asynchronously and returns immediately.

Parameters:

Name Type Description Default
status OperationStatus | None

Only cancel operations with this status

None
reason CancelationReason

Reason for cancelation

MANUAL
message str | None

Optional cancelation message

None
Note

The cancelation is scheduled via AnyioBridge and executes asynchronously. This method returns immediately without waiting for completion.

Source code in src/hother/cancelable/core/registry.py
def cancel_all_sync(
    self,
    status: OperationStatus | None = None,
    reason: CancelationReason = CancelationReason.MANUAL,
    message: str | None = None,
) -> None:
    """Cancel all operations (thread-safe, asynchronous execution).

    This method can be called from any thread. It schedules the cancelation
    to be executed asynchronously and returns immediately.

    Args:
        status: Only cancel operations with this status
        reason: Reason for cancelation
        message: Optional cancelation message

    Note:
        The cancelation is scheduled via AnyioBridge and executes asynchronously.
        This method returns immediately without waiting for completion.
    """
    from hother.cancelable.utils.anyio_bridge import call_soon_threadsafe

    async def do_cancel():
        await self.cancel_all(status, reason, message)

    call_soon_threadsafe(do_cancel)

Exceptions

All exception classes used by the cancelation system.

hother.cancelable.core.exceptions

Custom exceptions for the async cancelation system.

CancelationError

Bases: Exception

Base exception for cancelation-related errors.

Attributes:

Name Type Description
reason

The reason for cancelation

message

Optional cancelation message

context

Optional operation context

Source code in src/hother/cancelable/core/exceptions.py
class CancelationError(Exception):
    """Base exception for cancelation-related errors.

    Attributes:
        reason: The reason for cancelation
        message: Optional cancelation message
        context: Optional operation context
    """

    def __init__(
        self,
        reason: CancelationReason,
        message: str | None = None,
        context: OperationContext | None = None,
    ):
        self.reason = reason
        self.message = message or f"Operation cancelled: {reason.value}"
        self.context = context
        super().__init__(self.message)

reason instance-attribute

reason = reason

message instance-attribute

message = message or f'Operation cancelled: {value}'

context instance-attribute

context = context

TimeoutCancelation

Bases: CancelationError

Operation cancelled due to timeout.

Source code in src/hother/cancelable/core/exceptions.py
class TimeoutCancelation(CancelationError):
    """Operation cancelled due to timeout."""

    def __init__(
        self,
        timeout_seconds: float,
        message: str | None = None,
        context: OperationContext | None = None,
    ):
        self.timeout_seconds = timeout_seconds
        default_message = f"Operation timed out after {timeout_seconds}s"
        super().__init__(
            CancelationReason.TIMEOUT,
            message or default_message,
            context,
        )

timeout_seconds instance-attribute

timeout_seconds = timeout_seconds

ManualCancelation

Bases: CancelationError

Operation cancelled manually via token or API.

Source code in src/hother/cancelable/core/exceptions.py
class ManualCancelation(CancelationError):
    """Operation cancelled manually via token or API."""

    def __init__(
        self,
        message: str | None = None,
        context: OperationContext | None = None,
    ):
        super().__init__(
            CancelationReason.MANUAL,
            message or "Operation cancelled manually",
            context,
        )

SignalCancelation

Bases: CancelationError

Operation cancelled by system signal.

Source code in src/hother/cancelable/core/exceptions.py
class SignalCancelation(CancelationError):
    """Operation cancelled by system signal."""

    def __init__(
        self,
        signal_number: int,
        message: str | None = None,
        context: OperationContext | None = None,
    ):
        self.signal_number = signal_number
        default_message = f"Operation cancelled by signal {signal_number}"
        super().__init__(
            CancelationReason.SIGNAL,
            message or default_message,
            context,
        )

signal_number instance-attribute

signal_number = signal_number

ConditionCancelation

Bases: CancelationError

Operation cancelled by condition check.

Source code in src/hother/cancelable/core/exceptions.py
class ConditionCancelation(CancelationError):
    """Operation cancelled by condition check."""

    def __init__(
        self,
        condition_name: str | None = None,
        message: str | None = None,
        context: OperationContext | None = None,
    ):
        self.condition_name = condition_name
        default_message = "Operation cancelled: condition met"
        if condition_name:
            default_message = f"Operation cancelled: {condition_name} condition met"
        super().__init__(
            CancelationReason.CONDITION,
            message or default_message,
            context,
        )

condition_name instance-attribute

condition_name = condition_name

ParentCancelation

Bases: CancelationError

Operation cancelled because parent was cancelled.

Source code in src/hother/cancelable/core/exceptions.py
class ParentCancelation(CancelationError):
    """Operation cancelled because parent was cancelled."""

    def __init__(
        self,
        parent_id: str,
        parent_reason: CancelationReason | None = None,
        message: str | None = None,
        context: OperationContext | None = None,
    ):
        self.parent_id = parent_id
        self.parent_reason = parent_reason
        default_message = f"Operation cancelled: parent {parent_id} was cancelled"
        if parent_reason:
            default_message = f"Operation cancelled: parent {parent_id} was cancelled ({parent_reason.value})"
        super().__init__(
            CancelationReason.PARENT,
            message or default_message,
            context,
        )

parent_id instance-attribute

parent_id = parent_id

parent_reason instance-attribute

parent_reason = parent_reason