Common Patterns
This guide covers common usage patterns with Streamblocks.
Processing LLM Responses
The most common use case is extracting structured blocks from LLM responses:
async def process_llm_response(response_stream):
    """Collect every completed block from a streamed LLM response.

    Feeds ``response_stream`` through a ``StreamBlockProcessor`` configured
    with ``MarkdownFrontmatterSyntax`` and returns, in arrival order, the
    ``block`` carried by each ``BLOCK_CLOSED`` event.
    """
    block_processor = StreamBlockProcessor(
        syntaxes=[MarkdownFrontmatterSyntax()]
    )
    events = block_processor.process_stream(response_stream)
    # Only BLOCK_CLOSED events contribute; every other event type is skipped.
    return [
        evt.block
        async for evt in events
        if evt.type == EventType.BLOCK_CLOSED
    ]
Real-time Block Updates
Display blocks as they're being streamed:
async def stream_with_updates(stream):
    """Print progress messages while blocks are still being streamed.

    The processor is created with ``emit_block_events=True``. Three event
    types are handled; all others are ignored:

    * ``BLOCK_OPENED``  - remember the block and print its ``block_type``.
    * ``BLOCK_UPDATED`` - print the trailing slice of the block's content.
    * ``BLOCK_CLOSED``  - print the ``block_type`` and forget the block.
    """
    processor = StreamBlockProcessor(
        syntaxes=[MarkdownFrontmatterSyntax()],
        emit_block_events=True,
    )
    active_block = None
    async for event in processor.process_stream(stream):
        kind = event.type
        if kind == EventType.BLOCK_OPENED:
            active_block = event.block
            print(f"Started: {active_block.block_type}")
        elif kind == EventType.BLOCK_UPDATED:
            # [-50:] keeps only the tail of the content accumulated so far.
            print(f"Content: {event.block.content[-50:]}")
        elif kind == EventType.BLOCK_CLOSED:
            print(f"Finished: {event.block.block_type}")
            active_block = None
Multiple Syntax Support
Combine multiple syntaxes for flexible parsing:
# A single processor configured with three syntaxes at once.
processor = StreamBlockProcessor(
    syntaxes=[
        # YAML frontmatter
        MarkdownFrontmatterSyntax(),
        # Code blocks
        FencedCodeSyntax(),
        # Custom delimiters
        DelimiterFrontmatterSyntax(),
    ],
)
Filtering Events
Process only specific event types:
async def get_text_only(stream):
    """Return the concatenated text of every ``TEXT_DELTA`` event in ``stream``.

    The processor is created with ``emit_text_delta=True`` and
    ``emit_block_events=False``; only events whose type is
    ``EventType.TEXT_DELTA`` contribute to the result.

    Returns:
        The ``data`` payloads of the text-delta events joined in arrival
        order, or an empty string if there were none.
    """
    processor = StreamBlockProcessor(
        emit_text_delta=True,
        emit_block_events=False,  # Skip block events
    )
    # Gather the fragments and join once rather than growing a string with
    # ``+=`` on every event, which can degrade to quadratic time.
    fragments = [
        event.data
        async for event in processor.process_stream(stream)
        if event.type == EventType.TEXT_DELTA
    ]
    return "".join(fragments)
Block Type Routing
Route blocks to different handlers:
# Maps a block's ``block_type`` to the awaitable handler invoked for it.
handlers = {
    "message": handle_message,
    "tool_call": handle_tool_call,
    "code": handle_code,
}


async def route_blocks(processor, stream):
    """Dispatch each completed block to the handler registered for its type.

    ``async for`` is only valid inside a coroutine, so the routing loop is
    wrapped in a function that takes the processor and the stream explicitly.

    Args:
        processor: Object whose ``process_stream(stream)`` yields events.
        stream: The input passed through to ``processor.process_stream``.
    """
    async for event in processor.process_stream(stream):
        if event.type != EventType.BLOCK_CLOSED:
            continue
        block = event.block
        handler = handlers.get(block.block_type)
        # Blocks whose type has no registered handler are skipped.
        if handler:
            await handler(block)
Error Recovery
Keep processing the stream when handling an individual block fails:
async def process_with_recovery(stream):
    """Process completed blocks, logging failures instead of aborting.

    The block of each ``BLOCK_CLOSED`` event is passed to ``process_block``.
    If that call raises, the error is logged together with its traceback and
    the loop moves on to the next event.

    Only failures while handling a block are recovered from: an exception
    raised while iterating the stream itself occurs outside the ``try`` and
    still propagates to the caller.
    """
    processor = StreamBlockProcessor(syntaxes=[MarkdownFrontmatterSyntax()])
    async for event in processor.process_stream(stream):
        if event.type != EventType.BLOCK_CLOSED:
            continue
        try:
            process_block(event.block)
        except Exception as e:
            # Boundary handler: logger.exception records the traceback as
            # well as the message, then the loop continues with the next
            # event.
            logger.exception("Error processing block: %s", e)
Chaining Processors
Feed the blocks extracted by a processor through later stages, such as validation and transformation:
# Stage one: the processor that extracts the raw blocks.
raw_processor = StreamBlockProcessor(syntaxes=[MarkdownFrontmatterSyntax()])


# Stage two: validate each completed block and transform the ones that pass.
async def process_pipeline(stream):
    """Yield a transformed version of every completed block that validates.

    For each ``BLOCK_CLOSED`` event produced by ``raw_processor``, the block
    is passed to ``validate_block``. A truthy result is handed to
    ``transform_block`` and the outcome is yielded; a falsy result is
    dropped.
    """
    async for event in raw_processor.process_stream(stream):
        if event.type != EventType.BLOCK_CLOSED:
            continue
        checked = validate_block(event.block)
        if not checked:
            continue
        yield transform_block(checked)