Skip to content

Commit

Permalink
Merge pull request #69 from mgcam/ok_to_register_repeatedly
Browse files Browse the repository at this point in the history
Stopped errors for requests to create an existing task.
  • Loading branch information
nerdstrike authored Jun 27, 2024
2 parents 92f32c5 + 57cfb5e commit ea576dd
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 61 deletions.
14 changes: 2 additions & 12 deletions docs/user_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,6 @@ $request->content($DOC);
my $response = $ua->request($request);
if ($response->is_success) {
print "Submitted successfully\n";
} elsif ($response->code == 409 ){
print "Already exists, that's fine.\n";
} else {
die q(It's all gone wrong!)
}
Expand All @@ -183,18 +181,10 @@ if ($response->is_success) {
}
```
If you get a 409 response, it is highly likely that this particular task is already registered. In this way it is possible to tell whether something has already been submitted. Note that if there are many many tasks to register some of which were submitted previously, further work is required to make the process efficient - such as to ask the npg_porch server for a list of previously registered tasks for this pipeline.
**Example 409 failure response**
```javascript
{
"detail": "Unable to create task, as another like it already exists"
}
```
Once a task has been submitted, and a 201 CREATED response has been received, the npg_porch server assigns a timestamp to the task, gives it a status of `PENDING` and assigns a unique ID to it. The response from the server contains this extra information.
A 200 OK response means that this particular task for this pipeline has already been registered. The current representation of the task is returned; the status of the task might differ from `PENDING`. Note that if there are many tasks to register, some of which were submitted previously, further work is required to make the process efficient - such as asking the npg_porch server for a list of previously registered tasks for this pipeline.
### Step 4 - write a script or program that can launch the pipeline
Supposing there are new tasks created every 24 hours, we then also need a client that checks every 24 hours for new work it can run on a compute farm.
Expand Down
66 changes: 50 additions & 16 deletions src/npg_porch/db/data_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@
# this program. If not, see <http://www.gnu.org/licenses/>.

import logging

from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import contains_eager, joinedload
from sqlalchemy.orm.exc import NoResultFound

from npg_porch.db.models import Pipeline as DbPipeline, Task as DbTask, Token as DbToken, Event
from npg_porch.models import Task, Pipeline, TaskStateEnum
from npg_porch.db.models import Event
from npg_porch.db.models import Pipeline as DbPipeline
from npg_porch.db.models import Task as DbTask
from npg_porch.db.models import Token as DbToken
from npg_porch.models import Pipeline, Task, TaskStateEnum
from npg_porch.models.token import Token


Expand Down Expand Up @@ -102,29 +106,44 @@ async def create_pipeline_token(self, name: str, desc: str) -> Token:

return Token(name=db_pipeline.name, token=db_token.token, description=desc)

async def create_task(self, token_id: int, task: Task) -> tuple[Task, bool]:
'''Given a task definition creates a task.
If the task does not exist, a tuple consisting of Task object for a
newly created database record and a boolean True object is returned.
async def create_task(self, token_id: int, task: Task) -> Task:
If the task already exists, a tuple consisting of Task object for an
existing database record and a boolean False object is returned.
'''
self.logger.debug('CREATE TASK: ' + str(task))
session = self.session
db_pipeline = await self._get_pipeline_db_object(
task.pipeline.name
)
# Check they exist and so on
task.status = TaskStateEnum.PENDING

task.status = TaskStateEnum.PENDING
t = self.convert_task_to_db(task, db_pipeline)
session.add(t)

event = Event(
task=t,
token_id = token_id,
change='Created'
)
t.events.append(event)
created = True
try:
nested = await session.begin_nested()
session.add(t)
event = Event(
task=t,
token_id = token_id,
change='Created'
)
t.events.append(event)
await session.commit()
except IntegrityError:
await nested.rollback()
# Task already exists, query the database to get the up-to-date
# representation of the task.
t = await self.get_db_task(
pipeline_name=task.pipeline.name, job_descriptor=t.job_descriptor
)
created = False

await session.commit()
# Error handling to follow
return t.convert_to_model()
return (t.convert_to_model(), created)

async def claim_tasks(
self, token_id: int, pipeline: Pipeline, claim_limit: int | None = 1
Expand Down Expand Up @@ -228,6 +247,21 @@ async def get_tasks(
tasks = task_result.scalars().all()
return [t.convert_to_model() for t in tasks]

async def get_db_task(
    self,
    pipeline_name: str,
    job_descriptor: str,
) -> DbTask:
    '''Fetch a single task database record.

    The record is selected by the name of the pipeline the task is
    joined to and by the job descriptor of the task. The pipeline
    relationship is loaded together with the task record.

    Exactly one matching row is expected; the result is retrieved with
    sqlalchemy's `one()`, which raises an error otherwise.
    '''
    statement = (
        select(DbTask)
        .join(DbTask.pipeline)
        .options(joinedload(DbTask.pipeline))
        .where(
            DbPipeline.name == pipeline_name,
            DbTask.job_descriptor == job_descriptor,
        )
    )
    rows = await self.session.execute(statement)

    return rows.scalars().one()

@staticmethod
def convert_task_to_db(task: Task, pipeline: DbPipeline) -> DbTask:
assert task.status in TaskStateEnum
Expand Down
9 changes: 5 additions & 4 deletions src/npg_porch/endpoints/pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,18 @@
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.

from fastapi import APIRouter, HTTPException, Depends, Response
import logging
import re

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm.exc import NoResultFound
from starlette import status

from npg_porch.models.pipeline import Pipeline
from npg_porch.models.permission import RolesEnum
from npg_porch.db.connection import get_DbAccessor
from npg_porch.auth.token import validate
from npg_porch.db.connection import get_DbAccessor
from npg_porch.models.permission import RolesEnum
from npg_porch.models.pipeline import Pipeline
from npg_porch.models.token import Token

router = APIRouter(
Expand Down
30 changes: 17 additions & 13 deletions src/npg_porch/endpoints/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@
from typing import Annotated

from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import JSONResponse
from sqlalchemy.orm.exc import NoResultFound
from starlette import status

from npg_porch.auth.token import validate
from npg_porch.db.connection import get_DbAccessor
from npg_porch.models.permission import PermissionValidationException
from npg_porch.models.pipeline import Pipeline
from npg_porch.models.task import Task, TaskStateEnum
from npg_porch.db.connection import get_DbAccessor
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm.exc import NoResultFound
from starlette import status


def _validate_request(permission, pipeline):
Expand Down Expand Up @@ -82,14 +83,20 @@ async def get_tasks(
status_code=status.HTTP_201_CREATED,
responses={
status.HTTP_201_CREATED: {"description": "Task creation was successful"},
status.HTTP_409_CONFLICT: {"description": "A task with the same signature already exists"}
status.HTTP_200_OK: {"description": "A task with the same signature already exists"},
status.HTTP_404_NOT_FOUND: {"description": "Pipeline does not exist."},
},
summary="Creates one task record.",
description='''
Given a Task object, creates a database record for it and returns
the same object, the response HTTP status is 201 'Created'. The
a new Task object. The response HTTP status is 201 'Created'. The
new task is assigned pending status, ie becomes available for claiming.
A request to create a task for which the database record already exists
is accepted. The return status code 200 is set in this case. The returned
Task object has its status set to the value currently available in the
database.
The pipeline specified by the `pipeline` attribute of the Task object
should exist. If it does not exist, return status 404 'Not found'.'''
)
Expand All @@ -102,19 +109,16 @@ async def create_task(
_validate_request(permission, task.pipeline)

try:
created_task = await db_accessor.create_task(
(task, created) = await db_accessor.create_task(
token_id=permission.requestor_id,
task=task
)
except IntegrityError:
raise HTTPException(
status_code=409,
detail='Unable to create task, as another like it already exists'
)
except NoResultFound:
raise HTTPException(status_code=404, detail='Failed to find pipeline for this task')

return created_task
if created is True:
return task
return JSONResponse(status_code=status.HTTP_200_OK, content=task.model_dump())


@router.put(
Expand Down
25 changes: 14 additions & 11 deletions tests/data_access_test.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import re

import pytest
from npg_porch.db.data_access import AsyncDbAccessor
from npg_porch.models import Pipeline as ModelledPipeline
from npg_porch.models import Task, TaskStateEnum
from pydantic import ValidationError
import re
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm.exc import NoResultFound

from npg_porch.db.data_access import AsyncDbAccessor
from npg_porch.models import Pipeline as ModelledPipeline, Task, TaskStateEnum


def give_me_a_pipeline(number: int = 1):
return ModelledPipeline(
Expand Down Expand Up @@ -101,11 +102,11 @@ async def test_create_task(db_accessor):
task_input={'test': True}
)

saved_task = await db_accessor.create_task(
(saved_task, created) = await db_accessor.create_task(
token_id=1,
task=task
)

assert created is True
assert saved_task.status == TaskStateEnum.PENDING, 'State automatically set to PENDING'
assert saved_task.pipeline.name == 'ptest 1'
assert saved_task.task_input_id, 'Input ID is created automatically'
Expand All @@ -114,10 +115,12 @@ async def test_create_task(db_accessor):
assert len(events) == 1, 'An event was created with a successful task creation'
assert events[0].change == 'Created', 'Message set'

with pytest.raises(IntegrityError) as exception:
await db_accessor.create_task(1, task)

assert re.match('UNIQUE constraint failed', exception.value)
(existing_task, created) = await db_accessor.create_task(1, task)
assert created is False
assert existing_task.status == TaskStateEnum.PENDING, 'State automatically set to PENDING'
assert existing_task.pipeline.name == 'ptest 1'
events = await db_accessor.get_events_for_task(existing_task)
assert len(events) == 1, 'No additional events'

@pytest.mark.asyncio
async def test_claim_tasks(db_accessor):
Expand Down Expand Up @@ -201,7 +204,7 @@ async def test_multi_claim_tasks(db_accessor):
@pytest.mark.asyncio
async def test_update_tasks(db_accessor):
saved_pipeline = await store_me_a_pipeline(db_accessor)
saved_task = await db_accessor.create_task(
(saved_task, created) = await db_accessor.create_task(
token_id=1,
task=Task(
task_input={'number': 1},
Expand Down
12 changes: 7 additions & 5 deletions tests/task_route_test.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from npg_porch.models import Pipeline, Task, TaskStateEnum
from starlette import status

from npg_porch.models import Task, TaskStateEnum, Pipeline

# Not testing get-all-tasks as this method will ultimately go

headers4ptest_one = {
Expand Down Expand Up @@ -32,16 +31,19 @@ def test_task_creation(async_minimum, fastapi_testclient):
headers=headers4ptest_one
)
assert response.status_code == status.HTTP_201_CREATED
assert task_one == response.json()
response_obj = response.json()
assert task_one == response_obj

# Try again and expect to fail
# Try again and expect to succeed with a different status code and the
# same task returned.
response = fastapi_testclient.post(
'tasks',
json=task_one.model_dump(),
follow_redirects=True,
headers=headers4ptest_one
)
assert response.status_code == status.HTTP_409_CONFLICT
assert response.status_code == status.HTTP_200_OK
assert response.json() == response_obj

task_two = Task(
pipeline = {
Expand Down

0 comments on commit ea576dd

Please sign in to comment.