Skip to content

Core Components

Core Streamblocks components and the main processor.

StreamBlockProcessor

The main processing engine for extracting blocks from streams.

from hother.streamblocks import StreamBlockProcessor, Registry, DelimiterPreambleSyntax

processor = StreamBlockProcessor(
    registry=Registry(),
    syntaxes=[DelimiterPreambleSyntax()],
    emit_text_delta=True,
    emit_block_content_delta=True,
)

async for event in processor.process_stream(stream):
    print(event)

Constructor Parameters

Parameter Type Default Description
registry Registry Required Block type registry
syntaxes list[BaseSyntax] Required Syntax definitions to use
input_adapter InputProtocolAdapter \| None None Input stream adapter
output_adapter OutputProtocolAdapter \| None None Output event adapter
emit_text_delta bool False Emit TEXT_DELTA events
emit_text_content bool True Emit TEXT_CONTENT events
emit_block_start bool True Emit BLOCK_START events
emit_block_content_delta bool False Emit delta events
emit_block_metadata_end bool False Emit metadata end events
emit_block_content_end bool False Emit content end events
max_block_size int 1048576 Maximum block size in bytes

Methods

process_stream

async def process_stream(
    self,
    stream: AsyncIterator[TChunk],
    adapter: InputProtocolAdapter[TChunk] | None = None,
) -> AsyncGenerator[TChunk | Event]:
    """Process stream and yield mixed events."""

process_chunk

def process_chunk(
    self,
    chunk: TChunk,
    adapter: InputProtocolAdapter[TChunk] | None = None,
) -> list[TChunk | Event]:
    """Process a single chunk and return resulting events."""

finalize

def finalize(self) -> list[Event]:
    """Finalize processing and flush any incomplete blocks."""

Registry

Block type registry with validation support.

from hother.streamblocks import Registry, Block, BaseMetadata, BaseContent
from typing import Literal

class TaskMetadata(BaseMetadata):
    block_type: Literal["task"] = "task"
    priority: str = "normal"

class TaskContent(BaseContent):
    pass

TaskBlock = Block[TaskMetadata, TaskContent]

# Create registry and register block types
registry = Registry()
registry.register("task", TaskBlock)

Constructor Parameters

Parameter Type Default Description
name str \| None None Registry name
failure_mode MetadataValidationFailureMode REJECT Validation failure behavior

Methods

register

def register(
    self,
    name: str,
    block_class: type[Block[Any, Any]],
    validators: list[ValidatorFunc] | None = None,
) -> None:
    """Register a block class for a block type."""

get_block_class

def get_block_class(self, block_type: str) -> type[Block[Any, Any]] | None:
    """Get the block class for a given block type."""

validate

def validate(
    self,
    block_type: str,
    metadata: dict,
    content: str,
) -> ValidationResult:
    """Validate block data against registered type."""

StreamState

Processing state enumeration.

from hother.streamblocks import StreamState

StreamState.IDLE       # Not processing
StreamState.STREAMING  # Processing stream
StreamState.FINALIZING # Finalizing stream
StreamState.COMPLETED  # Stream completed
StreamState.ERROR      # Error occurred

ValidationResult

Validation outcome with errors.

from hother.streamblocks import ValidationResult

result = registry.validate_metadata("task", raw_metadata, parsed_metadata)

if result.passed:
    print("Validation passed")
else:
    print(f"Error: {result.error}")

Fields

Field Type Description
passed bool Whether validation succeeded
error str \| None Error message if validation failed
metadata BaseMetadata \| None Validated metadata
content BaseContent \| None Validated content

MetadataValidationFailureMode

Validation failure behavior enumeration.

from hother.streamblocks import MetadataValidationFailureMode

MetadataValidationFailureMode.ABORT_BLOCK   # "abort_block"
MetadataValidationFailureMode.CONTINUE      # "continue"
MetadataValidationFailureMode.SKIP_CONTENT  # "skip_content"

Base Types

BaseMetadata

Base metadata model with standard fields.

from hother.streamblocks import BaseMetadata
from typing import Literal

class TaskMetadata(BaseMetadata):
    block_type: Literal["task"] = "task"
    priority: str = "normal"
    tags: list[str] = []

Required fields:

Field Type Description
id str Block identifier
block_type str Type of the block

BaseContent

Base content model with raw content field.

from hother.streamblocks import BaseContent

class TaskContent(BaseContent):
    @classmethod
    def parse(cls, raw_text: str) -> "TaskContent":
        """Custom parsing logic."""
        return cls(raw_content=raw_text.strip())

