Basics
This guide covers the fundamental concepts of Streamblocks. Understanding them is essential for using the library effectively.
Core Components
Streamblocks is built around four core components that work together:
flowchart TB
    subgraph Core["Core Components"]
        Processor[StreamBlockProcessor]
        Registry[BlockRegistry]
        Syntax[Syntax]
        Events[Events]
    end
    Stream[Text Stream] --> Processor
    Processor --> Registry
    Registry --> Syntax
    Processor --> Events
    Events --> Application[Your Application]
StreamBlockProcessor
The central engine that processes text streams and extracts blocks:
from streamblocks import StreamBlockProcessor, BlockRegistry, Syntax

# Create a processor
registry = BlockRegistry()
processor = StreamBlockProcessor(
    registry=registry,
    syntax=Syntax.DELIMITER_PREAMBLE,
)

# Process a stream
async for event in processor.process_stream(stream):
    handle_event(event)
Key responsibilities:
- Receives text chunks from adapted streams
- Accumulates lines for block detection
- Manages block state transitions
- Emits events for each processing stage
- Coordinates validation and output
BlockRegistry
Manages registered block types and their handlers:
from streamblocks import BlockRegistry
from streamblocks.blocks import TaskBlock, CodeBlock, MessageBlock
# Create registry with built-in blocks
registry = BlockRegistry()
# Register custom blocks
registry.register(TaskBlock)
registry.register(CodeBlock, priority=10) # Higher priority
registry.register(MessageBlock)
# Look up block by type
block_class = registry.get_block("task")
Key responsibilities:
- Stores metadata and content class mappings
- Provides priority-based type resolution
- Supports custom block type registration
- Handles default block fallback
Syntax
Defines how blocks are detected and parsed from text:
from streamblocks import Syntax
# Built-in syntaxes
Syntax.DELIMITER_PREAMBLE # !!id:type\ncontent\n!!end
Syntax.DELIMITER_FRONTMATTER # !!id\n---\nyaml\n---\ncontent\n!!end
Syntax.MARKDOWN_FRONTMATTER # ```id\n---\nyaml\n---\ncontent\n```
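For example, the same task could be expressed in each syntax (the frontmatter keys shown here are illustrative; the actual keys come from your metadata model):

Delimiter Preamble:

!!task01:task
Do something
!!end

Delimiter Frontmatter:

!!task01
---
block_type: task
priority: high
---
Do something
!!end

Markdown Frontmatter:

```task01
---
block_type: task
priority: high
---
Do something
```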
Events
Real-time notifications about processing state:
from streamblocks import EventType

async for event in processor.process_stream(stream):
    match event.type:
        case EventType.STREAM_START:
            print("Stream started")
        case EventType.TEXT_DELTA:
            print(f"Text: {event.text}")
        case EventType.BLOCK_OPENED:
            print(f"Block started: {event.block_id}")
        case EventType.BLOCK_EXTRACTED:
            print(f"Block complete: {event.block}")
        case EventType.STREAM_END:
            print("Stream ended")
Block Model
A block represents a structured unit of content extracted from a text stream.
Block Structure
from streamblocks import Block, BaseMetadata, BaseContent

# A block consists of:
# - metadata: Parsed from header/frontmatter
# - content: The block body
block = Block(
    metadata=TaskMetadata(
        id="task01",
        block_type="task",
        priority="high",
    ),
    content=TaskContent(
        raw_content="Implement the feature",
    ),
)

# Access block data
print(block.metadata.id)          # "task01"
print(block.metadata.block_type)  # "task"
print(block.content.raw_content)  # "Implement the feature"
Metadata
Block metadata is parsed from the header or frontmatter:
from streamblocks import BaseMetadata
from typing import Literal

class TaskMetadata(BaseMetadata):
    """Metadata for task blocks."""

    block_type: Literal["task"] = "task"
    priority: str = "normal"
    assignee: str | None = None
    due_date: str | None = None
Required fields:
- id: Unique identifier for the block
- block_type: Type discriminator for routing
Optional fields:
- Any additional fields you define
- Validated by Pydantic
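Because metadata is a Pydantic model, malformed headers fail fast. A minimal sketch, assuming id is a required field inherited from BaseMetadata:

from pydantic import ValidationError

meta = TaskMetadata(id="task01", priority="high")
print(meta.block_type)  # "task" (the Literal default)
print(meta.assignee)    # None (optional, defaulted)

try:
    TaskMetadata(priority="high")  # no id: fails validation
except ValidationError as exc:
    print(exc)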
Content
Block content holds the parsed body:
from streamblocks import BaseContent

class TaskContent(BaseContent):
    """Content for task blocks."""

    @classmethod
    def parse(cls, raw_text: str) -> "TaskContent":
        """Parse raw text into content."""
        # Custom parsing logic
        return cls(raw_content=raw_text.strip())
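Usage is straightforward; parse receives the accumulated body text:

content = TaskContent.parse("  Implement the feature\n")
print(content.raw_content)  # "Implement the feature"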
Processing Pipeline
Understanding the processing pipeline helps you build effective applications:
flowchart LR
    subgraph Input["1. Input"]
        Stream[Provider Stream]
        Adapter[Input Adapter]
    end
    subgraph Accumulation["2. Accumulation"]
        Chunks[Text Chunks]
        Lines[Complete Lines]
    end
    subgraph Detection["3. Detection"]
        Searching[Searching]
        Detected[Block Detected]
    end
    subgraph Extraction["4. Extraction"]
        Parsing[Parsing]
        Validation[Validation]
    end
    subgraph Output["5. Output"]
        Events[Events]
        Blocks[Blocks]
    end
    Stream --> Adapter --> Chunks --> Lines
    Lines --> Searching --> Detected
    Detected --> Parsing --> Validation
    Validation --> Events
    Validation --> Blocks
Step 1: Input Adaptation
Provider-specific streams are normalized to text chunks:
# Raw Gemini stream
async for chunk in gemini_response:
    # chunk is GenerateContentResponse
    ...

# Adapted to text
async for text in adapted_stream:
    # text is a string
    ...
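With input_adapter="auto" the processor does this normalization for you; a hand-rolled adapter is just an async generator. A minimal sketch, assuming each Gemini chunk exposes a .text attribute:

from collections.abc import AsyncIterator

async def adapt_gemini(response) -> AsyncIterator[str]:
    """Normalize provider chunks to plain strings."""
    async for chunk in response:
        if chunk.text:  # skip chunks with no text payload
            yield chunk.text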
Step 2: Line Accumulation
Text chunks are accumulated into complete lines:
# Chunks may split across lines
chunk1 = "Hello, wo"
chunk2 = "rld!\nHow are"
chunk3 = " you?\n"
# Accumulated into lines
line1 = "Hello, world!"
line2 = "How are you?"
Step 3: Block Detection
Lines are scanned for block start/end markers:
# Delimiter Preamble syntax
"!!task01:task" # Start detected
"Do something" # Content
"!!end" # End detected
Step 4: Block Extraction
Complete blocks are parsed and validated (the helper names below are schematic):
# Parsing
metadata = parse_metadata(header_line)
content = parse_content(content_lines)
# Validation
validate_metadata(metadata)
validate_content(content)
# Block creation
block = Block(metadata=metadata, content=content)
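Concretely, using the TaskMetadata and TaskContent models from above (the header-to-field mapping shown is a sketch):

# The header "!!task01:task" supplies the metadata fields
metadata = TaskMetadata(id="task01", block_type="task")

# The accumulated body lines go through the content parser
content = TaskContent.parse("Do something")

# Both halves validated in their constructors, so assemble the block
block = Block(metadata=metadata, content=content)
print(block.metadata.id, block.content.raw_content)  # task01 Do something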
Step 5: Event Emission
Events are emitted for each stage:
# Events throughout processing
STREAM_START # Stream begins
TEXT_DELTA # Text chunk received
BLOCK_OPENED # Block start detected
BLOCK_CONTENT # Content accumulated
BLOCK_EXTRACTED # Block complete and valid
BLOCK_REJECTED # Block failed validation
STREAM_END # Stream complete
Configuration Options
Processor Configuration
processor = StreamBlockProcessor(
    registry=registry,
    syntax=Syntax.DELIMITER_PREAMBLE,
    # Event emission
    emit_text_deltas=True,       # Emit TEXT_DELTA events
    emit_block_content=True,     # Emit BLOCK_CONTENT events
    emit_original_events=False,  # Emit original provider events
    # Limits
    max_block_size=100_000,      # Max content size in chars
    # Adapters
    input_adapter="auto",        # Auto-detect adapter
    output_adapter=None,         # Optional output transformation
    # Logging
    logger=my_logger,            # Custom logger
)
Registry Configuration
registry = BlockRegistry()
# Register with priority (higher = checked first)
registry.register(HighPriorityBlock, priority=100)
registry.register(NormalBlock, priority=50)
registry.register(FallbackBlock, priority=1)
# Set default block for unknown types
registry.set_default(GenericBlock)
Basic Usage Patterns
Pattern 1: Simple Extraction
Extract all blocks from a stream:
from streamblocks import StreamBlockProcessor, BlockRegistry, Syntax, EventType

async def extract_blocks(stream):
    registry = BlockRegistry()
    processor = StreamBlockProcessor(
        registry=registry,
        syntax=Syntax.DELIMITER_PREAMBLE,
    )
    blocks = []
    async for event in processor.process_stream(stream):
        if event.type == EventType.BLOCK_EXTRACTED:
            blocks.append(event.block)
    return blocks
Pattern 2: Real-time Processing
Process blocks as they complete:
async def process_realtime(stream):
    registry = BlockRegistry()
    processor = StreamBlockProcessor(
        registry=registry,
        syntax=Syntax.DELIMITER_PREAMBLE,
        emit_text_deltas=True,
    )
    async for event in processor.process_stream(stream):
        match event.type:
            case EventType.TEXT_DELTA:
                # Show text as it arrives
                display_text(event.text)
            case EventType.BLOCK_OPENED:
                # Block started - prepare UI
                show_block_placeholder(event.block_id)
            case EventType.BLOCK_EXTRACTED:
                # Block complete - render it
                render_block(event.block)
            case EventType.BLOCK_REJECTED:
                # Block failed - show error
                show_error(event.rejection.message)
Pattern 3: Type-Specific Handling
Route blocks by type:
async def handle_by_type(stream):
    registry = BlockRegistry()
    processor = StreamBlockProcessor(
        registry=registry,
        syntax=Syntax.DELIMITER_PREAMBLE,
    )
    async for event in processor.process_stream(stream):
        if event.type == EventType.BLOCK_EXTRACTED:
            block = event.block
            match block.metadata.block_type:
                case "task":
                    await handle_task(block)
                case "code":
                    await handle_code(block)
                case "message":
                    await handle_message(block)
                case _:
                    await handle_unknown(block)
Pattern 4: Error Handling
Handle errors gracefully:
async def process_with_errors(stream):
    registry = BlockRegistry()
    processor = StreamBlockProcessor(
        registry=registry,
        syntax=Syntax.DELIMITER_PREAMBLE,
    )
    extracted = 0
    rejected = 0
    async for event in processor.process_stream(stream):
        match event.type:
            case EventType.BLOCK_EXTRACTED:
                extracted += 1
                await process_block(event.block)
            case EventType.BLOCK_REJECTED:
                rejected += 1
                await log_rejection(event.rejection)
            case EventType.STREAM_END:
                print(f"Complete: {extracted} extracted, {rejected} rejected")
Text vs Blocks
Streamblocks distinguishes between regular text and structured blocks:
This is regular text that passes through.
!!task01:task
This is block content that gets extracted.
!!end
More regular text here.
With emit_text_deltas=True:
# Events emitted:
TEXT_DELTA: "This is regular text that passes through.\n"
TEXT_DELTA: "\n"
BLOCK_OPENED: block_id="task01"
BLOCK_CONTENT: "This is block content that gets extracted."
BLOCK_EXTRACTED: Block(...)
TEXT_DELTA: "\n"
TEXT_DELTA: "More regular text here.\n"
Memory Considerations
Streamblocks is designed for efficient streaming:
- No full buffering: Text is processed line-by-line
- Events emitted immediately: As soon as detection is complete
- Block size limits: Configurable maximum to prevent memory issues
from streamblocks import BlockErrorCode  # import path assumed, matching EventType

# Limit block size
processor = StreamBlockProcessor(
    registry=registry,
    syntax=syntax,
    max_block_size=50_000,  # Reject blocks larger than 50,000 chars
)

async for event in processor.process_stream(stream):
    if event.type == EventType.BLOCK_REJECTED:
        if event.rejection.reason == BlockErrorCode.MAX_SIZE_EXCEEDED:
            print("Block too large, skipping")
Next Steps
- Stream Processing - Deep dive into processing
- Syntaxes - Syntax formats in detail
- Adapters - Working with LLM providers
- Block Types - Creating custom blocks
- Events - Event system details