Error Handling
Streamblocks provides comprehensive error handling for stream processing and block validation. This guide covers error types, handling patterns, and recovery strategies.
Error Categories
flowchart TB
Errors[Errors] --> Stream[Stream Errors]
Errors --> Block[Block Errors]
Errors --> Validation[Validation Errors]
Errors --> Adapter[Adapter Errors]
Stream --> StreamClosed[Stream Closed]
Stream --> StreamTimeout[Timeout]
Block --> InvalidHeader[Invalid Header]
Block --> InvalidMeta[Invalid Metadata]
Block --> InvalidContent[Invalid Content]
Block --> TooLarge[Size Exceeded]
Block --> Unclosed[Unclosed Block]
Validation --> MetaValidation[Metadata Validation]
Validation --> ContentValidation[Content Validation]
Adapter --> AdapterMismatch[Adapter Mismatch]
Adapter --> ProviderError[Provider Error]
BlockErrorCode
Error codes for block rejection:
from streamblocks import BlockErrorCode
class BlockErrorCode(Enum):
    """Error codes attached to a rejected block.

    Each member belongs to one failure category (header, metadata,
    content, validation, size, state, registry), noted beside it.
    """

    INVALID_HEADER = "invalid_header"          # header error
    INVALID_METADATA = "invalid_metadata"      # metadata error
    INVALID_CONTENT = "invalid_content"        # content error
    VALIDATION_FAILED = "validation_failed"    # validation error
    MAX_SIZE_EXCEEDED = "max_size_exceeded"    # size error
    UNCLOSED_BLOCK = "unclosed_block"          # state error
    UNKNOWN_BLOCK_TYPE = "unknown_block_type"  # registry error
BlockRejection
Information about rejected blocks:
from streamblocks import BlockRejection
@dataclass
class BlockRejection:
    """Details about a rejected block.

    Attributes:
        reason: Error code for the rejection.
        message: Human-readable message.
        partial_content: Content accumulated before rejection, or None.
        line_number: Line where the error occurred, or None.
    """

    reason: BlockErrorCode
    message: str
    partial_content: str | None
    line_number: int | None
Handling Block Rejections
Basic Handling
from streamblocks import EventType, BlockErrorCode
async for event in processor.process_stream(stream):
if event.type == EventType.BLOCK_REJECTED:
rejection = event.rejection
print(f"Block rejected: {rejection.reason.value}")
print(f"Message: {rejection.message}")
Detailed Handling
async for event in processor.process_stream(stream):
if event.type == EventType.BLOCK_REJECTED:
rejection = event.rejection
match rejection.reason:
case BlockErrorCode.INVALID_HEADER:
logger.warning(f"Invalid header at line {rejection.line_number}")
case BlockErrorCode.INVALID_METADATA:
logger.warning(f"Metadata error: {rejection.message}")
case BlockErrorCode.INVALID_CONTENT:
logger.warning(f"Content error: {rejection.message}")
case BlockErrorCode.VALIDATION_FAILED:
logger.warning(f"Validation failed: {rejection.message}")
case BlockErrorCode.MAX_SIZE_EXCEEDED:
logger.warning("Block exceeded size limit")
case BlockErrorCode.UNCLOSED_BLOCK:
logger.warning("Block not closed before stream end")
case BlockErrorCode.UNKNOWN_BLOCK_TYPE:
logger.warning(f"Unknown type: {rejection.message}")
Recovery with Partial Content
async for event in processor.process_stream(stream):
if event.type == EventType.BLOCK_REJECTED:
rejection = event.rejection
if rejection.partial_content:
# Try to salvage something from the failed block
logger.info(f"Partial content: {rejection.partial_content[:100]}...")
# Maybe parse as plain text
await handle_as_text(rejection.partial_content)
Stream Errors
Handling Stream Exceptions
async def process_safely(stream):
    """Process *stream*, routing stream-level exceptions to recovery hooks.

    A ConnectionError is logged and followed by ``reconnect_and_retry()``;
    a TimeoutError is logged and followed by ``handle_timeout()``. Any
    other exception is logged and re-raised to the caller.
    """
    try:
        async for event in processor.process_stream(stream):
            await handle_event(event)
    except ConnectionError as conn_err:
        logger.error(f"Connection lost: {conn_err}")
        await reconnect_and_retry()
    except TimeoutError as timeout_err:
        logger.error(f"Stream timeout: {timeout_err}")
        await handle_timeout()
    except Exception as unexpected:
        # Unknown failure: record it, then let it propagate.
        logger.error(f"Unexpected error: {unexpected}")
        raise
Retry Pattern
import asyncio
async def process_with_retry(stream_factory, max_retries=3):
    """Process a stream, retrying with exponential backoff on connection loss.

    Args:
        stream_factory: Zero-argument callable returning a fresh stream.
            It is called once per attempt.
        max_retries: Total number of attempts to make. Must be at least 1.

    Raises:
        ValueError: If ``max_retries`` is less than 1. Without this check
            the loop would not run and the function would return as if
            processing had succeeded.
        ConnectionError: Re-raised when the final attempt also fails.

    NOTE: each attempt processes a new stream from the start, so events
    already passed to ``handle_event`` before a failure are handled again
    on the next attempt.
    """
    if max_retries < 1:
        raise ValueError(f"max_retries must be at least 1, got {max_retries}")

    final_attempt = max_retries - 1
    for attempt in range(max_retries):
        try:
            stream = stream_factory()
            async for event in processor.process_stream(stream):
                await handle_event(event)
        except ConnectionError as e:
            if attempt == final_attempt:
                logger.error("Max retries exceeded")
                raise
            wait = 2 ** attempt  # Exponential backoff: 1s, 2s, 4s, ...
            logger.warning(f"Retry {attempt + 1} in {wait}s: {e}")
            await asyncio.sleep(wait)
        else:
            return  # Success
