Basic Stage Usage

Stages are the programmable building blocks of Draive pipelines. Every Stage receives a StageState (which encapsulates the active context and latest result), performs its work, and returns a new state. This guide walks through the most common operations so you can assemble reliable pipelines with confidence.

Core Concepts

  • Stage – an async callable that produces or transforms multimodal content.
  • StageState – an immutable snapshot holding context entries and the latest result. Always return state.updated(...) instead of mutating in place.
  • MultimodalContent – container for text, images, artifacts, and resources used as model inputs or outputs.
  • ctx.scope(...) – binds providers, disposables, and logging/metrics for the lifetime of your pipeline. All observability flows through ctx.
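The minimal model below shows why you should return state.updated(...) instead of mutating the state. `SketchState` is a hypothetical stand-in built on a frozen dataclass. It is not Draive's StageState; it only illustrates the copy-on-update pattern described in the bullets above.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class SketchState:
    """Hypothetical stand-in for StageState: context entries plus the latest result."""

    context: tuple[str, ...] = ()
    result: str = ""

    def updated(self, **changes: object) -> "SketchState":
        # Return a modified copy; the original instance is never mutated.
        return replace(self, **changes)


initial = SketchState(context=("user: hi",), result="")
next_state = initial.updated(result="hello!")
# initial.result is still "", next_state.result is "hello!", and both share the same context.
```

Because the dataclass is frozen, assigning to `initial.result` raises an error. Every change therefore produces a new object, and earlier snapshots stay valid for other stages.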

Creating Your First Completion

from draive import Stage

basic_stage = Stage.completion(
    "Explain quantum computing",
    instructions="Keep the answer concise and friendly.",
)

result_state = await basic_stage.execute()
summary = result_state.result.as_string()

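Stage.completion wires up a model call, applies optional instructions, and returns a StageState whose result contains the generated content. Like every stage, it must run inside an active ctx.scope(...) that provides a model configuration, as shown in the end-to-end example at the end of this guide. Use .execute() for quick experiments or tests; in production you typically compose stages into larger flows.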

Adding Tools and Structured Output

from draive import Stage, tool

@tool
async def calculate(a: int, b: int) -> int:
    """Add two numbers together."""
    return a + b

tooling_stage = Stage.completion(
    "What is 15 + 27?",
    instructions="Use the provided calculator tool.",
    tools=[calculate],
)

structured_stage = Stage.completion(
    "List three programming languages",
    instructions="Return a JSON array of language names.",
    output="json",
)

Provide the tools the model is allowed to call, and use output when you need structured responses (for example, "json" or "yaml").
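Even when you request JSON output, check the shape of the result before passing it to downstream code. The sketch below uses only the standard library and assumes you already have the result as a string, for example from `result_state.result.as_string()`. `parse_language_list` is a hypothetical helper, not part of Draive.

```python
import json


def parse_language_list(raw: str) -> list[str]:
    """Parse and validate a JSON array of strings returned by a model.

    Raises ValueError if the text is not valid JSON or not an array of strings.
    """
    data = json.loads(raw)  # json.JSONDecodeError is a subclass of ValueError
    if not isinstance(data, list) or not all(isinstance(item, str) for item in data):
        raise ValueError(f"expected a JSON array of strings, got: {data!r}")
    return data


languages = parse_language_list('["Python", "Rust", "Go"]')
```

If validation fails, you can reject the response or feed it into a retry, rather than letting malformed data reach later stages.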

Working with Static or Prompted Content

from draive import Stage

static_stage = Stage.predefined(
    "System: Processing user request...",
    "User input received.",
)

async def get_user_query() -> str:
    return "What's the weather like today?"

prompted_stage = Stage.prompting_completion(
    get_user_query,
    instructions="Answer clearly and reference the city if provided.",
)

loopback_stage = Stage.loopback_completion(
    instructions="Polish the previous response for clarity.",
)

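Stage.predefined injects fixed conversation turns into the context. Stage.prompting_completion calls the supplied function at runtime to obtain its input. Stage.loopback_completion feeds the latest result back in as the next input, which makes it useful for iterative refinement.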
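The two runtime-input patterns differ only in where the next input comes from. This small model assumes that loopback feeds the latest result back in as new input. `fake_model`, `prompting_step`, and `loopback_step` are hypothetical helpers, and no real model is called.

```python
import asyncio
from collections.abc import Awaitable, Callable


async def fake_model(prompt: str) -> str:
    # Hypothetical stand-in for a model call: wraps its input so the data flow is visible.
    return f"answer({prompt})"


async def prompting_step(get_input: Callable[[], Awaitable[str]]) -> str:
    # Prompting: the input is fetched at runtime from a callable.
    return await fake_model(await get_input())


async def loopback_step(previous_result: str) -> str:
    # Loopback: the previous result becomes the next input.
    return await fake_model(previous_result)


async def main() -> str:
    async def get_user_query() -> str:
        return "weather?"

    first = await prompting_step(get_user_query)
    return await loopback_step(first)


print(asyncio.run(main()))  # prints "answer(answer(weather?))"
```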

Transforming Context and Results

from draive import ModelInput, MultimodalContent, Stage

transform_result_stage = Stage.transform_result(
    lambda content: MultimodalContent.of("Transformed: ", content),
)

transform_context_stage = Stage.transform_context(
    lambda context: context + (ModelInput.of("Additional context"),),
)

Use transformers when you need to adjust only the result or the conversation context. Because StageState is immutable, each transformation returns a fresh copy.
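As a plain-data illustration, each transformer in this sketch rewrites exactly one field and copies the other unchanged. `SketchState`, `transform_result`, and `transform_context` are hypothetical stand-ins that use strings instead of MultimodalContent and ModelInput. They are not Draive's implementation.

```python
from collections.abc import Callable
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class SketchState:
    context: tuple[str, ...]
    result: str


def transform_result(state: SketchState, fn: Callable[[str], str]) -> SketchState:
    # Only the result changes; the context tuple is carried over as-is.
    return replace(state, result=fn(state.result))


def transform_context(
    state: SketchState, fn: Callable[[tuple[str, ...]], tuple[str, ...]]
) -> SketchState:
    # Only the context changes; the result is carried over as-is.
    return replace(state, context=fn(state.context))


state = SketchState(context=("user: hi",), result="hello")
with_new_result = transform_result(state, lambda content: "Transformed: " + content)
with_new_context = transform_context(state, lambda context: context + ("Additional context",))
```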

Looping Until a Condition Is Met

from draive import Stage, StageState

async def should_continue(*, state: StageState, iteration: int) -> bool:
    return iteration < 3 and "done" not in state.result.as_string().lower()

loop_stage = Stage.loop(
    Stage.completion("Refine the analysis further."),
    condition=should_continue,
    mode="post_check",  # Evaluate the condition after each iteration
)

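Stage.loop repeatedly executes a stage while condition returns True. With mode="post_check", the condition is evaluated after each run, so the stage always executes at least once. The iteration argument lets you cap the number of iterations or stop on custom signals.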
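A small model makes the difference between checking before and after each run easy to see. In this sketch, `run_loop` is hypothetical, state is a plain string, and "pre_check" is assumed to be the counterpart of "post_check". `iteration` is the zero-based index of the current pass; Draive's exact numbering may differ.

```python
import asyncio
from collections.abc import Awaitable, Callable

Body = Callable[[str], Awaitable[str]]
Condition = Callable[..., Awaitable[bool]]


async def run_loop(body: Body, condition: Condition, state: str, mode: str = "post_check") -> str:
    iteration = 0
    if mode == "pre_check":
        # Check first: the body may run zero times.
        while await condition(state=state, iteration=iteration):
            state = await body(state)
            iteration += 1
    else:
        # Run first, then check: the body always runs at least once.
        while True:
            state = await body(state)
            if not await condition(state=state, iteration=iteration):
                break
            iteration += 1
    return state


async def append_x(state: str) -> str:
    return state + "x"


async def below_two(*, state: str, iteration: int) -> bool:
    return iteration < 2


async def never(*, state: str, iteration: int) -> bool:
    return False
```

With `below_two`, the post-check loop runs three times and the pre-check loop runs twice. With `never`, they run once and zero times respectively. Whichever mode you use, give the condition an explicit iteration cap so a model that never says "done" cannot loop forever.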

Sequencing Stages into Pipelines

from draive import Stage

analyze_stage = Stage.completion(
    "User behaviour dataset...",
    instructions="Identify key engagement patterns.",
)

summarize_stage = Stage.completion(
    "Summarize the analysis in 2-3 sentences.",
)

format_stage = Stage.completion(
    "Format the summary as bullet points.",
)

pipeline = Stage.sequence(
    analyze_stage,
    summarize_stage,
    format_stage,
)

final_state = await pipeline.execute()

Stage.sequence runs each stage in order, feeding the updated state forward. You can nest sequences to build larger flows.
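Conceptually, a sequence is a left-to-right fold over its steps. In the hypothetical sketch below, a `sequence` helper works on plain async functions and string state. It shows why nesting works: a sequence is itself just another step.

```python
import asyncio
from collections.abc import Awaitable, Callable

Step = Callable[[str], Awaitable[str]]


def sequence(*steps: Step) -> Step:
    async def run(state: str) -> str:
        # Each step receives the state produced by the previous one.
        for step in steps:
            state = await step(state)
        return state

    return run


def tagging(tag: str) -> Step:
    # Hypothetical step that records its name in the state.
    async def step(state: str) -> str:
        return f"{state}->{tag}"

    return step


pipeline = sequence(tagging("analyze"), sequence(tagging("summarize"), tagging("format")))
print(asyncio.run(pipeline("data")))  # prints "data->analyze->summarize->format"
```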

Routing, Concurrency, and Merging

from draive import Stage

analysis_stage = Stage.completion(
    "Perform detailed analysis.",
    instructions="Go deep on the data.",
).with_meta(
    name="detailed_analysis",
    description="Full analysis of the input dataset.",
)

summary_stage = Stage.completion(
    "Create a brief summary.",
    instructions="Highlight only the top three insights.",
).with_meta(
    name="quick_summary",
    description="Lightweight overview for dashboards.",
)

router_stage = Stage.router(
    analysis_stage,
    summary_stage,
    # routing=custom_router,  # Optionally provide your own routing logic
)

from collections.abc import Sequence

from draive import MultimodalContent, Stage, StageState
from draive.stages.types import StageException

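async def merge_results(
    branches: Sequence[StageState | StageException],
) -> StageState:
    # Keep only branches that finished without raising.
    successful = [branch for branch in branches if isinstance(branch, StageState)]
    if not successful:
        raise RuntimeError("All concurrent branches failed")
    # Combine every successful result into a single multimodal payload.
    combined = MultimodalContent.of(*[state.result for state in successful])
    return successful[0].updated(result=combined)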

concurrent_stage = Stage.concurrent(
    Stage.completion("Analyze aspect A."),
    Stage.completion("Analyze aspect B."),
    Stage.completion("Analyze aspect C."),
    merge=merge_results,
)

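Routers pick the most appropriate stage at runtime, using the name and description attached with .with_meta(...) to tell the candidates apart. Stage.concurrent runs its stages in parallel and passes every outcome, either a StageState or a StageException, to your merge function.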

Conditional, Cached, and Resilient Stages

from draive import Stage, StageState

async def needs_analysis(*, state: StageState) -> bool:
    return "analysis" not in state.result.as_string().lower()

conditional_stage = Stage.completion("Prepare detailed analysis...").when(
    condition=needs_analysis,
    alternative=Stage.completion("Skip analysis and move on."),
)

cached_stage = Stage.completion("Perform complex analysis.").cached(
    limit=10,
    expiration=3600,
)

resilient_stage = Stage.completion("Call external API.").with_retry(
    limit=3,
    delay=1.0,
    catching=Exception,
)

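.when(...) runs the stage only if condition returns True and otherwise runs alternative, when one is provided. .cached(...) stores results for reuse (here at most 10 entries, each expiring after 3600 seconds). .with_retry(...) re-runs the stage after delay when an exception matching catching is raised, up to limit times, to protect against transient failures.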

Advanced Context Management

from draive import Stage, tool

@tool
async def dummy_tool(text: str) -> str:
    """Convert text to upper case."""
    return text.upper()

trimmed_stage = Stage.sequence(
    Stage.trim_context(limit=4),
    Stage.completion("Process with limited context."),
)

clean_stage = Stage.sequence(
    Stage.completion("Process with tools.", tools=[dummy_tool]),
    Stage.strip_context_tools(),
)

volatile_stage = Stage.completion(
    "Process data without persisting context.",
).with_volatile_context()

ignored_stage = Stage.completion(
    "Generate intermediate data.",
).ignore_result()

extending_stage = Stage.completion(
    "Append additional insight.",
).extend_result()

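These helpers control what downstream stages see:

  • Stage.trim_context(limit=...) keeps only the most recent context entries.
  • Stage.strip_context_tools() removes tool requests and responses from the context.
  • .with_volatile_context() keeps the stage's result but discards the context changes it made.
  • .ignore_result() keeps the previous result.
  • .extend_result() appends the new result to the previous one instead of replacing it.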
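A plain-data model can make these helpers easier to picture. In this sketch, context entries are hypothetical `(kind, text)` pairs, results are strings, and stages are synchronous functions. The semantics follow our reading of the helper names; Draive's real behaviour, for instance how extended results are combined, may differ.

```python
from collections.abc import Callable
from dataclasses import dataclass, replace

Entry = tuple[str, str]  # (kind, text); kinds used here: "input", "output", "tool"


@dataclass(frozen=True)
class SketchState:
    context: tuple[Entry, ...]
    result: str


Step = Callable[[SketchState], SketchState]


def trim_context(state: SketchState, limit: int) -> SketchState:
    # Keep only the most recent `limit` entries.
    return replace(state, context=state.context[-limit:] if limit > 0 else ())


def strip_tools(state: SketchState) -> SketchState:
    # Drop tool requests/responses, keep inputs and outputs.
    return replace(state, context=tuple(e for e in state.context if e[0] != "tool"))


def volatile(step: Step) -> Step:
    # Keep the step's result but discard whatever it did to the context.
    return lambda state: replace(step(state), context=state.context)


def ignore_result(step: Step) -> Step:
    # Keep the step's context changes but restore the previous result.
    return lambda state: replace(step(state), result=state.result)


def extend_result(step: Step) -> Step:
    # Append the step's result to the previous one instead of replacing it.
    def run(state: SketchState) -> SketchState:
        new = step(state)
        return replace(new, result=state.result + new.result)

    return run


def answer(state: SketchState) -> SketchState:
    # Hypothetical stage that calls a tool and produces the result "B".
    return SketchState(context=state.context + (("tool", "lookup"), ("output", "B")), result="B")


start = SketchState(context=(("input", "q1"), ("output", "A"), ("input", "q2")), result="A")
```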

Building Composite Behaviour

from draive import Stage

composite_stage = (
    Stage.completion("Analyze and process data.")
    .with_meta(name="data_processor", description="Main processing stage")
    .cached(limit=5)
    .with_retry(limit=2)
    .traced(label="data_processing")
    .when(lambda *, state: len(state.result.as_string()) > 10)
)

You can chain modifiers to produce sophisticated behaviour from a single stage definition.
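Each modifier returns a new stage, so the order of the chain matters. In a typical wrapper-based design, the last modifier applied is the outermost and runs first. The sketch below models that with a hypothetical `SketchStage` class, which is not a Draive type. We assume, without having checked Draive's internals, that its modifiers nest the same way. If so, the trailing `.when(...)` is evaluated before the cache or retry logic is reached.

```python
import asyncio
from collections.abc import Awaitable, Callable

Run = Callable[[str], Awaitable[str]]


class SketchStage:
    """Hypothetical fluent stage: every modifier returns a new stage wrapping the current one."""

    def __init__(self, run: Run, trace: list[str]) -> None:
        self._run = run
        self._trace = trace  # shared log of the order in which wrappers are entered

    async def execute(self, state: str) -> str:
        return await self._run(state)

    def wrapped(self, label: str) -> "SketchStage":
        # Stand-in for modifiers such as cached/with_retry/traced: log, then delegate.
        inner, trace = self._run, self._trace

        async def run(state: str) -> str:
            trace.append(label)
            return await inner(state)

        return SketchStage(run, trace)

    def when(self, condition: Callable[[str], bool]) -> "SketchStage":
        inner = self._run

        async def run(state: str) -> str:
            # Skip everything inside this wrapper when the condition is false.
            return await inner(state) if condition(state) else state

        return SketchStage(run, self._trace)


async def shout(state: str) -> str:
    return state.upper()


trace: list[str] = []
composite = SketchStage(shout, trace).wrapped("cached").wrapped("with_retry").when(lambda s: len(s) > 3)

print(asyncio.run(composite.execute("hi")))     # prints "hi" (condition false, nothing inside runs)
print(asyncio.run(composite.execute("hello")))  # prints "HELLO"
print(trace)                                    # prints ['with_retry', 'cached']
```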

Defining Custom Stages

from draive import MultimodalContent, StageState, stage

@stage
async def custom_processor(*, state: StageState) -> StageState:
    processed = MultimodalContent.of(f"Processed: {state.result.as_string()}")
    return state.updated(result=processed)

Any async function decorated with @stage gains the same API as built-in stages and can be mixed freely with them.

End-to-End Example

from draive import ctx, Stage, tool
from draive.openai import OpenAI, OpenAIResponsesConfig

@tool
async def word_count(text: str) -> int:
    """Count words in text."""
    return len(text.split())

async def process_document(document: str) -> str:
    async with ctx.scope(
        "document_processor",
        OpenAIResponsesConfig(model="gpt-4o"),
        disposables=(OpenAI(),),
    ):
        pipeline = Stage.sequence(
            Stage.completion(
                document,
                instructions="Analyze the document structure and content.",
            ).with_meta(name="analyzer", description="Document analysis"),
            Stage.completion(
                "Add word count information using the tool.",
                tools=[word_count],
            ).when(
                condition=lambda *, state: len(state.result.as_string()) > 100,
            ),
            Stage.completion("Create a concise summary.").cached(limit=10),
        ).with_retry(limit=2).traced(label="document_pipeline")

        result_state = await pipeline.execute()
        ctx.log_info(
            "Generated document summary",
            summary=result_state.result.as_string(),
        )
        return result_state.result.as_string()

This pipeline analyzes a document, augments it with tool output when needed, summarizes the result, and records the outcome for observability. Combine these patterns to compose the exact behaviour your application needs.