Middleware lets you wrap agent execution with reusable logic such as logging, retries, metrics collection, and custom pre/post-processing. Middleware composes cleanly and executes in a predictable order.
How Middleware Works
Middleware wraps the agent's core execution. Each middleware receives a RunContext and a next_handler function, and decides what to do before and after calling next_handler. The request passes through the chain from the outermost middleware down to the agent core, and the response travels back through the same chain in reverse order. The last middleware added is the outermost wrapper.
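Conceptually, the chain behaves like nested function calls. The sketch below is illustrative only, not the library's actual internals; it assumes each middleware is an async callable that takes (context, next_handler) as described above, and the names agent_core and PrintMiddleware are made up for the example.

```python
import asyncio
import functools


async def agent_core(context):
    # Stand-in for the agent's core execution.
    return f"response to {context!r}"


class PrintMiddleware:
    def __init__(self, name):
        self.name = name

    async def __call__(self, context, next_handler):
        print(f"{self.name}: before")
        result = await next_handler(context)
        print(f"{self.name}: after")
        return result


# Each registration wraps the previous handler, so the
# last-registered middleware ends up outermost.
handler = agent_core
for mw in [PrintMiddleware("A"), PrintMiddleware("B"), PrintMiddleware("C")]:
    handler = functools.partial(mw, next_handler=handler)

asyncio.run(handler("Hello!"))
# Prints: C: before, B: before, A: before, A: after, B: after, C: after
```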
Using Middleware
Add middleware with the .use() method:
```python
from definable.agents import Agent, LoggingMiddleware, RetryMiddleware, MetricsMiddleware
from definable.models import OpenAIChat
import logging

logger = logging.getLogger("my_agent")

agent = (
    Agent(
        model=OpenAIChat(id="gpt-4o"),
        instructions="You are a helpful assistant.",
    )
    .use(LoggingMiddleware(logger))
    .use(RetryMiddleware(max_retries=3))
    .use(MetricsMiddleware())
)

output = agent.run("Hello!")
```
Built-in Middleware
LoggingMiddleware
Logs the start, completion, and any errors for each run.
```python
from definable.agents import LoggingMiddleware
import logging

logger = logging.getLogger("my_agent")
agent.use(LoggingMiddleware(logger, level=logging.INFO))
```
- `logger` (logging.Logger): The Python logger instance to write to.
- `level` (int, default: `logging.INFO`): Log level for normal events. Errors are always logged at ERROR.
RetryMiddleware
Retries the entire run on transient errors with exponential backoff.
```python
from definable.agents import RetryMiddleware

agent.use(RetryMiddleware(
    max_retries=3,
    backoff_base=1.0,
    backoff_max=60.0,
))
```
- `max_retries` (int): Maximum number of retry attempts.
- `backoff_base` (float): Base delay in seconds (doubles on each retry).
- `backoff_max` (float): Maximum backoff delay in seconds.
Retries are triggered for ConnectionError, TimeoutError, and OSError.
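As an illustration only, assuming the wait before attempt n is min(backoff_base * 2**n, backoff_max) (an assumed formula based on the parameter descriptions above, not confirmed from the implementation), the defaults shown would pause roughly 1 s, 2 s, and 4 s between attempts:

```python
# Hypothetical backoff schedule for backoff_base=1.0, backoff_max=60.0 (assumed formula).
delays = [min(1.0 * 2**attempt, 60.0) for attempt in range(3)]
print(delays)  # [1.0, 2.0, 4.0]
```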
MetricsMiddleware
Collects timing and count metrics across runs.
```python
from definable.agents import MetricsMiddleware

metrics_mw = MetricsMiddleware()
agent.use(metrics_mw)

# After running the agent
output = agent.run("Hello!")
print(f"Total runs: {metrics_mw.run_count}")
print(f"Total errors: {metrics_mw.error_count}")
print(f"Avg latency (ms): {metrics_mw.average_latency_ms:.1f}")
```
KnowledgeMiddleware
Automatically retrieves relevant documents from a knowledge base and injects them into the agent’s context. See Agent Integration for details.
Writing Custom Middleware
Implement the Middleware protocol — a callable that accepts context and next_handler:
```python
import time

from definable.agents import Middleware
from definable.run import RunContext, RunOutput


class TimingMiddleware:
    """Measures and prints execution time for each run."""

    async def __call__(self, context: RunContext, next_handler) -> RunOutput:
        start = time.perf_counter()
        result = await next_handler(context)
        elapsed = time.perf_counter() - start
        print(f"Run {context.run_id} took {elapsed:.2f}s")
        return result


agent.use(TimingMiddleware())
```
Modifying Context
Middleware can modify the RunContext before passing it to the next handler:
```python
class InjectMetadataMiddleware:
    async def __call__(self, context, next_handler):
        context.metadata = context.metadata or {}
        context.metadata["source"] = "web_app"
        context.metadata["version"] = "2.1"
        return await next_handler(context)
```
Error Handling
Middleware can catch and handle errors:
```python
class ErrorNotificationMiddleware:
    async def __call__(self, context, next_handler):
        try:
            return await next_handler(context)
        except Exception as e:
            notify_team(f"Agent error: {e}")  # notify_team is your own alerting hook
            raise
```
Execution Order
Middleware executes in reverse registration order: the last middleware added is the outermost wrapper, so it sees the request first and the response last:
```python
agent.use(A())  # Innermost - runs closest to the agent core
agent.use(B())  # Middle
agent.use(C())  # Outermost - runs first on request, last on response
```
Request flow: C → B → A → Agent Core
Response flow: Agent Core → A → B → C
This means if you want retries to wrap logging, add logging first, then retries.
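For example, with the built-in middleware shown earlier, registering logging before retries puts the retry logic on the outside, so each retry attempt passes back through the logger (a consequence of the ordering described above):

```python
agent.use(LoggingMiddleware(logger))       # Inner: wrapped by the retry logic
agent.use(RetryMiddleware(max_retries=3))  # Outer: each attempt flows through LoggingMiddleware
```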