# Middleware

> Intercept, modify, and extend agent execution with composable middleware.

Middleware lets you wrap agent execution with reusable logic such as logging, retries, metrics collection, and custom pre/post-processing. Multiple middleware compose into a chain, and the order in which they run is determined by the order in which you register them.

## How Middleware Works

Middleware wraps the agent's core execution. Each middleware receives a `RunContext` and a `next_handler` function, and decides what to do before and after calling `next_handler`:

```mermaid theme={null}
flowchart LR
  Request --> RetryMW["RetryMiddleware"] --> LoggingMW["LoggingMiddleware"] --> AgentCore["Agent Core"] --> Response
```

The response travels back through the same chain in reverse order.

The last middleware added is the outermost wrapper, so in the diagram above `RetryMiddleware` was added after `LoggingMiddleware`.
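The wrapping described above can be sketched without the library. This is a minimal illustration, not Definable's actual implementation: `compose`, `tracer`, `core`, and the dict-based context are hypothetical stand-ins used only to show how a chain of `(context, next_handler)` callables nests.

```python
import asyncio
from typing import Awaitable, Callable, List

Handler = Callable[[dict], Awaitable[str]]


def _bind(mw, next_handler: Handler) -> Handler:
    """Turn a middleware plus its next handler into a plain handler."""
    async def wrapped(context: dict) -> str:
        return await mw(context, next_handler)
    return wrapped


def compose(middlewares: List[Callable], core: Handler) -> Handler:
    """Wrap `core` so that the last middleware in the list ends up outermost."""
    handler = core
    for mw in middlewares:  # iterate in registration order
        handler = _bind(mw, handler)
    return handler


def tracer(name: str, trace: List[str]):
    """Hypothetical middleware that records when it runs."""
    async def middleware(context, next_handler):
        trace.append(f"{name} before")
        result = await next_handler(context)
        trace.append(f"{name} after")
        return result
    return middleware


async def core(context: dict) -> str:
    # Stand-in for the agent core.
    return f"echo: {context['input']}"


trace: List[str] = []
# Registered in the same order as the diagram implies: logging first, retry second.
chain = compose([tracer("logging", trace), tracer("retry", trace)], core)
result = asyncio.run(chain({"input": "Hello!"}))
print(trace)
```

The recorded trace starts and ends with the `retry` entries, which mirrors the diagram: the middleware added last sees the request first and the response last.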

## Using Middleware

Add middleware with the `.use()` method:

```python theme={null}
from definable.agent import Agent, LoggingMiddleware, RetryMiddleware, MetricsMiddleware
from definable.model import OpenAIChat
import logging

logger = logging.getLogger("my_agent")

agent = (
    Agent(
        model=OpenAIChat(id="gpt-4o"),
        instructions="You are a helpful assistant.",
    )
    .use(LoggingMiddleware(logger))
    .use(RetryMiddleware(max_retries=3))
    .use(MetricsMiddleware())
)

output = agent.run("Hello!")
```

## Built-in Middleware

### LoggingMiddleware

Logs the start, completion, and any errors for each run.

```python theme={null}
from definable.agent import LoggingMiddleware
import logging

logger = logging.getLogger("my_agent")
agent.use(LoggingMiddleware(logger, level=logging.INFO))
```

<ParamField path="logger" type="logging.Logger" required>
  The Python logger instance to write to.
</ParamField>

<ParamField path="level" type="int" default="logging.INFO">
  Log level for normal events. Errors are always logged at `ERROR`.
</ParamField>

### RetryMiddleware

Retries the entire run on transient errors with exponential backoff.

```python theme={null}
from definable.agent import RetryMiddleware

agent.use(RetryMiddleware(
    max_retries=3,
    backoff_base=1.0,
    backoff_max=60.0,
))
```

<ParamField path="max_retries" type="int" default="3">
  Maximum number of retry attempts.
</ParamField>

<ParamField path="backoff_base" type="float" default="1.0">
  Base delay in seconds (doubles on each retry).
</ParamField>

<ParamField path="backoff_max" type="float" default="60.0">
  Maximum backoff delay in seconds.
</ParamField>

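Retries are triggered for `ConnectionError`, `TimeoutError`, and `OSError`. In Python 3, the built-in `ConnectionError` and `TimeoutError` are themselves subclasses of `OSError`.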
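To get a feel for how the three parameters interact, the delay schedule can be sketched as follows. The formula `min(backoff_base * 2 ** attempt, backoff_max)` is an assumption based on the parameter descriptions above; the library may compute delays differently (for example, by adding jitter). `backoff_delays` is a hypothetical helper, not part of `definable`.

```python
from typing import List


def backoff_delays(
    max_retries: int = 3,
    backoff_base: float = 1.0,
    backoff_max: float = 60.0,
) -> List[float]:
    """Return the assumed wait (in seconds) before each retry attempt."""
    return [
        # The delay doubles on each retry and is capped at backoff_max.
        min(backoff_base * 2 ** attempt, backoff_max)
        for attempt in range(max_retries)
    ]


default_schedule = backoff_delays()
long_schedule = backoff_delays(max_retries=8)
print(default_schedule)
print(long_schedule)
```

Under this assumption the defaults wait 1, 2, and 4 seconds, and with eight retries the cap takes effect from the seventh retry onward.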

### MetricsMiddleware

Collects timing and count metrics across runs.

```python theme={null}
from definable.agent import MetricsMiddleware

metrics_mw = MetricsMiddleware()
agent.use(metrics_mw)

# After running the agent
output = agent.run("Hello!")

print(f"Total runs:       {metrics_mw.run_count}")
print(f"Total errors:     {metrics_mw.error_count}")
print(f"Avg latency (ms): {metrics_mw.average_latency_ms:.1f}")
```
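As a rough mental model of how such counters could be maintained, here is a self-contained sketch. `SketchMetricsMiddleware` is hypothetical and only reuses the attribute names shown above; whether the real `MetricsMiddleware` counts failed runs in `run_count` or includes them in the latency average is an assumption here.

```python
import asyncio
import time


class SketchMetricsMiddleware:
    """Hypothetical metrics collector; not the library's implementation."""

    def __init__(self) -> None:
        self.run_count = 0
        self.error_count = 0
        self._total_latency_ms = 0.0

    @property
    def average_latency_ms(self) -> float:
        # Avoid dividing by zero before the first run.
        return self._total_latency_ms / self.run_count if self.run_count else 0.0

    async def __call__(self, context, next_handler):
        start = time.perf_counter()
        try:
            return await next_handler(context)
        except Exception:
            self.error_count += 1
            raise
        finally:
            # Assumption: failed runs also count as runs and contribute latency.
            self.run_count += 1
            self._total_latency_ms += (time.perf_counter() - start) * 1000


async def ok(context):
    return "ok"


async def boom(context):
    raise ConnectionError("simulated failure")


async def demo(metrics: SketchMetricsMiddleware) -> None:
    await metrics({}, ok)
    await metrics({}, ok)
    try:
        await metrics({}, boom)
    except ConnectionError:
        pass  # the error is still counted by the middleware


metrics = SketchMetricsMiddleware()
asyncio.run(demo(metrics))
print(metrics.run_count, metrics.error_count)
```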

### KnowledgeMiddleware

Automatically retrieves relevant documents from a knowledge base and injects them into the agent's context. See [Agent Integration](/knowledge/agent-integration) for details.

## Writing Custom Middleware

Implement the `Middleware` protocol: an async callable that accepts `context` and `next_handler` and returns the run output.

```python theme={null}
import time

from definable.agent import Middleware
from definable.agent.run import RunContext
from definable.agent.events import RunOutput

class TimingMiddleware:
    """Measures and prints execution time for each run."""

    async def __call__(self, context: RunContext, next_handler) -> RunOutput:
        # Record the start time before handing off to the rest of the chain.
        start = time.perf_counter()

        result = await next_handler(context)

        elapsed = time.perf_counter() - start
        print(f"Run {context.run_id} took {elapsed:.2f}s")
        return result

agent.use(TimingMiddleware())
```

### Modifying Context

Middleware can modify the `RunContext` before passing it to the next handler:

```python theme={null}
class InjectMetadataMiddleware:
    async def __call__(self, context, next_handler):
        context.metadata = context.metadata or {}
        context.metadata["source"] = "web_app"
        context.metadata["version"] = "2.1"
        return await next_handler(context)
```
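The effect on downstream handlers can be checked in isolation. In this sketch a `types.SimpleNamespace` stands in for `RunContext` and `fake_core` stands in for the rest of the chain; both are hypothetical and only assume that the context exposes a mutable `metadata` attribute, as in the example above.

```python
import asyncio
from types import SimpleNamespace


class InjectMetadataMiddleware:
    async def __call__(self, context, next_handler):
        # Create the dict on first use, then add the extra keys.
        context.metadata = context.metadata or {}
        context.metadata["source"] = "web_app"
        return await next_handler(context)


async def fake_core(context):
    # Anything after the middleware sees the injected value.
    return f"source={context.metadata['source']}"


context = SimpleNamespace(run_id="run-1", metadata=None)  # stand-in for RunContext
result = asyncio.run(InjectMetadataMiddleware()(context, fake_core))
print(result)
```

Because the context object is mutated in place, the change is also visible to any middleware that inspects it after `next_handler` returns.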

### Error Handling

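Middleware can catch errors raised further down the chain, act on them, and re-raise so that outer middleware and the caller still see the failure: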

```python theme={null}
class ErrorNotificationMiddleware:
    async def __call__(self, context, next_handler):
        try:
            return await next_handler(context)
        except Exception as e:
            # notify_team is a placeholder for your own alerting function.
            notify_team(f"Agent error: {e}")
            raise
```
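A self-contained version of this pattern makes the re-raise behavior easy to verify. Here `notify_team` is a hypothetical stub that appends to a list, and `failing_core` simulates a failing agent run; neither is part of `definable`.

```python
import asyncio
from typing import List

notifications: List[str] = []


def notify_team(message: str) -> None:
    # Hypothetical alerting hook; a real one might post to chat or a pager.
    notifications.append(message)


class ErrorNotificationMiddleware:
    async def __call__(self, context, next_handler):
        try:
            return await next_handler(context)
        except Exception as e:
            notify_team(f"Agent error: {e}")
            raise  # keep the failure visible to outer middleware and the caller


async def failing_core(context):
    raise TimeoutError("model timed out")


async def demo() -> str:
    try:
        await ErrorNotificationMiddleware()({}, failing_core)
    except TimeoutError:
        return "propagated"
    return "swallowed"


outcome = asyncio.run(demo())
print(outcome, notifications)
```

Re-raising matters when this middleware sits inside a retrying wrapper: if the exception were swallowed here, the outer layer would have nothing to retry on.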

## Execution Order

Middleware executes in **reverse registration order** (last added = outermost):

```python theme={null}
agent.use(A())  # Innermost — runs closest to the agent core
agent.use(B())  # Middle
agent.use(C())  # Outermost — runs first on request, last on response
```

Request flow: `C → B → A → Agent Core`

Response flow: `Agent Core → A → B → C`

For example, to have retries wrap logging so that every attempt passes through the logger, add `LoggingMiddleware` first and `RetryMiddleware` second.
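The consequence of that ordering can be sketched with simplified stand-ins. `logging_mw`, `retry_mw`, `flaky_core`, and `wrap` below are hypothetical and much simpler than the built-in middleware (no backoff sleep, no real logger); they only show that an inner logging layer runs once per attempt when a retry layer wraps it.

```python
import asyncio
from typing import List

log: List[str] = []
attempts = {"n": 0}


async def logging_mw(context, next_handler):
    log.append("start")
    try:
        result = await next_handler(context)
    except Exception:
        log.append("error")
        raise
    log.append("done")
    return result


async def retry_mw(context, next_handler, max_retries: int = 3):
    for attempt in range(max_retries + 1):
        try:
            return await next_handler(context)
        except ConnectionError:
            if attempt == max_retries:
                raise  # out of retries: surface the error
            # A real implementation would sleep with backoff here.


async def flaky_core(context):
    # Fails twice, then succeeds on the third attempt.
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise ConnectionError("transient")
    return "ok"


def wrap(mw, next_handler):
    async def handler(context):
        return await mw(context, next_handler)
    return handler


# Logging is registered first (inner), retry second (outer).
chain = wrap(retry_mw, wrap(logging_mw, flaky_core))
result = asyncio.run(chain({}))
print(result, log)
```

With the order reversed, the logging layer would wrap the whole retry loop and record a single start and a single completion, regardless of how many attempts were made.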
