from typing import Any, Optional
from definable.agent.interface import (
BaseInterface,
InterfaceConfig,
InterfaceMessage,
InterfaceResponse,
)
class MyChatConfig(InterfaceConfig):
    """Settings for the MyChat interface.

    Adds MyChat-specific fields, each with a default value, on top of
    ``InterfaceConfig``.
    """

    # Platform identifier for this interface.
    platform: str = "mychat"

    # API token for the platform; empty by default.
    # NOTE(review): not referenced by any other code visible in this file.
    api_token: str = ""

    # WebSocket endpoint the interface connects to.
    ws_url: str = "wss://chat.example.com/ws"
class MyChatInterface(BaseInterface):
    """Interface that connects an agent to MyChat over a WebSocket.

    Inbound frames are read by a background task, decoded from JSON and
    passed to ``handle_platform_message``. Outbound responses are written
    back to the same socket as JSON ``"message"`` events.

    NOTE(review): ``config.api_token`` is never sent when connecting, so the
    connection is unauthenticated as written -- confirm how the platform
    expects the token to be supplied.
    """

    def __init__(self, agent, **kwargs):
        """Initialise the interface with no open connection.

        Args:
            agent: Forwarded to ``BaseInterface.__init__`` as ``agent``.
            **kwargs: Forwarded unchanged to ``BaseInterface.__init__``.
        """
        super().__init__(agent=agent, **kwargs)
        self._ws = None
        # Handle to the background listener so it is not garbage-collected
        # and can be cancelled on shutdown.
        self._listen_task = None

    async def _start_receiver(self) -> None:
        """Connect to the chat platform via WebSocket and start listening."""
        import asyncio

        import websockets

        self._ws = await websockets.connect(self.config.ws_url)
        # The event loop only keeps a weak reference to tasks, so keep a
        # strong one here; otherwise the listener may vanish mid-execution.
        self._listen_task = asyncio.create_task(self._listen())

    async def _listen(self) -> None:
        """Read messages from the WebSocket and dispatch them.

        Frames that are not valid JSON are logged and skipped. An exception
        raised while handling one message is logged and does not stop the
        loop. The loop ends when the connection closes.
        """
        import json
        import logging

        from websockets.exceptions import ConnectionClosed

        log = logging.getLogger(__name__)

        # Hold a local reference: _stop_receiver may reset self._ws while
        # this loop is still running.
        ws = self._ws
        if ws is None:
            return

        try:
            async for raw in ws:
                try:
                    event = json.loads(raw)
                except (TypeError, ValueError):
                    # JSONDecodeError and UnicodeDecodeError are ValueErrors.
                    log.warning("Ignoring MyChat frame that is not valid JSON")
                    continue
                try:
                    await self.handle_platform_message(event)
                except Exception:
                    # Boundary of the background task: one failing message
                    # must not take the whole receiver down.
                    log.exception("Error while handling MyChat message")
        except ConnectionClosed:
            log.warning("MyChat WebSocket connection closed unexpectedly")

    async def _stop_receiver(self) -> None:
        """Disconnect from the chat platform and stop the listener task."""
        import asyncio

        # Detach state first so it is reset even if closing raises.
        task, self._listen_task = self._listen_task, None
        ws, self._ws = self._ws, None

        try:
            # Skip self-cancellation in case stop is triggered from inside
            # the listener itself.
            if (
                task is not None
                and not task.done()
                and task is not asyncio.current_task()
            ):
                task.cancel()
                # gather(return_exceptions=True) absorbs the listener's
                # CancelledError while still letting a cancellation of the
                # caller propagate.
                await asyncio.gather(task, return_exceptions=True)
        finally:
            if ws is not None:
                await ws.close()

    async def _convert_inbound(self, raw_message: Any) -> Optional[InterfaceMessage]:
        """Convert a platform event to InterfaceMessage.

        Args:
            raw_message: Decoded JSON payload of one WebSocket frame.

        Returns:
            An ``InterfaceMessage`` built from the event's ``user_id``,
            ``room_id``, ``msg_id`` and ``text`` keys (each defaulting to
            ``""``), or ``None`` when the payload is not a JSON object or is
            flagged ``is_bot``.
        """
        # Valid JSON is not necessarily an object (it may be a list, string
        # or number); such payloads carry no message fields.
        if not isinstance(raw_message, dict):
            return None

        # Skip bot messages
        if raw_message.get("is_bot"):
            return None

        return InterfaceMessage(
            platform="mychat",
            platform_user_id=raw_message.get("user_id", ""),
            platform_chat_id=raw_message.get("room_id", ""),
            platform_message_id=raw_message.get("msg_id", ""),
            text=raw_message.get("text", ""),
        )

    async def _send_response(
        self,
        original_msg: InterfaceMessage,
        response: InterfaceResponse,
        raw_message: Any,
    ) -> None:
        """Send the response back to the platform.

        Nothing is sent when the response has no content or the socket is
        not connected.

        Args:
            original_msg: Inbound message being answered; its
                ``platform_chat_id`` becomes the outgoing ``room_id``.
            response: Response whose ``content`` is sent as the text.
            raw_message: Original platform event (unused here).
        """
        import json

        ws = self._ws
        if not response.content or ws is None:
            return

        payload = {
            "type": "message",
            "room_id": original_msg.platform_chat_id,
            "text": response.content,
        }
        await ws.send(json.dumps(payload))