Extensions Reference
Provider-specific extensions for Streamblocks.
Overview
Extensions provide adapters for different LLM providers. Each extension implements the adapter protocol, which categorizes provider-specific stream events and extracts their text so that Streamblocks can process them.
flowchart TB
    subgraph Extensions["Provider Extensions"]
        Gemini[gemini]
        OpenAI[openai]
        Anthropic[anthropic]
        AGUI[agui]
    end
    subgraph Protocol["Adapter Protocol"]
        Input[InputProtocolAdapter]
        Output[OutputProtocolAdapter]
    end
    Gemini --> Input
    OpenAI --> Input
    Anthropic --> Input
    AGUI --> Input
    AGUI --> Output
Installation
Extensions are installed as optional dependencies:
Gemini Extension
Google Gemini adapter for processing Gemini API responses.
Installation
Usage
from hother.streamblocks import StreamBlockProcessor, Registry, DelimiterPreambleSyntax
from hother.streamblocks.extensions.gemini import GeminiInputAdapter

adapter = GeminiInputAdapter()

# Use with processor
processor = StreamBlockProcessor(
    registry=Registry(),
    syntaxes=[DelimiterPreambleSyntax()],
    input_adapter=adapter,
)
Auto-Detection
The Gemini adapter is automatically detected when processing Gemini responses:
import google.generativeai as genai
from hother.streamblocks import StreamBlockProcessor, detect_input_adapter
genai.configure(api_key="...") # pragma: allowlist secret
model = genai.GenerativeModel("gemini-pro")
response = model.generate_content("...", stream=True)
# Auto-detect adapter
adapter = detect_input_adapter(response)
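The reference below states that importing an extension module registers its adapter for auto-detection. How the library matches a stream to an adapter is not shown here, so the following is only a sketch of one way such a registry could work, keyed on the module that defines the chunk type. `register_adapter`, `detect_adapter`, `FakeGeminiAdapter`, and `FakeChunk` are hypothetical names, not part of Streamblocks.

```python
# Hypothetical registry: maps a module prefix to an adapter class.
_ADAPTERS = {}


def register_adapter(module_prefix, adapter_cls):
    """Record which adapter handles objects defined under module_prefix."""
    _ADAPTERS[module_prefix] = adapter_cls


def detect_adapter(sample):
    """Return an adapter instance for the sample's type, or None if unknown."""
    module = type(sample).__module__
    for prefix, adapter_cls in _ADAPTERS.items():
        if module.startswith(prefix):
            return adapter_cls()
    return None


class FakeGeminiAdapter:
    """Stand-in adapter used only for this illustration."""


class FakeChunk:
    """Stand-in chunk that pretends to come from the Gemini SDK."""


# Pretend the chunk class lives in the provider's package.
FakeChunk.__module__ = "google.genai.types"

# An extension module would typically do this at import time.
register_adapter("google.genai", FakeGeminiAdapter)

detected = detect_adapter(FakeChunk())
unknown = detect_adapter(object())
```

Under this assumption, detection fails with `None` until the extension module has been imported, which matches the "import to enable auto-detection" wording in the reference.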
OpenAI Extension
OpenAI adapter for processing OpenAI API responses.
Installation
Usage
from hother.streamblocks import StreamBlockProcessor, Registry, DelimiterPreambleSyntax
from hother.streamblocks.extensions.openai import OpenAIInputAdapter

adapter = OpenAIInputAdapter()

# Use with processor
processor = StreamBlockProcessor(
    registry=Registry(),
    syntaxes=[DelimiterPreambleSyntax()],
    input_adapter=adapter,
)
With OpenAI Client
import asyncio

from openai import AsyncOpenAI
from hother.streamblocks import StreamBlockProcessor, Registry, DelimiterPreambleSyntax
from hother.streamblocks.extensions.openai import OpenAIInputAdapter

async def main():
    # process_stream is consumed with `async for`, so use the async client
    client = AsyncOpenAI()
    stream = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": "..."}],
        stream=True,
    )
    processor = StreamBlockProcessor(
        registry=Registry(),
        syntaxes=[DelimiterPreambleSyntax()],
        input_adapter=OpenAIInputAdapter(),
    )
    async for event in processor.process_stream(stream):
        handle_event(event)  # handle_event is your own callback

asyncio.run(main())
Anthropic Extension
Anthropic adapter for processing Claude API responses.
Installation
Usage
from hother.streamblocks import StreamBlockProcessor, Registry, DelimiterPreambleSyntax
from hother.streamblocks.extensions.anthropic import AnthropicInputAdapter

adapter = AnthropicInputAdapter()

# Use with processor
processor = StreamBlockProcessor(
    registry=Registry(),
    syntaxes=[DelimiterPreambleSyntax()],
    input_adapter=adapter,
)
With Anthropic Client
import asyncio

import anthropic
from hother.streamblocks import StreamBlockProcessor, Registry, DelimiterPreambleSyntax
from hother.streamblocks.extensions.anthropic import AnthropicInputAdapter

async def main():
    # process_stream is consumed with `async for`, so use the async client
    client = anthropic.AsyncAnthropic()
    processor = StreamBlockProcessor(
        registry=Registry(),
        syntaxes=[DelimiterPreambleSyntax()],
        input_adapter=AnthropicInputAdapter(),
    )
    async with client.messages.stream(
        model="claude-3-opus",
        max_tokens=1024,
        messages=[{"role": "user", "content": "..."}],
    ) as stream:
        async for event in processor.process_stream(stream):
            handle_event(event)  # handle_event is your own callback

asyncio.run(main())
AG-UI Extension
AG-UI protocol adapters for bidirectional streaming.
Installation
The AG-UI extension is included in the base installation.
Input Adapter
Process incoming AG-UI events:
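from hother.streamblocks import StreamBlockProcessor, Registry, DelimiterPreambleSyntax
from hother.streamblocks.extensions.agui import AGUIInputAdapter

adapter = AGUIInputAdapter()
processor = StreamBlockProcessor(
    registry=Registry(),
    syntaxes=[DelimiterPreambleSyntax()],
    input_adapter=adapter,
)

async def consume(agui_stream):
    async for event in processor.process_stream(agui_stream):
        handle_event(event)  # handle_event is your own callback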
Output Adapter
Convert Streamblocks events to AG-UI protocol:
from hother.streamblocks import StreamBlockProcessor, Registry, DelimiterPreambleSyntax
from hother.streamblocks.extensions.agui import AGUIOutputAdapter

adapter = AGUIOutputAdapter()
processor = StreamBlockProcessor(
    registry=Registry(),
    syntaxes=[DelimiterPreambleSyntax()],
    output_adapter=adapter,
)

async def agui_events(llm_stream):
    # Events are now AG-UI formatted
    async for event in processor.process_stream(llm_stream):
        yield event  # AG-UI protocol event
Bidirectional Usage
from hother.streamblocks import StreamBlockProcessor, Registry, DelimiterPreambleSyntax
from hother.streamblocks.extensions.agui import AGUIInputAdapter, AGUIOutputAdapter

processor = StreamBlockProcessor(
    registry=Registry(),
    syntaxes=[DelimiterPreambleSyntax()],
    input_adapter=AGUIInputAdapter(),
    output_adapter=AGUIOutputAdapter(),
)
Event Filtering
Configure which events are converted:
from hother.streamblocks.extensions.agui import AGUIOutputAdapter

adapter = AGUIOutputAdapter(
    include_text_deltas=True,
    include_block_content=False,
    include_metadata=True,
)
Custom Event Mapping
Map block types to AG-UI events:
adapter = AGUIOutputAdapter(
    block_type_mapping={
        "task": "CustomTaskEvent",
        "code": "CodeBlockEvent",
        "message": "TextMessageContent",
    }
)
Creating Custom Extensions
Implement the adapter protocol for custom providers:
from hother.streamblocks import InputProtocolAdapter, EventCategory

class MyProviderAdapter(InputProtocolAdapter):
    """Adapter for MyProvider streams."""

    def categorize(self, event) -> EventCategory:
        """Categorize the event type."""
        if hasattr(event, "text"):
            return EventCategory.TEXT_CONTENT
        if hasattr(event, "tool_call"):
            return EventCategory.PASSTHROUGH
        return EventCategory.SKIP

    def extract_text(self, event) -> str:
        """Extract text content from event."""
        return event.text

    def get_original_event(self, event):
        """Get the original event for passthrough."""
        return event
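To make the role of the three categories concrete, the sketch below routes a small stream through the same categorization logic. It runs without the library: `EventCategory` here is a local stand-in with the three members used above, the events are `SimpleNamespace` objects, and `route` is a hypothetical helper rather than a Streamblocks function.

```python
from enum import Enum, auto
from types import SimpleNamespace


class EventCategory(Enum):
    """Local stand-in for hother.streamblocks.EventCategory."""
    TEXT_CONTENT = auto()
    PASSTHROUGH = auto()
    SKIP = auto()


class MyProviderAdapter:
    """Same logic as the adapter above, without the library base class."""

    def categorize(self, event) -> EventCategory:
        if hasattr(event, "text"):
            return EventCategory.TEXT_CONTENT
        if hasattr(event, "tool_call"):
            return EventCategory.PASSTHROUGH
        return EventCategory.SKIP

    def extract_text(self, event) -> str:
        return event.text


def route(adapter, events):
    """Split a stream into accumulated text and passthrough events."""
    text_parts, passthrough = [], []
    for event in events:
        category = adapter.categorize(event)
        if category is EventCategory.TEXT_CONTENT:
            text_parts.append(adapter.extract_text(event))
        elif category is EventCategory.PASSTHROUGH:
            passthrough.append(event)
        # SKIP events are dropped entirely.
    return "".join(text_parts), passthrough


events = [
    SimpleNamespace(text="Hello, "),
    SimpleNamespace(tool_call={"name": "search"}),
    SimpleNamespace(heartbeat=True),
    SimpleNamespace(text="world"),
]
text, passed = route(MyProviderAdapter(), events)
```

Only text events contribute to the accumulated string; the tool call is kept for passthrough and the heartbeat is discarded.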
Extension API Reference
Gemini
hother.streamblocks.extensions.gemini
Gemini extension for StreamBlocks.
This extension provides input adapters for Google GenAI streams.
Importing this module registers the GeminiInputAdapter for auto-detection.
Example
# Import to enable auto-detection
import hother.streamblocks.extensions.gemini

# Auto-detect from Gemini stream
processor = ProtocolStreamProcessor(registry)
async for event in processor.process_stream(gemini_stream):
    print(event)

# Or use convenience factory
from hother.streamblocks.extensions.gemini import create_gemini_processor
processor = create_gemini_processor(registry)
GeminiInputAdapter
Input adapter for Google GenAI streams.
Handles chunks from google.genai.models.generate_content_stream() and google.ai.generativelanguage clients.
Extracts:
- Text from chunk.text attribute
- Usage metadata (token counts)
- Model version information
Example
from google import genai
adapter = GeminiInputAdapter()

async for chunk in client.aio.models.generate_content_stream(...):
    text = adapter.extract_text(chunk)
    metadata = adapter.get_metadata(chunk)
    if metadata:
        print(f"Tokens: {metadata['usage']}")
categorize
categorize(event: GenerateContentResponse) -> EventCategory
Categorize event - all Gemini chunks are text content.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | GenerateContentResponse | Gemini GenerateContentResponse chunk | required |
Returns:
| Type | Description |
|---|---|
| EventCategory | TEXT_CONTENT for all chunks |
extract_text
extract_text(event: GenerateContentResponse) -> str | None
Extract text from chunk.text attribute.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | GenerateContentResponse | Gemini GenerateContentResponse chunk | required |
Returns:
| Type | Description |
|---|---|
| str \| None | Text content, or None if not present |
is_complete
is_complete(event: GenerateContentResponse) -> bool
Gemini doesn't have explicit finish markers in each chunk.
Completion is typically detected by the stream ending.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | GenerateContentResponse | Gemini GenerateContentResponse chunk | required |
Returns:
| Type | Description |
|---|---|
| bool | Always False - Gemini streams end naturally |
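Because is_complete always returns False for Gemini, a consumer has to treat the end of iteration as the completion signal. The sketch below shows that pattern with a fake async stream. `fake_gemini_stream` and `collect_text` are hypothetical helpers, and the chunks are stand-ins that expose only a `text` attribute.

```python
import asyncio
from types import SimpleNamespace


async def fake_gemini_stream():
    """Yield stand-in chunks; one has no text, as can happen in practice."""
    for text in ("Hel", None, "lo"):
        yield SimpleNamespace(text=text)


async def collect_text(stream):
    """Accumulate chunk.text until the stream is exhausted."""
    parts = []
    async for chunk in stream:
        text = getattr(chunk, "text", None)
        if text:
            parts.append(text)
    # Leaving the loop is the only completion signal: no chunk is marked final.
    return "".join(parts)


result = asyncio.run(collect_text(fake_gemini_stream()))
```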
get_metadata
create_gemini_processor
Create processor pre-configured for Gemini streams.
This is a convenience factory that creates a ProtocolStreamProcessor with GeminiInputAdapter and StreamBlocksOutputAdapter.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| registry | Registry | Registry with syntax and block definitions | required |
Returns:
| Type | Description |
|---|---|
| ProtocolStreamProcessor[Any, BaseEvent] | Pre-configured processor for Gemini streams |
Example
from hother.streamblocks.extensions.gemini import create_gemini_processor
processor = create_gemini_processor(registry)
async for event in processor.process_stream(gemini_stream):
    if isinstance(event, BlockExtractedEvent):
        print(f"Block: {event.block.metadata.id}")
OpenAI
hother.streamblocks.extensions.openai
OpenAI extension for StreamBlocks.
This extension provides input adapters for OpenAI ChatCompletionChunk streams.
Importing this module registers the OpenAIInputAdapter for auto-detection.
Example
# Import to enable auto-detection
import hother.streamblocks.extensions.openai

# Auto-detect from OpenAI stream
processor = ProtocolStreamProcessor(registry)
async for event in processor.process_stream(openai_stream):
    print(event)

# Or use convenience factory
from hother.streamblocks.extensions.openai import create_openai_processor
processor = create_openai_processor(registry)
OpenAIInputAdapter
Input adapter for OpenAI ChatCompletionChunk streams.
Handles streams from openai.AsyncStream[ChatCompletionChunk].
Extracts:
- Delta content from choices[0].delta.content
- Finish reasons
- Model information
Example
from openai import AsyncOpenAI
adapter = OpenAIInputAdapter()

client = AsyncOpenAI()
stream = await client.chat.completions.create(
    model="gpt-4",
    messages=[...],
    stream=True
)

async for chunk in stream:
    text = adapter.extract_text(chunk)
    if adapter.is_complete(chunk):
        print("Stream complete!")
categorize
categorize(event: ChatCompletionChunk) -> EventCategory
Categorize event - all OpenAI chunks are text content.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | ChatCompletionChunk | OpenAI ChatCompletionChunk | required |
Returns:
| Type | Description |
|---|---|
| EventCategory | TEXT_CONTENT for all chunks |
extract_text
extract_text(event: ChatCompletionChunk) -> str | None
Extract text from choices[0].delta.content.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | ChatCompletionChunk | OpenAI ChatCompletionChunk | required |
Returns:
| Type | Description |
|---|---|
| str \| None | Delta content text, or None if not present |
is_complete
is_complete(event: ChatCompletionChunk) -> bool
Check if finish_reason is set.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | ChatCompletionChunk | OpenAI ChatCompletionChunk | required |
Returns:
| Type | Description |
|---|---|
| bool | True if this is the final chunk |
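The two accessors described above can be illustrated without the OpenAI SDK. This is a minimal sketch, not the adapter's actual implementation: `make_chunk` is a hypothetical helper that builds stand-in objects with the same attribute path as a ChatCompletionChunk, and the guard for an empty `choices` list is an added assumption.

```python
from types import SimpleNamespace


def make_chunk(content=None, finish_reason=None):
    """Build a stand-in exposing choices[0].delta.content and finish_reason."""
    delta = SimpleNamespace(content=content)
    choice = SimpleNamespace(delta=delta, finish_reason=finish_reason)
    return SimpleNamespace(choices=[choice])


def extract_text(chunk):
    """Return choices[0].delta.content, or None when there is no choice."""
    if not chunk.choices:
        return None
    return chunk.choices[0].delta.content


def is_complete(chunk):
    """A chunk is final once finish_reason is set on the first choice."""
    return bool(chunk.choices) and chunk.choices[0].finish_reason is not None


chunks = [make_chunk("Hel"), make_chunk("lo"), make_chunk(finish_reason="stop")]
collected = []
for chunk in chunks:
    text = extract_text(chunk)
    if text:
        collected.append(text)
    if is_complete(chunk):
        break
result = "".join(collected)
```

Note that the final chunk carries a finish reason but no text, so text extraction and completion checks are handled independently.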
get_metadata
create_openai_processor
Create processor pre-configured for OpenAI streams.
This is a convenience factory that creates a ProtocolStreamProcessor with OpenAIInputAdapter and StreamBlocksOutputAdapter.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| registry | Registry | Registry with syntax and block definitions | required |
Returns:
| Type | Description |
|---|---|
| ProtocolStreamProcessor[Any, BaseEvent] | Pre-configured processor for OpenAI streams |
Example
from hother.streamblocks.extensions.openai import create_openai_processor
processor = create_openai_processor(registry)
async for event in processor.process_stream(openai_stream):
    if isinstance(event, BlockExtractedEvent):
        print(f"Block: {event.block.metadata.id}")
Anthropic
hother.streamblocks.extensions.anthropic
Anthropic extension for StreamBlocks.
This extension provides input adapters for Anthropic message streams.
Importing this module registers the AnthropicInputAdapter for auto-detection.
Example
# Import to enable auto-detection
import hother.streamblocks.extensions.anthropic

# Auto-detect from Anthropic stream
processor = ProtocolStreamProcessor(registry)
async for event in processor.process_stream(anthropic_stream):
    print(event)

# Or use convenience factory
from hother.streamblocks.extensions.anthropic import create_anthropic_processor
processor = create_anthropic_processor(registry)
AnthropicEvent
module-attribute
AnthropicInputAdapter
Input adapter for Anthropic message streams.
Handles event-based streaming from anthropic.MessageStream.
Anthropic uses different event types:
- content_block_delta: Contains text deltas (TEXT_CONTENT)
- message_delta: Contains usage information (PASSTHROUGH)
- message_stop: Signals stream completion (PASSTHROUGH)
- Other events: PASSTHROUGH
Example
from anthropic import AsyncAnthropic
adapter = AnthropicInputAdapter()

client = AsyncAnthropic()
async with client.messages.stream(...) as stream:
    async for event in stream:
        text = adapter.extract_text(event)
        if text:
            print(text, end='', flush=True)
        if adapter.is_complete(event):
            print("\nDone!")
categorize
categorize(event: AnthropicEvent) -> EventCategory
Categorize event based on type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | AnthropicEvent | Anthropic event | required |
Returns:
| Type | Description |
|---|---|
| EventCategory | TEXT_CONTENT for content_block_delta events, PASSTHROUGH for others |
extract_text
extract_text(event: AnthropicEvent) -> str | None
Extract text from content_block_delta events.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | AnthropicEvent | Anthropic event | required |
Returns:
| Type | Description |
|---|---|
| str \| None | Delta text if present, None otherwise |
is_complete
is_complete(event: AnthropicEvent) -> bool
Check for message_stop event.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | AnthropicEvent | Anthropic event | required |
Returns:
| Type | Description |
|---|---|
| bool | True if this is the message_stop event |
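The type-based rules for categorize, extract_text, and is_complete can be sketched with plain objects. This is an illustration under assumptions rather than the adapter's source: the events are `SimpleNamespace` stand-ins carrying a `type` string and, for text deltas, a `delta.text` attribute, and the category names are plain strings instead of EventCategory members.

```python
from types import SimpleNamespace


def categorize(event):
    """Text deltas are content; every other event type passes through."""
    if event.type == "content_block_delta":
        return "TEXT_CONTENT"
    return "PASSTHROUGH"


def extract_text(event):
    """Return the delta text for content_block_delta events, else None."""
    if event.type != "content_block_delta":
        return None
    return getattr(event.delta, "text", None)


def is_complete(event):
    """The stream is finished when message_stop arrives."""
    return event.type == "message_stop"


events = [
    SimpleNamespace(type="message_start"),
    SimpleNamespace(type="content_block_delta", delta=SimpleNamespace(text="Hi")),
    SimpleNamespace(type="message_delta", usage=SimpleNamespace(output_tokens=1)),
    SimpleNamespace(type="message_stop"),
]
categories = [categorize(e) for e in events]
texts = [extract_text(e) for e in events]
done = [is_complete(e) for e in events]
```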
get_metadata
get_metadata(
event: AnthropicEvent,
) -> dict[str, Any] | None
Extract stop reason and usage information.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | AnthropicEvent | Anthropic event | required |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] \| None | Dictionary with stop_reason or usage if present |
create_anthropic_processor
Create processor pre-configured for Anthropic streams.
This is a convenience factory that creates a ProtocolStreamProcessor with AnthropicInputAdapter and StreamBlocksOutputAdapter.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| registry | Registry | Registry with syntax and block definitions | required |
Returns:
| Type | Description |
|---|---|
| ProtocolStreamProcessor[Any, BaseEvent] | Pre-configured processor for Anthropic streams |
Example
from hother.streamblocks.extensions.anthropic import create_anthropic_processor
processor = create_anthropic_processor(registry)
async for event in processor.process_stream(anthropic_stream):
    if isinstance(event, BlockExtractedEvent):
        print(f"Block: {event.block.metadata.id}")
AG-UI
hother.streamblocks.extensions.agui
AG-UI extension for StreamBlocks.
This extension provides bidirectional adapters for the AG-UI protocol.
AG-UI is an event-based protocol for agent-to-frontend communication with event types for lifecycle, text messages, tool calls, and state management.
Importing this module registers the AGUIInputAdapter for auto-detection.
Example
# Import to enable auto-detection
import hother.streamblocks.extensions.agui

# Auto-detect from AG-UI stream
processor = ProtocolStreamProcessor(registry)
async for event in processor.process_stream(agui_stream):
    print(event)

# Bidirectional: AG-UI in, AG-UI out
from hother.streamblocks.extensions.agui import (
    create_agui_bidirectional_processor,
    AGUIEventFilter,
)
processor = create_agui_bidirectional_processor(
    registry,
    event_filter=AGUIEventFilter.BLOCKS_WITH_PROGRESS,
)
async for event in processor.process_stream(agui_stream):
    # event is an AG-UI event (dict format)
    if event["type"] == "CUSTOM":
        handle_streamblocks_event(event)
    else:
        forward_to_frontend(event)
AGUIEventFilter
Bases: Flag
Configurable event filtering for AG-UI output adapter.
Use these flags to control which StreamBlocks events are emitted when using AGUIOutputAdapter.
Example
# Only emit block-related events
filter = AGUIEventFilter.BLOCKS_ONLY

# Emit blocks with progress updates
filter = AGUIEventFilter.BLOCKS_WITH_PROGRESS

# Custom combination
filter = AGUIEventFilter.TEXT_DELTA | AGUIEventFilter.BLOCK_EXTRACTED
RAW_TEXT
class-attribute
instance-attribute
RAW_TEXT = auto()
Emit RawTextEvent as TextMessageContentEvent.
TEXT_DELTA
class-attribute
instance-attribute
TEXT_DELTA = auto()
Emit TextDeltaEvent as TextMessageContentEvent.
BLOCK_OPENED
class-attribute
instance-attribute
BLOCK_OPENED = auto()
Emit BlockOpenedEvent as CustomEvent.
BLOCK_DELTA
class-attribute
instance-attribute
BLOCK_DELTA = auto()
Emit section delta events (BlockHeaderDeltaEvent, BlockMetadataDeltaEvent, BlockContentDeltaEvent) as CustomEvent.
BLOCK_EXTRACTED
class-attribute
instance-attribute
BLOCK_EXTRACTED = auto()
Emit BlockExtractedEvent as CustomEvent.
BLOCK_REJECTED
class-attribute
instance-attribute
BLOCK_REJECTED = auto()
Emit BlockRejectedEvent as CustomEvent.
ALL
class-attribute
instance-attribute
ALL = (
RAW_TEXT
| TEXT_DELTA
| BLOCK_OPENED
| BLOCK_DELTA
| BLOCK_EXTRACTED
| BLOCK_REJECTED
)
Emit all StreamBlocks events.
BLOCKS_ONLY
class-attribute
instance-attribute
BLOCKS_ONLY = (
BLOCK_OPENED | BLOCK_EXTRACTED | BLOCK_REJECTED
)
Emit only block lifecycle events (opened, extracted, rejected).
BLOCKS_WITH_PROGRESS
class-attribute
instance-attribute
BLOCKS_WITH_PROGRESS = (
BLOCK_OPENED
| BLOCK_DELTA
| BLOCK_EXTRACTED
| BLOCK_REJECTED
)
Emit block lifecycle events plus progress updates.
TEXT_AND_FINAL
class-attribute
instance-attribute
TEXT_AND_FINAL = (
TEXT_DELTA | BLOCK_EXTRACTED | BLOCK_REJECTED
)
Emit text streaming plus final block results.
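Since AGUIEventFilter derives from the standard library `Flag`, presets are simply unions of the individual flags, and membership can be tested with `in`. The sketch below reproduces a subset of the documented members in a local class so it runs without the library installed. The class is a replica for illustration, not an import of the real one.

```python
from enum import Flag, auto


class AGUIEventFilter(Flag):
    """Local replica of some documented flags, for illustration only."""
    RAW_TEXT = auto()
    TEXT_DELTA = auto()
    BLOCK_OPENED = auto()
    BLOCK_DELTA = auto()
    BLOCK_EXTRACTED = auto()
    BLOCK_REJECTED = auto()
    BLOCKS_ONLY = BLOCK_OPENED | BLOCK_EXTRACTED | BLOCK_REJECTED
    TEXT_AND_FINAL = TEXT_DELTA | BLOCK_EXTRACTED | BLOCK_REJECTED


# A custom combination, as in the example above.
custom = AGUIEventFilter.TEXT_DELTA | AGUIEventFilter.BLOCK_EXTRACTED

# Membership tests: is a given event kind enabled by a filter?
emits_extracted = AGUIEventFilter.BLOCK_EXTRACTED in custom
emits_deltas = AGUIEventFilter.BLOCK_DELTA in AGUIEventFilter.BLOCKS_ONLY

# Adding one more flag to the custom filter yields an existing preset.
widened = custom | AGUIEventFilter.BLOCK_REJECTED
```

Here `BLOCKS_ONLY` does not include `BLOCK_DELTA`, which is the difference between it and `BLOCKS_WITH_PROGRESS` in the reference.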
AGUIInputAdapter
Input adapter for AG-UI protocol events.
Handles event-based streaming from AG-UI protocol.
AG-UI Event Categories:
- TEXT_MESSAGE_CONTENT, TEXT_MESSAGE_CHUNK: TEXT_CONTENT (has text)
- All other events: PASSTHROUGH (lifecycle, tool calls, state)
Example
adapter = AGUIInputAdapter()

async for event in agui_stream:
    category = adapter.categorize(event)
    if category == EventCategory.TEXT_CONTENT:
        text = adapter.extract_text(event)
        print(text, end='', flush=True)
categorize
categorize(event: BaseEvent) -> EventCategory
Categorize event based on type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | BaseEvent | AG-UI BaseEvent | required |
Returns:
| Type | Description |
|---|---|
| EventCategory | TEXT_CONTENT for text message events, PASSTHROUGH for others |
extract_text
extract_text(event: BaseEvent) -> str | None
Extract text from TEXT_CONTENT events.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | BaseEvent | AG-UI BaseEvent | required |
Returns:
| Type | Description |
|---|---|
| str \| None | Delta text if TEXT_MESSAGE_CONTENT or TEXT_MESSAGE_CHUNK |
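The AG-UI input rules reduce to a check on the event type. The following sketch assumes events expose a `type` string and, for text events, a `delta` attribute. The stand-in objects and the string category names are illustrative and do not come from the AG-UI SDK or Streamblocks.

```python
from types import SimpleNamespace

# Event types that carry text, per the categories listed above.
TEXT_EVENT_TYPES = {"TEXT_MESSAGE_CONTENT", "TEXT_MESSAGE_CHUNK"}


def categorize(event):
    """Text message events are content; everything else passes through."""
    return "TEXT_CONTENT" if event.type in TEXT_EVENT_TYPES else "PASSTHROUGH"


def extract_text(event):
    """Return the delta for text events, None for all other events."""
    if event.type not in TEXT_EVENT_TYPES:
        return None
    return getattr(event, "delta", None)


stream = [
    SimpleNamespace(type="RUN_STARTED"),
    SimpleNamespace(type="TEXT_MESSAGE_CONTENT", delta="Hello"),
    SimpleNamespace(type="TEXT_MESSAGE_CHUNK", delta=" there"),
    SimpleNamespace(type="RUN_FINISHED"),
]
text = "".join(t for t in map(extract_text, stream) if t)
passthrough_types = [e.type for e in stream if categorize(e) == "PASSTHROUGH"]
```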
AGUIOutputAdapter
Output adapter for AG-UI protocol events.
Transforms StreamBlocks events into AG-UI CustomEvent format.
StreamBlocks events are mapped to AG-UI as follows:
- TextDeltaEvent, TextContentEvent → TextMessageContentEvent
- BlockStartEvent → CustomEvent(name="streamblocks.block_start")
- BlockHeaderDeltaEvent → CustomEvent(name="streamblocks.block_delta")
- BlockMetadataDeltaEvent → CustomEvent(name="streamblocks.block_delta")
- BlockContentDeltaEvent → CustomEvent(name="streamblocks.block_delta")
- BlockEndEvent → CustomEvent(name="streamblocks.block_end")
- BlockErrorEvent → CustomEvent(name="streamblocks.block_error")
Passthrough events (AG-UI events from input) are passed through unchanged.
Example
adapter = AGUIOutputAdapter(event_filter=AGUIEventFilter.BLOCKS_WITH_PROGRESS)
# Convert StreamBlocks event to AG-UI event
agui_event = adapter.to_protocol_event(block_extracted_event)

# Pass through AG-UI events
original = adapter.passthrough(run_started_event)
to_protocol_event
Convert StreamBlocks event to AG-UI event format.
Returns a dictionary representation of the AG-UI event that can be serialized or converted to the actual AG-UI event type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | BaseEvent | StreamBlocks event | required |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] \| None | Dictionary representing AG-UI event, or None if filtered out |
Note
Returns dict rather than actual AG-UI types to avoid requiring ag-ui-protocol as a runtime dependency.
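The "dict or None" contract can be sketched as follows. This is an illustration under assumptions, not the adapter's code: `Filter`, `TextDelta`, and `BlockExtracted` are hypothetical stand-ins, the `"CUSTOM"`, `"name"`, and `"value"` keys follow the usage examples in this reference, and the `"TEXT_MESSAGE_CONTENT"` type string and `"delta"` key are assumed.

```python
from dataclasses import dataclass
from enum import Flag, auto
from typing import Any, Dict, Optional


class Filter(Flag):
    """Stand-in for two of the AGUIEventFilter flags."""
    TEXT_DELTA = auto()
    BLOCK_EXTRACTED = auto()


@dataclass
class TextDelta:
    """Stand-in for a StreamBlocks text delta event."""
    delta: str


@dataclass
class BlockExtracted:
    """Stand-in for a StreamBlocks block-extracted event."""
    block_id: str


def to_protocol_event(event, event_filter: Filter) -> Optional[Dict[str, Any]]:
    """Return a plain dict for enabled event kinds, None when filtered out."""
    if isinstance(event, TextDelta):
        if Filter.TEXT_DELTA not in event_filter:
            return None
        return {"type": "TEXT_MESSAGE_CONTENT", "delta": event.delta}
    if isinstance(event, BlockExtracted):
        if Filter.BLOCK_EXTRACTED not in event_filter:
            return None
        return {
            "type": "CUSTOM",
            "name": "streamblocks.block_extracted",
            "value": {"block_id": event.block_id},
        }
    return None  # Unknown event kinds are dropped in this sketch.


only_blocks = Filter.BLOCK_EXTRACTED
dropped = to_protocol_event(TextDelta("hi"), only_blocks)
kept = to_protocol_event(BlockExtracted("b1"), only_blocks)
```

Returning plain dictionaries keeps the output serializable with the standard `json` module and avoids importing AG-UI types, which is the trade-off the note above describes.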
passthrough
create_agui_processor
Create processor for AG-UI → StreamBlocks (unidirectional).
This processor takes AG-UI events as input and emits native StreamBlocks events.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| registry | Registry | Registry with syntax and block definitions | required |
Returns:
| Type | Description |
|---|---|
| ProtocolStreamProcessor[Any, BaseEvent] | Pre-configured processor for AG-UI input, StreamBlocks output |
Example
from hother.streamblocks.extensions.agui import create_agui_processor
processor = create_agui_processor(registry)
async for event in processor.process_stream(agui_stream):
    if isinstance(event, BlockExtractedEvent):
        print(f"Block: {event.block.metadata.id}")
create_agui_bidirectional_processor
create_agui_bidirectional_processor(
registry: Registry, event_filter: AGUIEventFilter = ALL
) -> ProtocolStreamProcessor[Any, dict[str, Any]]
Create processor for AG-UI → AG-UI (bidirectional).
This processor takes AG-UI events as input and emits AG-UI events as output. StreamBlocks events are converted to AG-UI CustomEvent format.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| registry | Registry | Registry with syntax and block definitions | required |
| event_filter | AGUIEventFilter | Filter to control which StreamBlocks events are emitted | ALL |
Returns:
| Type | Description |
|---|---|
| ProtocolStreamProcessor[Any, dict[str, Any]] | Pre-configured processor for AG-UI input and output |
Example
from hother.streamblocks.extensions.agui import (
    create_agui_bidirectional_processor,
    AGUIEventFilter,
)
processor = create_agui_bidirectional_processor(
    registry,
    event_filter=AGUIEventFilter.BLOCKS_WITH_PROGRESS,
)
async for event in processor.process_stream(agui_stream):
    if event["type"] == "CUSTOM":
        if event["name"] == "streamblocks.block_extracted":
            handle_block(event["value"])
    else:
        # Passthrough: lifecycle, tool calls, state
        forward_to_frontend(event)
hother.streamblocks.extensions.agui.input_adapter
AG-UI input adapter for StreamBlocks.
AGUIInputAdapter
Input adapter for AG-UI protocol events.
Handles event-based streaming from AG-UI protocol.
AG-UI Event Categories:
- TEXT_MESSAGE_CONTENT, TEXT_MESSAGE_CHUNK: TEXT_CONTENT (has text)
- All other events: PASSTHROUGH (lifecycle, tool calls, state)
Example
adapter = AGUIInputAdapter()

async for event in agui_stream:
    category = adapter.categorize(event)
    if category == EventCategory.TEXT_CONTENT:
        text = adapter.extract_text(event)
        print(text, end='', flush=True)
categorize
categorize(event: BaseEvent) -> EventCategory
Categorize event based on type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | BaseEvent | AG-UI BaseEvent | required |
Returns:
| Type | Description |
|---|---|
| EventCategory | TEXT_CONTENT for text message events, PASSTHROUGH for others |
extract_text
extract_text(event: BaseEvent) -> str | None
Extract text from TEXT_CONTENT events.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | BaseEvent | AG-UI BaseEvent | required |
Returns:
| Type | Description |
|---|---|
| str \| None | Delta text if TEXT_MESSAGE_CONTENT or TEXT_MESSAGE_CHUNK |
get_metadata
hother.streamblocks.extensions.agui.output_adapter
AG-UI output adapter for StreamBlocks.
AGUIOutputAdapter
Output adapter for AG-UI protocol events.
Transforms StreamBlocks events into AG-UI CustomEvent format.
StreamBlocks events are mapped to AG-UI as follows:
- TextDeltaEvent, TextContentEvent → TextMessageContentEvent
- BlockStartEvent → CustomEvent(name="streamblocks.block_start")
- BlockHeaderDeltaEvent → CustomEvent(name="streamblocks.block_delta")
- BlockMetadataDeltaEvent → CustomEvent(name="streamblocks.block_delta")
- BlockContentDeltaEvent → CustomEvent(name="streamblocks.block_delta")
- BlockEndEvent → CustomEvent(name="streamblocks.block_end")
- BlockErrorEvent → CustomEvent(name="streamblocks.block_error")
Passthrough events (AG-UI events from input) are passed through unchanged.
Example
adapter = AGUIOutputAdapter(event_filter=AGUIEventFilter.BLOCKS_WITH_PROGRESS)
# Convert StreamBlocks event to AG-UI event
agui_event = adapter.to_protocol_event(block_extracted_event)

# Pass through AG-UI events
original = adapter.passthrough(run_started_event)
passthrough
to_protocol_event
Convert StreamBlocks event to AG-UI event format.
Returns a dictionary representation of the AG-UI event that can be serialized or converted to the actual AG-UI event type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | BaseEvent | StreamBlocks event | required |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] \| None | Dictionary representing AG-UI event, or None if filtered out |
Note
Returns dict rather than actual AG-UI types to avoid requiring ag-ui-protocol as a runtime dependency.
hother.streamblocks.extensions.agui.filters
Event filter for AG-UI output adapter.
AGUIEventFilter
Bases: Flag
Configurable event filtering for AG-UI output adapter.
Use these flags to control which StreamBlocks events are emitted when using AGUIOutputAdapter.
Example
# Only emit block-related events
filter = AGUIEventFilter.BLOCKS_ONLY

# Emit blocks with progress updates
filter = AGUIEventFilter.BLOCKS_WITH_PROGRESS

# Custom combination
filter = AGUIEventFilter.TEXT_DELTA | AGUIEventFilter.BLOCK_EXTRACTED
ALL
class-attribute
instance-attribute
ALL = (
RAW_TEXT
| TEXT_DELTA
| BLOCK_OPENED
| BLOCK_DELTA
| BLOCK_EXTRACTED
| BLOCK_REJECTED
)
Emit all StreamBlocks events.
BLOCKS_ONLY
class-attribute
instance-attribute
BLOCKS_ONLY = (
BLOCK_OPENED | BLOCK_EXTRACTED | BLOCK_REJECTED
)
Emit only block lifecycle events (opened, extracted, rejected).
BLOCKS_WITH_PROGRESS
class-attribute
instance-attribute
BLOCKS_WITH_PROGRESS = (
BLOCK_OPENED
| BLOCK_DELTA
| BLOCK_EXTRACTED
| BLOCK_REJECTED
)
Emit block lifecycle events plus progress updates.
BLOCK_DELTA
class-attribute
instance-attribute
BLOCK_DELTA = auto()
Emit section delta events (BlockHeaderDeltaEvent, BlockMetadataDeltaEvent, BlockContentDeltaEvent) as CustomEvent.
BLOCK_EXTRACTED
class-attribute
instance-attribute
BLOCK_EXTRACTED = auto()
Emit BlockExtractedEvent as CustomEvent.
BLOCK_OPENED
class-attribute
instance-attribute
BLOCK_OPENED = auto()
Emit BlockOpenedEvent as CustomEvent.
BLOCK_REJECTED
class-attribute
instance-attribute
BLOCK_REJECTED = auto()
Emit BlockRejectedEvent as CustomEvent.
RAW_TEXT
class-attribute
instance-attribute
RAW_TEXT = auto()
Emit RawTextEvent as TextMessageContentEvent.
TEXT_AND_FINAL
class-attribute
instance-attribute
TEXT_AND_FINAL = (
TEXT_DELTA | BLOCK_EXTRACTED | BLOCK_REJECTED
)
Emit text streaming plus final block results.
TEXT_DELTA
class-attribute
instance-attribute
TEXT_DELTA = auto()
Emit TextDeltaEvent as TextMessageContentEvent.