
PydanticAI Integration

StreamBlocks integrates with PydanticAI to extract structured blocks from streaming agent responses in real time.

Installation

uv add streamblocks pydantic-ai

Or with pip:

pip install streamblocks pydantic-ai

Basic Usage

from pydantic_ai import Agent

from hother.streamblocks.integrations.pydantic_ai import AgentStreamProcessor
# Registry and event classes (e.g., BlockEndEvent) are StreamBlocks core types;
# import paths may vary by version.

# Create a PydanticAI agent
agent = Agent(
    model="gemini-2.0-flash-exp",
    system_prompt="You are a helpful assistant.",
)

# Create the processor (requires a Registry with your syntax definitions)
processor = AgentStreamProcessor(registry)

# Process agent responses
async def run_agent(prompt: str) -> None:
    async with agent.run_stream(prompt) as stream_result:
        text_stream = stream_result.stream_text(delta=True)
        async for event in processor.process_agent_stream(text_stream):
            if isinstance(event, BlockEndEvent):
                print(f"Block completed: {event.block_id}")

Features

Structured Block Extraction

Extract structured blocks from agent responses:

async for event in processor.process_agent_stream(text_stream):
    match event:
        case BlockStartEvent():
            print(f"New block: {event.block_id}")
        case BlockContentDeltaEvent():
            ...  # streamed content delta; see the event class for exact fields
        case BlockEndEvent():
            handle_complete_block(event)

Tool Call Handling

Handle tool calls from the agent:

async for event in processor.process_agent_stream(text_stream):
    if isinstance(event, BlockEndEvent):
        # Attribute access below is illustrative; check the BlockEndEvent
        # model for the exact field names in your version.
        if event.block.block_type == "tool_call":
            name = event.block.metadata.get("name")
            args = event.block.content
            result = await execute_tool(name, args)
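
The execute_tool call above is left to the application. A minimal dispatcher sketch follows; the tool names and handlers are hypothetical, and it assumes the block content is a JSON-encoded mapping of keyword arguments (how content is serialized depends on your syntax definitions):

import json
from typing import Any

# Hypothetical tool handlers; replace with your own coroutines.
async def search_tool(query: str) -> Any: ...
async def calc_tool(expression: str) -> Any: ...

TOOLS = {"search": search_tool, "calc": calc_tool}

async def execute_tool(name: str, args: str) -> Any:
    handler = TOOLS.get(name)
    if handler is None:
        raise ValueError(f"Unknown tool: {name}")
    # Assumes args is JSON-encoded keyword arguments.
    return await handler(**json.loads(args))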

Real-time Streaming

Display content as it streams:

async for event in processor.process_agent_stream(text_stream):
    if isinstance(event, TextDeltaEvent):
        print(event.data, end="", flush=True)

Example

See the full example: examples/06_integrations/01_pydantic_ai_integration.py

API Reference

hother.streamblocks.integrations.pydantic_ai

PydanticAI integration for StreamBlocks.

This module provides transparent integration between PydanticAI agents and StreamBlocks, allowing agents to generate structured blocks that are extracted in real-time during streaming.

AgentStreamProcessor

Bases: StreamBlockProcessor

Enhanced processor designed to work with PydanticAI agent streaming output.

This processor is optimized for handling streaming text from AI agents, with special handling for partial blocks and real-time extraction.
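
A minimal, self-contained sketch of driving the processor with an arbitrary text stream (assuming registry is a Registry populated with your syntax definitions; any async iterator of strings works, not just agent output):

import asyncio
from collections.abc import AsyncIterator

from hother.streamblocks.integrations.pydantic_ai import AgentStreamProcessor

async def canned_stream() -> AsyncIterator[str]:
    # Stand-in for agent output: any async iterator of text chunks.
    for chunk in ["plain text before a block\n", "...block syntax here...\n"]:
        yield chunk

async def main() -> None:
    processor = AgentStreamProcessor(registry)  # registry defined elsewhere
    async for event in processor.process_agent_stream(canned_stream()):
        print(type(event).__name__)

asyncio.run(main())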

Source code in src/hother/streamblocks/integrations/pydantic_ai/processor.py
class AgentStreamProcessor(StreamBlockProcessor):
    """Enhanced processor designed to work with PydanticAI agent streaming output.

    This processor is optimized for handling streaming text from AI agents,
    with special handling for partial blocks and real-time extraction.
    """

    def __init__(
        self,
        registry: Registry,
        config: ProcessorConfig | None = None,
        *,
        enable_partial_blocks: bool = True,
    ) -> None:
        """Initialize the agent stream processor.

        Args:
            registry: Registry with a single syntax
            config: Configuration object for processor settings
            enable_partial_blocks: Whether to emit section delta events for partial blocks
        """
        super().__init__(registry, config=config)
        self.enable_partial_blocks = enable_partial_blocks

    async def process_agent_stream(self, agent_stream: AsyncIterator[str]) -> AsyncGenerator[str | BaseEvent]:
        """Process streaming output from a PydanticAI agent.

        This method is specifically designed to handle the streaming output
        from agent.run_stream() or similar agent streaming methods.

        Args:
            agent_stream: Async iterator from agent streaming (e.g., stream_text())

        Yields:
            Mixed stream of:
            - Original text chunks (if emit_original_events=True)
            - Event objects as blocks are detected and extracted
        """
        async for event in self.process_stream(agent_stream):
            yield event

    async def process_agent_with_events(
        self,
        agent_stream: AsyncIterator[str],
        event_handler: Callable[[str | BaseEvent], Any] | None = None,
    ) -> AsyncGenerator[str | BaseEvent]:
        """Process agent stream with optional event handler for agent-specific events.

        This allows handling both StreamBlocks events and PydanticAI events
        in a unified manner.

        Args:
            agent_stream: Async iterator from agent streaming
            event_handler: Optional callback for handling events (both text chunks and Events)

        Yields:
            Mixed stream of:
            - Original text chunks (if emit_original_events=True)
            - Event objects with enhanced metadata
        """
        async for event in self.process_agent_stream(agent_stream):
            # Call event handler if provided
            if event_handler:
                await event_handler(event)

            # Always yield the event
            yield event

config instance-attribute

config = config or ProcessorConfig()

enable_partial_blocks instance-attribute

enable_partial_blocks = enable_partial_blocks

logger instance-attribute

logger = logger or StdlibLoggerAdapter(getLogger(__name__))

registry instance-attribute

registry = registry

syntax instance-attribute

syntax = syntax

finalize

finalize() -> list[Event]

Finalize processing and flush any incomplete blocks.

Call this method after processing all chunks to get rejection events for any blocks that were opened but never closed.

This method processes any accumulated text as a final line before flushing candidates, ensuring the last line is processed even if it doesn't end with a newline.

Returns:

list[Event]: List of events including the processed final line and rejection events for incomplete blocks.

Example

processor = StreamBlockProcessor(registry)
async for chunk in stream:
    events = processor.process_chunk(chunk)
    # ... handle events

# Stream ended, process remaining text and flush incomplete blocks
final_events = processor.finalize()
for event in final_events:
    if isinstance(event, BlockErrorEvent):
        print(f"Incomplete block: {event.reason}")

Source code in src/hother/streamblocks/core/processor.py
def finalize(self) -> list[Event]:
    """Finalize processing and flush any incomplete blocks.

    Call this method after processing all chunks to get rejection events
    for any blocks that were opened but never closed.

    This method processes any accumulated text as a final line before
    flushing candidates, ensuring the last line is processed even if it
    doesn't end with a newline.

    Returns:
        List of events including processed final line and rejection events
        for incomplete blocks

    Example:
        >>> processor = StreamBlockProcessor(registry)
        >>> async for chunk in stream:
        ...     events = processor.process_chunk(chunk)
        ...     # ... handle events
        ...
        >>> # Stream ended, process remaining text and flush incomplete blocks
        >>> final_events = processor.finalize()
        >>> for event in final_events:
        ...     if isinstance(event, BlockErrorEvent):
        ...         print(f"Incomplete block: {event.reason}")
    """
    events: list[Event] = []

    # Process any remaining accumulated text as a final line
    final_line = self._line_accumulator.finalize()
    if final_line:
        line_number, line = final_line
        line_events = self._block_machine.process_line(line, line_number)
        self._update_stats(line_events)
        events.extend(line_events)

    # Flush remaining candidates
    flush_events = self._block_machine.flush(self._line_accumulator.line_number)
    self._update_stats(flush_events)
    events.extend(flush_events)

    return events

is_native_event

is_native_event(event: Any) -> bool

Check if event is a native provider event (not a StreamBlocks event).

This method provides provider-agnostic detection of native events. It checks if the event originates from the AI provider (Gemini, OpenAI, Anthropic, etc.) versus being a StreamBlocks event.

Parameters:

event (Any): Event to check. Required.

Returns:

bool: True if the event is from the native provider; False if it is a StreamBlocks event or detection is not possible.

Example

processor = StreamBlockProcessor(registry)
async for event in processor.process_stream(gemini_stream):
    if processor.is_native_event(event):
        # Handle Gemini event (provider-agnostic!)
        usage = getattr(event, 'usage_metadata', None)
    elif isinstance(event, BlockEndEvent):
        # Handle StreamBlocks event
        print(f"Block: {event.block_id}")

Source code in src/hother/streamblocks/core/processor.py
def is_native_event(self, event: Any) -> bool:
    """Check if event is a native provider event (not a StreamBlocks event).

    This method provides provider-agnostic detection of native events.
    It checks if the event originates from the AI provider (Gemini, OpenAI,
    Anthropic, etc.) versus being a StreamBlocks event.

    Args:
        event: Event to check

    Returns:
        True if event is from the native provider, False if it's a StreamBlocks
        event or if detection is not possible

    Example:
        >>> processor = StreamBlockProcessor(registry)
        >>> async for event in processor.process_stream(gemini_stream):
        ...     if processor.is_native_event(event):
        ...         # Handle Gemini event (provider-agnostic!)
        ...         usage = getattr(event, 'usage_metadata', None)
        ...     elif isinstance(event, BlockEndEvent):
        ...         # Handle StreamBlocks event
        ...         print(f"Block: {event.block_id}")
    """
    # Check if it's a known StreamBlocks event
    if isinstance(
        event,
        (
            StreamStartedEvent,
            StreamFinishedEvent,
            TextContentEvent,
            TextDeltaEvent,
            BlockStartEvent,
            BlockHeaderDeltaEvent,
            BlockMetadataDeltaEvent,
            BlockContentDeltaEvent,
            BlockMetadataEndEvent,
            BlockContentEndEvent,
            BlockEndEvent,
            BlockErrorEvent,
        ),
    ):
        return False

    # Check if we have an adapter with a module prefix
    if self._adapter is None:
        return False

    # Use Protocol-based check for native module prefix
    if not isinstance(self._adapter, HasNativeModulePrefix):
        return False

    # Check if event's module matches the adapter's prefix
    return type(event).__module__.startswith(self._adapter.native_module_prefix)

process_agent_stream async

process_agent_stream(
    agent_stream: AsyncIterator[str],
) -> AsyncGenerator[str | BaseEvent]

Process streaming output from a PydanticAI agent.

This method is specifically designed to handle the streaming output from agent.run_stream() or similar agent streaming methods.

Parameters:

agent_stream (AsyncIterator[str]): Async iterator from agent streaming (e.g., stream_text()). Required.

Yields:

AsyncGenerator[str | BaseEvent]: Mixed stream of:
  • Original text chunks (if emit_original_events=True)
  • Event objects as blocks are detected and extracted
Source code in src/hother/streamblocks/integrations/pydantic_ai/processor.py
async def process_agent_stream(self, agent_stream: AsyncIterator[str]) -> AsyncGenerator[str | BaseEvent]:
    """Process streaming output from a PydanticAI agent.

    This method is specifically designed to handle the streaming output
    from agent.run_stream() or similar agent streaming methods.

    Args:
        agent_stream: Async iterator from agent streaming (e.g., stream_text())

    Yields:
        Mixed stream of:
        - Original text chunks (if emit_original_events=True)
        - Event objects as blocks are detected and extracted
    """
    async for event in self.process_stream(agent_stream):
        yield event

process_agent_with_events async

process_agent_with_events(
    agent_stream: AsyncIterator[str],
    event_handler: Callable[[str | BaseEvent], Any]
    | None = None,
) -> AsyncGenerator[str | BaseEvent]

Process agent stream with optional event handler for agent-specific events.

This allows handling both StreamBlocks events and PydanticAI events in a unified manner.

Parameters:

agent_stream (AsyncIterator[str]): Async iterator from agent streaming. Required.
event_handler (Callable[[str | BaseEvent], Any] | None): Optional callback for handling events (both text chunks and Events). Default: None.

Yields:

AsyncGenerator[str | BaseEvent]: Mixed stream of:
  • Original text chunks (if emit_original_events=True)
  • Event objects with enhanced metadata
Source code in src/hother/streamblocks/integrations/pydantic_ai/processor.py
async def process_agent_with_events(
    self,
    agent_stream: AsyncIterator[str],
    event_handler: Callable[[str | BaseEvent], Any] | None = None,
) -> AsyncGenerator[str | BaseEvent]:
    """Process agent stream with optional event handler for agent-specific events.

    This allows handling both StreamBlocks events and PydanticAI events
    in a unified manner.

    Args:
        agent_stream: Async iterator from agent streaming
        event_handler: Optional callback for handling events (both text chunks and Events)

    Yields:
        Mixed stream of:
        - Original text chunks (if emit_original_events=True)
        - Event objects with enhanced metadata
    """
    async for event in self.process_agent_stream(agent_stream):
        # Call event handler if provided
        if event_handler:
            await event_handler(event)

        # Always yield the event
        yield event

process_chunk

process_chunk(
    chunk: TChunk,
    adapter: InputProtocolAdapter[TChunk] | None = None,
) -> list[TChunk | Event]

Process a single chunk and return resulting events.

This method is stateful - it maintains internal state between calls. Call finalize() after processing all chunks to flush incomplete blocks.

Parameters:

chunk (TChunk): Single chunk to process. Required.
adapter (InputProtocolAdapter[TChunk] | None): Optional adapter for extracting text. If not provided and auto_detect_adapter=True, the adapter is auto-detected on the first chunk. Default: None.

Returns:

list[TChunk | Event]: List of events generated from this chunk. May be empty if the chunk only accumulates text without completing any lines.

Raises:

RuntimeError: If the adapter is not set after first chunk processing (internal state error; should not occur in normal usage).

Example

processor = StreamBlockProcessor(registry)
response = await client.generate_content_stream(...)
async for chunk in response:
    events = processor.process_chunk(chunk)
    for event in events:
        if isinstance(event, BlockEndEvent):
            print(f"Block: {event.block_id}")

# Finalize at stream end
final_events = processor.finalize()
for event in final_events:
    if isinstance(event, BlockErrorEvent):
        print(f"Incomplete block: {event.reason}")

Source code in src/hother/streamblocks/core/processor.py
def process_chunk(
    self,
    chunk: TChunk,
    adapter: InputProtocolAdapter[TChunk] | None = None,
) -> list[TChunk | Event]:
    """Process a single chunk and return resulting events.

    This method is stateful - it maintains internal state between calls.
    Call finalize() after processing all chunks to flush incomplete blocks.

    Args:
        chunk: Single chunk to process
        adapter: Optional adapter for extracting text. If not provided and
                auto_detect_adapter=True, will auto-detect on first chunk.

    Returns:
        List of events generated from this chunk. May be empty if chunk only
        accumulates text without completing any lines.

    Raises:
        RuntimeError: If adapter is not set after first chunk processing
            (internal state error, should not occur in normal usage).

    Example:
        >>> processor = StreamBlockProcessor(registry)
        >>> response = await client.generate_content_stream(...)
        >>> async for chunk in response:
        ...     events = processor.process_chunk(chunk)
        ...     for event in events:
        ...         if isinstance(event, BlockEndEvent):
        ...             print(f"Block: {event.block_id}")
        ...
        >>> # Finalize at stream end
        >>> final_events = processor.finalize()
        >>> for event in final_events:
        ...     if isinstance(event, BlockErrorEvent):
        ...         print(f"Incomplete block: {event.reason}")
    """
    events: list[TChunk | Event] = []

    # Auto-detect adapter on first chunk
    self._ensure_adapter(chunk, adapter)

    # Emit original chunk (passthrough)
    if self.config.emit_original_events and not isinstance(self._adapter, IdentityInputAdapter):
        events.append(chunk)

    # Extract text from chunk
    if self._adapter is None:
        msg = "Adapter should be set after first chunk processing"
        raise RuntimeError(msg)
    text = self._adapter.extract_text(chunk)  # type: ignore[arg-type]

    if not text:
        return events

    # Log stream processing start on first chunk with text
    if self._line_accumulator.line_number == 0 and not self._line_accumulator.has_pending_text:
        self.logger.debug(
            "stream_processing_started",
            syntax=get_syntax_name(self.syntax),
            lines_buffer=self.config.lines_buffer,
            max_block_size=self.config.max_block_size,
        )

    # Emit text delta for real-time streaming
    if self.config.emit_text_deltas:
        events.append(self._create_text_delta_event(text))

    # Process text through line accumulator and block state machine
    for line_number, line in self._line_accumulator.add_text(text):
        line_events = self._block_machine.process_line(line, line_number)
        self._update_stats(line_events)
        events.extend(line_events)

    return events

process_stream async

process_stream(
    stream: AsyncIterator[TChunk],
    adapter: InputProtocolAdapter[TChunk] | None = None,
) -> AsyncGenerator[TChunk | Event]

Process stream and yield mixed events.

This method processes chunks from any stream format, extracting text via an adapter and emitting both original chunks (if enabled) and StreamBlocks events.

Parameters:

stream (AsyncIterator[TChunk]): Async iterator yielding chunks (text or objects). Required.
adapter (InputProtocolAdapter[TChunk] | None): Optional adapter for extracting text from chunks. If None and auto_detect_adapter=True, the adapter is auto-detected from the first chunk. Default: None.

Yields:

AsyncGenerator[TChunk | Event]: Mixed stream of:
  • Original chunks (if emit_original_events=True)
  • TextDeltaEvent (if emit_text_deltas=True)
  • TextContentEvent, BlockStartEvent, BlockEndEvent, BlockErrorEvent, and section delta events

Raises:

RuntimeError: If the adapter is not set after first chunk processing (internal state error; should not occur in normal usage).

Example

# Plain text
async for event in processor.process_stream(text_stream):
    if isinstance(event, BlockEndEvent):
        print(f"Extracted: {event.block_id}")

# With Gemini adapter (auto-detected)
async for event in processor.process_stream(gemini_stream):
    if hasattr(event, 'usage_metadata'):
        print(f"Tokens: {event.usage_metadata}")
    elif isinstance(event, BlockEndEvent):
        print(f"Extracted: {event.block_id}")

Source code in src/hother/streamblocks/core/processor.py
async def process_stream(
    self,
    stream: AsyncIterator[TChunk],
    adapter: InputProtocolAdapter[TChunk] | None = None,
) -> AsyncGenerator[TChunk | Event]:
    """Process stream and yield mixed events.

    This method processes chunks from any stream format, extracting text
    via an adapter and emitting both original chunks (if enabled) and
    StreamBlocks events.

    Args:
        stream: Async iterator yielding chunks (text or objects)
        adapter: Optional adapter for extracting text from chunks.
                If None and auto_detect_adapter=True, will auto-detect from first chunk.

    Yields:
        Mixed stream of:
        - Original chunks (if emit_original_events=True)
        - TextDeltaEvent (if emit_text_deltas=True)
        - TextContentEvent, BlockStartEvent, BlockEndEvent, BlockErrorEvent, and section delta events

    Raises:
        RuntimeError: If adapter is not set after first chunk processing
            (internal state error, should not occur in normal usage).

    Example:
        >>> # Plain text
        >>> async for event in processor.process_stream(text_stream):
        ...     if isinstance(event, BlockEndEvent):
        ...         print(f"Extracted: {event.block_id}")
        >>>
        >>> # With Gemini adapter (auto-detected)
        >>> async for event in processor.process_stream(gemini_stream):
        ...     if hasattr(event, 'usage_metadata'):
        ...         print(f"Tokens: {event.usage_metadata}")
        ...     elif isinstance(event, BlockEndEvent):
        ...         print(f"Extracted: {event.block_id}")
    """
    # Set adapter if provided
    if adapter:
        self._adapter = adapter
        self._first_chunk_processed = True

    async for chunk in stream:
        # Auto-detection on first chunk
        self._ensure_adapter(chunk, None)

        # Emit original chunk (passthrough)
        if self.config.emit_original_events and not isinstance(self._adapter, IdentityInputAdapter):
            yield chunk

        # Extract text from chunk
        if self._adapter is None:
            msg = "Adapter should be set after first chunk processing"
            raise RuntimeError(msg)
        text = self._adapter.extract_text(chunk)  # type: ignore[arg-type]

        if not text:
            continue

        # Log stream processing start on first chunk with text
        if self._line_accumulator.line_number == 0 and not self._line_accumulator.has_pending_text:
            self.logger.debug(
                "stream_processing_started",
                syntax=get_syntax_name(self.syntax),
                lines_buffer=self.config.lines_buffer,
                max_block_size=self.config.max_block_size,
            )

        # Emit text delta for real-time streaming
        if self.config.emit_text_deltas:
            yield self._create_text_delta_event(text)

        # Process text through line accumulator and block state machine
        for line_number, line in self._line_accumulator.add_text(text):
            for event in self._block_machine.process_line(line, line_number):
                self._update_stats([event])
                yield event

    # Process any remaining accumulated text as a final line
    final_line = self._line_accumulator.finalize()
    if final_line:
        line_number, line = final_line
        for event in self._block_machine.process_line(line, line_number):
            self._update_stats([event])
            yield event

    # Flush remaining candidates at stream end
    for event in self._block_machine.flush(self._line_accumulator.line_number):
        self._update_stats([event])
        yield event

BlockAwareAgent

PydanticAI agent that generates StreamBlocks-compatible output.

This wrapper makes a PydanticAI agent aware of StreamBlocks syntaxes, allowing it to generate structured blocks that can be extracted in real-time.
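
A minimal usage sketch, assuming a registry populated with syntax definitions and valid credentials for the chosen model:

from hother.streamblocks.integrations.pydantic_ai import BlockAwareAgent

agent = BlockAwareAgent(
    registry,                      # StreamBlocks Registry with syntax definitions
    model="openai:gpt-4o",
    system_prompt="Respond using the registered block syntax.",
)

async def demo() -> None:
    # Streams raw text chunks interleaved with StreamBlocks events.
    async for event in agent.run_with_blocks("Plan the next release"):
        print(type(event).__name__)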

Source code in src/hother/streamblocks/integrations/pydantic_ai/agent.py
class BlockAwareAgent:
    """PydanticAI agent that generates StreamBlocks-compatible output.

    This wrapper makes a PydanticAI agent aware of StreamBlocks syntaxes,
    allowing it to generate structured blocks that can be extracted in real-time.
    """

    def __init__(
        self,
        registry: Registry,
        model: str | Agent | None = None,
        system_prompt: str | None = None,
        **agent_kwargs: Any,
    ) -> None:
        """Initialize a block-aware agent.

        Args:
            registry: StreamBlocks registry containing syntax definitions
            model: Model name (e.g., 'openai:gpt-4o') or existing Agent instance
            system_prompt: System prompt for the agent (required if creating new agent)
            **agent_kwargs: Additional arguments to pass to Agent constructor
        """
        # Create or use existing agent
        if isinstance(model, Agent):
            self.agent = model
        else:
            # Use provided system_prompt
            if system_prompt:
                agent_kwargs["system_prompt"] = system_prompt
            self.agent = Agent(model or "openai:gpt-4o", **agent_kwargs)

        # Setup registry and processor
        self.registry = registry
        self.processor = AgentStreamProcessor(registry)

    async def run_with_blocks(
        self,
        user_prompt: str,
        message_history: Any = None,
        deps: Any = None,
        **kwargs: Any,
    ) -> AsyncGenerator[str | BaseEvent]:
        """Run the agent and stream blocks in real-time.

        Args:
            user_prompt: The user's prompt to the agent
            message_history: Optional conversation history
            deps: Optional dependencies for the agent
            **kwargs: Additional arguments for agent.run_stream()

        Yields:
            Mixed stream of:
            - Original text chunks (if emit_original_events=True)
            - Event objects as blocks are detected and extracted
        """

        # Start agent streaming
        async with self.agent.run_stream(
            user_prompt, message_history=message_history, deps=deps, **kwargs
        ) as stream_result:
            # Create text stream from agent
            # Use PydanticAI's native delta streaming feature
            async def agent_text_stream() -> AsyncIterator[str]:
                async for delta_text in stream_result.stream_text(delta=True):
                    yield delta_text

            # Process through StreamBlocks
            async for event in self.processor.process_agent_stream(agent_text_stream()):
                yield event

    async def run(
        self,
        user_prompt: str,
        message_history: Any = None,
        deps: Any = None,
        **kwargs: Any,
    ) -> Any:
        """Run the agent without block extraction (standard PydanticAI interface).

        This method provides compatibility with the standard PydanticAI Agent interface.

        Args:
            user_prompt: The user's prompt to the agent
            message_history: Optional conversation history
            deps: Optional dependencies for the agent
            **kwargs: Additional arguments for agent.run()

        Returns:
            The agent's response
        """
        return await self.agent.run(user_prompt, message_history=message_history, deps=deps, **kwargs)

    def run_sync(
        self,
        user_prompt: str,
        message_history: Any = None,
        deps: Any = None,
        **kwargs: Any,
    ) -> Any:
        """Run the agent synchronously without block extraction.

        This method provides compatibility with the standard PydanticAI Agent interface.

        Args:
            user_prompt: The user's prompt to the agent
            message_history: Optional conversation history
            deps: Optional dependencies for the agent
            **kwargs: Additional arguments for agent.run_sync()

        Returns:
            The agent's response
        """
        return self.agent.run_sync(user_prompt, message_history=message_history, deps=deps, **kwargs)

    def __getattr__(self, name: str) -> Any:
        """Forward unknown attributes to the underlying agent.

        This allows the BlockAwareAgent to act as a transparent wrapper.

        Args:
            name: Attribute name

        Returns:
            The attribute from the underlying agent
        """
        return getattr(self.agent, name)

agent instance-attribute

agent = model

processor instance-attribute

processor = AgentStreamProcessor(registry)

registry instance-attribute

registry = registry

run async

run(
    user_prompt: str,
    message_history: Any = None,
    deps: Any = None,
    **kwargs: Any,
) -> Any

Run the agent without block extraction (standard PydanticAI interface).

This method provides compatibility with the standard PydanticAI Agent interface.

Parameters:

user_prompt (str): The user's prompt to the agent. Required.
message_history (Any): Optional conversation history. Default: None.
deps (Any): Optional dependencies for the agent. Default: None.
**kwargs (Any): Additional arguments for agent.run(). Default: {}.

Returns:

Any: The agent's response.

Source code in src/hother/streamblocks/integrations/pydantic_ai/agent.py
async def run(
    self,
    user_prompt: str,
    message_history: Any = None,
    deps: Any = None,
    **kwargs: Any,
) -> Any:
    """Run the agent without block extraction (standard PydanticAI interface).

    This method provides compatibility with the standard PydanticAI Agent interface.

    Args:
        user_prompt: The user's prompt to the agent
        message_history: Optional conversation history
        deps: Optional dependencies for the agent
        **kwargs: Additional arguments for agent.run()

    Returns:
        The agent's response
    """
    return await self.agent.run(user_prompt, message_history=message_history, deps=deps, **kwargs)

run_sync

run_sync(
    user_prompt: str,
    message_history: Any = None,
    deps: Any = None,
    **kwargs: Any,
) -> Any

Run the agent synchronously without block extraction.

This method provides compatibility with the standard PydanticAI Agent interface.

Parameters:

user_prompt (str): The user's prompt to the agent. Required.
message_history (Any): Optional conversation history. Default: None.
deps (Any): Optional dependencies for the agent. Default: None.
**kwargs (Any): Additional arguments for agent.run_sync(). Default: {}.

Returns:

Any: The agent's response.

Source code in src/hother/streamblocks/integrations/pydantic_ai/agent.py
def run_sync(
    self,
    user_prompt: str,
    message_history: Any = None,
    deps: Any = None,
    **kwargs: Any,
) -> Any:
    """Run the agent synchronously without block extraction.

    This method provides compatibility with the standard PydanticAI Agent interface.

    Args:
        user_prompt: The user's prompt to the agent
        message_history: Optional conversation history
        deps: Optional dependencies for the agent
        **kwargs: Additional arguments for agent.run_sync()

    Returns:
        The agent's response
    """
    return self.agent.run_sync(user_prompt, message_history=message_history, deps=deps, **kwargs)

run_with_blocks async

run_with_blocks(
    user_prompt: str,
    message_history: Any = None,
    deps: Any = None,
    **kwargs: Any,
) -> AsyncGenerator[str | BaseEvent]

Run the agent and stream blocks in real-time.

Parameters:

user_prompt (str): The user's prompt to the agent. Required.
message_history (Any): Optional conversation history. Default: None.
deps (Any): Optional dependencies for the agent. Default: None.
**kwargs (Any): Additional arguments for agent.run_stream(). Default: {}.

Yields:

AsyncGenerator[str | BaseEvent]: Mixed stream of:
  • Original text chunks (if emit_original_events=True)
  • Event objects as blocks are detected and extracted
Source code in src/hother/streamblocks/integrations/pydantic_ai/agent.py
async def run_with_blocks(
    self,
    user_prompt: str,
    message_history: Any = None,
    deps: Any = None,
    **kwargs: Any,
) -> AsyncGenerator[str | BaseEvent]:
    """Run the agent and stream blocks in real-time.

    Args:
        user_prompt: The user's prompt to the agent
        message_history: Optional conversation history
        deps: Optional dependencies for the agent
        **kwargs: Additional arguments for agent.run_stream()

    Yields:
        Mixed stream of:
        - Original text chunks (if emit_original_events=True)
        - Event objects as blocks are detected and extracted
    """

    # Start agent streaming
    async with self.agent.run_stream(
        user_prompt, message_history=message_history, deps=deps, **kwargs
    ) as stream_result:
        # Create text stream from agent
        # Use PydanticAI's native delta streaming feature
        async def agent_text_stream() -> AsyncIterator[str]:
            async for delta_text in stream_result.stream_text(delta=True):
                yield delta_text

        # Process through StreamBlocks
        async for event in self.processor.process_agent_stream(agent_text_stream()):
            yield event