Block State Machine

Streamblocks uses a state machine to track block detection and extraction. This document explains the states, transitions, and internal workings of the block processing system.

State Overview

stateDiagram-v2
    [*] --> SEARCHING: Start processing

    SEARCHING --> HEADER_DETECTED: Start marker found
    SEARCHING --> SEARCHING: Regular text line

    HEADER_DETECTED --> ACCUMULATING_METADATA: Has frontmatter
    HEADER_DETECTED --> ACCUMULATING_CONTENT: No frontmatter
    HEADER_DETECTED --> REJECTED: Invalid header

    ACCUMULATING_METADATA --> ACCUMULATING_CONTENT: Metadata complete
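    ACCUMULATING_METADATA --> REJECTED: Invalid metadata
    ACCUMULATING_METADATA --> REJECTED: Stream ends before close

    ACCUMULATING_CONTENT --> CLOSING_DETECTED: End marker found
    ACCUMULATING_CONTENT --> ACCUMULATING_CONTENT: Content line
    ACCUMULATING_CONTENT --> REJECTED: Max size exceeded
    ACCUMULATING_CONTENT --> REJECTED: Stream ends before close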

    CLOSING_DETECTED --> COMPLETED: Validation passed
    CLOSING_DETECTED --> REJECTED: Validation failed

    COMPLETED --> SEARCHING: Continue processing
    REJECTED --> SEARCHING: Continue processing

    COMPLETED --> [*]: Stream ends
    REJECTED --> [*]: Stream ends
    SEARCHING --> [*]: Stream ends
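
The diagram can also be read as a transition table. The following is a minimal sketch and is not the Streamblocks API. The names `TRANSITIONS` and `STREAM_END_STATES` and the helper `can_transition()` are hypothetical, and the `BlockState` member values are assumed.

```python
from enum import Enum, auto


class BlockState(Enum):
    """States from the diagram above (member values are an assumption)."""

    SEARCHING = auto()
    HEADER_DETECTED = auto()
    ACCUMULATING_METADATA = auto()
    ACCUMULATING_CONTENT = auto()
    CLOSING_DETECTED = auto()
    COMPLETED = auto()
    REJECTED = auto()


S = BlockState

# Allowed transitions, read off the state diagram (hypothetical helper table).
TRANSITIONS: dict[BlockState, frozenset[BlockState]] = {
    S.SEARCHING: frozenset({S.SEARCHING, S.HEADER_DETECTED}),
    S.HEADER_DETECTED: frozenset(
        {S.ACCUMULATING_METADATA, S.ACCUMULATING_CONTENT, S.REJECTED}
    ),
    S.ACCUMULATING_METADATA: frozenset({S.ACCUMULATING_CONTENT, S.REJECTED}),
    S.ACCUMULATING_CONTENT: frozenset(
        {S.ACCUMULATING_CONTENT, S.CLOSING_DETECTED, S.REJECTED}
    ),
    S.CLOSING_DETECTED: frozenset({S.COMPLETED, S.REJECTED}),
    S.COMPLETED: frozenset({S.SEARCHING}),
    S.REJECTED: frozenset({S.SEARCHING}),
}

# States from which the diagram allows the stream to end ("[*]" exits).
STREAM_END_STATES = frozenset({S.SEARCHING, S.COMPLETED, S.REJECTED})


def can_transition(src: BlockState, dst: BlockState) -> bool:
    """Return True if the diagram allows moving from src to dst."""
    return dst in TRANSITIONS[src]
```

A table like this is useful in tests. Asserting `can_transition(old, new)` on every state change catches any transition the diagram does not allow.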

States

SEARCHING

The initial and default state. The processor scans lines looking for block start markers.

# In SEARCHING state, each line is checked for block start
if syntax.detect_start(line):
    state = BlockState.HEADER_DETECTED
    emit(EventType.BLOCK_OPENED)
else:
    emit(EventType.TEXT_DELTA)  # Regular text

Transitions from SEARCHING:

| Condition | Next State | Event |
| --- | --- | --- |
| Start marker found | HEADER_DETECTED | BLOCK_OPENED |
| Regular line | SEARCHING | TEXT_DELTA |
| Stream ends | Terminal | STREAM_END |
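
Here is a sketch of the SEARCHING behavior. It assumes the delimiter-preamble start marker from the Syntax Detection section. The hypothetical `search()` function turns lines into `(event, line)` pairs until it reaches the first start marker.

```python
import re
from enum import Enum, auto


class EventType(Enum):
    # Only the events this sketch needs; names follow the table above.
    TEXT_DELTA = auto()
    BLOCK_OPENED = auto()


# Assumed start marker, borrowed from the delimiter-preamble syntax.
START = re.compile(r"^!!(\w+):(\w+)\s*$")


def search(lines: list[str]) -> list[tuple[EventType, str]]:
    """Scan lines in SEARCHING state until the first start marker.

    Every non-marker line becomes a TEXT_DELTA. The first marker becomes
    BLOCK_OPENED and scanning stops, because the machine leaves SEARCHING.
    """
    events: list[tuple[EventType, str]] = []
    for line in lines:
        if START.match(line):
            events.append((EventType.BLOCK_OPENED, line))
            break  # next state: HEADER_DETECTED
        events.append((EventType.TEXT_DELTA, line))
    return events
```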

HEADER_DETECTED

A block start marker has been found. The processor validates the header. If the header is valid, the processor checks whether the syntax uses frontmatter.

flowchart TB
    HeaderDetected[HEADER_DETECTED]

    HeaderDetected --> CheckValid{Valid header?}
    CheckValid -->|No| Rejected[REJECTED]
    CheckValid -->|Yes| CheckFrontmatter{Syntax has frontmatter?}

    CheckFrontmatter -->|Yes| AccumulatingMetadata[ACCUMULATING_METADATA]
    CheckFrontmatter -->|No| AccumulatingContent[ACCUMULATING_CONTENT]

Transitions from HEADER_DETECTED:

| Condition | Next State | Event |
| --- | --- | --- |
| Has frontmatter | ACCUMULATING_METADATA | - |
| No frontmatter | ACCUMULATING_CONTENT | - |
| Invalid header | REJECTED | BLOCK_REJECTED |
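
The decision can be sketched as a pure function. `after_header()` is hypothetical. Checking header validity before routing on frontmatter is an assumption about the ordering.

```python
from enum import Enum, auto


class BlockState(Enum):
    # Only the successor states of HEADER_DETECTED.
    ACCUMULATING_METADATA = auto()
    ACCUMULATING_CONTENT = auto()
    REJECTED = auto()


def after_header(header_valid: bool, has_frontmatter: bool) -> BlockState:
    """Pick the state that follows HEADER_DETECTED.

    The header is checked first: an invalid header is rejected regardless
    of syntax. Only a valid header is routed on the frontmatter flag.
    """
    if not header_valid:
        return BlockState.REJECTED  # emits BLOCK_REJECTED
    if has_frontmatter:
        return BlockState.ACCUMULATING_METADATA
    return BlockState.ACCUMULATING_CONTENT
```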

ACCUMULATING_METADATA

Collecting YAML frontmatter lines until the metadata section ends.

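# Frontmatter sits between an opening and a closing "---" delimiter
if line.strip() == "---":
    if not frontmatter_started:
        frontmatter_started = True  # Opening delimiter: start collecting
    else:
        metadata = parse_yaml(frontmatter_lines)  # Closing delimiter
        state = BlockState.ACCUMULATING_CONTENT
else:
    frontmatter_lines.append(line)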

Transitions from ACCUMULATING_METADATA:

| Condition | Next State | Event |
| --- | --- | --- |
| Metadata end marker | ACCUMULATING_CONTENT | - |
| Invalid YAML | REJECTED | BLOCK_REJECTED |
| Stream ends | REJECTED | BLOCK_REJECTED |
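
Here is a self-contained sketch of metadata accumulation. It assumes the lines passed to this state begin with the opening `---`. It also substitutes a flat `key: value` parser for real YAML parsing; a full implementation would more likely use a YAML library such as PyYAML's `yaml.safe_load`. The names `collect_frontmatter()`, `parse_flat_yaml()`, and `FrontmatterError` are hypothetical, and raising the exception stands in for the transition to REJECTED.

```python
class FrontmatterError(ValueError):
    """Raised where the state machine would move to REJECTED."""


def parse_flat_yaml(lines: list[str]) -> dict[str, str]:
    """Stand-in for a YAML parser: handles only flat `key: value` lines."""
    metadata: dict[str, str] = {}
    for line in lines:
        if not line.strip():
            continue  # ignore blank lines
        key, sep, value = line.partition(":")
        if not sep or not key.strip():
            raise FrontmatterError(f"Invalid metadata line: {line!r}")
        metadata[key.strip()] = value.strip()
    return metadata


def collect_frontmatter(lines: list[str]) -> tuple[dict[str, str], int]:
    """Collect the lines between the opening and closing '---'.

    Returns the parsed metadata and the index of the first content line.
    Raises FrontmatterError for invalid metadata or a missing closing '---'
    (the "Stream ends" row above).
    """
    if not lines or lines[0].strip() != "---":
        raise FrontmatterError("Missing opening '---'")
    collected: list[str] = []
    for i, line in enumerate(lines[1:], start=1):
        if line.strip() == "---":
            return parse_flat_yaml(collected), i + 1
        collected.append(line)
    raise FrontmatterError("Stream ended before closing '---'")
```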

ACCUMULATING_CONTENT

Collecting block content lines until an end marker is found.

# Content is accumulated until the end marker
if syntax.detect_end(line):
    state = BlockState.CLOSING_DETECTED
elif content_size + len(line) > max_block_size:
    state = BlockState.REJECTED  # MAX_SIZE_EXCEEDED
else:
    content_lines.append(line)
    content_size += len(line)  # running total in characters
    emit(EventType.BLOCK_CONTENT)

Transitions from ACCUMULATING_CONTENT:

| Condition | Next State | Event |
| --- | --- | --- |
| End marker found | CLOSING_DETECTED | - |
| Max size exceeded | REJECTED | BLOCK_REJECTED |
| Regular line | ACCUMULATING_CONTENT | BLOCK_CONTENT |
| Stream ends | REJECTED | BLOCK_REJECTED |
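
The size check stays cheap if it uses a running character count instead of re-summing every line. The sketch below assumes the `!!end` marker from the delimiter syntaxes. `ContentAccumulator` and `MaxSizeExceeded` are illustrative names, not library classes.

```python
import re
from dataclasses import dataclass, field

END = re.compile(r"^!!end\s*$")  # assumed end marker (delimiter syntaxes)


class MaxSizeExceeded(Exception):
    """Signals the ACCUMULATING_CONTENT -> REJECTED transition."""


@dataclass
class ContentAccumulator:
    max_block_size: int
    lines: list[str] = field(default_factory=list)
    size: int = 0  # running total in characters, excluding newlines

    def feed(self, line: str) -> bool:
        """Add one line; return True when the end marker closes the block."""
        if END.match(line):
            return True  # -> CLOSING_DETECTED
        if self.size + len(line) > self.max_block_size:
            raise MaxSizeExceeded(f"block exceeds {self.max_block_size} chars")
        self.lines.append(line)
        self.size += len(line)
        return False  # stays in ACCUMULATING_CONTENT, emits BLOCK_CONTENT
```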

CLOSING_DETECTED

An end marker has been found. The complete block is validated.

flowchart TB
    ClosingDetected[CLOSING_DETECTED]

    ClosingDetected --> Parse[Parse block]
    Parse --> Validate{Validation passes?}

    Validate -->|Yes| Completed[COMPLETED]
    Validate -->|No| Rejected[REJECTED]

    Completed --> EmitExtracted[Emit BLOCK_EXTRACTED]
    Rejected --> EmitRejected[Emit BLOCK_REJECTED]

Transitions from CLOSING_DETECTED:

| Condition | Next State | Event |
| --- | --- | --- |
| Validation passes | COMPLETED | BLOCK_EXTRACTED |
| Validation fails | REJECTED | BLOCK_REJECTED |
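
The parse-then-validate step can be sketched as follows. `close_block()`, `Block`, and `Rejection` are simplified stand-ins, not the library's classes. Validators are modeled as callables that return `None` on success or an error message on failure; that convention is also an assumption.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Block:
    metadata: dict
    content: str


@dataclass
class Rejection:
    message: str


# A validator returns None on success or an error message on failure.
Validator = Callable[[dict, str], Optional[str]]


def close_block(metadata: dict, content_lines: list[str],
                validators: list[Validator]) -> Block | Rejection:
    content = "\n".join(content_lines)  # parse step: rebuild the block body
    for validate in validators:
        error = validate(metadata, content)
        if error is not None:
            return Rejection(error)  # -> REJECTED, emit BLOCK_REJECTED
    return Block(metadata, content)  # -> COMPLETED, emit BLOCK_EXTRACTED
```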

COMPLETED

The block was extracted successfully. The processor returns to SEARCHING to look for the next block.

# Block is complete and valid
block = Block(metadata=metadata, content=content)
emit(EventType.BLOCK_EXTRACTED, block=block)
state = BlockState.SEARCHING

REJECTED

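The block was rejected because of one of the following: an invalid header or metadata, an exceeded size limit, a validation failure, or a block still open when the stream ended. The processor emits BLOCK_REJECTED and returns to SEARCHING.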

# Block was rejected
rejection = BlockRejection(
    reason=error_code,
    message="Validation failed",
    partial_content=content_so_far,
)
emit(EventType.BLOCK_REJECTED, rejection=rejection)
state = BlockState.SEARCHING
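
The following toy end-to-end processor combines the states for the delimiter-preamble syntax. It is a sketch under several assumptions. Events are plain tuples rather than `EventType` objects. Only two states are stored, and the transient states are handled inline within a single `feed()` call. There is no frontmatter handling or custom validation. `MiniProcessor` is a hypothetical name.

```python
from __future__ import annotations

import re
from dataclasses import dataclass, field
from enum import Enum, auto

START = re.compile(r"^!!(\w+):(\w+)\s*$")  # e.g. "!!block01:task"
END = re.compile(r"^!!end\s*$")


class State(Enum):
    SEARCHING = auto()
    ACCUMULATING_CONTENT = auto()


@dataclass
class MiniProcessor:
    """Toy state machine for the delimiter-preamble syntax only.

    HEADER_DETECTED, CLOSING_DETECTED, COMPLETED and REJECTED are handled
    inline within one feed() call instead of being stored as states.
    """

    max_block_size: int = 1000
    state: State = State.SEARCHING
    events: list[tuple] = field(default_factory=list)
    _header: tuple[str, str] | None = None
    _lines: list[str] = field(default_factory=list)

    def feed(self, line: str) -> None:
        if self.state is State.SEARCHING:
            m = START.match(line)
            if m:  # SEARCHING -> HEADER_DETECTED -> ACCUMULATING_CONTENT
                self._header = (m.group(1), m.group(2))
                self._lines = []
                self.state = State.ACCUMULATING_CONTENT
                self.events.append(("BLOCK_OPENED", m.group(1)))
            else:
                self.events.append(("TEXT_DELTA", line))
            return
        # ACCUMULATING_CONTENT
        if END.match(line):  # -> CLOSING_DETECTED -> COMPLETED
            block_id, block_type = self._header
            self.events.append(
                ("BLOCK_EXTRACTED", block_id, block_type, "\n".join(self._lines))
            )
            self.state = State.SEARCHING
        elif sum(map(len, self._lines)) + len(line) > self.max_block_size:
            self._reject("max_size_exceeded")
        else:
            self._lines.append(line)
            self.events.append(("BLOCK_CONTENT", line))

    def finish(self) -> None:
        """Stream ended: a block that is still open is rejected as unclosed."""
        if self.state is State.ACCUMULATING_CONTENT:
            self._reject("unclosed_block")

    def _reject(self, reason: str) -> None:
        self.events.append(("BLOCK_REJECTED", reason))
        self.state = State.SEARCHING  # REJECTED -> SEARCHING
```

REJECTED returns to SEARCHING, so once an oversized block has been rejected, its remaining lines (including its `!!end`) are emitted as TEXT_DELTA.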

Block Candidate

During accumulation, the processor builds a BlockCandidate:

from dataclasses import dataclass

@dataclass
class BlockCandidate:
    """Represents a block being accumulated."""

    start_line: int          # Line where block started
    header_line: str         # The header line content
    metadata_lines: list[str]  # Frontmatter lines (if any)
    content_lines: list[str]  # Content lines
    syntax: Syntax           # Syntax being used

    @property
    def total_size(self) -> int:
        """Total accumulated size in characters."""
        return sum(len(line) for line in self.content_lines)

Line Accumulation

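Text arrives in chunks that may not align with line boundaries. The accumulator passes only complete lines to the state machine. It buffers any trailing partial line until that line's newline arrives or the stream ends: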

sequenceDiagram
    participant Stream
    participant Accumulator
    participant StateMachine

    Stream->>Accumulator: "Hello\nWorld\n"
    Accumulator->>StateMachine: Line: "Hello"
    Accumulator->>StateMachine: Line: "World"

    Stream->>Accumulator: "Partial..."
    Note over Accumulator: Buffer: "Partial..."

    Stream->>Accumulator: "line\nNext"
    Accumulator->>StateMachine: Line: "Partial...line"
    Note over Accumulator: Buffer: "Next"

class LineAccumulator:
    """Accumulates chunks into complete lines."""

    def __init__(self):
        self.buffer = ""

    def add_chunk(self, chunk: str) -> list[str]:
        """Add a chunk and return complete lines."""
        self.buffer += chunk
        lines = []

        while "\n" in self.buffer:
            line, self.buffer = self.buffer.split("\n", 1)
            lines.append(line)

        return lines

    def flush(self) -> str | None:
        """Flush remaining buffer at stream end."""
        if self.buffer:
            line = self.buffer
            self.buffer = ""
            return line
        return None
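
The same buffering can also be written as a generator, which is convenient when chunks already come from an iterator. `iter_lines()` is an illustrative alternative and is not part of Streamblocks. It splits the buffer once per chunk rather than once per newline.

```python
from collections.abc import Iterable, Iterator


def iter_lines(chunks: Iterable[str]) -> Iterator[str]:
    """Yield complete lines from arbitrarily split chunks.

    Behaves like calling add_chunk() for each chunk and flush() at the end.
    """
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        # Everything before the last "\n" is complete; the tail stays buffered.
        *complete, buffer = buffer.split("\n")
        yield from complete
    if buffer:  # trailing text without a newline (the flush() case)
        yield buffer
```

Feeding it the chunks from the sequence diagram yields `"Hello"`, `"World"`, `"Partial...line"`, and finally `"Next"` when the stream ends.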

Syntax Detection

Each syntax defines how to detect block boundaries:

Delimiter Preamble

!!block01:task
Do something important
!!end

import re

class DelimiterPreambleSyntax:
    START_PATTERN = re.compile(r"^!!(\w+):(\w+)\s*$")
    END_PATTERN = re.compile(r"^!!end\s*$")

    def detect_start(self, line: str) -> bool:
        return bool(self.START_PATTERN.match(line))

    def detect_end(self, line: str) -> bool:
        return bool(self.END_PATTERN.match(line))

Delimiter Frontmatter

!!block01
---
type: task
priority: high
---
Do something important
!!end
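
No detector is shown for this format. The following sketch is hypothetical and assumes the header is `!!<id>` on its own line. It needs one precaution that the preamble syntax does not: without the `:type` suffix, a bare header pattern would also match `!!end`, so a negative lookahead excludes that marker.

```python
import re


class DelimiterFrontmatterSketch:
    """Hypothetical boundary detection for the '!!block01' + '---' format."""

    # Header is "!!<id>" with no ":type"; the lookahead stops the closing
    # "!!end" marker from also matching as a new block start.
    START_PATTERN = re.compile(r"^!!(?!end\s*$)(\w+)\s*$")
    END_PATTERN = re.compile(r"^!!end\s*$")
    FRONTMATTER_DELIMITER = "---"

    def detect_start(self, line: str) -> bool:
        return bool(self.START_PATTERN.match(line))

    def detect_end(self, line: str) -> bool:
        return bool(self.END_PATTERN.match(line))

    def is_frontmatter_delimiter(self, line: str) -> bool:
        return line.strip() == self.FRONTMATTER_DELIMITER
```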

Markdown Frontmatter

```block01
---
type: task
---
Do something important
```
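
Here is a matching sketch for the Markdown form, again hypothetical. The opening fence carries the block id as its info string, and the closing fence is a bare line of three backticks. The patterns express the backticks with a `{3}` quantifier. Real Markdown fences can be longer or indented (see the CommonMark rules); this sketch ignores those cases.

```python
import re
from typing import Optional


class MarkdownFrontmatterSketch:
    """Hypothetical boundary detection for fenced, id-tagged blocks."""

    # Opening fence: three backticks followed by the block id.
    START_PATTERN = re.compile(r"^`{3}(\w+)\s*$")
    # Closing fence: three backticks with optional trailing whitespace.
    END_PATTERN = re.compile(r"^`{3}\s*$")

    def detect_start(self, line: str) -> bool:
        return bool(self.START_PATTERN.match(line))

    def detect_end(self, line: str) -> bool:
        return bool(self.END_PATTERN.match(line))

    def block_id(self, line: str) -> Optional[str]:
        """Return the id from an opening fence, or None for other lines."""
        m = self.START_PATTERN.match(line)
        return m.group(1) if m else None
```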

Validation Pipeline

flowchart TB
    subgraph Parse["Parsing Stage"]
        ParseMetadata[Parse Metadata]
        ParseContent[Parse Content]
    end

    subgraph Validate["Validation Stage"]
        ValidateMetadata[Validate Metadata]
        ValidateContent[Validate Content]
        CustomValidation[Custom Validators]
    end

    subgraph Result["Result"]
        Success[Block Created]
        Failure[Rejection Created]
    end

    ParseMetadata --> ValidateMetadata
    ParseContent --> ValidateContent

    ValidateMetadata -->|Pass| CustomValidation
    ValidateContent -->|Pass| CustomValidation
    ValidateMetadata -->|Fail| Failure
    ValidateContent -->|Fail| Failure

    CustomValidation -->|Pass| Success
    CustomValidation -->|Fail| Failure
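
The pipeline can be sketched as a function that runs the stages in order and stops at the first failure. `ValidationResult` below is a minimal stand-in for the class used in the validator examples, and `run_pipeline()` is hypothetical. The diagram draws the metadata and content checks side by side, so running the metadata check first is an assumption.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class ValidationResult:
    """Minimal stand-in for the ValidationResult used on this page."""

    ok: bool
    error: str | None = None

    @classmethod
    def success(cls) -> ValidationResult:
        return cls(True)

    @classmethod
    def failure(cls, error: str) -> ValidationResult:
        return cls(False, error)


MetadataCheck = Callable[[dict], ValidationResult]
ContentCheck = Callable[[str], ValidationResult]
CustomCheck = Callable[[dict, str], ValidationResult]


def run_pipeline(metadata: dict, content: str,
                 metadata_check: MetadataCheck,
                 content_check: ContentCheck,
                 custom_checks: list[CustomCheck]) -> ValidationResult:
    """Run the validation stages in order; return the first failure."""
    meta_result = metadata_check(metadata)
    if not meta_result.ok:
        return meta_result  # -> Rejection Created
    content_result = content_check(content)
    if not content_result.ok:
        return content_result  # -> Rejection Created
    # Custom validators run only after both built-in checks pass.
    for check in custom_checks:
        result = check(metadata, content)
        if not result.ok:
            return result
    return ValidationResult.success()  # -> Block Created
```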

Metadata Validation

class MetadataValidator:
    """Validates block metadata."""

    def validate(self, metadata: dict) -> ValidationResult:
        # Required fields
        if "id" not in metadata:
            return ValidationResult.failure("Missing required field: id")

        # Type checking (a missing "type" key also fails, since get() returns None)
        if not isinstance(metadata.get("type"), str):
            return ValidationResult.failure("Field 'type' must be string")

        return ValidationResult.success()

Content Validation

import json

class ContentValidator:
    """Validates block content."""

    def validate(self, content: str, block_type: str) -> ValidationResult:
        # Type-specific validation
        if block_type == "json":
            try:
                json.loads(content)
            except json.JSONDecodeError as e:
                return ValidationResult.failure(f"Invalid JSON: {e}")

        return ValidationResult.success()

Error Handling

BlockErrorCode

from enum import Enum

class BlockErrorCode(Enum):
    """Error codes for block rejection."""

    INVALID_HEADER = "invalid_header"
    INVALID_METADATA = "invalid_metadata"
    INVALID_CONTENT = "invalid_content"
    VALIDATION_FAILED = "validation_failed"
    MAX_SIZE_EXCEEDED = "max_size_exceeded"
    UNCLOSED_BLOCK = "unclosed_block"
    UNKNOWN_BLOCK_TYPE = "unknown_block_type"

BlockRejection

from dataclasses import dataclass

@dataclass
class BlockRejection:
    """Information about a rejected block."""

    reason: BlockErrorCode
    message: str
    partial_content: str | None = None
    line_number: int | None = None
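
To illustrate how these fields get filled in, the sketch below builds the rejection for a block that is still open when the stream ends. Mapping that case to `UNCLOSED_BLOCK` is an assumption based on the code's name, and `reject_unclosed()` is hypothetical. The enum is trimmed to the single code the sketch uses.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class BlockErrorCode(Enum):
    # Trimmed to the code this sketch needs.
    UNCLOSED_BLOCK = "unclosed_block"


@dataclass
class BlockRejection:
    reason: BlockErrorCode
    message: str
    partial_content: Optional[str] = None
    line_number: Optional[int] = None


def reject_unclosed(content_lines: list[str], start_line: int) -> BlockRejection:
    """Build the rejection for a block still open at stream end.

    Keeps whatever content was accumulated so callers can log or salvage it.
    """
    return BlockRejection(
        reason=BlockErrorCode.UNCLOSED_BLOCK,
        message=f"Block opened at line {start_line} was never closed",
        partial_content="\n".join(content_lines) or None,  # None if empty
        line_number=start_line,
    )
```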

Processing Configuration

Configure state machine behavior:

processor = StreamBlockProcessor(
    registry=registry,
    syntax=syntax,
    max_block_size=100_000,       # Max content size
    emit_text_deltas=True,        # Emit TEXT_DELTA events
    emit_block_content=True,      # Emit BLOCK_CONTENT events
    emit_original_events=False,   # Emit original provider events
)
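
The two event flags can be pictured as a filter over the event stream. This sketch shows assumed semantics only. `filter_events()` is hypothetical, and it assumes structural events (opened, extracted, rejected) are never suppressed.

```python
from enum import Enum, auto


class EventType(Enum):
    TEXT_DELTA = auto()
    BLOCK_OPENED = auto()
    BLOCK_CONTENT = auto()
    BLOCK_EXTRACTED = auto()
    BLOCK_REJECTED = auto()


def filter_events(events: list[EventType],
                  emit_text_deltas: bool = True,
                  emit_block_content: bool = True) -> list[EventType]:
    """Drop optional event types according to the two flags."""
    suppressed: set[EventType] = set()
    if not emit_text_deltas:
        suppressed.add(EventType.TEXT_DELTA)
    if not emit_block_content:
        suppressed.add(EventType.BLOCK_CONTENT)
    # Structural events always pass through in this sketch.
    return [e for e in events if e not in suppressed]
```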

Next Steps