Skip to content

Adapters

Stream adapters for converting provider-specific formats.

Overview

Adapters convert provider-specific stream formats into text chunks that Streamblocks can process. Each adapter implements a callable interface that transforms the input stream.

Provider Adapters

GeminiAdapter

Adapter for Google Gemini streams.

from hother.streamblocks.adapters import GeminiAdapter

adapter = GeminiAdapter()
async for event in processor.process_stream(adapter(gemini_response)):
    ...

OpenAIAdapter

Adapter for OpenAI streams.

from hother.streamblocks.adapters import OpenAIAdapter

adapter = OpenAIAdapter()
async for event in processor.process_stream(adapter(openai_response)):
    ...

AnthropicAdapter

Adapter for Anthropic streams.

from hother.streamblocks.adapters import AnthropicAdapter

adapter = AnthropicAdapter()
async for event in processor.process_stream(adapter(anthropic_stream)):
    ...

Auto-Detection

Streamblocks can automatically detect the appropriate adapter:

from hother.streamblocks.adapters import auto_detect_adapter

adapter = auto_detect_adapter(response)
if adapter:
    async for event in processor.process_stream(adapter(response)):
        ...

Creating Custom Adapters

Create custom adapters by implementing a callable that yields text chunks:

class MyProviderAdapter:
    def __call__(self, stream):
        for chunk in stream:
            # Extract text from provider-specific format
            yield chunk.text

API Reference

hother.streamblocks.adapters

Stream adapters for bidirectional protocol transformation.

This module provides the adapter system for transforming between different input and output protocols.

Key Components: - EventCategory: Categorize events for routing (TEXT_CONTENT, PASSTHROUGH, SKIP) - InputProtocolAdapter: Protocol for input transformation - OutputProtocolAdapter: Protocol for output transformation - InputAdapterRegistry: Auto-detection of input adapters

Example

from hother.streamblocks.adapters import EventCategory, InputAdapterRegistry from hother.streamblocks.adapters.input import IdentityInputAdapter from hother.streamblocks.adapters.output import StreamBlocksOutputAdapter

EventCategory

Bases: StrEnum

Semantic categorization of protocol events.

These three categories are EXHAUSTIVE - every protocol event falls into one:

  • TEXT_CONTENT: Event contains text that should be processed by StreamBlocks
  • PASSTHROUGH: Event should pass through unchanged to output
  • SKIP: Event should not be emitted in output at all

TEXT_CONTENT class-attribute instance-attribute

TEXT_CONTENT = 'text_content'

Event contains text content that should be processed by StreamBlocks.

PASSTHROUGH class-attribute instance-attribute

PASSTHROUGH = 'passthrough'

Event has no text content and should pass through unchanged to output.

SKIP class-attribute instance-attribute

SKIP = 'skip'

Event should not be included in output at all.

InputAdapterRegistry

Registry for input adapter auto-detection.

Uses module prefix matching and attribute-based fallback detection. Extensions register themselves when imported.

Example

Register via decorator

@InputAdapterRegistry.register(module_prefix="openai.") ... class OpenAIInputAdapter: ... def categorize(self, event) -> EventCategory: ... return EventCategory.TEXT_CONTENT ... def extract_text(self, event) -> str | None: ... return event.choices[0].delta.content

Register via method

InputAdapterRegistry.register_module("mycompany.api", MyCustomAdapter)

Detect adapter from sample

adapter = InputAdapterRegistry.detect(sample_chunk)

register classmethod

register(
    *,
    module_prefix: str | None = None,
    attributes: list[str] | None = None,
) -> Callable[
    [type[InputProtocolAdapter[Any]]],
    type[InputProtocolAdapter[Any]],
]

Decorator to register an adapter for auto-detection.

Parameters:

Name Type Description Default
module_prefix str | None

Module path prefix to match (e.g., "openai.types")

None
attributes list[str] | None

Required attributes for attribute-based detection

None

Returns:

Type Description
Callable[[type[InputProtocolAdapter[Any]]], type[InputProtocolAdapter[Any]]]

Decorator function

Example

@InputAdapterRegistry.register(module_prefix="openai.") ... class OpenAIInputAdapter: ... ...

@InputAdapterRegistry.register(attributes=["text", "candidates"]) ... class GeminiInputAdapter: ... ...

register_module classmethod

register_module(
    prefix: str,
    adapter_class: type[InputProtocolAdapter[Any]],
) -> None

Register adapter by module prefix (non-decorator form).

Parameters:

Name Type Description Default
prefix str

Module path prefix to match

required
adapter_class type[InputProtocolAdapter[Any]]

Adapter class to instantiate when matched

required

register_pattern classmethod

register_pattern(
    attrs: list[str],
    adapter_class: type[InputProtocolAdapter[Any]],
) -> None

Register adapter by attribute pattern (non-decorator form).

Parameters:

Name Type Description Default
attrs list[str]

Required attributes for detection

required
adapter_class type[InputProtocolAdapter[Any]]

Adapter class to instantiate when matched

required

detect classmethod

detect(chunk: Any) -> InputProtocolAdapter[Any] | None

Detect and instantiate appropriate adapter from chunk.

Detection order: 1. String → IdentityInputAdapter 2. Module prefix match 3. Attribute pattern match 4. Fallback to text/content attribute 5. None if no match

