From 4a022198d16ade98c26722d012d17ae50a0c6c40 Mon Sep 17 00:00:00 2001 From: ImMin5 Date: Thu, 25 Jan 2024 23:13:42 +0900 Subject: [PATCH] feat: add create cost_report scheduler task --- .../task/v1/data_source_sync_scheduler.py | 20 +-- .../manager/budget_usage_manager.py | 2 +- .../manager/cost_report_config_manager.py | 3 + .../manager/cost_report_manager.py | 12 ++ .../cost_analysis/manager/email_manager.py | 4 +- .../cost_analysis/manager/identity_manager.py | 8 + .../model/cost_report/database.py | 2 +- .../model/cost_report/response.py | 2 +- .../service/cost_report_config_service.py | 10 +- .../service/cost_report_serivce.py | 146 +++++++++++++++--- 10 files changed, 174 insertions(+), 35 deletions(-) diff --git a/src/spaceone/cost_analysis/interface/task/v1/data_source_sync_scheduler.py b/src/spaceone/cost_analysis/interface/task/v1/data_source_sync_scheduler.py index 148b4d18..2a7c1c39 100644 --- a/src/spaceone/cost_analysis/interface/task/v1/data_source_sync_scheduler.py +++ b/src/spaceone/cost_analysis/interface/task/v1/data_source_sync_scheduler.py @@ -24,7 +24,7 @@ def _init_config(self): def create_task(self) -> list: tasks = [] - # tasks.extend(self._create_cost_report_data_run_task()) + tasks.extend(self._create_cost_report_run_task()) tasks.extend(self._create_data_source_sync_task()) return tasks @@ -46,19 +46,19 @@ def _create_data_source_sync_task(self): } print( - f"{utils.datetime_to_iso8601(datetime.now())} [INFO] [create_task] create_jobs_by_data_source => START" + f"{utils.datetime_to_iso8601(datetime.utcnow())} [INFO] [create_task] create_jobs_by_data_source => START" ) return [stp] else: print( - f"{utils.datetime_to_iso8601(datetime.now())} [INFO] [create_task] create_jobs_by_data_source => SKIP" + f"{utils.datetime_to_iso8601(datetime.utcnow())} [INFO] [create_task] create_jobs_by_data_source => SKIP" ) print( - f"{utils.datetime_to_iso8601(datetime.now())} [INFO] [create_task] data_source_sync_time: {self._data_source_sync_hour} hour (UTC)" + f"{utils.datetime_to_iso8601(datetime.utcnow())} [INFO] [create_task] data_source_sync_time: {self._data_source_sync_hour} hour (UTC)" ) return [] - def _create_cost_report_data_run_task(self): + def _create_cost_report_run_task(self): if datetime.utcnow().hour == self._data_source_sync_hour - 1: stp = { "name": "cost_report_data_sync_schedule", @@ -67,22 +67,22 @@ def _create_cost_report_data_run_task(self): "stages": [ { "locator": "SERVICE", - "name": "CostReportDataService", + "name": "CostReportConfigService", "metadata": {"token": self._token}, - "method": "create_cost_report_data_by_data_source", + "method": "create_cost_report_by_cost_report_config", "params": {"params": {}}, } ], } print( - f"{utils.datetime_to_iso8601(datetime.now())} [INFO] [create_task] create_cost_report_data_by_cost_report_config => START" + f"{utils.datetime_to_iso8601(datetime.utcnow())} [INFO] [create_task] create_cost_report_data_by_cost_report_config => START" ) return [stp] else: print( - f"{utils.datetime_to_iso8601(datetime.now())} [INFO] [create_task] create_cost_report_data_by_cost_report_config => SKIP" + f"{utils.datetime_to_iso8601(datetime.utcnow())} [INFO] [create_task] create_cost_report_data_by_cost_report_config => SKIP" ) print( - f"{utils.datetime_to_iso8601(datetime.now())} [INFO] [create_task] data_source_sync_time: {self._data_source_sync_hour} hour (UTC)" + f"{utils.datetime_to_iso8601(datetime.utcnow())} [INFO] [create_task] data_source_sync_time: {self._data_source_sync_hour} hour (UTC)" ) return [] diff --git a/src/spaceone/cost_analysis/manager/budget_usage_manager.py b/src/spaceone/cost_analysis/manager/budget_usage_manager.py index ffda5acc..274c0da1 100644 --- a/src/spaceone/cost_analysis/manager/budget_usage_manager.py +++ b/src/spaceone/cost_analysis/manager/budget_usage_manager.py @@ -107,7 +107,7 @@ def notify_budget_usage(self, budget_vo: Budget): budget_id = budget_vo.budget_id workspace_id = budget_vo.workspace_id domain_id = budget_vo.domain_id - current_month = datetime.now().strftime("%Y-%m") + current_month = datetime.utcnow().strftime("%Y-%m") updated_notifications = [] is_changed = False for notification in budget_vo.notifications: diff --git a/src/spaceone/cost_analysis/manager/cost_report_config_manager.py b/src/spaceone/cost_analysis/manager/cost_report_config_manager.py index de5cd8f2..8d5bfd40 100644 --- a/src/spaceone/cost_analysis/manager/cost_report_config_manager.py +++ b/src/spaceone/cost_analysis/manager/cost_report_config_manager.py @@ -98,6 +98,9 @@ def list_cost_report_config(self, query: dict, domain_id) -> Tuple[QuerySet, int return self.cost_report_config_model.query(**query) + def filter_cost_report_configs(self, **conditions) -> QuerySet: + return self.cost_report_config_model.filter(**conditions) + def stat_cost_report_config(self, query: dict) -> dict: return self.cost_report_config_model.stat(**query) diff --git a/src/spaceone/cost_analysis/manager/cost_report_manager.py b/src/spaceone/cost_analysis/manager/cost_report_manager.py index 4d455b60..e7ac8372 100644 --- a/src/spaceone/cost_analysis/manager/cost_report_manager.py +++ b/src/spaceone/cost_analysis/manager/cost_report_manager.py @@ -13,6 +13,18 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.cost_report_model = CostReport + def create_cost_report(self, params: dict) -> CostReport: + def _rollback(vo: CostReport): + _LOGGER.info( + f"[create_cost_report._rollback] Delete cost_report: {vo.cost_report_id})" + ) + vo.delete() + + cost_report_vo = self.cost_report_model.create(params) + self.transaction.add_rollback(_rollback, cost_report_vo) + + return cost_report_vo + def get_cost_report( self, domain_id: str, cost_report_id: str, workspace_id: str = None ) -> CostReport: diff --git a/src/spaceone/cost_analysis/manager/email_manager.py b/src/spaceone/cost_analysis/manager/email_manager.py index f17624e5..e661673e 100644 --- a/src/spaceone/cost_analysis/manager/email_manager.py +++ b/src/spaceone/cost_analysis/manager/email_manager.py @@ -44,13 +44,13 @@ def send_cost_report_email( email_contents = template.render( user_name=user_id, - report_number=cost_report_vo.cost_report_number, + report_number=cost_report_vo.report_number, workspace_name=cost_report_vo.workspace_name, report_date=cost_report_vo.issue_date, report_period=self.get_date_range_of_month(cost_report_vo.report_month), reset_password_link=cost_report_link, ) - subject = f'[{service_name}] #{cost_report_vo.cost_report_number} {language_map_info["cost_report"]}' + subject = f'[{service_name}] #{cost_report_vo.report_number} {language_map_info["cost_report"]}' self.smtp_connector.send_email(email, subject, email_contents) diff --git a/src/spaceone/cost_analysis/manager/identity_manager.py b/src/spaceone/cost_analysis/manager/identity_manager.py index 3fdc502e..b25502d1 100644 --- a/src/spaceone/cost_analysis/manager/identity_manager.py +++ b/src/spaceone/cost_analysis/manager/identity_manager.py @@ -42,6 +42,14 @@ def get_workspace(self, workspace_id: str, domain_id: str) -> str: _LOGGER.error(f"[get_project_name] API Error: {e}") return workspace_id + def list_workspaces(self, params: dict, domain_id: str) -> dict: + if self.token_type == "SYSTEM_TOKEN": + return self.identity_conn.dispatch( + "Workspace.list", params, x_domain_id=domain_id + ) + else: + return self.identity_conn.dispatch("Workspace.list", params) + def list_workspace_users(self, params: dict, domain_id: str) -> dict: if self.token_type == "SYSTEM_TOKEN": return self.identity_conn.dispatch( diff --git a/src/spaceone/cost_analysis/model/cost_report/database.py b/src/spaceone/cost_analysis/model/cost_report/database.py index 423eec62..9480c2e9 100644 --- a/src/spaceone/cost_analysis/model/cost_report/database.py +++ b/src/spaceone/cost_analysis/model/cost_report/database.py @@ -8,7 +8,7 @@ class CostReport(MongoModel): status = StringField( max_length=20, choices=("IN_PROGRESS", "SUCCESS"), default="IN_PROGRESS" ) - cost_report_number = StringField(max_length=255) + report_number = StringField(max_length=255) issue_date = StringField(max_length=10) report_year = StringField(max_length=10) report_month = StringField(max_length=10) diff --git a/src/spaceone/cost_analysis/model/cost_report/response.py b/src/spaceone/cost_analysis/model/cost_report/response.py index 5ba60d7e..68f57306 100644 --- a/src/spaceone/cost_analysis/model/cost_report/response.py +++ b/src/spaceone/cost_analysis/model/cost_report/response.py @@ -9,7 +9,7 @@ class CostReportResponse(BaseModel): cost_report_id: Union[str, None] = None cost: Union[dict, None] = None status: Union[Status, None] = None - cost_report_number: Union[str, None] = None + report_number: Union[str, None] = None currency: Union[str, None] = None issue_date: Union[str, None] = None report_year: Union[str, None] = None diff --git a/src/spaceone/cost_analysis/service/cost_report_config_service.py b/src/spaceone/cost_analysis/service/cost_report_config_service.py index 77fa70ad..f4b9b82d 100644 --- a/src/spaceone/cost_analysis/service/cost_report_config_service.py +++ b/src/spaceone/cost_analysis/service/cost_report_config_service.py @@ -7,6 +7,7 @@ from spaceone.cost_analysis.manager.cost_report_config_manager import ( CostReportConfigManager, ) +from spaceone.cost_analysis.service.cost_report_serivce import CostReportService from spaceone.cost_analysis.model.cost_report_config.request import * from spaceone.cost_analysis.model.cost_report_config.response import * @@ -188,13 +189,18 @@ def delete(self, params: CostReportConfigDeleteRequest) -> None: self.cost_report_mgr.delete_cost_report_config_by_vo(cost_report_config_vo) - # TODO: Business Logic. Check Return Type @transaction( permission="cost-analysis:CostReportConfig.read", role_types=["DOMAIN_ADMIN"] ) @convert_model def run(self, params: CostReportConfigRunRequest) -> None: - pass + """RUN cost report config""" + cost_report_config_vo = self.cost_report_mgr.get_cost_report_config( + params.cost_report_config_id, params.domain_id + ) + + cost_report_service = CostReportService() + cost_report_service.create_cost_report(cost_report_config_vo) @transaction( permission="cost-analysis:CostReportConfig.read", diff --git a/src/spaceone/cost_analysis/service/cost_report_serivce.py b/src/spaceone/cost_analysis/service/cost_report_serivce.py index dc5f2830..19ceb579 100644 --- a/src/spaceone/cost_analysis/service/cost_report_serivce.py +++ b/src/spaceone/cost_analysis/service/cost_report_serivce.py @@ -2,10 +2,12 @@ import calendar import datetime import logging +from dateutil.relativedelta import relativedelta from datetime import datetime +from typing import Tuple +from mongoengine import QuerySet from spaceone.core.service import * -from spaceone.cost_analysis.model.cost_report.database import CostReport from spaceone.cost_analysis.model.cost_report_config.database import CostReportConfig from spaceone.cost_analysis.model.cost_report.request import * from spaceone.cost_analysis.model.cost_report.response import * @@ -37,8 +39,8 @@ def __init__(self, *args, **kwargs): def create_cost_report_by_cost_report_config(self, params: dict): """Create cost report by cost report config""" - for cost_report_config_id in self._get_all_cost_report_configs(): - self.create_cost_report(cost_report_config_id) + for cost_report_config_vo in self._get_all_cost_report_configs(): + self.create_cost_report(cost_report_config_vo) @transaction( permission="cost-analysis:CostReport.read", @@ -115,8 +117,33 @@ def get(self, params: CostReportGetRequest) -> CostReportResponse: return CostReportResponse(**cost_report_vo.to_dict()) - def create_cost_report(self, cost_report_config_id: str): - pass + def create_cost_report(self, cost_report_config_vo: CostReportConfig): + workspace_name_map = self._get_workspace_name_map( + cost_report_config_vo.domain_id + ) + + workspace_ids = [workspace_id for workspace_id in workspace_name_map.keys()] + current_month, last_month = self._get_current_and_last_month() + issue_day = self._get_issue_day(cost_report_config_vo) + + if issue_day == datetime.utcnow().day: + self._aggregate_monthly_cost_report( + cost_report_config_vo, + workspace_name_map, + workspace_ids, + last_month, + issue_day, + "SUCCESS", + ) + + self._aggregate_monthly_cost_report( + cost_report_config_vo, + workspace_name_map, + workspace_ids, + current_month, + issue_day, + "IN_PROGRESS", + ) @transaction( permission="cost-analysis:CostReport.read", @@ -132,7 +159,7 @@ def create_cost_report(self, cost_report_config_id: str): ) @append_keyword_filter( [ - "cost_report_number", + "report_number", "workspace_name", "report_year" "report_month", ] @@ -163,7 +190,7 @@ def list(self, params: CostReportSearchQueryRequest) -> CostReportsResponse: ) @append_keyword_filter( [ - "cost_report_number", + "report_number", "workspace_name", ] ) @@ -173,9 +200,17 @@ def stat(self, params: CostReportDataStatQueryRequest) -> dict: return self.cost_report_mgr.stat_cost_reports(params.query) - def _aggregate_monthly_cost_report(self, cost_report_config_vo: CostReportConfig): - issue_day = cost_report_config_vo.issue_day - report_month = self._get_report_month() + def _aggregate_monthly_cost_report( + self, + cost_report_config_vo: CostReportConfig, + workspace_name_map: dict, + workspace_ids: list, + report_month: str, + issue_day: int, + status: str = None, + ) -> None: + currency = cost_report_config_vo.currency + report_year = report_month.split("-")[0] data_sources = cost_report_config_vo.data_source_filter.get("data_sources", []) domain_id = cost_report_config_vo.domain_id @@ -191,24 +226,81 @@ def _aggregate_monthly_cost_report(self, cost_report_config_vo: CostReportConfig "end": report_month, "filter": [ {"k": "domain_id", "v": domain_id, "o": "eq"}, + {"k": "billed_year", "v": report_year, "o": "eq"}, + {"k": "billed_month", "v": report_month, "o": "eq"}, + {"k": "workspace_id", "v": workspace_ids, "o": "in"}, ], } + if data_sources: query["filter"].append( {"k": "data_source_id", "v": data_sources, "o": "in"} ) + _LOGGER.debug(f"[aggregate_monthly_cost_report] query: {query}") + + response = self.cost_mgr.analyze_monthly_costs( + query, domain_id, target="PRIMARY" + ) + results = response.get("results", []) + for aggregated_cost_report in results: + # todo: convert currency + aggregated_cost_report["cost"] = {"KRW": aggregated_cost_report.pop("cost")} + aggregated_cost_report["status"] = status + aggregated_cost_report["currency"] = currency + aggregated_cost_report["report_number"] = self.generate_report_number( + report_month, issue_day + ) + aggregated_cost_report["report_month"] = report_month + aggregated_cost_report["report_year"] = aggregated_cost_report.pop( + "billed_year" + ) + aggregated_cost_report["issue_date"] = report_month + aggregated_cost_report["workspace_name"] = workspace_name_map.get( + aggregated_cost_report["workspace_id"], "Unknown" + ) + aggregated_cost_report["bank_name"] = "Yahoo! Finance" # todo : replace + aggregated_cost_report[ + "cost_report_config_id" + ] = cost_report_config_vo.cost_report_config_id + aggregated_cost_report["domain_id"] = cost_report_config_vo.domain_id + self.cost_report_mgr.create_cost_report(aggregated_cost_report) + + _LOGGER.debug( + f"[aggregate_monthly_cost_report] create cost report ({report_month}) (count = {len(results)})" + ) + self._delete_old_cost_reports(report_month, domain_id) + + def _get_all_cost_report_configs(self) -> QuerySet: + return self.cost_report_config_mgr.filter_cost_report_configs(state="ENABLED") - response = self.cost_mgr.analyze_costs(query, domain_id, target="PRIMARY") + def _delete_old_cost_reports(self, report_month: str, domain_id: str) -> None: + yesterday_datetime = datetime.utcnow() - relativedelta(day=1) # todo : refactor + cost_report_delete_query = { + "filter": [ + {"k": "report_month", "v": report_month, "o": "eq"}, + {"k": "status", "v": "IN_PROGRESS", "o": "eq"}, + {"k": "domain_id", "v": domain_id, "o": "eq"}, + {"k": "created_at", "v": yesterday_datetime, "o": "datetime_lt"}, + ] + } + cost_reports_vos, total_count = self.cost_report_mgr.list_cost_reports( + cost_report_delete_query + ) - def _get_all_cost_report_configs(self) -> CostReportConfig: - return self.cost_report_config_mgr.list_cost_reports_config(state="ENABLED") + _LOGGER.debug( + f"[delete_old_cost_reports] delete cost reports ({report_month}) (count = {total_count})" + ) + cost_reports_vos.delete() - def _get_report_month(self): - return datetime.now().strftime("%Y-%m") + @staticmethod + def _get_current_and_last_month() -> Tuple[str, str]: + current_month = datetime.utcnow().strftime("%Y-%m") + last_month = (datetime.utcnow() - relativedelta(months=1)).strftime("%Y-%m") + return current_month, last_month @staticmethod - def _get_issue_day(cost_report_config_vo: CostReportConfig): - current_date = datetime.now() + def _get_issue_day(cost_report_config_vo: CostReportConfig) -> int: + current_date = datetime.utcnow() current_year = current_date.year current_month = current_date.month @@ -216,6 +308,24 @@ def _get_issue_day(cost_report_config_vo: CostReportConfig): if cost_report_config_vo.is_last_day: return last_day - else: return min(cost_report_config_vo.issue_day, last_day) + + @staticmethod + def generate_report_number(report_month: str, issue_day: int) -> str: + report_date = f"{report_month}-{issue_day}" + date_object = datetime.strptime(report_date, "%Y-%m-%d") + + return f"CostReport_{date_object.strftime('%y%m%d%H%M')}" + + @staticmethod + def _get_workspace_name_map(domain_id: str) -> dict: + identity_mgr = IdentityManager() + workspace_name_map = {} + workspaces = identity_mgr.list_workspaces( + {"query": {"filter": [{"k": "state", "v": "ENABLED", "o": "eq"}]}}, + domain_id, + ) + for workspace in workspaces.get("results", []): + workspace_name_map[workspace["workspace_id"]] = workspace["name"] + return workspace_name_map