diff --git a/cloud_governance/common/clouds/azure/compute/common_operations.py b/cloud_governance/common/clouds/azure/compute/common_operations.py index f0054d25..cef714b3 100644 --- a/cloud_governance/common/clouds/azure/compute/common_operations.py +++ b/cloud_governance/common/clouds/azure/compute/common_operations.py @@ -1,9 +1,10 @@ -from typing import Optional, Union +from typing import Optional from azure.core.paging import ItemPaged from azure.identity import DefaultAzureCredential from cloud_governance.cloud_resource_orchestration.utils.common_operations import string_equal_ignore_case +from cloud_governance.common.logger.init_logger import logger from cloud_governance.main.environment_variables import environment_variables @@ -67,7 +68,7 @@ def __convert_cast_type(self, type_cast: str, value: str): else: return str(value) - def _get_resource_group_name_from_resource_id(self, resource_id: str): + def get_resource_group_name_from_resource_id(self, resource_id: str): """ This method returns the resource_group from resource_id :param resource_id: @@ -78,3 +79,15 @@ def _get_resource_group_name_from_resource_id(self, resource_id: str): id_list = resource_id.split('/') key_values = {id_list[i].lower(): id_list[i+1] for i in range(0, len(id_list) - 1)} return key_values.get('resourcegroups').lower() + + def get_id_dict_data(self, resource_id: str): + """ + This method generates the vm id dictionary + :param resource_id: + :type resource_id: + :return: + :rtype: + """ + pairs = resource_id.split('/')[1:] + key_pairs = {pairs[i].lower(): pairs[i + 1] for i in range(0, len(pairs), 2)} + return key_pairs diff --git a/cloud_governance/common/clouds/azure/compute/compute_operations.py b/cloud_governance/common/clouds/azure/compute/compute_operations.py index fc3916b9..e77d8a23 100644 --- a/cloud_governance/common/clouds/azure/compute/compute_operations.py +++ b/cloud_governance/common/clouds/azure/compute/compute_operations.py @@ -41,23 +41,11 @@ def get_instance_statuses(self, resource_id: str, vm_name: str) -> dict: :return: :rtype: """ - resource_group_name = self._get_resource_group_name_from_resource_id(resource_id=resource_id) + resource_group_name = self.get_resource_group_name_from_resource_id(resource_id=resource_id) virtual_machine = self.__compute_client.virtual_machines.instance_view(resource_group_name=resource_group_name, vm_name=vm_name) return virtual_machine.as_dict() - def get_id_dict_data(self, resource_id: str): - """ - This method generates the vm id dictionary - :param resource_id: - :type resource_id: - :return: - :rtype: - """ - pairs = resource_id.split('/')[1:] - key_pairs = {pairs[i].lower(): pairs[i + 1] for i in range(0, len(pairs), 2)} - return key_pairs - def stop_vm(self, resource_id: str): """ This method stops the vm diff --git a/cloud_governance/common/clouds/azure/compute/network_operations.py b/cloud_governance/common/clouds/azure/compute/network_operations.py new file mode 100644 index 00000000..8f5d6c2b --- /dev/null +++ b/cloud_governance/common/clouds/azure/compute/network_operations.py @@ -0,0 +1,72 @@ +from azure.mgmt.network import NetworkManagementClient + +from cloud_governance.common.clouds.azure.compute.common_operations import CommonOperations +from cloud_governance.common.utils.utils import Utils + + +class NetworkOperations(CommonOperations): + + def __init__(self): + super().__init__() + self.__network_client = NetworkManagementClient(self._default_creds, subscription_id=self._subscription_id) + + def __get_all_public_ip_address(self): + """ + This method returns all the Public address + :return: + :rtype: + """ + ips = self.__network_client.public_ip_addresses.list_all() + return self._item_paged_iterator(item_paged_object=ips, as_dict=True) + + def get_public_ipv4_addresses(self): + """ + This method returns the Public IPV4 + :return: + :rtype: + """ + public_addresses = [] + for ipaddress in self.__get_all_public_ip_address(): + if Utils.equal_ignore_case(ipaddress.get('public_ip_address_version'), 'IPv4'): + if Utils.equal_ignore_case(ipaddress.get('public_ip_allocation_method'), 'Static'): + public_addresses.append(ipaddress) + return public_addresses + + def __get_network_interfaces(self): + """ + This method returns the network interfaces + :return: + :rtype: + """ + network_interfaces = self.__network_client.network_interfaces.list_all() + network_interfaces = self._item_paged_iterator(item_paged_object=network_interfaces, as_dict=True) + return network_interfaces + + def get_public_ipv4_network_interfaces(self): + """ + This method returns the network interfaces have the public ip address attached + :return: + :rtype: + """ + public_ipv4_network_interfaces = {} + network_interfaces = self.__get_network_interfaces() + for network_interface in network_interfaces: + for ip_configuration in network_interface.get('ip_configurations', []): + if ip_configuration.get('public_ip_address', {}): + public_ipv4_address_id = ip_configuration.get('public_ip_address', {}).get('id') + public_ipv4_network_interfaces.setdefault(public_ipv4_address_id, []).append(network_interface) + return public_ipv4_network_interfaces + + # delete operations + def release_public_ip(self, resource_id: str): + """ + This method releases the public ip + :return: + :rtype: + """ + id_key_pairs = self.get_id_dict_data(resource_id) + resource_group_name = id_key_pairs.get('resourcegroups') + public_ip_address_name = id_key_pairs.get('publicipaddresses') + status = self.__network_client.public_ip_addresses.begin_delete(resource_group_name=resource_group_name, + public_ip_address_name=public_ip_address_name) + return status.done() diff --git a/cloud_governance/main/main_oerations/main_operations.py b/cloud_governance/main/main_oerations/main_operations.py index ffe7a619..fa326a68 100644 --- a/cloud_governance/main/main_oerations/main_operations.py +++ b/cloud_governance/main/main_oerations/main_operations.py @@ -38,7 +38,8 @@ def run(self): policy_runner = self.get_policy_runner() for policy_type, policies in policies_list.items(): # @Todo support for all the aws policies, currently supports ec2_run as urgent requirement - if self._policy in policies and self._policy in ["instance_run", "unattached_volume", "cluster_run"]: + if self._policy in policies and self._policy in ["instance_run", "unattached_volume", "cluster_run", + "ip_unattached"]: policy_runner.run(source=policy_type) return True return False diff --git a/cloud_governance/policy/aws/ip_unattached.py b/cloud_governance/policy/aws/ip_unattached.py index 8cefd079..87d01370 100644 --- a/cloud_governance/policy/aws/ip_unattached.py +++ b/cloud_governance/policy/aws/ip_unattached.py @@ -1,52 +1,55 @@ +from cloud_governance.policy.helpers.aws.aws_policy_operations import AWSPolicyOperations -from cloud_governance.policy.policy_operations.aws.zombie_non_cluster.run_zombie_non_cluster_policies import NonClusterZombiePolicy - -class IpUnattached(NonClusterZombiePolicy): +class IpUnattached(AWSPolicyOperations): """ Fetched the Unused elastic_ips( based on network interface Id) and delete it after 7 days of unused, alert user after 4 days of unused elastic_ip. """ + RESOURCE_ACTION = "Delete" + def __init__(self): super().__init__() - def run(self): + def run_policy_operations(self): """ - This method returns zombie elastic_ip's and delete if dry_run no - @return: + This method returns the list of unattached IPV4 addresses + :return: + :rtype: """ addresses = self._ec2_operations.get_elastic_ips() - zombie_addresses = [] + active_cluster_ids = self._get_active_cluster_ids() + unattached_addresses = [] for address in addresses: - ip_no_used = False tags = address.get('Tags', []) - if not self._check_cluster_tag(tags=tags) or self._get_policy_value(tags=tags) not in ('NOTDELETE', 'SKIP'): + cleanup_result = False + ip_not_used = False + resource_id = address.get('AllocationId') + cluster_tag = self._get_cluster_tag(tags=address.get('Tags')) + if cluster_tag not in active_cluster_ids: if not address.get('NetworkInterfaceId'): - ip_no_used = True - unused_days = self._get_resource_last_used_days(tags=tags) - eip_cost = self.resource_pricing.get_const_prices(resource_type='eip', hours=(self.DAILY_HOURS * unused_days)) - delta_cost = 0 - if unused_days == self.DAYS_TO_NOTIFY_ADMINS: - delta_cost = self.resource_pricing.get_const_prices(resource_type='eip', hours=(self.DAILY_HOURS * (unused_days - self.DAYS_TO_TRIGGER_RESOURCE_MAIL))) - else: - if unused_days >= self.DAYS_TO_DELETE_RESOURCE: - delta_cost = self.resource_pricing.get_const_prices(resource_type='eip', hours=(self.DAILY_HOURS * (unused_days - self.DAYS_TO_NOTIFY_ADMINS))) - zombie_eip = self._check_resource_and_delete(resource_name='ElasticIp', - resource_id='AllocationId', - resource_type='AllocateAddress', - resource=address, - empty_days=unused_days, - days_to_delete_resource=self.DAYS_TO_DELETE_RESOURCE, tags=tags, - extra_purse=eip_cost, delta_cost=delta_cost) - if zombie_eip: - zombie_addresses.append({'ResourceId': address.get('AllocationId'), - 'Name': self._get_tag_name_from_tags(tags=tags), - 'User': self._get_tag_name_from_tags(tags=tags, tag_name='User'), - 'PublicIp': address.get('PublicIp'), - 'Skip': self._get_policy_value(tags=tags), - 'Days': unused_days}) + cleanup_days = self.get_clean_up_days_count(tags=tags) + ip_not_used = True + cleanup_result = self.verify_and_delete_resource(resource_id=resource_id, tags=tags, + clean_up_days=cleanup_days) + resource_data = self._get_es_schema(resource_id=resource_id, + user=self.get_tag_name_from_tags(tags=tags, tag_name='User'), + skip_policy=self.get_skip_policy_value(tags=tags), + cleanup_days=cleanup_days, dry_run=self._dry_run, + name=self.get_tag_name_from_tags(tags=tags, tag_name='Name'), + region=self._region, + cleanup_result=str(cleanup_result), + resource_action=self.RESOURCE_ACTION, + cloud_name=self._cloud_name, + resource_type='PublicIPv4', + resource_state='disassociated' if not cleanup_result else "Deleted" + ) + unattached_addresses.append(resource_data) else: - unused_days = 0 - self._update_resource_tags(resource_id=address.get('AllocationId'), tags=tags, left_out_days=unused_days, resource_left_out=ip_no_used) - return zombie_addresses + cleanup_days = 0 + if not cleanup_result: + if self.get_tag_name_from_tags(tags, tag_name='DaysCount') or ip_not_used: + self.update_resource_day_count_tag(resource_id=resource_id, cleanup_days=cleanup_days, tags=tags) + + return unattached_addresses diff --git a/cloud_governance/policy/azure/cleanup/ip_unattached.py b/cloud_governance/policy/azure/cleanup/ip_unattached.py new file mode 100644 index 00000000..6cfb61e0 --- /dev/null +++ b/cloud_governance/policy/azure/cleanup/ip_unattached.py @@ -0,0 +1,72 @@ + +from cloud_governance.policy.helpers.azure.azure_policy_operations import AzurePolicyOperations + + +class IpUnattached(AzurePolicyOperations): + + RESOURCE_ACTION = "Delete" + + def __init__(self): + super().__init__() + self.__network_interfaces = self.network_operations.get_public_ipv4_network_interfaces() + + def __check_ipv4_not_associated(self, ipv4_address_dict: dict): + """ + This method returns bool + :param ipv4_address_dict: + :type ipv4_address_dict: + :return: + :rtype: + """ + found = False + ip_address_id = ipv4_address_dict.get('id') + if not ipv4_address_dict.get('ip_configuration'): + found = True + else: + network_interface_id = ipv4_address_dict.get('ip_configuration', {}).get('id') + if ip_address_id in self.__network_interfaces: + for network_interface in self.__network_interfaces.get(ip_address_id): + if network_interface.get('id') in network_interface_id: + if not network_interface.get('virtual_machine'): + found = True + return found + + def run_policy_operations(self, volume=None): + """ + This method returns the list of unattached IPV4's + :return: + :rtype: + """ + unattached_ips = [] + active_cluster_ids = self._get_active_cluster_ids() + public_ipv4_address = self.network_operations.get_public_ipv4_addresses() + for ip_address in public_ipv4_address: + tags = ip_address.get('tags') + cleanup_result = False + is_disassociated_ipv4 = False + cluster_tag = self._get_cluster_tag(tags=tags) + if cluster_tag not in active_cluster_ids: + is_disassociated_ipv4 = self.__check_ipv4_not_associated(ip_address) + if is_disassociated_ipv4: + cleanup_days = self.get_clean_up_days_count(tags=tags) + cleanup_result = self.verify_and_delete_resource(resource_id=ip_address.get('id'), tags=tags, + clean_up_days=cleanup_days) + resource_data = self._get_es_schema(resource_id=ip_address.get('name'), + user=self.get_tag_name_from_tags(tags=tags, tag_name='User'), + skip_policy=self.get_skip_policy_value(tags=tags), + cleanup_days=cleanup_days, dry_run=self._dry_run, + name=ip_address.get('name'), region=ip_address.get('location'), + cleanup_result=str(cleanup_result), + resource_action=self.RESOURCE_ACTION, + cloud_name=self._cloud_name, + resource_type="PublicIPv4 Static", + resource_state="disassociated" if not cleanup_result else "Deleted" + ) + unattached_ips.append(resource_data) + else: + cleanup_days = 0 + if not cleanup_result: + if self.get_tag_name_from_tags(tags, tag_name='DaysCount') or is_disassociated_ipv4: + self.update_resource_day_count_tag(resource_id=ip_address.get("id"), + cleanup_days=cleanup_days, tags=tags) + return unattached_ips diff --git a/cloud_governance/policy/helpers/abstract_policy_operations.py b/cloud_governance/policy/helpers/abstract_policy_operations.py index a79e7fb2..1b27a208 100644 --- a/cloud_governance/policy/helpers/abstract_policy_operations.py +++ b/cloud_governance/policy/helpers/abstract_policy_operations.py @@ -192,6 +192,7 @@ def _get_es_schema(self, resource_id: str, user: str, skip_policy: str, cleanup_ 'RegionName': region, f'Resource{resource_action}': cleanup_result, 'PublicCloud': cloud_name, + 'ExpireDays': self._days_to_take_action, 'index-id': f'{current_date}-{cloud_name.lower()}-{self.account.lower()}-{region.lower()}-{resource_id}-{resource_state.lower()}' } if kwargs.get('launch_time'): diff --git a/cloud_governance/policy/helpers/azure/azure_policy_operations.py b/cloud_governance/policy/helpers/azure/azure_policy_operations.py index de09f15d..bfd54d32 100644 --- a/cloud_governance/policy/helpers/azure/azure_policy_operations.py +++ b/cloud_governance/policy/helpers/azure/azure_policy_operations.py @@ -1,5 +1,6 @@ from cloud_governance.common.clouds.azure.compute.compute_operations import ComputeOperations +from cloud_governance.common.clouds.azure.compute.network_operations import NetworkOperations from cloud_governance.common.clouds.azure.compute.resource_group_operations import ResourceGroupOperations from cloud_governance.policy.helpers.abstract_policy_operations import AbstractPolicyOperations from cloud_governance.common.logger.init_logger import logger @@ -11,6 +12,7 @@ class AzurePolicyOperations(AbstractPolicyOperations): def __init__(self): self._cloud_name = 'Azure' self.compute_operations = ComputeOperations() + self.network_operations = NetworkOperations() self.resource_group_operations = ResourceGroupOperations() super().__init__() @@ -45,6 +47,8 @@ def _delete_resource(self, resource_id: str): self.compute_operations.stop_vm(resource_id=resource_id) elif self._policy == 'unattached_volume': self.compute_operations.delete_disk(resource_id=resource_id) + elif self._policy == 'ip_unattached': + delete_status = self.network_operations.release_public_ip(resource_id=resource_id) logger.info(f'{self._policy} {action}: {resource_id}') except Exception as err: logger.info(f'Exception raised: {err}: {resource_id}') @@ -52,7 +56,7 @@ def _delete_resource(self, resource_id: str): def update_resource_day_count_tag(self, resource_id: str, cleanup_days: int, tags: dict): tags = self._update_tag_value(tags=tags, tag_name='DaysCount', tag_value=str(cleanup_days)) try: - if self._policy in ['instance_run', 'unattached_volume']: + if self._policy in ['instance_run', 'unattached_volume', 'ip_unattached']: self.resource_group_operations.creates_or_updates_tags(resource_id=resource_id, tags=tags) except Exception as err: logger.info(f'Exception raised: {err}: {resource_id}') diff --git a/tests/unittest/cloud_governance/aws/zombie_non_cluster/test_ip_unattached.py b/tests/unittest/cloud_governance/aws/zombie_non_cluster/test_ip_unattached.py deleted file mode 100644 index 11edc06f..00000000 --- a/tests/unittest/cloud_governance/aws/zombie_non_cluster/test_ip_unattached.py +++ /dev/null @@ -1,60 +0,0 @@ -import os - -import boto3 -from moto import mock_ec2 - -from cloud_governance.policy.policy_operations.aws.zombie_non_cluster.run_zombie_non_cluster_policies import NonClusterZombiePolicy - -os.environ['AWS_DEFAULT_REGION'] = 'us-east-2' -os.environ['dry_run'] = 'no' - - -@mock_ec2 -def test_ip_unattached(): - """ - This method tests delete of zombie elastic ips - @return: - """ - os.environ['policy'] = 'ip_unattached' - ec2_client = boto3.client('ec2', region_name=os.environ.get('AWS_DEFAULT_REGION')) - ec2_client.allocate_address(Domain='vpc') - ip_unattached = NonClusterZombiePolicy() - ip_unattached.set_dryrun(value='no') - ip_unattached.set_policy(value='ip_unattached') - ip_unattached.DAYS_TO_TRIGGER_RESOURCE_MAIL = -1 - ip_unattached._check_resource_and_delete(resource_name='ElasticIp', - resource_id='AllocationId', - resource_type='AllocateAddress', - resource=ec2_client.describe_addresses()['Addresses'][0], - empty_days=0, - days_to_delete_resource=0) - addresses = ec2_client.describe_addresses()['Addresses'] - assert len(addresses) == 0 - - -@mock_ec2 -def test_ip_unattached_not_delete(): - """ - This method tests not delete of zombie elastic ips,if policy=NOT_DELETE - @return: - """ - os.environ['policy'] = 'ip_unattached' - tags = [ - {'Key': 'Name', 'Value': 'CloudGovernanceTestZombieElasticIp'}, - {'Key': 'Owner', 'Value': 'CloudGovernance'}, - {'Key': 'policy', 'Value': 'skip'} - ] - ec2_client = boto3.client('ec2', region_name=os.environ.get('AWS_DEFAULT_REGION')) - ec2_client.allocate_address(Domain='vpc', TagSpecifications=[{'ResourceType': 'elastic-ip', 'Tags': tags}]) - ip_unattached = NonClusterZombiePolicy() - ip_unattached.set_dryrun(value='no') - ip_unattached.set_policy(value='ip_unattached') - ip_unattached.DAYS_TO_TRIGGER_RESOURCE_MAIL = -1 - ip_unattached._check_resource_and_delete(resource_name='ElasticIp', - resource_id='AllocationId', - resource_type='AllocateAddress', - resource=ec2_client.describe_addresses()['Addresses'][0], - empty_days=0, - days_to_delete_resource=0) - addresses = ec2_client.describe_addresses()['Addresses'] - assert len(addresses) == 1 diff --git a/tests/unittest/cloud_governance/policy/aws/test_ip_unattached.py b/tests/unittest/cloud_governance/policy/aws/test_ip_unattached.py new file mode 100644 index 00000000..29855c07 --- /dev/null +++ b/tests/unittest/cloud_governance/policy/aws/test_ip_unattached.py @@ -0,0 +1,167 @@ +import boto3 +from moto import mock_ec2 + +from cloud_governance.main.environment_variables import environment_variables +from cloud_governance.policy.aws.ip_unattached import IpUnattached +from tests.unittest.configs import AWS_DEFAULT_REGION, DRY_RUN_YES, DRY_RUN_NO, CURRENT_DATE, DEFAULT_AMI_ID, \ + INSTANCE_TYPE + + +@mock_ec2 +def test_ip_unattached__verify_count_zero_dry_run_yes(): + """ + This method tests ip unattached, get the data and verify counter as 0 + @return: + """ + environment_variables.environment_variables_dict['dry_run'] = DRY_RUN_YES + environment_variables.environment_variables_dict['AWS_DEFAULT_REGION'] = AWS_DEFAULT_REGION + environment_variables.environment_variables_dict['policy'] = 'ip_unattached' + ec2_client = boto3.client('ec2', region_name=AWS_DEFAULT_REGION) + ec2_client.allocate_address(Domain='vpc') + ip_unattached = IpUnattached() + response = ip_unattached.run() + addresses = ec2_client.describe_addresses()['Addresses'] + assert len(addresses) == 1 + assert len(response) == 1 + assert response[0]['CleanUpDays'] == 0 + + +@mock_ec2 +def test_ip_unattached__verify_count_increased_dry_run_no(): + """ + This method tests ip unattached, get the data and verify counter increased to 1 + @return: + """ + environment_variables.environment_variables_dict['dry_run'] = DRY_RUN_NO + environment_variables.environment_variables_dict['AWS_DEFAULT_REGION'] = AWS_DEFAULT_REGION + environment_variables.environment_variables_dict['DAYS_TO_TAKE_ACTION'] = 7 + environment_variables.environment_variables_dict['policy'] = 'ip_unattached' + ec2_client = boto3.client('ec2', region_name=AWS_DEFAULT_REGION) + ec2_client.allocate_address(Domain='vpc') + ip_unattached = IpUnattached() + response = ip_unattached.run() + addresses = ec2_client.describe_addresses()['Addresses'] + assert len(addresses) == 1 + assert len(response) == 1 + assert response[0]['CleanUpDays'] == 1 + + +@mock_ec2 +def test_ip_unattached__verify_not_delete_on_dry_run_yes(): + """ + This method tests ip unattached, get the data and verify the resource not deleted on dry run yes + @return: + """ + environment_variables.environment_variables_dict['AWS_DEFAULT_REGION'] = AWS_DEFAULT_REGION + environment_variables.environment_variables_dict['dry_run'] = DRY_RUN_YES + environment_variables.environment_variables_dict['policy'] = 'ip_unattached' + ec2_client = boto3.client('ec2', region_name=AWS_DEFAULT_REGION) + tags = [ + {'Key': 'DaysCount', 'Value': f'{CURRENT_DATE.__str__()}@7'} + ] + ec2_client.allocate_address(Domain='vpc',TagSpecifications=[{'ResourceType': 'elastic-ip', 'Tags': tags}]) + ip_unattached = IpUnattached() + response = ip_unattached.run() + addresses = ec2_client.describe_addresses()['Addresses'] + assert len(addresses) == 1 + assert len(response) == 1 + assert response[0]['CleanUpDays'] == 0 + + +@mock_ec2 +def test_ip_unattached__verify_delete_on_dry_run_no(): + """ + This method tests ip unattached, deletes the ip + @return: + """ + environment_variables.environment_variables_dict['dry_run'] = DRY_RUN_NO + environment_variables.environment_variables_dict['AWS_DEFAULT_REGION'] = AWS_DEFAULT_REGION + environment_variables.environment_variables_dict['policy'] = 'ip_unattached' + ec2_client = boto3.client('ec2', region_name=AWS_DEFAULT_REGION) + tags = [ + {'Key': 'DaysCount', 'Value': f'{CURRENT_DATE.__str__()}@7'} + ] + ec2_client.allocate_address(Domain='vpc', TagSpecifications=[{'ResourceType': 'elastic-ip', 'Tags': tags}]) + ip_unattached = IpUnattached() + response = ip_unattached.run() + addresses = ec2_client.describe_addresses()['Addresses'] + assert len(addresses) == 0 + assert len(response) == 1 + assert response[0]['ResourceState'] == 'Deleted' + + +@mock_ec2 +def test_ip_unattached__skips_delete_on_dry_run_no(): + """ + This method tests ip unattached, skips delete if tags have Policy=skip + @return: + """ + environment_variables.environment_variables_dict['AWS_DEFAULT_REGION'] = AWS_DEFAULT_REGION + environment_variables.environment_variables_dict['policy'] = 'ip_unattached' + environment_variables.environment_variables_dict['dry_run'] = DRY_RUN_NO + ec2_client = boto3.client('ec2', region_name=AWS_DEFAULT_REGION) + tags = [ + {'Key': 'DaysCount', 'Value': f'{CURRENT_DATE.__str__()}@07'}, + {'Key': 'Policy', 'Value': 'skip'}, + ] + ec2_client.allocate_address(Domain='vpc', TagSpecifications=[{'ResourceType': 'elastic-ip', 'Tags': tags}]) + ip_unattached = IpUnattached() + response = ip_unattached.run() + addresses = ec2_client.describe_addresses()['Addresses'] + assert len(addresses) == 1 + assert len(response) == 1 + assert response[0]['CleanUpDays'] == 7 + assert response[0]['ResourceState'] == 'disassociated' + + +@mock_ec2 +def test_ip_unattached__skips_active_ip(): + """ + This method tests ip unattached, skips active ip + @return: + """ + environment_variables.environment_variables_dict['AWS_DEFAULT_REGION'] = AWS_DEFAULT_REGION + environment_variables.environment_variables_dict['policy'] = 'ip_unattached' + environment_variables.environment_variables_dict['dry_run'] = DRY_RUN_NO + ec2_client = boto3.client('ec2', region_name=AWS_DEFAULT_REGION) + tags = [ + {'Key': 'DaysCount', 'Value': f'{CURRENT_DATE.__str__()}@7'}, + {'Key': 'Policy', 'Value': 'skip'}, + ] + instance = ec2_client.run_instances(ImageId=DEFAULT_AMI_ID, InstanceType=INSTANCE_TYPE, MaxCount=1, MinCount=1, + TagSpecifications=[{'ResourceType': 'instance', 'Tags': tags}]).get('Instances')[0] + ip_address = ec2_client.allocate_address(Domain='vpc', TagSpecifications=[{'ResourceType': 'elastic-ip', 'Tags': tags}]) + ec2_client.associate_address(AllocationId=ip_address.get('AllocationId'), InstanceId=instance.get('InstanceId')) + ip_unattached = IpUnattached() + response = ip_unattached.run() + addresses = ec2_client.describe_addresses()['Addresses'] + assert len(addresses) == 1 + assert len(response) == 0 + + +@mock_ec2 +def test_ip_unattached__create_run_associate(): + """ + This method tests ip unattached, skips active ip + @return: + """ + environment_variables.environment_variables_dict['AWS_DEFAULT_REGION'] = AWS_DEFAULT_REGION + environment_variables.environment_variables_dict['policy'] = 'ip_unattached' + environment_variables.environment_variables_dict['DAYS_TO_TAKE_ACTION'] = 7 + environment_variables.environment_variables_dict['dry_run'] = DRY_RUN_NO + ec2_client = boto3.client('ec2', region_name=AWS_DEFAULT_REGION) + tags = [ + {'Key': 'DaysCount', 'Value': f'{CURRENT_DATE.__str__()}@0'}, + ] + instance = ec2_client.run_instances(ImageId=DEFAULT_AMI_ID, InstanceType=INSTANCE_TYPE, MaxCount=1, MinCount=1, + TagSpecifications=[{'ResourceType': 'instance', 'Tags': tags}]).get('Instances')[0] + ip_address = ec2_client.allocate_address(Domain='vpc', TagSpecifications=[{'ResourceType': 'elastic-ip', 'Tags': tags}]) + ip_unattached = IpUnattached() + response = ip_unattached.run() + assert len(response) == 1 + ec2_client.associate_address(AllocationId=ip_address.get('AllocationId'), InstanceId=instance.get('InstanceId')) + ip_unattached = IpUnattached() + response = ip_unattached.run() + addresses = ec2_client.describe_addresses()['Addresses'] + assert len(addresses) == 1 + assert len(response) == 0 diff --git a/tests/unittest/cloud_governance/policy/azure/test_instance_run.py b/tests/unittest/cloud_governance/policy/azure/test_instance_run.py index a1a23a8b..713325c4 100644 --- a/tests/unittest/cloud_governance/policy/azure/test_instance_run.py +++ b/tests/unittest/cloud_governance/policy/azure/test_instance_run.py @@ -6,7 +6,7 @@ from cloud_governance.main.environment_variables import environment_variables from cloud_governance.policy.azure.cleanup.instance_run import InstanceRun -from tests.unittest.mocks.azure.mock_compute import MockVirtualMachine, MockAzure +from tests.unittest.mocks.azure.mock_computes import MockVirtualMachine, MockAzure def test_instance_run(): diff --git a/tests/unittest/cloud_governance/policy/azure/test_ip_unattached.py b/tests/unittest/cloud_governance/policy/azure/test_ip_unattached.py new file mode 100644 index 00000000..8a87d7c7 --- /dev/null +++ b/tests/unittest/cloud_governance/policy/azure/test_ip_unattached.py @@ -0,0 +1,215 @@ +from azure.mgmt.compute import ComputeManagementClient +from azure.mgmt.network import NetworkManagementClient +from azure.mgmt.network.models import NetworkInterfaceIPConfiguration, PublicIPAddress, IPConfiguration + +from cloud_governance.main.environment_variables import environment_variables +from cloud_governance.policy.azure.cleanup.ip_unattached import IpUnattached +from tests.unittest.configs import SUBSCRIPTION_ID, CURRENT_DATE +from tests.unittest.mocks.azure.mock_compute.mock_compute import mock_compute +from tests.unittest.mocks.azure.mock_identity.mock_default_credential import MockDefaultAzureCredential +from tests.unittest.mocks.azure.mock_network.mock_network import mock_network + + +@mock_compute +@mock_network +def test_ip_unattached_using_addresses(): + """ + This method tests ip_unattached not collect the used ip addresses + :return: + :rtype: + """ + environment_variables.environment_variables_dict['dry_run'] = 'yes' + network_client = NetworkManagementClient(subscription_id=SUBSCRIPTION_ID, credential=MockDefaultAzureCredential()) + ip_address = (network_client.public_ip_addresses. + begin_create_or_update(public_ip_address_name='test', location='useast', + public_ip_address_version='IPv4', + public_ip_allocation_method='Static')) + network_interface = network_client.network_interfaces.begin_create_or_update(network_interface_name='unitest', + virtual_machine='test-unitest', + ip_configurations=[ + NetworkInterfaceIPConfiguration( + id="interface-1", + public_ip_address= + PublicIPAddress(id=ip_address.id) + ) + ]) + network_client.public_ip_addresses.begin_create_or_update(public_ip_address_name='test', location='useast', + public_ip_address_version='IPv4', + public_ip_allocation_method='Static', + ip_configuration=IPConfiguration(id=network_interface.id)) + ip_unattached = IpUnattached() + response = ip_unattached.run() + assert len(response) == 0 + + +@mock_compute +@mock_network +def test_ip_unattached_skip_live_cluster_id(): + """ + This method tests ip_unattached policy skips the ip connected to active cluster + :return: + :rtype: + """ + environment_variables.environment_variables_dict['dry_run'] = 'yes' + network_client = NetworkManagementClient(subscription_id=SUBSCRIPTION_ID, credential=MockDefaultAzureCredential()) + compute_client = ComputeManagementClient(subscription_id=SUBSCRIPTION_ID, credential=MockDefaultAzureCredential()) + tags = {'kubernetes.io/cluster/unittest-vm': 'owned'} + compute_client.virtual_machines.begin_create_or_update(vm_name='test-unitest', tags=tags, location='useast') + ip_address = (network_client.public_ip_addresses. + begin_create_or_update(public_ip_address_name='test', location='useast', + public_ip_address_version='IPv4', + public_ip_allocation_method='Static', tags=tags)) + network_interface = network_client.network_interfaces.begin_create_or_update(network_interface_name='unitest', + virtual_machine='test-unittest', + ip_configurations=[ + NetworkInterfaceIPConfiguration( + id="interface-1", + public_ip_address= + PublicIPAddress( + id=ip_address.id) + ) + ], tags=tags) + network_client.public_ip_addresses.begin_create_or_update(public_ip_address_name='test', location='useast', + public_ip_address_version='IPv4', + public_ip_allocation_method='Static', + ip_configuration=IPConfiguration(id=network_interface.id), + tags=tags) + ip_unattached = IpUnattached() + response = ip_unattached.run() + assert len(response) == 0 + + +@mock_compute +@mock_network +def test_ip_unattached_not_live_cluster_id(): + """ + This method tests ip_unattached policy collect ip not connected to active cluster + :return: + :rtype: + """ + environment_variables.environment_variables_dict['dry_run'] = 'yes' + network_client = NetworkManagementClient(subscription_id=SUBSCRIPTION_ID, credential=MockDefaultAzureCredential()) + tags = {'kubernetes.io/cluster/unittest-vm': 'owned'} + ip_address = (network_client.public_ip_addresses. + begin_create_or_update(public_ip_address_name='test', location='useast', + public_ip_address_version='IPv4', + public_ip_allocation_method='Static', tags=tags)) + network_interface = network_client.network_interfaces.begin_create_or_update(network_interface_name='unitest', + ip_configurations=[ + NetworkInterfaceIPConfiguration( + id="interface-1", + public_ip_address= + PublicIPAddress( + id=ip_address.id) + ) + ], tags=tags) + network_client.public_ip_addresses.begin_create_or_update(public_ip_address_name='test', location='useast', + public_ip_address_version='IPv4', + public_ip_allocation_method='Static', + ip_configuration=IPConfiguration(id=network_interface.id), + tags=tags) + ip_unattached = IpUnattached() + response = ip_unattached.run() + assert len(response) == 1 + + +@mock_compute +@mock_network +def test_ip_unattached_dry_run_yes(): + """ + This method tests ip_unattached, not attached to any instance or network interface + :return: + :rtype: + """ + environment_variables.environment_variables_dict['dry_run'] = 'yes' + network_client = NetworkManagementClient(subscription_id=SUBSCRIPTION_ID, credential=MockDefaultAzureCredential()) + network_client.public_ip_addresses.begin_create_or_update(public_ip_address_name='test', location='useast', + public_ip_address_version='IPv4', + public_ip_allocation_method='Static') + ip_unattached = IpUnattached() + response = ip_unattached.run() + assert len(response) == 1 + assert response[0]['CleanUpDays'] == 0 + + +@mock_compute +@mock_network +def test_ip_unattached_dryrun_no(): + """ + This method tests ip_unattached, not attached to any instance or network interface + :return: + :rtype: + """ + environment_variables.environment_variables_dict['dry_run'] = 'no' + network_client = NetworkManagementClient(subscription_id=SUBSCRIPTION_ID, credential=MockDefaultAzureCredential()) + network_client.public_ip_addresses.begin_create_or_update(public_ip_address_name='test', location='useast', + public_ip_address_version='IPv4', + public_ip_allocation_method='Static') + ip_unattached = IpUnattached() + response = ip_unattached.run() + assert len(response) == 1 + assert response[0]['CleanUpDays'] == 1 + assert response[0]['ResourceState'] == 'disassociated' + + +@mock_compute +@mock_network +def test_ip_unattached_delete(): + """ + This method tests ip_unattached to delete ip address, not attached to any instance or network interface + :return: + :rtype: + """ + environment_variables.environment_variables_dict['dry_run'] = 'no' + network_client = NetworkManagementClient(subscription_id=SUBSCRIPTION_ID, credential=MockDefaultAzureCredential()) + network_client.public_ip_addresses.begin_create_or_update(public_ip_address_name='test', location='useast', + public_ip_address_version='IPv4', + public_ip_allocation_method='Static', + tags={'DaysCount': f'{CURRENT_DATE}@7'}) + ip_unattached = IpUnattached() + response = ip_unattached.run() + assert len(response) == 1 + assert response[0]['CleanUpDays'] == 7 + assert response[0]['ResourceState'] == 'Deleted' + + +@mock_compute +@mock_network +def test_ip_unattached_skips_delete(): + """ + This method tests ip_unattached to delete ip address, not attached to any instance or network interface + :return: + :rtype: + """ + environment_variables.environment_variables_dict['dry_run'] = 'no' + network_client = NetworkManagementClient(subscription_id=SUBSCRIPTION_ID, credential=MockDefaultAzureCredential()) + network_client.public_ip_addresses.begin_create_or_update(public_ip_address_name='test', location='useast', + public_ip_address_version='IPv4', + public_ip_allocation_method='Static', + tags={'DaysCount': f'{CURRENT_DATE}@7', 'Policy': 'skip'}) + ip_unattached = IpUnattached() + response = ip_unattached.run() + assert len(response) == 1 + assert response[0]['CleanUpDays'] == 7 + assert response[0]['ResourceState'] == 'disassociated' + + +@mock_compute +@mock_network +def test_ip_unattached_set_counter_zero(): + """ + This method tests ip_unattached to set days counter to 0, not attached to any instance or network interface + :return: + :rtype: + """ + environment_variables.environment_variables_dict['dry_run'] = 'yes' + network_client = NetworkManagementClient(subscription_id=SUBSCRIPTION_ID, credential=MockDefaultAzureCredential()) + network_client.public_ip_addresses.begin_create_or_update(public_ip_address_name='test', location='useast', + public_ip_address_version='IPv4', + public_ip_allocation_method='Static', + tags={'DaysCount': f'{CURRENT_DATE}@7'}) + ip_unattached = IpUnattached() + response = ip_unattached.run() + assert len(response) == 1 + assert response[0]['CleanUpDays'] == 0 + assert response[0]['ResourceState'] == 'disassociated' diff --git a/tests/unittest/cloud_governance/policy/azure/test_unattached_volume.py b/tests/unittest/cloud_governance/policy/azure/test_unattached_volume.py index b1e75f0b..2b66c5cd 100644 --- a/tests/unittest/cloud_governance/policy/azure/test_unattached_volume.py +++ b/tests/unittest/cloud_governance/policy/azure/test_unattached_volume.py @@ -4,7 +4,7 @@ from cloud_governance.main.environment_variables import environment_variables from cloud_governance.policy.azure.cleanup.unattached_volume import UnattachedVolume -from tests.unittest.mocks.azure.mock_compute import MockDisk, MockAzure, MockVirtualMachine +from tests.unittest.mocks.azure.mock_computes import MockDisk, MockAzure, MockVirtualMachine def test_unattached_volume_dry_run_yes_0_unattached(): diff --git a/tests/unittest/configs.py b/tests/unittest/configs.py new file mode 100644 index 00000000..1e13b99e --- /dev/null +++ b/tests/unittest/configs.py @@ -0,0 +1,22 @@ +import datetime + + +# Common Variables + +DRY_RUN_YES = 'yes' +DRY_RUN_NO = 'no' +CURRENT_DATE = datetime.datetime.utcnow().date() + + +# AWS +AWS_DEFAULT_REGION = 'us-west-2' +DEFAULT_AMI_ID = 'ami-03cf127a' +INSTANCE_TYPE = 't2.micro' + + +# Azure +SUBSCRIPTION_ID = 'unitest-subscription' +RESOURCE_GROUP = 'unittest' +SUB_ID = f'/subscription/{SUBSCRIPTION_ID}/resourceGroups/{RESOURCE_GROUP}' +NETWORK_PROVIDER = f'providers/Microsoft.Network' +COMPUTE_PROVIDER = 'providers/Microsoft.Compute' diff --git a/tests/unittest/mocks/azure/common_operations.py b/tests/unittest/mocks/azure/common_operations.py new file mode 100644 index 00000000..e7004d33 --- /dev/null +++ b/tests/unittest/mocks/azure/common_operations.py @@ -0,0 +1,19 @@ +from azure.core.paging import ItemPaged + + +class Status: + + def __init__(self, success: bool): + self.__success = success + + def done(self): + return self.__success + + +class CustomItemPaged(ItemPaged): + + def __init__(self, resource_list: list = None): + super().__init__() + self._page_iterator = iter(resource_list if resource_list else []) + + diff --git a/tests/unittest/mocks/azure/mock_compute/__init__.py b/tests/unittest/mocks/azure/mock_compute/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unittest/mocks/azure/mock_compute/mock_compute.py b/tests/unittest/mocks/azure/mock_compute/mock_compute.py new file mode 100644 index 00000000..7eb70f8c --- /dev/null +++ b/tests/unittest/mocks/azure/mock_compute/mock_compute.py @@ -0,0 +1,28 @@ +from functools import wraps +from unittest.mock import patch + +from azure.mgmt.compute import ComputeManagementClient + +from tests.unittest.mocks.azure.mock_compute.mock_virtual_machines_operations import MockVirtualMachinesOperations +from tests.unittest.mocks.azure.mock_network.mock_public_ip_address_operations import MockPublicIpAddressOperations + + +def mock_compute(method): + """ + This method is mocking for Jira class methods which are used in Jira Operations @param method: + @return: + """ + + @wraps(method) + def method_wrapper(*args, **kwargs): + """ + This is the wrapper method to wraps the method inside the function + @param args: + @param kwargs: + @return: + """ + with patch.object(ComputeManagementClient, 'virtual_machines', MockVirtualMachinesOperations()): + result = method(*args, **kwargs) + return result + + return method_wrapper diff --git a/tests/unittest/mocks/azure/mock_compute/mock_virtual_machines_operations.py b/tests/unittest/mocks/azure/mock_compute/mock_virtual_machines_operations.py new file mode 100644 index 00000000..ea10d1a2 --- /dev/null +++ b/tests/unittest/mocks/azure/mock_compute/mock_virtual_machines_operations.py @@ -0,0 +1,45 @@ +from azure.mgmt.compute.v2023_03_01.models import VirtualMachine, HardwareProfile + +from tests.unittest.mocks.azure.common_operations import CustomItemPaged +from tests.unittest.configs import CURRENT_DATE, SUB_ID, COMPUTE_PROVIDER + + +class MockVirtualMachine(VirtualMachine): + + def __init__(self, vm_name: str, *args, **kwargs): + super().__init__(*args, **kwargs) + self.vm_name = vm_name + + if not kwargs.get('time_created'): + self.time_created = CURRENT_DATE.__str__() + if not kwargs.get('hardware_profile'): + self.hardware_profile = HardwareProfile(vm_size='Standard_D2s_v3') + + +class MockVirtualMachinesOperations: + + VIRTUAL_MACHINE = 'virtualMachines' + + def __init__(self): + self.__virtual_machines = {} + + def begin_create_or_update(self, vm_name: str, **kwargs): + """ + This method create or update the virtual_machine + :param vm_name: + :type vm_name: + :param kwargs: + :type kwargs: + :return: + :rtype: + """ + virtual_machine_id = f'{SUB_ID}/{COMPUTE_PROVIDER}/{self.VIRTUAL_MACHINE}/{vm_name}' + self.__virtual_machines[vm_name] = MockVirtualMachine(vm_name=vm_name, id=virtual_machine_id, **kwargs) + + def list_all(self): + """ + This method list all virtual machines + :return: + :rtype: + """ + return CustomItemPaged(resource_list=list(self.__virtual_machines.values())) diff --git a/tests/unittest/mocks/azure/mock_compute.py b/tests/unittest/mocks/azure/mock_computes.py similarity index 84% rename from tests/unittest/mocks/azure/mock_compute.py rename to tests/unittest/mocks/azure/mock_computes.py index d83660e4..6b6bdda5 100644 --- a/tests/unittest/mocks/azure/mock_compute.py +++ b/tests/unittest/mocks/azure/mock_computes.py @@ -1,11 +1,12 @@ import uuid from datetime import datetime -from azure.core.paging import ItemPaged from azure.mgmt.compute.v2023_01_02.models import Disk, DiskSku from azure.mgmt.compute.v2023_03_01.models import VirtualMachine, HardwareProfile, VirtualMachineInstanceView, \ InstanceViewStatus +from tests.unittest.mocks.azure.common_operations import CustomItemPaged + class MockVirtualMachine(VirtualMachine): @@ -39,16 +40,9 @@ def __init__(self, status1: str = "Unknown", status2: str = 'Vm Running'): ] -class CustomItemPaged(ItemPaged): - - def __init__(self, resource_list: list = None): - super().__init__() - self._page_iterator = iter(resource_list if resource_list else []) - - class MockAzure: - def __init__(self, vms: list = None, disks: list = [], status1: str = "Unknown", status2: str = 'Vm Running'): + def __init__(self, vms: list = None, disks: list = None, status1: str = "Unknown", status2: str = 'Vm Running'): self.vms = vms if vms else [] self.status1 = status1 self.status2 = status2 diff --git a/tests/unittest/mocks/azure/mock_identity/__init__.py b/tests/unittest/mocks/azure/mock_identity/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unittest/mocks/azure/mock_identity/mock_default_credential.py b/tests/unittest/mocks/azure/mock_identity/mock_default_credential.py new file mode 100644 index 00000000..c42bf775 --- /dev/null +++ b/tests/unittest/mocks/azure/mock_identity/mock_default_credential.py @@ -0,0 +1,13 @@ +from azure.core.credentials import AccessToken +from azure.identity import DefaultAzureCredential + +from tests.unittest.configs import CURRENT_DATE + + +class MockDefaultAzureCredential: + + def __init__(self, **kwargs): + super().__init__(**kwargs) + + def get_token(self, *scopes: str, **kwargs) -> AccessToken: + return AccessToken(token='unittest', expires_on=CURRENT_DATE) diff --git a/tests/unittest/mocks/azure/mock_network/__init__.py b/tests/unittest/mocks/azure/mock_network/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unittest/mocks/azure/mock_network/mock_network.py b/tests/unittest/mocks/azure/mock_network/mock_network.py new file mode 100644 index 00000000..041af964 --- /dev/null +++ b/tests/unittest/mocks/azure/mock_network/mock_network.py @@ -0,0 +1,30 @@ +from functools import wraps +from unittest.mock import patch + +from azure.mgmt.network import NetworkManagementClient + +from tests.unittest.mocks.azure.mock_network.mock_network_interface_operations import MockNetworkInterfaceOperations +from tests.unittest.mocks.azure.mock_network.mock_public_ip_address_operations import MockPublicIpAddressOperations + + + +def mock_network(method): + """ + This method is mocking for Jira class methods which are used in Jira Operations @param method: + @return: + """ + + @wraps(method) + def method_wrapper(*args, **kwargs): + """ + This is the wrapper method to wraps the method inside the function + @param args: + @param kwargs: + @return: + """ + with patch.object(NetworkManagementClient, 'public_ip_addresses', MockPublicIpAddressOperations()), \ + patch.object(NetworkManagementClient, 'network_interfaces', MockNetworkInterfaceOperations()): + result = method(*args, **kwargs) + return result + + return method_wrapper diff --git a/tests/unittest/mocks/azure/mock_network/mock_network_interface_operations.py b/tests/unittest/mocks/azure/mock_network/mock_network_interface_operations.py new file mode 100644 index 00000000..cdac06b8 --- /dev/null +++ b/tests/unittest/mocks/azure/mock_network/mock_network_interface_operations.py @@ -0,0 +1,45 @@ +from azure.mgmt.network.models import NetworkInterface + +from tests.unittest.mocks.azure.common_operations import CustomItemPaged +from tests.unittest.configs import SUB_ID, NETWORK_PROVIDER + + +class MockNetworkInterface(NetworkInterface): + + def __init__(self, **kwargs): + super().__init__(**kwargs) + if kwargs.get('virtual_machine'): + self.virtual_machine = kwargs['virtual_machine'] + + +class MockNetworkInterfaceOperations: + + NETWORK_INTERFACE = 'virtualNetworks' + + def __init__(self): + super().__init__() + self.__network_interfaces = {} + + def begin_create_or_update(self, network_interface_name: str, **kwargs): + """ + This method creates the network interface + :param network_interface_name: + :type network_interface_name: + :param kwargs: + :type kwargs: + :return: + :rtype: + """ + network_interface_id = f'{SUB_ID}/{NETWORK_PROVIDER}/{self.NETWORK_INTERFACE}/{network_interface_name}' + network_interface = MockNetworkInterface(network_interface_name=network_interface_name, id=network_interface_id, + **kwargs) + self.__network_interfaces[network_interface_name] = network_interface + return network_interface + + def list_all(self): + """ + This method returns all the network interfaces + :return: + :rtype: + """ + return CustomItemPaged(list(self.__network_interfaces.values())) diff --git a/tests/unittest/mocks/azure/mock_network/mock_public_ip_address_operations.py b/tests/unittest/mocks/azure/mock_network/mock_public_ip_address_operations.py new file mode 100644 index 00000000..be75cd72 --- /dev/null +++ b/tests/unittest/mocks/azure/mock_network/mock_public_ip_address_operations.py @@ -0,0 +1,55 @@ +from azure.mgmt.network.models import PublicIPAddress + +from tests.unittest.mocks.azure.common_operations import CustomItemPaged, Status +from tests.unittest.configs import SUB_ID, NETWORK_PROVIDER + + +class MockPublicIpAddress(PublicIPAddress): + + def __init__(self, name: str, tags: dict = None, *args, **kwargs): + super().__init__(*args, **kwargs) + if not tags: + tags = {} + self.name = name + self.tags = tags + if kwargs.get('ip_configuration'): + self.ip_configuration = kwargs.get('ip_configuration') + + +class MockPublicIpAddressOperations: + + PUBLIC_IP_ADDRESS = 'publicIPAddresses' + + def __init__(self): + self.__public_ipv4s = {} + + def begin_create_or_update(self, public_ip_address_name: str, **kwargs): + """ + This method creates the public ipv4 address + :param public_ip_address_name: + :param kwargs: + """ + public_ip_id = f'{SUB_ID}/{NETWORK_PROVIDER}/{self.PUBLIC_IP_ADDRESS}/{public_ip_address_name}' + ip_address = MockPublicIpAddress(name=public_ip_address_name, id=public_ip_id, **kwargs) + self.__public_ipv4s[public_ip_address_name] = ip_address + return ip_address + + def list_all(self): + """ + This method list all public ips + :return: + :rtype: + """ + return CustomItemPaged(resource_list=list(self.__public_ipv4s.values())) + + def begin_delete(self, resource_group_name: str, public_ip_address_name: str, **kwargs): + """ + This method release the public ip + :return: + :rtype: + """ + self.__public_ipv4s.pop(public_ip_address_name) + deleted = False + if public_ip_address_name not in self.__public_ipv4s: + deleted = True + return Status(deleted)