Extensions Reference

Provider-specific extensions for Streamblocks.

Overview

Extensions provide adapters for different LLM providers. Each extension implements the adapter protocol, which categorizes the events of a provider-specific stream and extracts their text so that Streamblocks can process it.

flowchart TB
    subgraph Extensions["Provider Extensions"]
        Gemini[gemini]
        OpenAI[openai]
        Anthropic[anthropic]
        AGUI[agui]
    end

    subgraph Protocol["Adapter Protocol"]
        Input[InputProtocolAdapter]
        Output[OutputProtocolAdapter]
    end

    Gemini --> Input
    OpenAI --> Input
    Anthropic --> Input
    AGUI --> Input
    AGUI --> Output

Installation

Extensions are installed as optional dependencies:

# Individual providers
uv add "streamblocks[gemini]"
uv add "streamblocks[openai]"
uv add "streamblocks[anthropic]"

# All providers
uv add "streamblocks[all-providers]"
# Individual providers
pip install "streamblocks[gemini]"
pip install "streamblocks[openai]"
pip install "streamblocks[anthropic]"

# All providers
pip install "streamblocks[all-providers]"

Gemini Extension

Google Gemini adapter for processing Gemini API responses.

Installation

uv add "streamblocks[gemini]"
pip install "streamblocks[gemini]"

Usage

from hother.streamblocks import StreamBlockProcessor, Registry, DelimiterPreambleSyntax
from hother.streamblocks.extensions.gemini import GeminiInputAdapter

adapter = GeminiInputAdapter()

# Use with processor
processor = StreamBlockProcessor(
    registry=Registry(),
    syntaxes=[DelimiterPreambleSyntax()],
    input_adapter=adapter,
)

Auto-Detection

The Gemini adapter is automatically detected when processing Gemini responses:

import google.generativeai as genai
from hother.streamblocks import StreamBlockProcessor, detect_input_adapter

genai.configure(api_key="...")  # pragma: allowlist secret
model = genai.GenerativeModel("gemini-pro")
response = model.generate_content("...", stream=True)

# Auto-detect adapter
adapter = detect_input_adapter(response)
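
The API reference below lists a `native_module_prefix` for each adapter, which suggests that detection compares the module of an incoming chunk against those prefixes. The following is a minimal sketch of that idea under this assumption, not the library's implementation: `PREFIX_TO_ADAPTER` and `guess_adapter_name` are hypothetical names, and adapters are represented by plain strings.

```python
from typing import Optional

# Hypothetical lookup table. The prefixes mirror the documented
# native_module_prefix values; the adapters are plain strings here.
PREFIX_TO_ADAPTER = {
    "google.genai": "GeminiInputAdapter",
    "openai.types": "OpenAIInputAdapter",
    "anthropic.": "AnthropicInputAdapter",
    "ag_ui.": "AGUIInputAdapter",
}


def guess_adapter_name(chunk: object) -> Optional[str]:
    """Return the adapter whose prefix matches the module of the chunk's type."""
    module = type(chunk).__module__
    for prefix, name in PREFIX_TO_ADAPTER.items():
        if module.startswith(prefix):
            return name
    return None


# Stand-in class that pretends to be defined inside the OpenAI SDK.
FakeChunk = type("ChatCompletionChunk", (), {"__module__": "openai.types.chat"})

print(guess_adapter_name(FakeChunk()))  # OpenAIInputAdapter
print(guess_adapter_name(object()))     # None
```

If detection works along these lines, a stream whose chunks come from an unrelated module would not be matched, and passing `input_adapter=` explicitly would be the safer choice.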

OpenAI Extension

OpenAI adapter for processing OpenAI API responses.

Installation

uv add "streamblocks[openai]"
pip install "streamblocks[openai]"

Usage

from hother.streamblocks import StreamBlockProcessor, Registry, DelimiterPreambleSyntax
from hother.streamblocks.extensions.openai import OpenAIInputAdapter

adapter = OpenAIInputAdapter()

# Use with processor
processor = StreamBlockProcessor(
    registry=Registry(),
    syntaxes=[DelimiterPreambleSyntax()],
    input_adapter=adapter,
)

With OpenAI Client

import asyncio

from openai import AsyncOpenAI
from hother.streamblocks import StreamBlockProcessor, Registry, DelimiterPreambleSyntax
from hother.streamblocks.extensions.openai import OpenAIInputAdapter

async def main():
    # The async client returns an AsyncStream, which `async for` can consume
    client = AsyncOpenAI()
    stream = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": "..."}],
        stream=True,
    )

    processor = StreamBlockProcessor(
        registry=Registry(),
        syntaxes=[DelimiterPreambleSyntax()],
        input_adapter=OpenAIInputAdapter(),
    )

    async for event in processor.process_stream(stream):
        handle_event(event)  # handle_event is your own callback

asyncio.run(main())

Anthropic Extension

Anthropic adapter for processing Claude API responses.

Installation

uv add "streamblocks[anthropic]"
pip install "streamblocks[anthropic]"

Usage

from hother.streamblocks import StreamBlockProcessor, Registry, DelimiterPreambleSyntax
from hother.streamblocks.extensions.anthropic import AnthropicInputAdapter

adapter = AnthropicInputAdapter()

# Use with processor
processor = StreamBlockProcessor(
    registry=Registry(),
    syntaxes=[DelimiterPreambleSyntax()],
    input_adapter=adapter,
)

With Anthropic Client

import asyncio

import anthropic
from hother.streamblocks import StreamBlockProcessor, Registry, DelimiterPreambleSyntax
from hother.streamblocks.extensions.anthropic import AnthropicInputAdapter

async def main():
    # Use the async client so the stream can be consumed with `async for`
    client = anthropic.AsyncAnthropic()

    processor = StreamBlockProcessor(
        registry=Registry(),
        syntaxes=[DelimiterPreambleSyntax()],
        input_adapter=AnthropicInputAdapter(),
    )

    async with client.messages.stream(
        model="claude-3-opus",
        max_tokens=1024,
        messages=[{"role": "user", "content": "..."}],
    ) as stream:
        async for event in processor.process_stream(stream):
            handle_event(event)  # handle_event is your own callback

asyncio.run(main())

AG-UI Extension

AG-UI protocol adapters for bidirectional streaming.

Installation

The AG-UI extension is included in the base installation.

Input Adapter

Process incoming AG-UI events:

from hother.streamblocks import StreamBlockProcessor, Registry, DelimiterPreambleSyntax
from hother.streamblocks.extensions.agui import AGUIInputAdapter

adapter = AGUIInputAdapter()

processor = StreamBlockProcessor(
    registry=Registry(),
    syntaxes=[DelimiterPreambleSyntax()],
    input_adapter=adapter,
)

async for event in processor.process_stream(agui_stream):
    handle_event(event)

Output Adapter

Convert Streamblocks events to AG-UI protocol:

from hother.streamblocks import StreamBlockProcessor, Registry, DelimiterPreambleSyntax
from hother.streamblocks.extensions.agui import AGUIOutputAdapter

adapter = AGUIOutputAdapter()

processor = StreamBlockProcessor(
    registry=Registry(),
    syntaxes=[DelimiterPreambleSyntax()],
    output_adapter=adapter,
)

# Events are now AG-UI formatted
async for event in processor.process_stream(llm_stream):
    yield event  # AG-UI protocol event

Bidirectional Usage

from hother.streamblocks import StreamBlockProcessor, Registry, DelimiterPreambleSyntax
from hother.streamblocks.extensions.agui import AGUIInputAdapter, AGUIOutputAdapter

processor = StreamBlockProcessor(
    registry=Registry(),
    syntaxes=[DelimiterPreambleSyntax()],
    input_adapter=AGUIInputAdapter(),
    output_adapter=AGUIOutputAdapter(),
)

Event Filtering

Configure which events are converted:

from hother.streamblocks.extensions.agui import AGUIOutputAdapter

adapter = AGUIOutputAdapter(
    include_text_deltas=True,
    include_block_content=False,
    include_metadata=True,
)

Custom Event Mapping

Map block types to AG-UI events:

adapter = AGUIOutputAdapter(
    block_type_mapping={
        "task": "CustomTaskEvent",
        "code": "CodeBlockEvent",
        "message": "TextMessageContent",
    }
)

Creating Custom Extensions

Implement the adapter protocol for custom providers:

from hother.streamblocks import InputProtocolAdapter, EventCategory

class MyProviderAdapter(InputProtocolAdapter):
    """Adapter for MyProvider streams."""

    def categorize(self, event) -> EventCategory:
        """Categorize the event type."""
        if hasattr(event, "text"):
            return EventCategory.TEXT_CONTENT
        if hasattr(event, "tool_call"):
            return EventCategory.PASSTHROUGH
        return EventCategory.SKIP

    def extract_text(self, event) -> str:
        """Extract text content from event."""
        return event.text

    def get_original_event(self, event):
        """Get the original event for passthrough."""
        return event
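
To see how the three categories might be consumed, here is a self-contained sketch that runs the same adapter logic over a few fake events. It does not import Streamblocks: `EventCategory` is a stand-in enum with the member names used above, the adapter is duck-typed rather than derived from `InputProtocolAdapter`, and `split_stream` is a hypothetical helper, not part of the library.

```python
from enum import Enum, auto
from types import SimpleNamespace


class EventCategory(Enum):
    """Stand-in for hother.streamblocks.EventCategory (same member names)."""
    TEXT_CONTENT = auto()
    PASSTHROUGH = auto()
    SKIP = auto()


class MyProviderAdapter:
    """Same logic as the adapter above, without the library base class."""

    def categorize(self, event):
        if hasattr(event, "text"):
            return EventCategory.TEXT_CONTENT
        if hasattr(event, "tool_call"):
            return EventCategory.PASSTHROUGH
        return EventCategory.SKIP

    def extract_text(self, event):
        return event.text


def split_stream(adapter, events):
    """Collect text and passthrough events the way a consumer might."""
    text_parts, passthrough = [], []
    for event in events:
        category = adapter.categorize(event)
        if category is EventCategory.TEXT_CONTENT:
            text_parts.append(adapter.extract_text(event))
        elif category is EventCategory.PASSTHROUGH:
            passthrough.append(event)
        # SKIP events are dropped
    return "".join(text_parts), passthrough


events = [
    SimpleNamespace(text="Hello, "),
    SimpleNamespace(tool_call={"name": "search"}),
    SimpleNamespace(ping=True),
    SimpleNamespace(text="world"),
]
text, passed = split_stream(MyProviderAdapter(), events)
print(text)         # Hello, world
print(len(passed))  # 1
```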

Extension API Reference

Gemini

hother.streamblocks.extensions.gemini

Gemini extension for StreamBlocks.

This extension provides input adapters for Google GenAI streams.

Importing this module registers the GeminiInputAdapter for auto-detection.

Example

# Import to enable auto-detection
import hother.streamblocks.extensions.gemini

# Auto-detect from Gemini stream
processor = ProtocolStreamProcessor(registry)
async for event in processor.process_stream(gemini_stream):
    print(event)

# Or use convenience factory
from hother.streamblocks.extensions.gemini import create_gemini_processor
processor = create_gemini_processor(registry)

GeminiInputAdapter

Input adapter for Google GenAI streams.

Handles chunks from google.genai.models.generate_content_stream() and google.ai.generativelanguage clients.

Extracts:
- Text from chunk.text attribute
- Usage metadata (token counts)
- Model version information

Example

from google import genai
adapter = GeminiInputAdapter()

async for chunk in client.aio.models.generate_content_stream(...):
    text = adapter.extract_text(chunk)
    metadata = adapter.get_metadata(chunk)
    if metadata:
        print(f"Tokens: {metadata['usage']}")

native_module_prefix class-attribute

native_module_prefix: str = 'google.genai'

categorize

categorize(event: GenerateContentResponse) -> EventCategory

Categorize event - all Gemini chunks are text content.

Parameters:

Name   Type                     Description                           Default
event  GenerateContentResponse  Gemini GenerateContentResponse chunk  required

Returns:

Type           Description
EventCategory  TEXT_CONTENT for all chunks

extract_text

extract_text(event: GenerateContentResponse) -> str | None

Extract text from chunk.text attribute.

Parameters:

Name   Type                     Description                           Default
event  GenerateContentResponse  Gemini GenerateContentResponse chunk  required

Returns:

Type        Description
str | None  Text content, or None if not present

is_complete

is_complete(event: GenerateContentResponse) -> bool

Gemini doesn't have explicit finish markers in each chunk.

Completion is typically detected by the stream ending.

Parameters:

Name   Type                     Description                           Default
event  GenerateContentResponse  Gemini GenerateContentResponse chunk  required

Returns:

Type  Description
bool  Always False - Gemini streams end naturally

get_metadata

get_metadata(
    event: GenerateContentResponse,
) -> dict[str, Any] | None

Extract usage metadata and model information.

Parameters:

Name   Type                     Description                           Default
event  GenerateContentResponse  Gemini GenerateContentResponse chunk  required

Returns:

Type                   Description
dict[str, Any] | None  Dictionary with usage and/or model if present
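
The return contract above ("usage and/or model if present", otherwise None) can be illustrated with a small sketch. This is not the adapter's actual code: `build_metadata` is a hypothetical helper, and the chunk attribute names `usage_metadata` and `model_version` are assumptions about the Gemini response object. Only the result keys `usage` and `model` come from this reference.

```python
from types import SimpleNamespace
from typing import Any, Dict, Optional


def build_metadata(chunk: Any) -> Optional[Dict[str, Any]]:
    """Collect usage and model info; return None when neither is present."""
    metadata: Dict[str, Any] = {}
    usage = getattr(chunk, "usage_metadata", None)  # assumed attribute name
    if usage is not None:
        metadata["usage"] = usage
    model = getattr(chunk, "model_version", None)   # assumed attribute name
    if model is not None:
        metadata["model"] = model
    return metadata or None


# Stand-in chunks: one with only text, one that also carries usage data.
plain_chunk = SimpleNamespace(text="Hello")
final_chunk = SimpleNamespace(
    text="!",
    usage_metadata={"total_token_count": 12},
    model_version="gemini-pro",
)

print(build_metadata(plain_chunk))  # None
print(build_metadata(final_chunk))
```

Returning None rather than an empty dictionary lets callers write `if metadata:` as in the example above.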

create_gemini_processor

create_gemini_processor(
    registry: Registry,
) -> ProtocolStreamProcessor[Any, BaseEvent]

Create processor pre-configured for Gemini streams.

This is a convenience factory that creates a ProtocolStreamProcessor with GeminiInputAdapter and StreamBlocksOutputAdapter.

Parameters:

Name      Type      Description                                 Default
registry  Registry  Registry with syntax and block definitions  required

Returns:

Type                                     Description
ProtocolStreamProcessor[Any, BaseEvent]  Pre-configured processor for Gemini streams

Example

from hother.streamblocks.extensions.gemini import create_gemini_processor
processor = create_gemini_processor(registry)
async for event in processor.process_stream(gemini_stream):
    if isinstance(event, BlockExtractedEvent):
        print(f"Block: {event.block.metadata.id}")

OpenAI

hother.streamblocks.extensions.openai

OpenAI extension for StreamBlocks.

This extension provides input adapters for OpenAI ChatCompletionChunk streams.

Importing this module registers the OpenAIInputAdapter for auto-detection.

Example

# Import to enable auto-detection
import hother.streamblocks.extensions.openai

# Auto-detect from OpenAI stream
processor = ProtocolStreamProcessor(registry)
async for event in processor.process_stream(openai_stream):
    print(event)

# Or use convenience factory
from hother.streamblocks.extensions.openai import create_openai_processor
processor = create_openai_processor(registry)

OpenAIInputAdapter

Input adapter for OpenAI ChatCompletionChunk streams.

Handles streams from openai.AsyncStream[ChatCompletionChunk].

Extracts:
- Delta content from choices[0].delta.content
- Finish reasons
- Model information

Example

from openai import AsyncOpenAI
adapter = OpenAIInputAdapter()

client = AsyncOpenAI()
stream = await client.chat.completions.create(
    model="gpt-4",
    messages=[...],
    stream=True
)

async for chunk in stream:
    text = adapter.extract_text(chunk)
    if adapter.is_complete(chunk):
        print("Stream complete!")

native_module_prefix class-attribute

native_module_prefix: str = 'openai.types'

categorize

categorize(event: ChatCompletionChunk) -> EventCategory

Categorize event - all OpenAI chunks are text content.

Parameters:

Name   Type                 Description                 Default
event  ChatCompletionChunk  OpenAI ChatCompletionChunk  required

Returns:

Type           Description
EventCategory  TEXT_CONTENT for all chunks

extract_text

extract_text(event: ChatCompletionChunk) -> str | None

Extract text from choices[0].delta.content.

Parameters:

Name   Type                 Description                 Default
event  ChatCompletionChunk  OpenAI ChatCompletionChunk  required

Returns:

Type        Description
str | None  Delta content text, or None if not present

is_complete

is_complete(event: ChatCompletionChunk) -> bool

Check if finish_reason is set.

Parameters:

Name   Type                 Description                 Default
event  ChatCompletionChunk  OpenAI ChatCompletionChunk  required

Returns:

Type  Description
bool  True if this is the final chunk
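
The two checks described above, reading `choices[0].delta.content` and testing whether `finish_reason` is set, can be sketched without the OpenAI SDK. The helpers below are hypothetical and operate on stand-in objects shaped like a ChatCompletionChunk; they illustrate the documented behavior and are not the adapter's source.

```python
from types import SimpleNamespace
from typing import Optional


def extract_delta_text(chunk) -> Optional[str]:
    """Return choices[0].delta.content, or None when any part is missing."""
    choices = getattr(chunk, "choices", None)
    if not choices:
        return None
    delta = getattr(choices[0], "delta", None)
    return getattr(delta, "content", None)


def has_finish_reason(chunk) -> bool:
    """Return True when the first choice carries a finish_reason."""
    choices = getattr(chunk, "choices", None)
    if not choices:
        return False
    return getattr(choices[0], "finish_reason", None) is not None


def fake_chunk(content=None, finish_reason=None):
    """Build a stand-in object shaped like a ChatCompletionChunk."""
    delta = SimpleNamespace(content=content)
    choice = SimpleNamespace(delta=delta, finish_reason=finish_reason)
    return SimpleNamespace(choices=[choice])


stream = [fake_chunk("Hel"), fake_chunk("lo"), fake_chunk(finish_reason="stop")]
text = "".join(t for t in map(extract_delta_text, stream) if t)
print(text)                           # Hello
print(has_finish_reason(stream[-1]))  # True
```

Guarding against an empty `choices` list matters in practice because some chunks carry no choices at all.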

get_metadata

get_metadata(
    event: ChatCompletionChunk,
) -> dict[str, Any] | None

Extract model and finish reason.

Parameters:

Name   Type                 Description                 Default
event  ChatCompletionChunk  OpenAI ChatCompletionChunk  required

Returns:

Type                   Description
dict[str, Any] | None  Dictionary with model and/or finish_reason if present

create_openai_processor

create_openai_processor(
    registry: Registry,
) -> ProtocolStreamProcessor[Any, BaseEvent]

Create processor pre-configured for OpenAI streams.

This is a convenience factory that creates a ProtocolStreamProcessor with OpenAIInputAdapter and StreamBlocksOutputAdapter.

Parameters:

Name      Type      Description                                 Default
registry  Registry  Registry with syntax and block definitions  required

Returns:

Type                                     Description
ProtocolStreamProcessor[Any, BaseEvent]  Pre-configured processor for OpenAI streams

Example

from hother.streamblocks.extensions.openai import create_openai_processor
processor = create_openai_processor(registry)
async for event in processor.process_stream(openai_stream):
    if isinstance(event, BlockExtractedEvent):
        print(f"Block: {event.block.metadata.id}")

Anthropic

hother.streamblocks.extensions.anthropic

Anthropic extension for StreamBlocks.

This extension provides input adapters for Anthropic message streams.

Importing this module registers the AnthropicInputAdapter for auto-detection.

Example

# Import to enable auto-detection
import hother.streamblocks.extensions.anthropic

# Auto-detect from Anthropic stream
processor = ProtocolStreamProcessor(registry)
async for event in processor.process_stream(anthropic_stream):
    print(event)

# Or use convenience factory
from hother.streamblocks.extensions.anthropic import create_anthropic_processor
processor = create_anthropic_processor(registry)

AnthropicEvent module-attribute

AnthropicEvent = (
    ContentBlockDeltaEvent
    | MessageDeltaEvent
    | MessageStopEvent
)

AnthropicInputAdapter

Input adapter for Anthropic message streams.

Handles event-based streaming from anthropic.MessageStream.

Anthropic uses different event types:
- content_block_delta: Contains text deltas (TEXT_CONTENT)
- message_delta: Contains usage information (PASSTHROUGH)
- message_stop: Signals stream completion (PASSTHROUGH)
- Other events: PASSTHROUGH

Example

from anthropic import AsyncAnthropic
adapter = AnthropicInputAdapter()

client = AsyncAnthropic()
async with client.messages.stream(...) as stream:
    async for event in stream:
        text = adapter.extract_text(event)
        if text:
            print(text, end='', flush=True)
        if adapter.is_complete(event):
            print("\nDone!")

native_module_prefix class-attribute

native_module_prefix: str = 'anthropic.'

categorize

categorize(event: AnthropicEvent) -> EventCategory

Categorize event based on type.

Parameters:

Name   Type            Description      Default
event  AnthropicEvent  Anthropic event  required

Returns:

Type           Description
EventCategory  TEXT_CONTENT for content_block_delta events, PASSTHROUGH for others

extract_text

extract_text(event: AnthropicEvent) -> str | None

Extract text from content_block_delta events.

Parameters:

Name   Type            Description      Default
event  AnthropicEvent  Anthropic event  required

Returns:

Type        Description
str | None  Delta text if present, None otherwise

is_complete

is_complete(event: AnthropicEvent) -> bool

Check for message_stop event.

Parameters:

Name   Type            Description      Default
event  AnthropicEvent  Anthropic event  required

Returns:

Type  Description
bool  True if this is the message_stop event
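
The event-type rules documented for this adapter (text only in content_block_delta, completion on message_stop, everything else passed through) can be sketched as three small functions. These are hypothetical stand-ins working on fake events with a `type` string and, for deltas, a `delta.text` attribute; categories are plain strings rather than `EventCategory` members, and the real adapter may differ in detail.

```python
from types import SimpleNamespace
from typing import Optional


def categorize(event) -> str:
    """Only content_block_delta events carry text; the rest pass through."""
    if event.type == "content_block_delta":
        return "TEXT_CONTENT"
    return "PASSTHROUGH"


def extract_text(event) -> Optional[str]:
    """Return the delta text for content_block_delta events, else None."""
    if event.type != "content_block_delta":
        return None
    return getattr(event.delta, "text", None)


def is_complete(event) -> bool:
    """The stream is finished once message_stop arrives."""
    return event.type == "message_stop"


events = [
    SimpleNamespace(type="message_start"),
    SimpleNamespace(type="content_block_delta", delta=SimpleNamespace(text="Hi")),
    SimpleNamespace(type="message_delta", usage=SimpleNamespace(output_tokens=1)),
    SimpleNamespace(type="message_stop"),
]

print([categorize(e) for e in events])
print([extract_text(e) for e in events])
print(is_complete(events[-1]))  # True
```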

get_metadata

get_metadata(
    event: AnthropicEvent,
) -> dict[str, Any] | None

Extract stop reason and usage information.

Parameters:

Name   Type            Description      Default
event  AnthropicEvent  Anthropic event  required

Returns:

Type                   Description
dict[str, Any] | None  Dictionary with stop_reason or usage if present

create_anthropic_processor

create_anthropic_processor(
    registry: Registry,
) -> ProtocolStreamProcessor[Any, BaseEvent]

Create processor pre-configured for Anthropic streams.

This is a convenience factory that creates a ProtocolStreamProcessor with AnthropicInputAdapter and StreamBlocksOutputAdapter.

Parameters:

Name      Type      Description                                 Default
registry  Registry  Registry with syntax and block definitions  required

Returns:

Type                                     Description
ProtocolStreamProcessor[Any, BaseEvent]  Pre-configured processor for Anthropic streams

Example

from hother.streamblocks.extensions.anthropic import create_anthropic_processor
processor = create_anthropic_processor(registry)
async for event in processor.process_stream(anthropic_stream):
    if isinstance(event, BlockExtractedEvent):
        print(f"Block: {event.block.metadata.id}")

AG-UI

hother.streamblocks.extensions.agui

AG-UI extension for StreamBlocks.

This extension provides bidirectional adapters for the AG-UI protocol.

AG-UI is an event-based protocol for agent-to-frontend communication with event types for lifecycle, text messages, tool calls, and state management.

Importing this module registers the AGUIInputAdapter for auto-detection.

Example

# Import to enable auto-detection
import hother.streamblocks.extensions.agui

# Auto-detect from AG-UI stream
processor = ProtocolStreamProcessor(registry)
async for event in processor.process_stream(agui_stream):
    print(event)

# Bidirectional: AG-UI in, AG-UI out
from hother.streamblocks.extensions.agui import (
    create_agui_bidirectional_processor,
    AGUIEventFilter,
)
processor = create_agui_bidirectional_processor(
    registry,
    event_filter=AGUIEventFilter.BLOCKS_WITH_PROGRESS,
)
async for event in processor.process_stream(agui_stream):
    # event is an AG-UI event (dict format)
    if event["type"] == "CUSTOM":
        handle_streamblocks_event(event)
    else:
        forward_to_frontend(event)

AGUIEventFilter

Bases: Flag

Configurable event filtering for AG-UI output adapter.

Use these flags to control which StreamBlocks events are emitted when using AGUIOutputAdapter.

Example

filter = AGUIEventFilter.BLOCKS_ONLY

# Emit blocks with progress updates
filter = AGUIEventFilter.BLOCKS_WITH_PROGRESS

# Custom combination
filter = AGUIEventFilter.TEXT_DELTA | AGUIEventFilter.BLOCK_EXTRACTED

NONE class-attribute instance-attribute

NONE = 0

Emit no StreamBlocks events.

RAW_TEXT class-attribute instance-attribute

RAW_TEXT = auto()

Emit RawTextEvent as TextMessageContentEvent.

TEXT_DELTA class-attribute instance-attribute

TEXT_DELTA = auto()

Emit TextDeltaEvent as TextMessageContentEvent.

BLOCK_OPENED class-attribute instance-attribute

BLOCK_OPENED = auto()

Emit BlockOpenedEvent as CustomEvent.

BLOCK_DELTA class-attribute instance-attribute

BLOCK_DELTA = auto()

Emit section delta events (BlockHeaderDeltaEvent, BlockMetadataDeltaEvent, BlockContentDeltaEvent) as CustomEvent.

BLOCK_EXTRACTED class-attribute instance-attribute

BLOCK_EXTRACTED = auto()

Emit BlockExtractedEvent as CustomEvent.

BLOCK_REJECTED class-attribute instance-attribute

BLOCK_REJECTED = auto()

Emit BlockRejectedEvent as CustomEvent.

ALL class-attribute instance-attribute

Emit all StreamBlocks events.

BLOCKS_ONLY class-attribute instance-attribute

Emit only block lifecycle events (opened, extracted, rejected).

BLOCKS_WITH_PROGRESS class-attribute instance-attribute

BLOCKS_WITH_PROGRESS = (
    BLOCK_OPENED
    | BLOCK_DELTA
    | BLOCK_EXTRACTED
    | BLOCK_REJECTED
)

Emit block lifecycle events plus progress updates.

TEXT_AND_FINAL class-attribute instance-attribute

TEXT_AND_FINAL = (
    TEXT_DELTA | BLOCK_EXTRACTED | BLOCK_REJECTED
)

Emit text streaming plus final block results.
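
Because the filter is a `Flag`, members combine with `|` and are tested with `in`. The sketch below uses a stand-in `EventFilter` enum that copies only the member definitions shown above (ALL and BLOCKS_ONLY are left out because their values are not listed here); it demonstrates standard `enum.Flag` behavior rather than anything specific to Streamblocks.

```python
from enum import Flag, auto


class EventFilter(Flag):
    """Stand-in mirroring the documented AGUIEventFilter members."""
    NONE = 0
    RAW_TEXT = auto()
    TEXT_DELTA = auto()
    BLOCK_OPENED = auto()
    BLOCK_DELTA = auto()
    BLOCK_EXTRACTED = auto()
    BLOCK_REJECTED = auto()
    BLOCKS_WITH_PROGRESS = (
        BLOCK_OPENED | BLOCK_DELTA | BLOCK_EXTRACTED | BLOCK_REJECTED
    )
    TEXT_AND_FINAL = TEXT_DELTA | BLOCK_EXTRACTED | BLOCK_REJECTED


# A custom combination, as in the example above.
custom = EventFilter.TEXT_DELTA | EventFilter.BLOCK_EXTRACTED

print(EventFilter.BLOCK_EXTRACTED in custom)                        # True
print(EventFilter.BLOCK_DELTA in custom)                            # False
print(EventFilter.BLOCK_DELTA in EventFilter.BLOCKS_WITH_PROGRESS)  # True
print(bool(EventFilter.NONE))                                       # False
```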

AGUIInputAdapter

Input adapter for AG-UI protocol events.

Handles event-based streaming from AG-UI protocol.

AG-UI Event Categories:
- TEXT_MESSAGE_CONTENT, TEXT_MESSAGE_CHUNK: TEXT_CONTENT (has text)
- All other events: PASSTHROUGH (lifecycle, tool calls, state)

Example

adapter = AGUIInputAdapter()

async for event in agui_stream:
    category = adapter.categorize(event)
    if category == EventCategory.TEXT_CONTENT:
        text = adapter.extract_text(event)
        print(text, end='', flush=True)

native_module_prefix class-attribute

native_module_prefix: str = 'ag_ui.'

categorize

categorize(event: BaseEvent) -> EventCategory

Categorize event based on type.

Parameters:

Name   Type       Description      Default
event  BaseEvent  AG-UI BaseEvent  required

Returns:

Type           Description
EventCategory  TEXT_CONTENT for text message events, PASSTHROUGH for others

extract_text

extract_text(event: BaseEvent) -> str | None

Extract text from TEXT_CONTENT events.

Parameters:

Name   Type       Description      Default
event  BaseEvent  AG-UI BaseEvent  required

Returns:

Type        Description
str | None  Delta text if TEXT_MESSAGE_CONTENT or TEXT_MESSAGE_CHUNK

is_complete

is_complete(event: BaseEvent) -> bool

Check for RUN_FINISHED event.

Parameters:

Name   Type       Description      Default
event  BaseEvent  AG-UI BaseEvent  required

Returns:

Type  Description
bool  True if this is the RUN_FINISHED event

get_metadata

get_metadata(event: BaseEvent) -> dict[str, Any] | None

Extract protocol metadata.

Parameters:

Name   Type       Description      Default
event  BaseEvent  AG-UI BaseEvent  required

Returns:

Type                   Description
dict[str, Any] | None  Dictionary with event_type and timestamp if available

AGUIOutputAdapter

Output adapter for AG-UI protocol events.

Transforms StreamBlocks events into AG-UI CustomEvent format.

StreamBlocks events are mapped to AG-UI as follows:
- TextDeltaEvent, TextContentEvent → TextMessageContentEvent
- BlockStartEvent → CustomEvent(name="streamblocks.block_start")
- BlockHeaderDeltaEvent → CustomEvent(name="streamblocks.block_delta")
- BlockMetadataDeltaEvent → CustomEvent(name="streamblocks.block_delta")
- BlockContentDeltaEvent → CustomEvent(name="streamblocks.block_delta")
- BlockEndEvent → CustomEvent(name="streamblocks.block_end")
- BlockErrorEvent → CustomEvent(name="streamblocks.block_error")

Passthrough events (AG-UI events from input) are passed through unchanged.

Example

adapter = AGUIOutputAdapter(event_filter=AGUIEventFilter.BLOCKS_WITH_PROGRESS)

# Convert StreamBlocks event to AG-UI event
agui_event = adapter.to_protocol_event(block_extracted_event)

# Pass through AG-UI events
original = adapter.passthrough(run_started_event)

event_filter instance-attribute

event_filter = event_filter

to_protocol_event

to_protocol_event(
    event: BaseEvent,
) -> dict[str, Any] | None

Convert StreamBlocks event to AG-UI event format.

Returns a dictionary representation of the AG-UI event that can be serialized or converted to the actual AG-UI event type.

Parameters:

Name   Type       Description         Default
event  BaseEvent  StreamBlocks event  required

Returns:

Type                   Description
dict[str, Any] | None  Dictionary representing AG-UI event, or None if filtered out

Note

Returns dict rather than actual AG-UI types to avoid requiring ag-ui-protocol as a runtime dependency.
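
As a rough illustration of the "dict or None" contract, the sketch below builds a plain dictionary with the `type`, `name`, and `value` keys that the bidirectional examples in this reference read, and returns None for names that are filtered out. `to_custom_event` and `ENABLED` are hypothetical; the real adapter filters with `AGUIEventFilter` and its payload layout may differ.

```python
import json
from typing import Any, Dict, Optional

# Hypothetical set of enabled event names, standing in for an event filter.
ENABLED = {"block_start", "block_end"}


def to_custom_event(name: str, value: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Wrap a payload as a CUSTOM event dict, or return None if filtered out."""
    if name not in ENABLED:
        return None
    return {"type": "CUSTOM", "name": f"streamblocks.{name}", "value": value}


event = to_custom_event("block_end", {"block_id": "b1"})
print(json.dumps(event))  # plain dicts serialize without extra dependencies
print(to_custom_event("block_delta", {"delta": "x"}))  # None
```

Keeping the output as plain dictionaries means it can be serialized with the standard library alone.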

passthrough

passthrough(original_event: Any) -> Any

Handle passthrough events.

For AG-UI events, passes them through unchanged.

Parameters:

Name            Type  Description           Default
original_event  Any   Original input event  required

Returns:

Type  Description
Any   The original event unchanged

reset_message_id

reset_message_id() -> None

Reset message ID for new conversation turn.

create_agui_processor

create_agui_processor(
    registry: Registry,
) -> ProtocolStreamProcessor[Any, BaseEvent]

Create processor for AG-UI → StreamBlocks (unidirectional).

This processor takes AG-UI events as input and emits native StreamBlocks events.

Parameters:

Name      Type      Description                                 Default
registry  Registry  Registry with syntax and block definitions  required

Returns:

Type                                     Description
ProtocolStreamProcessor[Any, BaseEvent]  Pre-configured processor for AG-UI input, StreamBlocks output

Example

from hother.streamblocks.extensions.agui import create_agui_processor
processor = create_agui_processor(registry)
async for event in processor.process_stream(agui_stream):
    if isinstance(event, BlockExtractedEvent):
        print(f"Block: {event.block.metadata.id}")

create_agui_bidirectional_processor

create_agui_bidirectional_processor(
    registry: Registry, event_filter: AGUIEventFilter = ALL
) -> ProtocolStreamProcessor[Any, dict[str, Any]]

Create processor for AG-UI → AG-UI (bidirectional).

This processor takes AG-UI events as input and emits AG-UI events as output. StreamBlocks events are converted to AG-UI CustomEvent format.

Parameters:

Name          Type             Description                                              Default
registry      Registry         Registry with syntax and block definitions               required
event_filter  AGUIEventFilter  Filter to control which StreamBlocks events are emitted  ALL

Returns:

Type                                          Description
ProtocolStreamProcessor[Any, dict[str, Any]]  Pre-configured processor for AG-UI input and output

Example

from hother.streamblocks.extensions.agui import (
    create_agui_bidirectional_processor,
    AGUIEventFilter,
)
processor = create_agui_bidirectional_processor(
    registry,
    event_filter=AGUIEventFilter.BLOCKS_WITH_PROGRESS,
)
async for event in processor.process_stream(agui_stream):
    if event["type"] == "CUSTOM":
        if event["name"] == "streamblocks.block_extracted":
            handle_block(event["value"])
    else:
        # Passthrough: lifecycle, tool calls, state
        forward_to_frontend(event)
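
The routing in the example above can be tried offline on a list of sample dictionaries. This is a sketch under the assumption that output events are dicts with `type`, `name`, and `value` keys as used in that example; `route` is a hypothetical helper and the sample payloads are made up for illustration.

```python
def route(events):
    """Split dict events into StreamBlocks payloads and passthrough events."""
    blocks, passthrough = [], []
    for event in events:
        is_custom = event["type"] == "CUSTOM"
        if is_custom and event.get("name", "").startswith("streamblocks."):
            blocks.append(event["value"])
        else:
            # Lifecycle, tool call, and state events go to the frontend as-is.
            passthrough.append(event)
    return blocks, passthrough


sample = [
    {"type": "RUN_STARTED"},
    {"type": "CUSTOM", "name": "streamblocks.block_extracted", "value": {"id": "b1"}},
    {"type": "RUN_FINISHED"},
]
blocks, passthrough = route(sample)
print(blocks)                            # [{'id': 'b1'}]
print([e["type"] for e in passthrough])  # ['RUN_STARTED', 'RUN_FINISHED']
```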

hother.streamblocks.extensions.agui.input_adapter

AG-UI input adapter for StreamBlocks.

AGUITextEvent module-attribute

AGUITextEvent = (
    TextMessageContentEvent | TextMessageChunkEvent
)

AGUIInputAdapter

Input adapter for AG-UI protocol events.

hother.streamblocks.extensions.agui.output_adapter

AG-UI output adapter for StreamBlocks.

AGUIOutputAdapter

Output adapter for AG-UI protocol events.

HasEventType

Bases: Protocol

Protocol for events with a type attribute.

type instance-attribute

type: Any

hother.streamblocks.extensions.agui.filters

Event filter for AG-UI output adapter.

AGUIEventFilter

Bases: Flag

Configurable event filtering for AG-UI output adapter.
