From d544e678d516d84a9aef0cbbb3cddf0838565051 Mon Sep 17 00:00:00 2001 From: ImMin5 Date: Wed, 31 Jul 2024 16:04:12 +0900 Subject: [PATCH] feat: add make benefit cost data logic Signed-off-by: ImMin5 --- .../cost_analysis/conf/cost_conf.py | 30 +++ .../connector/azure_cost_mgmt_connector.py | 135 ++++++++++- .../cost_analysis/manager/cost_manager.py | 215 ++++++++++++++++-- .../manager/data_source_manager.py | 74 ++++-- .../cost_analysis/manager/job_manager.py | 22 +- 5 files changed, 433 insertions(+), 43 deletions(-) diff --git a/src/cloudforet/cost_analysis/conf/cost_conf.py b/src/cloudforet/cost_analysis/conf/cost_conf.py index 1d9e8b6..3d76c4c 100644 --- a/src/cloudforet/cost_analysis/conf/cost_conf.py +++ b/src/cloudforet/cost_analysis/conf/cost_conf.py @@ -9,6 +9,36 @@ "UsageQuantity": {"name": "UsageQuantity", "function": "Sum"}, } +BENEFIT_FILTER = { + "and": [ + { + "dimensions": { + "name": "ChargeType", + "operator": "In", + "values": ["Purchase"], + } + }, + { + "dimensions": { + "name": "PricingModel", + "operator": "In", + "values": ["Reservation", "SavingsPlan"], + } + }, + ] +} +BENEFIT_GROUPING = [ + {"type": "Dimension", "name": "CustomerTenantId"}, + {"type": "Dimension", "name": "PricingModel"}, + {"type": "Dimension", "name": "Frequency"}, + {"type": "Dimension", "name": "BenefitId"}, + {"type": "Dimension", "name": "BenefitName"}, + {"type": "Dimension", "name": "ReservationId"}, + {"type": "Dimension", "name": "ReservationName"}, + {"type": "Dimension", "name": "ChargeType"}, + {"type": "Dimension", "name": "MeterCategory"}, +] + GROUPING_EA_AGREEMENT_OPTION = [ {"type": "Dimension", "name": "DepartmentName"}, {"type": "Dimension", "name": "EnrollmentAccountName"}, diff --git a/src/cloudforet/cost_analysis/connector/azure_cost_mgmt_connector.py b/src/cloudforet/cost_analysis/connector/azure_cost_mgmt_connector.py index a8b8118..ab901d4 100644 --- a/src/cloudforet/cost_analysis/connector/azure_cost_mgmt_connector.py +++ b/src/cloudforet/cost_analysis/connector/azure_cost_mgmt_connector.py @@ -1,5 +1,8 @@ import logging import os +import time +from datetime import datetime + import requests import pandas as pd import numpy as np @@ -8,16 +11,18 @@ from azure.identity import DefaultAzureCredential from azure.mgmt.billing import BillingManagementClient from azure.mgmt.costmanagement import CostManagementClient +from azure.mgmt.consumption import ConsumptionManagementClient from azure.core.exceptions import ResourceNotFoundError from spaceone.core.connector import BaseConnector from cloudforet.cost_analysis.error.cost import * +from cloudforet.cost_analysis.conf.cost_conf import * __all__ = ["AzureCostMgmtConnector"] -_LOGGER = logging.getLogger(__name__) +_LOGGER = logging.getLogger("spaceone") -_PAGE_SIZE = 6000 +_PAGE_SIZE = 5000 class AzureCostMgmtConnector(BaseConnector): @@ -26,8 +31,26 @@ def __init__(self, *args, **kwargs): self.billing_client = None self.cost_mgmt_client = None self.billing_account_id = None + self._billing_profile_id = None + self._invoice_section_id = None self.next_link = None + @property + def billing_profile_id(self): + return self._billing_profile_id + + @billing_profile_id.setter + def billing_profile_id(self, billing_profile_id: str): + self._billing_profile_id = billing_profile_id + + @property + def invoice_section_id(self): + return self._invoice_section_id + + @invoice_section_id.setter + def invoice_section_id(self, invoice_section_id: str): + self._invoice_section_id = invoice_section_id + def create_session(self, options: dict, secret_data: dict, schema: str) -> None: self._check_secret_data(secret_data) @@ -47,6 +70,39 @@ def create_session(self, options: dict, secret_data: dict, schema: str) -> None: self.cost_mgmt_client = CostManagementClient( credential=credential, subscription_id=subscription_id ) + self.consumption_client = ConsumptionManagementClient( + credential=credential, subscription_id=subscription_id + ) + + def check_reservation_transaction(self) -> bool: + if not self.billing_account_id: + _LOGGER.debug("[check_reservation_transaction] billing_account_id is None") + return False + elif not self.billing_profile_id: + _LOGGER.debug("[check_reservation_transaction] billing_profile_id is None") + return False + elif not self.invoice_section_id: + _LOGGER.debug("[check_reservation_transaction] invoice_section_id is None") + return False + return True + + def list_reservation_transactions_by_billing_profile_id( + self, query_filter: str + ) -> list: + transactions = [] + try: + transactions = self.consumption_client.reservation_transactions.list_by_billing_profile( + billing_account_id=self.billing_account_id, + billing_profile_id=self.billing_profile_id, + filter=query_filter, + ) + except Exception as e: + _LOGGER.error( + f"[list_reservation_transactions_by_billing_profile_id] error message: {e}", + exc_info=True, + ) + + return transactions def list_billing_accounts(self) -> list: billing_accounts_info = [] @@ -65,6 +121,50 @@ def list_billing_accounts(self) -> list: return billing_accounts_info + def query_usage_http( + self, secret_data: dict, start: datetime, end: datetime, options=None + ): + try: + billing_account_id = secret_data["billing_account_id"] + api_version = "2023-11-01" + self.next_link = f"https://management.azure.com/providers/Microsoft.Billing/billingAccounts/{billing_account_id}/providers/Microsoft.CostManagement/query?api-version={api_version}" + + parameters = { + "type": TYPE, + "timeframe": TIMEFRAME, + "timePeriod": {"from": start.isoformat(), "to": end.isoformat()}, + "dataset": { + "granularity": GRANULARITY, + "aggregation": AGGREGATION, + "grouping": BENEFIT_GROUPING, + "filter": BENEFIT_FILTER, + }, + } + + while self.next_link: + url = self.next_link + headers = self._make_request_headers() + + _LOGGER.debug(f"[query_usage] url:{url}, parameters: {parameters}") + response = requests.post(url=url, headers=headers, json=parameters) + response_json = response.json() + + if response_json.get("error"): + response_json = self._retry_request( + response=response, + url=url, + headers=headers, + json=parameters, + retry_count=RETRY_COUNT, + method="post", + ) + + self.next_link = response_json.get("properties").get("nextLink", None) + yield response_json + except Exception as e: + _LOGGER.error(f"[ERROR] query_usage_http {e}", exc_info=True) + raise ERROR_UNKNOWN(message=f"[ERROR] get_usd_cost_and_tag_http {e}") + def get_billing_account(self) -> dict: billing_account_name = self.billing_account_id billing_account_info = self.billing_client.billing_accounts.get( @@ -165,6 +265,37 @@ def convert_nested_dictionary(self, cloud_svc_object): return cloud_svc_dict + def _retry_request(self, response, url, headers, json, retry_count, method="post"): + try: + print(f"{datetime.utcnow()}[INFO] retry_request {response.headers}") + if retry_count == 0: + raise ERROR_UNKNOWN( + message=f"[ERROR] retry_request failed {response.json()}" + ) + + _sleep_time = self._get_sleep_time(response.headers) + time.sleep(_sleep_time) + + if method == "post": + response = requests.post(url=url, headers=headers, json=json) + else: + response = requests.get(url=url, headers=headers, json=json) + response_json = response.json() + + if response_json.get("error"): + response_json = self._retry_request( + response=response, + url=url, + headers=headers, + json=json, + retry_count=retry_count - 1, + method=method, + ) + return response_json + except Exception as e: + _LOGGER.error(f"[ERROR] retry_request failed {e}") + raise e + @staticmethod def _download_cost_data(blob: dict) -> str: try: diff --git a/src/cloudforet/cost_analysis/manager/cost_manager.py b/src/cloudforet/cost_analysis/manager/cost_manager.py index 197eef2..57a7fe5 100644 --- a/src/cloudforet/cost_analysis/manager/cost_manager.py +++ b/src/cloudforet/cost_analysis/manager/cost_manager.py @@ -2,6 +2,7 @@ import logging import json import time +import pandas as pd from typing import Union from datetime import datetime, timezone @@ -13,7 +14,7 @@ ) from cloudforet.cost_analysis.conf.cost_conf import * -_LOGGER = logging.getLogger(__name__) +_LOGGER = logging.getLogger("spaceone") class CostManager(BaseManager): @@ -24,8 +25,12 @@ def __init__(self, *args, **kwargs): ) def get_linked_accounts( - self, options: dict, secret_data: dict, domain_id: str, schema - ) -> list: + self, + options: dict, + secret_data: dict, + schema: str, + domain_id: str, + ) -> dict: self.azure_cm_connector.create_session(options, secret_data, schema) billing_account_info = self.azure_cm_connector.get_billing_account() agreement_type = billing_account_info.get("agreement_type") @@ -46,12 +51,102 @@ def get_linked_accounts( else: pass _LOGGER.debug( - f"[get_linked_accounts] total accounts count: {len(accounts_info)}" + f"[get_linked_accounts] total accounts count: {len(accounts_info)}, domain_id: {domain_id}" ) - return accounts_info + return {"results": accounts_info} + + def get_benefit_data( + self, + options: dict, + secret_data: dict, + schema: str, + task_options: dict, + domain_id: str, + ): + self.azure_cm_connector.create_session(options, secret_data, schema) + start: datetime = self._get_first_date_of_month(task_options["start"]) + end: datetime = datetime.utcnow() + + monthly_time_period = self._make_monthly_time_period(start, end) + + for time_period in monthly_time_period: + _start = time_period["start"] + _end = time_period["end"] + response_stream = self.azure_cm_connector.query_usage_http( + secret_data, _start, _end + ) + + for results in response_stream: + yield self._make_benefit_cost_data( + results=results, + end=_end, + options=options, + ) + + def _make_benefit_cost_data( + self, + results: dict, + end: datetime, + options: dict, + tenant_id: str = None, + agreement_type: str = None, + ) -> list: + benefit_costs_data = [] + try: + combined_results = self._combine_rows_and_columns_from_results( + results.get("properties").get("rows"), + results.get("properties").get("columns"), + ) + for cb_result in combined_results: + billed_at = self._set_billed_date(cb_result.get("UsageDate", end)) + if not billed_at: + continue + + data = self._make_benefit_cost_info(cb_result, billed_at) + benefit_costs_data.append(data) + + except Exception as e: + _LOGGER.error(f"[_make_cost_data] make data error: {e}", exc_info=True) + raise e + + return benefit_costs_data + + def _make_benefit_cost_info(self, result: dict, billed_at: str) -> dict: + additional_info = { + "Tenant Id": result.get("CustomerTenantId"), + "Pricing Model": result.get("PricingModel"), + "Frequency": result.get("BillingFrequency"), + "Benefit Id": result.get("BenefitId"), + "Benefit Name": result.get("BenefitName"), + "Reservation Id": result.get("ReservationId"), + "Reservation Name": result.get("ReservationName"), + "Charge Type": result.get("ChargeType"), + } + usage_quantity = self._convert_str_to_float_format( + result.get("UsageQuantity", 0.0) + ) + actual_cost = self._convert_str_to_float_format(result.get("Cost", 0.0)) + data = { + "cost": 0, + "usage_quantity": usage_quantity, + "provider": "azure", + "product": result.get("MeterCategory"), + "tags": {}, + "billed_date": billed_at, + "data": { + "Actual Cost": actual_cost, + }, + "additional_info": additional_info, + } + return data def get_data( - self, options: dict, secret_data: dict, schema, task_options: dict + self, + options: dict, + secret_data: dict, + schema: str, + task_options: dict, + domain_id: str, ) -> list: self.azure_cm_connector.create_session(options, secret_data, schema) self._check_task_options(task_options) @@ -74,7 +169,7 @@ def get_data( ) for idx, tenant_id in enumerate(tenant_ids): _LOGGER.info( - f"[get_data] #{idx + 1} {tenant_id} tenant start to collect data from {_start} to {_end}" + f"[get_data] #{idx + 1} {tenant_id} tenant start to collect data from {_start} to {_end}, domain_id: {domain_id}" ) scope = self._make_scope( secret_data, task_options, collect_scope, tenant_id @@ -93,9 +188,11 @@ def get_data( options=options, agreement_type=agreement_type, ) + _LOGGER.info( - f"[get_data] #{idx + 1} {tenant_id} tenant collect is done" + f"[get_data] #{idx + 1} {tenant_id} tenant collect is done, domain_id: {domain_id}" ) + end_time = time.time() _LOGGER.info( f"[get_data] all collect is done in {int(end_time - start_time)} seconds" @@ -139,6 +236,79 @@ def _make_cost_data( return costs_data + def _make_transaction_cost_data(self, tenant_id: str, end: datetime) -> list: + transaction_cost_data = [] + + event_start_date = end.replace(day=1).strftime("%Y-%m-%d") + event_end_date = end.strftime("%Y-%m-%d") + query_filter = f"properties/eventDate ge {event_start_date} AND properties/eventDate le {event_end_date}" + invoice_section_id = self.azure_cm_connector.invoice_section_id + + try: + for ( + reservation_transaction + ) in self.azure_cm_connector.list_reservation_transactions_by_billing_profile_id( + query_filter + ): + + if ( + reservation_transaction.invoice_section_id.split("/")[-1] + == invoice_section_id + ): + reservation_transaction_info = ( + self.azure_cm_connector.convert_nested_dictionary( + reservation_transaction + ) + ) + billed_date = self._set_billed_date( + reservation_transaction_info.get("event_date", end) + ) + actual_cost = reservation_transaction_info["amount"] + reservation_order_id = reservation_transaction_info.get( + "reservation_order_id" + ) + reservation_name = reservation_transaction_info.get( + "reservation_order_name" + ) + additional_info = { + "Tenant Id": tenant_id, + "Customer Name": reservation_transaction_info.get( + "invoice_section_name" + ), + "Usage Type": "Reservation", + "charge_type": reservation_transaction_info.get("event_type"), + "Product Name": reservation_transaction_info.get("description"), + "Price Model": "Reservation", + "Benefit Id": f"/providers/Microsoft.Capacity/reservationOrders/{reservation_order_id}", + "Benefit Name": reservation_name, + "Reservation Id": reservation_order_id, + "Reservation Name": reservation_name, + "Frequency": reservation_transaction_info.get( + "billing_frequency" + ), + "Reservation SKU Name": reservation_transaction_info.get( + "arm_sku_name" + ), + } + cost_info = { + "cost": 0, + "quantity": reservation_transaction_info.get("quantity", 0), + "billed_date": billed_date, + "region": reservation_transaction_info.get("region"), + "data": {"Actual Cost": actual_cost}, + "additional_info": additional_info, + } + transaction_cost_data.append(cost_info) + _LOGGER.debug( + f"[_make_transaction_cost_data] transaction_cost_data: {len(transaction_cost_data)}" + ) + except Exception as e: + _LOGGER.error( + f"[_make_transaction_cost_data] make transaction cost data error: {e}", + exc_info=True, + ) + return transaction_cost_data + def _make_data_info( self, result: dict, @@ -211,6 +381,9 @@ def _get_additional_info(self, result: dict, options: dict, tenant_id: str = Non if result.get("reservationname") != "" and result.get("reservationname"): additional_info["Reservation Name"] = result["reservationname"] + if result.get("reservationid") != "" and result.get("reservationid"): + additional_info["Reservation Id"] = result["reservationid"] + if result.get("benefitname") != "" and result.get("benefitname"): benefit_name = result["benefitname"] additional_info["Benefit Name"] = benefit_name @@ -302,14 +475,23 @@ def get_pay_as_you_go_cost(self, result: dict) -> float: def _get_aggregate_data(self, result: dict, options: dict) -> dict: aggregate_data = {} + if options.get("pay_as_you_go", False): + return aggregate_data + + cost_in_billing_currency = self._convert_str_to_float_format( + result.get("costinbillingcurrency", 0.0) + ) + if options.get("cost_metric") == "AmortizedCost": - aggregate_data["Amortized Cost"] = self._convert_str_to_float_format( - result.get("amortizedcostinbillingcurrency", 0.0) - ) + aggregate_data["Amortized Cost"] = cost_in_billing_currency + + if result.get("reservationname") != "" and result.get("reservationname"): + aggregate_data["Actual Cost"] = 0 + else: + aggregate_data["Actual Cost"] = cost_in_billing_currency + else: - aggregate_data["Actual Cost"] = self._convert_str_to_float_format( - result.get("costinbillingcurrency", 0.0) - ) + aggregate_data["Actual Cost"] = cost_in_billing_currency return aggregate_data @@ -532,3 +714,8 @@ def _set_network_traffic_cost( additional_info["Usage Type Details"] = usage_type return additional_info + + @staticmethod + def _combine_rows_and_columns_from_results(rows: list, columns: list): + _columns = [column.get("name") for column in columns] + return pd.DataFrame(data=rows, columns=_columns).to_dict(orient="records") diff --git a/src/cloudforet/cost_analysis/manager/data_source_manager.py b/src/cloudforet/cost_analysis/manager/data_source_manager.py index db62627..8452dd0 100644 --- a/src/cloudforet/cost_analysis/manager/data_source_manager.py +++ b/src/cloudforet/cost_analysis/manager/data_source_manager.py @@ -1,18 +1,67 @@ import logging from spaceone.core.manager import BaseManager -from cloudforet.cost_analysis.model.data_source_model import PluginMetadata from cloudforet.cost_analysis.connector.azure_cost_mgmt_connector import ( AzureCostMgmtConnector, ) -_LOGGER = logging.getLogger(__name__) +_LOGGER = logging.getLogger("spaceone") + +_DEFAULT_DATA_SOURCE_RULES = [ + { + "name": "match_service_account", + "conditions_policy": "ALWAYS", + "actions": { + "match_service_account": { + "source": "additional_info.Subscription Id", + "target": "data.subscription_id", + } + }, + "options": {"stop_processing": True}, + } +] class DataSourceManager(BaseManager): @staticmethod - def init_response(options): - plugin_metadata = PluginMetadata() + def init_response(options: dict, domain_id: str) -> dict: + """ + Returns: + { + "metadata":{ + "data_source_rules(list)": [], + "supported_secret_types(list)": [], + "currency(str)": "KRW", + "use_account_routing(bool)": False, + "cost_info(dict)": { + "name" :"PayAsYouGo", + "unit" :"KRW" + }, + "data_info(dict)": { + "Actual Cost": { + "name": "Actual Cost", + "unit": "KRW" + } + }, + "additional_info(dict)": { + "Subscription Name": { + "name": "Subscription Name", + "visible": True + } + } + } + } + """ + plugin_metadata = { + "data_source_rules": _DEFAULT_DATA_SOURCE_RULES, + "supported_secret_types": ["MANUAL"], + "currency": "KRW", + "use_account_routing": False, + "exclude_license_cost": False, + "cost_info": {}, + "data_info": {}, + "additional_info": {}, + } # plugin_metadata.additional_info = { # "Subscription Name": {"name": "Subscription Name", "visible": True}, @@ -22,33 +71,32 @@ def init_response(options): # } if currency := options.get("currency"): - plugin_metadata.currency = currency + plugin_metadata["currency"] = currency - plugin_metadata.cost_info = { + plugin_metadata["cost_info"] = { "name": "PayAsYouGo", "unit": options.get("currency", "KRW"), } if options.get("cost_metric") == "AmortizedCost": - plugin_metadata.data_info["Amortized Cost"] = { + plugin_metadata["data_info"]["Amortized Cost"] = { "name": "Amortized Cost", "unit": options.get("currency", "KRW"), } else: - plugin_metadata.data_info["Actual Cost"] = { + plugin_metadata["data_info"]["Actual Cost"] = { "name": "Actual Cost", "unit": options.get("currency", "KRW"), } if options.get("use_account_routing"): - plugin_metadata.use_account_routing = True - plugin_metadata.account_match_key = "additional_info.Tenant Id" + plugin_metadata["use_account_routing"] = True + plugin_metadata["account_match_key"] = "additional_info.Tenant Id" if options.get("exclude_license_cost"): - plugin_metadata.exclude_license_cost = True + plugin_metadata["exclude_license_cost"] = True - plugin_metadata.validate() - return {"metadata": plugin_metadata.to_primitive()} + return {"metadata": plugin_metadata} def verify_plugin(self, options, secret_data, schema): azure_cm_connector: AzureCostMgmtConnector = self.locator.get_connector( diff --git a/src/cloudforet/cost_analysis/manager/job_manager.py b/src/cloudforet/cost_analysis/manager/job_manager.py index e6369f9..0bdbb62 100644 --- a/src/cloudforet/cost_analysis/manager/job_manager.py +++ b/src/cloudforet/cost_analysis/manager/job_manager.py @@ -6,14 +6,10 @@ from dateutil.relativedelta import relativedelta from spaceone.core.manager import BaseManager -from cloudforet.cost_analysis.connector.azure_cost_mgmt_connector import ( - AzureCostMgmtConnector, -) -from cloudforet.cost_analysis.model.job_model import Tasks from cloudforet.cost_analysis.conf.cost_conf import SECRET_TYPE_DEFAULT from cloudforet.cost_analysis.error.cost import * -_LOGGER = logging.getLogger(__name__) +_LOGGER = logging.getLogger("spaceone") _TASK_LIST_SIZE = 4 @@ -21,9 +17,7 @@ class JobManager(BaseManager): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.azure_cm_connector: AzureCostMgmtConnector = self.locator.get_connector( - "AzureCostMgmtConnector" - ) + self.azure_cm_connector = self.locator.get_connector("AzureCostMgmtConnector") def get_tasks( self, @@ -34,7 +28,7 @@ def get_tasks( start: str, last_synchronized_at: datetime, domain_id: str, - ): + ) -> dict: start_month = self._get_start_month(start, last_synchronized_at) self.azure_cm_connector.create_session(options, secret_data, schema) @@ -134,11 +128,11 @@ def get_tasks( else: raise ERROR_INVALID_SECRET_TYPE(secret_type=options.get("secret_type")) - tasks = Tasks( - {"tasks": tasks, "changed": changed, "synced_accounts": synced_accounts} - ) - tasks.validate() - return tasks.to_primitive() + tasks = {"tasks": tasks, "changed": changed, "synced_accounts": synced_accounts} + + print(tasks) + + return tasks def _get_tenants_from_billing_account(self): tenants = []