From 71e3bf486c2b1e37b6be6e7d93d9e4f68e08088e Mon Sep 17 00:00:00 2001 From: JP Bruins Slot Date: Wed, 8 Jan 2025 12:40:52 +0100 Subject: [PATCH] Changes for rocky to integrate with combined schedulers --- rocky/rocky/scheduler.py | 11 +++++++---- rocky/rocky/views/scheduler.py | 29 ++++++++++++++++++----------- 2 files changed, 25 insertions(+), 15 deletions(-) diff --git a/rocky/rocky/scheduler.py b/rocky/rocky/scheduler.py index 5b160d5bfbf..40f1d4ec4f8 100644 --- a/rocky/rocky/scheduler.py +++ b/rocky/rocky/scheduler.py @@ -124,6 +124,7 @@ class Task(BaseModel): id: uuid.UUID = Field(default_factory=uuid.uuid4) scheduler_id: str schedule_id: str | None = None + organisation: str priority: int status: TaskStatus | None = TaskStatus.PENDING type: str | None = None @@ -158,12 +159,14 @@ class ScheduleResponse(BaseModel): model_config = ConfigDict(from_attributes=True) id: uuid.UUID + scheduler_id: str + organisation: str hash: str data: dict enabled: bool schedule: str tasks: list[Task] - deadline_at: datetime.datetime + deadline_at: datetime.datetime | None created_at: datetime.datetime modified_at: datetime.datetime @@ -342,7 +345,7 @@ def get_task_details(self, task_id: str) -> Task: def push_task(self, item: Task) -> None: try: res = self._client.post( - f"/queues/{item.scheduler_id}/push", + f"/schedulers/{item.scheduler_id}/push", content=item.model_dump_json(exclude_none=True), headers={"Content-Type": "application/json"}, ) @@ -364,8 +367,8 @@ def get_queues(self) -> list[Queue]: return TypeAdapter(list[Queue]).validate_json(response.content) - def pop_item(self, queue: str) -> Task | None: - response = self._client.post(f"/queues/{queue}/pop") + def pop_item(self, scheduler_id: str) -> Task | None: + response = self._client.post(f"/schedulers/{scheduler_id}/pop") response.raise_for_status() return TypeAdapter(Task | None).validate_json(response.content) diff --git a/rocky/rocky/views/scheduler.py b/rocky/rocky/views/scheduler.py index 731a114ecab..8053a7089d8 100644 --- a/rocky/rocky/views/scheduler.py +++ b/rocky/rocky/views/scheduler.py @@ -5,6 +5,9 @@ from django.contrib import messages from django.http import Http404, JsonResponse from django.utils.translation import gettext_lazy as _ +from octopoes.models import OOI +from octopoes.models.ooi.reports import ReportRecipe + from katalogus.client import Boefje, Normalizer from reports.forms import ( ChildReportNameForm, @@ -13,14 +16,10 @@ ReportScheduleStartDateChoiceForm, ReportScheduleStartDateForm, ) -from tools.forms.scheduler import TaskFilterForm - -from octopoes.models import OOI -from octopoes.models.ooi.reports import ReportRecipe from rocky.scheduler import Boefje as SchedulerBoefje +from rocky.scheduler import BoefjeTask, LazyTaskList +from rocky.scheduler import Normalizer as SchedulerNormalizer from rocky.scheduler import ( - BoefjeTask, - LazyTaskList, NormalizerTask, RawData, ReportTask, @@ -31,8 +30,8 @@ Task, scheduler_client, ) -from rocky.scheduler import Normalizer as SchedulerNormalizer from rocky.views.mixins import OctopoesView +from tools.forms.scheduler import TaskFilterForm def get_date_time(date: str | None) -> datetime | None: @@ -57,7 +56,7 @@ class SchedulerView(OctopoesView): def setup(self, request, *args, **kwargs): super().setup(request, *args, **kwargs) self.scheduler_client = scheduler_client(self.organization.code) - self.scheduler_id = f"{self.task_type}-{self.organization.code}" + self.scheduler_id = self.task_type def get_task_filters(self) -> dict[str, Any]: return { @@ -223,7 +222,13 @@ def reschedule_task(self, task_id: str) -> None: new_id = uuid.uuid4() task.data.id = new_id - new_task = Task(id=new_id, scheduler_id=task.scheduler_id, priority=1, data=task.data) + new_task = Task( + id=new_id, + scheduler_id=task.scheduler_id, + organisation=self.organization.code, + priority=1, + data=task.data, + ) self.schedule_task(new_task) else: @@ -237,7 +242,9 @@ def run_normalizer(self, katalogus_normalizer: Normalizer, raw_data: RawData) -> normalizer=SchedulerNormalizer.model_validate(katalogus_normalizer.model_dump()), raw_data=raw_data ) - new_task = Task(priority=1, data=normalizer_task, scheduler_id=f"normalizer-{self.organization.code}") + new_task = Task( + priority=1, data=normalizer_task, scheduler_id="normalizer", organisation=self.organization.code + ) self.schedule_task(new_task) except SchedulerError as error: @@ -251,7 +258,7 @@ def run_boefje(self, katalogus_boefje: Boefje, ooi: OOI | None) -> None: organization=self.organization.code, ) - new_task = Task(priority=1, data=boefje_task, scheduler_id=f"boefje-{self.organization.code}") + new_task = Task(priority=1, data=boefje_task, scheduler_id="boefje", organisation=self.organization.code) self.schedule_task(new_task)