Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Boefjes combined schedulers integration #4015

52 changes: 16 additions & 36 deletions boefjes/boefjes/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,42 +81,28 @@ def run(self, queue_type: WorkerManager.Queue) -> None:
raise

def _fill_queue(self, task_queue: Queue, queue_type: WorkerManager.Queue) -> None:
if task_queue.qsize() > self.settings.pool_size:
time.sleep(self.settings.worker_heartbeat)
return
logger.debug("Popping from queue %s", queue_type.value)

try:
queues = self.scheduler_client.get_queues()
except HTTPError:
# Scheduler is having issues, so make note of it and try again
logger.exception("Getting the queues from the scheduler failed")
time.sleep(self.settings.poll_interval) # But not immediately
response = self.scheduler_client.pop_item(queue_type.value)
except (HTTPError, ValidationError):
logger.exception("Popping task from scheduler failed, sleeping 10 seconds")
time.sleep(10)
return

# We do not target a specific queue since we start one runtime for all organisations
# and queue ids contain the organisation_id
queues = [q for q in queues if q.id.startswith(queue_type.value) and q.size > 0]

logger.debug("Found queues: %s", [queue.id for queue in queues])

all_queues_empty = True

for queue in queues:
logger.debug("Popping from queue %s", queue.id)

try:
p_item = self.scheduler_client.pop_item(queue.id)
except (HTTPError, ValidationError):
logger.exception("Popping task from scheduler failed, sleeping 10 seconds")
time.sleep(10)
continue

if not p_item:
logger.debug("Queue %s empty", queue.id)
continue
# TODO: check
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check what kind of response the scheduler gives when the queue is empty and handle this.

if not response:
logger.debug("Queue %s empty", queue_type.value)
time.sleep(10)
return

all_queues_empty = False
# TODO: check
if response.count == 0:
logger.debug("Queue %s empty", queue_type.value)
time.sleep(10)
return

for p_item in response.results:
logger.info("Handling task[%s]", p_item.data.id)

try:
Expand All @@ -134,12 +120,6 @@ def _fill_queue(self, task_queue: Queue, queue_type: WorkerManager.Queue) -> Non
except HTTPError:
logger.exception("Could not patch scheduler task to %s", TaskStatus.FAILED.value)

raise

if all_queues_empty:
logger.debug("All queues empty, sleeping %f seconds", self.settings.poll_interval)
time.sleep(self.settings.poll_interval)

def _check_workers(self) -> None:
new_workers = []

Expand Down
25 changes: 20 additions & 5 deletions boefjes/boefjes/clients/scheduler_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import datetime
import uuid
from enum import Enum
from typing import Any

from httpx import Client, HTTPTransport, Response
from pydantic import BaseModel, TypeAdapter
Expand Down Expand Up @@ -29,7 +30,8 @@ class TaskStatus(Enum):
class Task(BaseModel):
id: uuid.UUID
scheduler_id: str
schedule_id: str | None
schedule_id: uuid.UUID | None = None
organisation: str
priority: int
status: TaskStatus
type: str
Expand All @@ -39,6 +41,13 @@ class Task(BaseModel):
modified_at: datetime.datetime


class PaginatedTasksResponse(BaseModel):
count: int
next: str | None = None
previous: str | None = None
results: list[Task]


class SchedulerClientInterface:
def get_queues(self) -> list[Queue]:
raise NotImplementedError()
Expand Down Expand Up @@ -72,14 +81,20 @@ def get_queues(self) -> list[Queue]:

return TypeAdapter(list[Queue]).validate_json(response.content)

def pop_item(self, queue_id: str) -> Task | None:
response = self._session.post(f"/queues/{queue_id}/pop")
def pop_item(self, scheduler_id: str) -> PaginatedTasksResponse | None:
response = self._session.post(f"/schedulers/{scheduler_id}/pop?limit=1")
self._verify_response(response)

return TypeAdapter(PaginatedTasksResponse | None).validate_json(response.content)

def pop_items(self, scheduler_id: str, filters: dict[str, Any]) -> PaginatedTasksResponse | None:
response = self._session.post(f"/schedulers/{scheduler_id}/pop", json=filters)
self._verify_response(response)

return TypeAdapter(Task | None).validate_json(response.content)
return TypeAdapter(PaginatedTasksResponse | None).validate_json(response.content)

def push_item(self, p_item: Task) -> None:
response = self._session.post(f"/queues/{p_item.scheduler_id}/push", content=p_item.model_dump_json())
response = self._session.post(f"/schedulers/{p_item.scheduler_id}/push", content=p_item.model_dump_json())
self._verify_response(response)

def patch_task(self, task_id: uuid.UUID, status: TaskStatus) -> None:
Expand Down
Loading