Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Boefjes combined schedulers integration #4015

67 changes: 20 additions & 47 deletions boefjes/boefjes/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,60 +85,33 @@ def _fill_queue(self, task_queue: Queue, queue_type: WorkerManager.Queue) -> Non
time.sleep(self.settings.worker_heartbeat)
return

logger.debug("Popping from queue %s", queue_type.value)

jpbruinsslot marked this conversation as resolved.
Show resolved Hide resolved
try:
queues = self.scheduler_client.get_queues()
except HTTPError:
# Scheduler is having issues, so make note of it and try again
logger.exception("Getting the queues from the scheduler failed")
time.sleep(self.settings.poll_interval) # But not immediately
p_item = self.scheduler_client.pop_item(queue_type.value)
except (HTTPError, ValidationError):
logger.exception("Popping task from scheduler failed, sleeping 10 seconds")
time.sleep(self.settings.worker_heartbeat)
return

# We do not target a specific queue since we start one runtime for all organisations
# and queue ids contain the organisation_id
queues = [q for q in queues if q.id.startswith(queue_type.value) and q.size > 0]

logger.debug("Found queues: %s", [queue.id for queue in queues])

all_queues_empty = True

for queue in queues:
logger.debug("Popping from queue %s", queue.id)

try:
p_item = self.scheduler_client.pop_item(queue.id)
except (HTTPError, ValidationError):
logger.exception("Popping task from scheduler failed, sleeping 10 seconds")
time.sleep(10)
continue

if not p_item:
logger.debug("Queue %s empty", queue.id)
continue
if p_item is None:
time.sleep(self.settings.worker_heartbeat)
return

all_queues_empty = False
logger.info("Handling task[%s]", p_item.data.id)

logger.info("Handling task[%s]", p_item.data.id)
try:
task_queue.put(p_item)
logger.info("Dispatched task[%s]", p_item.data.id)
except: # noqa
logger.exception("Exiting worker...")
logger.info("Patching scheduler task[id=%s] to %s", p_item.data.id, TaskStatus.FAILED.value)

try:
task_queue.put(p_item)
logger.info("Dispatched task[%s]", p_item.data.id)
except: # noqa
logger.exception("Exiting worker...")
logger.info("Patching scheduler task[id=%s] to %s", p_item.data.id, TaskStatus.FAILED.value)

try:
self.scheduler_client.patch_task(p_item.id, TaskStatus.FAILED)
logger.info(
"Set task status to %s in the scheduler for task[id=%s]", TaskStatus.FAILED, p_item.data.id
)
except HTTPError:
logger.exception("Could not patch scheduler task to %s", TaskStatus.FAILED.value)

raise

if all_queues_empty:
logger.debug("All queues empty, sleeping %f seconds", self.settings.poll_interval)
time.sleep(self.settings.poll_interval)
self.scheduler_client.patch_task(p_item.id, TaskStatus.FAILED)
logger.info("Set task status to %s in the scheduler for task[id=%s]", TaskStatus.FAILED, p_item.data.id)
except HTTPError:
logger.exception("Could not patch scheduler task to %s", TaskStatus.FAILED.value)

def _check_workers(self) -> None:
new_workers = []
Expand Down
31 changes: 22 additions & 9 deletions boefjes/boefjes/clients/scheduler_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import datetime
import uuid
from enum import Enum
from typing import Any

from httpx import Client, HTTPTransport, Response
from pydantic import BaseModel, TypeAdapter
Expand Down Expand Up @@ -29,7 +30,8 @@ class TaskStatus(Enum):
class Task(BaseModel):
id: uuid.UUID
scheduler_id: str
schedule_id: str | None
schedule_id: uuid.UUID | None = None
organisation: str
priority: int
status: TaskStatus
type: str
Expand All @@ -39,11 +41,21 @@ class Task(BaseModel):
modified_at: datetime.datetime