Validation Errors
Pydantic Validation
from pydantic import ValidationError
class TaskMetadata(BaseMetadata):
    """Metadata for ``task`` blocks; ``priority`` is limited to a fixed set."""

    block_type: Literal["task"] = "task"
    priority: str

    @field_validator("priority")
    @classmethod
    def validate_priority(cls, v: str) -> str:
        """Return *v* unchanged if it is low/normal/high, else raise ValueError."""
        allowed = {"low", "normal", "high"}
        if v in allowed:
            return v
        raise ValueError(f"Invalid priority: {v}")
# When validation fails, BLOCK_REJECTED is emitted
# with reason=VALIDATION_FAILED
Custom Validation Errors
class StrictContent(BaseContent):
    """Content model that rejects short or whitespace-only raw content."""

    @model_validator(mode="after")
    def validate_content(self) -> "StrictContent":
        """Check ``raw_content`` length first, then that it is not blank."""
        text = self.raw_content
        if len(text) < 10:
            raise ValueError("Content too short (minimum 10 chars)")
        # Reached only with 10+ characters, so this catches all-whitespace input.
        if not text.strip():
            raise ValueError("Content cannot be empty")
        return self
Adapter Errors
Handling Provider Errors
async def process_gemini_stream(prompt):
    """Stream a Gemini response for *prompt* through the processor.

    A ResourceExhausted error is logged and followed by a 60-second
    pause; the retry itself is left as a placeholder. An InvalidArgument
    error is logged and re-raised.
    """
    try:
        gemini_stream = model.generate_content(prompt, stream=True)
        async for event in processor.process_stream(gemini_stream):
            await handle_event(event)
    except google.api_core.exceptions.ResourceExhausted:
        logger.error("Gemini rate limit exceeded")
        await asyncio.sleep(60)
        # Retry...
    except google.api_core.exceptions.InvalidArgument as bad_request:
        logger.error(f"Invalid request: {bad_request}")
        raise
Adapter Mismatch
from streamblocks.ext.gemini import GeminiInputAdapter
# Using wrong adapter for stream type
# Anti-example: GeminiInputAdapter is wrong for an OpenAI stream.
processor = StreamBlockProcessor(registry=registry, syntax=syntax, input_adapter=GeminiInputAdapter())
# Will fail to extract text from events
# Use input_adapter="auto" to avoid this
Error Logging
Structured Logging
import structlog
logger = structlog.get_logger()
async for event in processor.process_stream(stream):
if event.type == EventType.BLOCK_REJECTED:
logger.warning(
"block_rejected",
reason=event.rejection.reason.value,
message=event.rejection.message,
line_number=event.rejection.line_number,
has_partial=event.rejection.partial_content is not None,
)
Error Statistics
from collections import Counter
error_counts = Counter()
async for event in processor.process_stream(stream):
if event.type == EventType.BLOCK_REJECTED:
error_counts[event.rejection.reason] += 1
# Summary
for reason, count in error_counts.most_common():
print(f"{reason.value}: {count}")
Graceful Degradation
Continue on Error
async def resilient_process(stream):
    """Process every block in *stream*, collecting failures instead of stopping.

    Returns:
        A ``(blocks, errors)`` tuple: the results of ``process_block`` for
        blocks that succeeded, and a list holding both ProcessingError
        instances and the rejections reported by the processor.
    """
    succeeded = []
    failures = []
    async for event in processor.process_stream(stream):
        kind = event.type
        if kind == EventType.BLOCK_EXTRACTED:
            try:
                result = await process_block(event.block)
            except ProcessingError as exc:
                # Record the failure and move on to the next block
                failures.append(exc)
            else:
                succeeded.append(result)
        elif kind == EventType.BLOCK_REJECTED:
            # Rejections are collected too; processing continues
            failures.append(event.rejection)
    return succeeded, failures
Fallback Processing
async for event in processor.process_stream(stream):
if event.type == EventType.BLOCK_EXTRACTED:
block = event.block
try:
await primary_handler(block)
except PrimaryError:
try:
await fallback_handler(block)
except FallbackError:
await last_resort_handler(block)
Error Recovery Strategies
1. Skip and Continue
# Skip failed blocks, process the rest
errors = []
async for event in processor.process_stream(stream):
if event.type == EventType.BLOCK_REJECTED:
errors.append(event.rejection)
continue # Skip to next
if event.type == EventType.BLOCK_EXTRACTED:
await process(event.block)
2. Retry Failed Blocks
# Collect failed blocks for retry
failed_blocks = []
async for event in processor.process_stream(stream):
if event.type == EventType.BLOCK_REJECTED:
if event.rejection.partial_content:
failed_blocks.append(event.rejection)
# Later: retry with more lenient settings
for rejection in failed_blocks:
await retry_parse(rejection.partial_content)
3. Alert and Stop
# Stop on critical errors
async for event in processor.process_stream(stream):
if event.type == EventType.BLOCK_REJECTED:
if is_critical(event.rejection):
await alert_operators(event.rejection)
raise CriticalError(event.rejection.message)
if event.type == EventType.BLOCK_EXTRACTED:
await process(event.block)
Best Practices
Log All Rejections
Always log block rejections for debugging.
Use Structured Logging
Include context (reason, line number, partial content) in logs.
Graceful Degradation
Design for partial success—one failed block shouldn't stop processing.
Monitor Error Rates
Track rejection rates to detect problems early.
Test Error Paths
Test with invalid input to ensure error handling works.
Next Steps
- Events - Event handling
- Validation - Validation system
- Troubleshooting - Common issues