Hooks let you inject custom logic at four points in the message processing pipeline. Use them for logging, access control, content filtering, analytics, or any cross-cutting concern.

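Hook Points

The four hook points are:
  • on_message_received: Called when a message arrives.
  • on_before_respond: Called before the agent processes the message.
  • on_after_respond: Called after the agent produces a response.
  • on_error: Called when an error occurs.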

The InterfaceHook Protocol

Implement any combination of these four methods. All are optional, so implement only the ones you need:
from definable.interfaces import InterfaceHook, InterfaceMessage, InterfaceResponse, InterfaceSession

class MyHook:
    async def on_message_received(self, message: InterfaceMessage) -> bool | None:
        """Called when a message arrives. Return False to skip it."""
        ...

    async def on_before_respond(
        self, message: InterfaceMessage, session: InterfaceSession
    ) -> InterfaceMessage | None:
        """Called before agent processing. Return a new message to modify it."""
        ...

    async def on_after_respond(
        self, message: InterfaceMessage, response: InterfaceResponse, session: InterfaceSession
    ) -> InterfaceResponse | None:
        """Called after agent produces a response. Return a new response to modify it."""
        ...

    async def on_error(self, error: Exception, message: InterfaceMessage | None) -> None:
        """Called when an error occurs."""
        ...

Adding Hooks

Use .add_hook() for fluent chaining:
from definable.interfaces import TelegramInterface, LoggingHook, AllowlistHook

interface = (
    TelegramInterface(agent=agent, config=config)
    .add_hook(LoggingHook())
    .add_hook(AllowlistHook(allowed_user_ids={"123", "456"}))
)
Or pass them at construction:
interface = TelegramInterface(
    agent=agent,
    config=config,
    hooks=[LoggingHook(), AllowlistHook(allowed_user_ids={"123"})],
)
Hooks run in the order they are added. For on_message_received, if any hook returns False, processing stops and subsequent hooks are skipped.
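The veto rule can be pictured with a small stand-alone dispatcher. This is a sketch of the behavior described above, not definable's actual implementation: `run_message_received`, `RecordingHook`, and the use of `getattr` to skip unimplemented methods are all assumptions made for illustration, and a plain string stands in for an InterfaceMessage.

```python
import asyncio


class RecordingHook:
    """Hypothetical hook that records its name and returns a fixed verdict."""

    def __init__(self, name, verdict, calls):
        self.name = name
        self.verdict = verdict
        self.calls = calls

    async def on_message_received(self, message):
        self.calls.append(self.name)
        return self.verdict


async def run_message_received(hooks, message):
    """Call hooks in registration order; stop at the first explicit False."""
    for hook in hooks:
        handler = getattr(hook, "on_message_received", None)
        if handler is None:
            continue  # this hook does not implement the method
        if await handler(message) is False:
            return False  # vetoed: later hooks never see the message
    return True


calls = []
hooks = [
    RecordingHook("first", None, calls),
    RecordingHook("second", False, calls),
    RecordingHook("third", None, calls),
]
accepted = asyncio.run(run_message_received(hooks, message="hello"))
print(accepted, calls)
```

In this sketch the third hook is never called, which is why ordering matters: place cheap access-control hooks before hooks with side effects such as analytics if dropped messages should not be counted.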

Built-in Hooks

LoggingHook

Logs incoming messages and errors for observability:
from definable.interfaces import LoggingHook

interface.add_hook(LoggingHook())
It logs each on_message_received call (user, chat, text preview) and each on_error call (exception details).

AllowlistHook

Restricts access to a set of user IDs:
from definable.interfaces import AllowlistHook

interface.add_hook(AllowlistHook(allowed_user_ids={"user-123", "user-456"}))
Messages from users not in the set are silently dropped: the hook returns False from on_message_received.
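To make the drop behavior concrete, here is a minimal stand-alone re-creation that runs without the library. `SimpleAllowlistHook` and `FakeMessage` are hypothetical names used only for this sketch; the built-in AllowlistHook may differ internally.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeMessage:
    """Stand-in for InterfaceMessage with only the field this sketch reads."""
    platform_user_id: str
    text: str = ""


class SimpleAllowlistHook:
    """Illustrative re-creation of the allowlist behavior; not the built-in class."""

    def __init__(self, allowed_user_ids):
        self.allowed_user_ids = set(allowed_user_ids)

    async def on_message_received(self, message):
        if message.platform_user_id not in self.allowed_user_ids:
            return False  # drop silently
        return None  # continue processing


hook = SimpleAllowlistHook({"user-123", "user-456"})
allowed = asyncio.run(hook.on_message_received(FakeMessage("user-123")))
blocked = asyncio.run(hook.on_message_received(FakeMessage("user-999")))
print(allowed, blocked)
```

Membership in this sketch is an exact match, so an integer ID such as 123 would not match the string "123". The same pattern works as a unit test for any custom hook: build a fake message and await the method directly.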

Writing Custom Hooks

Access Control

class BusinessHoursHook:
    """Only respond during business hours."""

    async def on_message_received(self, message):
        from datetime import datetime
        hour = datetime.now().hour  # Naive local time of the host running the interface
        if hour < 9 or hour >= 17:
            return False  # Silently drop messages outside 9am-5pm
        return None  # Continue processing

Content Filtering

class ContentFilterHook:
    """Filter inappropriate content from responses."""

    async def on_after_respond(self, message, response, session):
        # contains_inappropriate is a placeholder for your own moderation check.
        if response.content and contains_inappropriate(response.content):
            from definable.interfaces import InterfaceResponse
            return InterfaceResponse(
                content="I'm sorry, I can't provide that information."
            )
        return None  # Keep original response

Analytics

class AnalyticsHook:
    """Track message and response metrics."""

    def __init__(self):
        self.message_count = 0
        self.error_count = 0

    async def on_message_received(self, message):
        self.message_count += 1
        return None

    async def on_error(self, error, message):
        self.error_count += 1
        # report_to_dashboard is a placeholder for your own reporting coroutine.
        await report_to_dashboard(error, message)

Message Enrichment

class ContextHook:
    """Add context to messages before agent processing."""

    async def on_before_respond(self, message, session):
        from definable.interfaces import InterfaceMessage
        # lookup_user_tier is a placeholder for your own async lookup.
        user_tier = await lookup_user_tier(message.platform_user_id)
        enriched_text = f"[User tier: {user_tier}] {message.text}"
        return InterfaceMessage(
            platform=message.platform,
            platform_user_id=message.platform_user_id,
            platform_chat_id=message.platform_chat_id,
            platform_message_id=message.platform_message_id,
            text=enriched_text,
            username=message.username,
            images=message.images,
            audio=message.audio,
            metadata=message.metadata,
        )

Error Notification

class SlackAlertHook:
    """Send critical errors to Slack."""

    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url

    async def on_error(self, error, message):
        import httpx
        user = message.platform_user_id if message else "unknown"
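        # Use a context manager so the client's connections are closed after the request.
        async with httpx.AsyncClient() as client:
            await client.post(self.webhook_url, json={
                "text": f"Interface error for user {user}: {error}"
            })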

Hook Execution Order

When multiple hooks are registered:
  • on_message_received: Runs in order. The first hook to return False vetoes the message and stops the chain.
  • on_before_respond: Runs in order. If a hook returns a modified message, subsequent hooks see the modified version.
  • on_after_respond: Runs in order. If a hook returns a modified response, subsequent hooks see the modified version.
  • on_error: All hooks run, regardless of whether earlier hooks raised exceptions.
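
The modify-and-pass-along rule and the run-all rule for on_error can be sketched as two small loops. This is an illustration under stated assumptions, not the library's dispatcher: `run_before_respond`, `run_on_error`, and the four hook classes are hypothetical, plain strings stand in for InterfaceMessage objects, and swallowing exceptions from a failing error hook is one possible way to keep later hooks running.

```python
import asyncio


class UppercaseHook:
    async def on_before_respond(self, message, session):
        return message.upper()  # returns a modified message


class PassThroughHook:
    def __init__(self):
        self.seen = None

    async def on_before_respond(self, message, session):
        self.seen = message  # observes whatever earlier hooks produced
        return None  # None keeps the current message


class FailingErrorHook:
    async def on_error(self, error, message):
        raise RuntimeError("alerting backend unavailable")


class CountingErrorHook:
    def __init__(self):
        self.count = 0

    async def on_error(self, error, message):
        self.count += 1


async def run_before_respond(hooks, message, session):
    """Thread the message through each hook; a non-None return replaces it."""
    for hook in hooks:
        handler = getattr(hook, "on_before_respond", None)
        if handler is None:
            continue
        result = await handler(message, session)
        if result is not None:
            message = result
    return message


async def run_on_error(hooks, error, message):
    """Call every error hook, even if an earlier one raises."""
    for hook in hooks:
        handler = getattr(hook, "on_error", None)
        if handler is None:
            continue
        try:
            await handler(error, message)
        except Exception:
            pass  # assumption: a failing hook must not block later hooks


observer = PassThroughHook()
final_message = asyncio.run(
    run_before_respond([UppercaseHook(), observer], "hi", session=None)
)

counter = CountingErrorHook()
asyncio.run(run_on_error([FailingErrorHook(), counter], ValueError("boom"), "hi"))
print(final_message, observer.seen, counter.count)
```

The second hook sees the uppercased message rather than the original, and the counting hook still runs after the first error hook fails. An on_after_respond chain would follow the same shape as `run_before_respond`, with the response threaded through instead of the message.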