class PaginatedTasksResponse(BaseModel):
    """Paginated envelope around a list of tasks, as parsed from the scheduler's pop response."""

    # Value of the "count" field in the response.
    # NOTE(review): presumably the total number of matching tasks - confirm against the scheduler API.
    count: int
    # Optional "next" / "previous" fields; both default to None when absent or null.
    # NOTE(review): presumably links to the adjacent pages - confirm against the scheduler API.
    next: str | None = None
    previous: str | None = None
    # The tasks carried by this page, each validated as a Task.
    results: list[Task]


class SchedulerClientInterface:
def get_queues(self) -> list[Queue]:
raise NotImplementedError()

def pop_item(self, queue_id: str) -> Task | None:
def pop_item(self, scheduler_id: str) -> Task | None:
raise NotImplementedError()

def pop_items(self, scheduler_id: str, filters: dict[str, Any]) -> PaginatedTasksResponse | None:
raise NotImplementedError()

def patch_task(self, task_id: uuid.UUID, status: TaskStatus) -> None:
Expand All @@ -66,20 +78,21 @@ def __init__(self, base_url: str):
def _verify_response(response: Response) -> None:
response.raise_for_status()

def get_queues(self) -> list[Queue]:
response = self._session.get("/queues")
def pop_item(self, scheduler_id: str) -> Task | None:
    """Pop a single task from the given scheduler.

    Posts to ``/schedulers/{scheduler_id}/pop?limit=1`` and unwraps the first
    entry of the paginated response.

    Returns:
        The popped task, or None when the scheduler returned no page or a
        page without results.

    Raises:
        Whatever ``_verify_response`` raises (it calls ``raise_for_status()``
        on the response), and pydantic's ``ValidationError`` when the body
        does not match ``PaginatedTasksResponse``.
    """
    response = self._session.post(f"/schedulers/{scheduler_id}/pop?limit=1")
    self._verify_response(response)

    page = TypeAdapter(PaginatedTasksResponse | None).validate_json(response.content)

    # A null body and an empty result list both mean "nothing to pop".
    if page is None or not page.results:
        return None

    # The entries were already validated into Task models together with the
    # page; passing one to validate_json() again would fail because it is a
    # model instance, not JSON text.
    return page.results[0]

def pop_item(self, queue_id: str) -> Task | None:
response = self._session.post(f"/queues/{queue_id}/pop")
def pop_items(self, scheduler_id: str, filters: dict[str, Any]) -> PaginatedTasksResponse | None:
    """Pop tasks from the given scheduler, sending ``filters`` as the JSON request body.

    Returns the parsed paginated response, or None when the response body is JSON null.
    Errors from ``_verify_response`` and from response validation propagate to the caller.
    """
    pop_url = f"/schedulers/{scheduler_id}/pop"
    reply = self._session.post(pop_url, json=filters)
    self._verify_response(reply)

    page_adapter = TypeAdapter(PaginatedTasksResponse | None)
    return page_adapter.validate_json(reply.content)

def push_item(self, p_item: Task) -> None:
response = self._session.post(f"/queues/{p_item.scheduler_id}/push", content=p_item.model_dump_json())
response = self._session.post(f"/schedulers/{p_item.scheduler_id}/push", content=p_item.model_dump_json())
self._verify_response(response)

def patch_task(self, task_id: uuid.UUID, status: TaskStatus) -> None:
Expand Down
15 changes: 5 additions & 10 deletions boefjes/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from boefjes.app import SchedulerWorkerManager
from boefjes.clients.bytes_client import BytesAPIClient
from boefjes.clients.scheduler_client import Queue, SchedulerClientInterface, Task, TaskStatus
from boefjes.clients.scheduler_client import PaginatedTasksResponse, SchedulerClientInterface, Task, TaskStatus
from boefjes.config import Settings, settings
from boefjes.dependencies.plugins import PluginService, get_plugin_service
from boefjes.job_handler import bytes_api_client
Expand Down Expand Up @@ -50,15 +50,13 @@
class MockSchedulerClient(SchedulerClientInterface):
def __init__(
self,
queue_response: bytes,
boefje_responses: list[bytes],
normalizer_responses: list[bytes],
log_path: Path,
raise_on_empty_queue: Exception = KeyboardInterrupt,
iterations_to_wait_for_exception: int = 0,
sleep_time: float = 0.1,
):
self.queue_response = queue_response
self.boefje_responses = boefje_responses
self.normalizer_responses = normalizer_responses

