Adapters
Stream adapters for converting provider-specific formats.
Overview
Adapters convert provider-specific stream formats into text chunks that Streamblocks can process. Each adapter implements a callable interface that transforms the input stream.
Provider Adapters
GeminiAdapter
Adapter for Google Gemini streams.
from hother.streamblocks.adapters import GeminiAdapter
adapter = GeminiAdapter()
async for event in processor.process_stream(adapter(gemini_response)):
    ...
OpenAIAdapter
Adapter for OpenAI streams.
from hother.streamblocks.adapters import OpenAIAdapter
adapter = OpenAIAdapter()
async for event in processor.process_stream(adapter(openai_response)):
    ...
AnthropicAdapter
Adapter for Anthropic streams.
from hother.streamblocks.adapters import AnthropicAdapter
adapter = AnthropicAdapter()
async for event in processor.process_stream(adapter(anthropic_stream)):
    ...
Auto-Detection
Streamblocks can automatically detect the appropriate adapter:
from hother.streamblocks.adapters import auto_detect_adapter
adapter = auto_detect_adapter(response)
if adapter:
    async for event in processor.process_stream(adapter(response)):
        ...
Creating Custom Adapters
Create custom adapters by implementing a callable that yields text chunks:
class MyProviderAdapter:
    def __call__(self, stream):
        for chunk in stream:
            # Extract text from provider-specific format
            yield chunk.text
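The provider examples above consume adapter output with `async for`, so an adapter for an asynchronous provider stream would likely need to be an async generator. The following is a minimal sketch under that assumption. `FakeChunk`, `fake_provider_stream`, `MyAsyncProviderAdapter`, and `collect_text` are hypothetical names used only for illustration and are not part of Streamblocks.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator
from dataclasses import dataclass


@dataclass
class FakeChunk:
    """Hypothetical provider chunk; real providers use their own types."""

    text: str | None


async def fake_provider_stream() -> AsyncIterator[FakeChunk]:
    """Hypothetical stand-in for a provider's streaming response."""
    for piece in ("Hello, ", None, "world"):
        yield FakeChunk(text=piece)


class MyAsyncProviderAdapter:
    """Callable adapter that yields plain text chunks from an async stream."""

    async def __call__(self, stream: AsyncIterator[FakeChunk]) -> AsyncIterator[str]:
        async for chunk in stream:
            # Skip chunks that carry no text (for example, metadata-only chunks).
            if chunk.text:
                yield chunk.text


async def collect_text() -> list[str]:
    """Drain the adapted stream into a list, as a processor might consume it."""
    adapter = MyAsyncProviderAdapter()
    return [text async for text in adapter(fake_provider_stream())]
```

Calling `asyncio.run(collect_text())` drains the adapted stream. In real use the adapted stream would be handed to `processor.process_stream(...)` instead.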
API Reference
hother.streamblocks.adapters
Stream adapters for bidirectional protocol transformation.
This module provides the adapter system for transforming between different input and output protocols.
Key Components:
- EventCategory: Categorize events for routing (TEXT_CONTENT, PASSTHROUGH, SKIP)
- InputProtocolAdapter: Protocol for input transformation
- OutputProtocolAdapter: Protocol for output transformation
- InputAdapterRegistry: Auto-detection of input adapters
Example
from hother.streamblocks.adapters import EventCategory, InputAdapterRegistry
from hother.streamblocks.adapters.input import IdentityInputAdapter
from hother.streamblocks.adapters.output import StreamBlocksOutputAdapter
EventCategory
Bases: StrEnum
Semantic categorization of protocol events.
These three categories are EXHAUSTIVE - every protocol event falls into one:
- TEXT_CONTENT: Event contains text that should be processed by StreamBlocks
- PASSTHROUGH: Event should pass through unchanged to output
- SKIP: Event should not be emitted in output at all
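The three categories above imply a three-way dispatch for every incoming event. Here is a minimal sketch of that routing, using a local stand-in enum that mirrors the documented member names. The string values and the `route` helper are assumptions for illustration, not the library's implementation.

```python
from __future__ import annotations

from enum import Enum


class EventCategory(str, Enum):
    """Local stand-in mirroring the documented member names; values are assumed."""

    TEXT_CONTENT = "text_content"
    PASSTHROUGH = "passthrough"
    SKIP = "skip"


def route(events: list[tuple[EventCategory, str]]) -> tuple[list[str], list[str]]:
    """Split events into (text to process, events forwarded unchanged)."""
    to_process: list[str] = []
    forwarded: list[str] = []
    for category, payload in events:
        if category is EventCategory.TEXT_CONTENT:
            to_process.append(payload)
        elif category is EventCategory.PASSTHROUGH:
            forwarded.append(payload)
        # SKIP events are dropped and never reach the output.
    return to_process, forwarded


processed, forwarded = route(
    [
        (EventCategory.TEXT_CONTENT, "hello"),
        (EventCategory.PASSTHROUGH, "<tool-call>"),
        (EventCategory.SKIP, "<keep-alive>"),
    ]
)
```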
TEXT_CONTENT
class-attribute
instance-attribute
Event contains text content that should be processed by StreamBlocks.
PASSTHROUGH
class-attribute
instance-attribute
Event has no text content and should pass through unchanged to output.
InputAdapterRegistry
Registry for input adapter auto-detection.
Uses module prefix matching and attribute-based fallback detection. Extensions register themselves when imported.
Example
Register via decorator
@InputAdapterRegistry.register(module_prefix="openai.")
class OpenAIInputAdapter:
    def categorize(self, event) -> EventCategory:
        return EventCategory.TEXT_CONTENT
    def extract_text(self, event) -> str | None:
        return event.choices[0].delta.content
Register via method
InputAdapterRegistry.register_module("mycompany.api", MyCustomAdapter)
Detect adapter from sample
adapter = InputAdapterRegistry.detect(sample_chunk)
register
classmethod
register(
*,
module_prefix: str | None = None,
attributes: list[str] | None = None,
) -> Callable[
[type[InputProtocolAdapter[Any]]],
type[InputProtocolAdapter[Any]],
]
Decorator to register an adapter for auto-detection.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| module_prefix | str \| None | Module path prefix to match (e.g., "openai.types") | None |
| attributes | list[str] \| None | Required attributes for attribute-based detection | None |
Returns:
| Type | Description |
|---|---|
| Callable[[type[InputProtocolAdapter[Any]]], type[InputProtocolAdapter[Any]]] | Decorator function |
Example
@InputAdapterRegistry.register(module_prefix="openai.")
class OpenAIInputAdapter:
    ...

@InputAdapterRegistry.register(attributes=["text", "candidates"])
class GeminiInputAdapter:
    ...
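To illustrate how a keyword-only decorator factory with this signature could work, here is a simplified sketch. `MiniRegistry`, `OpenAILikeAdapter`, and `GeminiLikeAdapter` are hypothetical names, and the storage layout (a dict of prefixes and a list of attribute patterns) is an assumption rather than the actual Streamblocks implementation.

```python
from __future__ import annotations

from typing import Callable


class MiniRegistry:
    """Hypothetical, simplified registry; not the Streamblocks implementation."""

    modules: dict[str, type] = {}
    patterns: list[tuple[list[str], type]] = []

    @classmethod
    def register(
        cls,
        *,
        module_prefix: str | None = None,
        attributes: list[str] | None = None,
    ) -> Callable[[type], type]:
        def decorator(adapter_class: type) -> type:
            if module_prefix is not None:
                cls.modules[module_prefix] = adapter_class
            if attributes is not None:
                cls.patterns.append((attributes, adapter_class))
            # Return the class unchanged so the decorated name stays usable.
            return adapter_class

        return decorator


@MiniRegistry.register(module_prefix="openai.")
class OpenAILikeAdapter:
    pass


@MiniRegistry.register(attributes=["text", "candidates"])
class GeminiLikeAdapter:
    pass
```

Returning the class unchanged means registration is a side effect of importing the module that defines the adapter, which matches the statement that extensions register themselves when imported.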
register_module
classmethod
register_module(
prefix: str,
adapter_class: type[InputProtocolAdapter[Any]],
) -> None
Register adapter by module prefix (non-decorator form).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| prefix | str | Module path prefix to match | required |
| adapter_class | type[InputProtocolAdapter[Any]] | Adapter class to instantiate when matched | required |
register_pattern
classmethod
register_pattern(
attrs: list[str],
adapter_class: type[InputProtocolAdapter[Any]],
) -> None
Register adapter by attribute pattern (non-decorator form).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| attrs | list[str] | Required attributes for detection | required |
| adapter_class | type[InputProtocolAdapter[Any]] | Adapter class to instantiate when matched | required |
detect
classmethod
detect(chunk: Any) -> InputProtocolAdapter[Any] | None
Detect and instantiate appropriate adapter from chunk.
Detection order:
1. String → IdentityInputAdapter
2. Module prefix match
3. Attribute pattern match
4. Fallback to text/content attribute
5. None if no match
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| chunk | Any | Sample chunk from stream | required |
Returns:
| Type | Description |
|---|---|
| InputProtocolAdapter[Any] \| None | Adapter instance if detected, None otherwise |
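The detection order listed for `detect` can be sketched as a chain of checks. This is an illustrative approximation only. The adapter classes, the `MODULES` and `PATTERNS` tables, and the use of `type(chunk).__module__` for prefix matching are all assumptions, not the library's code.

```python
from __future__ import annotations

from fractions import Fraction
from types import SimpleNamespace
from typing import Any


class IdentityAdapter:
    """Stand-in for an adapter that treats string chunks as text."""


class PrefixAdapter:
    """Stand-in adapter chosen by module prefix."""


class PatternAdapter:
    """Stand-in adapter chosen by attribute pattern."""


class GenericTextAdapter:
    """Stand-in adapter for the text/content attribute fallback."""


# Hypothetical registrations; "fractions" is used only so the demo is stdlib-only.
MODULES: dict[str, type] = {"fractions": PrefixAdapter}
PATTERNS: list[tuple[list[str], type]] = [(["text", "candidates"], PatternAdapter)]


def detect(chunk: Any) -> object | None:
    # 1. Plain strings need no transformation.
    if isinstance(chunk, str):
        return IdentityAdapter()
    # 2. Match the module that defines the chunk's type.
    module = type(chunk).__module__
    for prefix, adapter_class in MODULES.items():
        if module.startswith(prefix):
            return adapter_class()
    # 3. Match when every required attribute is present.
    for attrs, adapter_class in PATTERNS:
        if all(hasattr(chunk, name) for name in attrs):
            return adapter_class()
    # 4. Fall back to a generic text/content attribute.
    if hasattr(chunk, "text") or hasattr(chunk, "content"):
        return GenericTextAdapter()
    # 5. Nothing matched.
    return None


by_prefix = detect(Fraction(1, 2))
by_pattern = detect(SimpleNamespace(text="hi", candidates=[]))
by_fallback = detect(SimpleNamespace(content="hi"))
```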
get_registered_modules
classmethod
get_registered_modules() -> dict[
str, type[InputProtocolAdapter[Any]]
]
Get all registered module prefixes.
Returns:
| Type | Description |
|---|---|
| dict[str, type[InputProtocolAdapter[Any]]] | Copy of module prefix registry |
get_registered_patterns
classmethod
BidirectionalAdapter
Bases: Protocol[TInput, TOutput]
Combined bidirectional adapter for full protocol transformation.
This is a convenience pattern - users can also use separate adapters.
Example
class MyBidirectionalAdapter:
    def __init__(self):
        self._input = MyInputAdapter()
        self._output = MyOutputAdapter()

    @property
    def input_adapter(self) -> MyInputAdapter:
        return self._input

    @property
    def output_adapter(self) -> MyOutputAdapter:
        return self._output
InputProtocolAdapter
Bases: Protocol[TInput]
Protocol for transforming input events for StreamBlocks processing.
Handles any input format: chunks (OpenAI), events (AG-UI), or custom.
Example
class MyInputAdapter:
    def categorize(self, event: MyEvent) -> EventCategory:
        if event.has_text:
            return EventCategory.TEXT_CONTENT
        return EventCategory.PASSTHROUGH

    def extract_text(self, event: MyEvent) -> str | None:
        return event.text
categorize
categorize(event: TInput) -> EventCategory
Categorize event for routing.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | TInput | Input event to categorize | required |
Returns:
| Type | Description |
|---|---|
| EventCategory | The category for the event: TEXT_CONTENT, PASSTHROUGH, or SKIP |
extract_text
extract_text(event: TInput) -> str | None
Extract text content from TEXT_CONTENT events.
Only called for events categorized as TEXT_CONTENT.
This method can perform any complex extraction/computation:
- Simple field access: return event.text
- Nested extraction: return event.data.content.text
- Multiple fields: return f"{event.prefix}{event.body}"
- Decoding: return base64.b64decode(event.encoded_text).decode()
- JSON extraction: return json.loads(event.payload)["message"]
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | TInput | Input event to extract text from | required |
Returns:
| Type | Description |
|---|---|
| str \| None | Extracted text, or None if no text available |
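As a worked illustration of `categorize` and `extract_text` used together, the sketch below handles a hypothetical wire format whose events carry a type tag and a JSON payload. `WireEvent`, `JsonInputAdapter`, the event kinds, and the local `EventCategory` stand-in (with assumed string values) are all invented for this example.

```python
from __future__ import annotations

import json
from dataclasses import dataclass
from enum import Enum


class EventCategory(str, Enum):
    """Local stand-in mirroring the documented member names; values are assumed."""

    TEXT_CONTENT = "text_content"
    PASSTHROUGH = "passthrough"
    SKIP = "skip"


@dataclass
class WireEvent:
    """Hypothetical provider event: a type tag plus a JSON payload."""

    kind: str
    payload: str


class JsonInputAdapter:
    def categorize(self, event: WireEvent) -> EventCategory:
        if event.kind == "message":
            return EventCategory.TEXT_CONTENT
        if event.kind == "ping":
            return EventCategory.SKIP
        return EventCategory.PASSTHROUGH

    def extract_text(self, event: WireEvent) -> str | None:
        # Only meaningful for events categorized as TEXT_CONTENT.
        return json.loads(event.payload).get("message")


adapter = JsonInputAdapter()
events = [
    WireEvent("message", '{"message": "Hello"}'),
    WireEvent("ping", "{}"),
    WireEvent("usage", '{"tokens": 3}'),
]
# Extract text only from the events the adapter marks as TEXT_CONTENT.
texts = [
    adapter.extract_text(e)
    for e in events
    if adapter.categorize(e) is EventCategory.TEXT_CONTENT
]
```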
get_metadata
OutputProtocolAdapter
Bases: Protocol[TOutput]
Protocol for transforming StreamBlocks events to output format.
Can emit to any protocol: AG-UI, custom events, plain text, etc.
Example
class MyOutputAdapter:
    def to_protocol_event(self, event: BaseEvent) -> MyEvent | None:
        if isinstance(event, BlockEndEvent):
            return MyEvent(type="block", data=event.get_block())
        return None

    def passthrough(self, original_event: Any) -> MyEvent | None:
        return MyEvent(type="passthrough", data=original_event)
to_protocol_event
to_protocol_event(event: BaseEvent) -> TOutput | list[TOutput] | None
Convert a StreamBlocks event to output protocol event(s).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | BaseEvent | StreamBlocks event to convert | required |
Returns:
| Type | Description |
|---|---|
| TOutput \| list[TOutput] \| None | A single output event, a list of output events, or None |
passthrough
passthrough(original_event: Any) -> TOutput | None
Handle passthrough events.
Called for events categorized as PASSTHROUGH by the input adapter.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| original_event | Any | Original input event to pass through | required |
Returns:
| Type | Description |
|---|---|
| TOutput \| None | The output event, or None |
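Because `to_protocol_event` may return nothing, one event, or a list of events, a caller has to normalize those three shapes. The sketch below shows one way to do that with an output adapter that emits plain dicts. `BlockDone`, `TextDelta`, `DictOutputAdapter`, and `emit` are hypothetical stand-ins, not Streamblocks types.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class BlockDone:
    """Hypothetical stand-in for a StreamBlocks event such as BlockEndEvent."""

    block: str


@dataclass
class TextDelta:
    """Hypothetical stand-in for an event this adapter ignores."""

    text: str


class DictOutputAdapter:
    """Emits plain dicts as the output protocol."""

    def to_protocol_event(
        self, event: Any
    ) -> dict[str, Any] | list[dict[str, Any]] | None:
        if isinstance(event, BlockDone):
            return {"type": "block", "data": event.block}
        return None  # Nothing to emit for other event types.

    def passthrough(self, original_event: Any) -> dict[str, Any] | None:
        return {"type": "passthrough", "data": original_event}


def emit(adapter: DictOutputAdapter, events: list[Any]) -> list[dict[str, Any]]:
    """Flatten the None / single / list return shapes into one output list."""
    out: list[dict[str, Any]] = []
    for event in events:
        result = adapter.to_protocol_event(event)
        if result is None:
            continue
        out.extend(result if isinstance(result, list) else [result])
    return out


output = emit(DictOutputAdapter(), [TextDelta("hi"), BlockDone("todo list")])
```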
detect_input_adapter
detect_input_adapter(
sample: Any,
) -> InputProtocolAdapter[Any]
Detect input adapter from sample event.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sample | Any | A sample event from the stream | required |
Returns:
| Type | Description |
|---|---|
| InputProtocolAdapter[Any] | Detected adapter instance |
Raises:
| Type | Description |
|---|---|
| ValueError | If no adapter matches the sample |
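Unlike `InputAdapterRegistry.detect`, which returns None when nothing matches, `detect_input_adapter` raises `ValueError`. One plausible relationship between the two, shown here purely as an assumption, is a thin wrapper that converts a None result into an exception. `TextAdapter`, `registry_detect`, `detect_or_raise`, and `safe_detect` are hypothetical names for illustration.

```python
from __future__ import annotations

from typing import Any


class TextAdapter:
    """Hypothetical adapter returned for string samples."""


def registry_detect(sample: Any) -> TextAdapter | None:
    """Stand-in for a registry lookup that returns None when nothing matches."""
    return TextAdapter() if isinstance(sample, str) else None


def detect_or_raise(sample: Any) -> TextAdapter:
    """Raise instead of returning None, mirroring the documented ValueError."""
    adapter = registry_detect(sample)
    if adapter is None:
        raise ValueError(f"No input adapter matches {type(sample).__name__!r}")
    return adapter


def safe_detect(sample: Any) -> str:
    """Caller-side handling: report the adapter name or the error message."""
    try:
        return type(detect_or_raise(sample)).__name__
    except ValueError as exc:
        return f"error: {exc}"
```

Callers that prefer a sentinel over an exception can use the registry's `detect` directly and check for None.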