Basic Execution
result = await workflow.arun("Research AI trends")
result.content # Final step's content
result.success # True if all steps succeeded
result.duration_ms # Total execution time
result.step_outputs # List[StepOutput] for each step
Workflow.arun() returns a WorkflowOutput, which is different from RunOutput. It includes step-level results.
Accessing Step Results
# By name
research = result.get_step_output("researcher")
print(research.content)
print(research.duration_ms)
# Iterate all steps
for step_output in result.step_outputs:
print(f"{step_output.step_name}: {step_output.duration_ms:.0f}ms")
Step Context
Each step receives a StepInput with context from previous steps:
from definable.agent.workflow import StepInput
async def custom_step(ctx: StepInput):
original = ctx.input # Original workflow input
prev = ctx.get_last_step_content() # Previous step's output
research = ctx.get_step_content("researcher") # Specific step by name
api_key = ctx.session_state.get("api_key") # Shared session state
return f"Processed: {research}"
Session State
Share data across steps:
workflow = Workflow(
name="stateful",
steps=[...],
session_state={"api_key": "...", "mode": "production"},
)
# Override per-run
result = await workflow.arun("Go", session_state={"mode": "staging"})
Events
| Event | When |
|---|
WorkflowRunStartedEvent | Workflow begins |
WorkflowRunCompletedEvent | Workflow completes |
WorkflowRunErrorEvent | Workflow fails |
StepStartedEvent | A step begins |
StepCompletedEvent | A step completes |
StepErrorEvent | A step fails |
StepSkippedEvent | A step is skipped |
LoopIterationEvent | Loop completes an iteration |
from definable.agent.workflow import StepCompletedEvent
workflow.events.on(StepCompletedEvent, lambda e: print(f"{e.step_name}: {e.duration_ms:.0f}ms"))
Imports
from definable.agent.workflow import (
Workflow, Step, Steps, Parallel, Loop, Condition, Router,
StepInput, StepOutput, WorkflowOutput,
)