Standardize handling of storage and execution time quotas (#1969)
Fixes #1968 

Changes:
- Existing `stopped_quota_reached` and `skipped_quota_reached` states are
migrated to new values that indicate which quota was reached
- Before a crawl runs, the operator checks whether the storage or execution
minutes quota has been reached, and if so fails the crawl with the
appropriate state: `skipped_storage_quota_reached` or
`skipped_time_quota_reached`
- While crawls are running, the operator checks whether the execution minutes
quota has been reached, or whether the combined size of all running crawls
would exceed the storage quota once uploaded; if so, the crawl is stopped
gracefully and given the `stopped_storage_quota_reached` or
`stopped_time_quota_reached` state as appropriate (see the sketch below)
- Adds new nightly tests for enforcing the storage quota
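In outline, the operator now makes two quota decisions: a pre-flight check that skips a crawl before any pods or PVCs are created, and an in-flight check that stops a running crawl gracefully. The sketch below is illustrative only, not the operator code itself: the hypothetical `check_before_run`/`check_while_running` helpers take plain values where the real methods in `backend/btrixcloud/operator/crawls.py` consult `org_ops` and the `MCSyncData` of related crawl jobs.

```python
from typing import Literal, Optional

SkipState = Literal["skipped_storage_quota_reached", "skipped_time_quota_reached"]
StopReason = Literal["stopped_storage_quota_reached", "stopped_time_quota_reached"]


def check_before_run(
    storage_quota_reached: bool, exec_mins_quota_reached: bool
) -> Optional[SkipState]:
    """Pre-flight: fail the crawl immediately if either quota is already hit."""
    if storage_quota_reached:
        return "skipped_storage_quota_reached"
    if exec_mins_quota_reached:
        return "skipped_time_quota_reached"
    return None


def check_while_running(
    bytes_stored: int,
    running_crawls_total_size: int,
    storage_quota: Optional[int],
    exec_mins_quota_reached: bool,
) -> Optional[StopReason]:
    """In-flight: stop gracefully if projected storage or exec time is over quota."""
    # Projected usage counts bytes already stored plus the current size of all
    # running crawls, since their data will be uploaded once they finish.
    if storage_quota and bytes_stored + running_crawls_total_size >= storage_quota:
        return "stopped_storage_quota_reached"
    if exec_mins_quota_reached:
        return "stopped_time_quota_reached"
    return None
```

For example, `check_while_running(9_000, 2_000, 10_000, False)` returns `"stopped_storage_quota_reached"`: the org is under quota on disk today, but would not be once the running crawls upload.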
tw4l authored Jul 25, 2024
1 parent 2c89edc commit d38abbc
Showing 11 changed files with 305 additions and 36 deletions.
2 changes: 1 addition & 1 deletion backend/btrixcloud/db.py
@@ -17,7 +17,7 @@
from .migrations import BaseMigration


CURR_DB_VERSION = "0032"
CURR_DB_VERSION = "0033"


# ============================================================================
98 changes: 98 additions & 0 deletions backend/btrixcloud/migrations/migration_0033_crawl_quota_states.py
@@ -0,0 +1,98 @@
"""
Migration 0033 - Standardizing quota-based crawl states
"""

from btrixcloud.migrations import BaseMigration


MIGRATION_VERSION = "0033"


class Migration(BaseMigration):
"""Migration class."""

# pylint: disable=unused-argument
def __init__(self, mdb, **kwargs):
super().__init__(mdb, migration_version=MIGRATION_VERSION)

async def migrate_up(self):
"""Perform migration up.
Migrate skipped_quota_reached state to skipped_storage_quota_reached
Migrate stopped_quota_reached to stopped_time_quota_reached
Also update lastCrawlStates in workflows with these states
"""
crawls_db = self.mdb["crawls"]
crawl_configs_db = self.mdb["crawl_configs"]

## CRAWLS ##

try:
res = await crawls_db.update_many(
{"type": "crawl", "state": "skipped_quota_reached"},
{"$set": {"state": "skipped_storage_quota_reached"}},
)
updated = res.modified_count
print(
f"{updated} crawls with state skipped_quota_reached migrated",
flush=True,
)
# pylint: disable=broad-exception-caught
except Exception as err:
print(
f"Error migrating crawls with state skipped_quota_reached: {err}",
flush=True,
)

try:
res = await crawls_db.update_many(
{"type": "crawl", "state": "stopped_quota_reached"},
{"$set": {"state": "stopped_time_quota_reached"}},
)
updated = res.modified_count
print(
f"{updated} crawls with state stopped_quota_reached migrated",
flush=True,
)
# pylint: disable=broad-exception-caught
except Exception as err:
print(
f"Error migrating crawls with state stopped_quota_reached: {err}",
flush=True,
)

## WORKFLOWS ##

try:
res = await crawl_configs_db.update_many(
{"lastCrawlState": "skipped_quota_reached"},
{"$set": {"lastCrawlState": "skipped_storage_quota_reached"}},
)
updated = res.modified_count
print(
f"{updated} crawl configs with lastCrawlState skipped_quota_reached migrated",
flush=True,
)
# pylint: disable=broad-exception-caught
except Exception as err:
print(
f"Error migrating crawlconfigs with lastCrawlState skipped_quota_reached: {err}",
flush=True,
)

try:
res = await crawl_configs_db.update_many(
{"lastCrawlState": "stopped_quota_reached"},
{"$set": {"lastCrawlState": "stopped_time_quota_reached"}},
)
updated = res.modified_count
print(
f"{updated} crawl configs with lastCrawlState stopped_quota_reached migrated",
flush=True,
)
# pylint: disable=broad-exception-caught
except Exception as err:
print(
f"Error migrating crawl configs with lastCrawlState stopped_quota_reached: {err}",
flush=True,
)
14 changes: 12 additions & 2 deletions backend/btrixcloud/models.py
@@ -211,10 +211,20 @@ class UserOut(BaseModel):
TYPE_STARTING_STATES = Literal["starting", "waiting_capacity", "waiting_org_limit"]
STARTING_STATES = get_args(TYPE_STARTING_STATES)

TYPE_FAILED_STATES = Literal["canceled", "failed", "skipped_quota_reached"]
TYPE_FAILED_STATES = Literal[
"canceled",
"failed",
"skipped_storage_quota_reached",
"skipped_time_quota_reached",
]
FAILED_STATES = get_args(TYPE_FAILED_STATES)

TYPE_SUCCESSFUL_STATES = Literal["complete", "stopped_by_user", "stopped_quota_reached"]
TYPE_SUCCESSFUL_STATES = Literal[
"complete",
"stopped_by_user",
"stopped_storage_quota_reached",
"stopped_time_quota_reached",
]
SUCCESSFUL_STATES = get_args(TYPE_SUCCESSFUL_STATES)

TYPE_RUNNING_AND_STARTING_STATES = Literal[TYPE_STARTING_STATES, TYPE_RUNNING_STATES]
68 changes: 45 additions & 23 deletions backend/btrixcloud/operator/crawls.py
@@ -193,16 +193,25 @@ async def sync_crawls(self, data: MCSyncData):
return {"status": status.dict(exclude_none=True), "children": []}

# first, check storage quota, and fail immediately if quota reached
if status.state in ("starting", "skipped_quota_reached"):
if status.state in (
"starting",
"skipped_storage_quota_reached",
"skipped_time_quota_reached",
):
# only check on very first run, before any pods/pvcs created
# for now, allow if crawl has already started (pods/pvcs created)
if (
not pods
and not data.children[PVC]
and await self.org_ops.storage_quota_reached(crawl.oid)
):
await self.mark_finished(crawl, status, "skipped_quota_reached")
return self._empty_response(status)
if not pods and not data.children[PVC]:
if await self.org_ops.storage_quota_reached(crawl.oid):
await self.mark_finished(
crawl, status, "skipped_storage_quota_reached"
)
return self._empty_response(status)

if await self.org_ops.exec_mins_quota_reached(crawl.oid):
await self.mark_finished(
crawl, status, "skipped_time_quota_reached"
)
return self._empty_response(status)

if status.state in ("starting", "waiting_org_limit"):
if not await self.can_start_new(crawl, data, status):
@@ -216,13 +225,7 @@ async def sync_crawls(self, data: MCSyncData):
for pod_name, pod in pods.items():
self.sync_resources(status, pod_name, pod, data.children)

status = await self.sync_crawl_state(
redis_url,
crawl,
status,
pods,
data.related.get(METRICS, {}),
)
status = await self.sync_crawl_state(redis_url, crawl, status, pods, data)

if self.k8s.enable_auto_resize:
# auto sizing handled here
@@ -743,7 +746,7 @@ async def sync_crawl_state(
crawl: CrawlSpec,
status: CrawlStatus,
pods: dict[str, dict],
metrics: Optional[dict],
data: MCSyncData,
):
"""sync crawl state for running crawl"""
# check if at least one crawler pod started running
@@ -752,6 +755,8 @@
)
redis = None

metrics = data.related.get(METRICS, {})

try:
if redis_running:
redis = await self._get_redis(redis_url)
@@ -851,7 +856,7 @@

# update stats and get status
return await self.update_crawl_state(
redis, crawl, status, pods, pod_done_count
redis, crawl, status, pods, pod_done_count, data
)

# pylint: disable=broad-except
@@ -1201,7 +1206,7 @@ async def add_file_to_crawl(self, cc_data, crawl, redis):
return True

async def is_crawl_stopping(
self, crawl: CrawlSpec, status: CrawlStatus
self, crawl: CrawlSpec, status: CrawlStatus, data: MCSyncData
) -> Optional[StopReason]:
"""check if crawl is stopping and set reason"""
# if user requested stop, then enter stopping phase
@@ -1228,9 +1233,23 @@
print(f"Graceful Stop: Maximum crawl size {crawl.max_crawl_size} hit")
return "size-limit"

# check exec time quotas and stop if reached limit
# gracefully stop crawl if current running crawl sizes reach storage quota
org = await self.org_ops.get_org_by_id(crawl.oid)

running_crawls_total_size = 0
for crawl_sorted in data.related[CJS].values():
crawl_status = crawl_sorted.get("status", {})
if crawl_status:
running_crawls_total_size += crawl_status.get("size", 0)

if org.quotas.storageQuota and (
org.bytesStored + running_crawls_total_size >= org.quotas.storageQuota
):
return "stopped_storage_quota_reached"

# gracefully stop crawl if execution time quota is reached
if await self.org_ops.exec_mins_quota_reached(crawl.oid):
return "stopped_quota_reached"
return "stopped_time_quota_reached"

return None

@@ -1264,6 +1283,7 @@ async def update_crawl_state(
status: CrawlStatus,
pods: dict[str, dict],
pod_done_count: int,
data: MCSyncData,
) -> CrawlStatus:
"""update crawl state and check if crawl is now done"""
results = await redis.hgetall(f"{crawl.id}:status")
@@ -1289,7 +1309,7 @@
pod_info.used.storage = value

if not status.stopReason:
status.stopReason = await self.is_crawl_stopping(crawl, status)
status.stopReason = await self.is_crawl_stopping(crawl, status, data)
status.stopping = status.stopReason is not None

# mark crawl as stopping
@@ -1329,8 +1349,10 @@
state: TYPE_NON_RUNNING_STATES
if status.stopReason == "stopped_by_user":
state = "stopped_by_user"
elif status.stopReason == "stopped_quota_reached":
state = "stopped_quota_reached"
elif status.stopReason == "stopped_storage_quota_reached":
state = "stopped_storage_quota_reached"
elif status.stopReason == "stopped_time_quota_reached":
state = "stopped_time_quota_reached"
else:
state = "complete"

6 changes: 5 additions & 1 deletion backend/btrixcloud/operator/models.py
@@ -16,7 +16,7 @@
CJS = f"CrawlJob.{BTRIX_API}"

StopReason = Literal[
"stopped_by_user", "time-limit", "size-limit", "stopped_quota_reached"
"stopped_by_user",
"time-limit",
"size-limit",
"stopped_storage_quota_reached",
"stopped_time_quota_reached",
]


2 changes: 1 addition & 1 deletion backend/btrixcloud/orgs.py
@@ -758,7 +758,7 @@ async def exec_mins_quota_reached(
return False

async def get_org_storage_quota(self, oid: UUID) -> int:
"""return max allowed concurrent crawls, if any"""
"""return org storage quota, if any"""
org_data = await self.orgs.find_one({"_id": oid})
if org_data:
org = Organization.from_dict(org_data)
14 changes: 12 additions & 2 deletions backend/test_nightly/test_execution_minutes_quota.py
@@ -51,7 +51,7 @@ def test_crawl_stopped_when_quota_reached(org_with_quotas, admin_auth_headers):
# Ensure that crawl was stopped by quota
assert (
get_crawl_status(org_with_quotas, crawl_id, admin_auth_headers)
== "stopped_quota_reached"
== "stopped_time_quota_reached"
)

time.sleep(5)
Expand Down Expand Up @@ -134,7 +134,7 @@ def test_crawl_stopped_when_quota_reached_with_extra(
# Ensure that crawl was stopped by quota
assert (
get_crawl_status(org_with_quotas, crawl_id, admin_auth_headers)
== "stopped_quota_reached"
== "stopped_time_quota_reached"
)

time.sleep(5)
Expand Down Expand Up @@ -183,3 +183,13 @@ def run_crawl(org_id, headers):

def get_total_exec_seconds(execSeconds: Dict[str, int]) -> int:
return sum(list(execSeconds.values()))


def test_unset_execution_mins_quota(org_with_quotas, admin_auth_headers):
r = requests.post(
f"{API_PREFIX}/orgs/{org_with_quotas}/quotas",
headers=admin_auth_headers,
json={"maxExecMinutesPerMonth": 0},
)
data = r.json()
assert data.get("updated") == True