From afdf497c733502cf9fed84e2991764fc1f95f437 Mon Sep 17 00:00:00 2001 From: Thirumalesh Aaraveti Date: Mon, 25 Mar 2024 16:38:57 +0530 Subject: [PATCH] Added the azure unused_nat_gateway policy --- README.md | 2 +- .../azure/compute/network_operations.py | 34 +++ .../monitor/monitor_management_operations.py | 29 +++ .../main/main_oerations/main_operations.py | 2 +- .../policy/aws/cleanup/unused_nat_gateway.py | 92 ++++++++ .../policy/aws/unused_nat_gateway.py | 92 -------- .../azure/cleanup/unused_nat_gateway.py | 70 ++++++ .../helpers/aws/aws_policy_operations.py | 2 + .../helpers/azure/azure_policy_operations.py | 6 +- docs/source/index.md | 2 +- .../test_unused_nat_gateways.py | 62 ----- .../aws/cleanup/test_unused_nat_gateway.py | 151 ++++++++++++ .../policy/azure/test_unused_nat_gateway.py | 218 ++++++++++++++++++ tests/unittest/configs.py | 2 + .../mock_monitor/mock_metric_operations.py | 31 +++ .../mocks/azure/mock_monitor/mock_monitor.py | 4 +- .../mock_nat_gateway_operations.py | 48 ++++ .../mocks/azure/mock_network/mock_network.py | 4 +- 18 files changed, 691 insertions(+), 160 deletions(-) create mode 100644 cloud_governance/policy/aws/cleanup/unused_nat_gateway.py delete mode 100644 cloud_governance/policy/aws/unused_nat_gateway.py create mode 100644 cloud_governance/policy/azure/cleanup/unused_nat_gateway.py delete mode 100644 tests/unittest/cloud_governance/aws/zombie_non_cluster/test_unused_nat_gateways.py create mode 100644 tests/unittest/cloud_governance/policy/aws/cleanup/test_unused_nat_gateway.py create mode 100644 tests/unittest/cloud_governance/policy/azure/test_unused_nat_gateway.py create mode 100644 tests/unittest/mocks/azure/mock_monitor/mock_metric_operations.py create mode 100644 tests/unittest/mocks/azure/mock_network/mock_nat_gateway_operations.py diff --git a/README.md b/README.md index 18371d742..d44ac89a6 100644 --- a/README.md +++ b/README.md @@ -34,7 +34,7 @@ This tool support the following policies: * [s3_inactive](cloud_governance/policy/aws/s3_inactive.py): Get the inactive/empty buckets and delete them after 7 days. * [empty_roles](cloud_governance/policy/aws/empty_roles.py): Get empty roles and delete it after 7 days. * [zombie_snapshots](cloud_governance/policy/aws/zombie_snapshots.py): Get the zombie snapshots and delete it after 7 days. -* [unused_nat_gateway](cloud_governance/policy/aws/unused_nat_gateway.py): Get the unused nat gateways and deletes it after 7 days. +* [unused_nat_gateway](cloud_governance/policy/aws/cleanup/unused_nat_gateway.py): Get the unused nat gateways and deletes it after 7 days. * gitleaks: scan Github repository git leak (security scan) * [cost_over_usage](cloud_governance/policy/aws/cost_over_usage.py): send mail to aws user if over usage cost diff --git a/cloud_governance/common/clouds/azure/compute/network_operations.py b/cloud_governance/common/clouds/azure/compute/network_operations.py index 0e7af9aa7..e26c56344 100644 --- a/cloud_governance/common/clouds/azure/compute/network_operations.py +++ b/cloud_governance/common/clouds/azure/compute/network_operations.py @@ -1,6 +1,8 @@ +from azure.core.exceptions import HttpResponseError from azure.mgmt.network import NetworkManagementClient from cloud_governance.common.clouds.azure.common.common_operations import CommonOperations +from cloud_governance.common.logger.init_logger import logger from cloud_governance.common.utils.utils import Utils @@ -57,7 +59,24 @@ def get_public_ipv4_network_interfaces(self): public_ipv4_network_interfaces.setdefault(public_ipv4_address_id, []).append(network_interface) return public_ipv4_network_interfaces + def describe_nat_gateways(self): + """ + This method lists all the azure nat gateways + :return: + :rtype: + """ + try: + nat_gateway_iter_object = self.__network_client.nat_gateways.list_all() + nat_gateways = self._item_paged_iterator(item_paged_object=nat_gateway_iter_object, as_dict=True) + return nat_gateways + except HttpResponseError as http_err: + logger.error(http_err) + raise http_err + except Exception as err: + raise err + # delete operations + def release_public_ip(self, resource_id: str): """ This method releases the public ip @@ -70,3 +89,18 @@ def release_public_ip(self, resource_id: str): status = self.__network_client.public_ip_addresses.begin_delete(resource_group_name=resource_group_name, public_ip_address_name=public_ip_address_name) return status.done() + + def delete_nat_gateway(self, resource_id: str): + """ + This method deletes the NatGateway + :param resource_id: + :type resource_id: + :return: + :rtype: + """ + id_key_pairs = self.get_id_dict_data(resource_id) + resource_group_name = id_key_pairs.get('resourcegroups') + nat_gateway_name = id_key_pairs.get('natgateways') + status = self.__network_client.nat_gateways.begin_delete(resource_group_name=resource_group_name, + nat_gateway_name=nat_gateway_name) + return status.done() diff --git a/cloud_governance/common/clouds/azure/monitor/monitor_management_operations.py b/cloud_governance/common/clouds/azure/monitor/monitor_management_operations.py index 3a59390a8..ea77818d8 100644 --- a/cloud_governance/common/clouds/azure/monitor/monitor_management_operations.py +++ b/cloud_governance/common/clouds/azure/monitor/monitor_management_operations.py @@ -58,3 +58,32 @@ def get_audit_records(self, resource_id: str, start_date: datetime = None, end_d logger.error(http_error) except Exception as err: logger.error(err) + + def get_resource_metrics(self, resource_id: str, metricnames: str, aggregation: str, + timespan: str = None, interval: timedelta = timedelta(days=1), **kwargs): + """ + This method returns the metrics object of individual resource + :param resource_id: + :type resource_id: + :param metricnames: + :type metricnames: + :param aggregation: + :type aggregation: + :param timespan: + :type timespan: + :param interval: + :type interval: + :param kwargs: + :type kwargs: + :return: + :rtype: + """ + if not timespan: + end_date = datetime.utcnow() + start_date = end_date - timedelta(days=7) + timespan = f'{start_date}/{end_date}' + response = self.__monitor_client.metrics.list(resource_uri=resource_id, timespan=timespan, + metricnames=metricnames, aggregation=aggregation, + result_type='Data', interval=interval, + **kwargs) + return response.as_dict() diff --git a/cloud_governance/main/main_oerations/main_operations.py b/cloud_governance/main/main_oerations/main_operations.py index 0ecefdd9d..e084c7a7d 100644 --- a/cloud_governance/main/main_oerations/main_operations.py +++ b/cloud_governance/main/main_oerations/main_operations.py @@ -39,7 +39,7 @@ def run(self): for policy_type, policies in policies_list.items(): # @Todo support for all the aws policies, currently supports ec2_run as urgent requirement if self._policy in policies and self._policy in ["instance_run", "unattached_volume", "cluster_run", - "ip_unattached"]: + "ip_unattached", "unused_nat_gateway"]: source = policy_type if Utils.equal_ignore_case(policy_type, self._public_cloud_name): source = '' diff --git a/cloud_governance/policy/aws/cleanup/unused_nat_gateway.py b/cloud_governance/policy/aws/cleanup/unused_nat_gateway.py new file mode 100644 index 000000000..adb063beb --- /dev/null +++ b/cloud_governance/policy/aws/cleanup/unused_nat_gateway.py @@ -0,0 +1,92 @@ +import datetime + +from cloud_governance.common.utils.utils import Utils +from cloud_governance.policy.helpers.aws.aws_policy_operations import AWSPolicyOperations + + +class UnUsedNatGateway(AWSPolicyOperations): + """ + This class sends an alert mail for zombie Nat gateways ( based on vpc routes ) + to the user after 4 days and delete after 7 days. + """ + + NAMESPACE = 'AWS/NATGateway' + UNUSED_DAYS = 7 + RESOURCE_ACTION = "Delete" + + def __init__(self): + super().__init__() + self.__active_cluster_ids = self._get_active_cluster_ids() + + def __check_cloud_watch_logs(self, resource_id: str, days: int = UNUSED_DAYS): + """ + This method returns weather the NatGateway is used in last input days + :param resource_id: + :param days: + :return: + """ + if days == 0: + days = 1 + end_time = datetime.datetime.utcnow() + start_time = end_time - datetime.timedelta(days=days) + response = self._cloudwatch.get_metric_data(start_time=start_time, end_time=end_time, resource_id=resource_id, + resource_type='NatGatewayId', namespace=self.NAMESPACE, + metric_names={'ActiveConnectionCount': 'Count'}, + statistic='Average')['MetricDataResults'][0] + for value in response.get('Values', []): + if value > 0: + return False + return True + + def __check_nat_gateway_in_routes(self, nat_gateway_id: str): + """ + This method check the nat gateway present in the routes or not. + :param nat_gateway_id: + :return: + """ + route_tables = self._ec2_client.describe_route_tables()['RouteTables'] + nat_gateway_found = False + for route_table in route_tables: + for route in route_table.get('Routes'): + if route.get('NatGatewayId') == nat_gateway_id: + nat_gateway_found = True + return nat_gateway_found + + def run_policy_operations(self): + """ + This method returns the list of unattached volumes + :return: + :rtype: + """ + unused_nat_gateways = [] + nat_gateways = self._ec2_operations.get_nat_gateways() + for nat_gateway in nat_gateways: + tags = nat_gateway.get('Tags', []) + resource_id = nat_gateway.get('NatGatewayId') + cleanup_result = False + cluster_tag = self._get_cluster_tag(tags=tags) + cleanup_days = 0 + if (Utils.equal_ignore_case(nat_gateway.get('State'), 'available') + and cluster_tag not in self.__active_cluster_ids): + if (not self.__check_nat_gateway_in_routes(nat_gateway_id=resource_id) or + self.__check_cloud_watch_logs(resource_id=resource_id)): + cleanup_days = self.get_clean_up_days_count(tags=tags) + cleanup_result = self.verify_and_delete_resource(resource_id=resource_id, tags=tags, + clean_up_days=cleanup_days) + resource_data = self._get_es_schema(resource_id=resource_id, + user=self.get_tag_name_from_tags(tags=tags, tag_name='User'), + skip_policy=self.get_skip_policy_value(tags=tags), + cleanup_days=cleanup_days, dry_run=self._dry_run, + name=self.get_tag_name_from_tags(tags=tags, tag_name='Name'), + region=self._region, + cleanup_result=str(cleanup_result), + resource_action=self.RESOURCE_ACTION, + cloud_name=self._cloud_name, + resource_type='NatGateway', + resource_state=nat_gateway.get('State') + if not cleanup_result else "Deleted") + unused_nat_gateways.append(resource_data) + if not cleanup_result: + self.update_resource_day_count_tag(resource_id=resource_id, cleanup_days=cleanup_days, tags=tags) + + return unused_nat_gateways diff --git a/cloud_governance/policy/aws/unused_nat_gateway.py b/cloud_governance/policy/aws/unused_nat_gateway.py deleted file mode 100644 index cdbb214bd..000000000 --- a/cloud_governance/policy/aws/unused_nat_gateway.py +++ /dev/null @@ -1,92 +0,0 @@ -import datetime - - -from cloud_governance.common.clouds.aws.cloudwatch.cloudwatch_operations import CloudWatchOperations -from cloud_governance.policy.policy_operations.aws.zombie_non_cluster.run_zombie_non_cluster_policies import NonClusterZombiePolicy - - -class UnusedNatGateway(NonClusterZombiePolicy): - """ - This class sends an alert mail for zombie Nat gateways ( based on vpc routes ) - to the user after 4 days and delete after 7 days. - """ - - NAMESPACE = 'AWS/NATGateway' - UNUSED_DAYS = 1 - - def __init__(self): - super().__init__() - self._cloudwatch = CloudWatchOperations(region=self._region) - - def __check_cloud_watch_logs(self, resource_id: str, days: int = UNUSED_DAYS): - """ - This method returns weather the NatGateway is used in last input days - :param resource_id: - :param days: - :return: - """ - if days == 0: - days = 1 - end_time = datetime.datetime.utcnow() - start_time = end_time - datetime.timedelta(days=days) - response = self._cloudwatch.get_metric_data(start_time=start_time, end_time=end_time, resource_id=resource_id, - resource_type='NatGatewayId', namespace=self.NAMESPACE, - metric_names={'ActiveConnectionCount': 'Count'}, - statistic='Average')['MetricDataResults'][0] - for value in response.get('Values'): - if value > 0: - return False - return True - - def __check_nat_gateway_in_routes(self, nat_gateway_id: str): - """ - This method check the nat gateway present in the routes or not. - :param nat_gateway_id: - :return: - """ - route_tables = self._ec2_client.describe_route_tables()['RouteTables'] - nat_gateway_found = False - for route_table in route_tables: - for route in route_table.get('Routes'): - if route.get('NatGatewayId') == nat_gateway_id: - nat_gateway_found = True - return nat_gateway_found - - def run(self): - """ - This method returns zombie NatGateways, delete if dry_run no - @return: - """ - nat_gateways = self._ec2_operations.get_nat_gateways() - nat_gateway_unused_data = [] - for nat_gateway in nat_gateways: - if self._get_policy_value(tags=nat_gateway.get('Tags', [])) not in ('NOTDELETE', 'SKIP'): - nat_gateway_id = nat_gateway.get('NatGatewayId') - tags = nat_gateway.get('Tags') - gateway_unused = False - last_used_days = int(self._ec2_operations.get_tag_value_from_tags(tags=tags, tag_name='LastUsedDay', default_value=1)) - if not self._check_cluster_tag(tags=tags): - if nat_gateway.get('State') == 'available': - if not self.__check_nat_gateway_in_routes(nat_gateway_id=nat_gateway_id) or self.__check_cloud_watch_logs(days=last_used_days, resource_id=nat_gateway_id): - gateway_unused = True - unused_days = self._get_resource_last_used_days(tags=tags) - zombie_nat_gateway = self._check_resource_and_delete(resource_name='NatGateway', - resource_id='NatGatewayId', - resource_type='CreateNatGateway', - resource=nat_gateway, - empty_days=unused_days, - days_to_delete_resource=self.DAYS_TO_DELETE_RESOURCE, - tags=tags) - if zombie_nat_gateway: - nat_gateway_unused_data.append( - {'ResourceId': nat_gateway_id, - 'Name': self._get_tag_name_from_tags(tags=tags, tag_name='Name'), - 'User': self._get_tag_name_from_tags(tags=tags, tag_name='User'), - 'VpcId': zombie_nat_gateway.get('VpcId'), - 'Skip': self._get_policy_value(tags=tags), - 'Days': unused_days, 'Policy': self._policy}) - else: - unused_days = 0 - self._update_resource_tags(resource_id=nat_gateway_id, tags=tags, left_out_days=unused_days, - resource_left_out=gateway_unused) - return nat_gateway_unused_data diff --git a/cloud_governance/policy/azure/cleanup/unused_nat_gateway.py b/cloud_governance/policy/azure/cleanup/unused_nat_gateway.py new file mode 100644 index 000000000..b8b893024 --- /dev/null +++ b/cloud_governance/policy/azure/cleanup/unused_nat_gateway.py @@ -0,0 +1,70 @@ +from cloud_governance.policy.helpers.azure.azure_policy_operations import AzurePolicyOperations + + +class UnUsedNatGateway(AzurePolicyOperations): + + """ + This class performs the azure unused nat gateway operations + """ + + RESOURCE_ACTION = "Delete" + + def __init__(self): + super().__init__() + self.__active_cluster_ids = self._get_active_cluster_ids() + + def run_policy_operations(self): + """ + This method returns the list of unused nat gateways + :return: + :rtype: + """ + unused_nat_gateways = [] + nat_gateways = self.network_operations.describe_nat_gateways() + for nat_gateway in nat_gateways: + tags = nat_gateway.get('tags') + cluster_tag = self._get_cluster_tag(tags=tags) + cleanup_result = False + if cluster_tag not in self.__active_cluster_ids: + metrics_data = self.monitor_operations.get_resource_metrics(resource_id=nat_gateway.get('id'), + metricnames='ByteCount', + aggregation='Average') + unused = False + if metrics_data.get('value'): + metrics_time_series_data = metrics_data.get('value', [])[0].get('timeseries', []) + if not metrics_time_series_data: + unused = True + else: + total_data = 0 + for metric_time_frame in metrics_time_series_data: + for data in metric_time_frame.get('data'): + total_data = (total_data + data.get('average')) / 2 + if total_data < 5: + unused = True + + if unused: + cleanup_days = self.get_clean_up_days_count(tags=tags) + cleanup_result = self.verify_and_delete_resource(resource_id=nat_gateway.get('id'), tags=tags, + clean_up_days=cleanup_days) + resource_data = self._get_es_schema(resource_id=nat_gateway.get('name'), + user=self.get_tag_name_from_tags(tags=tags, tag_name='User'), + skip_policy=self.get_skip_policy_value(tags=tags), + cleanup_days=cleanup_days, dry_run=self._dry_run, + name=nat_gateway.get('name'), region=nat_gateway.get('location'), + cleanup_result=str(cleanup_result), + resource_action=self.RESOURCE_ACTION, + cloud_name=self._cloud_name, + resource_type=f"NatGateway: " + f"{nat_gateway.get('sku', {}).get('name')}", + resource_state=nat_gateway.get('provisioning_state') + if not cleanup_result else "Deleted") + unused_nat_gateways.append(resource_data) + else: + cleanup_days = 0 + else: + cleanup_days = 0 + if not cleanup_result: + self.update_resource_day_count_tag(resource_id=nat_gateway.get("id"), cleanup_days=cleanup_days, + tags=tags) + + return unused_nat_gateways diff --git a/cloud_governance/policy/helpers/aws/aws_policy_operations.py b/cloud_governance/policy/helpers/aws/aws_policy_operations.py index 0b121027f..c6ed507bf 100644 --- a/cloud_governance/policy/helpers/aws/aws_policy_operations.py +++ b/cloud_governance/policy/helpers/aws/aws_policy_operations.py @@ -1,6 +1,7 @@ import boto3 +from cloud_governance.common.clouds.aws.cloudwatch.cloudwatch_operations import CloudWatchOperations from cloud_governance.common.clouds.aws.ec2.ec2_operations import EC2Operations from cloud_governance.common.clouds.aws.s3.s3_operations import S3Operations from cloud_governance.policy.helpers.abstract_policy_operations import AbstractPolicyOperations @@ -16,6 +17,7 @@ def __init__(self): self.__s3operations = S3Operations(region_name=self._region) self._ec2_client = boto3.client('ec2', region_name=self._region) self._ec2_operations = EC2Operations(region=self._region) + self._cloudwatch = CloudWatchOperations(region=self._region) self._s3_client = boto3.client('s3') self._iam_client = boto3.client('iam') diff --git a/cloud_governance/policy/helpers/azure/azure_policy_operations.py b/cloud_governance/policy/helpers/azure/azure_policy_operations.py index bfd54d32d..4ed4b277e 100644 --- a/cloud_governance/policy/helpers/azure/azure_policy_operations.py +++ b/cloud_governance/policy/helpers/azure/azure_policy_operations.py @@ -2,6 +2,7 @@ from cloud_governance.common.clouds.azure.compute.compute_operations import ComputeOperations from cloud_governance.common.clouds.azure.compute.network_operations import NetworkOperations from cloud_governance.common.clouds.azure.compute.resource_group_operations import ResourceGroupOperations +from cloud_governance.common.clouds.azure.monitor.monitor_management_operations import MonitorManagementOperations from cloud_governance.policy.helpers.abstract_policy_operations import AbstractPolicyOperations from cloud_governance.common.logger.init_logger import logger from cloud_governance.common.utils.utils import Utils @@ -14,6 +15,7 @@ def __init__(self): self.compute_operations = ComputeOperations() self.network_operations = NetworkOperations() self.resource_group_operations = ResourceGroupOperations() + self.monitor_operations = MonitorManagementOperations() super().__init__() def get_tag_name_from_tags(self, tags: dict, tag_name: str): @@ -49,6 +51,8 @@ def _delete_resource(self, resource_id: str): self.compute_operations.delete_disk(resource_id=resource_id) elif self._policy == 'ip_unattached': delete_status = self.network_operations.release_public_ip(resource_id=resource_id) + elif self._policy == 'unused_nat_gateway': + delete_status = self.network_operations.delete_nat_gateway(resource_id=resource_id) logger.info(f'{self._policy} {action}: {resource_id}') except Exception as err: logger.info(f'Exception raised: {err}: {resource_id}') @@ -56,7 +60,7 @@ def _delete_resource(self, resource_id: str): def update_resource_day_count_tag(self, resource_id: str, cleanup_days: int, tags: dict): tags = self._update_tag_value(tags=tags, tag_name='DaysCount', tag_value=str(cleanup_days)) try: - if self._policy in ['instance_run', 'unattached_volume', 'ip_unattached']: + if self._policy in ['instance_run', 'unattached_volume', 'ip_unattached', 'unused_nat_gateway']: self.resource_group_operations.creates_or_updates_tags(resource_id=resource_id, tags=tags) except Exception as err: logger.info(f'Exception raised: {err}: {resource_id}') diff --git a/docs/source/index.md b/docs/source/index.md index 484ea5bcd..d2e51d140 100644 --- a/docs/source/index.md +++ b/docs/source/index.md @@ -23,7 +23,7 @@ This tool support the following policies: * [s3_inactive](../../cloud_governance/policy/aws/s3_inactive.py): Get the inactive/empty buckets and delete them after 7 days. * [empty_roles](../../cloud_governance/policy/aws/empty_roles.py): Get empty roles and delete it after 7 days. * [zombie_snapshots](../../cloud_governance/policy/aws/zombie_snapshots.py): Get the zombie snapshots and delete it after 7 days. -* [nat_gateway_unused](../../cloud_governance/policy/aws/unused_nat_gateway.py): Get the unused nat gateways and deletes it after 7 days. +* [nat_gateway_unused](../../cloud_governance/policy/aws/cleanup/unused_nat_gateway.py): Get the unused nat gateways and deletes it after 7 days. * gitleaks: scan Github repository git leak (security scan) * [cost_over_usage](../../cloud_governance/policy/aws/cost_over_usage.py): send mail to aws user if over usage cost diff --git a/tests/unittest/cloud_governance/aws/zombie_non_cluster/test_unused_nat_gateways.py b/tests/unittest/cloud_governance/aws/zombie_non_cluster/test_unused_nat_gateways.py deleted file mode 100644 index 7ade0bd8b..000000000 --- a/tests/unittest/cloud_governance/aws/zombie_non_cluster/test_unused_nat_gateways.py +++ /dev/null @@ -1,62 +0,0 @@ -import os - -import boto3 -from moto import mock_ec2 - -from cloud_governance.policy.policy_operations.aws.zombie_non_cluster.run_zombie_non_cluster_policies import NonClusterZombiePolicy - -os.environ['AWS_DEFAULT_REGION'] = 'us-east-2' -os.environ['dry_run'] = 'no' - - -@mock_ec2 -def test_nat_gateway_unused(): - """ - This method tests, deletion od unused of NatGateways - @return: - """ - os.environ['policy'] = 'unused_nat_gateway' - ec2_client = boto3.client('ec2', region_name=os.environ.get('AWS_DEFAULT_REGION')) - subnet_id = ec2_client.describe_subnets()['Subnets'][0].get('SubnetId') - ec2_client.create_nat_gateway(SubnetId=subnet_id) - nat_gateway_unused = NonClusterZombiePolicy() - nat_gateway_unused.set_dryrun(value='no') - nat_gateway_unused.set_policy(value='unused_nat_gateway') - nat_gateway_unused.DAYS_TO_TRIGGER_RESOURCE_MAIL = -1 - nat_gateway_unused._check_resource_and_delete(resource_name='Nat Gateway', - resource_id='NatGatewayId', - resource_type='CreateNatGateway', - resource=ec2_client.describe_nat_gateways()['NatGateways'][0], - empty_days=0, - days_to_delete_resource=0) - nat_gateways = ec2_client.describe_nat_gateways()['NatGateways'] - assert len(nat_gateways) == 1 and nat_gateways[0].get('State') == 'deleted' - - -@mock_ec2 -def test_nat_gateway_unused_not_delete(): - """ - This method tests, deletion od unused of NatGateways - @return: - """ - os.environ['policy'] = 'unused_nat_gateway' - tags = [ - {'Key': 'Name', 'Value': 'CloudGovernanceTestZombieNatGateway'}, - {'Key': 'Owner', 'Value': 'CloudGovernance'}, - {'Key': 'policy', 'Value': 'not-delete'} - ] - ec2_client = boto3.client('ec2', region_name=os.environ.get('AWS_DEFAULT_REGION')) - subnet_id = ec2_client.describe_subnets()['Subnets'][0].get('SubnetId') - ec2_client.create_nat_gateway(SubnetId=subnet_id, TagSpecifications=[{'ResourceType': 'nat-gateway', 'Tags': tags}]) - nat_gateway_unused = NonClusterZombiePolicy() - nat_gateway_unused.set_dryrun(value='no') - nat_gateway_unused.set_policy(value='unused_nat_gateway') - nat_gateway_unused.DAYS_TO_TRIGGER_RESOURCE_MAIL = -1 - nat_gateway_unused._check_resource_and_delete(resource_name='Nat Gateway', - resource_id='NatGatewayId', - resource_type='CreateNatGateway', - resource=ec2_client.describe_nat_gateways()['NatGateways'][0], - empty_days=0, - days_to_delete_resource=0) - nat_gateways = ec2_client.describe_nat_gateways()['NatGateways'] - assert len(nat_gateways) == 1 and nat_gateways[0].get('State') == 'available' diff --git a/tests/unittest/cloud_governance/policy/aws/cleanup/test_unused_nat_gateway.py b/tests/unittest/cloud_governance/policy/aws/cleanup/test_unused_nat_gateway.py new file mode 100644 index 000000000..4c88019b0 --- /dev/null +++ b/tests/unittest/cloud_governance/policy/aws/cleanup/test_unused_nat_gateway.py @@ -0,0 +1,151 @@ +import datetime + +import boto3 +from moto import mock_ec2, mock_cloudwatch + +from cloud_governance.main.environment_variables import environment_variables +from cloud_governance.policy.aws.cleanup.unused_nat_gateway import UnUsedNatGateway +from tests.unittest.configs import AWS_DEFAULT_REGION, NAT_GATEWAY_NAMESPACE + + +@mock_ec2 +def test_unused_nat_gateway_dry_run_yes(): + """ + This method tests the unused_nat_gateway collected by dry_run=yes + :return: + :rtype: + """ + environment_variables.environment_variables_dict['dry_run'] = 'yes' + environment_variables.environment_variables_dict['AWS_DEFAULT_REGION'] = AWS_DEFAULT_REGION + environment_variables.environment_variables_dict['policy'] = 'unused_nat_gateway' + ec2_client = boto3.client('ec2', region_name=AWS_DEFAULT_REGION) + subnet_id = ec2_client.describe_subnets()['Subnets'][0].get('SubnetId') + ec2_client.create_nat_gateway(SubnetId=subnet_id) + unused_nat_gateway = UnUsedNatGateway() + response = unused_nat_gateway.run() + assert len(response) == 1 + assert response[0]['CleanUpDays'] == 0 + + +@mock_cloudwatch +@mock_ec2 +def test_unused_nat_gateway_dry_run_yes_collect_none(): + """ + This method tests the unused_nat_gateway not collected by dry_run=yes + :return: + :rtype: + """ + environment_variables.environment_variables_dict['dry_run'] = 'yes' + environment_variables.environment_variables_dict['AWS_DEFAULT_REGION'] = AWS_DEFAULT_REGION + environment_variables.environment_variables_dict['policy'] = 'unused_nat_gateway' + ec2_client = boto3.client('ec2', region_name=AWS_DEFAULT_REGION) + cloud_watch_client = boto3.client('cloudwatch', region_name=AWS_DEFAULT_REGION) + subnet = ec2_client.describe_subnets()['Subnets'][0] + route_table = ec2_client.create_route_table(VpcId=subnet.get('VpcId')).get('RouteTable', {}) + nat_gateway = ec2_client.create_nat_gateway(SubnetId=subnet.get('SubnetId')).get('NatGateway', {}) + ec2_client.create_route(NatGatewayId=nat_gateway.get('NatGatewayId'), RouteTableId=route_table.get('RouteTableId')) + cloud_watch_client.put_metric_data(Namespace=NAT_GATEWAY_NAMESPACE, MetricData=[ + { + 'MetricName': 'ActiveConnectionCount', + 'Dimensions': [ + { + 'Name': 'NatGatewayId', + 'Value': nat_gateway.get('NatGatewayId') + }, + ], + 'Timestamp': datetime.datetime.utcnow(), + 'Value': 123.0, + 'Values': [123.0], + 'Unit': 'Count', + } + ]) + unused_nat_gateway = UnUsedNatGateway() + response = unused_nat_gateway.run() + assert len(response) == 0 + + +@mock_ec2 +def test_unused_nat_gateway_dry_run_no(): + """ + This method verifies the data is collecting by dry_run = no + :return: + :rtype: + """ + environment_variables.environment_variables_dict['AWS_DEFAULT_REGION'] = AWS_DEFAULT_REGION + environment_variables.environment_variables_dict['policy'] = 'unused_nat_gateway' + environment_variables.environment_variables_dict['dry_run'] = 'no' + ec2_client = boto3.client('ec2', region_name=AWS_DEFAULT_REGION) + subnet_id = ec2_client.describe_subnets()['Subnets'][0].get('SubnetId') + ec2_client.create_nat_gateway(SubnetId=subnet_id) + unused_nat_gateway = UnUsedNatGateway() + response = unused_nat_gateway.run() + assert len(response) == 1 + response = response[0] + assert response.get('CleanUpDays') == 1 + + +@mock_ec2 +def test_unused_nat_gateway___dry_run_no_7_days_action_delete(): + """ + This method tests the deletion of unused_nat_gateway + :return: + :rtype: + """ + environment_variables.environment_variables_dict['AWS_DEFAULT_REGION'] = AWS_DEFAULT_REGION + environment_variables.environment_variables_dict['policy'] = 'unused_nat_gateway' + environment_variables.environment_variables_dict['dry_run'] = 'no' + ec2_client = boto3.client('ec2', region_name=AWS_DEFAULT_REGION) + subnet_id = ec2_client.describe_subnets()['Subnets'][0].get('SubnetId') + tags = [{'Key': 'DaysCount', 'Value': f'{datetime.datetime.utcnow().date()}@7'}] + ec2_client.create_nat_gateway(SubnetId=subnet_id, TagSpecifications=[{'ResourceType': 'nat-gateway', 'Tags': tags}]) + unused_nat_gateway = UnUsedNatGateway() + response = unused_nat_gateway.run() + assert len(response) == 1 + response = response[0] + assert response.get('CleanUpDays') == 7 + assert response.get('ResourceDelete') == 'True' + + +@mock_ec2 +def test_unused_nat_gateway___dry_run_no_skips_delete(): + """ + This method tests skip deletion of unused_nat_gateway + :return: + :rtype: + """ + environment_variables.environment_variables_dict['AWS_DEFAULT_REGION'] = AWS_DEFAULT_REGION + environment_variables.environment_variables_dict['policy'] = 'unused_nat_gateway' + environment_variables.environment_variables_dict['dry_run'] = 'no' + ec2_client = boto3.client('ec2', region_name=AWS_DEFAULT_REGION) + subnet_id = ec2_client.describe_subnets()['Subnets'][0].get('SubnetId') + tags = [{'Key': 'DaysCount', 'Value': f'{datetime.datetime.utcnow().date()}@7'}, + {'Key': 'policy', 'Value': 'not-delete'}] + ec2_client.create_nat_gateway(SubnetId=subnet_id, TagSpecifications=[{'ResourceType': 'nat-gateway', 'Tags': tags}]) + unused_nat_gateway = UnUsedNatGateway() + response = unused_nat_gateway.run() + assert len(response) == 1 + response = response[0] + assert response.get('CleanUpDays') == 7 + assert response.get('ResourceDelete') == 'False' + + +@mock_ec2 +def test_unused_nat_gateway___dry_run_no_skips_active_cluster_resource(): + """ + This method tests the skip collection of unused_nat_gateway + :return: + :rtype: + """ + environment_variables.environment_variables_dict['AWS_DEFAULT_REGION'] = AWS_DEFAULT_REGION + environment_variables.environment_variables_dict['policy'] = 'unused_nat_gateway' + environment_variables.environment_variables_dict['dry_run'] = 'no' + ec2_client = boto3.client('ec2', region_name=AWS_DEFAULT_REGION) + subnet_id = ec2_client.describe_subnets()['Subnets'][0].get('SubnetId') + tags = [{'Key': 'kubernetes.io/cluster/test', 'Value': 'owned'}] + default_ami_id = 'ami-03cf127a' + ec2_client.run_instances(ImageId=default_ami_id, InstanceType='t2.micro', MaxCount=1, + MinCount=1, TagSpecifications=[{'ResourceType': 'instance', 'Tags': tags}]) + ec2_client.create_nat_gateway(SubnetId=subnet_id, TagSpecifications=[{'ResourceType': 'nat-gateway', 'Tags': tags}]) + unused_nat_gateway = UnUsedNatGateway() + response = unused_nat_gateway.run() + assert len(response) == 0 diff --git a/tests/unittest/cloud_governance/policy/azure/test_unused_nat_gateway.py b/tests/unittest/cloud_governance/policy/azure/test_unused_nat_gateway.py new file mode 100644 index 000000000..88e2edc8e --- /dev/null +++ b/tests/unittest/cloud_governance/policy/azure/test_unused_nat_gateway.py @@ -0,0 +1,218 @@ +import datetime + +from azure.mgmt.compute import ComputeManagementClient +from azure.mgmt.monitor import MonitorManagementClient +from azure.mgmt.monitor.v2021_05_01.models import TimeSeriesElement, MetricValue +from azure.mgmt.network import NetworkManagementClient +from azure.mgmt.network.models import NetworkInterfaceIPConfiguration, PublicIPAddress, IPConfiguration + +from cloud_governance.main.environment_variables import environment_variables +from cloud_governance.policy.azure.cleanup.ip_unattached import IpUnattached +from cloud_governance.policy.azure.cleanup.unused_nat_gateway import UnUsedNatGateway +from tests.unittest.configs import SUBSCRIPTION_ID, CURRENT_DATE, NAT_GATEWAY_NAME +from tests.unittest.mocks.azure.mock_compute.mock_compute import mock_compute +from tests.unittest.mocks.azure.mock_identity.mock_default_credential import MockDefaultAzureCredential +from tests.unittest.mocks.azure.mock_monitor.mock_monitor import mock_monitor +from tests.unittest.mocks.azure.mock_network.mock_network import mock_network + + +@mock_compute +@mock_network +@mock_monitor +def test_unused_nat_gateway__check_unused(): + """ + This method tests ip_unattached not collect the unused_nat_gateways + :return: + :rtype: + """ + environment_variables.environment_variables_dict['dry_run'] = 'yes' + environment_variables.environment_variables_dict['policy'] = 'unused_nat_gateway' + network_client = NetworkManagementClient(subscription_id=SUBSCRIPTION_ID, credential=MockDefaultAzureCredential()) + monitor_client = MonitorManagementClient(credential='', subscription_id='') + nat_gateway = network_client.nat_gateways.begin_create_or_update(nat_gateway_name=NAT_GATEWAY_NAME) + monitor_client.metrics.create_metric(resource_id=nat_gateway.id, type='NatGateway', name='test-metric', + unit='ByteCount', timeseries=[]) + unused_nat_gateway = UnUsedNatGateway() + response = unused_nat_gateway.run() + assert len(response) == 1 + + +@mock_compute +@mock_network +@mock_monitor +def test_unused_nat_gateway__check_used(): + """ + This method tests ip_unattached not collect the used_nat_gateways + :return: + :rtype: + """ + environment_variables.environment_variables_dict['dry_run'] = 'yes' + environment_variables.environment_variables_dict['policy'] = 'unused_nat_gateway' + network_client = NetworkManagementClient(subscription_id=SUBSCRIPTION_ID, credential=MockDefaultAzureCredential()) + monitor_client = MonitorManagementClient(credential='', subscription_id='') + nat_gateway = network_client.nat_gateways.begin_create_or_update(nat_gateway_name=NAT_GATEWAY_NAME) + monitor_client.metrics.create_metric(resource_id=nat_gateway.id, type='NatGateway', name='test-metric', + unit='ByteCount', + timeseries=[TimeSeriesElement(data=[ + MetricValue(time_stamp=datetime.datetime.utcnow(), average=100) + ])]) + unused_nat_gateway = UnUsedNatGateway() + response = unused_nat_gateway.run() + assert len(response) == 0 + + +@mock_compute +@mock_network +@mock_monitor +def test_unused_nat_gateway__skip_live_cluster_id(): + """ + This method tests unused_natgateway not collect the active cluster resources + :return: + :rtype: + """ + environment_variables.environment_variables_dict['dry_run'] = 'yes' + environment_variables.environment_variables_dict['policy'] = 'unused_nat_gateway' + network_client = NetworkManagementClient(subscription_id=SUBSCRIPTION_ID, credential=MockDefaultAzureCredential()) + monitor_client = MonitorManagementClient(subscription_id=SUBSCRIPTION_ID, credential=MockDefaultAzureCredential()) + compute_client = ComputeManagementClient(subscription_id=SUBSCRIPTION_ID, credential=MockDefaultAzureCredential()) + tags = {'kubernetes.io/cluster/unittest-vm': 'owned'} + compute_client.virtual_machines.begin_create_or_update(vm_name='test-unitest', tags=tags, location='useast') + nat_gateway = network_client.nat_gateways.begin_create_or_update(nat_gateway_name=NAT_GATEWAY_NAME, tags=tags) + monitor_client.metrics.create_metric(resource_id=nat_gateway.id, type='NatGateway', name='test-metric', + unit='ByteCount', + timeseries=[TimeSeriesElement(data=[ + MetricValue(time_stamp=datetime.datetime.utcnow(), average=0) + ])]) + unused_nat_gateway = UnUsedNatGateway() + response = unused_nat_gateway.run() + assert len(response) == 0 + + +@mock_compute +@mock_network +@mock_monitor +def test_unused_nat_gateway__collect_not_live_cluster_id(): + """ + This method tests collect the non-active cluster unused_natgateways + :return: + :rtype: + """ + environment_variables.environment_variables_dict['dry_run'] = 'yes' + environment_variables.environment_variables_dict['policy'] = 'unused_nat_gateway' + network_client = NetworkManagementClient(subscription_id=SUBSCRIPTION_ID, credential=MockDefaultAzureCredential()) + monitor_client = MonitorManagementClient(subscription_id=SUBSCRIPTION_ID, credential=MockDefaultAzureCredential()) + tags = {'kubernetes.io/cluster/unittest-vm': 'owned'} + nat_gateway = network_client.nat_gateways.begin_create_or_update(nat_gateway_name=NAT_GATEWAY_NAME, tags=tags) + monitor_client.metrics.create_metric(resource_id=nat_gateway.id, type='NatGateway', name='test-metric', + unit='ByteCount', + timeseries=[TimeSeriesElement(data=[ + MetricValue(time_stamp=datetime.datetime.utcnow(), average=0) + ])]) + unused_nat_gateway = UnUsedNatGateway() + response = unused_nat_gateway.run() + assert len(response) == 1 + + +@mock_compute +@mock_network +@mock_monitor +def test_unused_nat_gateway__dryrun_no(): + """ + This method tests unused_nat_gateway + :return: + :rtype: + """ + environment_variables.environment_variables_dict['dry_run'] = 'no' + environment_variables.environment_variables_dict['policy'] = 'unused_nat_gateway' + network_client = NetworkManagementClient(subscription_id=SUBSCRIPTION_ID, credential=MockDefaultAzureCredential()) + monitor_client = MonitorManagementClient(subscription_id=SUBSCRIPTION_ID, credential=MockDefaultAzureCredential()) + tags = {'kubernetes.io/cluster/unittest-vm': 'owned'} + nat_gateway = network_client.nat_gateways.begin_create_or_update(nat_gateway_name=NAT_GATEWAY_NAME, tags=tags) + monitor_client.metrics.create_metric(resource_id=nat_gateway.id, type='NatGateway', name='test-metric', + unit='ByteCount', + timeseries=[TimeSeriesElement(data=[ + MetricValue(time_stamp=datetime.datetime.utcnow(), average=0) + ])]) + unused_nat_gateway = UnUsedNatGateway() + response = unused_nat_gateway.run() + assert response[0]['CleanUpDays'] == 1 + + +@mock_compute +@mock_network +@mock_monitor +def test_unused_nat_gateway__dryrun_no_delete(): + """ + This method tests deletion of unused_nat_gateway + :return: + :rtype: + """ + environment_variables.environment_variables_dict['dry_run'] = 'no' + environment_variables.environment_variables_dict['policy'] = 'unused_nat_gateway' + network_client = NetworkManagementClient(subscription_id=SUBSCRIPTION_ID, credential=MockDefaultAzureCredential()) + monitor_client = MonitorManagementClient(subscription_id=SUBSCRIPTION_ID, credential=MockDefaultAzureCredential()) + tags = {'DaysCount': f'{CURRENT_DATE}@7'} + nat_gateway = network_client.nat_gateways.begin_create_or_update(nat_gateway_name=NAT_GATEWAY_NAME, tags=tags) + monitor_client.metrics.create_metric(resource_id=nat_gateway.id, type='NatGateway', name='test-metric', + unit='ByteCount', + timeseries=[TimeSeriesElement(data=[ + MetricValue(time_stamp=datetime.datetime.utcnow(), average=0) + ])]) + unused_nat_gateway = UnUsedNatGateway() + response = unused_nat_gateway.run() + assert len(response) == 1 + assert response[0]['CleanUpDays'] == 7 + assert response[0]['ResourceState'] == 'Deleted' + + +@mock_compute +@mock_network +@mock_monitor +def test_unused_nat_gateway__skips_delete(): + """ + This method tests skip deletion of unused_nat_gateway + :return: + :rtype: + """ + environment_variables.environment_variables_dict['dry_run'] = 'no' + environment_variables.environment_variables_dict['policy'] = 'unused_nat_gateway' + network_client = NetworkManagementClient(subscription_id=SUBSCRIPTION_ID, credential=MockDefaultAzureCredential()) + monitor_client = MonitorManagementClient(subscription_id=SUBSCRIPTION_ID, credential=MockDefaultAzureCredential()) + tags = {'DaysCount': f'{CURRENT_DATE}@7', 'Policy': 'skip'} + nat_gateway = network_client.nat_gateways.begin_create_or_update(nat_gateway_name=NAT_GATEWAY_NAME, tags=tags) + monitor_client.metrics.create_metric(resource_id=nat_gateway.id, type='NatGateway', name='test-metric', + unit='ByteCount', + timeseries=[TimeSeriesElement(data=[ + MetricValue(time_stamp=datetime.datetime.utcnow(), average=0) + ])]) + unused_nat_gateway = UnUsedNatGateway() + response = unused_nat_gateway.run() + assert len(response) == 1 + assert response[0]['CleanUpDays'] == 7 + assert response[0]['ResourceState'] != 'Deleted' + + +@mock_compute +@mock_network +@mock_monitor +def test_unused_nat_gateway__set_counter_zero(): + """ + This method tests unused_nat_gateway to set days counter to 0 + :return: + :rtype: + """ + environment_variables.environment_variables_dict['dry_run'] = 'yes' + environment_variables.environment_variables_dict['policy'] = 'unused_nat_gateway' + network_client = NetworkManagementClient(subscription_id=SUBSCRIPTION_ID, credential=MockDefaultAzureCredential()) + monitor_client = MonitorManagementClient(subscription_id=SUBSCRIPTION_ID, credential=MockDefaultAzureCredential()) + tags = {'DaysCount': f'{CURRENT_DATE}@7'} + nat_gateway = network_client.nat_gateways.begin_create_or_update(nat_gateway_name=NAT_GATEWAY_NAME, tags=tags) + monitor_client.metrics.create_metric(resource_id=nat_gateway.id, type='NatGateway', name='test-metric', + unit='ByteCount', + timeseries=[TimeSeriesElement(data=[ + MetricValue(time_stamp=datetime.datetime.utcnow(), average=0) + ])]) + unused_nat_gateway = UnUsedNatGateway() + response = unused_nat_gateway.run() + assert len(response) == 1 + assert response[0]['CleanUpDays'] == 0 diff --git a/tests/unittest/configs.py b/tests/unittest/configs.py index a97693962..f7d239832 100644 --- a/tests/unittest/configs.py +++ b/tests/unittest/configs.py @@ -12,6 +12,7 @@ AWS_DEFAULT_REGION = 'us-west-2' DEFAULT_AMI_ID = 'ami-03cf127a' INSTANCE_TYPE = 't2.micro' +NAT_GATEWAY_NAMESPACE = 'AWS/NATGateway' # Azure @@ -21,6 +22,7 @@ NETWORK_PROVIDER = f'providers/Microsoft.Network' COMPUTE_PROVIDER = 'providers/Microsoft.Compute' AZURE_RESOURCE_ID = f'{SUB_ID}/{COMPUTE_PROVIDER}/cloud-governance-unittest' +NAT_GATEWAY_NAME = 'test-cloud-governance-nat-1' # ES diff --git a/tests/unittest/mocks/azure/mock_monitor/mock_metric_operations.py b/tests/unittest/mocks/azure/mock_monitor/mock_metric_operations.py new file mode 100644 index 000000000..bc785d6bf --- /dev/null +++ b/tests/unittest/mocks/azure/mock_monitor/mock_metric_operations.py @@ -0,0 +1,31 @@ +from azure.mgmt.monitor.v2021_05_01.models import Metric, Response + + +class MockMetric(Metric): + + def __init__(self, **kwargs): + super().__init__(**kwargs) + + +class MockResponse(Response): + + def __init__(self, **kwargs): + super().__init__(**kwargs) + + +class MockMetricOperations: + + def __init__(self): + self.__metrics = {} + + def create_metric(self, resource_id: str, type: str, name: str, unit: str, timeseries: list, **kwargs): + name = {'value': name} + + metric = MockMetric(id=resource_id, type=type, name=name, unit=unit, timeseries=timeseries, **kwargs) + self.__metrics[resource_id] = MockResponse(timespan='', value=[metric]) + return metric + + def list(self, resource_uri: str, **kwargs): + return self.__metrics[resource_uri] + + diff --git a/tests/unittest/mocks/azure/mock_monitor/mock_monitor.py b/tests/unittest/mocks/azure/mock_monitor/mock_monitor.py index f5d52db20..b47e42aab 100644 --- a/tests/unittest/mocks/azure/mock_monitor/mock_monitor.py +++ b/tests/unittest/mocks/azure/mock_monitor/mock_monitor.py @@ -5,6 +5,7 @@ from azure.mgmt.monitor import MonitorManagementClient from tests.unittest.mocks.azure.mock_monitor.mock_activity_logs_operations import MockActivityLogsOperations +from tests.unittest.mocks.azure.mock_monitor.mock_metric_operations import MockMetricOperations def mock_monitor(method): @@ -21,7 +22,8 @@ def method_wrapper(*args, **kwargs): @param kwargs: @return: """ - with patch.object(MonitorManagementClient, 'activity_logs', MockActivityLogsOperations()): + with patch.object(MonitorManagementClient, 'activity_logs', MockActivityLogsOperations()), \ + patch.object(MonitorManagementClient, 'metrics', MockMetricOperations()): result = method(*args, **kwargs) return result diff --git a/tests/unittest/mocks/azure/mock_network/mock_nat_gateway_operations.py b/tests/unittest/mocks/azure/mock_network/mock_nat_gateway_operations.py new file mode 100644 index 000000000..bcf9f0095 --- /dev/null +++ b/tests/unittest/mocks/azure/mock_network/mock_nat_gateway_operations.py @@ -0,0 +1,48 @@ +from azure.mgmt.network.models import NatGateway + +from tests.unittest.configs import NETWORK_PROVIDER, SUB_ID +from tests.unittest.mocks.azure.common_operations import CustomItemPaged + + +class MockNatGateway(NatGateway): + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.provisioning_state = kwargs.get('provisioning_state') + + +class MockNatGatewayOperations: + + NAT_GATEWAY = 'virtualNetworks' + + def __init__(self): + self.__nat_gateways = {} + + def begin_create_or_update(self, nat_gateway_name: str, location: str = 'eastus', + provisioning_state: str = 'Succeed', **kwargs): + """ + This method creates the nat gateway resource + :param provisioning_state: + :type provisioning_state: + :param location: + :type location: + :param nat_gateway_name: + :type nat_gateway_name: + :param kwargs: + :type kwargs: + :return: + :rtype: + """ + nat_gateway_id = f'{SUB_ID}/{NETWORK_PROVIDER}/{self.NAT_GATEWAY}/{nat_gateway_name}' + nat_gateway = MockNatGateway(id=nat_gateway_id, name=nat_gateway_name, location=location, + provisioning_state=provisioning_state, **kwargs) + self.__nat_gateways[nat_gateway_name] = nat_gateway + return nat_gateway + + def list_all(self): + """ + This method returns all the network interfaces + :return: + :rtype: + """ + return CustomItemPaged(list(self.__nat_gateways.values())) diff --git a/tests/unittest/mocks/azure/mock_network/mock_network.py b/tests/unittest/mocks/azure/mock_network/mock_network.py index 041af964e..9c5dab9ad 100644 --- a/tests/unittest/mocks/azure/mock_network/mock_network.py +++ b/tests/unittest/mocks/azure/mock_network/mock_network.py @@ -3,6 +3,7 @@ from azure.mgmt.network import NetworkManagementClient +from tests.unittest.mocks.azure.mock_network.mock_nat_gateway_operations import MockNatGatewayOperations from tests.unittest.mocks.azure.mock_network.mock_network_interface_operations import MockNetworkInterfaceOperations from tests.unittest.mocks.azure.mock_network.mock_public_ip_address_operations import MockPublicIpAddressOperations @@ -23,7 +24,8 @@ def method_wrapper(*args, **kwargs): @return: """ with patch.object(NetworkManagementClient, 'public_ip_addresses', MockPublicIpAddressOperations()), \ - patch.object(NetworkManagementClient, 'network_interfaces', MockNetworkInterfaceOperations()): + patch.object(NetworkManagementClient, 'network_interfaces', MockNetworkInterfaceOperations()), \ + patch.object(NetworkManagementClient, 'nat_gateways', MockNatGatewayOperations()): result = method(*args, **kwargs) return result