Skip to content

Commit

Permalink
Merge branch 'release_24.1' into release_24.2
Browse files Browse the repository at this point in the history
  • Loading branch information
mvdbeek committed Feb 11, 2025
2 parents c15a897 + c3b6cbf commit 7f6a243
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 14 deletions.
10 changes: 9 additions & 1 deletion lib/galaxy/jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1518,7 +1518,7 @@ def pause(self, job=None, message=None):
job = self.get_job()
if message is None:
message = "Execution of this dataset's job is paused"
if job.state == job.states.NEW:
if job.state in (job.states.NEW, job.states.QUEUED):
for dataset_assoc in job.output_datasets + job.output_library_datasets:
dataset_assoc.dataset.dataset.state = dataset_assoc.dataset.dataset.states.PAUSED
dataset_assoc.dataset.info = message
Expand Down Expand Up @@ -1618,10 +1618,18 @@ def enqueue(self):
self.set_job_destination(self.job_destination, None, flush=False, job=job)
# Set object store after job destination so can leverage parameters...
self._set_object_store_ids(job)
# Now that we have the object store id, check if we are over the limit
self._pause_job_if_over_quota(job)
with transaction(self.sa_session):
self.sa_session.commit()
return True

def _pause_job_if_over_quota(self, job):
if self.app.quota_agent.is_over_quota(self.app, job, self.job_destination):
log.info("(%d) User (%s) is over quota: job paused" % (job.id, job.user_id))
message = "Execution of this dataset's job is paused because you were over your disk quota at the time it was ready to run"
self.pause(job, message)

def set_job_destination(self, job_destination, external_id=None, flush=True, job=None):
"""Subclasses should implement this to persist a destination, if necessary."""

Expand Down
14 changes: 3 additions & 11 deletions lib/galaxy/jobs/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -562,18 +562,12 @@ def __handle_waiting_jobs(self):
log.info("(%d) Job deleted by user while still queued" % job.id)
elif job_state == JOB_ADMIN_DELETED:
log.info("(%d) Job deleted by admin while still queued" % job.id)
elif job_state in (JOB_USER_OVER_QUOTA, JOB_USER_OVER_TOTAL_WALLTIME):
if job_state == JOB_USER_OVER_QUOTA:
log.info("(%d) User (%s) is over quota: job paused" % (job.id, job.user_id))
what = "your disk quota"
else:
log.info("(%d) User (%s) is over total walltime limit: job paused" % (job.id, job.user_id))
what = "your total job runtime"

elif job_state == JOB_USER_OVER_TOTAL_WALLTIME:
log.info("(%d) User (%s) is over total walltime limit: job paused" % (job.id, job.user_id))
job.set_state(model.Job.states.PAUSED)
for dataset_assoc in job.output_datasets + job.output_library_datasets:
dataset_assoc.dataset.dataset.state = model.Dataset.states.PAUSED
dataset_assoc.dataset.info = f"Execution of this dataset's job is paused because you were over {what} at the time it was ready to run"
dataset_assoc.dataset.info = "Execution of this dataset's job is paused because you were over your total job runtime at the time it was ready to run"
self.sa_session.add(dataset_assoc.dataset.dataset)
self.sa_session.add(job)
elif job_state == JOB_ERROR:
Expand Down Expand Up @@ -747,8 +741,6 @@ def __verify_job_ready(self, job, job_wrapper):

if state == JOB_READY:
state = self.__check_user_jobs(job, job_wrapper)
if state == JOB_READY and self.app.quota_agent.is_over_quota(self.app, job, job_destination):
return JOB_USER_OVER_QUOTA, job_destination
# Check total walltime limits
if state == JOB_READY and "delta" in self.app.job_config.limits.total_walltime:
jobs_to_check = self.sa_session.query(model.Job).filter(
Expand Down
5 changes: 3 additions & 2 deletions lib/galaxy/quota/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import galaxy.util
from galaxy.model.base import transaction
from galaxy.objectstore import is_user_object_store

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -378,8 +379,8 @@ def set_entity_quota_associations(self, quotas=None, users=None, groups=None, de
self.sa_session.commit()

def is_over_quota(self, app, job, job_destination):
# Doesn't work because job.object_store_id until inside handler :_(
# quota_source_label = job.quota_source_label
if is_user_object_store(job.object_store_id):
return False # User object stores are not subject to quotas
if job_destination is not None:
object_store_id = job_destination.params.get("object_store_id", None)
object_store = app.object_store
Expand Down
66 changes: 66 additions & 0 deletions test/integration/objectstore/test_per_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,3 +421,69 @@ def test_create_and_upgrade(self):
assert object_store_json["template_version"] == 2
assert "sec1" not in secrets
assert "sec2" in secrets


class TestPerUserObjectStoreQuotaIntegration(BaseUserObjectStoreIntegration):
@classmethod
def handle_galaxy_config_kwds(cls, config):
cls._write_template_and_object_store_config(config, LIBRARY_2)
config["enable_quotas"] = True

def test_user_object_store_does_not_pause_jobs_over_quota(self):
object_store_id = self._create_simple_object_store()
with self.dataset_populator.test_history() as history_id:

def _run_tool(tool_id, inputs, preferred_object_store_id=None):
response = self.dataset_populator.run_tool(
tool_id,
inputs,
history_id,
preferred_object_store_id=preferred_object_store_id,
)
self.dataset_populator.wait_for_history(history_id)
return response

# Create one dataset in the default object store
hda1 = self.dataset_populator.new_dataset(history_id, content="1 2 3\n4 5 6\n7 8 9\n", wait=True)
storage_info = self.dataset_populator.dataset_storage_info(hda1["id"])
assert storage_info["object_store_id"] == "default"

# Set a quota of 1 byte so running a tool will pause the job
self._define_quota_in_bytes(1)

# Run a tool
hda1_input = {"src": "hda", "id": hda1["id"]}
response = _run_tool("multi_data_param", {"f1": hda1_input, "f2": hda1_input})
paused_job_id = response["jobs"][0]["id"]
storage_info, _ = self._storage_info_for_job_output(response)
assert storage_info["object_store_id"] == "default"

# The job should be paused because the default object store is over quota
state = self.dataset_populator.wait_for_job(paused_job_id)
assert state == "paused"

# Set the user object store as the preferred object store
self.dataset_populator.set_user_preferred_object_store_id(object_store_id)

# Run the tool again
response = _run_tool("multi_data_param", {"f1": hda1_input, "f2": hda1_input})
job_id = response["jobs"][0]["id"]
storage_info, _ = self._storage_info_for_job_output(response)
assert storage_info["object_store_id"] == object_store_id

# The job should not be paused because the user object store is not subject to quotas
state = self.dataset_populator.wait_for_job(job_id)
assert state == "ok"

def _define_quota_in_bytes(self, bytes: int):
quotas = self.dataset_populator.get_quotas()
assert len(quotas) == 0

payload = {
"name": "defaultquota1",
"description": "first default quota",
"amount": f"{bytes} bytes",
"operation": "=",
"default": "registered",
}
self.dataset_populator.create_quota(payload)

0 comments on commit 7f6a243

Please sign in to comment.