From dd5ed5245449de464fa169cc024f2c83c598a722 Mon Sep 17 00:00:00 2001
From: 1101-1 <70093559+1101-1@users.noreply.github.com>
Date: Thu, 12 Dec 2024 12:46:51 +0500
Subject: [PATCH] [azure][feat] Rewrite metrics collection to make it better
 (#2298)

Co-authored-by: Matthias Veit
---
 plugins/azure/fix_plugin_azure/collector.py      |  15 ++
 .../azure/fix_plugin_azure/resource/base.py      |  81 +++++-
 .../fix_plugin_azure/resource/compute.py         | 232 ++++++++---------
 .../resource/machinelearning.py                  |   6 -
 .../fix_plugin_azure/resource/metrics.py         | 204 +++++++--------
 plugins/azure/fix_plugin_azure/resource/mysql.py |   6 -
 .../fix_plugin_azure/resource/postgresql.py      |   6 -
 .../fix_plugin_azure/resource/storage.py         | 237 ++++++++++--------
 plugins/azure/fix_plugin_azure/utils.py          |  20 +-
 plugins/azure/test/compute_test.py               |   1 +
 plugins/azure/test/metric_test.py                |  32 ++-
 11 files changed, 435 insertions(+), 405 deletions(-)

diff --git a/plugins/azure/fix_plugin_azure/collector.py b/plugins/azure/fix_plugin_azure/collector.py
index 228f2679ca..4956b58ca6 100644
--- a/plugins/azure/fix_plugin_azure/collector.py
+++ b/plugins/azure/fix_plugin_azure/collector.py
@@ -39,6 +39,7 @@
     MicrosoftGraphOrganizationRoot,
 )
 from fix_plugin_azure.resource.monitor import resources as monitor_resources
+from fix_plugin_azure.resource.metrics import AzureMetricData
 from fix_plugin_azure.resource.mysql import AzureMysqlServerType, resources as mysql_resources
 from fix_plugin_azure.resource.network import (
     AzureNetworkExpressRoutePortsLocation,
@@ -159,6 +160,14 @@ def get_last_run() -> Optional[datetime]:
             for after_collect in builder.after_collect_actions:
                 after_collect()
 
+            if builder.config.collect_usage_metrics:
+                try:
+                    log.info(f"[Azure:{self.account.safe_name}] Collect usage metrics.")
+                    self.collect_usage_metrics(builder)
+                    builder.executor.wait_for_submitted_work()
+                except Exception as e:
+                    log.warning(f"[Azure] Failed to collect usage metrics in project {self.account.safe_name}: {e}")
+
             # connect nodes
             log.info(f"[Azure:{self.account.safe_name}] Connect resources and create edges.")
             for node, data in list(self.graph.nodes(data=True)):
@@ -184,6 +193,12 @@ def get_last_run() -> Optional[datetime]:
         self.core_feedback.progress_done(self.account.id, 1, 1, context=[self.cloud.id])
         log.info(f"[Azure:{self.account.safe_name}] Collecting resources done.")
 
+    def collect_usage_metrics(self, builder: GraphBuilder) -> None:
+        for resource in builder.graph.nodes:
+            if isinstance(resource, MicrosoftResource) and (mq := resource.collect_usage_metrics(builder)):
+                start_at = builder.created_at - builder.metrics_delta
+                AzureMetricData.query_for(builder, resource, mq, start_at, builder.created_at)
+
     def collect_resource_list(
         self, name: str, builder: GraphBuilder, resources: List[Type[MicrosoftResource]]
     ) -> Future[None]:
diff --git a/plugins/azure/fix_plugin_azure/resource/base.py b/plugins/azure/fix_plugin_azure/resource/base.py
index fea5892192..e3d7fdf374 100644
--- a/plugins/azure/fix_plugin_azure/resource/base.py
+++ b/plugins/azure/fix_plugin_azure/resource/base.py
@@ -3,9 +3,10 @@
 import logging
 from concurrent.futures import Future
 from datetime import datetime, timedelta
-from typing import Any, ClassVar, Dict, Optional, TypeVar, List, Type, Callable, cast, Union, Set
+from typing import Any, ClassVar, Dict, Optional, TypeVar, List, Type, Tuple, Callable, cast, Union, Set
 
 from attr import define, field
+from attrs import frozen
 from azure.identity import DefaultAzureCredential
 
 from fix_plugin_azure.azure_client import AzureResourceSpec, MicrosoftClient, MicrosoftRestSpec
@@ -20,6 +21,9 @@
     BaseRegion,
     ModelReference,
     PhantomBaseResource,
+    StatName,
+    MetricName,
+    MetricUnit,
 )
 from fixlib.config import current_config
 from fixlib.core.actions import CoreFeedback
@@ -187,12 +191,9 @@ def connect_in_graph(self, builder: GraphBuilder, source: Json) -> None:
         # Default behavior: add resource to the namespace
         pass
 
-    @classmethod
-    def collect_usage_metrics(
-        cls: Type[MicrosoftResourceType], builder: GraphBuilder, collected_resources: List[MicrosoftResourceType]
-    ) -> None:
+    def collect_usage_metrics(self, builder: GraphBuilder) -> List[AzureMetricQuery]:
         # Default behavior: do nothing
-        pass
+        return []
 
     @classmethod
     def collect_resources(
@@ -203,13 +204,7 @@ def collect_resources(
         if spec := cls.api_spec:
             try:
                 items = builder.client.list(spec, **kwargs)
-                collected = cls.collect(items, builder)
-                if builder.config.collect_usage_metrics:
-                    try:
-                        cls.collect_usage_metrics(builder, collected)
-                    except Exception as e:
-                        log.warning(f"Failed to collect usage metrics for {cls.__name__}: {e}")
-                return collected
+                return cls.collect(items, builder)
             except Exception as e:
                 msg = f"Error while collecting {cls.__name__} with service {spec.service} and location: {builder.location}: {e}"
                 builder.core_feedback.info(msg, log)
@@ -1008,6 +1003,66 @@ def with_location(self, location: BaseRegion) -> GraphBuilder:
         )
 
 
+STAT_MAP: Dict[str, StatName] = {
+    "minimum": StatName.min,
+    "average": StatName.avg,
+    "maximum": StatName.max,
+}
+
+
+@frozen(kw_only=True)
+class MetricNormalization:
+    unit: MetricUnit
+    normalize_value: Callable[[float], float] = lambda x: x
+
+
+@define(hash=True, frozen=True)
+class AzureMetricQuery:
+    metric_name: str
+    metric_namespace: str
+    metric_normalization_name: MetricName
+    ref_id: str
+    instance_id: str
+    metric_id: str
+    aggregation: Tuple[str, ...]
+    normalization: MetricNormalization
+    # Optional `start_time` and `period` override defaults for query timespan and interval.
+    period: Optional[timedelta] = None
+    start_time: Optional[datetime] = None
+    unit: str = "Count"
+
+    @staticmethod
+    def create(
+        *,
+        metric_name: str,
+        metric_namespace: str,
+        metric_normalization_name: MetricName,
+        instance_id: str,
+        ref_id: str,
+        normalization: MetricNormalization,
+        aggregation: Tuple[str, ...],
+        unit: str = "Count",
+        start_time: Optional[datetime] = None,
+        period: Optional[timedelta] = None,
+        metric_id: Optional[str] = None,
+    ) -> "AzureMetricQuery":
+        metric_id = f"{instance_id}/providers/Microsoft.Insights/metrics/{metric_name}"
+        # noinspection PyTypeChecker
+        return AzureMetricQuery(
+            metric_name=metric_name,
+            metric_namespace=metric_namespace,
+            metric_normalization_name=metric_normalization_name,
+            instance_id=instance_id,
+            metric_id=metric_id,
+            aggregation=aggregation,
+            ref_id=ref_id,
+            unit=unit,
+            normalization=normalization,
+            period=period,
+            start_time=start_time,
+        )
+
+
 resources: List[Type[MicrosoftResource]] = [
     AzureResourceGroup,
 ]
diff --git a/plugins/azure/fix_plugin_azure/resource/compute.py b/plugins/azure/fix_plugin_azure/resource/compute.py
index ab06cf50db..c4f67cf12c 100644
--- a/plugins/azure/fix_plugin_azure/resource/compute.py
+++ b/plugins/azure/fix_plugin_azure/resource/compute.py
@@ -17,15 +17,16 @@
     AzureExtendedLocation,
     AzurePrincipalClient,
     AzurePrivateEndpointConnection,
+    AzureMetricQuery,
 )
-from fix_plugin_azure.resource.metrics import AzureMetricData, AzureMetricQuery, update_resource_metrics
+from fix_plugin_azure.resource.metrics import NormalizerFactory
 from fix_plugin_azure.resource.network import (
     AzureNetworkSecurityGroup,
     AzureNetworkSubnet,
     AzureNetworkInterface,
     AzureNetworkLoadBalancer,
 )
-from fix_plugin_azure.utils import MetricNormalization, rgetvalue
+from fix_plugin_azure.utils import rgetvalue
 from fixlib.baseresources import (
     BaseInstance,
     BaseKeyPair,
@@ -34,7 +35,6 @@
     BaseSnapshot,
     BaseVolumeType,
     MetricName,
-    MetricUnit,
     VolumeStatus,
     BaseAutoScalingGroup,
     InstanceStatus,
@@ -1064,11 +1064,6 @@ def collect_resources(cls, builder: GraphBuilder, **kwargs: Any) -> List["AzureC
                 AzureComputeDisk._collect_disk_types(builder, d_loc)
                 # Create additional custom disk sizes for disks with Ultra SSD or Premium SSD v2 types
                 AzureComputeDiskType.create_unique_disk_sizes(disks, builder, d_loc)
-            if builder.config.collect_usage_metrics:
-                try:
-                    cls.collect_usage_metrics(builder, collected)
-                except Exception as e:
-                    log.warning(f"Failed to collect usage metrics for {cls.__name__}: {e}")
             return collected
         return []
 
@@ -1099,61 +1094,48 @@ def collect_disk_types() -> None:
 
         graph_builder.submit_work(service_name, collect_disk_types)
 
-    @classmethod
-    def collect_usage_metrics(
-        cls: Type[MicrosoftResource], builder: GraphBuilder, collected_resources: List[MicrosoftResourceType]
-    ) -> None:
-        volumes = {volume.id: volume for volume in collected_resources if volume}
+    def collect_usage_metrics(self, builder: GraphBuilder) -> List[AzureMetricQuery]:
+        volume_id = self.id
         queries = []
-        start = builder.metrics_start
-        now = builder.created_at
-        delta = builder.metrics_delta
-        for volume_id in volumes:
-            queries.extend(
-                [
-                    AzureMetricQuery.create(
-                        metric_name=metric_name,
-                        metric_namespace="microsoft.compute/disks",
-                        instance_id=volume_id,
-                        aggregation=("average",),
-                        ref_id=volume_id,
-                        unit="BytesPerSecond",
-                    )
-                    for metric_name in ["Composite Disk Write Bytes/sec", "Composite Disk Read Bytes/sec"]
+
+        queries.extend(
+            [
+                AzureMetricQuery.create(
+                    metric_name=name,
+                    metric_namespace="microsoft.compute/disks",
+                    metric_normalization_name=metric_name,
+                    instance_id=volume_id,
+                    aggregation=("average",),
+                    ref_id=volume_id,
+                    unit="BytesPerSecond",
+                    normalization=NormalizerFactory.bytes,
+                )
+                for name, metric_name in [
+                    ("Composite Disk Write Bytes/sec", MetricName.VolumeWrite),
+                    ("Composite Disk Read Bytes/sec", MetricName.VolumeRead),
                 ]
-            )
-            queries.extend(
-                [
-                    AzureMetricQuery.create(
-                        metric_name=metric_name,
-                        metric_namespace="microsoft.compute/disks",
-                        instance_id=volume_id,
-                        aggregation=("average",),
-                        ref_id=volume_id,
-                        unit="CountPerSecond",
-                    )
-                    for metric_name in ["Composite Disk Write Operations/sec", "Composite Disk Read Operations/sec"]
+            ]
+        )
+        queries.extend(
+            [
+                AzureMetricQuery.create(
+                    metric_name=name,
+                    metric_namespace="microsoft.compute/disks",
+                    metric_normalization_name=metric_name,
+                    instance_id=volume_id,
+                    aggregation=("average",),
+                    ref_id=volume_id,
+                    unit="CountPerSecond",
+                    normalization=NormalizerFactory.iops,
+                )
+                for name, metric_name in [
+                    ("Composite Disk Write Operations/sec", MetricName.VolumeWrite),
+                    ("Composite Disk Read Operations/sec", MetricName.VolumeRead),
                 ]
-            )
-
-        metric_normalizers = {
-            "Composite Disk Write Bytes/sec": MetricNormalization(
-                metric_name=MetricName.VolumeWrite, unit=MetricUnit.Bytes
-            ),
-            "Composite Disk Read Bytes/sec": MetricNormalization(
-                metric_name=MetricName.VolumeRead, unit=MetricUnit.Bytes
-            ),
-            "Composite Disk Write Operations/sec": MetricNormalization(
-                metric_name=MetricName.VolumeWrite, unit=MetricUnit.IOPS
-            ),
-            "Composite Disk Read Operations/sec": MetricNormalization(
-                metric_name=MetricName.VolumeRead, unit=MetricUnit.IOPS
-            ),
-        }
-
-        metric_result = AzureMetricData.query_for(builder, queries, start, now, delta)
+            ]
+        )
 
-        update_resource_metrics(volumes, metric_result, metric_normalizers)
+        return queries
 
     @staticmethod
     def _get_nearest_size(size: int, lookup_map: Dict[int, Any]) -> int:
@@ -2983,12 +2965,6 @@ def collect_resources(cls, builder: GraphBuilder, **kwargs: Any) -> List["AzureC
                 # Collect VM sizes for the VM in this location
                 AzureComputeVirtualMachineBase._collect_vm_sizes(builder, location)
 
-            if builder.config.collect_usage_metrics:
-                try:
-                    cls.collect_usage_metrics(builder, collected)
-                except Exception as e:
-                    log.warning(f"Failed to collect usage metrics for {cls.__name__}: {e}")
-
             return collected
         return []
 
@@ -3017,83 +2993,75 @@ def collect_vm_sizes() -> None:
 
         graph_builder.submit_work(service_name, collect_vm_sizes)
 
-    @classmethod
-    def collect_usage_metrics(
-        cls: Type[MicrosoftResource], builder: GraphBuilder, collected_resources: List[MicrosoftResourceType]
-    ) -> None:
-        virtual_machines = {vm.id: vm for vm in collected_resources if vm}
+    def collect_usage_metrics(self, builder: GraphBuilder) -> List[AzureMetricQuery]:
+        vm_id = self.id
         queries = []
-        start = builder.metrics_start
-        now = builder.created_at
-        delta = builder.metrics_delta
-        for vm_id in virtual_machines:
-            queries.append(
+
+        queries.append(
+            AzureMetricQuery.create(
+                metric_name="Percentage CPU",
+                metric_namespace="Microsoft.Compute/virtualMachines",
+                metric_normalization_name=MetricName.CpuUtilization,
+                instance_id=vm_id,
+                aggregation=("average", "minimum", "maximum"),
+                ref_id=vm_id,
+                unit="Percent",
+                normalization=NormalizerFactory.percent,
+            )
+        )
+        queries.extend(
+            [
                 AzureMetricQuery.create(
-                    metric_name="Percentage CPU",
+                    metric_name=name,
                     metric_namespace="Microsoft.Compute/virtualMachines",
+                    metric_normalization_name=metric_name,
                     instance_id=vm_id,
                     aggregation=("average", "minimum", "maximum"),
                     ref_id=vm_id,
-                    unit="Percent",
+                    unit="Bytes",
+                    normalization=NormalizerFactory.bytes,
                 )
-            )
-            queries.extend(
-                [
-                    AzureMetricQuery.create(
-                        metric_name=metric_name,
-                        metric_namespace="Microsoft.Compute/virtualMachines",
-                        instance_id=vm_id,
-                        aggregation=("average", "minimum", "maximum"),
-                        ref_id=vm_id,
-                        unit="Bytes",
-                    )
-                    for metric_name in ["Disk Write Bytes", "Disk Read Bytes"]
+                for name, metric_name in [
+                    ("Disk Write Bytes", MetricName.DiskWrite),
+                    ("Disk Read Bytes", MetricName.DiskRead),
                 ]
-            )
-            queries.extend(
-                [
-                    AzureMetricQuery.create(
-                        metric_name=metric_name,
-                        metric_namespace="Microsoft.Compute/virtualMachines",
-                        instance_id=vm_id,
-                        aggregation=("average", "minimum", "maximum"),
-                        ref_id=vm_id,
-                        unit="CountPerSecond",
-                    )
-                    for metric_name in ["Disk Write Operations/Sec", "Disk Read Operations/Sec"]
-                ]
-            )
-            queries.extend(
-                [
-                    AzureMetricQuery.create(
-                        metric_name=metric_name,
-                        metric_namespace="Microsoft.Compute/virtualMachines",
-                        instance_id=vm_id,
-                        aggregation=("average", "minimum", "maximum"),
-                        ref_id=vm_id,
-                        unit="Bytes",
-                    )
-                    for metric_name in ["Network In", "Network Out"]
+            ]
+        )
+        queries.extend(
+            [
+                AzureMetricQuery.create(
+                    metric_name=name,
+                    metric_namespace="Microsoft.Compute/virtualMachines",
+                    metric_normalization_name=metric_name,
+                    instance_id=vm_id,
+                    aggregation=("average", "minimum", "maximum"),
+                    ref_id=vm_id,
+                    unit="CountPerSecond",
+                    normalization=NormalizerFactory.iops,
+                )
+                for name, metric_name in [
+                    ("Disk Write Operations/Sec", MetricName.DiskWrite),
+                    ("Disk Read Operations/Sec", MetricName.DiskRead),
                 ]
-            )
-
-        metric_normalizers = {
-            "Percentage CPU": MetricNormalization(
-                metric_name=MetricName.CpuUtilization,
-                unit=MetricUnit.Percent,
-                normalize_value=lambda x: round(x, ndigits=3),
-            ),
-            "Network In": MetricNormalization(metric_name=MetricName.NetworkIn, unit=MetricUnit.Bytes),
-            "Network Out": MetricNormalization(metric_name=MetricName.NetworkOut, unit=MetricUnit.Bytes),
-            "Disk Read Operations/Sec": MetricNormalization(metric_name=MetricName.DiskRead, unit=MetricUnit.IOPS),
-            "Disk Write Operations/Sec": MetricNormalization(metric_name=MetricName.DiskWrite, unit=MetricUnit.IOPS),
-            "Disk Read Bytes": MetricNormalization(metric_name=MetricName.DiskRead, unit=MetricUnit.Bytes),
-            "Disk Write Bytes": MetricNormalization(metric_name=MetricName.DiskWrite, unit=MetricUnit.Bytes),
-        }
-
-        metric_result = AzureMetricData.query_for(builder, queries, start, now, delta)
+            ]
+        )
+        queries.extend(
+            [
+                AzureMetricQuery.create(
+                    metric_name=name,
+                    metric_namespace="Microsoft.Compute/virtualMachines",
+                    metric_normalization_name=metric_name,
+                    instance_id=vm_id,
+                    aggregation=("average", "minimum", "maximum"),
+                    ref_id=vm_id,
+                    unit="Bytes",
+                    normalization=NormalizerFactory.bytes,
+                )
+                for name, metric_name in [("Network In", MetricName.NetworkIn), ("Network Out", MetricName.NetworkOut)]
+            ]
+        )
 
-        update_resource_metrics(virtual_machines, metric_result, metric_normalizers)
+        return queries
 
     def connect_in_graph(self, builder: GraphBuilder, source: Json) -> None:
         if placement_group_id := self.proximity_placement_group:
diff --git a/plugins/azure/fix_plugin_azure/resource/machinelearning.py b/plugins/azure/fix_plugin_azure/resource/machinelearning.py
index 3ef4599e02..b46647eaac 100644
--- a/plugins/azure/fix_plugin_azure/resource/machinelearning.py
+++ b/plugins/azure/fix_plugin_azure/resource/machinelearning.py
@@ -604,12 +604,6 @@ def collect_resources(cls, builder: GraphBuilder, **kwargs: Any) -> List["AzureM
                 # Collect VM sizes for the compute resources in this location
                 cls._collect_vm_sizes(builder, location, compute_resources)
 
-            if builder.config.collect_usage_metrics:
-                try:
-                    cls.collect_usage_metrics(builder, collected)
-                except Exception as e:
-                    log.warning(f"Failed to collect usage metrics for {cls.__name__}: {e}")
-
             return collected
         return []
 
diff --git a/plugins/azure/fix_plugin_azure/resource/metrics.py b/plugins/azure/fix_plugin_azure/resource/metrics.py
index e22b243f28..d91221d7af 100644
--- a/plugins/azure/fix_plugin_azure/resource/metrics.py
+++ b/plugins/azure/fix_plugin_azure/resource/metrics.py
@@ -1,6 +1,6 @@
 from copy import deepcopy
 from datetime import datetime, timedelta
-from concurrent.futures import as_completed
+from functools import cached_property
 import logging
 from typing import ClassVar, Dict, Optional, List, Tuple, TypeVar
 
@@ -11,9 +11,14 @@
 from attr import define, field
 
 from fix_plugin_azure.azure_client import AzureResourceSpec
-from fix_plugin_azure.resource.base import GraphBuilder
-from fix_plugin_azure.utils import MetricNormalization
-from fixlib.baseresources import BaseResource
+from fix_plugin_azure.resource.base import (
+    GraphBuilder,
+    AzureMetricQuery,
+    MetricNormalization,
+    STAT_MAP,
+    MicrosoftResource,
+)
+from fixlib.baseresources import BaseResource, MetricUnit
 from fixlib.json import from_json
 from fixlib.json_bender import Bender, S, ForallBend, Bend, bend
 from fixlib.utils import utc_str
@@ -93,39 +98,6 @@ class AzureMetricValue:
     timeseries: Optional[List[AzureMetricTimeSeries]] = field(default=None)
 
 
-@define(hash=True, frozen=True)
-class AzureMetricQuery:
-    metric_name: str
-    metric_namespace: str
-    ref_id: str
-    instance_id: str
-    metric_id: str
-    aggregation: Tuple[str, ...]
-    unit: str = "Count"
-
-    @staticmethod
-    def create(
-        metric_name: str,
-        metric_namespace: str,
-        instance_id: str,
-        ref_id: str,
-        aggregation: Tuple[str, ...],
-        metric_id: Optional[str] = None,
-        unit: str = "Count",
-    ) -> "AzureMetricQuery":
-        metric_id = f"{instance_id}/providers/Microsoft.Insights/metrics/{metric_name}"
-        # noinspection PyTypeChecker
-        return AzureMetricQuery(
-            metric_name=metric_name,
-            metric_namespace=metric_namespace,
-            instance_id=instance_id,
-            metric_id=metric_id,
-            aggregation=aggregation,
-            ref_id=ref_id,
-            unit=unit,
-        )
-
-
 @define(eq=False, slots=False)
 class AzureMetricData:
     kind: ClassVar[str] = "azure_metric"
@@ -193,27 +165,11 @@ def compute_interval(delta: timedelta) -> str:
     @staticmethod
     def query_for(
         builder: GraphBuilder,
+        resource: MicrosoftResource,
         queries: List[AzureMetricQuery],
         start_time: datetime,
         end_time: datetime,
-        delta: timedelta,
-    ) -> "Dict[AzureMetricQuery, AzureMetricData]":
-        """
-        A static method to query Azure metrics for multiple queries simultaneously.
-
-        Args:
-            builder (GraphBuilder): An instance of GraphBuilder used for submitting work.
-            queries (List[AzureMetricQuery]): A list of AzureMetricQuery objects representing the metrics to query.
-            start_time (datetime): The start time for the metrics query.
-            end_time (datetime): The end time for the metrics query.
-            delta (timedelta): The delta over which to aggregate the metrics.
-
-        Returns:
-            Dict[AzureMetricQuery, AzureMetricData]: A dictionary mapping each query to its corresponding metric data.
-        """
-        # Create a lookup dictionary for efficient mapping of metric IDs to queries
-        lookup = {q.metric_id: q for q in queries}
-        result: Dict[AzureMetricQuery, AzureMetricData] = {}
+    ) -> None:
 
         # Define API specifications for querying Azure metrics
         api_spec = AzureResourceSpec(
@@ -234,49 +190,38 @@ def query_for(
             access_path=None,
             expect_array=False,
         )
-        # Define the timespan and interval for the query
-        timespan = f"{utc_str(start_time)}/{utc_str(end_time)}"
-        interval = AzureMetricData.compute_interval(delta)
 
         # Submit queries for each AzureMetricQuery
-        futures = []
         for query in queries:
-            future = builder.submit_work(
+            builder.submit_work(
                 service_name,
                 AzureMetricData._query_for_single,
                 builder,
                 query,
                 api_spec,
-                timespan,
-                interval,
+                start_time,
+                end_time,
+                resource,
             )
-            futures.append(future)
-
-        # Retrieve results from submitted queries and populate the result dictionary
-        for future in as_completed(futures):
-            try:
-                metric, metric_id = future.result()
-                if metric is not None and metric_id is not None:
-                    result[lookup[metric_id]] = metric
-            except Exception as e:
-                log.warning(f"An error occurred while processing a metric query: {e}")
-                raise e
-        return result
 
     @staticmethod
     def _query_for_single(
         builder: GraphBuilder,
         query: AzureMetricQuery,
         api_spec: AzureResourceSpec,
-        timespan: str,
-        interval: str,
-    ) -> "Tuple[Optional[AzureMetricData], Optional[str]]":
+        start_time: datetime,
+        end_time: datetime,
+        resource: MicrosoftResource,
+    ) -> None:
         try:
             local_api_spec = deepcopy(api_spec)
             # Set the path for the API call based on the instance ID of the query
             local_api_spec.path = f"{query.instance_id}/providers/Microsoft.Insights/metrics"
             # Retrieve metric data from the API
             aggregation = ",".join(query.aggregation)
+            # Define the timespan and interval for the query
+            timespan = f"{utc_str(query.start_time or start_time)}/{utc_str(end_time)}"
+            interval = AzureMetricData.compute_interval(query.period or builder.metrics_delta)
             part = builder.client.list(
                 local_api_spec,
                 metricnames=query.metric_name,
@@ -291,37 +236,94 @@ def _query_for_single(
             for single in part:
                 metric: AzureMetricData = from_json(bend(AzureMetricData.mapping, single), AzureMetricData)
                 metric.set_values(query.aggregation)
-                metric_id = metric.metric_id
-                if metric_id is not None:
-                    return metric, metric_id
-            return None, None
+                update_resource_metrics(resource, query, metric)
         except HttpResponseError as e:
             # Handle unsupported metric namespace error
-            log.warning(f"Request error occurred: {e}.")
-            return None, None
+            log.warning(f"Request error occurred while processing metrics: {e}.")
         except Exception as e:
-            raise e
+            log.warning(f"An error occurred while processing metrics: {e}.")
 
 
 V = TypeVar("V", bound=BaseResource)
 
 
 def update_resource_metrics(
-    resources_map: Dict[str, V],
-    metric_result: Dict[AzureMetricQuery, AzureMetricData],
-    metric_normalizers: Dict[str, MetricNormalization],
+    resource: MicrosoftResource,
+    query: AzureMetricQuery,
+    metric: AzureMetricData,
 ) -> None:
-    for query, metric in metric_result.items():
-        resource = resources_map.get(query.ref_id)
-        if resource is None:
-            continue
-        metric_data = metric.metric_values
-        if metric_data:
-            for aggregation, metric_value in metric_data.items():
-                normalizer = metric_normalizers.get(query.metric_name)
-                if not normalizer:
-                    continue
-                name = normalizer.metric_name + "_" + normalizer.unit
-                value = metric_normalizers[query.metric_name].normalize_value(metric_value)
-
-                resource._resource_usage[name][normalizer.stat_map[aggregation]] = value
+
+    metric_data = metric.metric_values
+    normalizer = query.normalization
+    if metric_data:
+        for aggregation, metric_value in metric_data.items():
+            name = query.metric_normalization_name + "_" + normalizer.unit
+            value = normalizer.normalize_value(metric_value)
+            stat_name = STAT_MAP.get(aggregation)
+            try:
+                if stat_name:
+                    resource._resource_usage[name][str(stat_name)] = value
+            except KeyError as e:
+                log.warning(f"An error occurred while setting metric values: {e}")
+                raise
+
+
+class __NormalizerFactory:
+    __instance = None
+
+    def __new__(cls) -> "__NormalizerFactory":
+        if cls.__instance is None:
+            cls.__instance = super().__new__(cls)
+        return cls.__instance
+
+    @cached_property
+    def count(self) -> MetricNormalization:
+        return MetricNormalization(
+            unit=MetricUnit.Count,
+            normalize_value=lambda x: round(x, ndigits=4),
+        )
+
+    @cached_property
+    def bytes(self) -> MetricNormalization:
+        return MetricNormalization(
+            unit=MetricUnit.Bytes,
+            normalize_value=lambda x: round(x, ndigits=4),
+        )
+
+    @cached_property
+    def bytes_per_second(self) -> MetricNormalization:
+        return MetricNormalization(
+            unit=MetricUnit.BytesPerSecond,
+            normalize_value=lambda x: round(x, ndigits=4),
+        )
+
+    @cached_property
+    def iops(self) -> MetricNormalization:
+        return MetricNormalization(
+            unit=MetricUnit.IOPS,
+            normalize_value=lambda x: round(x, ndigits=4),
+        )
+
+    @cached_property
+    def seconds(self) -> MetricNormalization:
+        return MetricNormalization(
+            unit=MetricUnit.Seconds,
+            normalize_value=lambda x: round(x, ndigits=4),
+        )
+
+    @cached_property
+    def milliseconds(self) -> MetricNormalization:
+        return MetricNormalization(
+            unit=MetricUnit.Milliseconds,
+            normalize_value=lambda x: round(x, ndigits=4),
+        )
+
+    @cached_property
+    def percent(self) -> MetricNormalization:
+        return MetricNormalization(
+            unit=MetricUnit.Percent,
+            normalize_value=lambda x: round(x, ndigits=4),
+        )
+
+
+NormalizerFactory = __NormalizerFactory()
diff --git a/plugins/azure/fix_plugin_azure/resource/mysql.py b/plugins/azure/fix_plugin_azure/resource/mysql.py
index d9e928ec34..945eda6560 100644
--- a/plugins/azure/fix_plugin_azure/resource/mysql.py
+++ b/plugins/azure/fix_plugin_azure/resource/mysql.py
@@ -753,12 +753,6 @@ def collect_resources(cls, builder: GraphBuilder, **kwargs: Any) -> List["AzureM
                 # Collect MySQL server types for the servers in this group
                 AzureMysqlServer._collect_mysql_server_types(builder, location, sku_name, sku_tier, version)
 
-            if builder.config.collect_usage_metrics:
-                try:
-                    cls.collect_usage_metrics(builder, collected)
-                except Exception as e:
-                    log.warning(f"Failed to collect usage metrics for {cls.__name__} in {location}: {e}")
-
             return collected
         return []
 
diff --git a/plugins/azure/fix_plugin_azure/resource/postgresql.py b/plugins/azure/fix_plugin_azure/resource/postgresql.py
index 4f5df0f35e..42e06a5241 100644
--- a/plugins/azure/fix_plugin_azure/resource/postgresql.py
+++ b/plugins/azure/fix_plugin_azure/resource/postgresql.py
@@ -636,12 +636,6 @@ def collect_resources(cls, builder: GraphBuilder, **kwargs: Any) -> List["AzureP
                 # Collect PostgreSQL server types for the servers in this group
                 AzurePostgresqlServer._collect_postgresql_server_types(builder, location, sku_name, sku_tier, version)
 
-            if builder.config.collect_usage_metrics:
-                try:
-                    cls.collect_usage_metrics(builder, collected)
-                except Exception as e:
-                    log.warning(f"Failed to collect usage metrics for {cls.__name__} in {location}: {e}")
-
             return collected
         return []
 
diff --git a/plugins/azure/fix_plugin_azure/resource/storage.py b/plugins/azure/fix_plugin_azure/resource/storage.py
index 256c3e43a2..ae19c8fa09 100644
--- a/plugins/azure/fix_plugin_azure/resource/storage.py
+++ b/plugins/azure/fix_plugin_azure/resource/storage.py
@@ -8,22 +8,20 @@
 from fix_plugin_azure.resource.base import (
     AzureBaseUsage,
     MicrosoftResource,
-    MicrosoftResourceType,
     GraphBuilder,
     AzureExtendedLocation,
     AzureSku,
     AzureManagedServiceIdentity,
     AzurePrivateEndpointConnection,
+    AzureMetricQuery,
 )
-from fix_plugin_azure.resource.metrics import AzureMetricData, AzureMetricQuery, update_resource_metrics
-from fix_plugin_azure.utils import MetricNormalization
+from fix_plugin_azure.resource.metrics import NormalizerFactory
 from fixlib.baseresources import (
     BaseBucket,
     BaseNetworkShare,
     BaseQueue,
     EdgeType,
     MetricName,
-    MetricUnit,
     ModelReference,
     PhantomBaseResource,
     QueueType,
@@ -945,11 +943,7 @@ def sku_filter(sku: AzureStorageSku) -> bool:
             clazz=AzureStorageSku,
         )
 
-    @classmethod
-    def collect_usage_metrics(
-        cls: Type[MicrosoftResource], builder: GraphBuilder, collected_resources: List[MicrosoftResourceType]
-    ) -> None:
-        accounts = {storage_acc.id: storage_acc for storage_acc in collected_resources}
+    def collect_usage_metrics(self, builder: GraphBuilder) -> List[AzureMetricQuery]:
         queries = []
         start = builder.metrics_start
         now = builder.created_at
@@ -958,117 +952,140 @@ def collect_usage_metrics(
         if delta.total_seconds() < 3600:
             delta = timedelta(hours=1)
             start = now - delta
-        for account_id in accounts:
-            blob_instance_id = account_id + "/blobServices/default"
-            file_instance_id = account_id + "/fileServices/default"
-            table_instance_id = account_id + "/tableServices/default"
-            queue_instance_id = account_id + "/queueServices/default"
-            queries.append(
-                AzureMetricQuery.create(
-                    metric_name="UsedCapacity",
-                    metric_namespace="microsoft.storage/storageaccounts",
-                    instance_id=account_id,
-                    aggregation=("average",),
-                    ref_id=account_id,
-                    unit="Bytes",
-                )
+        account_id = self.id
+        blob_instance_id = account_id + "/blobServices/default"
+        file_instance_id = account_id + "/fileServices/default"
+        table_instance_id = account_id + "/tableServices/default"
+        queue_instance_id = account_id + "/queueServices/default"
+
+        queries.append(
+            AzureMetricQuery.create(
+                metric_name="UsedCapacity",
+                metric_namespace="microsoft.storage/storageaccounts",
+                metric_normalization_name=MetricName.UsedCapacity,
+                instance_id=account_id,
+                aggregation=("average",),
+                ref_id=account_id,
+                unit="Bytes",
+                normalization=NormalizerFactory.bytes,
+                period=delta,
+                start_time=start,
             )
-            queries.append(
-                AzureMetricQuery.create(
-                    metric_name="TableCapacity",
-                    metric_namespace="microsoft.storage/storageaccounts/tableservices",
-                    instance_id=table_instance_id,
-                    aggregation=("average",),
-                    ref_id=account_id,
-                    unit="Bytes",
-                )
+        )
+        queries.append(
+            AzureMetricQuery.create(
+                metric_name="TableCapacity",
+                metric_namespace="microsoft.storage/storageaccounts/tableservices",
+                metric_normalization_name=MetricName.TableCapacity,
+                instance_id=table_instance_id,
+                aggregation=("average",),
+                ref_id=account_id,
+                unit="Bytes",
+                normalization=NormalizerFactory.bytes,
+                period=delta,
+                start_time=start,
            )
-            queries.append(
-                AzureMetricQuery.create(
-                    metric_name="TableCount",
-                    metric_namespace="microsoft.storage/storageaccounts/tableservices",
-                    instance_id=table_instance_id,
-                    aggregation=("average",),
-                    ref_id=account_id,
-                    unit="Count",
-                )
+        )
+        queries.append(
+            AzureMetricQuery.create(
+                metric_name="TableCount",
+                metric_namespace="microsoft.storage/storageaccounts/tableservices",
+                metric_normalization_name=MetricName.TableCount,
+                instance_id=table_instance_id,
+                aggregation=("average",),
+                ref_id=account_id,
+                unit="Count",
+                normalization=NormalizerFactory.count,
+                period=delta,
+                start_time=start,
             )
-            queries.append(
-                AzureMetricQuery.create(
-                    metric_name="QueueCapacity",
-                    metric_namespace="microsoft.storage/storageaccounts/queueservices",
-                    instance_id=queue_instance_id,
-                    aggregation=("average",),
-                    ref_id=account_id,
-                    unit="Bytes",
-                )
+        )
+        queries.append(
+            AzureMetricQuery.create(
+                metric_name="QueueCapacity",
+                metric_namespace="microsoft.storage/storageaccounts/queueservices",
+                metric_normalization_name=MetricName.QueueCapacity,
+                instance_id=queue_instance_id,
+                aggregation=("average",),
+                ref_id=account_id,
+                unit="Bytes",
+                normalization=NormalizerFactory.bytes,
+                period=delta,
+                start_time=start,
             )
-            queries.append(
-                AzureMetricQuery.create(
-                    metric_name="QueueCount",
-                    metric_namespace="microsoft.storage/storageaccounts/queueservices",
-                    instance_id=queue_instance_id,
-                    aggregation=("average",),
-                    ref_id=account_id,
-                    unit="Count",
-                )
+        )
+        queries.append(
+            AzureMetricQuery.create(
+                metric_name="QueueCount",
+                metric_namespace="microsoft.storage/storageaccounts/queueservices",
+                metric_normalization_name=MetricName.QueueCount,
+                instance_id=queue_instance_id,
+                aggregation=("average",),
+                ref_id=account_id,
+                unit="Count",
+                normalization=NormalizerFactory.count,
+                period=delta,
+                start_time=start,
             )
-            queries.append(
-                AzureMetricQuery.create(
-                    metric_name="FileCapacity",
-                    metric_namespace="microsoft.storage/storageaccounts/fileservices",
-                    instance_id=file_instance_id,
-                    aggregation=("average",),
-                    ref_id=account_id,
-                    unit="Bytes",
-                )
+        )
+        queries.append(
+            AzureMetricQuery.create(
+                metric_name="FileCapacity",
+                metric_namespace="microsoft.storage/storageaccounts/fileservices",
+                metric_normalization_name=MetricName.FileCapacity,
+                instance_id=file_instance_id,
+                aggregation=("average",),
+                ref_id=account_id,
+                unit="Bytes",
+                normalization=NormalizerFactory.bytes,
+                period=delta,
+                start_time=start,
            )
-            queries.append(
-                AzureMetricQuery.create(
-                    metric_name="FileCount",
-                    metric_namespace="microsoft.storage/storageaccounts/fileservices",
-                    instance_id=file_instance_id,
-                    aggregation=("average",),
-                    ref_id=account_id,
-                    unit="Count",
-                )
+        )
+        queries.append(
+            AzureMetricQuery.create(
+                metric_name="FileCount",
+                metric_namespace="microsoft.storage/storageaccounts/fileservices",
+                metric_normalization_name=MetricName.FileCount,
+                instance_id=file_instance_id,
+                aggregation=("average",),
+                ref_id=account_id,
+                unit="Count",
+                normalization=NormalizerFactory.count,
+                period=delta,
+                start_time=start,
            )
-            queries.append(
-                AzureMetricQuery.create(
-                    metric_name="BlobCapacity",
-                    metric_namespace="microsoft.storage/storageaccounts/blobservices",
-                    instance_id=blob_instance_id,
-                    aggregation=("average",),
-                    ref_id=account_id,
-                    unit="Bytes",
-                )
+        )
+        queries.append(
+            AzureMetricQuery.create(
+                metric_name="BlobCapacity",
+                metric_namespace="microsoft.storage/storageaccounts/blobservices",
+                metric_normalization_name=MetricName.BlobCapacity,
+                instance_id=blob_instance_id,
+                aggregation=("average",),
+                ref_id=account_id,
+                unit="Bytes",
+                normalization=NormalizerFactory.bytes,
+                period=delta,
+                start_time=start,
            )
-            queries.append(
-                AzureMetricQuery.create(
-                    metric_name="BlobCount",
-                    metric_namespace="microsoft.storage/storageaccounts/blobservices",
-                    instance_id=blob_instance_id,
-                    aggregation=("average",),
-                    ref_id=account_id,
-                    unit="Count",
-                )
+        )
+        queries.append(
+            AzureMetricQuery.create(
+                metric_name="BlobCount",
+                metric_namespace="microsoft.storage/storageaccounts/blobservices",
+                metric_normalization_name=MetricName.BlobCount,
+                instance_id=blob_instance_id,
+                aggregation=("average",),
+                ref_id=account_id,
+                unit="Count",
+                normalization=NormalizerFactory.count,
+                period=delta,
+                start_time=start,
             )
+        )
 
-        metric_normalizers = {
-            "UsedCapacity": MetricNormalization(metric_name=MetricName.UsedCapacity, unit=MetricUnit.Bytes),
-            "TableCapacity": MetricNormalization(metric_name=MetricName.TableCapacity, unit=MetricUnit.Bytes),
-            "TableCount": MetricNormalization(metric_name=MetricName.TableCount, unit=MetricUnit.Count),
-            "QueueCapacity": MetricNormalization(metric_name=MetricName.QueueCapacity, unit=MetricUnit.Bytes),
-            "QueueCount": MetricNormalization(metric_name=MetricName.QueueCount, unit=MetricUnit.Count),
-            "FileCapacity": MetricNormalization(metric_name=MetricName.FileCapacity, unit=MetricUnit.Bytes),
-            "FileCount": MetricNormalization(metric_name=MetricName.FileCount, unit=MetricUnit.Count),
-            "BlobCapacity": MetricNormalization(metric_name=MetricName.BlobCapacity, unit=MetricUnit.Bytes),
-            "BlobCount": MetricNormalization(metric_name=MetricName.BlobCount, unit=MetricUnit.Count),
-        }
-
-        metric_result = AzureMetricData.query_for(builder, queries, start, now, delta)
-
-        update_resource_metrics(accounts, metric_result, metric_normalizers)
+        return queries
 
 
 @define(eq=False, slots=False)
diff --git a/plugins/azure/fix_plugin_azure/utils.py b/plugins/azure/fix_plugin_azure/utils.py
index 4f5b33f73a..f3cba0e59b 100644
--- a/plugins/azure/fix_plugin_azure/utils.py
+++ b/plugins/azure/fix_plugin_azure/utils.py
@@ -1,10 +1,8 @@
 import logging
 from datetime import datetime
-from typing import Callable, Dict, TypeVar, Any
-from attr import frozen
+from typing import Dict, TypeVar, Any
 import functools
 
-from fixlib.baseresources import StatName, MetricName, MetricUnit
 from fixlib.json_bender import F
 
 T = TypeVar("T")
@@ -36,10 +34,6 @@ def rgetvalue(data: Dict[str, Any], key_path: str, default: Any = None) -> Any:
     return nested_value
 
 
-def identity(x: T) -> T:
-    return x
-
-
 def case_insensitive_eq(left: T, right: T) -> bool:
     if isinstance(left, str) and isinstance(right, str):
         return left.lower() == right.lower()
@@ -70,15 +64,3 @@ def set_bool(val: str) -> bool:
 
 TimestampToIso = F(lambda x: datetime.fromtimestamp(x).isoformat())
 NoneIfEmpty = F(lambda x: x if x else None)
-
-
-@frozen(kw_only=True)
-class MetricNormalization:
-    metric_name: MetricName
-    unit: MetricUnit
-    stat_map: Dict[str, StatName] = {
-        "minimum": StatName.min,
-        "average": StatName.avg,
-        "maximum": StatName.max,
-    }
-    normalize_value: Callable[[float], float] = identity
diff --git a/plugins/azure/test/compute_test.py b/plugins/azure/test/compute_test.py
index bf085444da..0319dcacdd 100644
--- a/plugins/azure/test/compute_test.py
+++ b/plugins/azure/test/compute_test.py
@@ -130,6 +130,7 @@ def test_virtual_machine(builder: GraphBuilder) -> None:
 
 def test_virtual_machine_resources(builder: GraphBuilder) -> None:
     collected = roundtrip_check(AzureComputeVirtualMachine, builder)[0]
+    builder.executor.wait_for_submitted_work()
     assert collected.instance_type == "Standard_A1_V2"
     assert collected.instance_status == InstanceStatus.RUNNING
 
diff --git a/plugins/azure/test/metric_test.py b/plugins/azure/test/metric_test.py
index 0a384795dd..313a562dae 100644
--- a/plugins/azure/test/metric_test.py
+++ b/plugins/azure/test/metric_test.py
@@ -1,7 +1,10 @@
 from datetime import timedelta, datetime, timezone
 
-from fix_plugin_azure.resource.base import GraphBuilder
+from fix_plugin_azure.resource.base import GraphBuilder, AzureMetricQuery
 
-from fix_plugin_azure.resource.metrics import AzureMetricQuery, AzureMetricData
+from fix_plugin_azure.resource.compute import AzureComputeVirtualMachine
+from fix_plugin_azure.resource.metrics import AzureMetricData, NormalizerFactory
+
+from fixlib.baseresources import MetricName
 
 
 def test_metric(builder: GraphBuilder) -> None:
@@ -9,17 +12,22 @@ def test_metric(builder: GraphBuilder) -> None:
     earlier = now - timedelta(days=60)
     delta = now - earlier
     resource_id = "/subscriptions/rwqrr2-31f1-rwqrrw-5325-wrq2r/resourceGroups/FOO/providers/Microsoft.Compute/virtualMachines/test1"
+    vm = AzureComputeVirtualMachine(id=resource_id, name="test1")
     write = AzureMetricQuery.create(
-        "Disk Write Operations/Sec",
-        "Microsoft.Compute/virtualMachines",
-        resource_id,
-        resource_id,
-        ("average", "minimum", "maximum"),
+        metric_name="Disk Write Operations/Sec",
+        metric_namespace="Microsoft.Compute/virtualMachines",
+        metric_normalization_name=MetricName.DiskWrite,
+        normalization=NormalizerFactory.iops,
+        period=delta,
+        instance_id=resource_id,
+        ref_id=resource_id,
+        aggregation=("average", "minimum", "maximum"),
         unit="CountPerSecond",
     )
-    result = AzureMetricData.query_for(builder=builder, queries=[write], start_time=earlier, end_time=now, delta=delta)
-    assert result[write].metric_values == {
-        "average": 247685.56222444447,
-        "minimum": 291286.29000000004,
-        "maximum": 193903.44666666666,
+    AzureMetricData.query_for(builder=builder, resource=vm, queries=[write], start_time=earlier, end_time=now)
+    builder.executor.wait_for_submitted_work()
+    assert vm._resource_usage["disk_write_iops"] == {
+        "avg": 247685.5622,
+        "min": 291286.2900,
+        "max": 193903.4467,
     }
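
What changed, end to end: resources no longer fetch and post-process their own
metrics. Each resource only describes its metrics by returning AzureMetricQuery
objects from the new instance-level collect_usage_metrics hook, and the
collector fans all queries out once per account on the shared executor. The
sketch below mirrors the collector loop added in collector.py;
example_disk_queries is a hypothetical stand-in for what
AzureComputeDisk.collect_usage_metrics now returns, and the imports assume the
module layout introduced by this patch.

from typing import List

from fix_plugin_azure.resource.base import AzureMetricQuery, GraphBuilder, MicrosoftResource
from fix_plugin_azure.resource.metrics import AzureMetricData, NormalizerFactory
from fixlib.baseresources import MetricName


def example_disk_queries(volume: MicrosoftResource) -> List[AzureMetricQuery]:
    # Hypothetical stand-in for AzureComputeDisk.collect_usage_metrics:
    # one query per Azure metric, each carrying its own normalization.
    return [
        AzureMetricQuery.create(
            metric_name="Composite Disk Write Bytes/sec",
            metric_namespace="microsoft.compute/disks",
            metric_normalization_name=MetricName.VolumeWrite,
            instance_id=volume.id,
            ref_id=volume.id,
            aggregation=("average",),
            unit="BytesPerSecond",
            normalization=NormalizerFactory.bytes,
        )
    ]


def drive_usage_metrics(builder: GraphBuilder) -> None:
    # Mirrors collect_usage_metrics in collector.py: every MicrosoftResource in
    # the graph contributes queries, query_for submits one _query_for_single
    # job per query, and the caller waits on the executor exactly once.
    for resource in builder.graph.nodes:
        if isinstance(resource, MicrosoftResource) and (mq := resource.collect_usage_metrics(builder)):
            start_at = builder.created_at - builder.metrics_delta
            AzureMetricData.query_for(builder, resource, mq, start_at, builder.created_at)
    builder.executor.wait_for_submitted_work()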
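
The normalization contract is easiest to see in isolation. The snippet below is
a self-contained re-enactment of what update_resource_metrics now does, with
plain strings standing in for StatName and MetricUnit and the fixture numbers
taken from metric_test.py; every NormalizerFactory entry in this patch rounds
to four digits.

from collections import defaultdict
from typing import Dict

# Plain-string stand-in for the STAT_MAP added to base.py.
STAT_MAP: Dict[str, str] = {"minimum": "min", "average": "avg", "maximum": "max"}


def normalize(value: float) -> float:
    # The normalize_value used by every NormalizerFactory entry in this patch.
    return round(value, ndigits=4)


# Raw aggregations as the metrics API returns them (values from metric_test.py).
raw = {"average": 247685.56222444447, "minimum": 291286.29000000004, "maximum": 193903.44666666666}

# update_resource_metrics keys usage as "<metric_normalization_name>_<unit>"
# and re-keys each aggregation through STAT_MAP before storing it.
resource_usage: Dict[str, Dict[str, float]] = defaultdict(dict)
for aggregation, value in raw.items():
    resource_usage["disk_write_iops"][STAT_MAP[aggregation]] = normalize(value)

assert resource_usage["disk_write_iops"] == {"avg": 247685.5622, "min": 291286.29, "max": 193903.4467}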
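
One subtlety worth calling out: a query can override the collector-wide window,
because _query_for_single resolves the timespan from query.start_time or
start_time and the interval from query.period or builder.metrics_delta. The
storage hunk depends on this: capacity metrics are only emitted hourly, so it
widens any sub-hour window before building its queries. A condensed sketch of
that guard, factored into a hypothetical helper for illustration:

from datetime import datetime, timedelta, timezone
from typing import Tuple


def storage_metrics_window(now: datetime, start: datetime, delta: timedelta) -> Tuple[datetime, timedelta]:
    # Mirrors the guard in the storage collect_usage_metrics: widen any window
    # shorter than an hour to a full hour and move the start back accordingly,
    # since storage capacity metrics have no sub-hour resolution.
    if delta.total_seconds() < 3600:
        delta = timedelta(hours=1)
        start = now - delta
    return start, delta


now = datetime(2024, 12, 12, 12, 0, tzinfo=timezone.utc)
start, delta = storage_metrics_window(now, now - timedelta(minutes=5), timedelta(minutes=5))
assert (start, delta) == (now - timedelta(hours=1), timedelta(hours=1))
# Each storage query then passes period=delta and start_time=start explicitly.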