optimize org quota lookups (#1973)
- instead of looking up storage and exec-minute quotas by oid and loading the org for each check, load the org once and check quotas on the org object; in many call sites the org was already available but was being looked up again (see the OrgOps sketch below, before the file diffs)
- storage and exec-minute quota checks become sync
- rename can_run_crawl() to the more generic can_write_data(), which optionally also checks exec minutes
- typing: get_org_by_id() always returns an org or raises; adjust callers accordingly (catch the exception instead of checking for None)
- typing: fix 'orgs_ops' -> 'org_ops' typo in BaseOperator, catch type errors in the operator
- operator quota check: use the up-to-date 'status.size' for the current job, and skip the current job in the related jobs list to avoid double-counting
- follow-up to #1969
ikreymer authored Jul 25, 2024
1 parent dd6c33a commit 94e985a
Showing 11 changed files with 114 additions and 178 deletions.
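
For orientation, here is a minimal sketch of what the reworked OrgOps helpers could look like after this commit. The method names (get_org_by_id, storage_quota_reached, exec_mins_quota_reached, can_write_data) and the org.bytesStored / org.quotas.storageQuota fields come from the diffs below; the method bodies, the constructor, the exec-minutes field names, and the error details are assumptions, since the actual backend/btrixcloud/orgs.py hunk is not shown on this page.

# Sketch only: method names match the diffs below, but the bodies, the
# exec-minutes fields, and the error details are assumptions -- the real
# backend/btrixcloud/orgs.py changes are in the part of this commit not shown here.
from uuid import UUID

from fastapi import HTTPException

from btrixcloud.models import Organization


class OrgOps:
    def __init__(self, mdb):
        # assumed: handle to the orgs collection, as elsewhere in the backend
        self.orgs = mdb["organizations"]

    async def get_org_by_id(self, oid: UUID) -> Organization:
        """always return the org or raise; callers catch instead of checking for None"""
        res = await self.orgs.find_one({"_id": oid})
        if not res:
            raise HTTPException(status_code=400, detail="org_not_found")
        return Organization.from_dict(res)

    def storage_quota_reached(self, org: Organization, extra_bytes: int = 0) -> bool:
        """sync check against an already-loaded org; no extra lookup by oid"""
        if not org.quotas.storageQuota:
            return False
        return org.bytesStored + extra_bytes >= org.quotas.storageQuota

    def exec_mins_quota_reached(self, org: Organization) -> bool:
        """sync check of the monthly exec-minutes quota (field names assumed)"""
        quota_mins = org.quotas.maxExecMinutesPerMonth or 0
        return bool(quota_mins) and org.monthlyExecSeconds >= quota_mins * 60

    def can_write_data(self, org: Organization, include_exec_mins: bool = True) -> None:
        """replacement for can_run_crawl(): raise if the org is over quota"""
        if self.storage_quota_reached(org):
            raise HTTPException(status_code=403, detail="storage_quota_reached")
        if include_exec_mins and self.exec_mins_quota_reached(org):
            raise HTTPException(status_code=403, detail="exec_minutes_quota_reached")

Because these checks raise rather than return a flag, call sites such as run_now_internal() in the crawlconfigs diff below simply call can_write_data(org) and let the HTTPException propagate or catch it, as add_crawl_config() does.
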
2 changes: 1 addition & 1 deletion backend/btrixcloud/auth.py
@@ -123,7 +123,7 @@ def verify_password(plain_password: str, hashed_password: str) -> bool:
 # ============================================================================
 def verify_and_update_password(
     plain_password: str, hashed_password: str
-) -> Tuple[bool, str]:
+) -> Tuple[bool, Optional[str]]:
     """verify password and return updated hash, if any"""
     return PWD_CONTEXT.verify_and_update(plain_password, hashed_password)

13 changes: 8 additions & 5 deletions backend/btrixcloud/basecrawls.py
@@ -187,10 +187,11 @@ async def get_crawl_out(
         if crawl.config and crawl.config.seeds:
             crawl.config.seeds = None

-        crawl.storageQuotaReached = await self.orgs.storage_quota_reached(crawl.oid)
-        crawl.execMinutesQuotaReached = await self.orgs.exec_mins_quota_reached(
-            crawl.oid
-        )
+        if not org:
+            org = await self.orgs.get_org_by_id(crawl.oid)
+
+        crawl.storageQuotaReached = self.orgs.storage_quota_reached(org)
+        crawl.execMinutesQuotaReached = self.orgs.exec_mins_quota_reached(org)

         return crawl

@@ -356,7 +357,9 @@ async def delete_crawls(
         query = {"_id": {"$in": delete_list.crawl_ids}, "oid": org.id, "type": type_}
         res = await self.crawls.delete_many(query)

-        quota_reached = await self.orgs.inc_org_bytes_stored(org.id, -size, type_)
+        await self.orgs.inc_org_bytes_stored(org.id, -size, type_)
+
+        quota_reached = self.orgs.storage_quota_reached(org)

         return res.deleted_count, cids_to_update, quota_reached

14 changes: 6 additions & 8 deletions backend/btrixcloud/crawlconfigs.py
@@ -225,7 +225,7 @@ async def add_crawl_config(
         crawlconfig.lastStartedByName = user.name

         # Ensure page limit is below org maxPagesPerCall if set
-        max_pages = await self.org_ops.get_max_pages_per_crawl(org.id)
+        max_pages = org.quotas.maxPagesPerCrawl or 0
         if max_pages > 0:
             crawlconfig.config.limit = max_pages

@@ -248,8 +248,8 @@ async def add_crawl_config(
                     exec_mins_quota_reached = True
                 print(f"Can't run crawl now: {e.detail}", flush=True)
         else:
-            storage_quota_reached = await self.org_ops.storage_quota_reached(org.id)
-            exec_mins_quota_reached = await self.org_ops.exec_mins_quota_reached(org.id)
+            storage_quota_reached = self.org_ops.storage_quota_reached(org)
+            exec_mins_quota_reached = self.org_ops.exec_mins_quota_reached(org)

         return CrawlConfigAddedResponse(
             added=True,
@@ -406,10 +406,8 @@ async def update_crawl_config(
             "updated": True,
             "settings_changed": changed,
             "metadata_changed": metadata_changed,
-            "storageQuotaReached": await self.org_ops.storage_quota_reached(org.id),
-            "execMinutesQuotaReached": await self.org_ops.exec_mins_quota_reached(
-                org.id
-            ),
+            "storageQuotaReached": self.org_ops.storage_quota_reached(org),
+            "execMinutesQuotaReached": self.org_ops.exec_mins_quota_reached(org),
         }
         if run_now:
             crawl_id = await self.run_now(cid, org, user)
@@ -827,7 +825,7 @@ async def run_now_internal(
         self, crawlconfig: CrawlConfig, org: Organization, user: User
     ) -> str:
         """run new crawl for specified crawlconfig now"""
-        await self.org_ops.can_run_crawls(org)
+        self.org_ops.can_write_data(org)

         if await self.get_running_crawl(crawlconfig):
             raise HTTPException(status_code=400, detail="crawl_already_running")

2 changes: 1 addition & 1 deletion backend/btrixcloud/crawls.py
@@ -813,7 +813,7 @@ async def start_crawl_qa_run(
         if not crawl.cid or crawl.type != "crawl":
             raise HTTPException(status_code=400, detail="invalid_crawl_for_qa")

-        await self.orgs.can_run_crawls(org)
+        self.orgs.can_write_data(org)

         crawlconfig = await self.crawl_configs.get_crawl_config(crawl.cid, org.id)

7 changes: 3 additions & 4 deletions backend/btrixcloud/operator/baseoperator.py
@@ -144,13 +144,13 @@ class BaseOperator:
     k8s: K8sOpAPI
     crawl_config_ops: CrawlConfigOps
     crawl_ops: CrawlOps
-    orgs_ops: OrgOps
+    org_ops: OrgOps
     coll_ops: CollectionOps
     storage_ops: StorageOps
-    event_webhook_ops: EventWebhookOps
     background_job_ops: BackgroundJobOps
-    user_ops: UserManager
+    event_webhook_ops: EventWebhookOps
     page_ops: PageOps
+    user_ops: UserManager

     def __init__(
         self,
@@ -173,7 +173,6 @@ def __init__(
         self.background_job_ops = background_job_ops
         self.event_webhook_ops = event_webhook_ops
         self.page_ops = page_ops
-
         self.user_ops = crawl_config_ops.user_manager

         # to avoid background tasks being garbage collected

64 changes: 35 additions & 29 deletions backend/btrixcloud/operator/crawls.py
@@ -27,6 +27,7 @@
     CrawlFile,
     CrawlCompleteIn,
     StorageRef,
+    Organization,
 )

 from btrixcloud.utils import from_k8s_date, to_k8s_date, dt_now
@@ -192,6 +193,8 @@ async def sync_crawls(self, data: MCSyncData):
             await self.k8s.delete_crawl_job(crawl.id)
             return {"status": status.dict(exclude_none=True), "children": []}

+        org = None
+
         # first, check storage quota, and fail immediately if quota reached
         if status.state in (
             "starting",
@@ -201,20 +204,21 @@
             # only check on very first run, before any pods/pvcs created
             # for now, allow if crawl has already started (pods/pvcs created)
             if not pods and not data.children[PVC]:
-                if await self.org_ops.storage_quota_reached(crawl.oid):
+                org = await self.org_ops.get_org_by_id(crawl.oid)
+                if self.org_ops.storage_quota_reached(org):
                     await self.mark_finished(
                         crawl, status, "skipped_storage_quota_reached"
                     )
                     return self._empty_response(status)

-                if await self.org_ops.exec_mins_quota_reached(crawl.oid):
+                if self.org_ops.exec_mins_quota_reached(org):
                     await self.mark_finished(
                         crawl, status, "skipped_time_quota_reached"
                     )
                     return self._empty_response(status)

         if status.state in ("starting", "waiting_org_limit"):
-            if not await self.can_start_new(crawl, data, status):
+            if not await self.can_start_new(crawl, data, status, org):
                 return self._empty_response(status)

             await self.set_state(
@@ -359,7 +363,6 @@ async def _load_qa_configmap(self, params, children):

         params["name"] = name
         params["qa_source_replay_json"] = crawl_replay.json(include={"resources"})
-        print(params["qa_source_replay_json"])
         return self.load_from_yaml("qa_configmap.yaml", params)

     def _load_crawler(self, params, i, status, children):
@@ -574,10 +577,19 @@ def get_related(self, data: MCBaseRequest):

         return {"relatedResources": related_resources}

-    async def can_start_new(self, crawl: CrawlSpec, data: MCSyncData, status):
+    async def can_start_new(
+        self,
+        crawl: CrawlSpec,
+        data: MCSyncData,
+        status: CrawlStatus,
+        org: Optional[Organization] = None,
+    ):
         """return true if crawl can start, otherwise set crawl to 'queued' state
         until more crawls for org finish"""
-        max_crawls = await self.org_ops.get_max_concurrent_crawls(crawl.oid)
+        if not org:
+            org = await self.org_ops.get_org_by_id(crawl.oid)
+
+        max_crawls = org.quotas.maxConcurrentCrawls or 0
         if not max_crawls:
             return True
@@ -586,20 +598,12 @@ async def can_start_new(self, crawl: CrawlSpec, data: MCSyncData, status):

         name = data.parent.get("metadata", {}).get("name")

-        # def metadata_key(val):
-        #    return val.get("metadata").get("creationTimestamp")
-
-        # all_crawljobs = sorted(data.related[CJS].values(), key=metadata_key)
-        # print(list(data.related[CJS].keys()))
-
         i = 0
         for crawl_sorted in data.related[CJS].values():
             if crawl_sorted.get("status", {}).get("state") in NON_RUNNING_STATES:
                 continue

-            # print(i, crawl_sorted.get("metadata").get("name"))
             if crawl_sorted.get("metadata").get("name") == name:
-                # print("found: ", name, "index", i)
                 if i < max_crawls:
                     return True

@@ -1211,7 +1215,6 @@ async def is_crawl_stopping(
         """check if crawl is stopping and set reason"""
         # if user requested stop, then enter stopping phase
         if crawl.stopping:
-            print("Graceful Stop: User requested stop")
             return "stopped_by_user"

         # check timeout if timeout time exceeds elapsed time
@@ -1223,32 +1226,33 @@
             ).total_seconds()

             if elapsed > crawl.timeout:
-                print(
-                    f"Graceful Stop: Crawl running time exceeded {crawl.timeout} second timeout"
-                )
                 return "time-limit"

         # crawl size limit
         if crawl.max_crawl_size and status.size > crawl.max_crawl_size:
-            print(f"Graceful Stop: Maximum crawl size {crawl.max_crawl_size} hit")
             return "size-limit"

         # gracefully stop crawl if current running crawl sizes reach storage quota
         org = await self.org_ops.get_org_by_id(crawl.oid)

-        running_crawls_total_size = 0
-        for crawl_sorted in data.related[CJS].values():
-            crawl_status = crawl_sorted.get("status", {})
-            if crawl_status:
-                running_crawls_total_size += crawl_status.get("size", 0)
-
-        if org.quotas.storageQuota and (
-            org.bytesStored + running_crawls_total_size >= org.quotas.storageQuota
-        ):
-            return "stopped_storage_quota_reached"
+        if org.quotas.storageQuota:
+            running_crawls_total_size = status.size
+            for crawl_job in data.related[CJS].values():
+                # if the job id matches current crawl job, then skip
+                # this job to avoid double-counting
+                # using the more up-to-date 'status.size' for this job
+                if crawl_job.get("spec", {}).get("id") == crawl.id:
+                    continue
+
+                crawl_status = crawl_job.get("status", {})
+                if crawl_status:
+                    running_crawls_total_size += crawl_status.get("size", 0)
+
+            if self.org_ops.storage_quota_reached(org, running_crawls_total_size):
+                return "stopped_storage_quota_reached"

         # gracefully stop crawl is execution time quota is reached
-        if await self.org_ops.exec_mins_quota_reached(crawl.oid):
+        if self.org_ops.exec_mins_quota_reached(org):
             return "stopped_time_quota_reached"

         return None
@@ -1311,6 +1315,8 @@ async def update_crawl_state(
         if not status.stopReason:
             status.stopReason = await self.is_crawl_stopping(crawl, status, data)
             status.stopping = status.stopReason is not None
+            if status.stopping:
+                print("Crawl gracefully stopping: {status.stopReason}, id: {crawl.id}")

         # mark crawl as stopping
         if status.stopping:
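
To isolate the double-counting fix in is_crawl_stopping() above: the current job's size is taken from the in-memory status.size, and its entry in the related CrawlJob list is skipped so it is not counted twice. A standalone sketch of that aggregation, where the function name and parameters are hypothetical and related_crawljobs stands in for data.related[CJS].values():

# Hypothetical helper mirroring the aggregation in is_crawl_stopping() above.
def total_running_crawl_size(current_id, current_status_size, related_crawljobs):
    """sum sizes of running crawl jobs, preferring the fresher in-memory
    status.size for the current job over its possibly stale related entry"""
    total = current_status_size
    for crawl_job in related_crawljobs:
        if crawl_job.get("spec", {}).get("id") == current_id:
            continue  # current job already counted via status.size
        total += crawl_job.get("status", {}).get("size", 0)
    return total

The resulting total is then passed to storage_quota_reached(org, running_crawls_total_size), which compares org.bytesStored plus the running total against org.quotas.storageQuota.
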
2 changes: 1 addition & 1 deletion backend/btrixcloud/operator/cronjobs.py
@@ -113,7 +113,7 @@ async def make_new_crawljob(
             cid=str(cid),
             userid=str(userid),
             oid=str(oid),
-            storage=org.storage,
+            storage=str(org.storage),
             crawler_channel=crawlconfig.crawlerChannel or "default",
             scale=crawlconfig.scale,
             crawl_timeout=crawlconfig.crawlTimeout,

(Diffs for the remaining 4 changed files are not shown here.)
