-
Notifications
You must be signed in to change notification settings - Fork 15
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Updated the zombie_snapshots to new method
- Loading branch information
Showing
4 changed files
with
281 additions
and
36 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
221 changes: 221 additions & 0 deletions
221
tests/unittest/cloud_governance/policy/aws/test_zombie_snapshots.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,221 @@ | ||
import os | ||
from datetime import datetime | ||
|
||
import boto3 | ||
from moto import mock_ec2 | ||
|
||
from cloud_governance.common.clouds.aws.utils.common_methods import get_tag_value_from_tags | ||
from cloud_governance.main.environment_variables import environment_variables | ||
from cloud_governance.policy.aws.zombie_snapshots import ZombieSnapshots | ||
from tests.unittest.configs import DRY_RUN_YES, AWS_DEFAULT_REGION, INSTANCE_TYPE_T2_MICRO, DEFAULT_AMI_ID, \ | ||
TEST_USER_NAME, DRY_RUN_NO | ||
|
||
os.environ['AWS_DEFAULT_REGION'] = 'us-east-2' | ||
os.environ['dry_run'] = 'no' | ||
|
||
|
||
@mock_ec2 | ||
def test_zombie_snapshots(): | ||
""" | ||
This method tests lists of the ami related snapshots | ||
@return: | ||
""" | ||
environment_variables.environment_variables_dict['dry_run'] = DRY_RUN_YES | ||
environment_variables.environment_variables_dict['AWS_DEFAULT_REGION'] = AWS_DEFAULT_REGION | ||
environment_variables.environment_variables_dict['policy'] = 'zombie_snapshots' | ||
tags = [{'Key': 'User', 'Value': TEST_USER_NAME}] | ||
ec2_client = boto3.client('ec2', region_name=AWS_DEFAULT_REGION) | ||
|
||
# delete default snapshots and images | ||
snapshots = ec2_client.describe_snapshots(OwnerIds=['self'])['Snapshots'] | ||
images = ec2_client.describe_images()['Images'] | ||
for image in images: | ||
ec2_client.deregister_image(ImageId=image.get('ImageId')) | ||
for snapshot in snapshots: | ||
ec2_client.delete_snapshot(SnapshotId=snapshot.get('SnapshotId')) | ||
|
||
# create infra | ||
instance_id = ec2_client.run_instances(ImageId=DEFAULT_AMI_ID, InstanceType=INSTANCE_TYPE_T2_MICRO, | ||
MaxCount=1, MinCount=1, | ||
TagSpecifications=[{'ResourceType': 'instance', 'Tags': tags}] | ||
)['Instances'][0]['InstanceId'] | ||
image_id = ec2_client.create_image(InstanceId=instance_id, Name=TEST_USER_NAME, | ||
TagSpecifications=[{'ResourceType': 'image', 'Tags': tags}]).get('ImageId') | ||
snapshot_id = (ec2_client.describe_images(ImageIds=[image_id])['Images'][0].get('BlockDeviceMappings')[0] | ||
.get('Ebs').get('SnapshotId')) | ||
ec2_client.create_tags(Resources=[snapshot_id], Tags=tags) | ||
ec2_client.deregister_image(ImageId=image_id) | ||
ec2_client.terminate_instances(InstanceIds=[instance_id]) | ||
|
||
# run zombie_snapshots | ||
zombie_snapshots = ZombieSnapshots() | ||
response = zombie_snapshots.run() | ||
assert len(ec2_client.describe_snapshots(OwnerIds=['self'])['Snapshots']) == 1 | ||
assert len(response) == 1 | ||
assert response[0]['CleanUpDays'] == 0 | ||
assert get_tag_value_from_tags(tags=ec2_client.describe_snapshots(OwnerIds=['self'], | ||
SnapshotIds=[snapshot_id])['Snapshots'][0]['Tags'], | ||
tag_name='DaysCount') | ||
|
||
|
||
@mock_ec2 | ||
def test_zombie_snapshots_delete(): | ||
""" | ||
This method tests delete of the ami related snapshots | ||
@return: | ||
""" | ||
environment_variables.environment_variables_dict['dry_run'] = DRY_RUN_NO | ||
environment_variables.environment_variables_dict['AWS_DEFAULT_REGION'] = AWS_DEFAULT_REGION | ||
environment_variables.environment_variables_dict['policy'] = 'zombie_snapshots' | ||
tags = [{'Key': 'User', 'Value': TEST_USER_NAME}, | ||
{'Key': 'DaysCount', 'Value': f'{datetime.utcnow().date()}@7'}] | ||
ec2_client = boto3.client('ec2', region_name=AWS_DEFAULT_REGION) | ||
|
||
# delete default snapshots and images | ||
snapshots = ec2_client.describe_snapshots(OwnerIds=['self'])['Snapshots'] | ||
images = ec2_client.describe_images()['Images'] | ||
for image in images: | ||
ec2_client.deregister_image(ImageId=image.get('ImageId')) | ||
for snapshot in snapshots: | ||
ec2_client.delete_snapshot(SnapshotId=snapshot.get('SnapshotId')) | ||
|
||
# create infra | ||
instance_id = ec2_client.run_instances(ImageId=DEFAULT_AMI_ID, InstanceType=INSTANCE_TYPE_T2_MICRO, | ||
MaxCount=1, MinCount=1, | ||
TagSpecifications=[{'ResourceType': 'instance', 'Tags': tags}] | ||
)['Instances'][0]['InstanceId'] | ||
image_id = ec2_client.create_image(InstanceId=instance_id, Name=TEST_USER_NAME, | ||
TagSpecifications=[{'ResourceType': 'image', 'Tags': tags}]).get('ImageId') | ||
snapshot_id = (ec2_client.describe_images(ImageIds=[image_id])['Images'][0].get('BlockDeviceMappings')[0] | ||
.get('Ebs').get('SnapshotId')) | ||
ec2_client.create_tags(Resources=[snapshot_id], Tags=tags) | ||
ec2_client.deregister_image(ImageId=image_id) | ||
ec2_client.terminate_instances(InstanceIds=[instance_id]) | ||
|
||
# run zombie_snapshots | ||
zombie_snapshots = ZombieSnapshots() | ||
response = zombie_snapshots.run() | ||
assert len(ec2_client.describe_snapshots(OwnerIds=['self'])['Snapshots']) == 0 | ||
assert len(response) == 1 | ||
|
||
|
||
@mock_ec2 | ||
def test_zombie_snapshots_skip(): | ||
""" | ||
This method tests skip delete of the ami related snapshots | ||
@return: | ||
""" | ||
environment_variables.environment_variables_dict['dry_run'] = DRY_RUN_NO | ||
environment_variables.environment_variables_dict['AWS_DEFAULT_REGION'] = AWS_DEFAULT_REGION | ||
environment_variables.environment_variables_dict['policy'] = 'zombie_snapshots' | ||
tags = [{'Key': 'User', 'Value': TEST_USER_NAME}, {'Key': 'policy', 'Value': 'not-delete'}, | ||
{'Key': 'DaysCount', 'Value': f'{datetime.utcnow().date()}@7'}] | ||
ec2_client = boto3.client('ec2', region_name=AWS_DEFAULT_REGION) | ||
|
||
# delete default snapshots and images | ||
snapshots = ec2_client.describe_snapshots(OwnerIds=['self'])['Snapshots'] | ||
images = ec2_client.describe_images()['Images'] | ||
for image in images: | ||
ec2_client.deregister_image(ImageId=image.get('ImageId')) | ||
for snapshot in snapshots: | ||
ec2_client.delete_snapshot(SnapshotId=snapshot.get('SnapshotId')) | ||
|
||
# create infra | ||
instance_id = ec2_client.run_instances(ImageId=DEFAULT_AMI_ID, InstanceType=INSTANCE_TYPE_T2_MICRO, | ||
MaxCount=1, MinCount=1, | ||
TagSpecifications=[{'ResourceType': 'instance', 'Tags': tags}] | ||
)['Instances'][0]['InstanceId'] | ||
image_id = ec2_client.create_image(InstanceId=instance_id, Name=TEST_USER_NAME, | ||
TagSpecifications=[{'ResourceType': 'image', 'Tags': tags}]).get('ImageId') | ||
snapshot_id = (ec2_client.describe_images(ImageIds=[image_id])['Images'][0].get('BlockDeviceMappings')[0] | ||
.get('Ebs').get('SnapshotId')) | ||
ec2_client.create_tags(Resources=[snapshot_id], Tags=tags) | ||
ec2_client.deregister_image(ImageId=image_id) | ||
ec2_client.terminate_instances(InstanceIds=[instance_id]) | ||
|
||
# run zombie_snapshots | ||
zombie_snapshots = ZombieSnapshots() | ||
response = zombie_snapshots.run() | ||
assert len(ec2_client.describe_snapshots(OwnerIds=['self'])['Snapshots']) == 1 | ||
assert len(response) == 1 | ||
|
||
|
||
@mock_ec2 | ||
def test_zombie_snapshots_contains_cluster_tag(): | ||
""" | ||
This method tests snapshot having the live cluster | ||
@return: | ||
""" | ||
environment_variables.environment_variables_dict['dry_run'] = DRY_RUN_NO | ||
environment_variables.environment_variables_dict['AWS_DEFAULT_REGION'] = AWS_DEFAULT_REGION | ||
environment_variables.environment_variables_dict['policy'] = 'zombie_snapshots' | ||
tags = [{'Key': 'User', 'Value': TEST_USER_NAME}, {'Key': 'policy', 'Value': 'not-delete'}, | ||
{'Key': 'DaysCount', 'Value': f'{datetime.utcnow().date()}@7'}, | ||
{'Key': 'kubernetes.io/cluster/test-zombie-cluster', 'Value': f'owned'}] | ||
ec2_client = boto3.client('ec2', region_name=AWS_DEFAULT_REGION) | ||
|
||
# delete default snapshots and images | ||
snapshots = ec2_client.describe_snapshots(OwnerIds=['self'])['Snapshots'] | ||
images = ec2_client.describe_images()['Images'] | ||
for image in images: | ||
ec2_client.deregister_image(ImageId=image.get('ImageId')) | ||
for snapshot in snapshots: | ||
ec2_client.delete_snapshot(SnapshotId=snapshot.get('SnapshotId')) | ||
|
||
# create infra | ||
instance_id = ec2_client.run_instances(ImageId=DEFAULT_AMI_ID, InstanceType=INSTANCE_TYPE_T2_MICRO, | ||
MaxCount=1, MinCount=1, | ||
TagSpecifications=[{'ResourceType': 'instance', 'Tags': tags}] | ||
)['Instances'][0]['InstanceId'] | ||
image_id = ec2_client.create_image(InstanceId=instance_id, Name=TEST_USER_NAME, | ||
TagSpecifications=[{'ResourceType': 'image', 'Tags': tags}]).get('ImageId') | ||
snapshot_id = (ec2_client.describe_images(ImageIds=[image_id])['Images'][0].get('BlockDeviceMappings')[0] | ||
.get('Ebs').get('SnapshotId')) | ||
ec2_client.create_tags(Resources=[snapshot_id], Tags=tags) | ||
ec2_client.deregister_image(ImageId=image_id) | ||
|
||
# run zombie_snapshots | ||
zombie_snapshots = ZombieSnapshots() | ||
response = zombie_snapshots.run() | ||
assert len(ec2_client.describe_snapshots(OwnerIds=['self'])['Snapshots']) == 1 | ||
assert len(response) == 0 | ||
|
||
|
||
@mock_ec2 | ||
def test_zombie_snapshots_no_zombies(): | ||
""" | ||
This method tests snapshot having the active AMI | ||
@return: | ||
""" | ||
environment_variables.environment_variables_dict['dry_run'] = DRY_RUN_NO | ||
environment_variables.environment_variables_dict['AWS_DEFAULT_REGION'] = AWS_DEFAULT_REGION | ||
environment_variables.environment_variables_dict['policy'] = 'zombie_snapshots' | ||
tags = [{'Key': 'User', 'Value': TEST_USER_NAME}, {'Key': 'policy', 'Value': 'not-delete'}, | ||
{'Key': 'DaysCount', 'Value': f'{datetime.utcnow().date()}@7'}, | ||
{'Key': 'kubernetes.io/cluster/test-zombie-cluster', 'Value': f'owned'}] | ||
ec2_client = boto3.client('ec2', region_name=AWS_DEFAULT_REGION) | ||
|
||
# delete default snapshots and images | ||
snapshots = ec2_client.describe_snapshots(OwnerIds=['self'])['Snapshots'] | ||
images = ec2_client.describe_images()['Images'] | ||
for image in images: | ||
ec2_client.deregister_image(ImageId=image.get('ImageId')) | ||
for snapshot in snapshots: | ||
ec2_client.delete_snapshot(SnapshotId=snapshot.get('SnapshotId')) | ||
|
||
# create infra | ||
instance_id = ec2_client.run_instances(ImageId=DEFAULT_AMI_ID, InstanceType=INSTANCE_TYPE_T2_MICRO, | ||
MaxCount=1, MinCount=1, | ||
TagSpecifications=[{'ResourceType': 'instance', 'Tags': tags}] | ||
)['Instances'][0]['InstanceId'] | ||
image_id = ec2_client.create_image(InstanceId=instance_id, Name=TEST_USER_NAME, | ||
TagSpecifications=[{'ResourceType': 'image', 'Tags': tags}]).get('ImageId') | ||
snapshot_id = (ec2_client.describe_images(ImageIds=[image_id])['Images'][0].get('BlockDeviceMappings')[0] | ||
.get('Ebs').get('SnapshotId')) | ||
ec2_client.create_tags(Resources=[snapshot_id], Tags=tags) | ||
|
||
# run zombie_snapshots | ||
zombie_snapshots = ZombieSnapshots() | ||
response = zombie_snapshots.run() | ||
assert len(ec2_client.describe_snapshots(OwnerIds=['self'])['Snapshots']) == 1 | ||
assert len(response) == 0 |