diff --git a/cloud_governance/common/elasticsearch/elasticsearch_operations.py b/cloud_governance/common/elasticsearch/elasticsearch_operations.py index 5281dd30..d23b4864 100644 --- a/cloud_governance/common/elasticsearch/elasticsearch_operations.py +++ b/cloud_governance/common/elasticsearch/elasticsearch_operations.py @@ -38,7 +38,7 @@ def __init__(self, es_host: str = None, es_port: str = None, region: str = '', b self.__account = self.__environment_variables_dict.get('account') try: self.__es = Elasticsearch([{'host': self.__es_host, 'port': self.__es_port}], timeout=self.__timeout, max_retries=2) - except: + except Exception as err: self.__es = None def __elasticsearch_get_index_hits(self, index: str, uuid: str = '', workload: str = '', fast_check: bool = False, @@ -343,3 +343,25 @@ def check_elastic_search_connection(self): if self.__es: return self.__es.ping() return False + + def post_query(self, query: dict, es_index: str, result_agg: bool = False): + """ + This method returns the es data + :param result_agg: + :type result_agg: + :param query: + :type query: + :param es_index: + :type es_index: + :return: + :rtype: + """ + try: + response = self.__es.search(index=es_index, body=query) + if result_agg: + return response.get('aggregations') + else: + return response.get('hits', {}).get('hits', {}) + except Exception as err: + logger.error(err) + raise err diff --git a/cloud_governance/common/mails/mail_message.py b/cloud_governance/common/mails/mail_message.py index 436b5d0f..a62909c6 100644 --- a/cloud_governance/common/mails/mail_message.py +++ b/cloud_governance/common/mails/mail_message.py @@ -503,3 +503,19 @@ def cro_monitor_budget_remain_high_alert(self, ticket_id: str, budget: int, user 'footer': self.FOOTER} body = template_loader.render(context) return subject, body + + def get_policy_alert_message(self, policy_data: list, user: str = ''): + """ + This method returns the policy alert message + :return: + :rtype: + """ + if user: + display_name = self.get_user_ldap_details(user_name=user) + user = display_name if display_name else user + subject = f"Cloud Governance: {self.account} Policy Alerts" + template_loader = self.env_loader.get_template('policy_alert_agg_message.j2') + columns = ['User', 'PublicCloud', 'policy', 'RegionName', 'ResourceId', 'Name', 'DeleteDate'] + context = {'records': policy_data, 'columns': columns, 'User': user, 'account': self.account, 'cloud_name': self.__public_cloud_name} + body = template_loader.render(context) + return subject, body diff --git a/cloud_governance/common/mails/postfix.py b/cloud_governance/common/mails/postfix.py index 4cc9191c..eeb2fe0e 100644 --- a/cloud_governance/common/mails/postfix.py +++ b/cloud_governance/common/mails/postfix.py @@ -6,6 +6,7 @@ from cloud_governance.common.clouds.aws.s3.s3_operations import S3Operations from cloud_governance.common.elasticsearch.elasticsearch_operations import ElasticSearchOperations +from cloud_governance.common.ldap.ldap_search import LdapSearch from cloud_governance.common.logger.init_logger import logger # https://github.com/redhat-performance/quads/blob/master/quads/tools/postman.py @@ -28,6 +29,8 @@ class Postfix: def __init__(self): self.__environment_variables_dict = environment_variables.environment_variables_dict + self.__LDAP_HOST_NAME = self.__environment_variables_dict.get('LDAP_HOST_NAME') + self.__ldap_search = LdapSearch(ldap_host_name=self.__LDAP_HOST_NAME) self.reply_to = self.__environment_variables_dict.get('REPLY_TO', 'dev-null@redhat.com') self.__es_host = self.__environment_variables_dict.get('es_host', '') self.__policy = self.__environment_variables_dict.get('policy', '') @@ -36,7 +39,7 @@ def __init__(self): self.__policy_output = self.__environment_variables_dict.get('policy_output', '') self.__default_admins = self.__environment_variables_dict.get('DEFAULT_ADMINS') self.__email_alert = self.__environment_variables_dict.get('EMAIL_ALERT') - self.__mail_to = self.__environment_variables_dict.get('EMAIL_TO') # testing purposes + self.__mail_to = self.__environment_variables_dict.get('EMAIL_TO') self.__mail_cc = self.__environment_variables_dict.get('EMAIL_CC') self.bucket_name, self.key = self.get_bucket_name() self.__es_index = 'cloud-governance-mail-messages' @@ -62,6 +65,8 @@ def send_email_postfix(self, subject: str, to: any, cc: list, content: str, **kw to = self.__mail_to if self.__mail_cc: cc = self.__mail_cc + if not self.__ldap_search.get_user_details(user_name=to): + cc.append('athiruma@redhat.com') cc = [cc_user for cc_user in cc if to and to not in cc_user] cc = [cc_user if '@redhat.com' in cc_user else f'{cc_user}@redhat.com' for cc_user in cc] msg = MIMEMultipart('alternative') @@ -115,7 +120,7 @@ def send_email_postfix(self, subject: str, to: any, cc: list, content: str, **kw if kwargs.get('extra_purse'): data['extra_purse'] = round(kwargs['extra_purse'], 3) if self.__es_host: - self.__es_operations.upload_to_elasticsearch(data=data, index=self.__es_index) + # self.__es_operations.upload_to_elasticsearch(data=data, index=self.__es_index) logger.warn(f'Uploaded to es index: {self.__es_index}') else: logger.warn('Error missing the es_host') diff --git a/cloud_governance/common/mails/templates/policy_alert_agg_message.j2 b/cloud_governance/common/mails/templates/policy_alert_agg_message.j2 new file mode 100644 index 00000000..428731b3 --- /dev/null +++ b/cloud_governance/common/mails/templates/policy_alert_agg_message.j2 @@ -0,0 +1,84 @@ + + + + + +
+ Hi {{ User }}, +
+
+
+

You can find below your unused resources in the {{ cloud_name }} account ({{ account }}).

+

If you want to keep them, please add "Policy=Not_Delete" or "Policy=skip" tag for each resource

+
+ + + + {% for col in columns %} + + {% endfor %} + + + + {% for record in records %} + + {% for col in columns %} + {% if col == 'Action' %} + {% if record['ResourceDelete'] == "True" or record['ResourceStopped'] == "True" %} + + {% else %} + + {% endif %} + {% else %} + {% if col == 'PublicCloud' %} + + {% elif col == 'RegionName' %} + + {% elif 'kubernetes.io/cluster' in record[col] %} + + {% else %} + + {% endif %} + {% endif %} + {% endfor %} + + {% endfor %} + +
{{ col | title }}
{{ "Deleted" }}{{ "Alert" }}{{ record[col] or 'AWS' }}{{ record[col] or record['region_name'] }}{{ record[col].split('/')[-1] }}{{ record[col] or 'NA' }}
+
+ + + diff --git a/cloud_governance/main/environment_variables.py b/cloud_governance/main/environment_variables.py index 2448ebdb..2b634c82 100644 --- a/cloud_governance/main/environment_variables.py +++ b/cloud_governance/main/environment_variables.py @@ -195,6 +195,7 @@ def __init__(self): self._environment_variables_dict['DEFAULT_ADMINS'] = literal_eval(EnvironmentVariables.get_env('DEFAULT_ADMINS', '[]')) self._environment_variables_dict['KERBEROS_USERS'] = literal_eval(EnvironmentVariables.get_env('KERBEROS_USERS', '[]')) self._environment_variables_dict['POLICIES_TO_ALERT'] = literal_eval(EnvironmentVariables.get_env('POLICIES_TO_ALERT', '[]')) + self._environment_variables_dict['ONLY_ADMINS'] = EnvironmentVariables.get_boolean_from_environment('ONLY_ADMINS', False) if self._environment_variables_dict.get('policy') in ['send_aggregated_alerts']: self._environment_variables_dict['COMMON_POLICIES'] = True # CRO -- Cloud Resource Orch diff --git a/cloud_governance/policy/common_policies/send_aggregated_alerts.py b/cloud_governance/policy/common_policies/send_aggregated_alerts.py index 820f53e0..7d67c17e 100644 --- a/cloud_governance/policy/common_policies/send_aggregated_alerts.py +++ b/cloud_governance/policy/common_policies/send_aggregated_alerts.py @@ -1,16 +1,8 @@ import json -import logging -import os -import tempfile from datetime import date, datetime, timedelta -import typeguard -from botocore.exceptions import ClientError - -from cloud_governance.common.clouds.aws.ec2.ec2_operations import EC2Operations -from cloud_governance.common.clouds.aws.s3.s3_operations import S3Operations -from cloud_governance.common.jira.jira import logger -from cloud_governance.common.logger.init_logger import handler +import pandas +from cloud_governance.common.elasticsearch.elasticsearch_operations import ElasticSearchOperations from cloud_governance.common.logger.logger_time_stamp import logger_time_stamp from cloud_governance.common.mails.mail_message import MailMessage from cloud_governance.common.mails.postfix import Postfix @@ -22,138 +14,182 @@ class SendAggregatedAlerts: This class send alerts to users which conditions are not satisfied by the policies """ - FILE_NAME = 'resources.json' - GLOBAL_REGION = 'us-east-1' - TODAY_DATE = str(date.today()).replace('-', '/') - def __init__(self): self.__environment_variables = environment_variables.environment_variables_dict - self.__bucket_name = self.__environment_variables.get('BUCKET_NAME') - self.__bucket_key = self.__environment_variables.get('BUCKET_KEY') - self.__policies = self.__environment_variables.get('POLICIES_TO_ALERT') - self.__s3_operations = S3Operations(region_name='us-east-2', bucket=self.__bucket_name, logs_bucket_key=self.__bucket_key) - self.__active_regions = EC2Operations().get_active_regions() - self.__kerberos_users = self.__get_kerberos_users_for_iam_users() - self.__global_region_policies = ['s3-inactive', 'empty-roles'] - self.__mail_alert_days = self.__environment_variables.get('MAIL_ALERT_DAYS') - self.__policy_action_days = self.__environment_variables.get('POLICY_ACTIONS_DAYS') + self.__days_to_delete_resource = int(self.__environment_variables.get('DAYS_TO_DELETE_RESOURCE')) + self.__mail_to = self.__environment_variables.get('EMAIL_TO') # testing purposes + self.__mail_cc = self.__environment_variables.get('EMAIL_CC', []) self.__mail_message = MailMessage() self.__postfix = Postfix() + self.__es_operations = ElasticSearchOperations() - @logger_time_stamp - def __get_kerberos_users_for_iam_users(self): - """ - This method returns the users which IAM users are not kerberos username - :return: - """ - responses = {} - users = self.__environment_variables.get('KERBEROS_USERS') - if users: - for iam_user, kerberos_user in users.items(): - responses[iam_user.lower()] = kerberos_user.lower() - return responses - - def __get_users_agg_result(self, policy_result: list, agg_users_result: dict, policy_name: str, region: str): + def __get_es_data(self): """ - This method returns the aggregated users resources list - :param agg_users_result: - :param policy_result: + This method returns the current day policy data from the elastic_search database :return: - """ - if policy_result: - for response in policy_result: - if type(response) == dict: - skip_policy = response.get('Skip') - if skip_policy in ('NA', '', None): - user = response.pop('User').lower() - response['Region'] = region - response['Policy'] = policy_name - if user in self.__kerberos_users.keys(): - user = self.__kerberos_users.get(user) - agg_users_result.setdefault(user, []).append(response) + :rtype: + """ + current_date = (datetime.utcnow().date() - timedelta(days=1)).__str__() + policy_es_index = self.__environment_variables.get('es_index') + account_name = (self.__environment_variables.get('account', '').upper() + .replace('OPENSHIFT-', '') + .replace('OPENSHIFT', '').strip()) + query = { + "size": 10000, + "query": { + "bool": { + "must": [ + { + "term": { + "account.keyword": { + "value": account_name + } + } + } + ], + "must_not": [ + { + "terms": { + "policy.keyword": [ + "ebs_in_use", "zombie_cluster_resource", + "instance_run", "cluster_run", "optimize_resource_report", + "optimize_resources_report", "skipped_resources" + ] + } + } + ], + "filter": [ + { + "range": { + "timestamp": { + "format": "yyyy-MM-dd", + "lte": current_date, + "gte": current_date + } + } + } + ] + } + } + } + # print(json.dumps(query, indent=4)) + records = self.__es_operations.post_query(query=query, es_index=policy_es_index) + return [record.get('_source') for record in records] - def __get_policy_data_in_bucket(self, region: str, policy: str): + def __remove_duplicates(self, policy_es_data: list): """ - This method returns the policy data in s3 bucket - :param region: - :param policy: + This method removes the duplicate data :return: - """ - try: - policy_save_path = f'{self.__bucket_key}/{region}/{policy}' - bucket_path_file = self.__s3_operations.get_last_objects(bucket=self.__bucket_name, key_prefix=f'{policy_save_path}/{self.TODAY_DATE}') - if bucket_path_file: - policy_s3_response = self.__s3_operations.get_last_s3_policy_content(s3_file_path=bucket_path_file, file_name=self.FILE_NAME) - return json.loads(policy_s3_response) if policy_s3_response else [] - else: - logger.warn(f"No file_path: {policy_save_path}/{self.TODAY_DATE} exists in s3 bucket") - except ClientError as err: - logger.info(err) - return [] - return [] + :rtype: + """ + if policy_es_data: + df = pandas.DataFrame(policy_es_data) + df.sort_values(inplace=True, by=['policy']) + df.fillna(value='', inplace=True) + df.drop_duplicates(subset='ResourceId', inplace=True) + return df.to_dict(orient="records") + return policy_es_data - @logger_time_stamp - def __get_policy_users_list(self): + def __group_by_policy(self, policy_data: list): """ - This method gets the latest policy responses + This method returns the data grouped by policy + :param policy_data: + :type policy_data: :return: - """ - agg_users_result = {} - for policy in self.__policies: - run_global_region = True if policy in self.__global_region_policies else False - for region in self.__active_regions: - if (region == self.GLOBAL_REGION and run_global_region) or not run_global_region: - self.__get_users_agg_result(policy_result=self.__get_policy_data_in_bucket(region=region, policy=policy), - agg_users_result=agg_users_result, policy_name=policy, region=region) - if region == self.GLOBAL_REGION and run_global_region: - break - return agg_users_result + :rtype: + """ + policy_group_data = {} + for record in policy_data: + policy_group_data.setdefault(record.get('policy', 'NA'), []).append(record) + policy_data_list = [] + for _, values in policy_group_data.items(): + policy_data_list.extend(values) + return policy_data_list - def __get_policy_agg_data_by_region(self, policy_data: dict): + def __group_by_user(self, policy_data: list): """ - This method returns the policy data agg by region + This method returns the data grouped by user files :param policy_data: + :type policy_data: :return: + :rtype: """ - agg_policy_region_result = {} - for policy_name, policy_region_data in policy_data.items(): - agg_policy_region_result[policy_name] = {} - for region_data in policy_region_data: - region_name = region_data.get('Region').lower() - agg_policy_region_result[policy_name].setdefault(region_name, []).append(region_data) - return agg_policy_region_result + user_data = {} + for record in policy_data: + user_data.setdefault(record.get('User', 'NA'), []).append(record) + return user_data - @logger_time_stamp - def __get_policy_agg_data(self, user_policy_data: list): + def __update_delete_days(self, policy_es_data: list): """ - This method returns the data agg by policy - :param user_policy_data: + This method returns the resource delete date + :param policy_es_data: + :type policy_es_data: :return: - """ - agg_policy_result = {} - for result in user_policy_data: - policy_name = result.get('Policy').lower() - days = int(result.get('Days', 0)) - if days in self.__mail_alert_days or days in self.__policy_action_days: - result['Action'] = 'Deleted' if days in self.__policy_action_days else 'Monitoring' - result['DeletedDay'] = (datetime.now() + timedelta(days=self.__policy_action_days[0] - days)).date() - agg_policy_result.setdefault(policy_name, []).append(result) - return self.__get_policy_agg_data_by_region(policy_data=agg_policy_result) + :rtype: + """ + filtered_policy_es_data = [] + for record in policy_es_data: + try: + days = record.get('ClusterResourcesCount') + if not days: + days = record.get('CleanUpDays') + if not days: + days = record.get('Days') + if not days: + days = record.get('StoppedDays') + if days: + days = int(days) + if not days: + days = 0 + alert_user = False + delete_date = '' + if self.__days_to_delete_resource - 5 == days: + delete_date = (datetime.utcnow() + timedelta(days=5)).date() + alert_user = True + elif days == self.__days_to_delete_resource - 3: + delete_date = (datetime.utcnow() + timedelta(days=3)).date() + alert_user = True + else: + if days >= self.__days_to_delete_resource: + delete_date = datetime.utcnow().date().__str__() + if days > self.__days_to_delete_resource: + if not record.get('Skip'): + record['Skip'] = 'NA' + if record.get('Skip') != 'NA': + delete_date = 'skip delete' + else: + delete_date = 'dry_run=yes' + alert_user = True + if alert_user: + record['DeleteDate'] = delete_date.__str__() + if record.get('policy') in ['empty_roles', 's3_inactive']: + record['RegionName'] = 'us-east-1' + filtered_policy_es_data.append(record) + except Exception as err: + raise err + return filtered_policy_es_data - @logger_time_stamp - def __send_mail_alerts_to_users(self): + def __send_aggregate_email_by_es_data(self): """ - This method send mail alerts to users + This method sends an alert using the elasticsearch data :return: - """ - policy_agg_users_list = self.__get_policy_users_list() - for user, user_policy_data in policy_agg_users_list.items(): - handler.setLevel(logging.WARN) - agg_policy_data = self.__get_policy_agg_data(user_policy_data=user_policy_data) - if agg_policy_data: - handler.setLevel(logging.INFO) - subject, body = self.__mail_message.get_agg_policies_mail_message(user=user, user_resources=agg_policy_data) - self.__postfix.send_email_postfix(subject=subject, content=body, to=user, cc=[], mime_type='html') + :rtype: + """ + policy_es_data = self.__get_es_data() + policy_es_data = self.__remove_duplicates(policy_es_data=policy_es_data) + policy_es_data = self.__update_delete_days(policy_es_data) + if self.__environment_variables.get('ONLY_ADMINS', ''): + group_by_policy = self.__group_by_policy(policy_data=policy_es_data) + if group_by_policy: + subject, body = self.__mail_message.get_policy_alert_message(policy_data=group_by_policy) + self.__postfix.send_email_postfix(subject=subject, content=body, to=self.__mail_to, cc=[], mime_type='html') + else: + user_policy_data = self.__group_by_user(policy_data=policy_es_data) + for user, user_records in user_policy_data.items(): + if user_records: + subject, body = self.__mail_message.get_policy_alert_message(policy_data=user_records, user=user) + self.__postfix.send_email_postfix(subject=subject, content=body, to=user, cc=[], + mime_type='html') @logger_time_stamp def run(self): @@ -161,4 +197,4 @@ def run(self): This method start the other methods :return: """ - self.__send_mail_alerts_to_users() + self.__send_aggregate_email_by_es_data() diff --git a/tests/unittest/cloud_governance/common/elasticsearch/test_elasticsearch_operations.py b/tests/unittest/cloud_governance/common/elasticsearch/test_elasticsearch_operations.py index 71a33107..17982767 100644 --- a/tests/unittest/cloud_governance/common/elasticsearch/test_elasticsearch_operations.py +++ b/tests/unittest/cloud_governance/common/elasticsearch/test_elasticsearch_operations.py @@ -1,6 +1,8 @@ import datetime from cloud_governance.common.elasticsearch.elasticsearch_operations import ElasticSearchOperations +from tests.unittest.configs import ES_INDEX, TEST_INDEX_ID +from tests.unittest.mocks.elasticsearch.mock_elasticsearch import mock_elasticsearch def test_missing_datetime(): @@ -29,3 +31,28 @@ def test_missing_param_value(): assert TypeError +@mock_elasticsearch +def test_post_query(): + """ + This method tests the elasticsearch post query + :return: + :rtype: + """ + es_index = ES_INDEX + es_operations = ElasticSearchOperations(es_host='localhost', es_port='9200') + # Upload data to es + es_data = { + 'index-id': TEST_INDEX_ID, + 'test_type': 'Unittest', + } + es_operations.upload_to_elasticsearch(index=es_index, data=es_data) + + # fetch data + query = { + "query": {} + } + try: + response = es_operations.post_query(es_index=es_index, query=query) + assert response + except TypeError: + assert TypeError \ No newline at end of file diff --git a/tests/unittest/configs.py b/tests/unittest/configs.py index 1e13b99e..4783492c 100644 --- a/tests/unittest/configs.py +++ b/tests/unittest/configs.py @@ -20,3 +20,8 @@ SUB_ID = f'/subscription/{SUBSCRIPTION_ID}/resourceGroups/{RESOURCE_GROUP}' NETWORK_PROVIDER = f'providers/Microsoft.Network' COMPUTE_PROVIDER = 'providers/Microsoft.Compute' + + +# ES +ES_INDEX = 'test-unittest-index' +TEST_INDEX_ID = 'test-unittest-index-01' diff --git a/tests/unittest/mocks/elasticsearch/__init__.py b/tests/unittest/mocks/elasticsearch/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unittest/mocks/elasticsearch/mock_elasticsearch.py b/tests/unittest/mocks/elasticsearch/mock_elasticsearch.py new file mode 100644 index 00000000..9f5b9cb3 --- /dev/null +++ b/tests/unittest/mocks/elasticsearch/mock_elasticsearch.py @@ -0,0 +1,51 @@ +import uuid +from functools import wraps +from unittest.mock import patch + +from elasticsearch import Elasticsearch + + +class MockElasticsearch: + + def __init__(self): + self.__es_data = {} + + def index(self, index: str, body: str, **kwargs): + id = kwargs.get('id', uuid.uuid1()) + self.__es_data.setdefault(index, {}).setdefault(id, body) + return True + + def search(self, index: str, body: dict, **kwargs): + response = self.__es_data.get(index) + if response: + return { + 'hits': { + 'hits': response + } + } + return {'hits': { + 'hits': {} + }} + + +def mock_elasticsearch(method): + """ + This method is mocking for Jira class methods which are used in Jira Operations @param method: + @return: + """ + + @wraps(method) + def method_wrapper(*args, **kwargs): + """ + This is the wrapper method to wraps the method inside the function + @param args: + @param kwargs: + @return: + """ + mock_class = MockElasticsearch() + with patch.object(Elasticsearch, 'index', mock_class.index), \ + patch.object(Elasticsearch, 'search', mock_class.search): + result = method(*args, **kwargs) + return result + + return method_wrapper