The AgentLoop is the unified async generator that powers both arun() (non-streaming) and arun_stream() (streaming). It yields RunOutputEvent instances throughout execution, handling tool dispatch, cancellation, human-in-the-loop pauses, and event emission.

How It Works

Both arun() and arun_stream() delegate to the same AgentLoop.run() generator:
  • arun() collects all events and builds a RunOutput at the end.
  • arun_stream() yields events directly to the caller as they happen.
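The split above can be sketched in a few lines. This is an illustrative model of the pattern, not the library's actual internals: `run_loop` and `Event` stand in for `AgentLoop.run()` and `RunOutputEvent`.

```python
import asyncio
from typing import AsyncIterator

class Event:
  """Stand-in for RunOutputEvent: a kind plus optional content."""
  def __init__(self, kind: str, content: str = ""):
    self.kind = kind
    self.content = content

async def run_loop() -> AsyncIterator[Event]:
  # Stand-in for AgentLoop.run(): yields events as the run progresses.
  yield Event("RunStarted")
  yield Event("RunContent", "Hello")
  yield Event("RunCompleted", "Hello")

async def arun() -> str:
  # Non-streaming: drain the shared generator, keep only the final result.
  final = ""
  async for event in run_loop():
    if event.kind == "RunCompleted":
      final = event.content
  return final

async def arun_stream() -> AsyncIterator[Event]:
  # Streaming: pass each event straight through to the caller.
  async for event in run_loop():
    yield event

print(asyncio.run(arun()))  # prints "Hello"
```

Because both entry points consume the same generator, streaming and non-streaming runs cannot drift apart in behavior; they differ only in what the caller sees.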

Parallel Tool Calls

When the model returns multiple tool calls in a single response, the loop executes them in parallel using asyncio.gather. This is the default behavior.
from definable.agent import Agent
from definable.tool.decorator import tool

@tool
def search_web(query: str) -> str:
  """Search the web."""
  return f"Results for {query}"

@tool
def search_db(query: str) -> str:
  """Search the database."""
  return f"DB results for {query}"

agent = Agent(model="openai/gpt-4o", tools=[search_web, search_db])

# If the model calls both tools at once, they run in parallel
output = await agent.arun("Find info about Python both on the web and in our database")

Opting Out of Parallel Execution

Set sequential=True on a tool to force it to run one at a time, even when other tools in the same batch run in parallel. Use this for tools with side effects that depend on execution order.

@tool
def write_file(path: str, content: str) -> str:
  """Write content to a file."""
  with open(path, "w") as f:
    f.write(content)
  return f"Wrote to {path}"

# Mark as sequential — never runs in parallel with other tools
write_file.sequential = True
When a batch contains both parallel and sequential tools, the loop runs all parallel tools first via asyncio.gather, then runs sequential tools one at a time.
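That dispatch order can be sketched as follows. This is an illustrative model of the behavior described above, not the loop's actual implementation; `call_tool` and `dispatch` are hypothetical names.

```python
import asyncio

async def call_tool(name: str, log: list) -> None:
  # Stand-in for executing one tool call; records completion order.
  await asyncio.sleep(0)
  log.append(name)

async def dispatch(batch: list[tuple[str, bool]]) -> list[str]:
  # batch: (tool_name, sequential) pairs from one model response
  log: list[str] = []
  parallel = [name for name, seq in batch if not seq]
  sequential = [name for name, seq in batch if seq]
  # Parallel tools run together via asyncio.gather...
  await asyncio.gather(*(call_tool(n, log) for n in parallel))
  # ...then sequential tools run one at a time, in order.
  for n in sequential:
    await call_tool(n, log)
  return log

order = asyncio.run(dispatch([("a", False), ("b", True), ("c", False)]))
print(order)  # ["a", "c", "b"]: parallel batch first, then the sequential tool
```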

Human-in-the-Loop (HITL)

Mark a tool with requires_confirmation=True to pause the run before executing it. The agent yields a RunPausedEvent and waits for the caller to resolve the requirement.
@tool
def send_email(to: str, subject: str, body: str) -> str:
  """Send an email."""
  return f"Email sent to {to}"

# Require human confirmation before sending
send_email.requires_confirmation = True

agent = Agent(model="openai/gpt-4o", tools=[send_email])
output = await agent.arun("Send an email to [email protected] about the meeting")

# The run is paused — the tool has not executed yet
if output.is_paused:
  # Inspect what the agent wants to do
  for req in output.requirements:
    print(f"Tool: {req.tool_execution.tool_name}")
    print(f"Args: {req.tool_execution.tool_args}")

    # Approve it
    req.confirm()

  # Resume the run — the tool executes and the loop continues
  output = await agent.continue_run(run_output=output)
  print(output.content)

Rejecting a Tool Call

if output.is_paused:
  for req in output.requirements:
    req.reject()  # Tool will not execute

  output = await agent.continue_run(run_output=output)

CancellationToken

Use a CancellationToken to cooperatively cancel a running agent from another coroutine or thread.
import asyncio
from definable.agent import Agent
from definable.agent.cancellation import CancellationToken, AgentCancelled

agent = Agent(model="openai/gpt-4o", tools=[])
token = CancellationToken()

async def cancel_after(seconds: float):
  await asyncio.sleep(seconds)
  token.cancel()

try:
  # Start cancellation timer and agent run concurrently
  asyncio.create_task(cancel_after(5.0))
  output = await agent.arun("Do a long research task", cancellation_token=token)
except AgentCancelled:
  print("Run was cancelled")
The loop checks token.raise_if_cancelled() at safe points — before each model call and before each tool execution. Cancellation is cooperative: the loop finishes its current atomic operation before raising AgentCancelled.
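A token with this behavior can be sketched in a few lines. This is a minimal illustration of the semantics described above, assuming the token is nothing more than a flag plus a checked raise; the library's actual class may carry more state.

```python
import asyncio

class AgentCancelled(Exception):
  """Raised when a run is cancelled via the token."""

class CancellationToken:
  def __init__(self) -> None:
    self._cancelled = False

  def cancel(self) -> None:
    # Thread-safe in CPython: a single bool write.
    self._cancelled = True

  @property
  def is_cancelled(self) -> bool:
    return self._cancelled

  def raise_if_cancelled(self) -> None:
    if self._cancelled:
      raise AgentCancelled()

async def loop_step(token: CancellationToken) -> str:
  # A "safe point": the loop checks the token before each model call
  # and before each tool execution, never mid-operation.
  token.raise_if_cancelled()
  return "ok"

token = CancellationToken()
token.cancel()
try:
  asyncio.run(loop_step(token))
except AgentCancelled:
  print("cancelled")  # prints "cancelled"
```

Because the check only happens at these safe points, a cancel() issued mid-model-call takes effect at the next check rather than interrupting the call in flight.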

CancellationToken Reference

  • cancel() (method): Request cancellation. Thread-safe (single bool write).
  • is_cancelled (bool): Whether cancellation has been requested.
  • raise_if_cancelled() (method): Raises AgentCancelled if cancellation was requested.

EventBus

The EventBus lets you register callbacks for any event type emitted during a run. The loop calls await bus.emit(event) for every event, so your handlers run inline.
from definable.agent import Agent
from definable.agent.events import RunCompletedEvent, ToolCallStartedEvent

agent = Agent(model="openai/gpt-4o", tools=[])

@agent.events.on(RunCompletedEvent)
async def on_complete(event):
  print(f"Run finished: {event.content[:100]}")

@agent.events.on(ToolCallStartedEvent)
async def on_tool_start(event):
  print(f"Calling tool: {event.tool.tool_name}")

output = await agent.arun("Hello")

Direct Registration

# Register without decorator
async def my_handler(event):
  print(f"Event: {event}")

agent.events.on(RunCompletedEvent, my_handler)

# Unregister
agent.events.off(RunCompletedEvent, my_handler)
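A bus with this on/off/emit surface can be sketched as below. This is a hypothetical minimal implementation, assuming handlers are keyed by event type and awaited inline; the library's actual EventBus may differ.

```python
import asyncio
from collections import defaultdict

class EventBus:
  def __init__(self) -> None:
    self._handlers = defaultdict(list)

  def on(self, event_type, handler=None):
    # Supports both decorator form and direct registration.
    if handler is None:
      def decorator(fn):
        self._handlers[event_type].append(fn)
        return fn
      return decorator
    self._handlers[event_type].append(handler)
    return handler

  def off(self, event_type, handler) -> None:
    self._handlers[event_type].remove(handler)

  async def emit(self, event) -> None:
    # Handlers run inline: the loop awaits each one before continuing.
    for handler in self._handlers[type(event)]:
      await handler(event)

class RunCompletedEvent:
  def __init__(self, content: str):
    self.content = content

bus = EventBus()
seen = []

async def my_handler(event):
  seen.append(event.content)

bus.on(RunCompletedEvent, my_handler)
asyncio.run(bus.emit(RunCompletedEvent("done")))
print(seen)  # ["done"]
```

Running handlers inline means a slow handler slows the run itself; offload heavy work to a background task inside the handler if that matters.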

Available Events

  • RunStartedEvent: The agentic loop begins
  • RunContentEvent: A content chunk is produced (streaming)
  • RunContentCompletedEvent: All content chunks for one model call are done
  • RunCompletedEvent: The loop finishes with a final response
  • RunPausedEvent: A HITL tool pauses the run
  • RunContinuedEvent: A paused run is resumed
  • RunErrorEvent: An error occurs during the run
  • RunCancelledEvent: The run is cancelled via CancellationToken
  • ToolCallStartedEvent: A tool call begins
  • ToolCallCompletedEvent: A tool call finishes
  • ToolCallErrorEvent: A tool call raises an exception
  • ReasoningStartedEvent: The thinking phase begins
  • ReasoningStepEvent: A reasoning step is produced
  • ReasoningCompletedEvent: The thinking phase finishes
  • KnowledgeRetrievalStartedEvent: Knowledge retrieval begins
  • KnowledgeRetrievalCompletedEvent: Knowledge retrieval finishes
  • MemoryRecallStartedEvent: Memory recall begins
  • MemoryRecallCompletedEvent: Memory recall finishes
  • MemoryUpdateStartedEvent: Memory update begins
  • MemoryUpdateCompletedEvent: Memory update finishes
  • ModelCallStartedEvent: A model call is about to execute (includes messages snapshot, tools, model_id)
  • ModelCallCompletedEvent: A model call returned (includes content, tool_calls, metrics)
  • PhaseStartedEvent: A pipeline phase begins execution
  • PhaseCompletedEvent: A pipeline phase finishes execution
  • SubAgentSpawnedEvent: A sub-agent is spawned by the parent agent
  • SubAgentCompletedEvent: A sub-agent completes successfully
  • SubAgentFailedEvent: A sub-agent fails with an error
All events are importable from definable.agent.events.

stop_after_tool_call

Set stop_after_tool_call=True on a tool to make the loop stop immediately after executing it, without making another model call. This is useful for tools that produce a final result that should be returned directly.
@tool
def generate_report(data: str) -> str:
  """Generate a formatted report."""
  return f"# Report\n\n{data}"

# Loop stops after this tool — no follow-up model call
generate_report.stop_after_tool_call = True

agent = Agent(model="openai/gpt-4o", tools=[generate_report])
output = await agent.arun("Generate a report from the sales data")

Streaming

The loop yields events that arun_stream() passes through directly:
async for event in agent.arun_stream("Summarize this article"):
  match event.event:
    case "RunStarted":
      print("Started...")
    case "ToolCallStarted":
      print(f"Calling: {event.tool.tool_name}")
    case "ToolCallCompleted":
      print(f"Result: {event.content[:80]}")
    case "RunContent":
      print(event.content, end="")
    case "RunCompleted":
      print("\nDone.")