PydanticAI Integration
StreamBlocks integrates with PydanticAI for processing structured agent responses.
Installation
Basic Usage
from pydantic_ai import Agent
from hother.streamblocks.integrations.pydantic_ai import StreamblocksProcessor
# EventType import path assumed; adjust to where it lives in your installation
from hother.streamblocks import EventType

# Create a PydanticAI agent
agent = Agent(
    model="gemini-2.0-flash-exp",
    system_prompt="You are a helpful assistant.",
)

# Create the processor
processor = StreamblocksProcessor()

# Process agent responses
async def run_agent(prompt: str):
    async with agent.run_stream(prompt) as response:
        async for event in processor.process(response):
            if event.type == EventType.BLOCK_CLOSED:
                print(f"Block: {event.block.block_type}")
                print(f"Content: {event.block.content}")
Features
Structured Block Extraction
Extract structured blocks from agent responses:
async for event in processor.process(response):
    match event.type:
        case EventType.BLOCK_OPENED:
            print(f"New block: {event.block.block_type}")
        case EventType.BLOCK_UPDATED:
            print(f"Content: {event.block.content[-50:]}")
        case EventType.BLOCK_CLOSED:
            handle_complete_block(event.block)
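handle_complete_block above is a placeholder, not part of the integration; a minimal sketch of such a handler, dispatching on the block types used elsewhere on this page, might look like:

def handle_complete_block(block):
    # Route completed blocks by type (the "tool_call" type mirrors the section below)
    if block.block_type == "tool_call":
        print(f"Tool call requested: {block.metadata.get('name')}")
    else:
        print(f"Finished {block.block_type} block ({len(block.content)} chars)")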
Tool Call Handling
Handle tool calls from the agent:
async for event in processor.process(response):
    if event.type == EventType.BLOCK_CLOSED:
        if event.block.block_type == "tool_call":
            name = event.block.metadata.get("name")
            args = event.block.content
            result = await execute_tool(name, args)
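execute_tool is left undefined above; a minimal dispatcher sketch, assuming the block content carries JSON-encoded arguments (the tool names and implementations are illustrative, not part of the library):

import json

async def run_search(query: str) -> str:
    # Hypothetical tool implementation
    return f"results for {query!r}"

async def execute_tool(name: str, args: str) -> str:
    # Look up a local coroutine by name and call it with the parsed arguments
    tools = {"search": run_search}
    handler = tools.get(name)
    if handler is None:
        return f"Unknown tool: {name}"
    return await handler(**json.loads(args))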
Real-time Streaming
Display content as it streams:
async for event in processor.process(response):
    if event.type == EventType.TEXT_DELTA:
        print(event.data, end="", flush=True)
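Streaming display can be combined with block collection in the same loop (a sketch using the same event types as above):

blocks = []
async for event in processor.process(response):
    if event.type == EventType.TEXT_DELTA:
        print(event.data, end="", flush=True)   # show text as it arrives
    elif event.type == EventType.BLOCK_CLOSED:
        blocks.append(event.block)              # keep completed blocks for later use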
Example
See the full example: examples/06_integrations/01_pydantic_ai_integration.py
API Reference
hother.streamblocks.integrations.pydantic_ai
PydanticAI integration for StreamBlocks.
This module provides transparent integration between PydanticAI agents and StreamBlocks, allowing agents to generate structured blocks that are extracted in real-time during streaming.
AgentStreamProcessor
Bases: StreamBlockProcessor
Enhanced processor designed to work with PydanticAI agent streaming output.
This processor is optimized for handling streaming text from AI agents, with special handling for partial blocks and real-time extraction.
Source code in src/hother/streamblocks/integrations/pydantic_ai/processor.py
finalize
finalize() -> list[Event]
Finalize processing and flush any incomplete blocks.
Call this method after processing all chunks to get rejection events for any blocks that were opened but never closed.
This method processes any accumulated text as a final line before flushing candidates, ensuring the last line is processed even if it doesn't end with a newline.
Returns:

| Type | Description |
|---|---|
| list[Event] | List of events including processed final line and rejection events for incomplete blocks |
Example
processor = StreamBlockProcessor(registry)
async for chunk in stream:
    events = processor.process_chunk(chunk)
    # ... handle events ...

# Stream ended, process remaining text and flush incomplete blocks
final_events = processor.finalize()
for event in final_events:
    if isinstance(event, BlockErrorEvent):
        print(f"Incomplete block: {event.reason}")
Source code in src/hother/streamblocks/core/processor.py
is_native_event
is_native_event(event: Any) -> bool
Check if event is a native provider event (not a StreamBlocks event).
This method provides provider-agnostic detection of native events. It checks if the event originates from the AI provider (Gemini, OpenAI, Anthropic, etc.) versus being a StreamBlocks event.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| event | Any | Event to check | required |
Returns:

| Type | Description |
|---|---|
| bool | True if event is from the native provider, False if it's a StreamBlocks event or if detection is not possible |
Example
processor = StreamBlockProcessor(registry)
async for event in processor.process_stream(gemini_stream):
    if processor.is_native_event(event):
        # Handle Gemini event (provider-agnostic!)
        usage = getattr(event, 'usage_metadata', None)
    elif isinstance(event, BlockEndEvent):
        # Handle StreamBlocks event
        print(f"Block: {event.block_id}")
Source code in src/hother/streamblocks/core/processor.py
process_agent_stream
async
process_agent_stream(
    agent_stream: AsyncIterator[str],
) -> AsyncGenerator[str | BaseEvent]
Process streaming output from a PydanticAI agent.
This method is specifically designed to handle the streaming output from agent.run_stream() or similar agent streaming methods.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| agent_stream | AsyncIterator[str] | Async iterator from agent streaming (e.g., stream_text()) | required |
Yields:

| Type | Description |
|---|---|
| AsyncGenerator[str \| BaseEvent] | Mixed stream of text chunks (str) and StreamBlocks events (BaseEvent) |
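The source docstring gives no usage example; a sketch wiring agent.run_stream() into this method, assuming agent is the PydanticAI agent from Basic Usage and processor is an AgentStreamProcessor instance (the prompt is illustrative):

async with agent.run_stream("Plan the deployment") as result:
    async for item in processor.process_agent_stream(result.stream_text(delta=True)):
        if isinstance(item, str):
            print(item, end="", flush=True)              # raw text chunk
        else:
            print(f"\n[event] {type(item).__name__}")    # StreamBlocks event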
Source code in src/hother/streamblocks/integrations/pydantic_ai/processor.py
process_agent_with_events
async
process_agent_with_events(
    agent_stream: AsyncIterator[str],
    event_handler: Callable[[str | BaseEvent], Any] | None = None,
) -> AsyncGenerator[str | BaseEvent]
Process agent stream with optional event handler for agent-specific events.
This allows handling both StreamBlocks events and PydanticAI events in a unified manner.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| agent_stream | AsyncIterator[str] | Async iterator from agent streaming | required |
| event_handler | Callable[[str \| BaseEvent], Any] \| None | Optional callback for handling events (both text chunks and Events) | None |
Yields:

| Type | Description |
|---|---|
| AsyncGenerator[str \| BaseEvent] | Mixed stream of text chunks (str) and StreamBlocks events (BaseEvent) |
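A sketch pairing a logging handler with normal consumption, under the same assumptions as in the previous example (the handler body and prompt are illustrative):

def log_item(item):
    kind = "text" if isinstance(item, str) else type(item).__name__
    print(f"[{kind}] {item!r}")

async with agent.run_stream("Draft the release notes") as result:
    async for item in processor.process_agent_with_events(
        result.stream_text(delta=True),
        event_handler=log_item,
    ):
        pass  # items are still yielded here for regular handling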
Source code in src/hother/streamblocks/integrations/pydantic_ai/processor.py
process_chunk
process_chunk(
    chunk: TChunk,
    adapter: InputProtocolAdapter[TChunk] | None = None,
) -> list[TChunk | Event]
Process a single chunk and return resulting events.
This method is stateful - it maintains internal state between calls. Call finalize() after processing all chunks to flush incomplete blocks.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| chunk | TChunk | Single chunk to process | required |
| adapter | InputProtocolAdapter[TChunk] \| None | Optional adapter for extracting text. If not provided and auto_detect_adapter=True, will auto-detect on first chunk. | None |
Returns:

| Type | Description |
|---|---|
| list[TChunk \| Event] | List of events generated from this chunk. May be empty if chunk only accumulates text without completing any lines. |
Raises:

| Type | Description |
|---|---|
| RuntimeError | If adapter is not set after first chunk processing (internal state error, should not occur in normal usage). |
Example
processor = StreamBlockProcessor(registry)
response = await client.generate_content_stream(...)
async for chunk in response:
    events = processor.process_chunk(chunk)
    for event in events:
        if isinstance(event, BlockEndEvent):
            print(f"Block: {event.block_id}")

# Finalize at stream end
final_events = processor.finalize()
for event in final_events:
    if isinstance(event, BlockErrorEvent):
        print(f"Incomplete block: {event.reason}")
Source code in src/hother/streamblocks/core/processor.py
process_stream
async
process_stream(
    stream: AsyncIterator[TChunk],
    adapter: InputProtocolAdapter[TChunk] | None = None,
) -> AsyncGenerator[TChunk | Event]
Process stream and yield mixed events.
This method processes chunks from any stream format, extracting text via an adapter and emitting both original chunks (if enabled) and StreamBlocks events.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| stream | AsyncIterator[TChunk] | Async iterator yielding chunks (text or objects) | required |
| adapter | InputProtocolAdapter[TChunk] \| None | Optional adapter for extracting text from chunks. If None and auto_detect_adapter=True, will auto-detect from first chunk. | None |
Yields:

| Type | Description |
|---|---|
| AsyncGenerator[TChunk \| Event] | Mixed stream of original chunks (TChunk, if enabled) and StreamBlocks events (Event) |
Raises:

| Type | Description |
|---|---|
| RuntimeError | If adapter is not set after first chunk processing (internal state error, should not occur in normal usage). |
Example
# Plain text
async for event in processor.process_stream(text_stream):
    if isinstance(event, BlockEndEvent):
        print(f"Extracted: {event.block_id}")

# With Gemini adapter (auto-detected)
async for event in processor.process_stream(gemini_stream):
    if hasattr(event, 'usage_metadata'):
        print(f"Tokens: {event.usage_metadata}")
    elif isinstance(event, BlockEndEvent):
        print(f"Extracted: {event.block_id}")
Source code in src/hother/streamblocks/core/processor.py
BlockAwareAgent
PydanticAI agent that generates StreamBlocks-compatible output.
This wrapper makes a PydanticAI agent aware of StreamBlocks syntaxes, allowing it to generate structured blocks that can be extracted in real-time.
Source code in src/hother/streamblocks/integrations/pydantic_ai/agent.py
run
async
Run the agent without block extraction (standard PydanticAI interface).
This method provides compatibility with the standard PydanticAI Agent interface.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| user_prompt | str | The user's prompt to the agent | required |
| message_history | Any | Optional conversation history | None |
| deps | Any | Optional dependencies for the agent | None |
| **kwargs | Any | Additional arguments for agent.run() | {} |
Returns:

| Type | Description |
|---|---|
| Any | The agent's response |
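A minimal call, assuming block_agent is an already-configured BlockAwareAgent (the prompt is illustrative):

result = await block_agent.run("What changed in the latest release?")
print(result)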
Source code in src/hother/streamblocks/integrations/pydantic_ai/agent.py
run_sync
Run the agent synchronously without block extraction.
This method provides compatibility with the standard PydanticAI Agent interface.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| user_prompt | str | The user's prompt to the agent | required |
| message_history | Any | Optional conversation history | None |
| deps | Any | Optional dependencies for the agent | None |
| **kwargs | Any | Additional arguments for agent.run_sync() | {} |
Returns:

| Type | Description |
|---|---|
| Any | The agent's response |
Source code in src/hother/streamblocks/integrations/pydantic_ai/agent.py
run_with_blocks
async
run_with_blocks(
    user_prompt: str,
    message_history: Any = None,
    deps: Any = None,
    **kwargs: Any,
) -> AsyncGenerator[str | BaseEvent]
Run the agent and stream blocks in real-time.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| user_prompt | str | The user's prompt to the agent | required |
| message_history | Any | Optional conversation history | None |
| deps | Any | Optional dependencies for the agent | None |
| **kwargs | Any | Additional arguments for agent.run_stream() | {} |
Yields:

| Type | Description |
|---|---|
| AsyncGenerator[str \| BaseEvent] | Mixed stream of text chunks (str) and StreamBlocks events (BaseEvent) |
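A sketch of consuming the mixed stream, assuming block_agent is an already-configured BlockAwareAgent (the prompt is illustrative):

async for item in block_agent.run_with_blocks("Summarize the report"):
    if isinstance(item, str):
        print(item, end="", flush=True)        # plain agent text
    else:
        print(f"\n[{type(item).__name__}]")    # StreamBlocks event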