Skip to content

Commit

Permalink
Changes for rocky to integrate with combined schedulers
Browse files Browse the repository at this point in the history
  • Loading branch information
jpbruinsslot committed Jan 8, 2025
1 parent 346adf3 commit 71e3bf4
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 15 deletions.
11 changes: 7 additions & 4 deletions rocky/rocky/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ class Task(BaseModel):
id: uuid.UUID = Field(default_factory=uuid.uuid4)
scheduler_id: str
schedule_id: str | None = None
organisation: str
priority: int
status: TaskStatus | None = TaskStatus.PENDING
type: str | None = None
Expand Down Expand Up @@ -158,12 +159,14 @@ class ScheduleResponse(BaseModel):
model_config = ConfigDict(from_attributes=True)

id: uuid.UUID
scheduler_id: str
organisation: str
hash: str
data: dict
enabled: bool
schedule: str
tasks: list[Task]
deadline_at: datetime.datetime
deadline_at: datetime.datetime | None
created_at: datetime.datetime
modified_at: datetime.datetime

Expand Down Expand Up @@ -342,7 +345,7 @@ def get_task_details(self, task_id: str) -> Task:
def push_task(self, item: Task) -> None:
try:
res = self._client.post(
f"/queues/{item.scheduler_id}/push",
f"/schedulers/{item.scheduler_id}/push",
content=item.model_dump_json(exclude_none=True),
headers={"Content-Type": "application/json"},
)
Expand All @@ -364,8 +367,8 @@ def get_queues(self) -> list[Queue]:

return TypeAdapter(list[Queue]).validate_json(response.content)

def pop_item(self, queue: str) -> Task | None:
response = self._client.post(f"/queues/{queue}/pop")
def pop_item(self, scheduler_id: str) -> Task | None:
response = self._client.post(f"/schedulers/{scheduler_id}/pop")
response.raise_for_status()

return TypeAdapter(Task | None).validate_json(response.content)
Expand Down
29 changes: 18 additions & 11 deletions rocky/rocky/views/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
from django.contrib import messages
from django.http import Http404, JsonResponse
from django.utils.translation import gettext_lazy as _
from octopoes.models import OOI
from octopoes.models.ooi.reports import ReportRecipe

from katalogus.client import Boefje, Normalizer
from reports.forms import (
ChildReportNameForm,
Expand All @@ -13,14 +16,10 @@
ReportScheduleStartDateChoiceForm,
ReportScheduleStartDateForm,
)
from tools.forms.scheduler import TaskFilterForm

from octopoes.models import OOI
from octopoes.models.ooi.reports import ReportRecipe
from rocky.scheduler import Boefje as SchedulerBoefje
from rocky.scheduler import BoefjeTask, LazyTaskList
from rocky.scheduler import Normalizer as SchedulerNormalizer
from rocky.scheduler import (
BoefjeTask,
LazyTaskList,
NormalizerTask,
RawData,
ReportTask,
Expand All @@ -31,8 +30,8 @@
Task,
scheduler_client,
)
from rocky.scheduler import Normalizer as SchedulerNormalizer
from rocky.views.mixins import OctopoesView
from tools.forms.scheduler import TaskFilterForm


def get_date_time(date: str | None) -> datetime | None:
Expand All @@ -57,7 +56,7 @@ class SchedulerView(OctopoesView):
def setup(self, request, *args, **kwargs):
super().setup(request, *args, **kwargs)
self.scheduler_client = scheduler_client(self.organization.code)
self.scheduler_id = f"{self.task_type}-{self.organization.code}"
self.scheduler_id = self.task_type

def get_task_filters(self) -> dict[str, Any]:
return {
Expand Down Expand Up @@ -223,7 +222,13 @@ def reschedule_task(self, task_id: str) -> None:
new_id = uuid.uuid4()
task.data.id = new_id

new_task = Task(id=new_id, scheduler_id=task.scheduler_id, priority=1, data=task.data)
new_task = Task(
id=new_id,
scheduler_id=task.scheduler_id,
organisation=self.organization.code,
priority=1,
data=task.data,
)

self.schedule_task(new_task)
else:
Expand All @@ -237,7 +242,9 @@ def run_normalizer(self, katalogus_normalizer: Normalizer, raw_data: RawData) ->
normalizer=SchedulerNormalizer.model_validate(katalogus_normalizer.model_dump()), raw_data=raw_data
)

new_task = Task(priority=1, data=normalizer_task, scheduler_id=f"normalizer-{self.organization.code}")
new_task = Task(
priority=1, data=normalizer_task, scheduler_id="normalizer", organisation=self.organization.code
)

self.schedule_task(new_task)
except SchedulerError as error:
Expand All @@ -251,7 +258,7 @@ def run_boefje(self, katalogus_boefje: Boefje, ooi: OOI | None) -> None:
organization=self.organization.code,
)

new_task = Task(priority=1, data=boefje_task, scheduler_id=f"boefje-{self.organization.code}")
new_task = Task(priority=1, data=boefje_task, scheduler_id="boefje", organisation=self.organization.code)

self.schedule_task(new_task)

Expand Down

0 comments on commit 71e3bf4

Please sign in to comment.