Parameters:

Name Type Description Default
chunk Any

Sample chunk from stream

required

Returns:

Type Description
InputProtocolAdapter[Any] | None

Adapter instance if detected, None otherwise

get_registered_modules classmethod

get_registered_modules() -> dict[
    str, type[InputProtocolAdapter[Any]]
]

Get all registered module prefixes.

Returns:

Type Description
dict[str, type[InputProtocolAdapter[Any]]]

Copy of module prefix registry

get_registered_patterns classmethod

get_registered_patterns() -> list[
    tuple[list[str], type[InputProtocolAdapter[Any]]]
]

Get all registered attribute patterns.

Returns:

Type Description
list[tuple[list[str], type[InputProtocolAdapter[Any]]]]

Copy of attribute pattern registry

clear classmethod

clear() -> None

Clear all registered adapters (useful for testing).

BidirectionalAdapter

Bases: Protocol[TInput, TOutput]

Combined bidirectional adapter for full protocol transformation.

This is a convenience pattern - users can also use separate adapters.

Example

class MyBidirectionalAdapter: ... def init(self): ... self._input = MyInputAdapter() ... self._output = MyOutputAdapter() ... ... @property ... def input_adapter(self) -> MyInputAdapter: ... return self._input ... ... @property ... def output_adapter(self) -> MyOutputAdapter: ... return self._output

input_adapter property

input_adapter: InputProtocolAdapter[TInput]

Input protocol adapter.

output_adapter property

output_adapter: OutputProtocolAdapter[TOutput]

Output protocol adapter.

InputProtocolAdapter

Bases: Protocol[TInput]

Protocol for transforming input events for StreamBlocks processing.

Handles any input format: chunks (OpenAI), events (AG-UI), or custom.

Example

class MyInputAdapter: ... def categorize(self, event: MyEvent) -> EventCategory: ... if event.has_text: ... return EventCategory.TEXT_CONTENT ... return EventCategory.PASSTHROUGH ... ... def extract_text(self, event: MyEvent) -> str | None: ... return event.text

categorize

categorize(event: TInput) -> EventCategory

Categorize event for routing.

Parameters:

Name Type Description Default
event TInput

Input event to categorize

required

Returns:

Type Description
EventCategory
  • TEXT_CONTENT: Event contains text, process with StreamBlocks
EventCategory
  • PASSTHROUGH: Pass through to output unchanged
EventCategory
  • SKIP: Don't include in output

extract_text

extract_text(event: TInput) -> str | None

Extract text content from TEXT_CONTENT events.

Only called for events categorized as TEXT_CONTENT.

This method can perform any complex extraction/computation: - Simple field access: return event.text - Nested extraction: return event.data.content.text - Multiple fields: return f"{event.prefix}{event.body}" - Decoding: return base64.decode(event.encoded_text) - JSON extraction: return json.loads(event.payload)["message"]

Parameters:

Name Type Description Default
event TInput

Input event to extract text from

required

Returns:

Type Description
str | None

Extracted text, or None if no text available

get_metadata

get_metadata(event: TInput) -> dict[str, Any] | None

Extract protocol-specific metadata from event (optional).

Parameters:

Name Type Description Default
event TInput

Input event to extract metadata from

required

Returns:

Type Description
dict[str, Any] | None

Dictionary of metadata, or None

is_complete

is_complete(event: TInput) -> bool

Check if this event signals stream completion (optional).

Parameters:

Name Type Description Default
event TInput

Input event to check

required

Returns:

Type Description
bool

True if this is the final event in the stream

OutputProtocolAdapter

Bases: Protocol[TOutput]

Protocol for transforming StreamBlocks events to output format.

Can emit to any protocol: AG-UI, custom events, plain text, etc.

Example

class MyOutputAdapter: ... def to_protocol_event(self, event: BaseEvent) -> MyEvent | None: ... if isinstance(event, BlockEndEvent): ... return MyEvent(type="block", data=event.get_block()) ... return None ... ... def passthrough(self, original_event: Any) -> MyEvent | None: ... return MyEvent(type="passthrough", data=original_event)

to_protocol_event

to_protocol_event(
    event: BaseEvent,
) -> TOutput | list[TOutput] | None

Convert a StreamBlocks event to output protocol event(s).

Parameters:

Name Type Description Default
event BaseEvent

StreamBlocks event to convert

required

Returns:

Type Description
TOutput | list[TOutput] | None
  • Single event
TOutput | list[TOutput] | None
  • List of events (for protocols requiring start/content/end pattern)
TOutput | list[TOutput] | None
  • None to skip emission

passthrough

passthrough(original_event: Any) -> TOutput | None

Handle passthrough events.

Called for events categorized as PASSTHROUGH by the input adapter.

Parameters:

Name Type Description Default
original_event Any

Original input event to pass through

required

Returns:

Type Description
TOutput | None
  • The event in output protocol format
TOutput | None
  • None if this adapter can't handle the event type

detect_input_adapter

detect_input_adapter(
    sample: Any,
) -> InputProtocolAdapter[Any]

Detect input adapter from sample event.

Parameters:

Name Type Description Default
sample Any

A sample event from the stream

required

Returns:

Type Description
InputProtocolAdapter[Any]

Detected adapter instance

Raises:

Type Description
ValueError

If no adapter matches the sample