Skip to content

Provider Examples

Examples for working with different LLM providers.

Google Gemini

Basic Usage

import asyncio
import google.generativeai as genai
from streamblocks import StreamBlockProcessor, BlockRegistry, Syntax, EventType

async def gemini_example():
    genai.configure(api_key="your-api-key")  # pragma: allowlist secret  # pragma: allowlist secret
    model = genai.GenerativeModel("gemini-pro")

    prompt = """
    Create a task list:

    !!task01:task
    Review code changes
    !!end

    !!task02:task
    Update documentation
    !!end
    """

    response = model.generate_content(prompt, stream=True)

    processor = StreamBlockProcessor(
        registry=BlockRegistry(),
        syntax=Syntax.DELIMITER_PREAMBLE,
        input_adapter="auto",
    )

    async for event in processor.process_stream(response):
        if event.type == EventType.BLOCK_EXTRACTED:
            print(f"Task: {event.block.content.raw_content}")

asyncio.run(gemini_example())

With Explicit Adapter

from streamblocks.ext.gemini import GeminiInputAdapter

processor = StreamBlockProcessor(
    registry=BlockRegistry(),
    syntax=Syntax.DELIMITER_PREAMBLE,
    input_adapter=GeminiInputAdapter(),
)

OpenAI

Basic Usage

import asyncio
from openai import OpenAI
from streamblocks import StreamBlockProcessor, BlockRegistry, Syntax, EventType
from streamblocks.ext.openai import OpenAIInputAdapter

async def openai_example():
    client = OpenAI(api_key="your-api-key")  # pragma: allowlist secret

    prompt = """
    Create a task list using:
    !!id:task
    description
    !!end
    """

    stream = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )

    processor = StreamBlockProcessor(
        registry=BlockRegistry(),
        syntax=Syntax.DELIMITER_PREAMBLE,
        input_adapter=OpenAIInputAdapter(),
    )

    async for event in processor.process_stream(stream):
        if event.type == EventType.BLOCK_EXTRACTED:
            print(f"Task: {event.block.content.raw_content}")

asyncio.run(openai_example())

Streaming Text Display

processor = StreamBlockProcessor(
    registry=BlockRegistry(),
    syntax=Syntax.DELIMITER_PREAMBLE,
    input_adapter=OpenAIInputAdapter(),
    emit_text_deltas=True,
)

async for event in processor.process_stream(stream):
    match event.type:
        case EventType.TEXT_DELTA:
            print(event.text, end="", flush=True)
        case EventType.BLOCK_EXTRACTED:
            print(f"\n[Block: {event.block.metadata.id}]")

Anthropic

Basic Usage

import asyncio
import anthropic
from streamblocks import StreamBlockProcessor, BlockRegistry, Syntax, EventType
from streamblocks.ext.anthropic import AnthropicInputAdapter

async def anthropic_example():
    client = anthropic.Anthropic(api_key="your-api-key")  # pragma: allowlist secret

    prompt = """
    Create a task list using:
    !!id:task
    description
    !!end
    """

    processor = StreamBlockProcessor(
        registry=BlockRegistry(),
        syntax=Syntax.DELIMITER_PREAMBLE,
        input_adapter=AnthropicInputAdapter(),
    )

    with client.messages.stream(
        model="claude-3-opus",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    ) as stream:
        async for event in processor.process_stream(stream):
            if event.type == EventType.BLOCK_EXTRACTED:
                print(f"Task: {event.block.content.raw_content}")

asyncio.run(anthropic_example())

Plain Text Streams

Identity Adapter

For plain text without LLM-specific formatting:

from streamblocks.adapters import IdentityAdapter

async def plain_text_stream():
    yield "!!task01:task\n"
    yield "Do something\n"
    yield "!!end\n"

processor = StreamBlockProcessor(
    registry=BlockRegistry(),
    syntax=Syntax.DELIMITER_PREAMBLE,
    input_adapter=IdentityAdapter(),
)

async for event in processor.process_stream(plain_text_stream()):
    if event.type == EventType.BLOCK_EXTRACTED:
        print(event.block)

No Adapter

Strings work directly:

async def string_stream():
    yield "!!task01:task\nDo something\n!!end\n"

async for event in processor.process_stream(string_stream()):
    if event.type == EventType.BLOCK_EXTRACTED:
        print(event.block)

Auto-Detection

Let Streamblocks detect the provider:

processor = StreamBlockProcessor(
    registry=BlockRegistry(),
    syntax=Syntax.DELIMITER_PREAMBLE,
    input_adapter="auto",  # Detects from first event
)

# Works with any supported provider
async for event in processor.process_stream(any_stream):
    handle_event(event)

Error Handling

Provider Errors

import google.api_core.exceptions

try:
    async for event in processor.process_stream(gemini_stream):
        handle_event(event)
except google.api_core.exceptions.ResourceExhausted:
    print("Rate limit exceeded")
    await asyncio.sleep(60)
except google.api_core.exceptions.InvalidArgument as e:
    print(f"Invalid request: {e}")

Block Rejections

async for event in processor.process_stream(stream):
    if event.type == EventType.BLOCK_REJECTED:
        print(f"Block rejected: {event.rejection.reason}")
    elif event.type == EventType.BLOCK_EXTRACTED:
        print(f"Block extracted: {event.block.metadata.id}")

Next Steps