Skip to content

Commit

Permalink
quota: avoid accessing disk when updating user disk quotas
Browse files Browse the repository at this point in the history
Partially addresses #193
  • Loading branch information
mdonadoni committed Sep 1, 2023
1 parent 8cc26f7 commit 64e23a4
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 8 deletions.
2 changes: 1 addition & 1 deletion reana_db/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -758,8 +758,8 @@ def _update_disk_quota(workflow):
return

try:
update_users_disk_quota(user=workflow.owner)
store_workflow_disk_quota(workflow)
update_users_disk_quota(user=workflow.owner)
except Exception as e:
logging.error(f"Failed to update disk quota: \n{e}\nContinuing...")

Expand Down
33 changes: 29 additions & 4 deletions reana_db/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,9 @@ def update_users_disk_quota(
) -> None:
"""Update users disk quota usage.
User disk quotas will be calculated from workflow disk quotas,
so the latter should be updated before the former.
:param user: User whose disk quota will be updated. If None, applies to all users.
:param bytes_to_sum: Amount of bytes to add to the user disk quota.
If None, the quota is recalculated from the workflow disk quotas instead.
Expand All @@ -309,9 +312,14 @@ def update_users_disk_quota(
:param override_policy_checks: Whether to update the disk quota without checking the
update policy.
"""
from reana_db.config import DEFAULT_QUOTA_RESOURCES
from reana_db.database import Session
from reana_db.models import Resource, ResourceType, User, UserResource
from reana_db.models import (
Workflow,
WorkflowResource,
ResourceType,
User,
UserResource,
)

if not override_policy_checks and should_skip_quota_update(ResourceType.disk):
return
Expand All @@ -336,8 +344,25 @@ def update_users_disk_quota(
else:
user_resource_quota.quota_used = updated_quota_usage
else:
workspace_path = u.get_user_workspace()
disk_usage_bytes = get_disk_usage_or_zero(workspace_path)
# get the size of each workspace of each workflow of the given user
size_per_workspace = (
Session.query(
Workflow.workspace_path,
func.max(WorkflowResource.quota_used).label("quota_used"),
)
.filter(WorkflowResource.workflow_id == Workflow.id_)
.filter(WorkflowResource.resource_id == disk_resource.id_)
.filter(Workflow.id_.in_(Session.query(u.workflows.subquery().c.id_)))
# multiple workflows might have the same workspace path, so the query groups
# by `workspace_path` in order to consider each workspace only once
.group_by(Workflow.workspace_path)
.subquery()
)
disk_usage_bytes = Session.query(
func.sum(size_per_workspace.c.quota_used)
).scalar()
if not disk_usage_bytes:
disk_usage_bytes = 0
user_resource_quota.quota_used = disk_usage_bytes
Session.commit()
timer.count_event()
Expand Down
6 changes: 3 additions & 3 deletions tests/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ def test_workflow_termination_user_quota_usage(
quota_usage["cpu"]["usage"]["raw"] == 0

if ResourceType.disk.name in workflow_termination_quota_update_policy:
assert quota_usage["disk"]["usage"]["raw"] == 128
assert quota_usage["disk"]["usage"]["raw"] == 128 * num_workflows
else:
quota_usage["disk"]["usage"]["raw"] == 0

Expand Down Expand Up @@ -425,7 +425,7 @@ def test_all_users_disk_quota_usage_update(
if workflow_termination_quota_update_policy:
for wf in workflows:
assert wf.resources[0].quota_used == dir_size
assert quota_disk_usage == dir_size
assert quota_disk_usage == dir_size * num_workflows
else:
assert quota_disk_usage == 0

Expand All @@ -436,7 +436,7 @@ def test_all_users_disk_quota_usage_update(
if workflow_termination_quota_update_policy or periodic_update:
for wf in workflows:
assert wf.resources[0].quota_used == dir_size
assert quota_disk_usage == dir_size
assert quota_disk_usage == dir_size * num_workflows
else:
assert quota_disk_usage == 0

Expand Down

0 comments on commit 64e23a4

Please sign in to comment.