Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Galaxy ignoring job object_store_id for quota check #19854

Merged
merged 4 commits into from
Mar 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion lib/galaxy/jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1772,7 +1772,8 @@ def enqueue(self):
return True

def _pause_job_if_over_quota(self, job):
if self.app.quota_agent.is_over_quota(self.app, job, self.job_destination):
quota_source_map = self.app.object_store.get_quota_source_map()
if self.app.quota_agent.is_over_quota(quota_source_map, job):
log.info("(%d) User (%s) is over quota: job paused", job.id, job.user_id)
message = "Execution of this dataset's job is paused because you were over your disk quota at the time it was ready to run"
self.pause(job, message)
Expand Down
23 changes: 7 additions & 16 deletions lib/galaxy/quota/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,8 @@ def get_usage(self, trans=None, user=False, history=False, quota_source_label=No
usage = quota_source_usage.disk_usage
return usage

def is_over_quota(self, app, job, job_destination):
"""Return True if the user or history is over quota for specified job.

job_destination unused currently but an important future application will
be admins and/or users dynamically specifying which object stores to use
and that will likely come in through the job destination.
"""
def is_over_quota(self, quota_source_map, job):
"""Return True if the user or history is over quota for specified job."""


class NoQuotaAgent(QuotaAgent):
Expand All @@ -101,7 +96,7 @@ def get_percent(
) -> Optional[int]:
return None

def is_over_quota(self, app, job, job_destination):
def is_over_quota(self, quota_source_map, job):
return False


Expand Down Expand Up @@ -374,16 +369,12 @@ def set_entity_quota_associations(self, quotas=None, users=None, groups=None, de
self.sa_session.add(gqa)
self.sa_session.commit()

def is_over_quota(self, app, job, job_destination):
def is_over_quota(self, quota_source_map, job):
if is_user_object_store(job.object_store_id):
return False # User object stores are not subject to quotas
if job_destination is not None:
object_store_id = job_destination.params.get("object_store_id", None)
object_store = app.object_store
quota_source_map = object_store.get_quota_source_map()
quota_source_label = quota_source_map.get_quota_source_info(object_store_id).label
else:
quota_source_label = None

quota_source_label = quota_source_map.get_quota_source_info(job.object_store_id).label

quota = self.get_quota(job.user, quota_source_label=quota_source_label)
if quota is not None:
try:
Expand Down
130 changes: 127 additions & 3 deletions test/unit/data/test_quota.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,13 @@
from galaxy import model
from galaxy.model.unittest_utils.utils import random_email
from galaxy.objectstore import (
build_object_store_from_config,
QuotaSourceInfo,
QuotaSourceMap,
serialize_static_object_store_config,
)
from galaxy.objectstore.unittest_utils import (
Config,
)
from galaxy.quota import DatabaseQuotaAgent
from .test_galaxy_mapping import (
Expand Down Expand Up @@ -443,19 +448,138 @@ def _add_user_quota(self, user, quota):
def _assert_user_quota_is(self, user, amount, quota_source_label=None):
actual_quota = self.quota_agent.get_quota(user, quota_source_label=quota_source_label)
assert amount == actual_quota, f"Expected quota [{amount}], got [{actual_quota}]"
quota_source_map = QuotaSourceMap()
if quota_source_label is None:
if amount is None:
user.total_disk_usage = 1000
job = model.Job()
job.user = user
assert not self.quota_agent.is_over_quota(None, job, None)
assert not self.quota_agent.is_over_quota(quota_source_map, job)
else:
job = model.Job()
job.user = user
user.total_disk_usage = amount - 1
assert not self.quota_agent.is_over_quota(None, job, None)
assert not self.quota_agent.is_over_quota(quota_source_map, job)
user.total_disk_usage = amount + 1
assert self.quota_agent.is_over_quota(None, job, None)
assert self.quota_agent.is_over_quota(quota_source_map, job)


class TestQuotaObjectStore(BaseModelTestCase):
    """Verify that quota checks honor a job's ``object_store_id``.

    Exercises ``DatabaseQuotaAgent.is_over_quota(quota_source_map, job)``
    against a distributed object store whose backends map onto two quota
    source labels ("permanent" and "scratch").
    """

    def setUp(self):
        """Create a user and history to own test datasets, plus a quota agent."""
        super().setUp()
        u = model.User(email=f"calc_usage{uuid.uuid1()}@example.com", password="password")
        self.persist(u)
        h = model.History(name="History for Calculated Usage", user=u)
        self.persist(h)
        self.u = u
        self.h = h

        self.quota_agent = DatabaseQuotaAgent(self.model)

    def test_labeled_quota_objectstore(self):
        """
        setup an object store with 3 backends with 2 quota sources
        - backends "files" and "legacy" count for a quota source "permanent"
        - backend "files-scratch" counts for a quota source "scratch"
        setup corresponding default quotas for the quota sources

        - add datasets to each of the backends such that the default quotas are (just) not violated
        - assert that jobs targeting files / files-scratch pass the quota check

        - add datasets such that quotas are violated
        - assert that jobs targeting files / files-scratch violate the quota check
        """

        # NOTE(review): this YAML literal was re-indented during review — the
        # extracted source had lost the nesting YAML requires. Structure
        # follows the distributed object store schema used elsewhere in this
        # test (backend-level id/type/weight plus a nested `quota: source:`
        # mapping) — confirm against the original file.
        DISTRIBUTED_TEST_CONFIG_YAML = """
type: distributed
search_for_missing: true
backends:
  - id: "files"
    type: disk
    device: "files"
    weight: 1
    store_by: uuid
    allow_selection: true
    private: false
    quota:
      source: permanent
    name: "Permanent Storage"
    description: Data in Permanent Storage is not deleted automatically. Default quota is X.
    files_dir: database/files_24.1/
    badges:
      - type: not_backed_up
  - id: "files-scratch"
    type: disk
    device: "files"
    weight: 0
    store_by: uuid
    allow_selection: true
    private: true
    quota:
      source: scratch
    name: "Scratch storage"
    description: "Data in scratch storage is scheduled for automatic removal after Y days. Default quota is Z."
    files_dir: database/files_24.1/
    badges:
      - type: not_backed_up
      - type: short_term
        message: "Data stored here is scheduled for removal after 30 days"
  - id: legacy
    type: disk
    store_by: id
    quota:
      source: permanent
    weight: 0
    files_dir: database/files/
"""
        with Config(DISTRIBUTED_TEST_CONFIG_YAML) as (directory, object_store):
            # Round-trip through the serialized static config so the store under
            # test is built the same way Galaxy rebuilds one from configuration.
            as_dict = serialize_static_object_store_config(object_store, set())
            self.object_store = build_object_store_from_config(None, config_dict=as_dict)

        # Default registered-user quotas: 20 for "permanent", 100 for "scratch".
        quota = model.Quota(name="default permanent quota", amount=20, quota_source_label="permanent")
        self.quota_agent.set_default_quota(
            model.DefaultQuotaAssociation.types.REGISTERED,
            quota,
        )

        quota = model.Quota(name="default scratch quota", amount=100, quota_source_label="scratch")
        self.quota_agent.set_default_quota(
            model.DefaultQuotaAssociation.types.REGISTERED,
            quota,
        )

        # Fill each quota source exactly to its limit: "permanent" gets
        # 10 ("legacy") + 10 ("files") = 20, "scratch" gets 100. Being at the
        # limit must not trip the quota check.
        self._add_dataset(10, "legacy")
        self._add_dataset(10, "files")
        self._add_dataset(100, "files-scratch")
        self.u.calculate_and_set_disk_usage(self.object_store)

        self._run_job("files", False)
        self._run_job("files-scratch", False)

        # One more byte in each source pushes both sources over their quotas.
        self._add_dataset(1, "files")
        self._add_dataset(1, "files-scratch")
        self.u.calculate_and_set_disk_usage(self.object_store)

        self._run_job("files", True)
        self._run_job("files-scratch", True)

    def _add_dataset(self, total_size, object_store_id=None):
        """Persist an HDA of ``total_size`` bytes stored in ``object_store_id``."""
        d1 = model.HistoryDatasetAssociation(
            extension="txt", history=self.h, create_dataset=True, sa_session=self.model.session
        )
        d1.dataset.total_size = total_size
        d1.dataset.object_store_id = object_store_id
        self.persist(d1)
        return d1

    def _run_job(self, object_store_id, over_quota):
        """
        check if a job targeting object_store_id is over_quota
        """
        job = model.Job()
        job.user = self.u
        job.object_store_id = object_store_id
        # The quota agent must consult the job's own object_store_id (via the
        # quota source map), not any destination-level setting.
        assert over_quota is self.quota_agent.is_over_quota(self.object_store.get_quota_source_map(), job)


class TestUsage(BaseModelTestCase):
Expand Down
Loading