Skip to content

Commit

Permalink
Add original_schedule_name column for tasks and requested_tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
benoit74 committed Sep 26, 2023
1 parent ed231c3 commit 1933310
Show file tree
Hide file tree
Showing 15 changed files with 305 additions and 4 deletions.
2 changes: 2 additions & 0 deletions dispatcher/backend/src/common/schemas/orms.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class TaskLightSchema(m.Schema):
worker_name = mf.String(data_key="worker")
updated_at = MadeAwareDateTime()
config = mf.Nested(ConfigWithOnlyTaskNameAndResourcesSchema, only=["resources"])
original_schedule_name = mf.String()


class TaskFullSchema(TaskLightSchema):
Expand Down Expand Up @@ -101,6 +102,7 @@ class RequestedTaskLightSchema(m.Schema):
requested_by = mf.String()
priority = mf.Integer()
schedule_name = mf.String()
original_schedule_name = mf.String()
worker = mf.String()


Expand Down
2 changes: 2 additions & 0 deletions dispatcher/backend/src/db/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ class Task(Base):
notification: Mapped[Dict[str, Any]]
files: Mapped[Dict[str, Any]]
upload: Mapped[Dict[str, Any]]
original_schedule_name: Mapped[str]

schedule_id: Mapped[Optional[UUID]] = mapped_column(
ForeignKey("schedule.id"), init=False
Expand Down Expand Up @@ -358,6 +359,7 @@ class RequestedTask(Base):
config: Mapped[Dict[str, Any]] = mapped_column(MutableDict.as_mutable(JSON))
upload: Mapped[Dict[str, Any]]
notification: Mapped[Dict[str, Any]]
original_schedule_name: Mapped[str]

schedule_id: Mapped[Optional[UUID]] = mapped_column(
ForeignKey("schedule.id"), init=False
Expand Down
14 changes: 11 additions & 3 deletions dispatcher/backend/src/import_schedules.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,17 @@
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 1,
"metadata": {},
"outputs": [],
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"env: POSTGRES_URI=postgresql+psycopg://zimfarm:zimpass@localhost:5432/zimfarm\n"
]
}
],
"source": [
"%env POSTGRES_URI=postgresql+psycopg://zimfarm:zimpass@localhost:5432/zimfarm\n",
"\n",
Expand Down Expand Up @@ -141,7 +149,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.2"
"version": "3.8.17"
},
"orig_nbformat": 4
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
"""add_original_schedule_name
Revision ID: 43f385b318d4
Revises: 15354d56545a
Create Date: 2023-09-26 07:56:45.008277
"""
import sqlalchemy as sa
from alembic import op

from db.models import RequestedTask, Schedule, Task

# revision identifiers, used by Alembic.
revision = "43f385b318d4"
down_revision = "15354d56545a"
branch_labels = None
depends_on = None


def upgrade() -> None:
bind = op.get_bind()
session = sa.orm.Session(bind=bind)

# add original_schedule_name as nullable
op.add_column(
"requested_task",
sa.Column("original_schedule_name", sa.String(), nullable=True),
)

# set original_schedule_name for requested tasks with existing schedule
session.execute(
sa.update(RequestedTask)
.where(RequestedTask.schedule_id is not None)
.values(
original_schedule_name=sa.select(Schedule.name)
.where(Schedule.id == RequestedTask.schedule_id)
.scalar_subquery()
)
)

# set original_schedule_name for requested tasks without existing schedule
session.execute(
sa.update(RequestedTask)
.where(RequestedTask.schedule_id is None)
.values(original_schedule_name="<unknown>")
)

# set original_schedule_name as not nullable
op.alter_column("requested_task", "original_schedule_name", nullable=False)

# add original_schedule_name as nullable
op.add_column(
"task", sa.Column("original_schedule_name", sa.String(), nullable=True)
)

# set original_schedule_name for requested tasks with existing schedule
session.execute(
sa.update(Task)
.where(Task.schedule_id is not None)
.values(
original_schedule_name=sa.select(Schedule.name)
.where(Schedule.id == Task.schedule_id)
.scalar_subquery()
)
)

# set original_schedule_name for requested tasks without existing schedule
session.execute(
sa.update(Task)
.where(Task.schedule_id is None)
.values(original_schedule_name="<unknown>")
)

# set original_schedule_name as not nullable
op.alter_column("task", "original_schedule_name", nullable=False)

# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column("task", "original_schedule_name")
op.drop_column("requested_task", "original_schedule_name")
# ### end Alembic commands ###
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def list_of_requested_tasks(session: so.Session, token: AccessToken.Payload = No
dbm.RequestedTask.timestamp,
dbm.RequestedTask.requested_by,
dbm.RequestedTask.priority,
dbm.RequestedTask.original_schedule_name,
dbm.Schedule.name.label("schedule_name"),
dbm.Worker.name.label("worker"),
)
Expand Down
3 changes: 3 additions & 0 deletions dispatcher/backend/src/routes/tasks/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def get(self, session: so.Session):
dbm.Task.id,
dbm.Task.status,
dbm.Task.timestamp,
dbm.Task.original_schedule_name,
# dbm.Task.config,
so.Bundle(
"config",
Expand Down Expand Up @@ -117,6 +118,7 @@ def get(
dbm.Task.files,
dbm.Task.upload,
dbm.Task.updated_at,
dbm.Task.original_schedule_name,
dbm.Schedule.name.label("schedule_name"),
dbm.Worker.name.label("worker_name"),
)
Expand Down Expand Up @@ -169,6 +171,7 @@ def post(self, session: so.Session, task_id: UUID, token: AccessToken.Payload):
notification=requested_task.notification,
files={},
upload=requested_task.upload,
original_schedule_name=requested_task.original_schedule_name,
)
task.id = requested_task.id
task.schedule_id = requested_task.schedule_id
Expand Down
77 changes: 77 additions & 0 deletions dispatcher/backend/src/tests.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"env: POSTGRES_URI=postgresql+psycopg://zimfarm:zimpass@localhost:5432/zimfarm\n"
]
}
],
"source": [
"%env POSTGRES_URI=postgresql+psycopg://zimfarm:zimpass@localhost:5432/zimfarm\n",
"\n",
"import json\n",
"import pathlib\n",
"import sqlalchemy as sa\n",
"import sqlalchemy.orm as so\n",
"\n",
"from db import Session\n",
"from db.models import Schedule, RequestedTask"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"UPDATE requested_task SET original_schedule_name=(SELECT schedule.name \n",
"FROM schedule \n",
"WHERE schedule.id = requested_task.schedule_id) WHERE requested_task.schedule_id IS NOT NULL\n"
]
}
],
"source": [
"stmt = (\n",
" sa.update(RequestedTask)\n",
" .where(\n",
" RequestedTask.schedule_id != None\n",
" )\n",
" .values(original_schedule_name=sa.select(Schedule.name).where(Schedule.id == RequestedTask.schedule_id).scalar_subquery())\n",
")\n",
"print(stmt)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.17"
},
"orig_nbformat": 4
},
"nbformat": 4,
"nbformat_minor": 2
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import json

import pytest


class TestRequestedTaskBusiness:
@pytest.fixture(scope="module")
def headers(self, access_token):
return {"Authorization": access_token, "Content-Type": "application/json"}

@pytest.fixture(scope="module")
def requested_task(self, client, headers, schedule):
response = client.post(
"/requested-tasks/",
headers=headers,
data=json.dumps({"schedule_names": [schedule["name"]]}),
)
assert response.status_code == 201
assert "requested" in response.json
assert len(response.json["requested"]) == 1
requested_task_id = response.json["requested"][0]
yield requested_task_id

response = client.delete(
f"/requested-tasks/{requested_task_id}",
headers=headers,
)
assert response.status_code == 200

def test_requested_task_with_schedule(
self, client, headers, schedule, requested_task
):
url = f"/requested-tasks/{requested_task}"
response = client.get(
url,
headers=headers,
)
assert response.status_code == 200
assert "schedule_name" in response.json
assert response.json["schedule_name"] == schedule["name"]
assert "original_schedule_name" in response.json
assert response.json["original_schedule_name"] == schedule["name"]

def test_requested_task_without_schedule(
self, client, headers, schedule, requested_task
):
url = f"/schedules/{schedule['name']}"
response = client.delete(
url,
headers=headers,
)
assert response.status_code == 204
url = f"/requested-tasks/{requested_task}"
response = client.get(
url,
headers=headers,
)
assert response.status_code == 200
assert "schedule_name" in response.json
assert response.json["schedule_name"] == "none"
assert "original_schedule_name" in response.json
assert response.json["original_schedule_name"] == schedule["name"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import pytest

SCHEDULE_NAME = "a_schedule_for_tasks"


class TestTaskBusiness:
@pytest.fixture(scope="module")
def headers(self, access_token):
return {"Authorization": access_token, "Content-Type": "application/json"}

@pytest.fixture(scope="module")
def task(self, client, headers, worker, make_requested_task, garbage_collector):
requested_task = make_requested_task(SCHEDULE_NAME)
url = "/tasks/{}".format(str(requested_task["_id"]))
response = client.post(
url, headers=headers, query_string={"worker_name": "worker_name"}
)
assert response.status_code == 201
assert "_id" in response.json
task_id = response.json["_id"]
garbage_collector.add_task_id(task_id)
yield task_id

def test_task_with_schedule(self, client, headers, task):
url = f"/tasks/{task}"
response = client.get(
url,
headers=headers,
)
assert response.status_code == 200
assert "schedule_name" in response.json
assert response.json["schedule_name"] == SCHEDULE_NAME
assert "original_schedule_name" in response.json
assert response.json["original_schedule_name"] == SCHEDULE_NAME

def test_task_without_schedule(self, client, headers, task):
url = f"/schedules/{SCHEDULE_NAME}"
response = client.delete(
url,
headers=headers,
)
assert response.status_code == 204
url = f"/tasks/{task}"
response = client.get(
url,
headers=headers,
)
assert response.status_code == 200
assert "schedule_name" in response.json
assert response.json["schedule_name"] is None
assert "original_schedule_name" in response.json
assert response.json["original_schedule_name"] == SCHEDULE_NAME
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ def _make_requested_task(
config=config,
upload={},
notification={},
original_schedule_name=schedule_name,
)
requested_task.schedule = schedule
session.add(requested_task)
Expand Down
Loading

0 comments on commit 1933310

Please sign in to comment.