|
| 1 | +from datetime import datetime, timedelta |
| 2 | +from sqlalchemy import select |
| 3 | + |
| 4 | +from program.db import db |
| 5 | +from program.media.item import MediaItem |
| 6 | +from program.media.state import States |
| 7 | +from program.types import Event |
| 8 | +from program.utils.logging import logger |
| 9 | + |
| 10 | + |
| 11 | +class ItemScheduler: |
| 12 | + """Service to schedule items based on their air dates""" |
| 13 | + |
| 14 | + def __init__(self): |
| 15 | + self.initialized = True |
| 16 | + self.processing_delay = timedelta(hours=4) # Delay after air date |
| 17 | + |
| 18 | + def schedule_item(self, item_id: str, aired_at: datetime, event_manager) -> None: |
| 19 | + """Schedule an individual item if it's upcoming""" |
| 20 | + if not aired_at or aired_at <= datetime.now(): |
| 21 | + return |
| 22 | + |
| 23 | + processing_time = aired_at + self.processing_delay |
| 24 | + event = Event(emitted_by=ItemScheduler, item_id=item_id) |
| 25 | + event_manager.schedule_event(event, run_at=processing_time) |
| 26 | + logger.debug(f"Scheduled item {item_id} for processing at {processing_time}") |
| 27 | + |
| 28 | + def schedule_upcoming_items(self, event_manager) -> None: |
| 29 | + """Schedule all upcoming items from the database""" |
| 30 | + with db.Session() as session: |
| 31 | + upcoming_items = session.execute( |
| 32 | + select(MediaItem.id, MediaItem.aired_at) |
| 33 | + .where(MediaItem.type.in_(["movie", "episode"])) |
| 34 | + .where(MediaItem.aired_at > (datetime.now() + self.processing_delay)) |
| 35 | + .where(MediaItem.last_state == States.Unreleased) |
| 36 | + ).scalars().all() |
| 37 | + |
| 38 | + for item_id, aired_at in upcoming_items: |
| 39 | + self.schedule_item(item_id, aired_at, event_manager) |
| 40 | + |
| 41 | + logger.debug(f"Scheduled {len(upcoming_items)} upcoming items") |
0 commit comments