The Scheduler manages timed agent execution. Register triggers (interval, one-shot, cron) as jobs, and the scheduler fires them on schedule with concurrency control and persistence.
Quick Start
import asyncio
from definable.agent import Agent
from definable.agent.trigger import Interval, OneShot, TriggerExecutor
from definable.agent.scheduler import Scheduler
agent = Agent(model="gpt-4o", instructions="You are a helpful assistant.")
# Create scheduler
scheduler = Scheduler(tick_interval=1.0, max_concurrent=5)
# Add jobs
async def on_tick(event):
    return "Summarize today's news"  # Runs agent.arun() with this prompt
interval = Interval(seconds=60)
interval.handler = on_tick
scheduler.add(interval, name="news-summary")
# Run
async def main():
    executor = TriggerExecutor(agent)
    task = asyncio.create_task(scheduler.start(executor))
    await asyncio.sleep(300)  # Run for 5 minutes
    scheduler.stop()
    await task
asyncio.run(main())
Triggers
Interval
Fire at regular intervals:
from definable.agent.trigger import Interval
trigger = Interval(seconds=60) # Every 60 seconds
trigger.handler = my_handler
seconds: Interval between executions, in seconds. Must be greater than 0.
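To make the interval semantics concrete, here is a minimal sketch of how fire times could be derived from a seconds value. The next_fire_times helper is hypothetical and not part of definable, and it assumes the first fire happens one full interval after the start time; the library may behave differently.

```python
from typing import List


def next_fire_times(start: float, seconds: float, count: int) -> List[float]:
    """Return the first `count` fire times for a fixed-interval schedule.

    Hypothetical helper for illustration; not part of the definable API.
    """
    if seconds <= 0:
        raise ValueError("seconds must be greater than 0")
    # Assumption: the first fire happens one full interval after `start`.
    return [start + seconds * (i + 1) for i in range(count)]


print(next_fire_times(start=0.0, seconds=60, count=3))  # [60.0, 120.0, 180.0]
```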
OneShot
Fire exactly once, then stop:
from definable.agent.trigger import OneShot
# Fire in 30 seconds
trigger = OneShot(delay=30)
# Or fire at a specific time
trigger = OneShot(fire_at=1700000000.0)
delay: Seconds from now until the trigger fires. Use either delay or fire_at.
fire_at: Absolute Unix timestamp at which to fire. Use either delay or fire_at.
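The relationship between the two parameters can be sketched as a small resolver that turns either one into an absolute fire time. The resolve_fire_at function and its now parameter are assumptions made for illustration; definable's own validation may differ.

```python
import time
from typing import Optional


def resolve_fire_at(delay: Optional[float] = None,
                    fire_at: Optional[float] = None,
                    now: Optional[float] = None) -> float:
    """Turn a relative delay or an absolute timestamp into a Unix fire time.

    Hypothetical helper; not part of the definable API.
    """
    # Exactly one of the two parameters must be supplied.
    if (delay is None) == (fire_at is None):
        raise ValueError("provide exactly one of delay or fire_at")
    if fire_at is not None:
        return float(fire_at)
    current = time.time() if now is None else now
    return current + delay


print(resolve_fire_at(delay=30, now=1_700_000_000.0))  # 1700000030.0
print(resolve_fire_at(fire_at=1_700_000_000.0))        # 1700000000.0
```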
Handler Return Values
Handlers control what happens when a trigger fires:
| Return value | Behavior |
|---|---|
| None | No-op |
| str | Calls agent.arun(str) |
| dict | Calls agent.arun(**dict) |
| awaitable | Awaits the result and processes recursively |
async def handler(event):
    return "Write a daily summary"  # → agent.arun("Write a daily summary")

async def handler_with_params(event):
    return {"instruction": "Summarize", "user_id": "system"}
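The dispatch rules above can be sketched as a small recursive function. This is not the library's implementation: FakeAgent and dispatch are stand-ins invented for the example, and raising TypeError for other return types is an assumption.

```python
import asyncio
import inspect
from typing import Any


class FakeAgent:
    """Stand-in for an agent; records the calls it receives."""

    def __init__(self) -> None:
        self.calls = []

    async def arun(self, *args: Any, **kwargs: Any) -> str:
        self.calls.append((args, kwargs))
        return "ok"


async def dispatch(agent: FakeAgent, result: Any) -> Any:
    """Route a handler's return value according to its type."""
    if result is None:
        return None  # no-op
    if inspect.isawaitable(result):
        # Resolve the awaitable, then apply the same rules to its value.
        return await dispatch(agent, await result)
    if isinstance(result, str):
        return await agent.arun(result)
    if isinstance(result, dict):
        return await agent.arun(**result)
    # Assumption: anything else is rejected.
    raise TypeError(f"unsupported handler result: {type(result).__name__}")


async def demo() -> FakeAgent:
    agent = FakeAgent()

    async def nested() -> str:
        return "Write a daily summary"

    await dispatch(agent, None)
    await dispatch(agent, "Summarize today's news")
    await dispatch(agent, {"instruction": "Summarize", "user_id": "system"})
    await dispatch(agent, nested())  # a coroutine object is an awaitable
    return agent


agent = asyncio.run(demo())
print(len(agent.calls))  # 3 (the None result triggers no call)
```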
Scheduler
Constructor
scheduler = Scheduler(
    store=None,          # Job persistence (default: InMemoryJobStore)
    tick_interval=1.0,   # Check for due jobs every N seconds
    max_concurrent=10,   # Max concurrent job executions
)
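To illustrate how tick_interval and max_concurrent might interact, the sketch below runs a tick loop that caps parallel executions with asyncio.Semaphore. It is a simplified model, not the scheduler's actual code: run_ticks is a hypothetical function and every job is treated as due on every tick.

```python
import asyncio
from typing import Awaitable, Callable, List


async def run_ticks(jobs: List[Callable[[], Awaitable[None]]],
                    tick_interval: float,
                    max_concurrent: int,
                    ticks: int) -> None:
    """Launch every job once per tick, with at most max_concurrent running at once."""
    limit = asyncio.Semaphore(max_concurrent)

    async def guarded(job: Callable[[], Awaitable[None]]) -> None:
        async with limit:  # wait for a free execution slot
            await job()

    running = []
    for _ in range(ticks):
        running += [asyncio.create_task(guarded(job)) for job in jobs]
        await asyncio.sleep(tick_interval)
    await asyncio.gather(*running)


active = 0
peak = 0


async def slow_job() -> None:
    """Track how many copies of this job run at the same time."""
    global active, peak
    active += 1
    peak = max(peak, active)
    await asyncio.sleep(0.02)
    active -= 1


asyncio.run(run_ticks([slow_job] * 6, tick_interval=0.01, max_concurrent=2, ticks=2))
print(peak)  # 2: never more than max_concurrent jobs in flight
```

Jobs that cannot get a slot simply wait, so a low max_concurrent delays executions rather than dropping them in this model.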
Job Management
from definable.agent.scheduler import JobStatus
# Add a job
job = scheduler.add(trigger, name="my-job", max_runs=100)
# List jobs
all_jobs = scheduler.list_jobs()
active_jobs = scheduler.list_jobs(status=JobStatus.ACTIVE)
# Control jobs
scheduler.pause(job.job_id)
scheduler.resume(job.job_id)
scheduler.cancel(job.job_id)
scheduler.remove(job.job_id)
Callbacks
scheduler.on_job_started = lambda job: print(f"Started: {job.name}")
scheduler.on_job_completed = lambda job: print(f"Done: {job.name}")
scheduler.on_job_failed = lambda job, err: print(f"Failed: {job.name}: {err}")
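The sketch below shows one plausible ordering for these callbacks around a single job execution. CallbackRunner and Job are hypothetical classes written for this example; only the three callback attribute names come from the snippet above.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, List, Optional


@dataclass
class Job:
    name: str


class CallbackRunner:
    """Hypothetical wrapper showing where each callback would fire."""

    def __init__(self) -> None:
        self.on_job_started: Optional[Callable[[Job], None]] = None
        self.on_job_completed: Optional[Callable[[Job], None]] = None
        self.on_job_failed: Optional[Callable[[Job, Exception], None]] = None

    async def run(self, job: Job, work: Callable[[], Awaitable[None]]) -> None:
        if self.on_job_started:
            self.on_job_started(job)
        try:
            await work()
        except Exception as err:  # report the failure instead of propagating it
            if self.on_job_failed:
                self.on_job_failed(job, err)
        else:
            if self.on_job_completed:
                self.on_job_completed(job)


events: List[str] = []
runner = CallbackRunner()
runner.on_job_started = lambda job: events.append(f"started:{job.name}")
runner.on_job_completed = lambda job: events.append(f"done:{job.name}")
runner.on_job_failed = lambda job, err: events.append(f"failed:{job.name}:{err}")


async def ok() -> None:
    return None


async def boom() -> None:
    raise RuntimeError("network down")


async def demo() -> None:
    await runner.run(Job("news-summary"), ok)
    await runner.run(Job("flaky"), boom)


asyncio.run(demo())
print(events)
```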
Job Lifecycle
PENDING → ACTIVE → COMPLETED (max_runs reached)
                 → PAUSED (manual)
                 → CANCELLED (manual)
| Status | Fires? | Description |
|---|---|---|
| pending | No | Initial state before activation |
| active | Yes | Ready to fire on schedule |
| paused | No | Manually paused |
| completed | No | Finished (max_runs reached or OneShot fired) |
| cancelled | No | Manually cancelled |
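As a rough model of the lifecycle, the sketch below tracks a status and a run counter and marks the job completed once max_runs is reached. The Status enum and JobRecord class, including its field and method names, are illustrative assumptions rather than definable's ScheduledJob.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class Status(str, Enum):
    PENDING = "pending"
    ACTIVE = "active"
    PAUSED = "paused"
    COMPLETED = "completed"
    CANCELLED = "cancelled"


@dataclass
class JobRecord:
    """Hypothetical job record; field names are illustrative."""
    status: Status = Status.PENDING
    run_count: int = 0
    max_runs: Optional[int] = None

    def should_fire(self) -> bool:
        return self.status is Status.ACTIVE  # only active jobs fire

    def record_run(self) -> None:
        self.run_count += 1
        if self.max_runs is not None and self.run_count >= self.max_runs:
            self.status = Status.COMPLETED


job = JobRecord(max_runs=2)
assert not job.should_fire()   # pending jobs do not fire
job.status = Status.ACTIVE
job.record_run()
job.status = Status.PAUSED     # manual pause
assert not job.should_fire()
job.status = Status.ACTIVE     # manual resume
job.record_run()               # second run reaches max_runs
print(job.status.value)        # completed
```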
Persistence
InMemoryJobStore (default)
Jobs are lost when the process exits.
from definable.agent.scheduler import Scheduler, InMemoryJobStore
scheduler = Scheduler(store=InMemoryJobStore())
SQLiteJobStore
Persist jobs across restarts:
from definable.agent.scheduler import Scheduler, SQLiteJobStore

async def create_scheduler():
    store = SQLiteJobStore(".definable/scheduler.db")
    await store.initialize()  # REQUIRED before use; must run inside a coroutine
    return Scheduler(store=store)
SQLiteJobStore must be initialized with await store.initialize() before the scheduler starts. Trigger objects are not serialized, so they must be re-attached when restoring jobs.
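One way to handle re-attachment is to keep handlers in a registry keyed by job name and pair them with persisted rows at startup. The sketch below shows that idea with plain dictionaries. HANDLERS, restore, and the row layout are assumptions for illustration, not the SQLiteJobStore schema or API.

```python
from typing import Callable, Dict, List

# Handlers are code, so they live in a registry keyed by job name, not in the store.
HANDLERS: Dict[str, Callable[[dict], str]] = {
    "news-summary": lambda event: "Summarize today's news",
}


def restore(persisted_rows: List[dict]) -> List[dict]:
    """Pair each persisted job row with its handler; skip rows with no known handler."""
    restored = []
    for row in persisted_rows:
        handler = HANDLERS.get(row["name"])
        if handler is None:
            continue  # nothing to re-attach, so the job stays dormant
        restored.append({**row, "handler": handler})
    return restored


# Hypothetical rows as they might come back from a job store.
rows = [
    {"name": "news-summary", "interval_seconds": 60},
    {"name": "orphaned-job", "interval_seconds": 5},
]
jobs = restore(rows)
print([job["name"] for job in jobs])  # ['news-summary']
```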
Agent Integration
Agents with triggers auto-create a scheduler:
agent = Agent(
    model="gpt-4o",
    triggers=[Interval(seconds=60, handler=my_handler)],
)
# Auto-detected scheduler
scheduler = agent.scheduler # Scheduler instance or None
When using agent.serve(), the scheduler starts automatically alongside interfaces, webhooks, and HTTP endpoints.
Imports
from definable.agent.trigger import Interval, OneShot, BaseTrigger, TriggerEvent, TriggerExecutor
from definable.agent.scheduler import Scheduler, ScheduledJob, JobStatus
from definable.agent.scheduler import InMemoryJobStore, SQLiteJobStore