From 0ea5476fc8d530cfa8d44b0e630d7bef6777cb6a Mon Sep 17 00:00:00 2001 From: Jongmin Kim Date: Tue, 15 Oct 2024 01:00:36 +0900 Subject: [PATCH] refactor: optimize job progress Signed-off-by: Jongmin Kim --- .../cost_analysis/service/job_service.py | 59 +++++++++++++------ 1 file changed, 42 insertions(+), 17 deletions(-) diff --git a/src/spaceone/cost_analysis/service/job_service.py b/src/spaceone/cost_analysis/service/job_service.py index 907ce98..b9c9095 100644 --- a/src/spaceone/cost_analysis/service/job_service.py +++ b/src/spaceone/cost_analysis/service/job_service.py @@ -782,6 +782,22 @@ def _delete_old_cost_data(self, data_source_id: str, domain_id: str): ) monthly_cost_vos.delete() + def _distinct_job_id( + self, query_filter: list, domain_id: str, data_source_id: str + ) -> list: + query = { + "distinct": "job_id", + "filter": query_filter, + "target": "PRIMARY", # Execute a query to primary DB + } + _LOGGER.debug(f"[_distinct_job_id] query: {query}") + response = self.cost_mgr.stat_costs(query, domain_id, data_source_id) + values = response.get("results", []) + + _LOGGER.debug(f"[_distinct_job_id] job_ids: {values}") + + return values + def _delete_changed_cost_data( self, job_vo: Job, start, end, change_filter, domain_id ): @@ -790,7 +806,7 @@ def _delete_changed_cost_data( {"k": "billed_month", "v": start, "o": "gte"}, {"k": "data_source_id", "v": job_vo.data_source_id, "o": "eq"}, {"k": "domain_id", "v": job_vo.domain_id, "o": "eq"}, - {"k": "job_id", "v": job_vo.job_id, "o": "not"}, + # {"k": "job_id", "v": job_vo.job_id, "o": "not"}, ], "hint": "COMPOUND_INDEX_FOR_SYNC_JOB_2", } @@ -801,24 +817,33 @@ def _delete_changed_cost_data( for key, value in change_filter.items(): query["filter"].append({"k": key, "v": value, "o": "eq"}) - _LOGGER.debug(f"[_delete_changed_cost_data] query: {query}") - - cost_vos, total_count = self.cost_mgr.list_costs( - copy.deepcopy(query), domain_id, job_vo.data_source_id - ) - cost_vos.delete() - _LOGGER.debug( - f"[_delete_changed_cost_data] delete costs (count = {total_count})" + job_ids = self._distinct_job_id( + query["filter"], domain_id, job_vo.data_source_id ) + for job_id in job_ids: + if job_vo.job_id == job_id: + continue - query["hint"] = "COMPOUND_INDEX_FOR_SYNC_JOB" - monthly_cost_vos, total_count = self.cost_mgr.list_monthly_costs( - copy.deepcopy(query), domain_id - ) - monthly_cost_vos.delete() - _LOGGER.debug( - f"[_delete_changed_cost_data] delete monthly costs (count = {total_count})" - ) + query["filter"].append({"k": "job_id", "v": job_id, "o": "eq"}) + + _LOGGER.debug(f"[_delete_changed_cost_data] query: {query}") + + cost_vos, total_count = self.cost_mgr.list_costs( + copy.deepcopy(query), domain_id, job_vo.data_source_id + ) + cost_vos.delete() + _LOGGER.debug( + f"[_delete_changed_cost_data] delete costs (count = {total_count})" + ) + + query["hint"] = "COMPOUND_INDEX_FOR_SYNC_JOB" + monthly_cost_vos, total_count = self.cost_mgr.list_monthly_costs( + copy.deepcopy(query), domain_id + ) + monthly_cost_vos.delete() + _LOGGER.debug( + f"[_delete_changed_cost_data] delete monthly costs (count = {total_count})" + ) def _aggregate_cost_data( self, job_vo: Job, data_keys: list, additional_info_keys: list, tag_keys: list