Basic Execution

result = await workflow.arun("Research AI trends")

result.content       # Final step's content
result.success       # True if all steps succeeded
result.duration_ms   # Total execution time
result.step_outputs  # List[StepOutput] for each step
Workflow.arun() returns a WorkflowOutput rather than a RunOutput: in addition to the final content, it carries the result of each step.
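
Because result.success is False when any step fails, it is usually worth checking it before reading result.content. The following is a minimal sketch of such a guard. WorkflowOutput here is a stand-in dataclass that only mirrors the fields listed above so the example runs without the library, and require_success is a hypothetical helper, not part of definable.

```python
from dataclasses import dataclass, field
from typing import Any, List


@dataclass
class WorkflowOutput:
    """Stand-in that mirrors only the fields named above (not the real class)."""
    content: str
    success: bool
    duration_ms: float
    step_outputs: List[Any] = field(default_factory=list)


def require_success(result: WorkflowOutput) -> str:
    """Return the final content, or raise if any step failed (hypothetical helper)."""
    if not result.success:
        raise RuntimeError(
            f"workflow failed after {result.duration_ms:.0f}ms "
            f"({len(result.step_outputs)} step outputs recorded)"
        )
    return result.content


ok = WorkflowOutput(content="Summary of AI trends", success=True, duration_ms=1850.4)
final_text = require_success(ok)
```

With the real library, the same guard would be applied to the object returned by await workflow.arun(...).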

Accessing Step Results

# By name
research = result.get_step_output("researcher")
print(research.content)
print(research.duration_ms)

# Iterate all steps
for step_output in result.step_outputs:
    print(f"{step_output.step_name}: {step_output.duration_ms:.0f}ms")
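
The per-step durations can also be aggregated, for example to find the slowest step. This is a sketch under the assumption that each step output exposes step_name and duration_ms as shown above. It uses SimpleNamespace objects as stand-ins, and slowest_step and time_shares are hypothetical helper names.

```python
from types import SimpleNamespace


def slowest_step(step_outputs):
    """Return the step output with the largest duration_ms."""
    return max(step_outputs, key=lambda s: s.duration_ms)


def time_shares(step_outputs):
    """Map each step name to its fraction of the summed step time."""
    total = sum(s.duration_ms for s in step_outputs)
    if total == 0:
        return {}
    return {s.step_name: s.duration_ms / total for s in step_outputs}


# Stand-ins for result.step_outputs; real StepOutput objects are assumed
# to carry the same two attributes.
steps = [
    SimpleNamespace(step_name="researcher", duration_ms=300.0),
    SimpleNamespace(step_name="writer", duration_ms=100.0),
]
slowest = slowest_step(steps)
shares = time_shares(steps)
```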

Step Context

Each step receives a StepInput with context from previous steps:
from definable.agent.workflow import StepInput

async def custom_step(ctx: StepInput):
    original = ctx.input                         # Original workflow input
    prev = ctx.get_last_step_content()           # Previous step's output
    research = ctx.get_step_content("researcher")  # Specific step by name
    api_key = ctx.session_state.get("api_key")   # Shared session state
    return f"Processed: {research}"
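
Since a step is an ordinary async function of its context, it can be exercised in isolation with a fake context. The sketch below assumes only the four members used above (input, get_last_step_content, get_step_content, session_state). FakeStepInput is a hypothetical test double, and returning None for an unknown step name is an assumption, not documented behavior.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple


@dataclass
class FakeStepInput:
    """Hypothetical test double for StepInput (not the library class)."""
    input: str
    # (step name, content) pairs in execution order
    previous: List[Tuple[str, str]] = field(default_factory=list)
    session_state: Dict[str, Any] = field(default_factory=dict)

    def get_last_step_content(self) -> Optional[str]:
        return self.previous[-1][1] if self.previous else None

    def get_step_content(self, name: str) -> Optional[str]:
        for step_name, content in self.previous:
            if step_name == name:
                return content
        return None  # assumption: the real method may behave differently


async def custom_step(ctx):
    research = ctx.get_step_content("researcher")
    return f"Processed: {research}"


ctx = FakeStepInput(
    input="Research AI trends",
    previous=[("researcher", "notes on AI trends")],
    session_state={"mode": "staging"},
)
output = asyncio.run(custom_step(ctx))
```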

Session State

Share data across steps:
from definable.agent.workflow import Workflow

workflow = Workflow(
    name="stateful",
    steps=[...],
    session_state={"api_key": "...", "mode": "production"},
)

# Override per-run
result = await workflow.arun("Go", session_state={"mode": "staging"})
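
The text above does not say whether a per-run session_state is merged into the workflow-level dict or replaces it entirely, and the difference matters for keys such as "api_key" that the override omits. The sketch below only illustrates the two possibilities with plain dicts. It makes no library calls, and merged_state and replaced_state are hypothetical names, so verify the actual behavior before relying on either.

```python
def merged_state(defaults, overrides=None):
    """Shallow merge: override keys win, other defaults survive."""
    return {**defaults, **(overrides or {})}


def replaced_state(defaults, overrides=None):
    """Full replacement: defaults are used only when no override is given."""
    return dict(overrides) if overrides is not None else dict(defaults)


defaults = {"api_key": "...", "mode": "production"}
per_run = {"mode": "staging"}

merged = merged_state(defaults, per_run)      # keeps "api_key"
replaced = replaced_state(defaults, per_run)  # drops "api_key"
```

If a step reads ctx.session_state.get("api_key"), it sees a value under the first reading and None under the second.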

Events

Event                        When
WorkflowRunStartedEvent      Workflow begins
WorkflowRunCompletedEvent    Workflow completes
WorkflowRunErrorEvent        Workflow fails
StepStartedEvent             A step begins
StepCompletedEvent           A step completes
StepErrorEvent               A step fails
StepSkippedEvent             A step is skipped
LoopIterationEvent           Loop completes an iteration

from definable.agent.workflow import StepCompletedEvent

workflow.events.on(StepCompletedEvent, lambda e: print(f"{e.step_name}: {e.duration_ms:.0f}ms"))
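
A named handler is easier to reuse than a lambda when the events feed something other than print. The following sketch collects step timings into a dict. To stay runnable without the library it uses stand-ins: EventBus imitates only the on(event_type, handler) call shown above, its emit method is hypothetical, and the StepCompletedEvent dataclass mirrors just the two attributes used here.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable, DefaultDict, Dict, List


@dataclass
class StepCompletedEvent:
    """Stand-in with only the attributes used above (not the library class)."""
    step_name: str
    duration_ms: float


class EventBus:
    """Stand-in for workflow.events; emit() is a hypothetical test hook."""

    def __init__(self) -> None:
        self._handlers: DefaultDict[type, List[Callable]] = defaultdict(list)

    def on(self, event_type: type, handler: Callable) -> None:
        self._handlers[event_type].append(handler)

    def emit(self, event) -> None:
        for handler in self._handlers[type(event)]:
            handler(event)


timings: Dict[str, float] = {}


def record_timing(event: StepCompletedEvent) -> None:
    """Store the duration of each completed step by name."""
    timings[event.step_name] = event.duration_ms


events = EventBus()
events.on(StepCompletedEvent, record_timing)
events.emit(StepCompletedEvent(step_name="researcher", duration_ms=812.0))
```

With the real library, the registration line would presumably read workflow.events.on(StepCompletedEvent, record_timing).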

Imports

from definable.agent.workflow import (
    Workflow, Step, Steps, Parallel, Loop, Condition, Router,
    StepInput, StepOutput, WorkflowOutput,
)