Skip to content

Commit

Permalink
Queue refactoring & error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
bsatoriu committed Sep 4, 2024
1 parent f80377e commit 9e0fdfd
Show file tree
Hide file tree
Showing 4 changed files with 278 additions and 209 deletions.
105 changes: 8 additions & 97 deletions api/endpoints/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from flask_restx import Resource
from flask import request
from flask_api import status
import api.utils.hysds_util as hysds
from api.models.job_queue import JobQueue
from api.models.organization import Organization
from api.models.organization_job_queue import OrganizationJobQueue
Expand All @@ -15,7 +14,7 @@
from api.schemas.pre_approved_schema import PreApprovedSchema
from datetime import datetime
import json

from api.utils import job_queue
from api.utils.http_util import err_response

log = logging.getLogger(__name__)
Expand All @@ -31,63 +30,9 @@ def get(self):
Lists the job queues and associated organizations
:return:
"""
all_queues = job_queue.get_all_queues()
return all_queues

result = []

queues = db.session.query(
JobQueue.id,
JobQueue.queue_name,
JobQueue.queue_description,
JobQueue.guest_tier,
PreApproved.creation_date
).order_by(PreApproved.email).all()

orgs_query = db.session.query(
Organization, OrganizationJobQueue,
).filter(
Organization.id == OrganizationJobQueue.org_id
).order_by(Organization.name).all()

hysds_queues = hysds.get_mozart_queues()

for q in queues:
queue = {
'id': q.id,
'queue_name': q.queue_name,
'queue_description': q.queue_description,
'guest_tier': q.guest_tier,
'status': 'Online' if q.queue_name in hysds_queues else 'Offline',
'orgs': [],
'creation_date': q.creation_date.strftime('%m/%d/%Y'),
}

for o in orgs_query:
if o.OrganizationJobQueue.job_queue_id == q.id:
queue['orgs'].append({
'id': o.Organization.id,
'org_name': o.Organization.name,
'default_job_limit_count': o.Organization.default_job_limit_count,
'default_job_limit_hours': o.Organization.default_job_limit_hours
})

result.append(queue)

unassigned_queues = (hq for hq in hysds_queues if hq not in map(self._queue_name, queues))
for uq in unassigned_queues:
result.append({
'id': 0,
'queue_name': uq,
'queue_description': '',
'guest_tier': False,
'status': 'Unassigned',
'orgs': [],
'creation_date': None,
})

return result

def _queue_name(self, q):
return q.queue_name

@api.doc(security='ApiKeyAuth')
@login_required(role=Role.ROLE_ADMIN)
Expand All @@ -110,23 +55,10 @@ def post(self):
return err_response("Valid queue description is required.")

guest_tier = req_data.get("guest_tier", False)

new_queue = JobQueue(queue_name=queue_name, queue_description=queue_description, guest_tier=guest_tier, creation_date=datetime.utcnow())

db.session.add(new_queue)
db.session.commit()

queue_orgs = []
orgs = req_data.get("orgs", [])
for queue_org in orgs:
queue_orgs.append(OrganizationJobQueue(org_id=queue_org['org_id'], job_queue_id=new_queue.id, creation_date=datetime.utcnow()))

if len(queue_orgs) > 0:
db.session.add_all(queue_orgs)
db.session.commit()

org_schema = JobQueueSchema()
return json.loads(org_schema.dumps(new_queue))
new_queue = job_queue.create_queue(queue_name, queue_description, guest_tier, orgs)
return new_queue


@ns.route('/job-queues/<int:queue_id>')
Expand Down Expand Up @@ -155,25 +87,11 @@ def put(self, queue_id):
queue.queue_name = req_data.get("queue_name", queue.queue_name)
queue.queue_description = req_data.get("queue_description", queue.queue_description)
queue.guest_tier = req_data.get("guest_tier", queue.guest_tier)
db.session.commit()

# Update org assignments
db.session.execute(
db.delete(OrganizationJobQueue).filter_by(job_queue_id=queue_id)
)
db.session.commit()

queue_orgs = []
orgs = req_data.get("orgs", [])
for queue_org in orgs:
queue_orgs.append(OrganizationJobQueue(org_id=queue_org['org_id'], job_queue_id=queue_id, creation_date=datetime.utcnow()))

if len(queue_orgs) > 0:
db.session.add_all(queue_orgs)
db.session.commit()
updated_queue = job_queue.update_queue(queue, orgs)
return updated_queue

queue_schema = JobQueueSchema()
return json.loads(queue_schema.dumps(queue))

@api.doc(security='ApiKeyAuth')
@login_required(role=Role.ROLE_ADMIN)
Expand All @@ -188,14 +106,7 @@ def delete(self, queue_id):
if queue is None:
return err_response(msg="Job queue does not exist")

# Clear orgs
db.session.execute(
db.delete(OrganizationJobQueue).filter_by(job_queue_id=queue_id)
)
db.session.commit()

db.session.query(JobQueue).filter_by(id=queue_id).delete()
db.session.commit()
job_queue.delete_queue(queue_id)

return {"code": status.HTTP_200_OK, "message": "Successfully deleted {}.".format(queue_name)}

Expand Down
Loading

0 comments on commit 9e0fdfd

Please sign in to comment.