diff --git a/src/cloudforet/cost_analysis/conf/cost_conf.py b/src/cloudforet/cost_analysis/conf/cost_conf.py index 5a12a8f..c8d2189 100644 --- a/src/cloudforet/cost_analysis/conf/cost_conf.py +++ b/src/cloudforet/cost_analysis/conf/cost_conf.py @@ -13,7 +13,6 @@ GROUPING = [ {'type': 'Dimension', 'name': 'ResourceGroup'}, {'type': 'Dimension', 'name': 'ResourceType'}, - {'type': 'Dimension', 'name': 'ResourceId'}, {'type': 'Dimension', 'name': 'ResourceLocation'}, {'type': 'Dimension', 'name': 'SubscriptionId'}, {'type': 'Dimension', 'name': 'SubscriptionName'}, @@ -32,6 +31,7 @@ ] GROUPING_CUSTOMER_TENANT_OPTION = {'type': 'Dimension', 'name': 'CustomerTenantId'} GROUPING_TAG_OPTION = {'type': 'Tag', 'name': ''} +GROUPING_RESOURCE_ID_OPTION = {'type': 'Dimension', 'name': 'ResourceId'} REGION_MAP = { 'global': 'Global', diff --git a/src/cloudforet/cost_analysis/connector/azure_cost_mgmt_connector.py b/src/cloudforet/cost_analysis/connector/azure_cost_mgmt_connector.py index 677e89a..5a1d0a2 100644 --- a/src/cloudforet/cost_analysis/connector/azure_cost_mgmt_connector.py +++ b/src/cloudforet/cost_analysis/connector/azure_cost_mgmt_connector.py @@ -2,7 +2,9 @@ import os import requests import time -import re +import pandas as pd +import numpy as np +from io import StringIO from datetime import datetime from cloudforet.cost_analysis.conf.cost_conf import * @@ -17,6 +19,8 @@ _LOGGER = logging.getLogger(__name__) +_PAGE_SIZE = 1000 + class AzureCostMgmtConnector(BaseConnector): @@ -61,24 +65,6 @@ def get_billing_account(self): billing_account_info = self.billing_client.billing_accounts.get(billing_account_name=billing_account_name) return billing_account_info - def query(self, customer_id, start, end): - billing_account_id = self.billing_account_id - scope = f'providers/Microsoft.Billing/billingAccounts/{billing_account_id}/customers/{customer_id}' - parameters = { - 'type': TYPE, - 'timeframe': TIMEFRAME, - 'timePeriod': { - 'from': start.isoformat(), - 'to': end.isoformat() - }, - 'dataset': { - 'granularity': GRANULARITY, - 'aggregation': AGGREGATION, - 'grouping': GROUPING - } - } - return self.cost_mgmt_client.query.usage(scope=scope, parameters=parameters) - def query_http(self, scope, secret_data, parameters, **kwargs): try: api_version = '2023-03-01' @@ -100,6 +86,40 @@ def query_http(self, scope, secret_data, parameters, **kwargs): except Exception as e: raise ERROR_UNKNOWN(message=f'[ERROR] query_http {e}') + def begin_create_operation(self, scope, parameters): + try: + content_type = 'application/json' + response = self.cost_mgmt_client.generate_cost_details_report.begin_create_operation(scope=scope, + parameters=parameters, + content_type=content_type) + + result = self.convert_nested_dictionary(response.result()) + blobs = result.get('blobs', []) + _LOGGER.debug(f'[begin_create_operation] csv_file_link: {blobs}') + return blobs + + except Exception as e: + _LOGGER.error(f'[begin_create_operation] error message: {e}') + raise ERROR_UNKNOWN(message=f'[ERROR] begin_create_operation failed') + + def get_cost_data(self, blobs): + for blob in blobs: + cost_csv = self._download_cost_data(blob) + + df = pd.read_csv(StringIO(cost_csv)) + df = df.replace({np.nan: None}) + + costs_data = df.to_dict('records') + + _LOGGER.debug(f'[get_cost_data] costs count: {len(costs_data)}') + + # Paginate + page_count = int(len(costs_data) / _PAGE_SIZE) + 1 + + for page_num in range(page_count): + offset = _PAGE_SIZE * page_num + yield costs_data[offset:offset + _PAGE_SIZE] + def list_by_billing_account(self): billing_account_name = self.billing_account_id return self.billing_client.billing_subscriptions.list_by_billing_account(billing_account_name=billing_account_name) @@ -140,22 +160,36 @@ def _retry_request(self, response, url, headers, json, retry_count, method='post _LOGGER.error(f'[ERROR] retry_request failed {e}') raise e - def _get_latest_api_version(self, secret_data, scope): - try: - url = f'https://management.azure.com/{scope}/providers/Microsoft.CostManagement/query?api-version=""' - headers = self._make_request_headers(secret_data) - response = requests.post(url=url, headers=headers) - - response_json = response.json() - if error_json := response_json.get('error'): - error_msg = error_json.get('message', '') - api_versions = re.findall(r"'([^\']+)'", error_msg.split('.')[2].strip().strip("'")+"'")[0].split(',') - for api_version in reversed(api_versions): - if 'preview' not in api_version: - return api_version + def convert_nested_dictionary(self, cloud_svc_object): + cloud_svc_dict = {} + if hasattr(cloud_svc_object, '__dict__'): # if cloud_svc_object is not a dictionary type but has dict method + cloud_svc_dict = cloud_svc_object.__dict__ + elif isinstance(cloud_svc_object, dict): + cloud_svc_dict = cloud_svc_object + elif not isinstance(cloud_svc_object, list): # if cloud_svc_object is one of type like int, float, char, ... + return cloud_svc_object + + # if cloud_svc_object is dictionary type + for key, value in cloud_svc_dict.items(): + if hasattr(value, '__dict__') or isinstance(value, dict): + cloud_svc_dict[key] = self.convert_nested_dictionary(value) + if 'azure' in str(type(value)): + cloud_svc_dict[key] = self.convert_nested_dictionary(value) + elif isinstance(value, list): + value_list = [] + for v in value: + value_list.append(self.convert_nested_dictionary(v)) + cloud_svc_dict[key] = value_list + + return cloud_svc_dict + @staticmethod + def _download_cost_data(blob: dict) -> str: + try: + response = requests.get(blob.get('blob_link')) + return response.text except Exception as e: - _LOGGER.error(f'[ERROR] _get_latest_api_version {e}') + _LOGGER.error(f'[_download_cost_data] download error: {e}', exc_info=True) raise e @staticmethod diff --git a/src/cloudforet/cost_analysis/manager/cost_manager.py b/src/cloudforet/cost_analysis/manager/cost_manager.py index 4dcc343..fd9ca70 100644 --- a/src/cloudforet/cost_analysis/manager/cost_manager.py +++ b/src/cloudforet/cost_analysis/manager/cost_manager.py @@ -1,5 +1,5 @@ import logging -import pandas as pd +import json import time from datetime import datetime, timedelta, timezone @@ -21,8 +21,7 @@ def get_data(self, options, secret_data, schema, task_options): self._check_task_options(task_options) collect_scope = task_options['collect_scope'] - account_agreement_type = task_options.get('account_agreement_type') - customer_tenants = self._get_customer_tenant_id(task_options, collect_scope) + tenant_ids = self._get_tenant_ids(task_options, collect_scope) start = self._add_first_day_of_month(task_options['start']) end = datetime.utcnow().replace(tzinfo=timezone.utc) @@ -30,47 +29,36 @@ def get_data(self, options, secret_data, schema, task_options): for time_period in monthly_time_period: _start = time_period['start'] _end = time_period['end'] - parameters = self._make_parameters(_start, _end, account_agreement_type, options) + parameters = self._make_parameters(_start, _end, options) + start_time = time.time() - print(f"{datetime.utcnow()} [INFO][get_data] all tenant is start to collect data from {_start} to {_end}") - for idx, customer_tenant_id in enumerate(customer_tenants): - scope = self._make_scope(secret_data, task_options, collect_scope, customer_tenant_id) - - for response_stream in self.azure_cm_connector.query_http(scope, secret_data, parameters): - yield self._make_cost_data(results=response_stream, end=_end, tenant_id=customer_tenant_id) - end_time = time.time() - print(f"{datetime.utcnow()} [INFO][get_data] #{idx+1} {customer_tenant_id} tenant collect is done") + print(f"{datetime.utcnow()} [INFO][get_data] start to collect data from {_start} to {_end}") + for idx, tenant_id in enumerate(tenant_ids): + scope = self._make_scope(secret_data, task_options, collect_scope, tenant_id) + blobs = self.azure_cm_connector.begin_create_operation(scope, parameters) + response_stream = self.azure_cm_connector.get_cost_data(blobs) + for results in response_stream: + yield self._make_cost_data(results=results, end=_end, tenant_id=tenant_id, options=options) + print(f"{datetime.utcnow()} [INFO][get_data] #{idx+1} {tenant_id} tenant collect is done") + end_time = time.time() print(f"{datetime.utcnow()} [INFO][get_data] all collect is done in {int(end_time - start_time)} seconds") - yield [] - def _make_cost_data(self, results, end, tenant_id=None): + yield [] + + def _make_cost_data(self, results, end, options, tenant_id=None): """ Source Data Model - class QueryResult Class - id: str, - name: str, - type: str, - location: str, - sku: str, - eTag: str, - tags: dict, - properties: { - nextLink: str, - columns: list, - rows: list - } """ costs_data = [] try: - combined_results = self._combine_rows_and_columns_from_results(results.get('properties').get('rows'), - results.get('properties').get('columns')) - for cb_result in combined_results: - billed_date = self._set_billed_date(cb_result.get('UsageDate', end)) + for result in results: + result = {key.lower(): value for key, value in result.items()} + billed_date = self._set_billed_date(result.get('date', end)) if not billed_date: continue - data = self._make_data_info(cb_result, billed_date, tenant_id) + data = self._make_data_info(result, billed_date, options, tenant_id) costs_data.append(data) except Exception as e: @@ -79,15 +67,15 @@ class QueryResult Class return costs_data - def _make_data_info(self, result, billed_date, tenant_id=None): - additional_info = self._get_additional_info(result, tenant_id) - cost = self._convert_str_to_float_format(result.get('Cost', 0)) - usage_quantity = self._convert_str_to_float_format(result.get('UsageQuantity', 0)) - usage_type = result.get('Meter', '') - usage_unit = result.get('UnitOfMeasure', '') - region_code = result.get('ResourceLocation', '').lower() - product = result.get('MeterCategory', '') - tags = {} # self._convert_tag_str_to_dict(result.get('Tag')) + def _make_data_info(self, result, billed_date, options, tenant_id=None): + additional_info = self._get_additional_info(result, options, tenant_id) + cost = self._convert_str_to_float_format(result.get('costinbillingcurrency', 0)) + usage_quantity = self._convert_str_to_float_format(result.get('quantity', 0)) + usage_type = result.get('metername', '') + usage_unit = result.get('unitofmeasure', '') + region_code = result.get('resourcelocation', '').lower() + product = result.get('metercategory', '') + tags = self._convert_tags_str_to_dict(result.get('tags', {})) data = { 'cost': cost, @@ -104,86 +92,85 @@ def _make_data_info(self, result, billed_date, tenant_id=None): return data - def _get_additional_info(self, result, tenant_id=None): + def _get_additional_info(self, result, options, tenant_id=None): additional_info = {} - meter_category = result.get('MeterCategory', '') - tenant_id = result.get('CustomerTenantId') if result.get('CustomerTenantId') else tenant_id + meter_category = result.get('metercategory', '') + tenant_id = result.get('customertenantid') if result.get('customertenantid') else tenant_id additional_info['Tenant ID'] = tenant_id - additional_info['Currency'] = result.get('Currency', 'USD') - additional_info['Subscription Id'] = result.get('SubscriptionId', 'Shared') + additional_info['Subscription Id'] = result.get('subscriptionid', 'Shared') if meter_category == 'Virtual Machines' and 'Meter' in result: - additional_info['Instance Type'] = result['Meter'] + additional_info['Instance Type'] = result['meter'] + + if result.get('resourcelocation') != '' and result.get('resourcelocation'): + additional_info['Resource Location'] = result['resourcelocation'] - if result.get('ResourceLocation') != '' and result.get('ResourceGroup'): - additional_info['Resource Group'] = result['ResourceGroup'] + if result.get('resourcegroupname') != '' and result.get('resourcegroupname'): + additional_info['Resource Group'] = result['resourcegroupname'] - if result.get('ResourceType') != '' and result.get('ResourceType'): - additional_info['Resource Type'] = result['ResourceType'] + if result.get('resourcetype') != '' and result.get('resourcetype'): + additional_info['Resource Type'] = result['resourcetype'] - if result.get('SubscriptionName') != '' and result.get('SubscriptionName'): - additional_info['Subscription Name'] = result['SubscriptionName'] + if result.get('subscriptionname') != '' and result.get('subscriptionname'): + additional_info['Subscription Name'] = result['subscriptionname'] - if result.get('PricingModel') != '' and result.get('PricingModel'): - additional_info['Pricing Model'] = result['PricingModel'] + if result.get('pricingmodel') != '' and result.get('pricingmodel'): + additional_info['Pricing Model'] = result['pricingmodel'] - if result.get('BenefitName') != '' and result.get('BenefitName'): - benefit_name = result['BenefitName'] + if result.get('benefitname') != '' and result.get('benefitname'): + benefit_name = result['benefitname'] additional_info['Benefit Name'] = benefit_name - if result.get('PricingModel') == 'Reservation' and result['MeterCategory'] == '': - result['MeterCategory'] = self._set_product_from_benefit_name(benefit_name) + if result.get('pricingmodel') == 'Reservation' and result['metercategory'] == '': + result['metercategory'] = self._set_product_from_benefit_name(benefit_name) - if result.get('MeterSubcategory') != '' and result.get('MeterSubcategory'): - additional_info['Meter SubCategory'] = result.get('MeterSubcategory') - if result.get('PricingModel') == 'OnDemand' and result.get('MeterCategory') == '': - result['MeterCategory'] = result.get('MeterSubcategory') + if result.get('metersubcategory') != '' and result.get('metersubcategory'): + additional_info['Meter SubCategory'] = result.get('metersubcategory') + if result.get('pricingmodel') == 'OnDemand' and result.get('metercategory') == '': + result['metercategory'] = result.get('metercategory') - if result.get('DepartmentName') != '' and result.get('DepartmentName'): - additional_info['Department Name'] = result['DepartmentName'] + if result.get('departmentname') != '' and result.get('departmentname'): + additional_info['Department Name'] = result['departmentname'] - if result.get('EnrollmentAccountName') != '' and result.get('EnrollmentAccountName'): - additional_info['Enrollment Account Name'] = result['EnrollmentAccountName'] + if result.get('enrollmentaccountname') != '' and result.get('enrollmentaccountname'): + additional_info['Enrollment Account Name'] = result['enrollmentaccountname'] - if result.get('ResourceId') != '' and result.get('ResourceId'): - additional_info['Resource ID'] = result['ResourceId'] - additional_info['Resource Name'] = result['ResourceId'].split('/')[-1] + collect_resource_id = options.get('collect_resource_id', False) + if collect_resource_id and result.get('resourceid') != '' and result.get('resourceid'): + additional_info['Resource ID'] = result['resourceid'] + additional_info['Resource Name'] = result['resourceid'].split('/')[-1] - return additional_info + if result.get('productname'): + additional_info['Product Name'] = result['productname'] - def _combine_make_data(self, costs_data, results): - try: - costs_data_without_tag = [] - combined_results = self._combine_rows_and_columns_from_results(results.get('properties').get('rows'), - results.get('properties').get('columns')) - for cb_result in combined_results: - billed_date = self._set_billed_date(cb_result.get('UsageDate')) - if not billed_date: - continue + if result.get('unitprice') != '' and result.get('unitprice'): + additional_info['Unit Price'] = result['unitprice'] - data = self._make_data_info(cb_result, billed_date) - costs_data_without_tag.append(data) + return additional_info - for idx, cost_data in enumerate(costs_data): - if self._check_prev_and_current_result(cost_data, costs_data_without_tag[idx]): - cost_data.update({'usd_cost': costs_data_without_tag[idx]['usd_cost']}) - return costs_data - except Exception as e: - _LOGGER.error(f'[_combine_make_data] make data error: {e}', exc_info=True) - raise e + @staticmethod + def _make_parameters(start, end, options): + parameters = { + "metric": "ActualCost", + "timePeriod": { + "start": start, + "end": end + } + } + return parameters @staticmethod - def _get_customer_tenant_id(task_options, collect_scope): - customer_tenants = [] + def _get_tenant_ids(task_options, collect_scope): + tenant_ids = [] if 'tenant_id' in task_options: - customer_tenants.append(task_options['tenant_id']) + tenant_ids.append(task_options['tenant_id']) elif collect_scope == 'customer_tenant_id': - customer_tenants.append(task_options['customer_tenant']) + tenant_ids.append(task_options['customer_tenant']) else: - customer_tenants.append('EA Agreement') - return customer_tenants + tenant_ids.append('EA Agreement') + return tenant_ids @staticmethod def _make_scope(secret_data, task_options, collect_scope, customer_tenant_id=None): @@ -200,78 +187,35 @@ def _make_scope(secret_data, task_options, collect_scope, customer_tenant_id=Non return scope @staticmethod - def _make_parameters(start, end, account_agreement_type, options=None): - parameters = {} - aggregation = AGGREGATION - grouping = GROUPING - - if account_agreement_type == 'EnterpriseAgreement': - grouping = [dimension for dimension in grouping if dimension['name'] != 'UnitOfMeasure'] - grouping.extend(GROUPING_EA_AGREEMENT_OPTION) - - parameters.update({ - 'type': TYPE, - 'timeframe': TIMEFRAME, - 'timePeriod': { - 'from': start.isoformat(), - 'to': end.isoformat() - }, - 'dataset': { - 'aggregation': aggregation, - 'grouping': grouping, - 'granularity': options.get('granularity', GRANULARITY), - } - }) - - return parameters - - @staticmethod - def _convert_tag_str_to_dict(tag: str): - if tag is None: + def _convert_tags_str_to_dict(tags: str): + if tags is None: return {} - - tag_dict = {} - if ":" in tag: - tag = tag.split(':') - _key = tag[0] - _value = tag[1] - tag_dict[_key] = _value - elif tag: - tag_dict[tag] = '' - return tag_dict + json_str = '{' + tags + '}' + return json.loads(json_str) @staticmethod def _set_product_from_benefit_name(benefit_name): - _product_name_format = 'Reserved {_product_name}' - product_name = _product_name_format.format(_product_name=benefit_name) + _product_name_format = 'Reserved {product_name}' + product_name = _product_name_format.format(product_name=benefit_name) try: if 'VM' in benefit_name.upper(): - product_name = _product_name_format.format(_product_name='VM Instances') + product_name = _product_name_format.format(product_name='VM Instances') elif 'REDIS' in benefit_name.upper(): - product_name = _product_name_format.format(_product_name='Redis Cache') + product_name = _product_name_format.format(product_name='Redis Cache') elif 'DISK' in benefit_name.upper(): - product_name = _product_name_format.format(_product_name='Disk') + product_name = _product_name_format.format(product_name='Disk') elif 'BLOB' in benefit_name.upper(): - product_name = _product_name_format.format(_product_name='Blob Storage Capacity') + product_name = _product_name_format.format(product_name='Blob Storage Capacity') elif 'FILE' in benefit_name.upper(): - product_name = _product_name_format.format(_product_name='File Capacity') + product_name = _product_name_format.format(product_name='File Capacity') elif len(benefit_name.split("_")) > 1: - product_name = _product_name_format.format(_product_name=benefit_name.split("_")[0]) + product_name = _product_name_format.format(product_name=benefit_name.split("_")[0]) return product_name except Exception as e: return product_name - @staticmethod - def _remove_cost_data_start_from_last_billed_at(costs_data, last_billed_at): - return [cost_data for cost_data in costs_data if cost_data.get('billed_at') < last_billed_at] - - @staticmethod - def _combine_rows_and_columns_from_results(rows, columns): - _columns = [column.get('name') for column in columns] - return pd.DataFrame(data=rows, columns=_columns).to_dict(orient='records') - @staticmethod def _convert_str_to_float_format(num_str: str): return format(float(num_str), 'f') @@ -283,7 +227,9 @@ def _set_billed_date(start): start = str(start) formatted_start = datetime.strptime(start, "%Y%m%d") elif isinstance(start, datetime): - return start + return start.strftime("%Y-%m-%d") + elif len(start.split('/')) == 3: + formatted_start = datetime.strptime(start, "%m/%d/%Y") else: formatted_start = start @@ -292,19 +238,6 @@ def _set_billed_date(start): _LOGGER.error(f'[_set_billed_at] set billed_at error: {e}', exc_info=True) return None - @staticmethod - def _check_prev_and_current_result(prev_result, cur_result): - if cur_result.get('UsageDate') != prev_result.get('UsageDate'): - return False - if cur_result.get('Meter') != prev_result.get('Meter'): - return False - if cur_result.get('MeterCategory') != prev_result.get('MeterCategory'): - return False - if cur_result.get('ResourceId') != prev_result.get('ResourceId'): - return False - - return True - @staticmethod def _add_first_day_of_month(start_month): return datetime.strptime(start_month, "%Y-%m").replace(day=1) diff --git a/src/cloudforet/cost_analysis/model/job_model.py b/src/cloudforet/cost_analysis/model/job_model.py index 2c2b491..2870333 100644 --- a/src/cloudforet/cost_analysis/model/job_model.py +++ b/src/cloudforet/cost_analysis/model/job_model.py @@ -1,7 +1,6 @@ from schematics.models import Model -from schematics.types import ListType, IntType, DateTimeType, StringType, DictType +from schematics.types import ListType, StringType, DictType from schematics.types.compound import ModelType -# todo: change pydantic __all__ = ['Tasks']