Core Components
Core StreamBlocks components, including the main processor.
StreamBlockProcessor
The main processing engine for extracting blocks from streams.
```python
from hother.streamblocks import StreamBlockProcessor, Registry, DelimiterPreambleSyntax

processor = StreamBlockProcessor(
    registry=Registry(),
    syntaxes=[DelimiterPreambleSyntax()],
    emit_text_delta=True,
    emit_block_content_delta=True,
)


async def print_events(stream) -> None:
    # `stream` is any async iterable of chunks (e.g. an LLM response stream)
    async for event in processor.process_stream(stream):
        print(event)
```
Constructor Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| registry | Registry | Required | Block type registry |
| syntaxes | list[BaseSyntax] | Required | Syntax definitions to use |
| input_adapter | InputProtocolAdapter \| None | None | Input stream adapter |
| output_adapter | OutputProtocolAdapter \| None | None | Output event adapter |
| emit_text_delta | bool | False | Emit TEXT_DELTA events |
| emit_text_content | bool | True | Emit TEXT_CONTENT events |
| emit_block_start | bool | True | Emit BLOCK_START events |
| emit_block_content_delta | bool | False | Emit block content delta events |
| emit_block_metadata_end | bool | False | Emit block metadata end events |
| emit_block_content_end | bool | False | Emit block content end events |
| max_block_size | int | 1048576 (1 MiB) | Maximum block size in bytes |
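The table gives `max_block_size` in bytes, which is not the same as characters once non-ASCII text is involved. This page does not say how the limit is measured; assuming UTF-8 bytes and a strict greater-than check, a guard could look like the sketch below. `exceeds_max_block_size` is a hypothetical helper, not part of the library.

```python
MAX_BLOCK_SIZE = 1_048_576  # documented default: 1 MiB


def exceeds_max_block_size(content: str, limit: int = MAX_BLOCK_SIZE) -> bool:
    """Hypothetical guard: assumes the limit counts UTF-8 bytes, not characters."""
    return len(content.encode("utf-8")) > limit


print(exceeds_max_block_size("a" * 1_048_576))      # False: exactly at the limit
print(exceeds_max_block_size("\u00e9" * 600_000))   # True: 600,000 chars but 1,200,000 bytes
```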
Methods
process_stream
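```python
async def process_stream(
    self,
    stream: AsyncIterator[TChunk],
    adapter: InputProtocolAdapter[TChunk] | None = None,
) -> AsyncGenerator[TChunk | Event]:
    """Process a stream and yield original chunks (if enabled) and events."""
```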
process_chunk
```python
def process_chunk(
    self,
    chunk: TChunk,
    adapter: InputProtocolAdapter[TChunk] | None = None,
) -> list[TChunk | Event]:
    """Process a single chunk and return the resulting events."""
```
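finalize
```python
def finalize(self) -> list[Event]:
    """Process remaining text, flush incomplete blocks, and return the resulting events."""
```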
Registry
Block type registry with validation support.
```python
from hother.streamblocks import Registry, Block, BaseMetadata, BaseContent
from typing import Literal


class TaskMetadata(BaseMetadata):
    block_type: Literal["task"] = "task"
    priority: str = "normal"


class TaskContent(BaseContent):
    pass


TaskBlock = Block[TaskMetadata, TaskContent]

# Create registry and register block types
registry = Registry()
registry.register("task", TaskBlock)
```
Constructor Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str \| None | None | Registry name |
| failure_mode | MetadataValidationFailureMode | REJECT | Validation failure behavior |
Methods
register
```python
def register(
    self,
    name: str,
    block_class: type[Block[Any, Any]],
    validators: list[ValidatorFunc] | None = None,
) -> None:
    """Register a block class for a block type."""
```
get
validate
```python
def validate(
    self,
    block_type: str,
    metadata: dict,
    content: str,
) -> ValidationResult:
    """Validate block data against registered type."""
```
StreamState
Processing state enumeration.
```python
from hother.streamblocks import StreamState

StreamState.IDLE        # Not processing
StreamState.STREAMING   # Processing stream
StreamState.FINALIZING  # Finalizing stream
StreamState.COMPLETED   # Stream completed
StreamState.ERROR       # Error occurred
```
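The state names suggest a lifecycle, but this section does not document which transitions are legal. The sketch below encodes one plausible ordering as an explicit transition table, using a local stand-in enum so it runs without the library; `SketchState`, its string values, `ALLOWED`, and `advance` are all hypothetical.

```python
from enum import Enum


class SketchState(Enum):
    """Local stand-in for StreamState; values are illustrative."""
    IDLE = "idle"
    STREAMING = "streaming"
    FINALIZING = "finalizing"
    COMPLETED = "completed"
    ERROR = "error"


# Assumed lifecycle: each non-terminal state may also fail into ERROR
ALLOWED = {
    SketchState.IDLE: {SketchState.STREAMING, SketchState.ERROR},
    SketchState.STREAMING: {SketchState.FINALIZING, SketchState.ERROR},
    SketchState.FINALIZING: {SketchState.COMPLETED, SketchState.ERROR},
    SketchState.COMPLETED: set(),
    SketchState.ERROR: set(),
}


def advance(current: SketchState, target: SketchState) -> SketchState:
    # Reject any transition not listed in the table above
    if target not in ALLOWED[current]:
        raise ValueError(f"illegal transition {current.name} -> {target.name}")
    return target


state = SketchState.IDLE
for nxt in (SketchState.STREAMING, SketchState.FINALIZING, SketchState.COMPLETED):
    state = advance(state, nxt)
print(state.name)  # COMPLETED
```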
ValidationResult
Validation outcome, including any error messages.
```python
from hother.streamblocks import ValidationResult

result = registry.validate("task", metadata, content)
if result.success:
    print("Validation passed")
else:
    print(f"Errors: {result.errors}")
```
Fields
| Field | Type | Description |
|---|---|---|
| success | bool | Whether validation passed |
| errors | list[str] | List of error messages |
| metadata | BaseMetadata \| None | Validated metadata |
| content | BaseContent \| None | Validated content |
MetadataValidationFailureMode
Validation failure behavior enumeration.
```python
from hother.streamblocks import MetadataValidationFailureMode

MetadataValidationFailureMode.REJECT    # Reject block on failure
MetadataValidationFailureMode.FALLBACK  # Use fallback metadata
MetadataValidationFailureMode.SKIP      # Skip validation
```
Base Types
BaseMetadata
Base metadata model with standard fields.
```python
from typing import Literal

from hother.streamblocks import BaseMetadata


class TaskMetadata(BaseMetadata):
    block_type: Literal["task"] = "task"
    priority: str = "normal"
    tags: list[str] = []
```
Required fields:
| Field | Type | Description |
|---|---|---|
| id | str | Block identifier |
| block_type | str | Type of the block |
BaseContent
Base content model with raw content field.
```python
from hother.streamblocks import BaseContent


class TaskContent(BaseContent):
    @classmethod
    def parse(cls, raw_text: str) -> "TaskContent":
        """Custom parsing logic."""
        return cls(raw_content=raw_text.strip())
```
Required fields:
| Field | Type | Description |
|---|---|---|
| raw_content | str | Raw unparsed content |
Detection and Parse Results
DetectionResult
Result from syntax detection attempt.
```python
from hother.streamblocks import DetectionResult

result = DetectionResult(
    is_opening=True,
    is_closing=False,
    is_metadata_boundary=False,
    metadata={"id": "task01", "block_type": "task"},
)
```
ParseResult
Result from parsing attempt.
```python
from hother.streamblocks import ParseResult

result = ParseResult(
    success=True,
    metadata=metadata,
    content=content,
    error=None,
)
```
API Reference
hother.streamblocks.core.processor.StreamBlockProcessor
Main stream processing engine for a single syntax type.
This processor works with exactly one syntax and coordinates:

- Adapter detection and text extraction
- Line accumulation via LineAccumulator
- Block detection and extraction via BlockStateMachine
- Event emission (TextDeltaEvent, block events, etc.)
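As a rough illustration of the line-accumulation step, the sketch below buffers arbitrarily split text chunks, emits only complete lines, and flushes a trailing partial line at the end (the same idea the `finalize()` description relies on). `SketchLineAccumulator` and its `feed`/`flush` methods are hypothetical; this is not the library's `LineAccumulator`, whose interface this page does not document.

```python
class SketchLineAccumulator:
    """Hypothetical stand-in for LineAccumulator: buffers chunks, emits whole lines."""

    def __init__(self) -> None:
        self._buffer = ""

    def feed(self, chunk: str) -> list[str]:
        # Append new text; everything before the last newline is a complete line
        self._buffer += chunk
        *complete, self._buffer = self._buffer.split("\n")
        return complete

    def flush(self) -> list[str]:
        # Treat leftover text as a final line even without a trailing newline
        if not self._buffer:
            return []
        last, self._buffer = self._buffer, ""
        return [last]


acc = SketchLineAccumulator()
lines = []
for chunk in ["Hello wo", "rld\nsecond ", "line\nno newline"]:
    lines.extend(acc.feed(chunk))
lines.extend(acc.flush())
print(lines)  # ['Hello world', 'second line', 'no newline']
```

Chunk boundaries never need to line up with line boundaries: a line is only handed on once its newline arrives, which is why a chunk can produce zero lines.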
process_chunk
```python
process_chunk(
    chunk: TChunk,
    adapter: InputProtocolAdapter[TChunk] | None = None,
) -> list[TChunk | Event]
```
Process a single chunk and return resulting events.
This method is stateful: it maintains internal state between calls. Call finalize() after processing all chunks to flush incomplete blocks.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| chunk | TChunk | Single chunk to process | required |
| adapter | InputProtocolAdapter[TChunk] \| None | Optional adapter for extracting text. If not provided and auto_detect_adapter=True, the adapter is auto-detected from the first chunk. | None |
Returns:
| Type | Description |
|---|---|
| list[TChunk \| Event] | List of events generated from this chunk. May be empty if the chunk only accumulates text without completing any lines. |
Raises:
| Type | Description |
|---|---|
| RuntimeError | If adapter is not set after first chunk processing (internal state error, should not occur in normal usage). |
Example
```python
processor = StreamBlockProcessor(registry)
response = await client.generate_content_stream(...)
async for chunk in response:
    events = processor.process_chunk(chunk)
    for event in events:
        if isinstance(event, BlockEndEvent):
            print(f"Block: {event.block_id}")

# Finalize at stream end
final_events = processor.finalize()
for event in final_events:
    if isinstance(event, BlockErrorEvent):
        print(f"Incomplete block: {event.reason}")
```
finalize
```python
finalize() -> list[Event]
```
Finalize processing and flush any incomplete blocks.
Call this method after processing all chunks to get rejection events for any blocks that were opened but never closed.
This method processes any accumulated text as a final line before flushing candidates, ensuring the last line is processed even if it doesn't end with a newline.
Returns:
| Type | Description |
|---|---|
| list[Event] | List of events, including events from processing the final line and rejection events for incomplete blocks |
Example
```python
processor = StreamBlockProcessor(registry)
async for chunk in stream:
    events = processor.process_chunk(chunk)
    # ... handle events ...

# Stream ended, process remaining text and flush incomplete blocks
final_events = processor.finalize()
for event in final_events:
    if isinstance(event, BlockErrorEvent):
        print(f"Incomplete block: {event.reason}")
```
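The finalize contract (process leftover text as one last line, then reject any block that was opened but never closed) can be modeled with a toy tracker. This sketch assumes a made-up `!!start`/`!!end` delimiter pair and plain dict events; neither is the real `DelimiterPreambleSyntax` format nor the library's event classes, and `ToyBlockTracker` is hypothetical.

```python
class ToyBlockTracker:
    """Hypothetical model of stateful chunk processing plus finalize()."""

    def __init__(self) -> None:
        self._pending = ""                       # text not yet ended by a newline
        self._open_block: list[str] | None = None  # lines of the open block, if any

    def _handle_line(self, line: str) -> list[dict]:
        if line == "!!start" and self._open_block is None:
            self._open_block = []
            return [{"type": "block_start"}]
        if line == "!!end" and self._open_block is not None:
            content, self._open_block = "\n".join(self._open_block), None
            return [{"type": "block_end", "content": content}]
        if self._open_block is not None:
            self._open_block.append(line)
            return []
        return [{"type": "text", "line": line}]

    def process_chunk(self, chunk: str) -> list[dict]:
        # Only complete lines are handled; the remainder waits for more text
        self._pending += chunk
        *lines, self._pending = self._pending.split("\n")
        events = []
        for line in lines:
            events.extend(self._handle_line(line))
        return events

    def finalize(self) -> list[dict]:
        events = []
        # Process accumulated text as a final line, even without a newline
        if self._pending:
            line, self._pending = self._pending, ""
            events.extend(self._handle_line(line))
        # Reject a block that was opened but never closed
        if self._open_block is not None:
            self._open_block = None
            events.append({"type": "block_error", "reason": "unclosed block"})
        return events


tracker = ToyBlockTracker()
events = tracker.process_chunk("intro\n!!start\npartial ")
events += tracker.process_chunk("content")
events += tracker.finalize()
print([e["type"] for e in events])  # ['text', 'block_start', 'block_error']
```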
is_native_event
Check whether an event is a native provider event (not a StreamBlocks event).
This method provides provider-agnostic detection of native events: it checks whether the event originates from the AI provider (Gemini, OpenAI, Anthropic, etc.) rather than from StreamBlocks.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | Any | Event to check | required |
Returns:
| Type | Description |
|---|---|
| bool | True if the event is from the native provider, False if it is a StreamBlocks event or if detection is not possible |
Example
```python
processor = StreamBlockProcessor(registry)
async for event in processor.process_stream(gemini_stream):
    if processor.is_native_event(event):
        # Handle Gemini event (provider-agnostic!)
        usage = getattr(event, 'usage_metadata', None)
    elif isinstance(event, BlockEndEvent):
        # Handle StreamBlocks event
        print(f"Block: {event.block_id}")
```
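One way to get the documented semantics (True for provider objects; False for StreamBlocks events and for anything unrecognized) is to look at which package an object's class comes from. This is a sketch of that idea, not the actual implementation of `is_native_event`: `StreamBlocksEvent`, `looks_native`, and the list of provider module prefixes are assumptions, and the fake chunk class only simulates an OpenAI SDK object.

```python
from typing import Any

# Assumed module prefixes of provider SDK objects (illustrative, not exhaustive)
NATIVE_MODULE_PREFIXES = ("google.genai", "google.generativeai", "openai", "anthropic")


class StreamBlocksEvent:
    """Hypothetical base class standing in for the library's own events."""


def looks_native(event: Any) -> bool:
    # Library events are never native
    if isinstance(event, StreamBlocksEvent):
        return False
    module = type(event).__module__ or ""
    # Unrecognized origins fall back to False ("detection not possible")
    return any(module == p or module.startswith(p + ".") for p in NATIVE_MODULE_PREFIXES)


class FakeOpenAIChunk:
    """Simulated provider chunk for the demo."""


FakeOpenAIChunk.__module__ = "openai.types.chat"  # pretend it lives in the SDK

print(looks_native(FakeOpenAIChunk()))    # True
print(looks_native(StreamBlocksEvent()))  # False
print(looks_native("plain text"))         # False
```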
process_stream (async)
```python
process_stream(
    stream: AsyncIterator[TChunk],
    adapter: InputProtocolAdapter[TChunk] | None = None,
) -> AsyncGenerator[TChunk | Event]
```
Process stream and yield mixed events.
This method processes chunks from any stream format, extracting text via an adapter and emitting both original chunks (if enabled) and StreamBlocks events.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| stream | AsyncIterator[TChunk] | Async iterator yielding chunks (text or objects) | required |
| adapter | InputProtocolAdapter[TChunk] \| None | Optional adapter for extracting text from chunks. If None and auto_detect_adapter=True, the adapter is auto-detected from the first chunk. | None |
Yields:
| Type | Description |
|---|---|
| AsyncGenerator[TChunk \| Event] | Mixed stream of original chunks (if enabled) and StreamBlocks events |
Raises:
| Type | Description |
|---|---|
| RuntimeError | If adapter is not set after first chunk processing (internal state error, should not occur in normal usage). |
Example
```python
# Plain text
async for event in processor.process_stream(text_stream):
    if isinstance(event, BlockEndEvent):
        print(f"Extracted: {event.block_id}")

# With Gemini adapter (auto-detected)
async for event in processor.process_stream(gemini_stream):
    if hasattr(event, 'usage_metadata'):
        print(f"Tokens: {event.usage_metadata}")
    elif isinstance(event, BlockEndEvent):
        print(f"Extracted: {event.block_id}")
```
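The "mixed stream" idea (original chunks passed through when enabled, interleaved with events derived from the extracted text) can be sketched as a plain async generator. Here the adapter is reduced to a function that pulls text out of a chunk, and `mixed_stream`, its `passthrough` flag, the `("line_event", ...)` tuples, and `fake_provider` are all hypothetical stand-ins rather than the library's API.

```python
import asyncio
from collections.abc import AsyncIterator, Callable
from typing import Any


async def mixed_stream(
    stream: AsyncIterator[Any],
    extract_text: Callable[[Any], str],
    passthrough: bool = True,
) -> AsyncIterator[Any]:
    """Hypothetical sketch: yield original chunks plus one event per complete line."""
    buffer = ""
    async for chunk in stream:
        if passthrough:
            yield chunk  # forward the provider's original chunk untouched
        buffer += extract_text(chunk)
        *lines, buffer = buffer.split("\n")
        for line in lines:
            yield ("line_event", line)
    if buffer:
        yield ("line_event", buffer)  # flush the trailing partial line


async def fake_provider() -> AsyncIterator[dict]:
    for text in ["hel", "lo\nwor", "ld"]:
        yield {"delta": text}


async def main() -> list[Any]:
    return [item async for item in mixed_stream(fake_provider(), lambda c: c["delta"])]


print(asyncio.run(main()))
```

Because chunks are forwarded before their text is parsed, a consumer sees each provider chunk first and then any events its text completed, which is what lets provider metadata (such as usage) and extracted blocks share one loop.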
hother.streamblocks.core.registry.Registry
Type-specific registry for a single syntax type.
This registry holds exactly one syntax instance and maps block types to block classes.
Example
```python
syntax = DelimiterPreambleSyntax(name="my_syntax")
registry = Registry(syntax)
registry.register("files_operations", FileOperations, validators=[my_validator])
registry.register("patch", Patch)
```
Or with bulk registration:
```python
registry = Registry(
    syntax=syntax,
    blocks={
        "files_operations": FileOperations,
        "patch": Patch,
    },
)
registry.add_validator("files_operations", my_validator)
```
metadata_failure_mode (property)
```python
metadata_failure_mode: MetadataValidationFailureMode
```
Get the metadata validation failure mode.
register
```python
register(
    name: str,
    block_class: type[Block[Any, Any]],
    validators: list[ValidatorFunc] | None = None,
) -> None
```
Register a block class for a block type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Block type name (e.g., "files_operations", "patch") | required |
| block_class | type[Block[Any, Any]] | Block class inheriting from Block[M, C] | required |
| validators | list[ValidatorFunc] \| None | Optional list of validator functions for this block type | None |
get_block_class
add_validator
Add a validator for a block type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| block_type | BlockType | Type of block to validate | required |
| validator | ValidatorFunc | Function that validates a block | required |
validate_block
```python
validate_block(block: ExtractedBlock[Any, Any]) -> bool
```
Run all validators for a block.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| block | ExtractedBlock[Any, Any] | Extracted block to validate | required |
Returns:
| Type | Description |
|---|---|
| bool | True if all validators pass |
add_metadata_validator
Add an early metadata validator for a block type.
Metadata validators are called when the metadata section completes, before content accumulation begins. They receive the raw metadata string and the parsed metadata dict.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| block_type | BlockType | Type of block to validate | required |
| validator | MetadataValidatorFunc | Function that validates metadata and returns ValidationResult | required |
add_content_validator
Add an early content validator for a block type.
Content validators are called when the content section completes, before the final BlockEndEvent. They receive the raw content string and the parsed content dict.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| block_type | BlockType | Type of block to validate | required |
| validator | ContentValidatorFunc | Function that validates content and returns ValidationResult | required |
validate_metadata
```python
validate_metadata(
    block_type: str,
    raw_metadata: str,
    parsed_metadata: dict[str, Any] | None,
) -> ValidationResult
```
Run all metadata validators for a block type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| block_type | str | Type of block being validated | required |
| raw_metadata | str | Raw metadata string | required |
| parsed_metadata | dict[str, Any] \| None | Parsed metadata dict (if available) | required |
Returns:
| Type | Description |
|---|---|
| ValidationResult | ValidationResult with combined results from all validators |
validate_content
```python
validate_content(
    block_type: str,
    raw_content: str,
    parsed_content: dict[str, Any] | None,
) -> ValidationResult
```
Run all content validators for a block type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| block_type | str | Type of block being validated | required |
| raw_content | str | Raw content string | required |
| parsed_content | dict[str, Any] \| None | Parsed content dict (if available) | required |
Returns:
| Type | Description |
|---|---|
| ValidationResult | ValidationResult with combined results from all validators |
hother.streamblocks.core.registry.ValidationResult (dataclass)
hother.streamblocks.core.registry.MetadataValidationFailureMode
Bases: StrEnum
Behavior when metadata validation fails.
hother.streamblocks.core.types.BaseMetadata
Bases: BaseModel
Base metadata model with standard fields.
All custom metadata models should inherit from this class and add their domain-specific fields.
Example
```python
from typing import Literal

from hother.streamblocks import BaseMetadata


# Define custom metadata for a patch block
class PatchMetadata(BaseMetadata):
    file_path: str
    operation: Literal["create", "update", "delete"]


# Create instance with required base fields
metadata = PatchMetadata(
    id="patch_001",
    block_type="patch",
    file_path="src/main.py",
    operation="update",
)
metadata.id         # 'patch_001'
metadata.file_path  # 'src/main.py'
```
hother.streamblocks.core.types.BaseContent
Bases: BaseModel
Base content model with raw content field.
All custom content models should inherit from this class and optionally override the parse() method to add custom parsing logic. The raw_content field always preserves the original unparsed text.
Example
```python
from typing import Self

from hother.streamblocks import BaseContent


# Simple content model that just stores raw text
class SimpleContent(BaseContent):
    pass


content = SimpleContent.parse("Hello, world!")
content.raw_content  # 'Hello, world!'


# Content model with custom parsing
class ItemsContent(BaseContent):
    items: list[str] = []

    @classmethod
    def parse(cls, raw_text: str) -> Self:
        items = [line.strip() for line in raw_text.split("\n") if line.strip()]
        return cls(raw_content=raw_text, items=items)


content = ItemsContent.parse("apple\nbanana\norange")
content.items        # ['apple', 'banana', 'orange']
content.raw_content  # Original text preserved: 'apple\nbanana\norange'
```
hother.streamblocks.core.types.DetectionResult
hother.streamblocks.core.types.ParseResult
Bases: BaseModel
Result from parsing attempt.
model_config (class attribute, instance attribute)
```python
model_config = ConfigDict(arbitrary_types_allowed=True)
```