Skip to content

Commit 75d1102

Browse files
committed
feat: add cron trigger
1 parent ccb2a5e commit 75d1102

File tree

2 files changed

+26
-15
lines changed

2 files changed

+26
-15
lines changed

openagent/agent/config.py

+5-4
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,12 @@ def validate_celery_urls(cls, v, values):
3535

3636

3737
class TaskConfig(BaseModel):
38-
interval: int = Field(description="Interval in seconds between task executions")
39-
delay_variation: int = Field(
38+
interval: Optional[int] = Field(default=None, description="Interval in seconds between task executions")
39+
delay_variation: Optional[int] = Field(
4040
default=0, description="Maximum random delay in seconds to add to the interval"
4141
)
4242
query: str
43+
cron: Optional[str] = Field(default=None, description="Cron expression for scheduling tasks")
4344
schedule: SchedulerConfig = Field(
4445
default_factory=lambda: SchedulerConfig(type="local"),
4546
description="Scheduler configuration for this task",
@@ -48,14 +49,14 @@ class TaskConfig(BaseModel):
4849
@classmethod
4950
@field_validator("interval")
5051
def validate_interval(cls, v):
51-
if v < 1:
52+
if v is not None and v < 1:
5253
raise ValueError("Interval must be greater than 1 second")
5354
return v
5455

5556
@classmethod
5657
@field_validator("delay_variation")
5758
def validate_delay_variation(cls, v):
58-
if v < 0:
59+
if v is not None and v < 0:
5960
raise ValueError("Delay variation must be non-negative")
6061
return v
6162

openagent/agent/scheduler.py

+21-11
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from celery.beat import PersistentScheduler
99
from apscheduler.schedulers.asyncio import AsyncIOScheduler
1010
from apscheduler.triggers.interval import IntervalTrigger
11+
from apscheduler.triggers.cron import CronTrigger
1112

1213
from openagent.agent.config import TaskConfig
1314

@@ -32,24 +33,33 @@ def init_scheduled_tasks(self, tasks_config: dict, task_runner):
3233
if task_config.schedule.type == "queue":
3334
self._init_celery_task(task_id, task_config, task_runner)
3435
else:
35-
# Default to local scheduler
3636
async def task_wrapper(query, delay_variation):
3737
if delay_variation > 0:
3838
delay = random.uniform(0, delay_variation)
3939
logger.debug(f"Task '{task_id}' sleeping for {delay} seconds")
4040
await asyncio.sleep(delay)
4141
await task_runner(query)
4242

43-
self.scheduler.add_job(
44-
func=task_wrapper,
45-
trigger=IntervalTrigger(seconds=task_config.interval),
46-
args=[task_config.query, task_config.delay_variation],
47-
id=task_id,
48-
name=f"Task_{task_id}",
49-
)
50-
logger.info(
51-
f"Scheduled local task '{task_id}' with interval: {task_config.interval} seconds and delay variation: {task_config.delay_variation} seconds"
52-
)
43+
if task_config.cron:
44+
self.scheduler.add_job(
45+
func=task_wrapper,
46+
trigger=CronTrigger.from_crontab(task_config.cron),
47+
args=[task_config.query, task_config.delay_variation],
48+
id=task_id,
49+
name=f"Task_{task_id}",
50+
)
51+
logger.info(f"Scheduled cron task '{task_id}' with cron expression: {task_config.cron}")
52+
else:
53+
self.scheduler.add_job(
54+
func=task_wrapper,
55+
trigger=IntervalTrigger(seconds=task_config.interval),
56+
args=[task_config.query, task_config.delay_variation],
57+
id=task_id,
58+
name=f"Task_{task_id}",
59+
)
60+
logger.info(
61+
f"Scheduled local task '{task_id}' with interval: {task_config.interval} seconds and delay variation: {task_config.delay_variation} seconds"
62+
)
5363

5464
# Start the local scheduler if we have any local tasks
5565
if any(task.schedule.type == "local" for task in tasks_config.values()):

0 commit comments

Comments
 (0)