diff --git a/src/spaceone/cost_analysis/interface/task/v1/data_source_sync_scheduler.py b/src/spaceone/cost_analysis/interface/task/v1/data_source_sync_scheduler.py index 2a7c1c39..85416cb7 100644 --- a/src/spaceone/cost_analysis/interface/task/v1/data_source_sync_scheduler.py +++ b/src/spaceone/cost_analysis/interface/task/v1/data_source_sync_scheduler.py @@ -67,18 +67,31 @@ def _create_cost_report_run_task(self): "stages": [ { "locator": "SERVICE", - "name": "CostReportConfigService", + "name": "CostReportService", "metadata": {"token": self._token}, "method": "create_cost_report_by_cost_report_config", "params": {"params": {}}, - } + }, + { + "locator": "SERVICE", + "name": "CostReportDataService", + "metadata": {"token": self._token}, + "method": "create_cost_report_data_by_cost_report_config", + "params": {"params": {}}, + }, ], } + print( + f"{utils.datetime_to_iso8601(datetime.utcnow())} [INFO] [create_task] create_cost_report_by_cost_report_config => START" + ) print( f"{utils.datetime_to_iso8601(datetime.utcnow())} [INFO] [create_task] create_cost_report_data_by_cost_report_config => START" ) return [stp] else: + print( + f"{utils.datetime_to_iso8601(datetime.utcnow())} [INFO] [create_task] create_cost_report_by_cost_report_config => SKIP" + ) print( f"{utils.datetime_to_iso8601(datetime.utcnow())} [INFO] [create_task] create_cost_report_data_by_cost_report_config => SKIP" ) diff --git a/src/spaceone/cost_analysis/manager/cost_report_data_manager.py b/src/spaceone/cost_analysis/manager/cost_report_data_manager.py index 27a12478..303d771a 100644 --- a/src/spaceone/cost_analysis/manager/cost_report_data_manager.py +++ b/src/spaceone/cost_analysis/manager/cost_report_data_manager.py @@ -47,7 +47,14 @@ def filter_data_sources(self, **conditions): def list_cost_reports_data(self, query: dict) -> Tuple[QuerySet, int]: return self.cost_report_data_model.query(**query) - def analyze_cost_reports_data(self, query: dict) -> dict: + def analyze_cost_reports_data( + self, query: dict, target="SECONDARY_PREFERRED" + ) -> dict: + query["target"] = target + query["date_field"] = "report_month" + query["date_field_format"] = "%Y-%m" + _LOGGER.debug(f"[analyze_cost_reports_data] query: {query}") + return self.cost_report_data_model.analyze(**query) def stat_cost_reports_data(self, query) -> dict: diff --git a/src/spaceone/cost_analysis/model/cost_report_data/database.py b/src/spaceone/cost_analysis/model/cost_report_data/database.py index 54f6aeea..9c2c887c 100644 --- a/src/spaceone/cost_analysis/model/cost_report_data/database.py +++ b/src/spaceone/cost_analysis/model/cost_report_data/database.py @@ -24,12 +24,9 @@ class CostReportData(MongoModel): max_length=40, default=None, null=True ) # todo workspace_id required domain_id = StringField(max_length=40) - created_at = DateTimeField(auto_now_add=True) meta = { - "updatable_fields": [ - "created_at", - ], + "updatable_fields": [], "minimal_fields": [ "cost_report_config_id", "cost_report_data_id", @@ -37,5 +34,10 @@ class CostReportData(MongoModel): "workspace_id", ], "ordering": ["is_confirmed"], - "indexes": ["domain_id", "cost_report_data_id", "data_source_id"], + "indexes": [ + "domain_id", + "-report_year", + "cost_report_data_id", + "data_source_id", + ], } diff --git a/src/spaceone/cost_analysis/model/cost_report_data/request.py b/src/spaceone/cost_analysis/model/cost_report_data/request.py index a8634ac0..ce5dffae 100644 --- a/src/spaceone/cost_analysis/model/cost_report_data/request.py +++ b/src/spaceone/cost_analysis/model/cost_report_data/request.py @@ -25,12 +25,13 @@ class CostReportDataSearchQueryRequest(BaseModel): class CostReportDataAnalyzeQueryRequest(BaseModel): query: Union[dict, None] = None - cost_report_data_id: str + cost_report_data_id: Union[str, None] = None report_year: Union[str, None] = None report_month: Union[str, None] = None product: Union[str, None] = None provider: Union[str, None] = None data_source_id: Union[str, None] = None + cost_report_config_id: Union[str, None] = None workspace_id: Union[str, None] = None domain_id: str diff --git a/src/spaceone/cost_analysis/model/cost_report_data/response.py b/src/spaceone/cost_analysis/model/cost_report_data/response.py index 18b43130..a122b825 100644 --- a/src/spaceone/cost_analysis/model/cost_report_data/response.py +++ b/src/spaceone/cost_analysis/model/cost_report_data/response.py @@ -26,12 +26,6 @@ class CostReportDataResponse(BaseModel): project_id: Union[str, None] = None workspace_id: Union[str, None] = None domain_id: Union[str, None] = None - created_at: Union[datetime, None] = None - - def dict(self, *args, **kwargs): - data = super().dict(*args, **kwargs) - data["created_at"] = utils.datetime_to_iso8601(data["created_at"]) - return data class CostReportsDataResponse(BaseModel): diff --git a/src/spaceone/cost_analysis/service/cost_report_config_service.py b/src/spaceone/cost_analysis/service/cost_report_config_service.py index f4b9b82d..631c18b2 100644 --- a/src/spaceone/cost_analysis/service/cost_report_config_service.py +++ b/src/spaceone/cost_analysis/service/cost_report_config_service.py @@ -8,6 +8,9 @@ CostReportConfigManager, ) from spaceone.cost_analysis.service.cost_report_serivce import CostReportService +from spaceone.cost_analysis.service.cost_report_data_service import ( + CostReportDataService, +) from spaceone.cost_analysis.model.cost_report_config.request import * from spaceone.cost_analysis.model.cost_report_config.response import * @@ -202,6 +205,9 @@ def run(self, params: CostReportConfigRunRequest) -> None: cost_report_service = CostReportService() cost_report_service.create_cost_report(cost_report_config_vo) + cost_report_data_service = CostReportDataService() + cost_report_data_service.create_cost_report_data(cost_report_config_vo) + @transaction( permission="cost-analysis:CostReportConfig.read", role_types=["DOMAIN_ADMIN", "WORKSPACE_OWNER"], diff --git a/src/spaceone/cost_analysis/service/cost_report_data_service.py b/src/spaceone/cost_analysis/service/cost_report_data_service.py index 08121bc5..406e204b 100644 --- a/src/spaceone/cost_analysis/service/cost_report_data_service.py +++ b/src/spaceone/cost_analysis/service/cost_report_data_service.py @@ -1,21 +1,25 @@ -import copy +import calendar import logging -import random -import re -import string -from typing import Union +from datetime import datetime +from dateutil.relativedelta import relativedelta +from typing import Union, Tuple +from mongoengine import QuerySet from spaceone.core.service import * from spaceone.core.service.utils import * -from spaceone.core import config +from spaceone.cost_analysis.manager.cost_manager import CostManager from spaceone.cost_analysis.manager.cost_report_data_manager import ( CostReportDataManager, ) - +from spaceone.cost_analysis.manager.cost_report_config_manager import ( + CostReportConfigManager, +) +from spaceone.cost_analysis.manager.data_source_manager import DataSourceManager +from spaceone.cost_analysis.manager.identity_manager import IdentityManager from spaceone.cost_analysis.model.cost_report_data.request import * from spaceone.cost_analysis.model.cost_report_data.response import * -from spaceone.cost_analysis.model.cost_report_data.database import CostReportData +from spaceone.cost_analysis.model.cost_report_config.database import CostReportConfig _LOGGER = logging.getLogger(__name__) @@ -29,8 +33,62 @@ class CostReportDataService(BaseService): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) + self.cost_mgr = CostManager() self.cost_report_data_mgr = CostReportDataManager() + @transaction(exclude=["authentication", "authorization", "mutation"]) + def create_cost_report_data_by_cost_report_config(self, params: dict) -> None: + """Create cost report by cost report config""" + + for cost_report_config_vo in self._get_all_cost_report_configs(): + self.create_cost_report_data(cost_report_config_vo) + + @transaction( + permission="cost-analysis:CostReportData.read", + role_types=["DOMAIN_ADMIN", "WORKSPACE_OWNER"], + ) + @append_query_filter( + [ + "cost_report_config_id", + "cost_report_data_id", + "workspace_id", + "domain_id", + ] + ) + @convert_model + def stat(self, params: CostReportDataStatQueryRequest) -> dict: + """Analyze cost report data""" + + query = params.query or {} + return self.cost_report_data_mgr.stat_cost_reports_data(query) + + @transaction( + permission="cost-analysis:CostReportData.read", + role_types=["DOMAIN_ADMIN", "WORKSPACE_OWNER"], + ) + @append_query_filter( + [ + "product", + "provider", + "is_confirmed", + "cost_report_config_id", + "cost_report_data_id", + "data_source_id", + "workspace_id", + "domain_id", + ] + ) + @append_keyword_filter( + ["provider", "product", "workspace_name", "project_name", "cost_report_data_id"] + ) + @set_query_page_limit(1000) + @convert_model + def analyze(self, params: CostReportDataAnalyzeQueryRequest) -> dict: + """Analyze cost report data""" + + query = params.query or {} + return self.cost_report_data_mgr.analyze_cost_reports_data(query) + @transaction( permission="cost-analysis:CostReportData.read", role_types=["DOMAIN_ADMIN", "WORKSPACE_OWNER"], @@ -68,8 +126,240 @@ def list( results=cost_reports_data_info, total_count=total_count ) - def analyze(self, params: CostReportDataAnalyzeQueryRequest) -> dict: - """Analyze cost report data""" + @staticmethod + def _get_all_cost_report_configs() -> QuerySet: + cost_report_config_mgr = CostReportConfigManager() + return cost_report_config_mgr.filter_cost_report_configs(state="ENABLED") - query = params.query or {} - return self.cost_report_data_mgr.analyze_cost_reports_data(query) + def create_cost_report_data(self, cost_report_config_vo: CostReportConfig): + domain_id = cost_report_config_vo.domain_id + data_source_filter = cost_report_config_vo.data_source_filter or {} + + workspace_name_map, workspace_ids = self._get_workspace_name_map(domain_id) + data_source_currency_map, data_source_ids = self._get_data_source_currency_map( + data_source_filter, workspace_ids, domain_id + ) + project_name_map = self._get_project_name_map(workspace_ids, domain_id) + service_account_name_map = self._get_service_account_name_map( + workspace_ids, domain_id + ) + + current_month, last_month = self._get_current_and_last_month() + issue_day = self._get_issue_day(cost_report_config_vo) + + if issue_day == datetime.utcnow().day: + self._aggregate_monthly_cost_report_data( + cost_report_config_vo=cost_report_config_vo, + workspace_name_map=workspace_name_map, + workspace_ids=workspace_ids, + project_name_map=project_name_map, + service_account_name_map=service_account_name_map, + data_source_currency_map=data_source_currency_map, + data_source_ids=data_source_ids, + report_month=last_month, + issue_day=issue_day, + is_confirmed=True, + ) + + self._aggregate_monthly_cost_report_data( + cost_report_config_vo=cost_report_config_vo, + workspace_name_map=workspace_name_map, + workspace_ids=workspace_ids, + project_name_map=project_name_map, + service_account_name_map=service_account_name_map, + data_source_currency_map=data_source_currency_map, + data_source_ids=data_source_ids, + report_month=current_month, + issue_day=issue_day, + ) + + def _aggregate_monthly_cost_report_data( + self, + cost_report_config_vo: CostReportConfig, + workspace_name_map: dict, + workspace_ids: list, + project_name_map: dict, + service_account_name_map: dict, + data_source_currency_map: dict, + data_source_ids: list, + report_month: str, + issue_day: int, + is_confirmed: bool = False, + ): + domain_id = cost_report_config_vo.domain_id + currency = cost_report_config_vo.currency + report_year = report_month.split("-")[0] + + query = { + "group_by": [ + "billed_year", + "workspace_id", + "project_id", + "data_source_id", + "service_account_id", + "data_source_id", + "product", + "provider", + ], + "fields": { + "cost": {"key": "cost", "operator": "sum"}, + }, + "start": report_month, + "end": report_month, + "filter": [ + {"k": "domain_id", "v": domain_id, "o": "eq"}, + {"k": "billed_year", "v": report_year, "o": "eq"}, + {"k": "billed_month", "v": report_month, "o": "eq"}, + {"k": "data_source_id", "v": data_source_ids, "o": "in"}, + {"k": "workspace_id", "v": workspace_ids, "o": "in"}, + ], + } + + _LOGGER.debug(f"[aggregate_monthly_cost_report] query: {query}") + response = self.cost_mgr.analyze_monthly_costs( + query, domain_id, target="PRIMARY" + ) + + results = response.get("results", []) + for aggregated_cost_report in results: + ag_cost_report_currency = data_source_currency_map.get( + aggregated_cost_report.pop("data_source_id") + ) + aggregated_cost_report["cost"] = { + ag_cost_report_currency: aggregated_cost_report.pop("cost") + } + aggregated_cost_report["currency"] = currency + aggregated_cost_report["issue_date"] = f"{report_month}-{issue_day}" + aggregated_cost_report["report_month"] = report_month + aggregated_cost_report["report_year"] = aggregated_cost_report.pop( + "billed_year" + ) + aggregated_cost_report["workspace_name"] = workspace_name_map.get( + aggregated_cost_report["workspace_id"], "Unknown" + ) + aggregated_cost_report["project_name"] = project_name_map.get( + aggregated_cost_report["project_id"], "Unknown" + ) + aggregated_cost_report[ + "service_account_name" + ] = service_account_name_map.get( + aggregated_cost_report["service_account_id"], "Unknown" + ) + + aggregated_cost_report["bank_name"] = "Yahoo! Finance" # todo : replace + aggregated_cost_report[ + "cost_report_config_id" + ] = cost_report_config_vo.cost_report_config_id + aggregated_cost_report["domain_id"] = domain_id + aggregated_cost_report["is_confirmed"] = is_confirmed + + self.cost_report_data_mgr.create_cost_report_data(aggregated_cost_report) + + _LOGGER.debug( + f"[aggregate_monthly_cost_report] create cost report ({report_month}) (count = {len(results)})" + ) + + @staticmethod + def _get_current_and_last_month() -> Tuple[str, str]: + current_month = datetime.utcnow().strftime("%Y-%m") + last_month = (datetime.utcnow() - relativedelta(months=1)).strftime("%Y-%m") + return current_month, last_month + + @staticmethod + def _get_issue_day(cost_report_config_vo: CostReportConfig) -> int: + current_date = datetime.utcnow() + current_year = current_date.year + current_month = current_date.month + + _, last_day = calendar.monthrange(current_year, current_month) + + if cost_report_config_vo.is_last_day: + return last_day + else: + return min(cost_report_config_vo.issue_day, last_day) + + @staticmethod + def _get_workspace_name_map(domain_id: str) -> Tuple[dict, list]: + identity_mgr = IdentityManager() + workspace_name_map = {} + workspaces = identity_mgr.list_workspaces( + {"query": {"filter": [{"k": "state", "v": "ENABLED", "o": "eq"}]}}, + domain_id, + ) + workspace_ids = [] + for workspace in workspaces.get("results", []): + workspace_name_map[workspace["workspace_id"]] = workspace["name"] + workspace_ids.append(workspace["workspace_id"]) + return workspace_name_map, workspace_ids + + @staticmethod + def _get_data_source_currency_map( + data_source_filter: dict, workspace_ids, domain_id: str + ) -> Tuple[dict, list]: + data_source_currency_map = {} + data_source_mgr = DataSourceManager() + + query = { + "filter": [ + {"k": "domain_id", "v": domain_id, "o": "eq"}, + {"k": "workspace_id", "v": workspace_ids, "o": "in"}, + ] + } + + if data_sources := data_source_filter.get("data_sources"): + query["filter"].append( + {"k": "data_source_id", "v": data_sources, "o": "in"}, + ) + + if data_source_state := data_source_filter.get("state", "ENABLED"): + query["filter"].append({"k": "state", "v": data_source_state, "o": "eq"}) + + _LOGGER.debug(f"[get_data_source_currency_map] query: {query}") + + data_source_vos, total_count = data_source_mgr.list_data_sources(query) + data_source_ids = [] + for data_source_vo in data_source_vos: + data_source_currency_map[ + data_source_vo.data_source_id + ] = data_source_vo.plugin_info["metadata"]["currency"] + data_source_ids.append(data_source_vo.data_source_id) + + return data_source_currency_map, data_source_ids + + @staticmethod + def _get_project_name_map(workspace_ids, domain_id: str) -> dict: + identity_mgr = IdentityManager() + project_name_map = {} + projects = identity_mgr.list_projects( + { + "query": { + "filter": [ + {"k": "domain_id", "v": domain_id, "o": "eq"}, + {"k": "workspace_id", "v": workspace_ids, "o": "in"}, + ] + } + }, + domain_id, + ) + for project in projects.get("results", []): + project_name_map[project["project_id"]] = project["name"] + return project_name_map + + @staticmethod + def _get_service_account_name_map(workspace_ids, domain_id: str) -> dict: + identity_mgr = IdentityManager() + service_account_name_map = {} + service_accounts = identity_mgr.list_service_accounts( + { + "filter": [ + {"k": "domain_id", "v": domain_id, "o": "eq"}, + {"k": "workspace_id", "v": workspace_ids, "o": "in"}, + ] + }, + domain_id, + ) + for service_account in service_accounts.get("results", []): + service_account_name_map[ + service_account["service_account_id"] + ] = service_account["name"] + return service_account_name_map diff --git a/src/spaceone/cost_analysis/service/cost_report_serivce.py b/src/spaceone/cost_analysis/service/cost_report_serivce.py index 21b0e3b4..37d7ad14 100644 --- a/src/spaceone/cost_analysis/service/cost_report_serivce.py +++ b/src/spaceone/cost_analysis/service/cost_report_serivce.py @@ -187,16 +187,12 @@ def stat(self, params: CostReportDataStatQueryRequest) -> dict: return self.cost_report_mgr.stat_cost_reports(params.query) def create_cost_report(self, cost_report_config_vo: CostReportConfig): - # currency_mgr = CurrencyManager() - # currency_rate_map: dict = currency_mgr.get_currency( - # cost_report_config_vo.currency - # ) domain_id = cost_report_config_vo.domain_id data_source_filter = cost_report_config_vo.data_source_filter or {} workspace_name_map, workspace_ids = self._get_workspace_name_map(domain_id) data_source_currency_map, data_source_ids = self._get_data_source_currency_map( - data_source_filter, domain_id + data_source_filter, workspace_ids, domain_id ) current_month, last_month = self._get_current_and_last_month() @@ -275,11 +271,11 @@ def _aggregate_monthly_cost_report( aggregated_cost_report["report_number"] = self.generate_report_number( report_month, issue_day ) + aggregated_cost_report["issue_date"] = f"{report_month}-{issue_day}" aggregated_cost_report["report_month"] = report_month aggregated_cost_report["report_year"] = aggregated_cost_report.pop( "billed_year" ) - aggregated_cost_report["issue_date"] = report_month aggregated_cost_report["workspace_name"] = workspace_name_map.get( aggregated_cost_report["workspace_id"], "Unknown" ) @@ -296,7 +292,7 @@ def _aggregate_monthly_cost_report( self.cost_report_mgr.create_cost_report(aggregated_cost_report) _LOGGER.debug( - f"[aggregate_monthly_cost_report] create cost report ({report_month}) (count = {len(results)})" + f"[aggregate_monthly_cost_report] create cost report ({report_month}) (count = {len(aggregated_cost_report_results)})" ) self._delete_old_cost_reports(report_month, domain_id) @@ -364,12 +360,17 @@ def _get_workspace_name_map(domain_id: str) -> Tuple[dict, list]: @staticmethod def _get_data_source_currency_map( - data_source_filter: dict, domain_id: str + data_source_filter: dict, workspace_ids: list, domain_id: str ) -> Tuple[dict, list]: data_source_currency_map = {} data_source_mgr = DataSourceManager() - query = {"filter": [{"k": "domain_id", "v": domain_id, "o": "eq"}]} + query = { + "filter": [ + {"k": "domain_id", "v": domain_id, "o": "eq"}, + {"k": "workspace_id", "v": workspace_ids, "o": "in"}, + ] + } if data_sources := data_source_filter.get("data_sources"): query["filter"].append( {"k": "data_source_id", "v": data_sources, "o": "in"}