diff --git a/backend/btrixcloud/auth.py b/backend/btrixcloud/auth.py
index 29508d52b..e08f31a54 100644
--- a/backend/btrixcloud/auth.py
+++ b/backend/btrixcloud/auth.py
@@ -123,7 +123,7 @@ def verify_password(plain_password: str, hashed_password: str) -> bool:
 # ============================================================================
 def verify_and_update_password(
     plain_password: str, hashed_password: str
-) -> Tuple[bool, str]:
+) -> Tuple[bool, Optional[str]]:
     """verify password and return updated hash, if any"""
     return PWD_CONTEXT.verify_and_update(plain_password, hashed_password)
diff --git a/backend/btrixcloud/basecrawls.py b/backend/btrixcloud/basecrawls.py
index 0b21c0f37..9a654a354 100644
--- a/backend/btrixcloud/basecrawls.py
+++ b/backend/btrixcloud/basecrawls.py
@@ -187,10 +187,11 @@ async def get_crawl_out(
         if crawl.config and crawl.config.seeds:
             crawl.config.seeds = None

-        crawl.storageQuotaReached = await self.orgs.storage_quota_reached(crawl.oid)
-        crawl.execMinutesQuotaReached = await self.orgs.exec_mins_quota_reached(
-            crawl.oid
-        )
+        if not org:
+            org = await self.orgs.get_org_by_id(crawl.oid)
+
+        crawl.storageQuotaReached = self.orgs.storage_quota_reached(org)
+        crawl.execMinutesQuotaReached = self.orgs.exec_mins_quota_reached(org)

         return crawl
@@ -356,7 +357,9 @@ async def delete_crawls(
         query = {"_id": {"$in": delete_list.crawl_ids}, "oid": org.id, "type": type_}
         res = await self.crawls.delete_many(query)

-        quota_reached = await self.orgs.inc_org_bytes_stored(org.id, -size, type_)
+        await self.orgs.inc_org_bytes_stored(org.id, -size, type_)
+
+        quota_reached = self.orgs.storage_quota_reached(org)

         return res.deleted_count, cids_to_update, quota_reached
diff --git a/backend/btrixcloud/crawlconfigs.py b/backend/btrixcloud/crawlconfigs.py
index 5289748b4..60fd36ea1 100644
--- a/backend/btrixcloud/crawlconfigs.py
+++ b/backend/btrixcloud/crawlconfigs.py
@@ -225,7 +225,7 @@ async def add_crawl_config(
             crawlconfig.lastStartedByName = user.name

         # Ensure page limit is below org maxPagesPerCall if set
-        max_pages = await self.org_ops.get_max_pages_per_crawl(org.id)
+        max_pages = org.quotas.maxPagesPerCrawl or 0
         if max_pages > 0:
             crawlconfig.config.limit = max_pages
@@ -248,8 +248,8 @@ async def add_crawl_config(
                 exec_mins_quota_reached = True
                 print(f"Can't run crawl now: {e.detail}", flush=True)
         else:
-            storage_quota_reached = await self.org_ops.storage_quota_reached(org.id)
-            exec_mins_quota_reached = await self.org_ops.exec_mins_quota_reached(org.id)
+            storage_quota_reached = self.org_ops.storage_quota_reached(org)
+            exec_mins_quota_reached = self.org_ops.exec_mins_quota_reached(org)

         return CrawlConfigAddedResponse(
             added=True,
@@ -406,10 +406,8 @@ async def update_crawl_config(
             "updated": True,
             "settings_changed": changed,
             "metadata_changed": metadata_changed,
-            "storageQuotaReached": await self.org_ops.storage_quota_reached(org.id),
-            "execMinutesQuotaReached": await self.org_ops.exec_mins_quota_reached(
-                org.id
-            ),
+            "storageQuotaReached": self.org_ops.storage_quota_reached(org),
+            "execMinutesQuotaReached": self.org_ops.exec_mins_quota_reached(org),
         }
         if run_now:
             crawl_id = await self.run_now(cid, org, user)
@@ -827,7 +825,7 @@ async def run_now_internal(
         self, crawlconfig: CrawlConfig, org: Organization, user: User
     ) -> str:
         """run new crawl for specified crawlconfig now"""
-        await self.org_ops.can_run_crawls(org)
+        self.org_ops.can_write_data(org)

         if await self.get_running_crawl(crawlconfig):
             raise HTTPException(status_code=400, detail="crawl_already_running")
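Editor's note: the call sites above switch from awaited, per-oid database lookups to synchronous checks on the already-loaded Organization. For orientation, the helpers they now call (defined in the orgs.py hunks further down in this patch) boil down to roughly the following. This is a condensed, standalone sketch for readability, not an excerpt of the patch: the free-function form, the direct datetime/math imports, and the untyped org parameter are illustrative assumptions; the attribute and detail-string names come from the patch itself.

    import math
    from datetime import datetime, timezone

    from fastapi import HTTPException

    def storage_quota_reached(org, extra_bytes: int = 0) -> bool:
        # no configured quota means the org can never be over quota
        if not org.quotas.storageQuota:
            return False
        # extra_bytes lets callers fold in not-yet-persisted data,
        # e.g. the sizes of currently running crawls
        return (org.bytesStored + extra_bytes) >= org.quotas.storageQuota

    def exec_mins_quota_reached(org) -> bool:
        # gifted or extra rollover seconds exempt the org from the monthly cap
        if org.giftedExecSecondsAvailable or org.extraExecSecondsAvailable:
            return False
        quota = org.quotas.maxExecMinutesPerMonth
        if not quota:
            return False
        # monthlyExecSeconds is keyed by "YYYY-MM" for the current month
        yymm = datetime.now(timezone.utc).strftime("%Y-%m")
        used_secs = org.monthlyExecSeconds.get(yymm, 0)
        return math.floor(used_secs / 60) >= quota

    def can_write_data(org, include_time: bool = True) -> None:
        # single guard shared by crawl, QA, upload, and profile write paths
        if org.readOnly:
            raise HTTPException(status_code=403, detail="org_set_to_read_only")
        if storage_quota_reached(org):
            raise HTTPException(status_code=403, detail="storage_quota_reached")
        # uploads and profiles pass include_time=False, since they
        # consume no execution minutes
        if include_time and exec_mins_quota_reached(org):
            raise HTTPException(status_code=403, detail="exec_minutes_quota_reached")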
diff --git a/backend/btrixcloud/crawls.py b/backend/btrixcloud/crawls.py
index c5203fbfc..1badc7ec7 100644
--- a/backend/btrixcloud/crawls.py
+++ b/backend/btrixcloud/crawls.py
@@ -813,7 +813,7 @@ async def start_crawl_qa_run(
         if not crawl.cid or crawl.type != "crawl":
             raise HTTPException(status_code=400, detail="invalid_crawl_for_qa")

-        await self.orgs.can_run_crawls(org)
+        self.orgs.can_write_data(org)

         crawlconfig = await self.crawl_configs.get_crawl_config(crawl.cid, org.id)
diff --git a/backend/btrixcloud/operator/baseoperator.py b/backend/btrixcloud/operator/baseoperator.py
index 32ea60155..ab9fe4341 100644
--- a/backend/btrixcloud/operator/baseoperator.py
+++ b/backend/btrixcloud/operator/baseoperator.py
@@ -144,13 +144,13 @@ class BaseOperator:
     k8s: K8sOpAPI
     crawl_config_ops: CrawlConfigOps
     crawl_ops: CrawlOps
-    orgs_ops: OrgOps
+    org_ops: OrgOps
     coll_ops: CollectionOps
     storage_ops: StorageOps
-    event_webhook_ops: EventWebhookOps
     background_job_ops: BackgroundJobOps
-    user_ops: UserManager
+    event_webhook_ops: EventWebhookOps
     page_ops: PageOps
+    user_ops: UserManager

     def __init__(
         self,
@@ -173,7 +173,6 @@ def __init__(
         self.background_job_ops = background_job_ops
         self.event_webhook_ops = event_webhook_ops
         self.page_ops = page_ops
-        self.user_ops = crawl_config_ops.user_manager

         # to avoid background tasks being garbage collected
diff --git a/backend/btrixcloud/operator/crawls.py b/backend/btrixcloud/operator/crawls.py
index 6ba24d8d1..8edc70a04 100644
--- a/backend/btrixcloud/operator/crawls.py
+++ b/backend/btrixcloud/operator/crawls.py
@@ -27,6 +27,7 @@
     CrawlFile,
     CrawlCompleteIn,
     StorageRef,
+    Organization,
 )

 from btrixcloud.utils import from_k8s_date, to_k8s_date, dt_now
@@ -192,6 +193,8 @@ async def sync_crawls(self, data: MCSyncData):
             await self.k8s.delete_crawl_job(crawl.id)
             return {"status": status.dict(exclude_none=True), "children": []}

+        org = None
+
         # first, check storage quota, and fail immediately if quota reached
         if status.state in (
             "starting",
@@ -201,20 +204,21 @@ async def sync_crawls(self, data: MCSyncData):
             # only check on very first run, before any pods/pvcs created
             # for now, allow if crawl has already started (pods/pvcs created)
             if not pods and not data.children[PVC]:
-                if await self.org_ops.storage_quota_reached(crawl.oid):
+                org = await self.org_ops.get_org_by_id(crawl.oid)
+                if self.org_ops.storage_quota_reached(org):
                     await self.mark_finished(
                         crawl, status, "skipped_storage_quota_reached"
                     )
                     return self._empty_response(status)

-                if await self.org_ops.exec_mins_quota_reached(crawl.oid):
+                if self.org_ops.exec_mins_quota_reached(org):
                     await self.mark_finished(
                         crawl, status, "skipped_time_quota_reached"
                     )
                     return self._empty_response(status)

         if status.state in ("starting", "waiting_org_limit"):
-            if not await self.can_start_new(crawl, data, status):
+            if not await self.can_start_new(crawl, data, status, org):
                 return self._empty_response(status)

         await self.set_state(
@@ -359,7 +363,6 @@ async def _load_qa_configmap(self, params, children):
         params["name"] = name
         params["qa_source_replay_json"] = crawl_replay.json(include={"resources"})
-        print(params["qa_source_replay_json"])
         return self.load_from_yaml("qa_configmap.yaml", params)

     def _load_crawler(self, params, i, status, children):
@@ -574,10 +577,19 @@ def get_related(self, data: MCBaseRequest):

         return {"relatedResources": related_resources}

-    async def can_start_new(self, crawl: CrawlSpec, data: MCSyncData, status):
+    async def can_start_new(
+        self,
+        crawl: CrawlSpec,
+        data: MCSyncData,
+        status: CrawlStatus,
+        org: Optional[Organization] = None,
+    ):
         """return true if crawl can start, otherwise set crawl to 'queued' state
         until more crawls for org finish"""
-        max_crawls = await self.org_ops.get_max_concurrent_crawls(crawl.oid)
+        if not org:
+            org = await self.org_ops.get_org_by_id(crawl.oid)
+
+        max_crawls = org.quotas.maxConcurrentCrawls or 0
         if not max_crawls:
             return True
@@ -586,20 +598,12 @@ async def can_start_new(self, crawl: CrawlSpec, data: MCSyncData, status):

         name = data.parent.get("metadata", {}).get("name")

-        # def metadata_key(val):
-        #    return val.get("metadata").get("creationTimestamp")
-
-        # all_crawljobs = sorted(data.related[CJS].values(), key=metadata_key)
-        # print(list(data.related[CJS].keys()))
-
         i = 0
         for crawl_sorted in data.related[CJS].values():
             if crawl_sorted.get("status", {}).get("state") in NON_RUNNING_STATES:
                 continue

-            # print(i, crawl_sorted.get("metadata").get("name"))
             if crawl_sorted.get("metadata").get("name") == name:
-                # print("found: ", name, "index", i)
                 if i < max_crawls:
                     return True
@@ -1211,7 +1215,6 @@ async def is_crawl_stopping(
         """check if crawl is stopping and set reason"""
         # if user requested stop, then enter stopping phase
         if crawl.stopping:
-            print("Graceful Stop: User requested stop")
             return "stopped_by_user"

         # check timeout if timeout time exceeds elapsed time
@@ -1223,32 +1226,33 @@ async def is_crawl_stopping(
                 ).total_seconds()

             if elapsed > crawl.timeout:
-                print(
-                    f"Graceful Stop: Crawl running time exceeded {crawl.timeout} second timeout"
-                )
                 return "time-limit"

         # crawl size limit
         if crawl.max_crawl_size and status.size > crawl.max_crawl_size:
-            print(f"Graceful Stop: Maximum crawl size {crawl.max_crawl_size} hit")
             return "size-limit"

         # gracefully stop crawl if current running crawl sizes reach storage quota
         org = await self.org_ops.get_org_by_id(crawl.oid)

-        running_crawls_total_size = 0
-        for crawl_sorted in data.related[CJS].values():
-            crawl_status = crawl_sorted.get("status", {})
-            if crawl_status:
-                running_crawls_total_size += crawl_status.get("size", 0)
+        if org.quotas.storageQuota:
+            running_crawls_total_size = status.size
+            for crawl_job in data.related[CJS].values():
+                # if the job id matches current crawl job, then skip
+                # this job to avoid double-counting
+                # using the more up-to-date 'status.size' for this job
+                if crawl_job.get("spec", {}).get("id") == crawl.id:
+                    continue

-        if org.quotas.storageQuota and (
-            org.bytesStored + running_crawls_total_size >= org.quotas.storageQuota
-        ):
-            return "stopped_storage_quota_reached"
+                crawl_status = crawl_job.get("status", {})
+                if crawl_status:
+                    running_crawls_total_size += crawl_status.get("size", 0)
+
+            if self.org_ops.storage_quota_reached(org, running_crawls_total_size):
+                return "stopped_storage_quota_reached"

         # gracefully stop crawl is execution time quota is reached
-        if await self.org_ops.exec_mins_quota_reached(crawl.oid):
+        if self.org_ops.exec_mins_quota_reached(org):
             return "stopped_time_quota_reached"

         return None
@@ -1311,6 +1315,8 @@ async def update_crawl_state(
         if not status.stopReason:
             status.stopReason = await self.is_crawl_stopping(crawl, status, data)
             status.stopping = status.stopReason is not None
+            if status.stopping:
+                print(f"Crawl gracefully stopping: {status.stopReason}, id: {crawl.id}")

         # mark crawl as stopping
         if status.stopping:
diff --git a/backend/btrixcloud/operator/cronjobs.py b/backend/btrixcloud/operator/cronjobs.py
index 2b8952aef..bf8cb3ab1 100644
--- a/backend/btrixcloud/operator/cronjobs.py
+++ b/backend/btrixcloud/operator/cronjobs.py
@@ -113,7 +113,7 @@ async def make_new_crawljob(
             cid=str(cid),
             userid=str(userid),
             oid=str(oid),
-            storage=org.storage,
+            storage=str(org.storage),
             crawler_channel=crawlconfig.crawlerChannel or "default",
             scale=crawlconfig.scale,
             crawl_timeout=crawlconfig.crawlTimeout,
diff --git a/backend/btrixcloud/orgs.py b/backend/btrixcloud/orgs.py
index 5ba8b6a7e..07aa71afd 100644
--- a/backend/btrixcloud/orgs.py
+++ b/backend/btrixcloud/orgs.py
@@ -288,11 +288,12 @@ async def get_default_org(self) -> Organization:
     async def get_default_register_org(self) -> Organization:
         """Get default organiation for new user registration, or default org"""
         if self.register_to_org_id:
-            res = await self.get_org_by_id(UUID(self.register_to_org_id))
-            if not res:
+            try:
+                await self.get_org_by_id(UUID(self.register_to_org_id))
+            except HTTPException as exc:
                 raise HTTPException(
                     status_code=500, detail="default_register_org_not_found"
-                )
+                ) from exc

         return await self.get_default_org()
@@ -585,10 +586,14 @@ async def add_user_by_invite(

         Remove invite after successful add
         """
+        org = None
         if not invite.oid:
             org = default_org
         else:
-            org = await self.get_org_by_id(invite.oid)
+            try:
+                org = await self.get_org_by_id(invite.oid)
+            except HTTPException:
+                pass

         if not org:
             raise HTTPException(status_code=400, detail="invalid_invite")
@@ -668,19 +673,12 @@ async def get_org_owners(self, org: Organization) -> List[str]:
                 org_owners.append(key)
         return org_owners

-    async def get_max_pages_per_crawl(self, oid: UUID) -> int:
-        """Return org-specific max pages per crawl setting or 0."""
-        org_data = await self.orgs.find_one({"_id": oid})
-        if org_data:
-            org = Organization.from_dict(org_data)
-            return org.quotas.maxPagesPerCrawl or 0
-        return 0
-
-    async def inc_org_bytes_stored(self, oid: UUID, size: int, type_="crawl") -> bool:
+    async def inc_org_bytes_stored(self, oid: UUID, size: int, type_="crawl") -> None:
         """Increase org bytesStored count (pass negative value to subtract)."""
         if type_ == "crawl":
             await self.orgs.find_one_and_update(
-                {"_id": oid}, {"$inc": {"bytesStored": size, "bytesStoredCrawls": size}}
+                {"_id": oid},
+                {"$inc": {"bytesStored": size, "bytesStoredCrawls": size}},
             )
         elif type_ == "upload":
             await self.orgs.find_one_and_update(
@@ -692,102 +690,56 @@ async def inc_org_bytes_stored(self, oid: UUID, size: int, type_="crawl") -> boo
                 {"_id": oid},
                 {"$inc": {"bytesStored": size, "bytesStoredProfiles": size}},
             )
-        return await self.storage_quota_reached(oid)
-
-    # pylint: disable=invalid-name
-    async def storage_quota_reached(self, oid: UUID) -> bool:
-        """Return boolean indicating if storage quota is met or exceeded."""
-        quota = await self.get_org_storage_quota(oid)
-        if not quota:
-            return False
-
-        org_data = await self.orgs.find_one({"_id": oid})
-        if not org_data:
-            return False
-
-        org = Organization.from_dict(org_data)
-        if org.bytesStored >= quota:
-            return True
-
-        return False
-
-    async def can_run_crawls(self, org: Organization) -> None:
+    def can_write_data(self, org: Organization, include_time=True) -> None:
         """check crawl quotas and readOnly state, throw if can not run"""
         if org.readOnly:
             raise HTTPException(status_code=403, detail="org_set_to_read_only")

-        if await self.storage_quota_reached(org.id):
+        if self.storage_quota_reached(org):
             raise HTTPException(status_code=403, detail="storage_quota_reached")

-        if await self.exec_mins_quota_reached(org.id):
+        if include_time and self.exec_mins_quota_reached(org):
             raise HTTPException(status_code=403, detail="exec_minutes_quota_reached")

-    async def get_monthly_crawl_exec_seconds(self, oid: UUID) -> int:
-        """Return monthlyExecSeconds for current month"""
-        org_data = await self.orgs.find_one({"_id": oid})
-        if not org_data:
-            return 0
-        org = Organization.from_dict(org_data)
-        yymm = dt_now().strftime("%Y-%m")
-        try:
-            return org.monthlyExecSeconds[yymm]
-        except KeyError:
-            return 0
+    # pylint: disable=invalid-name
+    def storage_quota_reached(self, org: Organization, extra_bytes: int = 0) -> bool:
+        """Return boolean indicating if storage quota is met or exceeded."""
+        if not org.quotas.storageQuota:
+            return False
+
+        if (org.bytesStored + extra_bytes) >= org.quotas.storageQuota:
+            return True

-    async def exec_mins_quota_reached(
-        self, oid: UUID, include_extra: bool = True
+        return False
+
+    def exec_mins_quota_reached(
+        self, org: Organization, include_extra: bool = True
     ) -> bool:
         """Return bool for if execution minutes quota is reached"""
         if include_extra:
-            gifted_seconds = await self.get_gifted_exec_secs_available(oid)
-            if gifted_seconds:
+            if org.giftedExecSecondsAvailable:
                 return False

-            extra_seconds = await self.get_extra_exec_secs_available(oid)
-            if extra_seconds:
+            if org.extraExecSecondsAvailable:
                 return False

-        monthly_quota = await self.get_org_exec_mins_monthly_quota(oid)
+        monthly_quota = org.quotas.maxExecMinutesPerMonth
         if monthly_quota:
-            monthly_exec_seconds = await self.get_monthly_crawl_exec_seconds(oid)
+            monthly_exec_seconds = self.get_monthly_crawl_exec_seconds(org)
             monthly_exec_minutes = math.floor(monthly_exec_seconds / 60)
             if monthly_exec_minutes >= monthly_quota:
                 return True

         return False

-    async def get_org_storage_quota(self, oid: UUID) -> int:
-        """return org storage quota, if any"""
-        org_data = await self.orgs.find_one({"_id": oid})
-        if org_data:
-            org = Organization.from_dict(org_data)
-            return org.quotas.storageQuota or 0
-        return 0
-
-    async def get_org_exec_mins_monthly_quota(self, oid: UUID) -> int:
-        """return max allowed execution mins per month, if any"""
-        org_data = await self.orgs.find_one({"_id": oid})
-        if org_data:
-            org = Organization.from_dict(org_data)
-            return org.quotas.maxExecMinutesPerMonth or 0
-        return 0
-
-    async def get_extra_exec_secs_available(self, oid: UUID) -> int:
-        """return extra billable rollover seconds available, if any"""
-        org_data = await self.orgs.find_one({"_id": oid})
-        if org_data:
-            org = Organization.from_dict(org_data)
-            return org.extraExecSecondsAvailable
-        return 0
-
-    async def get_gifted_exec_secs_available(self, oid: UUID) -> int:
-        """return gifted rollover seconds available, if any"""
-        org_data = await self.orgs.find_one({"_id": oid})
-        if org_data:
-            org = Organization.from_dict(org_data)
-            return org.giftedExecSecondsAvailable
-        return 0
+    def get_monthly_crawl_exec_seconds(self, org: Organization) -> int:
+        """Return monthlyExecSeconds for current month"""
+        yymm = dt_now().strftime("%Y-%m")
+        try:
+            return org.monthlyExecSeconds[yymm]
+        except KeyError:
+            return 0

     async def set_origin(self, org: Organization, request: Request) -> None:
         """Get origin from request and store in db for use in event webhooks"""
@@ -828,9 +780,9 @@ async def inc_org_time_stats(

         org = await self.get_org_by_id(oid)

-        monthly_exec_secs_used = await self.get_monthly_crawl_exec_seconds(oid)
+        monthly_exec_secs_used = self.get_monthly_crawl_exec_seconds(org)

-        monthly_quota_mins = await self.get_org_exec_mins_monthly_quota(oid)
+        monthly_quota_mins = org.quotas.maxExecMinutesPerMonth or 0
         monthly_quota_secs = monthly_quota_mins * 60

         if (
@@ -901,19 +853,11 @@ async def inc_org_time_stats(
             },
         )

-    async def get_max_concurrent_crawls(self, oid) -> int:
-        """return max allowed concurrent crawls, if any"""
-        org_data = await self.orgs.find_one({"_id": oid})
-        if org_data:
-            org = Organization.from_dict(org_data)
-            return org.quotas.maxConcurrentCrawls or 0
-        return 0
-
     async def get_org_metrics(self, org: Organization) -> dict[str, int]:
         """Calculate and return org metrics"""
         # pylint: disable=too-many-locals
-        storage_quota = await self.get_org_storage_quota(org.id)
-        max_concurrent_crawls = await self.get_max_concurrent_crawls(org.id)
+        storage_quota = org.quotas.storageQuota or 0
+        max_concurrent_crawls = org.quotas.maxConcurrentCrawls or 0

         # Calculate these counts in loop to avoid having db iterate through
         # archived items several times.
@@ -1410,9 +1354,10 @@ async def org_owner_dep(
         return org

     async def org_public(oid: UUID):
-        org = await ops.get_org_by_id(oid)
-        if not org:
-            raise HTTPException(status_code=404, detail="org_not_found")
+        try:
+            org = await ops.get_org_by_id(oid)
+        except HTTPException as exc:
+            raise HTTPException(status_code=404, detail="org_not_found") from exc

         return org
@@ -1464,8 +1409,8 @@ async def get_org(
         org: Organization = Depends(org_dep), user: User = Depends(user_dep)
     ):
         org_out = await org.serialize_for_user(user, user_manager)
-        org_out.storageQuotaReached = await ops.storage_quota_reached(org.id)
-        org_out.execMinutesQuotaReached = await ops.exec_mins_quota_reached(org.id)
+        org_out.storageQuotaReached = ops.storage_quota_reached(org)
+        org_out.execMinutesQuotaReached = ops.exec_mins_quota_reached(org)
         return org_out

     @router.delete("", tags=["organizations"], response_model=DeletedResponse)
diff --git a/backend/btrixcloud/pages.py b/backend/btrixcloud/pages.py
index e5abecb56..0904d2bd0 100644
--- a/backend/btrixcloud/pages.py
+++ b/backend/btrixcloud/pages.py
@@ -170,7 +170,6 @@ async def add_page_to_db(
                 return

             compare = PageQACompare(**compare_dict)
-            print("Adding QA Run Data for Page", page_dict.get("url"), compare)

             await self.add_qa_run_for_page(page.id, oid, qa_run_id, compare)
diff --git a/backend/btrixcloud/profiles.py b/backend/btrixcloud/profiles.py
index 4d5098326..fe16e80b9 100644
--- a/backend/btrixcloud/profiles.py
+++ b/backend/btrixcloud/profiles.py
@@ -220,13 +220,7 @@ async def commit_to_profile(
             print("baseid", baseid)
             baseid = UUID(baseid)

-        oid = UUID(metadata.get("btrix.org"))
-
-        if org.readOnly:
-            raise HTTPException(status_code=403, detail="org_set_to_read_only")
-
-        if await self.orgs.storage_quota_reached(oid):
-            raise HTTPException(status_code=403, detail="storage_quota_reached")
+        self.orgs.can_write_data(org, include_time=False)

         profile = Profile(
             id=profileid,
@@ -241,7 +235,7 @@ async def commit_to_profile(
             origins=json["origins"],
             resource=profile_file,
             userid=UUID(metadata.get("btrix.user")),
-            oid=oid,
+            oid=org.id,
             baseid=baseid,
             crawlerChannel=browser_commit.crawlerChannel,
         )
@@ -251,15 +245,15 @@ async def commit_to_profile(
         )

         await self.background_job_ops.create_replica_jobs(
-            oid, profile_file, str(profileid), "profile"
+            org.id, profile_file, str(profileid), "profile"
         )

-        quota_reached = await self.orgs.inc_org_bytes_stored(oid, file_size, "profile")
+        await self.orgs.inc_org_bytes_stored(org.id, file_size, "profile")

         return {
             "added": True,
             "id": str(profile.id),
-            "storageQuotaReached": quota_reached,
+            "storageQuotaReached": self.orgs.storage_quota_reached(org),
         }
@@ -432,7 +426,7 @@ async def delete_profile(
         if not res or res.deleted_count != 1:
             raise HTTPException(status_code=404, detail="profile_not_found")

-        quota_reached = await self.orgs.storage_quota_reached(org.id)
+        quota_reached = self.orgs.storage_quota_reached(org)

         return {"success": True, "storageQuotaReached": quota_reached}
diff --git a/backend/btrixcloud/uploads.py b/backend/btrixcloud/uploads.py
index 6f6a70366..ded063071 100644
--- a/backend/btrixcloud/uploads.py
+++ b/backend/btrixcloud/uploads.py
@@ -67,11 +67,7 @@ async def upload_stream(
         replaceId: Optional[str],
     ) -> dict[str, Any]:
         """Upload streaming file, length unknown"""
-        if org.readOnly:
-            raise HTTPException(status_code=403, detail="org_set_to_read_only")
-
-        if await self.orgs.storage_quota_reached(org.id):
-            raise HTTPException(status_code=403, detail="storage_quota_reached")
+        self.orgs.can_write_data(org, include_time=False)

         prev_upload = None
         if replaceId:
@@ -129,11 +125,7 @@ async def upload_formdata(
         user: User,
     ) -> dict[str, Any]:
         """handle uploading content to uploads subdir + request subdir"""
-        if org.readOnly:
-            raise HTTPException(status_code=403, detail="org_set_to_read_only")
-
-        if await self.orgs.storage_quota_reached(org.id):
-            raise HTTPException(status_code=403, detail="storage_quota_reached")
+        self.orgs.can_write_data(org, include_time=False)

         id_ = uuid.uuid4()
         files: List[CrawlFile] = []
@@ -203,9 +195,9 @@ async def _create_upload(
             self.event_webhook_ops.create_upload_finished_notification(crawl_id, org.id)
         )

-        quota_reached = await self.orgs.inc_org_bytes_stored(
-            org.id, file_size, "upload"
-        )
+        await self.orgs.inc_org_bytes_stored(org.id, file_size, "upload")
+
+        quota_reached = self.orgs.storage_quota_reached(org)

         if uploaded.files:
             for file in uploaded.files:
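Editor's note: taken together, the write paths in uploads.py and profiles.py now follow one pattern: guard up front with can_write_data, update the stored-bytes counters, then report the quota flag computed synchronously from the in-memory org. A condensed caller-side sketch follows; the endpoint shape and the names upload_like_endpoint and file_size are illustrative only, while the ops methods are the ones introduced in this patch.

    async def upload_like_endpoint(self, org, file_size: int) -> dict:
        # rejects read-only orgs and orgs over the storage quota with a 403;
        # include_time=False because uploads do not consume execution minutes
        self.orgs.can_write_data(org, include_time=False)

        # ... write the file to storage ...

        # inc_org_bytes_stored no longer returns the quota flag; update the
        # counters first, then evaluate the quota against the loaded org
        await self.orgs.inc_org_bytes_stored(org.id, file_size, "upload")
        quota_reached = self.orgs.storage_quota_reached(org)

        return {"added": True, "storageQuotaReached": quota_reached}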