Expand All @@ -73,22 +71,20 @@ def __init__(
self._popped_items: dict[str, Task] = multiprocessing.Manager().dict()
self._pushed_items: dict[str, Task] = multiprocessing.Manager().dict()

def get_queues(self) -> list[Queue]:
time.sleep(self.sleep_time)
return TypeAdapter(list[Queue]).validate_json(self.queue_response)

def pop_item(self, queue: str) -> Task | None:
time.sleep(self.sleep_time)

try:
if WorkerManager.Queue.BOEFJES.value in queue:
p_item = TypeAdapter(Task).validate_json(self.boefje_responses.pop(0))
response = TypeAdapter(PaginatedTasksResponse).validate_json(self.boefje_responses.pop(0))
p_item = response.results[0]
self._popped_items[str(p_item.id)] = p_item
self._tasks[str(p_item.id)] = self._task_from_id(p_item.id)
return p_item

if WorkerManager.Queue.NORMALIZERS.value in queue:
p_item = TypeAdapter(Task).validate_json(self.normalizer_responses.pop(0))
response = TypeAdapter(PaginatedTasksResponse).validate_json(self.normalizer_responses.pop(0))
p_item = response.results[0]
self._popped_items[str(p_item.id)] = p_item
self._tasks[str(p_item.id)] = self._task_from_id(p_item.id)
return p_item
Expand Down Expand Up @@ -151,7 +147,6 @@ def item_handler(tmp_path: Path):
@pytest.fixture
def manager(item_handler: MockHandler, tmp_path: Path) -> SchedulerWorkerManager:
scheduler_client = MockSchedulerClient(
queue_response=get_dummy_data("scheduler/queues_response.json"),
boefje_responses=[
get_dummy_data("scheduler/pop_response_boefje.json"),
get_dummy_data("scheduler/pop_response_boefje_2.json"),
Expand Down
54 changes: 31 additions & 23 deletions boefjes/tests/examples/scheduler/pop_response_boefje.json
Original file line number Diff line number Diff line change
@@ -1,25 +1,33 @@
{
"id": "70da7d4f-f41f-4940-901b-d98a92e9014b",
"priority": 1,
"scheduler_id": "boefje-_dev",
"schedule_id": null,
"status": "dispatched",
"type": "boefje",
"hash": "70da7d4f-f41f-4940-901b-d98a92e9014b",
"data": {
"id": "70da7d4f-f41f-4940-901b-d98a92e9014b",
"boefje": {
"id": "dns-records",
"version": null
},
"input_ooi": "Hostname|internet|test.test",
"organization": "_dev",
"arguments": {},
"started_at": null,
"runnable_hash": null,
"environment": null,
"ended_at": null
},
"created_at": "2021-06-29T14:00:00",
"modified_at": "2021-06-29T14:00:00"
"count": 1,
"next": null,
"previous": null,
"results": [
{
"id": "70da7d4f-f41f-4940-901b-d98a92e9014b",
"priority": 1,
"scheduler_id": "boefje",
"organisation": "_dev",
"schedule_id": null,
"status": "dispatched",
"type": "boefje",
"hash": "70da7d4f-f41f-4940-901b-d98a92e9014b",
"data": {
"id": "70da7d4f-f41f-4940-901b-d98a92e9014b",
"boefje": {
"id": "dns-records",
"version": null
},
"input_ooi": "Hostname|internet|test.test",
"organization": "_dev",
"arguments": {},
"started_at": null,
"runnable_hash": null,
"environment": null,
"ended_at": null
},
"created_at": "2021-06-29T14:00:00",
"modified_at": "2021-06-29T14:00:00"
}
]
}
54 changes: 31 additions & 23 deletions boefjes/tests/examples/scheduler/pop_response_boefje_2.json
Original file line number Diff line number Diff line change
@@ -1,25 +1,33 @@
{
"id": "70da7d4f-f41f-4940-901b-d98a92e9014c",
"priority": 1,
"scheduler_id": "boefje-_dev",
"schedule_id": null,
"status": "dispatched",
"type": "boefje",
"hash": "70da7d4f-f41f-4940-901b-d98a92e9014c",
"data": {
"id": "70da7d4f-f41f-4940-901b-d98a92e9014c",
"boefje": {
"id": "dns-records",
"version": null
},
"input_ooi": "Hostname|internet|test.test",
"organization": "_dev",
"arguments": {},
"started_at": null,
"runnable_hash": null,
"environment": null,
"ended_at": null
},
"created_at": "2021-06-29T14:00:00",
"modified_at": "2021-06-29T14:00:00"
"count": 1,
"next": null,
"previous": null,
"results": [
{
"id": "70da7d4f-f41f-4940-901b-d98a92e9014c",
"priority": 1,
"scheduler_id": "boefje",
"organisation": "_dev",
"schedule_id": null,
"status": "dispatched",
"type": "boefje",
"hash": "70da7d4f-f41f-4940-901b-d98a92e9014c",
"data": {
"id": "70da7d4f-f41f-4940-901b-d98a92e9014c",
"boefje": {
"id": "dns-records",
"version": null
},
"input_ooi": "Hostname|internet|test.test",
"organization": "_dev",
"arguments": {},
"started_at": null,
"runnable_hash": null,
"environment": null,
"ended_at": null
},
"created_at": "2021-06-29T14:00:00",
"modified_at": "2021-06-29T14:00:00"
}
]
}
54 changes: 31 additions & 23 deletions boefjes/tests/examples/scheduler/pop_response_boefje_no_ooi.json
Original file line number Diff line number Diff line change
@@ -1,25 +1,33 @@
{
"id": "70da7d4f-f41f-4940-901b-d98a92e9014b",
"priority": 1,
"scheduler_id": "boefje-_dev",
"schedule_id": null,
"status": "dispatched",
"type": "boefje",
"hash": "70da7d4f-f41f-4940-901b-d98a92e9014b",
"data": {
"id": "70da7d4f-f41f-4940-901b-d98a92e9014b",
"boefje": {
"id": "dns-records",
"version": null
},
"input_ooi": "",
"organization": "_dev",
"arguments": {},
"started_at": null,
"runnable_hash": null,
"environment": null,
"ended_at": null
},
"created_at": "2021-06-29T14:00:00",
"modified_at": "2021-06-29T14:00:00"
"count": 1,
"next": null,
"previous": null,
"results": [
{
"id": "70da7d4f-f41f-4940-901b-d98a92e9014b",
"priority": 1,
"scheduler_id": "boefje",
"organisation": "_dev",
"schedule_id": null,
"status": "dispatched",
"type": "boefje",
"hash": "70da7d4f-f41f-4940-901b-d98a92e9014b",
"data": {
"id": "70da7d4f-f41f-4940-901b-d98a92e9014b",
"boefje": {
"id": "dns-records",
"version": null
},
"input_ooi": "",
"organization": "_dev",
"arguments": {},
"started_at": null,
"runnable_hash": null,
"environment": null,
"ended_at": null
},
"created_at": "2021-06-29T14:00:00",
"modified_at": "2021-06-29T14:00:00"
}
]
}
Loading
Loading