FastAPI Integration
The cancelable library provides seamless integration with FastAPI for building cancelable endpoints.
Installation
The FastAPI integration is included when you install cancelable:
Basic Usage
Dependency Injection
from fastapi import FastAPI, Depends
from hother.cancelable.integrations.fastapi import cancelable_dependency
from hother.cancelable import Cancelable
app = FastAPI()
@app.get("/long-operation")
async def long_operation(cancel: Cancelable = Depends(cancelable_dependency)):
async with cancel:
# Your long-running operation here
result = await process_data()
return {"result": result}
Timeout Configuration
from hother.cancelable.integrations.fastapi import cancelable_dependency
# Create a custom dependency with timeout
def get_cancelable_30s():
return cancelable_dependency(timeout=30.0)
@app.get("/timeout-operation")
async def timeout_operation(cancel: Cancelable = Depends(get_cancelable_30s)):
async with cancel:
# This operation will timeout after 30 seconds
return await long_computation()
Advanced Usage
Background Tasks with Cancelation
from fastapi import BackgroundTasks
from hother.cancelable import CancelationToken
# Store tokens for background tasks
background_tokens = {}
@app.post("/start-background-task")
async def start_background_task(background_tasks: BackgroundTasks):
token = CancelationToken()
task_id = str(uuid.uuid4())
background_tokens[task_id] = token
background_tasks.add_task(
background_worker,
task_id,
token
)
return {"task_id": task_id}
async def background_worker(task_id: str, token: CancelationToken):
async with Cancelable.with_token(token) as cancel:
try:
# Long-running background work
await process_large_dataset()
finally:
# Cleanup
background_tokens.pop(task_id, None)
@app.post("/cancel-task/{task_id}")
async def cancel_task(task_id: str):
token = background_tokens.get(task_id)
if token:
await token.cancel()
return {"status": "cancelled"}
return {"status": "not_found"}
Request Cancelation Handling
from starlette.requests import Request
@app.get("/streaming-response")
async def streaming_response(
request: Request,
cancel: Cancelable = Depends(cancelable_dependency)
):
async def generate():
async with cancel:
for i in range(100):
# Check if client disconnected
if await request.is_disconnected():
await cancel.cancel()
break
yield f"data: Item {i}\n\n"
await asyncio.sleep(0.1)
return StreamingResponse(
generate(),
media_type="text/event-stream"
)
Best Practices
- Always use async with: Ensure proper cleanup by using the context manager
- Set appropriate timeouts: Configure timeouts based on expected operation duration
- Handle cancelation gracefully: Catch CancelledError and perform cleanup
- Monitor operations: Use the global registry for debugging and monitoring
Example: File Upload with Progress
from hother.cancelable import Cancelable
from fastapi import UploadFile, File
@app.post("/upload")
async def upload_file(
file: UploadFile = File(...),
cancel: Cancelable = Depends(cancelable_dependency)
):
async with cancel:
cancel.on_progress(
lambda op_id, msg, meta: logger.info(f"Upload progress: {msg}")
)
total_size = 0
chunk_size = 1024 * 1024 # 1MB chunks
with open(f"uploads/{file.filename}", "wb") as f:
while chunk := await file.read(chunk_size):
f.write(chunk)
total_size += len(chunk)
await cancel.report_progress(
f"Uploaded {total_size} bytes",
{"bytes": total_size, "filename": file.filename}
)
return {
"filename": file.filename,
"size": total_size,
"status": "completed"
}
Error Handling
from hother.cancelable import TimeoutCancelation
@app.get("/with-error-handling")
async def with_error_handling(cancel: Cancelable = Depends(cancelable_dependency)):
try:
async with cancel:
result = await some_operation()
return {"result": result}
except TimeoutCancelation:
return JSONResponse(
status_code=408,
content={"error": "Operation timed out"}
)
except Exception as e:
return JSONResponse(
status_code=500,
content={"error": str(e)}
)
Middleware
RequestCancelationMiddleware
ASGI middleware for automatic request cancelation monitoring:
from fastapi import FastAPI
from hother.cancelable.integrations.fastapi import RequestCancelationMiddleware
app = FastAPI()
# Add middleware
app.add_middleware(RequestCancelationMiddleware)
@app.post("/process")
async def process_data(data: dict):
# Automatically cancels if client disconnects
# No need for manual cancelable_dependency
async with Cancelable(name="process") as cancel:
result = await long_operation(data)
return {"result": result}
Benefits: - Automatic client disconnect detection - Application-wide cancelation monitoring - No per-endpoint setup needed
WebSocket Support
CancelableWebSocket
WebSocket wrapper with built-in cancelation support:
from fastapi import WebSocket
from hother.cancelable.integrations.fastapi import CancelableWebSocket
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
await websocket.accept()
async with Cancelable.with_timeout(300.0) as cancel:
ws = CancelableWebSocket(websocket, cancel)
# Send with cancelation checking
await ws.send_json({"status": "connected"})
# Receive with cancelation checking
while True:
data = await ws.receive_json()
# Report progress
await cancel.report_progress(f"Received from {client_id}")
response = await process(data)
await ws.send_json(response)
Methods:
- send_text(data), send_json(data) - Send with cancelation check
- receive_text(), receive_json() - Receive with cancelation check
- Auto-integrates with progress reporting
SSE Streaming
cancelable_streaming_response()
Create Server-Sent Events (SSE) streaming responses with cancelation:
from hother.cancelable.integrations.fastapi import cancelable_streaming_response
@app.get("/stream/progress/{job_id}")
async def stream_progress(job_id: str):
async def generate():
async with Cancelable(name=f"stream_{job_id}") as cancel:
for i in range(100):
# Checks cancelation before each event
yield f"data: Processing {i}/100\n\n"
await anyio.sleep(0.5)
return cancelable_streaming_response(generate())
Features: - SSE format support - Automatic cancelation checking - Graceful stream termination - Client disconnect handling
Exception Handling
with_cancelation Decorator
Automatic exception handling for FastAPI endpoints:
from hother.cancelable.integrations.fastapi import with_cancelation
@app.post("/process")
@with_cancelation # Converts cancelation to proper HTTP status
async def process_data(data: dict):
async with Cancelable.with_timeout(30.0) as cancel:
result = await long_operation(data)
return {"result": result}
# If cancelled: automatically returns HTTP 499 (Client Closed Request)
# If timeout: automatically returns HTTP 504 (Gateway Timeout)
HTTP status codes:
- CancelationReason.TIMEOUT → 504 Gateway Timeout
- CancelationReason.MANUAL → 499 Client Closed Request
- CancelationReason.SIGNAL → 503 Service Unavailable
- Other → 500 Internal Server Error