Required fields:

Field Type Description
raw_content str Raw unparsed content

Detection and Parse Results

DetectionResult

Result from syntax detection attempt.

from hother.streamblocks import DetectionResult

result = DetectionResult(
    is_opening=True,
    is_closing=False,
    is_metadata_boundary=False,
    metadata={"id": "task01", "block_type": "task"},
)

ParseResult

Result from parsing attempt.

from hother.streamblocks import ParseResult

result = ParseResult(
    success=True,
    metadata=metadata,
    content=content,
    error=None,
)

API Reference

hother.streamblocks.core.processor.StreamBlockProcessor

Main stream processing engine for a single syntax type.

This processor works with exactly one syntax and coordinates:

- Adapter detection and text extraction
- Line accumulation via LineAccumulator
- Block detection and extraction via BlockStateMachine
- Event emission (TextDeltaEvent, block events, etc.)

registry instance-attribute

registry = registry

syntax instance-attribute

syntax = syntax

logger instance-attribute

logger = logger or StdlibLoggerAdapter(getLogger(__name__))

config instance-attribute

config = config or ProcessorConfig()

process_chunk

process_chunk(
    chunk: TChunk,
    adapter: InputProtocolAdapter[TChunk] | None = None,
) -> list[TChunk | Event]

Process a single chunk and return resulting events.

This method is stateful - it maintains internal state between calls. Call finalize() after processing all chunks to flush incomplete blocks.

Parameters:

Name Type Description Default
chunk TChunk

Single chunk to process

required
adapter InputProtocolAdapter[TChunk] | None

Optional adapter for extracting text. If not provided and auto_detect_adapter=True, will auto-detect on first chunk.

None

Returns:

Type Description
list[TChunk | Event]

List of events generated from this chunk. May be empty if chunk only accumulates text without completing any lines.

Raises:

Type Description
RuntimeError

If adapter is not set after first chunk processing (internal state error, should not occur in normal usage).

Example

>>> processor = StreamBlockProcessor(registry)
>>> response = await client.generate_content_stream(...)
>>> async for chunk in response:
...     events = processor.process_chunk(chunk)
...     for event in events:
...         if isinstance(event, BlockEndEvent):
...             print(f"Block: {event.block_id}")
...
>>> # Finalize at stream end
>>> final_events = processor.finalize()
>>> for event in final_events:
...     if isinstance(event, BlockErrorEvent):
...         print(f"Incomplete block: {event.reason}")

finalize

finalize() -> list[Event]

Finalize processing and flush any incomplete blocks.

Call this method after processing all chunks to get rejection events for any blocks that were opened but never closed.

This method processes any accumulated text as a final line before flushing candidates, ensuring the last line is processed even if it doesn't end with a newline.

Returns:

Type Description
list[Event]

List of events including processed final line and rejection events for incomplete blocks

Example

>>> processor = StreamBlockProcessor(registry)
>>> async for chunk in stream:
...     events = processor.process_chunk(chunk)
...     # ... handle events ...
...
>>> # Stream ended, process remaining text and flush incomplete blocks
>>> final_events = processor.finalize()
>>> for event in final_events:
...     if isinstance(event, BlockErrorEvent):
...         print(f"Incomplete block: {event.reason}")

is_native_event

is_native_event(event: Any) -> bool

Check if event is a native provider event (not a StreamBlocks event).

This method provides provider-agnostic detection of native events. It checks if the event originates from the AI provider (Gemini, OpenAI, Anthropic, etc.) versus being a StreamBlocks event.

Parameters:

Name Type Description Default
event Any

Event to check

required

Returns:

Type Description
bool

True if event is from the native provider, False if it's a StreamBlocks event or if detection is not possible

Example

>>> processor = StreamBlockProcessor(registry)
>>> async for event in processor.process_stream(gemini_stream):
...     if processor.is_native_event(event):
...         # Handle Gemini event (provider-agnostic!)
...         usage = getattr(event, 'usage_metadata', None)
...     elif isinstance(event, BlockEndEvent):
...         # Handle StreamBlocks event
...         print(f"Block: {event.block_id}")

process_stream async

process_stream(
    stream: AsyncIterator[TChunk],
    adapter: InputProtocolAdapter[TChunk] | None = None,
) -> AsyncGenerator[TChunk | Event]

Process stream and yield mixed events.

This method processes chunks from any stream format, extracting text via an adapter and emitting both original chunks (if enabled) and StreamBlocks events.

