Skip to content

Commit

Permalink
Add support for DPS sandbox queue (#136)
Browse files Browse the repository at this point in the history
* Respond with HTTP 400 if invalid queue provided during registration

* Update DPS job time limits if using DPS sandbox queue

* Ensure that message is of type string

* update code to use new job_queue.get_user_queues() function

* Move queue settings to db

* add is_default and time_limit_minutes to job_queue endpoints

* Job submission bug fix

---------

Co-authored-by: bsatoriu <[email protected]>
  • Loading branch information
sujen1412 and bsatoriu authored Sep 12, 2024
1 parent f510858 commit 401b86a
Show file tree
Hide file tree
Showing 9 changed files with 193 additions and 32 deletions.
9 changes: 5 additions & 4 deletions api/endpoints/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,11 @@
from flask import request
from flask_api import status
from api.models.job_queue import JobQueue
from api.models.organization import Organization
from api.models.organization_job_queue import OrganizationJobQueue
from api.models.role import Role
from api.restplus import api
from api.auth.security import login_required
from api.maap_database import db
from api.models.pre_approved import PreApproved
from api.schemas.job_queue_schema import JobQueueSchema
from api.schemas.pre_approved_schema import PreApprovedSchema
from datetime import datetime
import json
Expand Down Expand Up @@ -55,9 +52,11 @@ def post(self):
return err_response("Valid queue description is required.")

guest_tier = req_data.get("guest_tier", False)
is_default = req_data.get("is_default", False)
time_limit_minutes = req_data.get("time_limit_minutes", 0)
orgs = req_data.get("orgs", [])

new_queue = job_queue.create_queue(queue_name, queue_description, guest_tier, orgs)
new_queue = job_queue.create_queue(queue_name, queue_description, guest_tier, is_default, time_limit_minutes, orgs)
return new_queue


Expand Down Expand Up @@ -87,6 +86,8 @@ def put(self, queue_id):
queue.queue_name = req_data.get("queue_name", queue.queue_name)
queue.queue_description = req_data.get("queue_description", queue.queue_description)
queue.guest_tier = req_data.get("guest_tier", queue.guest_tier)
queue.is_default = req_data.get("is_default", queue.is_default)
queue.time_limit_minutes = req_data.get("time_limit_minutes", queue.time_limit_minutes)
orgs = req_data.get("orgs", [])

updated_queue = job_queue.update_queue(queue, orgs)
Expand Down
25 changes: 18 additions & 7 deletions api/endpoints/algorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import traceback
import api.utils.github_util as git
import api.utils.hysds_util as hysds
import api.utils.http_util as http_util
import api.settings as settings
import api.utils.ogc_translate as ogc
from api.auth.security import get_authorized_user, login_required
Expand Down Expand Up @@ -260,12 +261,16 @@ def post(self):

try:
# validate if input queue is valid
if resource is None: resource = settings.DEFAULT_QUEUE
if resource not in hysds.get_mozart_queues():
response_body["code"] = status.HTTP_500_INTERNAL_SERVER_ERROR
response_body["message"] = "The resource {} is invalid. Please select from one of {}".format(resource, hysds.get_mozart_queues())
response_body["error"] = "Invalid queue in request: {}".format(req_data)
return response_body, 500
user = get_authorized_user()
if resource is None:
resource = job_queue.get_default_queue().queue_name
else:
valid_queues = job_queue.get_user_queues(user.id)
valid_queue_names = list(map(lambda q: q.queue_name, valid_queues))
if resource not in valid_queue_names:
return http_util.err_response(msg=f"User does not have permissions for resource {resource}."
f"Please select from one of {valid_queue_names}",
code=status.HTTP_400_BAD_REQUEST)
# clean up any old specs from the repo
repo = git.clean_up_git_repo(repo, repo_name=settings.REPO_NAME)
# creating hysds-io file
Expand Down Expand Up @@ -316,6 +321,7 @@ def post(self):
hysds.write_file("{}/{}".format(settings.REPO_PATH, settings.REPO_NAME), "job-submission.json",
job_submission_json)
logging.debug("Created spec files")

except Exception as ex:
tb = traceback.format_exc()
response_body["code"] = status.HTTP_500_INTERNAL_SERVER_ERROR
Expand Down Expand Up @@ -416,6 +422,10 @@ def get(self, algo_id):
try:
job_type = "job-{}".format(algo_id)
response = hysds.get_job_spec(job_type)
if response is None:
return Response(ogc.get_exception(type="FailedSearch", origin_process="DescribeProcess",
ex_message="Algorithm not found. {}".format(job_type)),
status=status.HTTP_404_NOT_FOUND,mimetype='text/xml')
params = response.get("result").get("params")
queue = response.get("result").get("recommended-queues")[0]
response_body = ogc.describe_process_response(algo_id, params, queue)
Expand Down Expand Up @@ -468,9 +478,10 @@ def get(self):
response_body = {"code": None, "message": None}
user = get_authorized_user()
queues = job_queue.get_user_queues(user.id)
queue_names = list(map(lambda q: q.queue_name, queues))

response_body["code"] = status.HTTP_200_OK
response_body["queues"] = queues
response_body["queues"] = queue_names
response_body["message"] = "success"
return response_body
except Exception as ex:
Expand Down
12 changes: 10 additions & 2 deletions api/endpoints/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from api.restplus import api
import api.utils.hysds_util as hysds
import api.utils.ogc_translate as ogc
import api.utils.job_queue as job_queue
import api.settings as settings
try:
import urllib.parse as urlparse
Expand Down Expand Up @@ -60,8 +61,11 @@ def post(self):

try:
dedup = "false" if dedup is None else dedup
queue = hysds.get_recommended_queue(job_type=job_type) if queue is None or queue is "" else queue
response = hysds.mozart_submit_job(job_type=job_type, params=params, dedup=dedup, queue=queue,
user = get_authorized_user()
queue = job_queue.validate_or_get_queue(queue, job_type, user.id)
if job_queue.contains_time_limit(queue):
hysds.set_timelimit_for_dps_sandbox(params, queue)
response = hysds.mozart_submit_job(job_type=job_type, params=params, dedup=dedup, queue=queue.queue_name,
identifier=identifier)

logging.info("Mozart Response: {}".format(json.dumps(response)))
Expand All @@ -78,6 +82,10 @@ def post(self):
return Response(ogc.status_response(job_id=job_id, job_status=job_status), mimetype='text/xml')
else:
raise Exception(response.get("message"))
except ValueError as ex:
logging.error(traceback.format_exc())
return Response(ogc.get_exception(type="FailedJobSubmit", origin_process="Execute",
ex_message=str(ex)), status.HTTP_400_BAD_REQUEST)
except Exception as ex:
logging.info("Error submitting job: {}".format(ex))
return Response(ogc.get_exception(type="FailedJobSubmit", origin_process="Execute",
Expand Down
4 changes: 4 additions & 0 deletions api/models/job_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ class JobQueue(Base):
queue_description = db.Column(db.String())
# Whether the queue is available to public 'Guest' users
guest_tier = db.Column(db.Boolean())
# Whether the queue is used as a default when no queues are specified
is_default = db.Column(db.Boolean())
# The maximum time, in minutes, that jobs are allowed to run using this queue
time_limit_minutes = db.Column(db.Integer)
creation_date = db.Column(db.DateTime())

def __repr__(self):
Expand Down
3 changes: 1 addition & 2 deletions api/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ def str2bool(v):


MAAP_API_URL = os.getenv('MAAP_API_URL', "http://localhost:5000/api")
PROJECT_QUEUE_PREFIX = os.getenv('PROJECT_QUEUE_PREFIX', "maap")
API_HOST_URL = os.getenv('API_HOST_URL', 'http://0.0.0.0:5000/')

# Flask settings
Expand Down Expand Up @@ -59,12 +58,12 @@ def str2bool(v):
MOZART_URL = os.getenv('MOZART_URL', 'https://[MOZART_IP]/mozart/api/v0.2')
MOZART_V1_URL = os.getenv('MOZART_V1_URL', 'https://[MOZART_IP]/mozart/api/v0.1') # new from sister
GRQ_URL = os.getenv('GRQ_URL', 'http://[GRQ_IP]:8878/api/v0.1') # new from sister
DEFAULT_QUEUE = os.getenv('DEFAULT_QUEUE', 'test-job_worker-large')
LW_QUEUE = os.getenv('LW_QUEUE', 'system-jobs-queue')
HYSDS_LW_VERSION = os.getenv('HYSDS_LW_VERSION', 'v1.2.2')
GRQ_REST_URL = os.getenv('GRQ_REST_URL', 'http://[GRQ_IP]/api/v0.1')
S3_CODE_BUCKET = os.getenv('S3_CODE_BUCKET', 's3://[S3_BUCKET_NAME]')
DPS_MACHINE_TOKEN = os.getenv('DPS_MACHINE_TOKEN', '')
PROJECT_QUEUE_PREFIX = os.getenv('PROJECT_QUEUE_PREFIX', "maap")

# FASTBROWSE API
TILER_ENDPOINT = os.getenv('TILER_ENDPOINT', 'https://d852m4cmf5.execute-api.us-east-1.amazonaws.com')
Expand Down
28 changes: 18 additions & 10 deletions api/utils/hysds_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
import time
import copy

import api.utils.job_queue
from api.models import job_queue

log = logging.getLogger(__name__)

STATUS_JOB_STARTED = "job-started"
Expand Down Expand Up @@ -202,7 +205,7 @@ def create_hysds_io(algorithm_description, inputs, verified=False, submission_ty
hysds_io["params"] = params
return hysds_io

def create_job_spec(run_command, inputs, disk_usage, queue_name=settings.DEFAULT_QUEUE, verified=False):
def create_job_spec(run_command, inputs, disk_usage, queue_name, verified=False):
"""
Creates the contents of the job spec file
:param run_command:
Expand Down Expand Up @@ -410,7 +413,7 @@ def get_algorithms():
return maap_algo_list


def mozart_submit_job(job_type, params={}, queue=settings.DEFAULT_QUEUE, dedup="false", identifier="maap-api_submit"):
def mozart_submit_job(job_type, params={}, queue="", dedup="false", identifier="maap-api_submit"):
"""
Submit a job to Mozart
:param job_type:
Expand Down Expand Up @@ -549,7 +552,7 @@ def get_recommended_queue(job_type):
response = get_job_spec(job_type)
recommended_queues = response.get("result", None).get("recommended-queues", None)
recommended_queue = recommended_queues[0] if type(recommended_queues) is list else None
return recommended_queue if recommended_queue != "" else settings.DEFAULT_QUEUE
return recommended_queue if recommended_queue != "" else api.utils.job_queue.get_default_queue().queue_name


def validate_job_submit(hysds_io, user_params):
Expand Down Expand Up @@ -588,8 +591,8 @@ def validate_job_submit(hysds_io, user_params):
if known_params.get(p).get("default") is not None:
validated_params[p] = known_params.get(p).get("default")
else:
raise Exception("Parameter {} missing from inputs. Didn't find any default set for it in "
"algorithm specification. Please specify it and attempt to submit.".format(p))
raise ValueError("Parameter {} missing from inputs. Didn't find any default set for it in "
"algorithm specification. Please specify it and attempt to submit.".format(p))
return validated_params


Expand Down Expand Up @@ -776,8 +779,13 @@ def revoke_mozart_job(job_id, wait_for_completion=False):
return poll_for_completion(lw_job_id)


def pele_get_product_by_id(id):
    """Stub for a Pele product lookup by identifier; not implemented, always returns None.

    NOTE(review): the parameter name ``id`` shadows the builtin, but it is part of
    the public interface and is kept for backward compatibility.
    """
    return None



def set_timelimit_for_dps_sandbox(params: dict, queue: "job_queue.JobQueue"):
    """
    Set the soft_time_limit and time_limit parameters for the DPS sandbox queue
    at job submission time.

    The queue's time limit is stored in minutes (``time_limit_minutes``) but
    HySDS expects both limits in seconds, hence the ``* 60`` conversion. Both
    the soft and hard limits are set to the same value.

    :param params: job-submission parameter dict; mutated in place.
    :param queue: job queue record providing ``time_limit_minutes``.
    :return: the same ``params`` dict, for convenience (it is also mutated in place).
    """
    limit_seconds = queue.time_limit_minutes * 60
    params.update({"soft_time_limit": limit_seconds,
                   "time_limit": limit_seconds})
    return params
Loading

0 comments on commit 401b86a

Please sign in to comment.