Parameters:

Name Type Description Default
stream AsyncIterator[TChunk]

Async iterator yielding chunks (text or objects)

required
adapter InputProtocolAdapter[TChunk] | None

Optional adapter for extracting text from chunks. If None and auto_detect_adapter=True, will auto-detect from first chunk.

None

Yields:

Type Description
AsyncGenerator[TChunk | Event]

Mixed stream of:
  • Original chunks (if emit_original_events=True)
  • TextDeltaEvent (if emit_text_deltas=True)
  • TextContentEvent, BlockStartEvent, BlockEndEvent, BlockErrorEvent, and section delta events

Raises:

Type Description
RuntimeError

If adapter is not set after first chunk processing (internal state error, should not occur in normal usage).

Example

>>> # Plain text
>>> async for event in processor.process_stream(text_stream):
...     if isinstance(event, BlockEndEvent):
...         print(f"Extracted: {event.block_id}")
>>>
>>> # With Gemini adapter (auto-detected)
>>> async for event in processor.process_stream(gemini_stream):
...     if hasattr(event, 'usage_metadata'):
...         print(f"Tokens: {event.usage_metadata}")
...     elif isinstance(event, BlockEndEvent):
...         print(f"Extracted: {event.block_id}")

hother.streamblocks.core.registry.Registry

Type-specific registry for a single syntax type.

This registry holds exactly one syntax instance and maps block types to block classes.

Example

>>> syntax = DelimiterPreambleSyntax(name="my_syntax")
>>> registry = Registry(syntax)
>>> registry.register("files_operations", FileOperations, validators=[my_validator])
>>> registry.register("patch", Patch)

Or with bulk registration:

>>> registry = Registry(
...     syntax=syntax,
...     blocks={
...         "files_operations": FileOperations,
...         "patch": Patch,
...     }
... )
>>> registry.add_validator("files_operations", my_validator)

logger instance-attribute

logger = logger or StdlibLoggerAdapter(getLogger(__name__))

syntax property

syntax: BaseSyntax

Get the syntax instance.

metadata_failure_mode property

metadata_failure_mode: MetadataValidationFailureMode

Get the metadata validation failure mode.

register

register(
    name: str,
    block_class: type[Block[Any, Any]],
    validators: list[ValidatorFunc] | None = None,
) -> None

Register a block class for a block type.

Parameters:

Name Type Description Default
name str

Block type name (e.g., "files_operations", "patch")

required
block_class type[Block[Any, Any]]

Block class inheriting from Block[M, C]

required
validators list[ValidatorFunc] | None

Optional list of validator functions for this block type

None

get_block_class

get_block_class(
    block_type: str,
) -> type[Block[Any, Any]] | None

Get the block class for a given block type.

Parameters:

Name Type Description Default
block_type str

The block type to look up

required

Returns:

Type Description
type[Block[Any, Any]] | None

The registered block class, or None if not found

add_validator

add_validator(
    block_type: BlockType, validator: ValidatorFunc
) -> None

Add a validator for a block type.

Parameters:

Name Type Description Default
block_type BlockType

Type of block to validate

required
validator ValidatorFunc

Function that validates a block

required

validate_block

validate_block(block: ExtractedBlock[Any, Any]) -> bool

Run all validators for a block.

Parameters:

Name Type Description Default
block ExtractedBlock[Any, Any]

Extracted block to validate

required

Returns:

Type Description
bool

True if all validators pass

add_metadata_validator

add_metadata_validator(
    block_type: BlockType, validator: MetadataValidatorFunc
) -> None

Add an early metadata validator for a block type.

Metadata validators are called when the metadata section completes, before content accumulation begins. They receive the raw metadata string and the parsed metadata dict.

Parameters:

Name Type Description Default
block_type BlockType

Type of block to validate

required
validator MetadataValidatorFunc

Function that validates metadata and returns ValidationResult

required

add_content_validator

add_content_validator(
    block_type: BlockType, validator: ContentValidatorFunc
) -> None

Add an early content validator for a block type.

Content validators are called when the content section completes, before the final BlockEndEvent. They receive the raw content string and the parsed content dict.

Parameters:

Name Type Description Default
block_type BlockType

Type of block to validate

required
validator ContentValidatorFunc

Function that validates content and returns ValidationResult

required

validate_metadata

validate_metadata(
    block_type: str,
    raw_metadata: str,
    parsed_metadata: dict[str, Any] | None,
) -> ValidationResult

Run all metadata validators for a block type.

Parameters:

Name Type Description Default
block_type str

Type of block being validated

required
raw_metadata str

Raw metadata string

required
parsed_metadata dict[str, Any] | None

Parsed metadata dict (if available)

required

Returns:

Type Description
ValidationResult

ValidationResult with combined results from all validators

validate_content

validate_content(
    block_type: str,
    raw_content: str,
    parsed_content: dict[str, Any] | None,
) -> ValidationResult

Run all content validators for a block type.

Parameters:

Name Type Description Default
block_type str

Type of block being validated

required
raw_content str

Raw content string

required
parsed_content dict[str, Any] | None

Parsed content dict (if available)

required

Returns:

Type Description
ValidationResult

ValidationResult with combined results from all validators

hother.streamblocks.core.registry.ValidationResult dataclass

Result from section validation.

Attributes:

Name Type Description
passed bool

Whether validation succeeded

error str | None

Error message if validation failed

error class-attribute instance-attribute

error: str | None = None

passed class-attribute instance-attribute

passed: bool = True

failure classmethod

failure(error: str) -> ValidationResult

Create a failed validation result.

success classmethod

success() -> ValidationResult

Create a successful validation result.

hother.streamblocks.core.registry.MetadataValidationFailureMode

Bases: StrEnum

Behavior when metadata validation fails.

ABORT_BLOCK class-attribute instance-attribute

ABORT_BLOCK = 'abort_block'

CONTINUE class-attribute instance-attribute

CONTINUE = 'continue'

SKIP_CONTENT class-attribute instance-attribute

SKIP_CONTENT = 'skip_content'

hother.streamblocks.core.types.BaseMetadata

Bases: BaseModel

Base metadata model with standard fields.

All custom metadata models should inherit from this class and add their domain-specific fields.

Example

>>> # Define custom metadata for a patch block
>>> class PatchMetadata(BaseMetadata):
...     file_path: str
...     operation: Literal["create", "update", "delete"]
...
>>> # Create instance with required base fields
>>> metadata = PatchMetadata(
...     id="patch_001",
...     block_type="patch",
...     file_path="src/main.py",
...     operation="update"
... )
>>> metadata.id
'patch_001'
>>> metadata.file_path
'src/main.py'

block_type class-attribute instance-attribute

block_type: str = Field(
    ..., description="Type of the block"
)

id class-attribute instance-attribute

id: str = Field(..., description='Block identifier')

hother.streamblocks.core.types.BaseContent

Bases: BaseModel

Base content model with raw content field.

All custom content models should inherit from this class and optionally override the parse() method to add custom parsing logic. The raw_content field always preserves the original unparsed text.

Example

>>> # Simple content model that just stores raw text
>>> class SimpleContent(BaseContent):
...     pass
...
>>> content = SimpleContent.parse("Hello, world!")
>>> content.raw_content
'Hello, world!'
>>>
>>> # Content model with custom parsing
>>> class ItemsContent(BaseContent):
...     items: list[str] = []
...
...     @classmethod
...     def parse(cls, raw_text: str) -> Self:
...         items = [line.strip() for line in raw_text.split("\n") if line.strip()]
...         return cls(raw_content=raw_text, items=items)
...
>>> content = ItemsContent.parse("apple\nbanana\norange")
>>> content.items
['apple', 'banana', 'orange']
>>> content.raw_content  # Original text preserved
'apple\nbanana\norange'

raw_content class-attribute instance-attribute

raw_content: str = Field(
    ..., description="Raw unparsed content from the block"
)

parse classmethod

parse(raw_text: str) -> Self

Default parse method that just stores raw content.

Override this in subclasses to add custom parsing logic.

hother.streamblocks.core.types.DetectionResult

Bases: BaseModel

Result from syntax detection attempt.

is_closing class-attribute instance-attribute

is_closing: bool = False

is_metadata_boundary class-attribute instance-attribute

is_metadata_boundary: bool = False

is_opening class-attribute instance-attribute

is_opening: bool = False

metadata class-attribute instance-attribute

metadata: dict[str, Any] | None = None

hother.streamblocks.core.types.ParseResult

Bases: BaseModel

Result from parsing attempt.

content class-attribute instance-attribute

content: TContent | None = None

error class-attribute instance-attribute

error: str | None = None

exception class-attribute instance-attribute

exception: Exception | None = None

metadata class-attribute instance-attribute

metadata: TMetadata | None = None

model_config class-attribute instance-attribute

model_config = ConfigDict(arbitrary_types_allowed=True)

success instance-attribute

success: bool