diff --git a/.bandit b/.bandit
new file mode 100644
index 0000000..cf6c1fc
--- /dev/null
+++ b/.bandit
@@ -0,0 +1,10 @@
+skips: [B101,B105,B108,B113,B311,B314,B405,B404,B501,B506,B602,B604,B605,B607,B608]
+
+assert_used:
+  skips: ['*_test.py', '*test_*.py']
+
+B311:
+  skips: ['*_test.py', '*test_*.py', '*mock*.py', '*mock_*.py']
+
+B105:
+  skips: ['*_test.py', '*test_*.py']
diff --git a/.flake8 b/.flake8
new file mode 100644
index 0000000..f77ad44
--- /dev/null
+++ b/.flake8
@@ -0,0 +1,3 @@
+[flake8]
+exclude =
+    "sumomongodbatlascollector/test_mock.py"
diff --git a/.github/workflows/runtest.yaml b/.github/workflows/runtest.yaml
new file mode 100644
index 0000000..b52a789
--- /dev/null
+++ b/.github/workflows/runtest.yaml
@@ -0,0 +1,57 @@
+name: "Run Static tests"
+on: [workflow_dispatch, pull_request]
+
+jobs:
+  lint-test:
+    name: "Run Security & Linting Tests"
+    runs-on: "ubuntu-latest"
+
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Set up Python 3.10
+        uses: actions/setup-python@v5
+        with:
+          python-version: '3.10'
+
+      - name: Install dev dependencies
+        run: python -m pip install -U pip flake8 bandit mypy
+
+      - name: Run Security Tests
+        run: |
+          bandit -c .bandit -r sumomongodbatlascollector/ -f custom
+
+      - name: Run Linting Tests
+        run: |
+          flake8 --max-line-length=120 --ignore=E231,E225,F541,E501,E402,F841,W605,E731,E722,C901,F821 sumomongodbatlascollector/
+
+      # - name: Static Type Checks
+      #   continue-on-error: true
+      #   run: |
+      #     cd sumomongodbatlascollector/
+      #     mypy --install-types ./main.py
+      #     mypy ./main.py --disable-error-code=import-untyped
+      #     mypy --install-types ./api.py
+      #     mypy ./api.py --disable-error-code=import-untyped
+
+  run-test:
+    name: "Run Build and Install Tests"
+    runs-on: "ubuntu-latest"
+
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Set up Python 3.10
+        uses: actions/setup-python@v5
+        with:
+          python-version: '3.10'
+
+      - name: Install build and test dependencies
+        run: |
+          python -m pip install --upgrade pip
+          pip install -e .
+ pip install pytest + + - name: "Run Pytest" + run: | + pytest -vvv sumomongodbatlascollector/test_mongo_collector.py diff --git a/requirements.txt b/requirements.txt index a74622c..ec973db 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,3 @@ -sumologic-appclient-sdk==1.0.17 +sumologic-appclient-sdk==2.0.10 +requests>=2.32.3 +future-fstrings diff --git a/setup.py b/setup.py index 6a2a561..a3a07f9 100644 --- a/setup.py +++ b/setup.py @@ -4,21 +4,19 @@ here = abspath(dirname(__file__)) -with open(join(here, 'VERSION')) as VERSION_FILE: +with open(join(here, "VERSION")) as VERSION_FILE: __versionstr__ = VERSION_FILE.read().strip() -with open(join(here, 'requirements.txt')) as REQUIREMENTS: - INSTALL_REQUIRES = REQUIREMENTS.read().split('\n') +with open(join(here, "requirements.txt")) as REQUIREMENTS: + INSTALL_REQUIRES = REQUIREMENTS.read().split("\n") -with io.open(join(here, 'README.md'), encoding='utf-8') as f: +with io.open(join(here, "README.md"), encoding="utf-8") as f: long_description = f.read() -CONSOLE_SCRIPTS = [ - 'sumomongodbatlascollector=sumomongodbatlascollector.main:main' -] +CONSOLE_SCRIPTS = ["sumomongodbatlascollector=sumomongodbatlascollector.main:main"] setup( name="sumologic-mongodb-atlas", @@ -26,9 +24,13 @@ packages=find_packages(), install_requires=INSTALL_REQUIRES, extras_require={ - 'aws': ["boto3>=1.9.66", "botocore>=1.12.66"], - "gcp": ["google-api-python-client>=1.7.8", "oauth2client>=4.1.3", "google-cloud-datastore>=1.7.3"], - "azure": ["azure-cosmosdb-table>=1.0.5", "bson>=0.5.8"] + "aws": ["boto3>=1.34.149", "botocore>=1.34.149"], + "gcp": [ + "google-api-python-client>=2.129.0", + "oauth2client>=4.1.3", + "google-cloud-datastore>=2.19.0", + ], + "azure": ["azure-cosmosdb-table>=1.0.6", "bson>=0.5.10"], }, # PyPI metadata author="SumoLogic", @@ -36,20 +38,23 @@ description="Sumo Logic collection solution for mongodb atlas", license="PSF", long_description=long_description, - long_description_content_type='text/markdown', + long_description_content_type="text/markdown", keywords="sumologic python rest api log management analytics logreduce mongodb atlas agent security siem collector forwarder", url="https://github.com/SumoLogic/sumologic-mongodb-atlas", zip_safe=True, include_package_data=True, classifiers=[ - 'Development Status :: 4 - Beta', - 'Intended Audience :: Developers', - 'Programming Language :: Python :: 2.7', - 'Programming Language :: Python :: 3.7', - 'Operating System :: OS Independent' + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "Programming Language :: Python :: 2.7", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Operating System :: OS Independent", ], entry_points={ - 'console_scripts': CONSOLE_SCRIPTS, - } - + "console_scripts": CONSOLE_SCRIPTS, + }, ) diff --git a/sumomongodbatlascollector/api.py b/sumomongodbatlascollector/api.py index 447055a..279e896 100644 --- a/sumomongodbatlascollector/api.py +++ b/sumomongodbatlascollector/api.py @@ -2,39 +2,58 @@ import gzip import json +import os +# import psutil +# import tracemalloc from io import BytesIO import time from requests.auth import HTTPDigestAuth -import dateutil +# import dateutil from sumoappclient.sumoclient.base import BaseAPI from sumoappclient.sumoclient.factory import OutputHandlerFactory -from sumoappclient.common.utils import get_current_timestamp, 
convert_epoch_to_utc_date, convert_utc_date_to_epoch, convert_date_to_epoch +from sumoappclient.common.utils import ( + get_current_timestamp, + convert_epoch_to_utc_date, + convert_utc_date_to_epoch, + convert_date_to_epoch, +) from sumoappclient.sumoclient.httputils import ClientMixin +from time_and_memory_tracker import TimeAndMemoryTracker class MongoDBAPI(BaseAPI): MOVING_WINDOW_DELTA = 0.001 - isoformat = '%Y-%m-%dT%H:%M:%S.%fZ' - date_format = '%Y-%m-%dT%H:%M:%SZ' + isoformat = "%Y-%m-%dT%H:%M:%S.%fZ" + date_format = "%Y-%m-%dT%H:%M:%SZ" def __init__(self, kvstore, config): super(MongoDBAPI, self).__init__(kvstore, config) - self.api_config = self.config['MongoDBAtlas'] - self.digestauth = HTTPDigestAuth(username=self.api_config['PUBLIC_API_KEY'], password=self.api_config['PRIVATE_API_KEY']) + self.api_config = self.config["MongoDBAtlas"] + self.MAX_REQUEST_WINDOW_LENGTH = self.api_config.get("Collection", {}).get( + "MAX_REQUEST_WINDOW_LENGTH", 3600 + ) + self.MIN_REQUEST_WINDOW_LENGTH = self.api_config.get("Collection", {}).get( + "MIN_REQUEST_WINDOW_LENGTH", 60 + ) + self.digestauth = HTTPDigestAuth( + username=self.api_config["PUBLIC_API_KEY"], + password=self.api_config["PRIVATE_API_KEY"], + ) + activate_time_and_memory_tracker = self.collection_config.get( + "ACTIVATE_TIME_AND_MEMORY_TRACKER", False + ) or os.environ.get("ACTIVATE_TIME_AND_MEMORY_TRACKER", False) def get_window(self, last_time_epoch): start_time_epoch = last_time_epoch + self.MOVING_WINDOW_DELTA - end_time_epoch = get_current_timestamp() - self.collection_config['END_TIME_EPOCH_OFFSET_SECONDS'] - MIN_REQUEST_WINDOW_LENGTH = 60 - MAX_REQUEST_WINDOW_LENGTH = 3600 + end_time_epoch = (get_current_timestamp() - self.collection_config["END_TIME_EPOCH_OFFSET_SECONDS"]) - while not (end_time_epoch - start_time_epoch > MIN_REQUEST_WINDOW_LENGTH): + while not (end_time_epoch - start_time_epoch > self.MIN_REQUEST_WINDOW_LENGTH): # initially last_time_epoch is same as current_time_stamp so endtime becomes lesser than starttime - time.sleep(MIN_REQUEST_WINDOW_LENGTH) - end_time_epoch = get_current_timestamp() - self.collection_config['END_TIME_EPOCH_OFFSET_SECONDS'] + time.sleep(self.MIN_REQUEST_WINDOW_LENGTH) + end_time_epoch = (get_current_timestamp() - self.collection_config["END_TIME_EPOCH_OFFSET_SECONDS"]) - if ((end_time_epoch - start_time_epoch) > MAX_REQUEST_WINDOW_LENGTH): - end_time_epoch = start_time_epoch + MAX_REQUEST_WINDOW_LENGTH + if (end_time_epoch - start_time_epoch) > self.MAX_REQUEST_WINDOW_LENGTH: + end_time_epoch = start_time_epoch + self.MAX_REQUEST_WINDOW_LENGTH return start_time_epoch, end_time_epoch @@ -48,119 +67,208 @@ def _replace_cluster_name(self, full_name_with_cluster, cluster_mapping): class FetchMixin(MongoDBAPI): - def fetch(self): log_type = self.get_key() - output_handler = OutputHandlerFactory.get_handler(self.collection_config['OUTPUT_HANDLER'], path=self.pathname, config=self.config) - url, kwargs = self.build_fetch_params() - self.log.info(f'''Fetching LogType: {log_type} kwargs: {kwargs}''') - state = None - payload = [] - try: - - fetch_success, content = ClientMixin.make_request(url, method="get", logger=self.log, TIMEOUT=self.collection_config['TIMEOUT'], MAX_RETRY=self.collection_config['MAX_RETRY'], BACKOFF_FACTOR=self.collection_config['BACKOFF_FACTOR'], **kwargs) - if fetch_success and len(content) > 0: - payload, state = self.transform_data(content) - #Todo Make this atomic if after sending -> Ctrl - C happens then it fails to save state - params = self.build_send_params() 
- send_success = output_handler.send(payload, **params) - if send_success: - self.save_state(**state) - self.log.info(f'''Successfully sent LogType: {self.get_key()} Data: {len(content)}''') + with TimeAndMemoryTracker(activate=True) as tracker: + start_message = tracker.start("OutputHandlerFactory.get_handler") + self.log.info(start_message) + output_handler = OutputHandlerFactory.get_handler( + self.collection_config["OUTPUT_HANDLER"], + path=self.pathname, + config=self.config, + ) + + end_message = tracker.end("OutputHandlerFactory.get_handler") + self.log.info(end_message) + start_message = tracker.start("self.build_fetch_params") + url, kwargs = self.build_fetch_params() + end_message = tracker.end("self.build_fetch_params") + self.log.info(f'''Fetching LogType: {log_type} kwargs: {kwargs} url: {url} end_message: {end_message}''') + state = None + payload = [] + try: + start_message = tracker.start("ClientMixin.make_request") + fetch_success, content = ClientMixin.make_request( + url, + method="get", + logger=self.log, + TIMEOUT=self.collection_config["TIMEOUT"], + MAX_RETRY=self.collection_config["MAX_RETRY"], + BACKOFF_FACTOR=self.collection_config["BACKOFF_FACTOR"], + **kwargs, + ) + self.log.info(f'''Fetching LogType: {log_type} kwargs: {kwargs} url: {url} end_message: {start_message}''') + end_message = tracker.end("ClientMixin.make_request") + self.log.info(end_message) + if fetch_success and len(content) > 0: + payload, state = self.transform_data(content) + # Todo Make this atomic if after sending -> Ctrl - C happens then it fails to save state + params = self.build_send_params() + start_message = tracker.start("OutputHandler.send") + self.log.info(f'''Sending LogType: {self.get_key()} Data: {len(content)} url: {url} start_message: {start_message}''') + send_success = output_handler.send(payload, **params) + end_message = tracker.end("OutputHandler.send") + self.log.info(f'''Sending LogType: {self.get_key()} Data: {len(content)} kwargs: {kwargs} url: {url} end_message: {end_message}''') + if send_success: + self.save_state(**state) + self.log.info(f"""Successfully sent LogType: {self.get_key()} Data: {len(content)}""") + else: + self.log.error(f"""Failed to send LogType: {self.get_key()}""") + elif fetch_success and len(content) == 0: + self.log.info( + f"""No results window LogType: {log_type} kwargs: {kwargs} status: {fetch_success} url: {url}""" + ) + is_move_fetch_window, new_state = self.check_move_fetch_window(kwargs) + if is_move_fetch_window: + self.save_state(**new_state) + self.log.debug(f"""Moving fetched window newstate: {new_state}""") else: - self.log.error(f'''Failed to send LogType: {self.get_key()}''') - elif fetch_success and len(content) == 0: - self.log.info(f'''No results window LogType: {log_type} kwargs: {kwargs} status: {fetch_success}''') - is_move_fetch_window, new_state = self.check_move_fetch_window(kwargs) - if is_move_fetch_window: - self.save_state(**new_state) - self.log.debug(f'''Moving fetched window newstate: {new_state}''') - else: - self.log.error(f'''Error LogType: {log_type} status: {fetch_success} reason: {content}''') - finally: - output_handler.close() - self.log.info(f'''Completed LogType: {log_type} curstate: {state} datasent: {len(payload)}''') + self.log.error( + f"""Error LogType: {log_type} status: {fetch_success} reason: {content} kwargs: {kwargs} url: {url}""" + ) + finally: + output_handler.close() + self.log.info( + f"""Completed LogType: {log_type} curstate: {state} datasent: {len(payload)}""" + ) class 
PaginatedFetchMixin(MongoDBAPI): - def fetch(self): current_state = self.get_state() - output_handler = OutputHandlerFactory.get_handler(self.collection_config['OUTPUT_HANDLER'], path=self.pathname, config=self.config) - url, kwargs = self.build_fetch_params() - log_type = self.get_key() - next_request = True - count = 0 - sess = ClientMixin.get_new_session() - self.log.info(f'''Fetching LogType: {log_type} starttime: {kwargs['params']['minDate']} endtime: {kwargs['params']['maxDate']}''') - try: - while next_request: - send_success = has_next_page = False - status, data = ClientMixin.make_request(url, method="get", session=sess, logger=self.log, TIMEOUT=self.collection_config['TIMEOUT'], MAX_RETRY=self.collection_config['MAX_RETRY'], BACKOFF_FACTOR=self.collection_config['BACKOFF_FACTOR'], **kwargs) - fetch_success = status and "results" in data - if fetch_success: - has_next_page = len(data['results']) > 0 - if has_next_page: - payload, updated_state = self.transform_data(data) - params = self.build_send_params() - send_success = output_handler.send(payload, **params) - if send_success: - count +=1 - self.log.debug(f'''Successfully sent LogType: {log_type} Page: {kwargs['params']['pageNum']} Datalen: {len(payload)} starttime: {kwargs['params']['minDate']} endtime: {kwargs['params']['maxDate']}''') - kwargs['params']['pageNum'] += 1 - # save and update last_time_epoch required for next invocation - current_state.update(updated_state) - # time not available save current state new page num else continue - if not self.is_time_remaining(): - self.save_state({ - "start_time_epoch": convert_utc_date_to_epoch(kwargs['params']['minDate']), - "end_time_epoch": convert_utc_date_to_epoch(kwargs['params']['maxDate']), - "page_num": kwargs['params']["pageNum"], - "last_time_epoch": current_state['last_time_epoch'] - }) - else: - # show err unable to send save current state - self.log.error(f'''Failed to send LogType: {log_type} Page: {kwargs['params']['pageNum']} starttime: {kwargs['params']['minDate']} endtime: {kwargs['params']['maxDate']}''') - self.save_state({ - "start_time_epoch": convert_utc_date_to_epoch(kwargs['params']['minDate']), - "end_time_epoch": convert_utc_date_to_epoch(kwargs['params']['maxDate']), - "page_num": kwargs['params']["pageNum"], - "last_time_epoch": current_state['last_time_epoch'] - }) - else: - - # here fetch success is true and assuming pageNum starts from 1 - # page_num has finished increase window calc last_time_epoch - if kwargs['params']['pageNum'] > 1: - self.log.debug(f'''Moving starttime window LogType: {log_type} Page: {kwargs['params']['pageNum']} starttime: {kwargs['params']['minDate']} endtime: {kwargs['params']['maxDate']} to last_time_epoch": {convert_epoch_to_utc_date(current_state['last_time_epoch'], date_format=self.isoformat)}''') - self.save_state({ - "page_num": 0, - "last_time_epoch": current_state['last_time_epoch'] - }) - else: - # genuine no result window no change - self.log.info(f'''No results window LogType: {log_type} Page: {kwargs['params']['pageNum']} starttime: {kwargs['params']['minDate']} endtime: {kwargs['params']['maxDate']} status: {fetch_success}''') - is_move_fetch_window, updated_state = self.check_move_fetch_window(kwargs) - if is_move_fetch_window: + with TimeAndMemoryTracker(activate=True) as tracker: + output_handler = OutputHandlerFactory.get_handler(self.collection_config["OUTPUT_HANDLER"], path=self.pathname, config=self.config) + start_message = tracker.start("self.build_fetch_params") + url, kwargs = 
self.build_fetch_params() + end_message = tracker.end("self.build_fetch_params") + log_type = self.get_key() + self.log.info(f'''Fetching LogType: {log_type} kwargs: {kwargs} url: {url} end_message: {end_message} ''') + next_request = True + count = 0 + start_message = tracker.start("ClientMixin.get_new_session") + sess = ClientMixin.get_new_session() + end_message = tracker.end("ClientMixin.get_new_session") + self.log.info(f'''Fetching LogType: {log_type} kwargs: {kwargs} url: {url} end_message: {end_message} ''') + try: + while next_request: + send_success = has_next_page = False + status, data = ClientMixin.make_request( + url, + method="get", + session=sess, + logger=self.log, + TIMEOUT=self.collection_config["TIMEOUT"], + MAX_RETRY=self.collection_config["MAX_RETRY"], + BACKOFF_FACTOR=self.collection_config["BACKOFF_FACTOR"], + **kwargs, + ) + fetch_success = status and "results" in data + if fetch_success: + has_next_page = len(data["results"]) > 0 + if has_next_page: + payload, updated_state = self.transform_data(data) + params = self.build_send_params() + start_message = tracker.start("OutputHandler.send") + send_success = output_handler.send(payload, **params) + end_message = tracker.end("OutputHandler.send") + if send_success: + count += 1 + self.log.debug( + f"""Successfully sent LogType: {log_type} Page: {kwargs['params']['pageNum']} Datalen: {len(payload)} starttime: {kwargs['params']['minDate']} endtime: {kwargs['params']['maxDate']} end_message: {end_message}""" + ) + kwargs["params"]["pageNum"] += 1 + # save and update last_time_epoch required for next invocation current_state.update(updated_state) - self.log.debug(f'''Moving starttime window LogType: {log_type} Page: {kwargs['params']['pageNum']} starttime: {kwargs['params']['minDate']} endtime: {kwargs['params']['maxDate']} to last_time_epoch": {convert_epoch_to_utc_date(current_state['last_time_epoch'], date_format=self.isoformat)}''') - self.save_state({ - "page_num": 0, - "last_time_epoch": current_state['last_time_epoch'] - }) + # time not available save current state new page num else continue + if not self.is_time_remaining(): + self.save_state( + { + "start_time_epoch": convert_utc_date_to_epoch( + kwargs["params"]["minDate"] + ), + "end_time_epoch": convert_utc_date_to_epoch( + kwargs["params"]["maxDate"] + ), + "page_num": kwargs["params"]["pageNum"], + "last_time_epoch": current_state[ + "last_time_epoch" + ], + } + ) + else: + # show err unable to send save current state + self.log.error( + f"""Failed to send LogType: {log_type} Page: {kwargs['params']['pageNum']} starttime: {kwargs['params']['minDate']} endtime: {kwargs['params']['maxDate']}""" + ) + self.save_state( + { + "start_time_epoch": convert_utc_date_to_epoch( + kwargs["params"]["minDate"] + ), + "end_time_epoch": convert_utc_date_to_epoch( + kwargs["params"]["maxDate"] + ), + "page_num": kwargs["params"]["pageNum"], + "last_time_epoch": current_state["last_time_epoch"], + } + ) + else: + # here fetch success is true and assuming pageNum starts from 1 + # page_num has finished increase window calc last_time_epoch + if kwargs["params"]["pageNum"] > 1: + self.log.debug( + f"""Moving starttime window LogType: {log_type} Page: {kwargs['params']['pageNum']} starttime: {kwargs['params']['minDate']} endtime: {kwargs['params']['maxDate']} to last_time_epoch": {convert_epoch_to_utc_date(current_state['last_time_epoch'], date_format=self.isoformat)}""" + ) + self.save_state( + { + "page_num": 0, + "last_time_epoch": current_state["last_time_epoch"], + } + ) + 
else: + # genuine no result window no change + self.log.info( + f"""No results window LogType: {log_type} Page: {kwargs['params']['pageNum']} starttime: {kwargs['params']['minDate']} endtime: {kwargs['params']['maxDate']} status: {fetch_success}""" + ) + is_move_fetch_window, updated_state = ( + self.check_move_fetch_window(kwargs) + ) + if is_move_fetch_window: + current_state.update(updated_state) + self.log.debug( + f"""Moving starttime window LogType: {log_type} Page: {kwargs['params']['pageNum']} starttime: {kwargs['params']['minDate']} endtime: {kwargs['params']['maxDate']} to last_time_epoch": {convert_epoch_to_utc_date(current_state['last_time_epoch'], date_format=self.isoformat)}""" + ) + self.save_state( + { + "page_num": 0, + "last_time_epoch": current_state[ + "last_time_epoch" + ], + } + ) - else: - self.save_state({ - "start_time_epoch": convert_utc_date_to_epoch(kwargs['params']['minDate']), - "end_time_epoch": convert_utc_date_to_epoch(kwargs['params']['maxDate']), - "page_num": kwargs['params']["pageNum"], - "last_time_epoch": current_state['last_time_epoch'] - }) - self.log.error(f'''Failed to fetch LogType: {log_type} Page: {kwargs['params']['pageNum']} Reason: {data} starttime: {kwargs['params']['minDate']} endtime: {kwargs['params']['maxDate']}''') - next_request = fetch_success and send_success and has_next_page and self.is_time_remaining() - finally: - sess.close() - self.log.info(f'''Completed LogType: {log_type} Count: {count} Page: {kwargs['params']['pageNum']} starttime: {kwargs['params']['minDate']} endtime: {kwargs['params']['maxDate']}''') + else: + self.save_state( + { + "start_time_epoch": convert_utc_date_to_epoch( + kwargs["params"]["minDate"] + ), + "end_time_epoch": convert_utc_date_to_epoch( + kwargs["params"]["maxDate"] + ), + "page_num": kwargs["params"]["pageNum"], + "last_time_epoch": current_state["last_time_epoch"], + } + ) + self.log.error( + f"""Failed to fetch LogType: {log_type} Page: {kwargs['params']['pageNum']} Reason: {data} starttime: {kwargs['params']['minDate']} endtime: {kwargs['params']['maxDate']}""" + ) + next_request = (fetch_success and send_success and has_next_page and self.is_time_remaining()) + finally: + sess.close() + self.log.info( + f"""Completed LogType: {log_type} Count: {count} Page: {kwargs['params']['pageNum']} starttime: {kwargs['params']['minDate']} endtime: {kwargs['params']['maxDate']}""" + ) class LogAPI(FetchMixin): @@ -171,11 +279,13 @@ def __init__(self, kvstore, hostname, filename, config, cluster_mapping): super(LogAPI, self).__init__(kvstore, config) self.hostname = hostname self.filename = filename - self.pathname = "db_logs.json" if "audit" not in self.filename else "db_auditlogs.json" + self.pathname = ( + "db_logs.json" if "audit" not in self.filename else "db_auditlogs.json" + ) self.cluster_mapping = cluster_mapping def get_key(self): - key = f'''{self.api_config['PROJECT_ID']}-{self.hostname}-{self.filename}''' + key = f"""{self.api_config['PROJECT_ID']}-{self.hostname}-{self.filename}""" return key def save_state(self, last_time_epoch): @@ -192,26 +302,34 @@ def get_state(self): # API Ref: https://www.mongodb.com/docs/atlas/reference/api-resources-spec/v1/#tag/Monitoring-and-Logs/operation/downloadHostLogs def build_fetch_params(self): - start_time_epoch, end_time_epoch = self.get_window(self.get_state()['last_time_epoch']) + start_time_epoch, end_time_epoch = self.get_window( + self.get_state()["last_time_epoch"] + ) - return 
f'''{self.api_config['BASE_URL']}/groups/{self.api_config['PROJECT_ID']}/clusters/{self.hostname}/logs/{self.filename}''', { + return ( + f"""{self.api_config['BASE_URL']}/groups/{self.api_config['PROJECT_ID']}/clusters/{self.hostname}/logs/{self.filename}""", + { "auth": self.digestauth, - "params": {"startDate": int(start_time_epoch), "endDate": int(end_time_epoch)}, # this api does not take ms + "params": { + "startDate": int(start_time_epoch), + "endDate": int(end_time_epoch), + }, # this api does not take ms "headers": {"Accept": "application/gzip"}, - "is_file": True - } + "is_file": True, + }, + ) def build_send_params(self): return { - "extra_headers": {'X-Sumo-Name': self.filename}, - "endpoint_key": "HTTP_LOGS_ENDPOINT" + "extra_headers": {"X-Sumo-Name": self.filename}, + "endpoint_key": "HTTP_LOGS_ENDPOINT", } def check_move_fetch_window(self, kwargs): # https://www.mongodb.com/docs/atlas/reference/api/logs/ # Process and audit logs are updated from the cluster backend infrastructure every five minutes and contain log data from the previous five minutes. - data_availablity_max_endDate = int(get_current_timestamp() - 5*60) - api_endDate = kwargs['params']["endDate"] + data_availablity_max_endDate = int(get_current_timestamp() - 5 * 60) + api_endDate = kwargs["params"]["endDate"] if api_endDate < data_availablity_max_endDate: return True, {"last_time_epoch": api_endDate} else: @@ -229,8 +347,10 @@ def transform_data(self, content): if not line.strip(): # for JSONDecoderror in case of empty lines continue - line = line.decode('utf-8') - hostname_alias = self._replace_cluster_name(self.hostname, self.cluster_mapping) + line = line.decode("utf-8") + hostname_alias = self._replace_cluster_name( + self.hostname, self.cluster_mapping + ) cluster_name = self._get_cluster_name(hostname_alias) if "audit" in self.filename: @@ -242,12 +362,15 @@ def transform_data(self, content): except ValueError as e: # checking for multiline messages last_line = line - self.log.warn("Multiline Message in line no: %d last_log: %s current_log: %s" % (line_no, all_logs[-1:], line)) + self.log.warn( + "Multiline Message in line no: %d last_log: %s current_log: %s" + % (line_no, all_logs[-1:], line) + ) continue - msg['project_id'] = self.api_config['PROJECT_ID'] - msg['hostname'] = hostname_alias - msg['cluster_name'] = cluster_name - current_date = msg['ts']['$date'] + msg["project_id"] = self.api_config["PROJECT_ID"] + msg["hostname"] = hostname_alias + msg["cluster_name"] = cluster_name + current_date = msg["ts"]["$date"] else: if last_line: line = last_line + line @@ -257,15 +380,18 @@ def transform_data(self, content): except ValueError as e: # checking for multiline messages last_line = line - self.log.warn("Multiline Message in line no: %d last_log: %s current_log: %s" % (line_no, all_logs[-1:], line)) + self.log.warn( + "Multiline Message in line no: %d last_log: %s current_log: %s" + % (line_no, all_logs[-1:], line) + ) continue - msg['project_id'] = self.api_config['PROJECT_ID'] - msg['hostname'] = hostname_alias - msg['cluster_name'] = cluster_name - current_date = msg['t']['$date'] + msg["project_id"] = self.api_config["PROJECT_ID"] + msg["hostname"] = hostname_alias + msg["cluster_name"] = cluster_name + current_date = msg["t"]["$date"] current_date_timestamp = convert_date_to_epoch(current_date.strip()) - msg['created'] = current_date # taking out date + msg["created"] = current_date # taking out date last_time_epoch = max(current_date_timestamp, last_time_epoch) all_logs.append(msg) @@ -273,7 
+399,6 @@ def transform_data(self, content): class ProcessMetricsAPI(FetchMixin): - pathname = "process_metrics.log" def __init__(self, kvstore, process_id, config, cluster_mapping): @@ -282,7 +407,7 @@ def __init__(self, kvstore, process_id, config, cluster_mapping): self.cluster_mapping = cluster_mapping def get_key(self): - key = f'''{self.api_config['PROJECT_ID']}-{self.process_id}''' + key = f"""{self.api_config['PROJECT_ID']}-{self.process_id}""" return key def save_state(self, last_time_epoch): @@ -299,30 +424,43 @@ def get_state(self): # API Ref: https://www.mongodb.com/docs/atlas/reference/api-resources-spec/v1/#tag/Monitoring-and-Logs/operation/getHostMeasurements def build_fetch_params(self): - start_time_epoch, end_time_epoch = self.get_window(self.get_state()['last_time_epoch']) - start_time_date = convert_epoch_to_utc_date(start_time_epoch, date_format=self.isoformat) - end_time_date = convert_epoch_to_utc_date(end_time_epoch, date_format=self.isoformat) - return f'''{self.api_config['BASE_URL']}/groups/{self.api_config['PROJECT_ID']}/processes/{self.process_id}/measurements''', { + start_time_epoch, end_time_epoch = self.get_window( + self.get_state()["last_time_epoch"] + ) + start_time_date = convert_epoch_to_utc_date( + start_time_epoch, date_format=self.isoformat + ) + end_time_date = convert_epoch_to_utc_date( + end_time_epoch, date_format=self.isoformat + ) + return ( + f"""{self.api_config['BASE_URL']}/groups/{self.api_config['PROJECT_ID']}/processes/{self.process_id}/measurements""", + { "auth": self.digestauth, "params": { - "itemsPerPage": self.api_config['PAGINATION_LIMIT'], "granularity": "PT1M", - "start": start_time_date, "end": end_time_date, - "m": self.api_config["METRIC_TYPES"]["PROCESS_METRICS"] - } - } + "itemsPerPage": self.api_config["PAGINATION_LIMIT"], + "granularity": "PT1M", + "start": start_time_date, + "end": end_time_date, + "m": self.api_config["METRIC_TYPES"]["PROCESS_METRICS"], + }, + }, + ) def build_send_params(self): return { - "extra_headers": {'Content-Type': 'application/vnd.sumologic.carbon2'}, + "extra_headers": {"Content-Type": "application/vnd.sumologic.carbon2"}, "endpoint_key": "HTTP_METRICS_ENDPOINT", - "jsondump": False + "jsondump": False, } def check_move_fetch_window(self, kwargs): # https://www.mongodb.com/docs/atlas/reference/api/process-measurements/ # Atlas retrieves database metrics every 20 minutes by default. Results include data points with 20 minute intervals. 
- data_availablity_max_endDate = get_current_timestamp() - 20*60 - api_endDate = convert_utc_date_to_epoch(kwargs['params']["end"], date_format=self.isoformat) + data_availablity_max_endDate = get_current_timestamp() - 20 * 60 + api_endDate = convert_utc_date_to_epoch( + kwargs["params"]["end"], date_format=self.isoformat + ) if api_endDate < data_availablity_max_endDate: return True, {"last_time_epoch": api_endDate} else: @@ -331,22 +469,30 @@ def check_move_fetch_window(self, kwargs): def transform_data(self, data): metrics = [] last_time_epoch = self.DEFAULT_START_TIME_EPOCH - for measurement in data['measurements']: - for datapoints in measurement['dataPoints']: - if datapoints['value'] is None: + for measurement in data["measurements"]: + for datapoints in measurement["dataPoints"]: + if datapoints["value"] is None: continue - current_timestamp = convert_utc_date_to_epoch(datapoints['timestamp'], date_format=self.date_format) - host_id = self._replace_cluster_name(data['hostId'], self.cluster_mapping) - process_id = self._replace_cluster_name(data['processId'], self.cluster_mapping) + current_timestamp = convert_utc_date_to_epoch( + datapoints["timestamp"], date_format=self.date_format + ) + host_id = self._replace_cluster_name( + data["hostId"], self.cluster_mapping + ) + process_id = self._replace_cluster_name( + data["processId"], self.cluster_mapping + ) cluster_name = self._get_cluster_name(host_id) - metrics.append(f'''projectId={data['groupId']} hostId={host_id} processId={process_id} metric={measurement['name']} units={measurement['units']} cluster_name={cluster_name} {datapoints['value']} {current_timestamp}''') + metrics.append( + f"""projectId={data['groupId']} hostId={host_id} processId={process_id} metric={measurement['name']} units={measurement['units']} cluster_name={cluster_name} {datapoints['value']} {current_timestamp}""" + ) last_time_epoch = max(current_timestamp, last_time_epoch) return metrics, {"last_time_epoch": last_time_epoch} class DiskMetricsAPI(FetchMixin): - isoformat = '%Y-%m-%dT%H:%M:%S.%fZ' - date_format = '%Y-%m-%dT%H:%M:%SZ' + isoformat = "%Y-%m-%dT%H:%M:%S.%fZ" + date_format = "%Y-%m-%dT%H:%M:%SZ" pathname = "disk_metrics.log" def __init__(self, kvstore, process_id, disk_name, config, cluster_mapping): @@ -356,7 +502,7 @@ def __init__(self, kvstore, process_id, disk_name, config, cluster_mapping): self.cluster_mapping = cluster_mapping def get_key(self): - key = f'''{self.api_config['PROJECT_ID']}-{self.process_id}-{self.disk_name}''' + key = f"""{self.api_config['PROJECT_ID']}-{self.process_id}-{self.disk_name}""" return key def save_state(self, last_time_epoch): @@ -370,32 +516,46 @@ def get_state(self): self.save_state(self.DEFAULT_START_TIME_EPOCH) obj = self.kvstore.get(key) return obj + # API Ref: https://www.mongodb.com/docs/atlas/reference/api-resources-spec/v1/#tag/Monitoring-and-Logs/operation/getDiskMeasurements def build_fetch_params(self): - start_time_epoch, end_time_epoch = self.get_window(self.get_state()['last_time_epoch']) - start_time_date = convert_epoch_to_utc_date(start_time_epoch, date_format=self.isoformat) - end_time_date = convert_epoch_to_utc_date(end_time_epoch, date_format=self.isoformat) - return f'''{self.api_config['BASE_URL']}/groups/{self.api_config['PROJECT_ID']}/processes/{self.process_id}/disks/{self.disk_name}/measurements''', { - "auth": self.digestauth, - "params": { - "itemsPerPage": self.api_config['PAGINATION_LIMIT'], "granularity": "PT1M", - "start": start_time_date, "end": end_time_date, - "m": 
self.api_config["METRIC_TYPES"]["DISK_METRICS"] - } - } + start_time_epoch, end_time_epoch = self.get_window( + self.get_state()["last_time_epoch"] + ) + start_time_date = convert_epoch_to_utc_date( + start_time_epoch, date_format=self.isoformat + ) + end_time_date = convert_epoch_to_utc_date( + end_time_epoch, date_format=self.isoformat + ) + return ( + f"""{self.api_config['BASE_URL']}/groups/{self.api_config['PROJECT_ID']}/processes/{self.process_id}/disks/{self.disk_name}/measurements""", + { + "auth": self.digestauth, + "params": { + "itemsPerPage": self.api_config["PAGINATION_LIMIT"], + "granularity": "PT1M", + "start": start_time_date, + "end": end_time_date, + "m": self.api_config["METRIC_TYPES"]["DISK_METRICS"], + }, + }, + ) def build_send_params(self): return { - "extra_headers": {'Content-Type': 'application/vnd.sumologic.carbon2'}, + "extra_headers": {"Content-Type": "application/vnd.sumologic.carbon2"}, "endpoint_key": "HTTP_METRICS_ENDPOINT", - "jsondump": False + "jsondump": False, } def check_move_fetch_window(self, kwargs): # hhttps://www.mongodb.com/docs/atlas/reference/api/process-disks-measurements/ # Atlas retrieves database metrics every 20 minutes by default. Results include data points with 20 minute intervals. - data_availablity_max_endDate = get_current_timestamp() - 20*60 - api_endDate = convert_utc_date_to_epoch(kwargs['params']["end"], date_format=self.isoformat) + data_availablity_max_endDate = get_current_timestamp() - 20 * 60 + api_endDate = convert_utc_date_to_epoch( + kwargs["params"]["end"], date_format=self.isoformat + ) if api_endDate < data_availablity_max_endDate: return True, {"last_time_epoch": api_endDate} else: @@ -404,22 +564,30 @@ def check_move_fetch_window(self, kwargs): def transform_data(self, data): metrics = [] last_time_epoch = self.DEFAULT_START_TIME_EPOCH - for measurement in data['measurements']: - for datapoints in measurement['dataPoints']: - if datapoints['value'] is None: + for measurement in data["measurements"]: + for datapoints in measurement["dataPoints"]: + if datapoints["value"] is None: continue - current_timestamp = convert_utc_date_to_epoch(datapoints['timestamp'], date_format=self.date_format) - host_id = self._replace_cluster_name(data['hostId'], self.cluster_mapping) - process_id = self._replace_cluster_name(data['processId'], self.cluster_mapping) + current_timestamp = convert_utc_date_to_epoch( + datapoints["timestamp"], date_format=self.date_format + ) + host_id = self._replace_cluster_name( + data["hostId"], self.cluster_mapping + ) + process_id = self._replace_cluster_name( + data["processId"], self.cluster_mapping + ) cluster_name = self._get_cluster_name(host_id) - metrics.append(f'''projectId={data['groupId']} partitionName={data['partitionName']} hostId={host_id} processId={process_id} metric={measurement['name']} units={measurement['units']} cluster_name={cluster_name} {datapoints['value']} {current_timestamp}''') + metrics.append( + f"""projectId={data['groupId']} partitionName={data['partitionName']} hostId={host_id} processId={process_id} metric={measurement['name']} units={measurement['units']} cluster_name={cluster_name} {datapoints['value']} {current_timestamp}""" + ) last_time_epoch = max(current_timestamp, last_time_epoch) return metrics, {"last_time_epoch": last_time_epoch} class DatabaseMetricsAPI(FetchMixin): - isoformat = '%Y-%m-%dT%H:%M:%S.%fZ' - date_format = '%Y-%m-%dT%H:%M:%SZ' + isoformat = "%Y-%m-%dT%H:%M:%S.%fZ" + date_format = "%Y-%m-%dT%H:%M:%SZ" pathname = "database_metrics.log" def 
__init__(self, kvstore, process_id, database_name, config, cluster_mapping): @@ -429,7 +597,7 @@ def __init__(self, kvstore, process_id, database_name, config, cluster_mapping): self.cluster_mapping = cluster_mapping def get_key(self): - key = f'''{self.api_config['PROJECT_ID']}-{self.process_id}-{self.database_name}''' + key = f"""{self.api_config['PROJECT_ID']}-{self.process_id}-{self.database_name}""" return key def save_state(self, last_time_epoch): @@ -437,7 +605,6 @@ def save_state(self, last_time_epoch): obj = {"last_time_epoch": last_time_epoch} self.kvstore.set(key, obj) - def get_state(self): key = self.get_key() if not self.kvstore.has_key(key): @@ -447,30 +614,43 @@ def get_state(self): # API Ref: https://www.mongodb.com/docs/atlas/reference/api-resources-spec/v1/#tag/Monitoring-and-Logs/operation/getDatabaseMeasurements def build_fetch_params(self): - start_time_epoch, end_time_epoch = self.get_window(self.get_state()['last_time_epoch']) - start_time_date = convert_epoch_to_utc_date(start_time_epoch, date_format=self.isoformat) - end_time_date = convert_epoch_to_utc_date(end_time_epoch, date_format=self.isoformat) - return f'''{self.api_config['BASE_URL']}/groups/{self.api_config['PROJECT_ID']}/processes/{self.process_id}/databases/{self.database_name}/measurements''', { - "auth": self.digestauth, - "params": { - "itemsPerPage": self.api_config['PAGINATION_LIMIT'], "granularity": "PT1M", - "start": start_time_date, "end": end_time_date, - "m": self.api_config["METRIC_TYPES"]["DATABASE_METRICS"] - } - } + start_time_epoch, end_time_epoch = self.get_window( + self.get_state()["last_time_epoch"] + ) + start_time_date = convert_epoch_to_utc_date( + start_time_epoch, date_format=self.isoformat + ) + end_time_date = convert_epoch_to_utc_date( + end_time_epoch, date_format=self.isoformat + ) + return ( + f"""{self.api_config['BASE_URL']}/groups/{self.api_config['PROJECT_ID']}/processes/{self.process_id}/databases/{self.database_name}/measurements""", + { + "auth": self.digestauth, + "params": { + "itemsPerPage": self.api_config["PAGINATION_LIMIT"], + "granularity": "PT1M", + "start": start_time_date, + "end": end_time_date, + "m": self.api_config["METRIC_TYPES"]["DATABASE_METRICS"], + }, + }, + ) def build_send_params(self): return { - "extra_headers": {'Content-Type': 'application/vnd.sumologic.carbon2'}, + "extra_headers": {"Content-Type": "application/vnd.sumologic.carbon2"}, "endpoint_key": "HTTP_METRICS_ENDPOINT", - "jsondump": False + "jsondump": False, } def check_move_fetch_window(self, kwargs): # https://www.mongodb.com/docs/atlas/reference/api/process-databases-measurements/ # Atlas retrieves database metrics every 20 minutes by default. Results include data points with 20 minute intervals. 
- data_availablity_max_endDate = get_current_timestamp() - 20*60 - api_endDate = convert_utc_date_to_epoch(kwargs['params']["end"], date_format=self.isoformat) + data_availablity_max_endDate = get_current_timestamp() - 20 * 60 + api_endDate = convert_utc_date_to_epoch( + kwargs["params"]["end"], date_format=self.isoformat + ) if api_endDate < data_availablity_max_endDate: return True, {"last_time_epoch": api_endDate} else: @@ -479,27 +659,32 @@ def check_move_fetch_window(self, kwargs): def transform_data(self, data): metrics = [] last_time_epoch = self.DEFAULT_START_TIME_EPOCH - for measurement in data['measurements']: - for datapoints in measurement['dataPoints']: - if datapoints['value'] is None: + for measurement in data["measurements"]: + for datapoints in measurement["dataPoints"]: + if datapoints["value"] is None: continue - current_timestamp = convert_utc_date_to_epoch(datapoints['timestamp'], date_format=self.date_format) - process_id = self._replace_cluster_name(data['processId'], self.cluster_mapping) + current_timestamp = convert_utc_date_to_epoch( + datapoints["timestamp"], date_format=self.date_format + ) + process_id = self._replace_cluster_name( + data["processId"], self.cluster_mapping + ) cluster_name = self._get_cluster_name(process_id) - metrics.append(f'''projectId={data['groupId']} databaseName={data['databaseName']} hostId={data['hostId']} processId={process_id} metric={measurement['name']} units={measurement['units']} cluster_name={cluster_name} {datapoints['value']} {current_timestamp}''') + metrics.append( + f"""projectId={data['groupId']} databaseName={data['databaseName']} hostId={data['hostId']} processId={process_id} metric={measurement['name']} units={measurement['units']} cluster_name={cluster_name} {datapoints['value']} {current_timestamp}""" + ) last_time_epoch = max(current_timestamp, last_time_epoch) return metrics, {"last_time_epoch": last_time_epoch} class ProjectEventsAPI(PaginatedFetchMixin): - pathname = "projectevents.json" def __init__(self, kvstore, config): super(ProjectEventsAPI, self).__init__(kvstore, config) def get_key(self): - key = f'''{self.api_config['PROJECT_ID']}-projectevents''' + key = f"""{self.api_config['PROJECT_ID']}-projectevents""" return key def save_state(self, state): @@ -509,7 +694,9 @@ def save_state(self, state): def get_state(self): key = self.get_key() if not self.kvstore.has_key(key): - self.save_state({"last_time_epoch": self.DEFAULT_START_TIME_EPOCH, "page_num": 0}) + self.save_state( + {"last_time_epoch": self.DEFAULT_START_TIME_EPOCH, "page_num": 0} + ) obj = self.kvstore.get(key) return obj @@ -517,46 +704,59 @@ def get_state(self): def build_fetch_params(self): state = self.get_state() if state["page_num"] == 0: - start_time_epoch, end_time_epoch = self.get_window(state['last_time_epoch']) + start_time_epoch, end_time_epoch = self.get_window(state["last_time_epoch"]) page_num = 1 else: - start_time_epoch = state['start_time_epoch'] - end_time_epoch = state['end_time_epoch'] - page_num = state['page_num'] - - start_time_date = convert_epoch_to_utc_date(start_time_epoch, date_format=self.isoformat) - end_time_date = convert_epoch_to_utc_date(end_time_epoch, date_format=self.isoformat) - return f'''{self.api_config['BASE_URL']}/groups/{self.api_config['PROJECT_ID']}/events''', { - "auth": self.digestauth, - "params": {"itemsPerPage": self.api_config['PAGINATION_LIMIT'], "minDate": start_time_date , "maxDate": end_time_date, "pageNum": page_num} - } + start_time_epoch = state["start_time_epoch"] + end_time_epoch = 
state["end_time_epoch"] + page_num = state["page_num"] + + start_time_date = convert_epoch_to_utc_date( + start_time_epoch, date_format=self.isoformat + ) + end_time_date = convert_epoch_to_utc_date( + end_time_epoch, date_format=self.isoformat + ) + return ( + f"""{self.api_config['BASE_URL']}/groups/{self.api_config['PROJECT_ID']}/events""", + { + "auth": self.digestauth, + "params": { + "itemsPerPage": self.api_config["PAGINATION_LIMIT"], + "minDate": start_time_date, + "maxDate": end_time_date, + "pageNum": page_num, + }, + }, + ) def build_send_params(self): return { - "extra_headers": {'X-Sumo-Name': "events"}, - "endpoint_key": "HTTP_LOGS_ENDPOINT" + "extra_headers": {"X-Sumo-Name": "events"}, + "endpoint_key": "HTTP_LOGS_ENDPOINT", } def check_move_fetch_window(self, kwargs): # https://www.mongodb.com/docs/atlas/reference/api/events-projects-get-all/ # no information given so assuming data gets retrieved in 5 min similar to database logs - data_availablity_max_endDate = get_current_timestamp() - 5*60 - api_endDate = convert_utc_date_to_epoch(kwargs['params']["maxDate"], date_format=self.isoformat) + data_availablity_max_endDate = get_current_timestamp() - 5 * 60 + api_endDate = convert_utc_date_to_epoch( + kwargs["params"]["maxDate"], date_format=self.isoformat + ) if api_endDate < data_availablity_max_endDate: return True, {"last_time_epoch": api_endDate, "page_num": 0} else: return False, {} def transform_data(self, data): - # assuming file content is small so inmemory possible # https://stackoverflow.com/questions/11914472/stringio-in-python3 # https://stackoverflow.com/questions/8858414/using-python-how-do-you-untar-purely-in-memory last_time_epoch = self.DEFAULT_START_TIME_EPOCH event_logs = [] - for obj in data['results']: - current_timestamp = convert_date_to_epoch(obj['created']) + for obj in data["results"]: + current_timestamp = convert_date_to_epoch(obj["created"]) last_time_epoch = max(current_timestamp, last_time_epoch) event_logs.append(obj) @@ -570,7 +770,7 @@ def __init__(self, kvstore, config): super(OrgEventsAPI, self).__init__(kvstore, config) def get_key(self): - key = f'''{self.api_config['ORGANIZATION_ID']}-orgevents''' + key = f"""{self.api_config['ORGANIZATION_ID']}-orgevents""" return key def save_state(self, state): @@ -580,7 +780,9 @@ def save_state(self, state): def get_state(self): key = self.get_key() if not self.kvstore.has_key(key): - self.save_state({"last_time_epoch": self.DEFAULT_START_TIME_EPOCH, "page_num": 0}) + self.save_state( + {"last_time_epoch": self.DEFAULT_START_TIME_EPOCH, "page_num": 0} + ) obj = self.kvstore.get(key) return obj @@ -588,46 +790,59 @@ def get_state(self): def build_fetch_params(self): state = self.get_state() if state["page_num"] == 0: - start_time_epoch, end_time_epoch = self.get_window(state['last_time_epoch']) + start_time_epoch, end_time_epoch = self.get_window(state["last_time_epoch"]) page_num = 1 else: - start_time_epoch = state['start_time_epoch'] - end_time_epoch = state['end_time_epoch'] - page_num = state['page_num'] - - start_time_date = convert_epoch_to_utc_date(start_time_epoch, date_format=self.isoformat) - end_time_date = convert_epoch_to_utc_date(end_time_epoch, date_format=self.isoformat) - return f'''{self.api_config['BASE_URL']}/orgs/{self.api_config['ORGANIZATION_ID']}/events''', { - "auth": self.digestauth, - "params": {"itemsPerPage": self.api_config['PAGINATION_LIMIT'], "minDate": start_time_date , "maxDate": end_time_date, "pageNum": page_num} - } + start_time_epoch = state["start_time_epoch"] + 
end_time_epoch = state["end_time_epoch"] + page_num = state["page_num"] + + start_time_date = convert_epoch_to_utc_date( + start_time_epoch, date_format=self.isoformat + ) + end_time_date = convert_epoch_to_utc_date( + end_time_epoch, date_format=self.isoformat + ) + return ( + f"""{self.api_config['BASE_URL']}/orgs/{self.api_config['ORGANIZATION_ID']}/events""", + { + "auth": self.digestauth, + "params": { + "itemsPerPage": self.api_config["PAGINATION_LIMIT"], + "minDate": start_time_date, + "maxDate": end_time_date, + "pageNum": page_num, + }, + }, + ) def build_send_params(self): return { - "extra_headers": {'X-Sumo-Name': "orgevents"}, - "endpoint_key": "HTTP_LOGS_ENDPOINT" + "extra_headers": {"X-Sumo-Name": "orgevents"}, + "endpoint_key": "HTTP_LOGS_ENDPOINT", } def check_move_fetch_window(self, kwargs): # https://www.mongodb.com/docs/atlas/reference/api/events-projects-get-all/ # no information given so assuming data gets retrieved in 5 min similar to database logs - data_availablity_max_endDate = get_current_timestamp() - 5*60 - api_endDate = convert_utc_date_to_epoch(kwargs['params']["maxDate"], date_format=self.isoformat) + data_availablity_max_endDate = get_current_timestamp() - 5 * 60 + api_endDate = convert_utc_date_to_epoch( + kwargs["params"]["maxDate"], date_format=self.isoformat + ) if api_endDate < data_availablity_max_endDate: return True, {"last_time_epoch": api_endDate, "page_num": 0} else: return False, {} def transform_data(self, data): - # assuming file content is small so inmemory possible # https://stackoverflow.com/questions/11914472/stringio-in-python3 # https://stackoverflow.com/questions/8858414/using-python-how-do-you-untar-purely-in-memory last_time_epoch = self.DEFAULT_START_TIME_EPOCH event_logs = [] - for obj in data['results']: - current_timestamp = convert_date_to_epoch(obj['created']) + for obj in data["results"]: + current_timestamp = convert_date_to_epoch(obj["created"]) last_time_epoch = max(current_timestamp, last_time_epoch) event_logs.append(obj) @@ -643,7 +858,7 @@ def __init__(self, kvstore, config): super(AlertsAPI, self).__init__(kvstore, config) def get_key(self): - key = f'''{self.api_config['PROJECT_ID']}-alerts''' + key = f"""{self.api_config['PROJECT_ID']}-alerts""" return key def save_state(self, state): @@ -663,84 +878,129 @@ def build_fetch_params(self): if state["page_num"] == 0: page_num = 1 else: - page_num = state['page_num'] + page_num = state["page_num"] - return f'''{self.api_config['BASE_URL']}/groups/{self.api_config['PROJECT_ID']}/alerts''', { - "auth": self.digestauth, - "params": {"itemsPerPage": self.api_config['PAGINATION_LIMIT'], "pageNum": page_num} - } + return ( + f"""{self.api_config['BASE_URL']}/groups/{self.api_config['PROJECT_ID']}/alerts""", + { + "auth": self.digestauth, + "params": { + "itemsPerPage": self.api_config["PAGINATION_LIMIT"], + "pageNum": page_num, + }, + }, + ) def build_send_params(self): return { - "extra_headers": {'X-Sumo-Name': "alerts"}, - "endpoint_key": "HTTP_LOGS_ENDPOINT" + "extra_headers": {"X-Sumo-Name": "alerts"}, + "endpoint_key": "HTTP_LOGS_ENDPOINT", } def transform_data(self, data): - # assuming file content is small so inmemory possible # https://stackoverflow.com/questions/11914472/stringio-in-python3 # https://stackoverflow.com/questions/8858414/using-python-how-do-you-untar-purely-in-memory event_logs = [] - for obj in data['results']: + for obj in data["results"]: event_logs.append(obj) - return event_logs, {"last_page_offset": len(data["results"]) % 
self.api_config['PAGINATION_LIMIT']} + return event_logs, { + "last_page_offset": len(data["results"]) + % self.api_config["PAGINATION_LIMIT"] + } def fetch(self): current_state = self.get_state() - output_handler = OutputHandlerFactory.get_handler(self.collection_config['OUTPUT_HANDLER'], path=self.pathname, config=self.config) + output_handler = OutputHandlerFactory.get_handler( + self.collection_config["OUTPUT_HANDLER"], + path=self.pathname, + config=self.config, + ) url, kwargs = self.build_fetch_params() next_request = True sess = ClientMixin.get_new_session() log_type = self.get_key() count = 0 - self.log.info(f'''Fetching LogType: {log_type} pageNum: {kwargs["params"]["pageNum"]}''') + self.log.info( + f"""Fetching LogType: {log_type} pageNum: {kwargs["params"]["pageNum"]}""" + ) try: while next_request: send_success = has_next_page = False - status, data = ClientMixin.make_request(url, method="get", session=sess, logger=self.log, TIMEOUT=self.collection_config['TIMEOUT'], MAX_RETRY=self.collection_config['MAX_RETRY'], BACKOFF_FACTOR=self.collection_config['BACKOFF_FACTOR'], **kwargs) + status, data = ClientMixin.make_request( + url, + method="get", + session=sess, + logger=self.log, + TIMEOUT=self.collection_config["TIMEOUT"], + MAX_RETRY=self.collection_config["MAX_RETRY"], + BACKOFF_FACTOR=self.collection_config["BACKOFF_FACTOR"], + **kwargs, + ) fetch_success = status and "results" in data if fetch_success: - has_next_page = len(data['results']) > 0 + has_next_page = len(data["results"]) > 0 if has_next_page: payload, updated_state = self.transform_data(data) - send_success = output_handler.send(payload, **self.build_send_params()) + send_success = output_handler.send( + payload, **self.build_send_params() + ) if send_success: count += 1 - self.log.debug(f'''Successfully sent LogType: {log_type} Project: {self.api_config['PROJECT_ID']} Alerts Page: {kwargs['params']['pageNum']} Datalen: {len(payload)} ''') + self.log.debug( + f"""Successfully sent LogType: {log_type} Project: {self.api_config['PROJECT_ID']} Alerts Page: {kwargs['params']['pageNum']} Datalen: {len(payload)} """ + ) current_state.update(updated_state) - if current_state['last_page_offset'] == 0: + if current_state["last_page_offset"] == 0: # do not increase if num alerts < page limit - kwargs['params']['pageNum'] += 1 + kwargs["params"]["pageNum"] += 1 else: has_next_page = False # time not available save current state new page num else continue if (not self.is_time_remaining()) or (not has_next_page): - self.save_state({ - "page_num": kwargs['params']["pageNum"], - "last_page_offset": current_state['last_page_offset'] - }) + self.save_state( + { + "page_num": kwargs["params"]["pageNum"], + "last_page_offset": current_state[ + "last_page_offset" + ], + } + ) else: # show err unable to send save current state - self.log.error(f'''Unable to send Project: {self.api_config['PROJECT_ID']} Alerts Page: {kwargs['params']['pageNum']} ''') - self.save_state({ - "page_num": kwargs['params']["pageNum"], - "last_page_offset": current_state['last_page_offset'] - }) + self.log.error( + f"""Unable to send Project: {self.api_config['PROJECT_ID']} Alerts Page: {kwargs['params']['pageNum']} """ + ) + self.save_state( + { + "page_num": kwargs["params"]["pageNum"], + "last_page_offset": current_state[ + "last_page_offset" + ], + } + ) else: - self.log.debug(f'''Moving starttime window Project: {self.api_config['PROJECT_ID']} Alerts Page: {kwargs['params']['pageNum']} ''') + self.log.debug( + f"""Moving starttime window Project: 
{self.api_config['PROJECT_ID']} Alerts Page: {kwargs['params']['pageNum']} """ + ) # here send success is false # genuine no result window no change # page_num has finished increase window calc last_time_epoch and add 1 - self.save_state({ - "page_num": kwargs['params']["pageNum"], - "last_page_offset": current_state['last_page_offset'] - }) + self.save_state( + { + "page_num": kwargs["params"]["pageNum"], + "last_page_offset": current_state["last_page_offset"], + } + ) else: - self.log.error(f'''Unable to fetch Project: {self.api_config['PROJECT_ID']} Alerts Page: {kwargs['params']['pageNum']} Reason: {data} ''') - next_request = fetch_success and send_success and has_next_page and self.is_time_remaining() + self.log.error( + f"""Unable to fetch Project: {self.api_config['PROJECT_ID']} Alerts Page: {kwargs['params']['pageNum']} Reason: {data} """ + ) + next_request = (fetch_success and send_success and has_next_page and self.is_time_remaining()) finally: sess.close() - self.log.info(f'''Completed LogType: {log_type} Count: {count} Page: {kwargs['params']['pageNum']}''') \ No newline at end of file + self.log.info( + f"""Completed LogType: {log_type} Count: {count} Page: {kwargs['params']['pageNum']}""" + ) diff --git a/sumomongodbatlascollector/main.py b/sumomongodbatlascollector/main.py index 0361bdd..e29a698 100644 --- a/sumomongodbatlascollector/main.py +++ b/sumomongodbatlascollector/main.py @@ -5,27 +5,43 @@ from concurrent import futures from random import shuffle from requests.auth import HTTPDigestAuth +from time_and_memory_tracker import TimeAndMemoryTracker from sumoappclient.sumoclient.base import BaseCollector from sumoappclient.sumoclient.httputils import ClientMixin from sumoappclient.common.utils import get_current_timestamp -from api import ProcessMetricsAPI, ProjectEventsAPI, OrgEventsAPI, DiskMetricsAPI, LogAPI, AlertsAPI, DatabaseMetricsAPI +from api import ( + ProcessMetricsAPI, + ProjectEventsAPI, + OrgEventsAPI, + DiskMetricsAPI, + LogAPI, + AlertsAPI, + DatabaseMetricsAPI, +) class MongoDBAtlasCollector(BaseCollector): - ''' - Design Doc: https://docs.google.com/document/d/15TgilyyuGTMjRIZUXVJa1UhpTu3wS-gMl-dDsXAV2gw/edit?usp=sharing - ''' - SINGLE_PROCESS_LOCK_KEY = 'is_mongodbatlascollector_running' + """ + Design Doc: https://docs.google.com/document/d/15TgilyyuGTMjRIZUXVJa1UhpTu3wS-gMl-dDsXAV2gw/edit?usp=sharing + """ + + SINGLE_PROCESS_LOCK_KEY = "is_mongodbatlascollector_running" CONFIG_FILENAME = "mongodbatlas.yaml" - DATA_REFRESH_TIME = 60*60*1000 + DATA_REFRESH_TIME = 60 * 60 * 1000 def __init__(self): self.project_dir = self.get_current_dir() super(MongoDBAtlasCollector, self).__init__(self.project_dir) - self.api_config = self.config['MongoDBAtlas'] - self.digestauth = HTTPDigestAuth(username=self.api_config['PUBLIC_API_KEY'], password=self.api_config['PRIVATE_API_KEY']) - self.mongosess = ClientMixin.get_new_session(MAX_RETRY=self.collection_config['MAX_RETRY'], BACKOFF_FACTOR=self.collection_config['BACKOFF_FACTOR']) + self.api_config = self.config["MongoDBAtlas"] + self.digestauth = HTTPDigestAuth( + username=self.api_config["PUBLIC_API_KEY"], + password=self.api_config["PRIVATE_API_KEY"], + ) + self.mongosess = ClientMixin.get_new_session( + MAX_RETRY=self.collection_config["MAX_RETRY"], + BACKOFF_FACTOR=self.collection_config["BACKOFF_FACTOR"], + ) def get_current_dir(self): cur_dir = os.path.dirname(__file__) @@ -37,10 +53,19 @@ def getpaginateddata(self, url, **kwargs): while True: page_num += 1 - status, data = ClientMixin.make_request(url, 
method="get", session=self.mongosess, logger=self.log, TIMEOUT=self.collection_config['TIMEOUT'], MAX_RETRY=self.collection_config['MAX_RETRY'], BACKOFF_FACTOR=self.collection_config['BACKOFF_FACTOR'], **kwargs) - if status and "results" in data and len(data['results']) > 0: + status, data = ClientMixin.make_request( + url, + method="get", + session=self.mongosess, + logger=self.log, + TIMEOUT=self.collection_config["TIMEOUT"], + MAX_RETRY=self.collection_config["MAX_RETRY"], + BACKOFF_FACTOR=self.collection_config["BACKOFF_FACTOR"], + **kwargs, + ) + if status and "results" in data and len(data["results"]) > 0: all_data.append(data) - kwargs['params']['pageNum'] = page_num + 1 + kwargs["params"]["pageNum"] = page_num + 1 else: break @@ -50,22 +75,51 @@ def _get_all_databases(self, process_ids): database_names = [] for process_id in process_ids: url = f"{self.api_config['BASE_URL']}/groups/{self.api_config['PROJECT_ID']}/processes/{process_id}/databases" - kwargs = {'auth': self.digestauth, "params": {"itemsPerPage": self.api_config['PAGINATION_LIMIT']}} + kwargs = { + "auth": self.digestauth, + "params": {"itemsPerPage": self.api_config["PAGINATION_LIMIT"]}, + } all_data = self.getpaginateddata(url, **kwargs) - database_names.extend([obj['databaseName'] for data in all_data for obj in data['results']]) + database_names.extend( + [obj["databaseName"] for data in all_data for obj in data["results"]] + ) return list(set(database_names)) def _get_cluster_name(self, fullname): return fullname.split("-shard")[0] + def _get_user_provided_cluster_name(self): + if self.collection_config and self.collection_config.get("Clusters"): + return self.collection_config.get("Clusters") + return [] + def _get_all_processes_from_project(self): url = f"{self.api_config['BASE_URL']}/groups/{self.api_config['PROJECT_ID']}/processes" - kwargs = {'auth': self.digestauth, "params": {"itemsPerPage": self.api_config['PAGINATION_LIMIT']}} + kwargs = { + "auth": self.digestauth, + "params": {"itemsPerPage": self.api_config["PAGINATION_LIMIT"]}, + } all_data = self.getpaginateddata(url, **kwargs) - process_ids = [obj['id'] for data in all_data for obj in data['results']] - hostnames = [obj['hostname'] for data in all_data for obj in data['results']] + process_ids = [obj["id"] for data in all_data for obj in data["results"]] + hostnames = [obj["hostname"] for data in all_data for obj in data["results"]] # 'port': 27017, 'replicaSetName': 'M10AWSTestCluster-config-0', 'typeName': 'SHARD_CONFIG_PRIMARY' - cluster_mapping = {self._get_cluster_name(obj['hostname']): self._get_cluster_name(obj['userAlias']) for data in all_data for obj in data['results']} + user_provided_clusters = self._get_user_provided_cluster_name() + cluster_mapping = {} + if len(user_provided_clusters) > 0: + for obj in all_data: + for obj in obj["results"]: + if obj["hostname"] in user_provided_clusters: + cluster_mapping[self._get_cluster_name(obj["hostname"])] = ( + self._get_cluster_name(obj["userAlias"]) + ) + else: + cluster_mapping = { + self._get_cluster_name(obj["hostname"]): self._get_cluster_name( + obj["userAlias"] + ) + for data in all_data + for obj in data["results"] + } hostnames = list(set(hostnames)) return process_ids, hostnames, cluster_mapping @@ -73,63 +127,99 @@ def _get_all_disks_from_host(self, process_ids): disks = [] for process_id in process_ids: url = f"{self.api_config['BASE_URL']}/groups/{self.api_config['PROJECT_ID']}/processes/{process_id}/disks" - kwargs = {'auth': self.digestauth, "params": {"itemsPerPage": 
self.api_config['PAGINATION_LIMIT']}} + kwargs = { + "auth": self.digestauth, + "params": {"itemsPerPage": self.api_config["PAGINATION_LIMIT"]}, + } all_data = self.getpaginateddata(url, **kwargs) - disks.extend([obj['partitionName'] for data in all_data for obj in data['results']]) + disks.extend( + [obj["partitionName"] for data in all_data for obj in data["results"]] + ) return list(set(disks)) def _set_database_names(self, process_ids): database_names = self._get_all_databases(process_ids) - self.kvstore.set("database_names", {"last_set_date": get_current_timestamp(milliseconds=True), "values": database_names}) + self.kvstore.set( + "database_names", + { + "last_set_date": get_current_timestamp(milliseconds=True), + "values": database_names, + }, + ) def _set_processes(self): process_ids, hostnames, cluster_mapping = self._get_all_processes_from_project() - self.kvstore.set("processes", {"last_set_date": get_current_timestamp(milliseconds=True), "process_ids": process_ids, "hostnames": hostnames}) - self.kvstore.set("cluster_mapping", {"last_set_date": get_current_timestamp(milliseconds=True), "values": cluster_mapping}) + self.kvstore.set( + "processes", + { + "last_set_date": get_current_timestamp(milliseconds=True), + "process_ids": process_ids, + "hostnames": hostnames, + }, + ) + self.kvstore.set( + "cluster_mapping", + { + "last_set_date": get_current_timestamp(milliseconds=True), + "values": cluster_mapping, + }, + ) def _set_disk_names(self, process_ids): disks = self._get_all_disks_from_host(process_ids) - self.kvstore.set("disk_names", {"last_set_date": get_current_timestamp(milliseconds=True), "values": disks}) + self.kvstore.set( + "disk_names", + { + "last_set_date": get_current_timestamp(milliseconds=True), + "values": disks, + }, + ) def _get_database_names(self): - if not self.kvstore.has_key('database_names'): + if not self.kvstore.has_key("database_names"): process_ids, _ = self._get_process_names() self._set_database_names(process_ids) current_timestamp = get_current_timestamp(milliseconds=True) - databases = self.kvstore.get('database_names') - if current_timestamp - databases['last_set_date'] > self.DATA_REFRESH_TIME or (len(databases['values']) == 0): + databases = self.kvstore.get("database_names") + if current_timestamp - databases["last_set_date"] > self.DATA_REFRESH_TIME or ( + len(databases["values"]) == 0 + ): process_ids, _ = self._get_process_names() self._set_database_names(process_ids) - database_names = self.kvstore.get('database_names')['values'] + database_names = self.kvstore.get("database_names")["values"] return database_names def _get_disk_names(self): - if not self.kvstore.has_key('disk_names'): + if not self.kvstore.has_key("disk_names"): process_ids, _ = self._get_process_names() self._set_disk_names(process_ids) current_timestamp = get_current_timestamp(milliseconds=True) - disks = self.kvstore.get('disk_names') - if current_timestamp - disks['last_set_date'] > self.DATA_REFRESH_TIME or (len(disks['values']) == 0): + disks = self.kvstore.get("disk_names") + if current_timestamp - disks["last_set_date"] > self.DATA_REFRESH_TIME or ( + len(disks["values"]) == 0 + ): process_ids, _ = self._get_process_names() self._set_disk_names(process_ids) - disk_names = self.kvstore.get('disk_names')["values"] + disk_names = self.kvstore.get("disk_names")["values"] return disk_names def _get_process_names(self): - if not self.kvstore.has_key('processes'): + if not self.kvstore.has_key("processes"): self._set_processes() current_timestamp = 
get_current_timestamp(milliseconds=True) - processes = self.kvstore.get('processes') - if current_timestamp - processes['last_set_date'] > self.DATA_REFRESH_TIME or (len(processes['process_ids']) == 0): + processes = self.kvstore.get("processes") + if current_timestamp - processes["last_set_date"] > self.DATA_REFRESH_TIME or ( + len(processes["process_ids"]) == 0 + ): self._set_processes() - processes = self.kvstore.get('processes') - process_ids, hostnames = processes['process_ids'], processes['hostnames'] + processes = self.kvstore.get("processes") + process_ids, hostnames = processes["process_ids"], processes["hostnames"] return process_ids, hostnames def is_running(self): @@ -141,7 +231,6 @@ def stop_running(self): return self.kvstore.release_lock(self.SINGLE_PROCESS_LOCK_KEY) def build_task_params(self): - audit_files = ["mongodb-audit-log.gz", "mongos-audit-log.gz"] dblog_files = ["mongodb.gz", "mongos.gz"] filenames = [] @@ -149,66 +238,99 @@ def build_task_params(self): process_ids, hostnames = self._get_process_names() cluster_mapping = self.kvstore.get("cluster_mapping", {}).get("values", {}) - if 'LOG_TYPES' in self.api_config: - if "DATABASE" in self.api_config['LOG_TYPES']: + if "LOG_TYPES" in self.api_config: + if "DATABASE" in self.api_config["LOG_TYPES"]: filenames.extend(dblog_files) - if "AUDIT" in self.api_config['LOG_TYPES']: + if "AUDIT" in self.api_config["LOG_TYPES"]: filenames.extend(audit_files) for filename in filenames: for hostname in hostnames: - tasks.append(LogAPI(self.kvstore, hostname, filename, self.config, cluster_mapping)) - - if "EVENTS_PROJECT" in self.api_config['LOG_TYPES']: + tasks.append( + LogAPI( + self.kvstore, + hostname, + filename, + self.config, + cluster_mapping, + ) + ) + + if "EVENTS_PROJECT" in self.api_config["LOG_TYPES"]: tasks.append(ProjectEventsAPI(self.kvstore, self.config)) - if "EVENTS_ORG" in self.api_config['LOG_TYPES']: + if "EVENTS_ORG" in self.api_config["LOG_TYPES"]: tasks.append(OrgEventsAPI(self.kvstore, self.config)) - if "ALERTS" in self.api_config['LOG_TYPES']: + if "ALERTS" in self.api_config["LOG_TYPES"]: tasks.append(AlertsAPI(self.kvstore, self.config)) - if 'METRIC_TYPES' in self.api_config: - if self.api_config['METRIC_TYPES'].get("PROCESS_METRICS", []): + if "METRIC_TYPES" in self.api_config: + if self.api_config["METRIC_TYPES"].get("PROCESS_METRICS", []): for process_id in process_ids: - tasks.append(ProcessMetricsAPI(self.kvstore, process_id, self.config, cluster_mapping)) + tasks.append( + ProcessMetricsAPI( + self.kvstore, process_id, self.config, cluster_mapping + ) + ) - if self.api_config['METRIC_TYPES'].get("DISK_METRICS", []): + if self.api_config["METRIC_TYPES"].get("DISK_METRICS", []): disk_names = self._get_disk_names() for process_id in process_ids: for disk_name in disk_names: - tasks.append(DiskMetricsAPI(self.kvstore, process_id, disk_name, self.config, cluster_mapping)) - - if self.api_config['METRIC_TYPES'].get("DATABASE_METRICS", []): + tasks.append( + DiskMetricsAPI( + self.kvstore, + process_id, + disk_name, + self.config, + cluster_mapping, + ) + ) + + if self.api_config["METRIC_TYPES"].get("DATABASE_METRICS", []): database_names = self._get_database_names() for process_id in process_ids: for database_name in database_names: - tasks.append(DatabaseMetricsAPI(self.kvstore, process_id, database_name, self.config, cluster_mapping)) - + tasks.append( + DatabaseMetricsAPI( + self.kvstore, + process_id, + database_name, + self.config, + cluster_mapping, + ) + ) self.log.info("%d Tasks Generated" 
% len(tasks)) return tasks def run(self): if self.is_running(): try: - self.log.info('Starting MongoDB Atlas Forwarder...') - task_params = self.build_task_params() - shuffle(task_params) - all_futures = {} - self.log.debug("spawning %d workers" % self.config['Collection']['NUM_WORKERS']) - with futures.ThreadPoolExecutor(max_workers=self.config['Collection']['NUM_WORKERS']) as executor: - results = {executor.submit(apiobj.fetch): apiobj for apiobj in task_params} - all_futures.update(results) - for future in futures.as_completed(all_futures): - param = all_futures[future] - api_type = str(param) - try: - future.result() - obj = self.kvstore.get(api_type) - except Exception as exc: - self.log.error(f"API Type: {api_type} thread generated an exception: {exc}", exc_info=True) - else: - self.log.info(f"API Type: {api_type} thread completed {obj}") + self.log.info("Starting MongoDB Atlas Forwarder...") + with TimeAndMemoryTracker(activate=True) as tracker: + start_message = tracker.start("self.build_task_params") + task_params = self.build_task_params() + end_message = tracker.end("self.build_task_params") + self.log.info(f'''Building Task Params end_message: {end_message}''') + shuffle(task_params) + all_futures = {} + self.log.debug("spawning %d workers" % self.config["Collection"]["NUM_WORKERS"]) + with futures.ThreadPoolExecutor( + max_workers=self.config["Collection"]["NUM_WORKERS"] + ) as executor: + results = {executor.submit(apiobj.fetch): apiobj for apiobj in task_params} + all_futures.update(results) + for future in futures.as_completed(all_futures): + param = all_futures[future] + api_type = str(param) + try: + future.result() + obj = self.kvstore.get(api_type) + except Exception as exc: + self.log.error(f"API Type: {api_type} thread generated an exception: {exc}", exc_info=True,) + else: + self.log.info(f"API Type: {api_type} thread completed {obj}") finally: self.stop_running() self.mongosess.close() @@ -216,21 +338,23 @@ def run(self): if not self.is_process_running(["sumomongodbatlascollector"]): self.kvstore.release_lock_on_expired_key(self.SINGLE_PROCESS_LOCK_KEY, expiry_min=10) + # def execute_api_with_logging(self, apiobj): + # api_type = str(apiobj.__class__.__name__) + # result = apiobj.fetch() + # return result + def test(self): if self.is_running(): task_params = self.build_task_params() shuffle(task_params) - # print(task_params) try: for apiobj in task_params: apiobj.fetch() - # print(apiobj.__class__.__name__) finally: self.stop_running() def main(*args, **kwargs): - try: ns = MongoDBAtlasCollector() ns.run() @@ -239,5 +363,5 @@ def main(*args, **kwargs): traceback.print_exc() -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/sumomongodbatlascollector/mock_functions/process_metric_mock.py b/sumomongodbatlascollector/mock_functions/process_metric_mock.py new file mode 100644 index 0000000..7152fed --- /dev/null +++ b/sumomongodbatlascollector/mock_functions/process_metric_mock.py @@ -0,0 +1,57 @@ +import json +from datetime import datetime, timedelta +import random + + +def process_metric_mock(): + current_time = datetime.utcnow() + end_time = current_time.replace(second=0, microsecond=0) + start_time = end_time - timedelta(minutes=5) + + def generate_datapoints(start_value, end_value): + datapoints = [] + for i in range(6): + timestamp = start_time + timedelta(minutes=i) + value = start_value + (end_value - start_value) * i / 5 + value += random.uniform(-value * 0.1, value * 0.1) + datapoints.append( + { + "timestamp": 
timestamp.strftime("%Y-%m-%dT%H:%M:%SZ"),
+                    "value": round(value, 2),
+                }
+            )
+        return datapoints
+
+    mock_response = {
+        "databaseName": "myDatabase",
+        "end": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
+        "granularity": "PT1M",
+        "groupId": "32b6e34b3d91647abb20e7b8",
+        "hostId": "cluster0-shard-00-00.mongodb.net:27017",
+        "links": [
+            {
+                "href": "https://cloud.mongodb.com/api/atlas/v1.0/groups/32b6e34b3d91647abb20e7b8/processes/cluster0-shard-00-00.mongodb.net:27017/measurements",
+                "rel": "self",
+            }
+        ],
+        "measurements": [
+            {
+                "dataPoints": generate_datapoints(1024, 3072),
+                "name": "CACHE_BYTES_READ_INTO",
+                "units": "BYTES",
+            },
+            {
+                "dataPoints": generate_datapoints(50, 65),
+                "name": "CONNECTIONS",
+                "units": "SCALAR",
+            },
+        ],
+        "partitionName": "P0",
+        "processId": "cluster0-shard-00-00.mongodb.net:27017",
+        "start": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
+    }
+
+    return json.dumps(mock_response, indent=2)
+
+
+# print(process_metric_mock())
diff --git a/sumomongodbatlascollector/mock_mongo_server.py b/sumomongodbatlascollector/mock_mongo_server.py
new file mode 100644
index 0000000..4e50369
--- /dev/null
+++ b/sumomongodbatlascollector/mock_mongo_server.py
@@ -0,0 +1,65 @@
+from flask import Flask, jsonify, request
+import random
+from datetime import datetime
+
+app = Flask(__name__)
+
+
+def generate_dummy_data():
+    return {
+        "cpu_usage": random.uniform(0, 100),
+        "memory_usage": random.uniform(0, 100),
+        "disk_usage": random.uniform(0, 100),
+        "connections": random.randint(0, 1000),
+        "operations": random.randint(0, 10000),
+        "timestamp": datetime.utcnow().isoformat(),
+    }
+
+
+@app.route("/api/atlas/v1.0/groups/<group_id>/processes", methods=["GET"])
+def get_processes(group_id):
+    processes = [
+        {
+            "id": f"process-{i}",
+            "hostname": f"host-{i}.example.com",
+            "userAlias": f"cluster-{i}",
+            "port": 27017,
+            "typeName": "REPLICA_PRIMARY",
+        }
+        for i in range(1, 4)
+    ]
+    return jsonify({"results": processes})
+
+
+@app.route(
+    "/api/atlas/v1.0/groups/<group_id>/processes/<process_id>/databases",
+    methods=["GET"],
+)
+def get_databases(group_id, process_id):
+    databases = [{"databaseName": f"db-{i}"} for i in range(1, 4)]
+    return jsonify({"results": databases})
+
+
+@app.route(
+    "/api/atlas/v1.0/groups/<group_id>/processes/<process_id>/disks", methods=["GET"]
+)
+def get_disks(group_id, process_id):
+    disks = [{"partitionName": f"disk-{i}"} for i in range(1, 3)]
+    return jsonify({"results": disks})
+
+
+@app.route(
+    "/api/atlas/v1.0/groups/<group_id>/processes/<process_id>/measurements",
+    methods=["GET"],
+)
+def get_measurements(group_id, process_id):
+    start = request.args.get("start", type=int)
+    end = request.args.get("end", type=int)
+    granularity = request.args.get("granularity", type=str)
+
+    measurements = [generate_dummy_data() for _ in range(10)]
+    return jsonify({"measurements": measurements})
+
+
+if __name__ == "__main__":
+    app.run(port=8247)
diff --git a/sumomongodbatlascollector/mongodbatlas.yaml b/sumomongodbatlascollector/mongodbatlas.yaml
index 53f476d..be0ef82 100644
--- a/sumomongodbatlascollector/mongodbatlas.yaml
+++ b/sumomongodbatlascollector/mongodbatlas.yaml
@@ -4,8 +4,8 @@ MongoDBAtlas:
   PAGINATION_LIMIT: 500
   ORGANIZATION_ID: null
   PROJECT_ID: null
-  PRIVATE_API_KEY: null
   PUBLIC_API_KEY: null
+  PRIVATE_API_KEY: null
   LOG_TYPES:
     - DATABASE
     - AUDIT
@@ -72,13 +72,17 @@ Collection:
   OUTPUT_HANDLER: HTTP
   MAX_RETRY: 5
   BACKOFF_FACTOR: 1
-  TIMEOUT: 60
+  TIMEOUT: 90 # Increase to 2 minutes if the sumo server response time is slow
   COMPRESSED: true
-  MAX_PAYLOAD_BYTESIZE: 500000
+  MAX_PAYLOAD_BYTESIZE: 4190208 #4MB
END_TIME_EPOCH_OFFSET_SECONDS: 120 - BACKFILL_DAYS: 0 + BACKFILL_DAYS: 0 # Determines the start_time of the collection DBNAME: "mongodbatlas" DB_DIR: ~/sumo + MIN_REQUEST_WINDOW_LENGTH: 60 # IN SECONDS + MAX_REQUEST_WINDOW_LENGTH: 900 # IN SECONDS + ACTIVATE_TIME_AND_MEMORY_TRACKING: False + # Clusters: ["cluster1-shard-00-00.abc123.mongodb.net"] DeployMetaData: PACKAGENAME: "sumologic-mongodb-atlas" diff --git a/sumomongodbatlascollector/test_mock.py b/sumomongodbatlascollector/test_mock.py new file mode 100644 index 0000000..ba83e50 --- /dev/null +++ b/sumomongodbatlascollector/test_mock.py @@ -0,0 +1,789 @@ +import gzip +import json +import pytest +import tracemalloc +import time +from unittest.mock import Mock, patch + +from api import ( + # AlertsAPI, + DatabaseMetricsAPI, + DiskMetricsAPI, + FetchMixin, + LogAPI, + # OrgEventsAPI, + PaginatedFetchMixin, + ProcessMetricsAPI, + ProjectEventsAPI, +) +from main import MongoDBAtlasCollector + + +@patch("sumoappclient.sumoclient.base.BaseAPI") +def test_mongodb_api(mock_base_api): + mock_config = { + "MongoDBAtlas": { + "PUBLIC_API_KEY": "test_public_key", + "PRIVATE_API_KEY": "test_private_key", + } + } + + api = MongoDBAtlasCollector(Mock(), mock_config) + + start_time, end_time = api.get_window(1000000) + assert isinstance(start_time, float) + assert isinstance(end_time, float) + + assert api._get_cluster_name("test-cluster-shard-0") == "test-cluster" + assert ( + api._replace_cluster_name("test-cluster-shard-0", {"test-cluster": "alias"}) == "alias-shard-0" + ) + + +@patch("sumoappclient.sumoclient.factory.OutputHandlerFactory") +@patch("sumoappclient.sumoclient.httputils.ClientMixin") +def test_fetch_mixin(mock_client_mixin, mock_output_handler_factory): + mock_output_handler = Mock() + mock_output_handler_factory.get_handler.return_value = mock_output_handler + + mock_client_mixin.make_request.return_value = (True, [{"data": "test"}]) + + fetch_mixin = FetchMixin(Mock(), {"MongoDBAtlas": {}}) + + fetch_mixin.build_fetch_params = Mock(return_value=("http://test.com", {})) + fetch_mixin.transform_data = Mock( + return_value=([{"transformed": "data"}], {"state": "updated"}) + ) + fetch_mixin.build_send_params = Mock(return_value={}) + fetch_mixin.save_state = Mock() + + fetch_mixin.fetch() + + fetch_mixin.build_fetch_params.assert_called_once() + mock_client_mixin.make_request.assert_called_once() + fetch_mixin.transform_data.assert_called_once() + mock_output_handler.send.assert_called_once() + fetch_mixin.save_state.assert_called_once() + + +@patch("sumoappclient.sumoclient.factory.OutputHandlerFactory") +@patch("sumoappclient.sumoclient.httputils.ClientMixin") +def test_paginated_fetch_mixin(mock_client_mixin, mock_output_handler_factory): + mock_output_handler = Mock() + mock_output_handler_factory.get_handler.return_value = mock_output_handler + + mock_session = Mock() + mock_client_mixin.get_new_session.return_value = mock_session + mock_client_mixin.make_request.side_effect = [ + (True, {"results": [{"data": "page1"}]}), + (True, {"results": [{"data": "page2"}]}), + (True, {"results": []}), # No more results + ] + + paginated_fetch_mixin = PaginatedFetchMixin(Mock(), {"MongoDBAtlas": {}}) + + paginated_fetch_mixin.get_state = Mock(return_value={"last_time_epoch": 1000000}) + paginated_fetch_mixin.build_fetch_params = Mock( + return_value=( + "http://test.com", + { + "params": { + "pageNum": 1, + "minDate": "2023-01-01", + "maxDate": "2023-01-02", + } + }, + ) + ) + paginated_fetch_mixin.transform_data = Mock( + 
return_value=([{"transformed": "data"}], {"state": "updated"}) + ) + paginated_fetch_mixin.build_send_params = Mock(return_value={}) + paginated_fetch_mixin.save_state = Mock() + paginated_fetch_mixin.is_time_remaining = Mock(side_effect=[True, True, False]) + + paginated_fetch_mixin.fetch() + + assert mock_client_mixin.make_request.call_count == 3 + assert paginated_fetch_mixin.transform_data.call_count == 2 + assert mock_output_handler.send.call_count == 2 + assert paginated_fetch_mixin.save_state.call_count == 1 + + +@patch("sumoappclient.sumoclient.base.BaseAPI") +@patch("sumoappclient.sumoclient.factory.OutputHandlerFactory") +@patch("sumoappclient.sumoclient.httputils.ClientMixin") +def test_log_api(mock_client_mixin, mock_output_handler_factory, mock_base_api): + mock_kvstore = Mock() + mock_config = { + "MongoDBAtlas": { + "PROJECT_ID": "test_project", + "BASE_URL": "http://localhost:8247", + } + } + mock_cluster_mapping = {"test-cluster": "alias"} + + log_api = LogAPI( + mock_kvstore, "test-hostname", "mongodb.log", mock_config, mock_cluster_mapping + ) + + assert log_api.get_key() == "test_project-test-hostname-mongodb.log" + + log_api.save_state(1000000) + mock_kvstore.set.assert_called_with( + "test_project-test-hostname-mongodb.log", {"last_time_epoch": 1000000} + ) + + mock_kvstore.has_key.return_value = False + log_api.get_state() + mock_kvstore.set.assert_called_with( + "test_project-test-hostname-mongodb.log", + {"last_time_epoch": log_api.DEFAULT_START_TIME_EPOCH}, + ) + + log_api.get_window = Mock(return_value=(1000000, 2000000)) + url, params = log_api.build_fetch_params() + assert ( + url == "http://localhost:8247/groups/test_project/clusters/test-hostname/logs/mongodb.log" + ) + assert params["params"] == {"startDate": 1000000, "endDate": 2000000} + + send_params = log_api.build_send_params() + assert send_params == { + "extra_headers": {"X-Sumo-Name": "mongodb.log"}, + "endpoint_key": "HTTP_LOGS_ENDPOINT", + } + + with patch( + "sumoappclient.sumoclient.mongodbapi.get_current_timestamp", + return_value=1600000, + ): + result, state = log_api.check_move_fetch_window( + {"params": {"endDate": 1500000}} + ) + assert result is True + assert state == {"last_time_epoch": 1500000} + + sample_log = {"t": {"$date": "2023-07-26T12:00:00.000Z"}, "msg": "Test log message"} + gzip_content = gzip.compress(json.dumps(sample_log).encode("utf-8")) + + transformed_data, state = log_api.transform_data(gzip_content) + + assert len(transformed_data) == 1 + assert transformed_data[0]["project_id"] == "test_project" + assert transformed_data[0]["hostname"] == "alias-hostname" + assert transformed_data[0]["cluster_name"] == "alias" + assert "created" in transformed_data[0] + assert "last_time_epoch" in state + + log_api.filename = "mongodb-audit-log.json" + sample_audit_log = { + "ts": {"$date": "2023-07-26T12:00:00.000Z"}, + "msg": "Test audit log message", + } + gzip_content = gzip.compress(json.dumps(sample_audit_log).encode("utf-8")) + + transformed_data, state = log_api.transform_data(gzip_content) + + assert len(transformed_data) == 1 + assert transformed_data[0]["project_id"] == "test_project" + assert transformed_data[0]["hostname"] == "alias-hostname" + assert transformed_data[0]["cluster_name"] == "alias" + assert "created" in transformed_data[0] + assert "last_time_epoch" in state + + +@patch("sumoappclient.sumoclient.base.BaseAPI") +@patch("sumoappclient.sumoclient.factory.OutputHandlerFactory") +@patch("sumoappclient.sumoclient.httputils.ClientMixin") +def 
test_process_metrics_api( + mock_client_mixin, mock_output_handler_factory, mock_base_api +): + mock_kvstore = Mock() + mock_config = { + "MongoDBAtlas": { + "PROJECT_ID": "test_project", + "BASE_URL": "http://localhost:8247", + "PAGINATION_LIMIT": 100, + "METRIC_TYPES": {"PROCESS_METRICS": ["metric1", "metric2"]}, + } + } + mock_cluster_mapping = {"test-cluster": "alias"} + + process_metrics_api = ProcessMetricsAPI( + mock_kvstore, "test-process-id", mock_config, mock_cluster_mapping + ) + + assert process_metrics_api.get_key() == "test_project-test-process-id" + + process_metrics_api.save_state(1000000) + mock_kvstore.set.assert_called_with( + "test_project-test-process-id", {"last_time_epoch": 1000000} + ) + + mock_kvstore.has_key.return_value = False + process_metrics_api.get_state() + mock_kvstore.set.assert_called_with( + "test_project-test-process-id", + {"last_time_epoch": process_metrics_api.DEFAULT_START_TIME_EPOCH}, + ) + + with patch( + "sumoappclient.sumoclient.mongodbapi.convert_epoch_to_utc_date", + side_effect=["2023-07-26T00:00:00Z", "2023-07-26T01:00:00Z"], + ): + process_metrics_api.get_window = Mock(return_value=(1000000, 2000000)) + url, params = process_metrics_api.build_fetch_params() + assert ( + url == "http://localhost:8247/groups/test_project/processes/test-process-id/measurements" + ) + assert params["params"]["start"] == "2023-07-26T00:00:00Z" + assert params["params"]["end"] == "2023-07-26T01:00:00Z" + assert params["params"]["m"] == ["metric1", "metric2"] + + send_params = process_metrics_api.build_send_params() + assert send_params == { + "extra_headers": {"Content-Type": "application/vnd.sumologic.carbon2"}, + "endpoint_key": "HTTP_METRICS_ENDPOINT", + "jsondump": False, + } + + with patch( + "sumoappclient.sumoclient.mongodbapi.get_current_timestamp", + return_value=1600000, + ): + with patch( + "sumoappclient.sumoclient.mongodbapi.convert_utc_date_to_epoch", + return_value=1500000, + ): + result, state = process_metrics_api.check_move_fetch_window( + {"params": {"end": "2023-07-26T01:00:00Z"}} + ) + assert result is True + assert state == {"last_time_epoch": 1500000} + + sample_data = { + "measurements": [ + { + "name": "metric1", + "units": "MB", + "dataPoints": [ + {"timestamp": "2023-07-26T00:00:00Z", "value": 100}, + {"timestamp": "2023-07-26T00:01:00Z", "value": None}, + ], + } + ], + "hostId": "test-cluster-host", + "processId": "test-cluster-process", + "groupId": "test_project", + } + + with patch( + "sumoappclient.sumoclient.mongodbapi.convert_utc_date_to_epoch", + return_value=1627257600, + ): + transformed_data, state = process_metrics_api.transform_data(sample_data) + + assert len(transformed_data) == 1 + assert "projectId=test_project" in transformed_data[0] + assert "hostId=alias-host" in transformed_data[0] + assert "processId=alias-process" in transformed_data[0] + assert "metric=metric1" in transformed_data[0] + assert "units=MB" in transformed_data[0] + assert "cluster_name=alias" in transformed_data[0] + assert "100 1627257600" in transformed_data[0] + assert state == {"last_time_epoch": 1627257600} + + +@patch("sumoappclient.sumoclient.base.BaseAPI") +@patch("sumoappclient.sumoclient.factory.OutputHandlerFactory") +@patch("sumoappclient.sumoclient.httputils.ClientMixin") +def test_disk_metrics_api( + mock_client_mixin, mock_output_handler_factory, mock_base_api +): + mock_kvstore = Mock() + mock_config = { + "MongoDBAtlas": { + "PROJECT_ID": "test_project", + "BASE_URL": "http://localhost:8247", + "PAGINATION_LIMIT": 100, + 
"METRIC_TYPES": {"DISK_METRICS": ["disk_metric1", "disk_metric2"]}, + } + } + mock_cluster_mapping = {"test-cluster": "alias"} + + disk_metrics_api = DiskMetricsAPI( + mock_kvstore, "test-process-id", "test-disk", mock_config, mock_cluster_mapping + ) + + assert disk_metrics_api.get_key() == "test_project-test-process-id-test-disk" + + disk_metrics_api.save_state(1000000) + mock_kvstore.set.assert_called_with( + "test_project-test-process-id-test-disk", {"last_time_epoch": 1000000} + ) + + mock_kvstore.has_key.return_value = False + disk_metrics_api.get_state() + mock_kvstore.set.assert_called_with( + "test_project-test-process-id-test-disk", + {"last_time_epoch": disk_metrics_api.DEFAULT_START_TIME_EPOCH}, + ) + + with patch( + "sumoappclient.sumoclient.mongodbapi.convert_epoch_to_utc_date", + side_effect=["2023-07-26T00:00:00.000Z", "2023-07-26T01:00:00.000Z"], + ): + disk_metrics_api.get_window = Mock(return_value=(1000000, 2000000)) + url, params = disk_metrics_api.build_fetch_params() + assert ( + url == "http://localhost:4287/groups/test_project/processes/test-process-id/disks/test-disk/measurements" + ) + assert params["params"]["start"] == "2023-07-26T00:00:00.000Z" + assert params["params"]["end"] == "2023-07-26T01:00:00.000Z" + assert params["params"]["m"] == ["disk_metric1", "disk_metric2"] + + send_params = disk_metrics_api.build_send_params() + assert send_params == { + "extra_headers": {"Content-Type": "application/vnd.sumologic.carbon2"}, + "endpoint_key": "HTTP_METRICS_ENDPOINT", + "jsondump": False, + } + + with patch( + "sumoappclient.sumoclient.mongodbapi.get_current_timestamp", + return_value=1600000, + ): + with patch( + "sumoappclient.sumoclient.mongodbapi.convert_utc_date_to_epoch", + return_value=1500000, + ): + result, state = disk_metrics_api.check_move_fetch_window( + {"params": {"end": "2023-07-26T01:00:00.000Z"}} + ) + assert result is True + assert state == {"last_time_epoch": 1500000} + + sample_data = { + "measurements": [ + { + "name": "disk_metric1", + "units": "GB", + "dataPoints": [ + {"timestamp": "2023-07-26T00:00:00Z", "value": 50}, + {"timestamp": "2023-07-26T00:01:00Z", "value": None}, + ], + } + ], + "hostId": "test-cluster-host", + "processId": "test-cluster-process", + "groupId": "test_project", + "partitionName": "test-partition", + } + + with patch( + "sumoappclient.sumoclient.mongodbapi.convert_utc_date_to_epoch", + return_value=1627257600, + ): + transformed_data, state = disk_metrics_api.transform_data(sample_data) + + assert len(transformed_data) == 1 + assert "projectId=test_project" in transformed_data[0] + assert "partitionName=test-partition" in transformed_data[0] + assert "hostId=alias-host" in transformed_data[0] + assert "processId=alias-process" in transformed_data[0] + assert "metric=disk_metric1" in transformed_data[0] + assert "units=GB" in transformed_data[0] + assert "cluster_name=alias" in transformed_data[0] + assert "50 1627257600" in transformed_data[0] + assert state == {"last_time_epoch": 1627257600} + + +@patch("sumoappclient.sumoclient.base.BaseAPI") +@patch("sumoappclient.sumoclient.factory.OutputHandlerFactory") +@patch("sumoappclient.sumoclient.httputils.ClientMixin") +def test_database_metrics_api( + mock_client_mixin, mock_output_handler_factory, mock_base_api +): + mock_kvstore = Mock() + mock_config = { + "MongoDBAtlas": { + "PROJECT_ID": "test_project", + "BASE_URL": "http://localhost:8247", + "PAGINATION_LIMIT": 100, + "METRIC_TYPES": {"DATABASE_METRICS": ["db_metric1", "db_metric2"]}, + } + } + 
mock_cluster_mapping = {"test-cluster": "alias"} + + db_metrics_api = DatabaseMetricsAPI( + mock_kvstore, + "test-process-id", + "test-database", + mock_config, + mock_cluster_mapping, + ) + + assert db_metrics_api.get_key() == "test_project-test-process-id-test-database" + + db_metrics_api.save_state(1000000) + mock_kvstore.set.assert_called_with( + "test_project-test-process-id-test-database", {"last_time_epoch": 1000000} + ) + + mock_kvstore.has_key.return_value = False + db_metrics_api.get_state() + mock_kvstore.set.assert_called_with( + "test_project-test-process-id-test-database", + {"last_time_epoch": db_metrics_api.DEFAULT_START_TIME_EPOCH}, + ) + + with patch( + "sumoappclient.sumoclient.mongodbapi.convert_epoch_to_utc_date", + side_effect=["2023-07-26T00:00:00.000Z", "2023-07-26T01:00:00.000Z"], + ): + db_metrics_api.get_window = Mock(return_value=(1000000, 2000000)) + url, params = db_metrics_api.build_fetch_params() + assert ( + url == "http://localhost:8247/groups/test_project/processes/test-process-id/databases/test-database/measurements" + ) + assert params["params"]["start"] == "2023-07-26T00:00:00.000Z" + assert params["params"]["end"] == "2023-07-26T01:00:00.000Z" + assert params["params"]["m"] == ["db_metric1", "db_metric2"] + + send_params = db_metrics_api.build_send_params() + assert send_params == { + "extra_headers": {"Content-Type": "application/vnd.sumologic.carbon2"}, + "endpoint_key": "HTTP_METRICS_ENDPOINT", + "jsondump": False, + } + + with patch( + "sumoappclient.sumoclient.mongodbapi.get_current_timestamp", + return_value=1600000, + ): + with patch( + "sumoappclient.sumoclient.mongodbapi.convert_utc_date_to_epoch", + return_value=1500000, + ): + result, state = db_metrics_api.check_move_fetch_window( + {"params": {"end": "2023-07-26T01:00:00.000Z"}} + ) + assert result is True + assert state == {"last_time_epoch": 1500000} + + sample_data = { + "measurements": [ + { + "name": "db_metric1", + "units": "MB", + "dataPoints": [ + {"timestamp": "2023-07-26T00:00:00Z", "value": 100}, + {"timestamp": "2023-07-26T00:01:00Z", "value": None}, + ], + } + ], + "hostId": "test-host", + "processId": "test-cluster-process", + "groupId": "test_project", + "databaseName": "test-database", + } + + with patch( + "sumoappclient.sumoclient.mongodbapi.convert_utc_date_to_epoch", + return_value=1627257600, + ): + transformed_data, state = db_metrics_api.transform_data(sample_data) + + assert len(transformed_data) == 1 + assert "projectId=test_project" in transformed_data[0] + assert "databaseName=test-database" in transformed_data[0] + assert "hostId=test-host" in transformed_data[0] + assert "processId=alias-process" in transformed_data[0] + assert "metric=db_metric1" in transformed_data[0] + assert "units=MB" in transformed_data[0] + assert "cluster_name=alias" in transformed_data[0] + assert "100 1627257600" in transformed_data[0] + assert state == {"last_time_epoch": 1627257600} + + +@patch("sumoappclient.sumoclient.base.BaseAPI") +@patch("sumoappclient.sumoclient.factory.OutputHandlerFactory") +@patch("sumoappclient.sumoclient.httputils.ClientMixin") +def test_project_events_api( + mock_client_mixin, mock_output_handler_factory, mock_base_api +): + mock_kvstore = Mock() + mock_config = { + "MongoDBAtlas": { + "PROJECT_ID": "test_project", + "BASE_URL": "http://localhost:8247", + "PAGINATION_LIMIT": 100, + } + } + + project_events_api = ProjectEventsAPI(mock_kvstore, mock_config) + + assert project_events_api.get_key() == "test_project-projectevents" + + 
project_events_api.save_state({"last_time_epoch": 1000000, "page_num": 1}) + mock_kvstore.set.assert_called_with( + "test_project-projectevents", {"last_time_epoch": 1000000, "page_num": 1} + ) + + mock_kvstore.has_key.return_value = False + state = project_events_api.get_state() + mock_kvstore.set.assert_called_with( + "test_project-projectevents", + {"last_time_epoch": project_events_api.DEFAULT_START_TIME_EPOCH, "page_num": 0}, + ) + + with patch( + "sumoappclient.sumoclient.mongodbapi.convert_epoch_to_utc_date", + side_effect=["2023-07-26T00:00:00.000Z", "2023-07-26T01:00:00.000Z"], + ): + project_events_api.get_window = Mock(return_value=(1000000, 2000000)) + url, params = project_events_api.build_fetch_params() + assert url == "http://localhost:8247/groups/test_project/events" + assert params["params"]["minDate"] == "2023-07-26T00:00:00.000Z" + assert params["params"]["maxDate"] == "2023-07-26T01:00:00.000Z" + assert params["params"]["pageNum"] == 1 + + send_params = project_events_api.build_send_params() + assert send_params == { + "extra_headers": {"X-Sumo-Name": "events"}, + "endpoint_key": "HTTP_LOGS_ENDPOINT", + } + + with patch( + "sumoappclient.sumoclient.mongodbapi.get_current_timestamp", + return_value=1600000, + ): + with patch( + "sumoappclient.sumoclient.mongodbapi.convert_utc_date_to_epoch", + return_value=1500000, + ): + result, state = project_events_api.check_move_fetch_window( + {"params": {"maxDate": "2023-07-26T01:00:00.000Z"}} + ) + assert result is True + assert state == {"last_time_epoch": 1500000, "page_num": 0} + + sample_data = { + "results": [ + { + "created": "2023-07-26T00:00:00Z", + "eventType": "TEST_EVENT", + "id": "test-event-id", + } + ] + } + + with patch( + "sumoappclient.sumoclient.mongodbapi.convert_date_to_epoch", + return_value=1627257600, + ): + transformed_data, state = project_events_api.transform_data(sample_data) + + assert len(transformed_data) == 1 + assert transformed_data[0]["eventType"] == "TEST_EVENT" + assert transformed_data[0]["id"] == "test-event-id" + assert state == {"last_time_epoch": 1627257600} + + +@pytest.mark.parametrize("num_records", [10000, 100000, 1000000]) +@patch("sumoappclient.sumoclient.factory.OutputHandlerFactory") +@patch("sumoappclient.sumoclient.httputils.ClientMixin") +def test_log_api_load(mock_client_mixin, mock_output_handler_factory, num_records): + mock_kvstore = Mock() + mock_config = { + "MongoDBAtlas": { + "PROJECT_ID": "test_project", + "BASE_URL": "http://localhost:8247", + } + } + mock_cluster_mapping = {"test-cluster": "alias"} + + log_api = LogAPI( + mock_kvstore, "test-hostname", "mongodb.log", mock_config, mock_cluster_mapping + ) + + bulk_data = [ + { + "t": {"$date": f"2023-07-26T12:00:{i:02d}.000Z"}, + "msg": f"Test log message {i}", + } + for i in range(num_records) + ] + + mock_client_mixin.make_request.return_value = (True, bulk_data) + + tracemalloc.start() + + start_time = time.time() + log_api.fetch() + end_time = time.time() + + current, peak = tracemalloc.get_traced_memory() + + tracemalloc.stop() + + execution_time = end_time - start_time + + print(f"Log API load test results for {num_records} records:") + print(f"Execution time: {execution_time:.2f} seconds") + print(f"Current memory usage: {current / 10**6:.2f} MB") + print(f"Peak memory usage: {peak / 10**6:.2f} MB") + + snapshot = tracemalloc.take_snapshot() + top_stats = snapshot.statistics("lineno") + print("Top 10 lines for memory usage:") + for stat in top_stats[:10]: + print(stat) + + assert ( + execution_time < 300 + ), 
f"Processing {num_records} log records took more than 5 minutes" + assert ( + peak / 10**6 < 4096 + ), f"Memory usage exceeded 4GB while processing {num_records} log records" + + +@pytest.mark.parametrize("num_records", [10000, 100000, 1000000]) +@patch("sumoappclient.sumoclient.factory.OutputHandlerFactory") +@patch("sumoappclient.sumoclient.httputils.ClientMixin") +def test_process_metrics_api_load( + mock_client_mixin, mock_output_handler_factory, num_records +): + mock_kvstore = Mock() + mock_config = { + "MongoDBAtlas": { + "PROJECT_ID": "test_project", + "BASE_URL": "http://localhost:8247", + "PAGINATION_LIMIT": 100, + "METRIC_TYPES": {"PROCESS_METRICS": ["metric1", "metric2"]}, + } + } + mock_cluster_mapping = {"test-cluster": "alias"} + + process_metrics_api = ProcessMetricsAPI( + mock_kvstore, "test-process-id", mock_config, mock_cluster_mapping + ) + + bulk_data = { + "measurements": [ + { + "name": "metric1", + "units": "MB", + "dataPoints": [ + {"timestamp": f"2023-07-26T{i // 60:02d}:{i % 60:02d}:00Z", "value": i} + for i in range(num_records) + ], + } + ], + "hostId": "test-cluster-host", + "processId": "test-cluster-process", + "groupId": "test_project", + } + + mock_client_mixin.make_request.return_value = (True, bulk_data) + + tracemalloc.start() + + start_time = time.time() + process_metrics_api.fetch() + end_time = time.time() + + current, peak = tracemalloc.get_traced_memory() + + tracemalloc.stop() + + execution_time = end_time - start_time + + print(f"Process Metrics API load test results for {num_records} records:") + print(f"Execution time: {execution_time:.2f} seconds") + print(f"Current memory usage: {current / 10**6:.2f} MB") + print(f"Peak memory usage: {peak / 10**6:.2f} MB") + + snapshot = tracemalloc.take_snapshot() + top_stats = snapshot.statistics("lineno") + print("Top 10 lines for memory usage:") + for stat in top_stats[:10]: + print(stat) + + assert ( + execution_time < 300 + ), f"Processing {num_records} process metric records took more than 5 minutes" + assert ( + peak / 10**6 < 4096 + ), f"Memory usage exceeded 4GB while processing {num_records} process metric records" + + +@pytest.mark.parametrize("num_records", [10000, 100000, 1000000]) +@patch("sumoappclient.sumoclient.factory.OutputHandlerFactory") +@patch("sumoappclient.sumoclient.httputils.ClientMixin") +def test_disk_metrics_api_load( + mock_client_mixin, mock_output_handler_factory, num_records +): + mock_kvstore = Mock() + mock_config = { + "MongoDBAtlas": { + "PROJECT_ID": "test_project", + "BASE_URL": "http://localhost:8247", + "PAGINATION_LIMIT": 100, + "METRIC_TYPES": {"DISK_METRICS": ["disk_metric1", "disk_metric2"]}, + } + } + mock_cluster_mapping = {"test-cluster": "alias"} + + disk_metrics_api = DiskMetricsAPI( + mock_kvstore, "test-process-id", "test-disk", mock_config, mock_cluster_mapping + ) + + bulk_data = { + "measurements": [ + { + "name": "disk_metric1", + "units": "GB", + "dataPoints": [ + {"timestamp": f"2023-07-26T{i // 60:02d}:{i % 60:02d}:00Z", "value": i} + for i in range(num_records) + ], + } + ], + "hostId": "test-cluster-host", + "processId": "test-cluster-process", + "groupId": "test_project", + "partitionName": "test-disk", + } + + mock_client_mixin.make_request.return_value = (True, bulk_data) + + tracemalloc.start() + + start_time = time.time() + disk_metrics_api.fetch() + end_time = time.time() + + current, peak = tracemalloc.get_traced_memory() + + tracemalloc.stop() + + execution_time = end_time - start_time + print(f"Disk Metrics API load test results for 
{num_records} records:") + print(f"Execution time: {execution_time:.2f} seconds") + print(f"Current memory usage: {current / 10**6:.2f} MB") + print(f"Peak memory usage: {peak / 10**6:.2f} MB") + + snapshot = tracemalloc.take_snapshot() + top_stats = snapshot.statistics("lineno") + print("Top 10 lines for memory usage:") + for stat in top_stats[:10]: + print(stat) + + assert ( + execution_time < 300 + ), f"Processing {num_records} disk metric records took more than 5 minutes" + assert ( + peak / 10**6 < 4096 + ), f"Memory usage exceeded 4GB while processing {num_records} disk metric records" diff --git a/sumomongodbatlascollector/test_mongo_collector.py b/sumomongodbatlascollector/test_mongo_collector.py new file mode 100644 index 0000000..4b76e89 --- /dev/null +++ b/sumomongodbatlascollector/test_mongo_collector.py @@ -0,0 +1,670 @@ +import pytest +import yaml +import tempfile +import os +from unittest.mock import patch, MagicMock, call +from main import MongoDBAtlasCollector +from sumoappclient.sumoclient.base import BaseCollector +# from sumoappclient.provider.factory import ProviderFactory +from requests.auth import HTTPDigestAuth + + +@pytest.fixture +def mock_config(): + return { + "MongoDBAtlas": { + "PUBLIC_API_KEY": "test_public_key", + "PRIVATE_API_KEY": "test_private_key", + "BASE_URL": "https://cloud.mongodb.com/api/atlas/v1.0", + "PROJECT_ID": "test_project_id", + "PAGINATION_LIMIT": 100, + }, + "Collection": { + "MAX_RETRY": 3, + "BACKOFF_FACTOR": 0.3, + "DBNAME": "test_db", + "NUM_WORKERS": 2, + "TIMEOUT": 30, + "Clusters": [ + "cluster1-shard-00-00.abc123.mongodb.net", + "cluster2-shard-00-00.xyz789.mongodb.net", + ], + "DATA_REFRESH_TIME": 3600000, + }, + "Logging": { + "LOG_LEVEL": "DEBUG", + }, + } + + +@pytest.fixture +def temp_config_dir(mock_config): + with tempfile.TemporaryDirectory() as temp_dir: + config_file_path = os.path.join(temp_dir, "mongodbatlas.yaml") + with open(config_file_path, "w") as config_file: + yaml.dump(mock_config, config_file) + yield temp_dir + + +@pytest.fixture +def mock_provider(mock_config): + mock_provider = MagicMock() + mock_config_instance = MagicMock() + mock_config_instance.get_config.return_value = mock_config + mock_provider.get_config.return_value = mock_config_instance + mock_provider.get_storage.return_value = MagicMock() + return mock_provider + + +@pytest.fixture +def mongodb_atlas_collector(mock_config, mock_provider, temp_config_dir): + def mock_base_init(self, project_dir): + self.config = mock_config + self.project_dir = project_dir + self.kvstore = MagicMock() + + with patch( + "sumoappclient.provider.factory.ProviderFactory.get_provider", + return_value=mock_provider, + ), patch("os.path.dirname", return_value=temp_config_dir), patch.object( + BaseCollector, "__init__", mock_base_init + ), patch.object( + MongoDBAtlasCollector, "__init__", return_value=None + ): + collector = MongoDBAtlasCollector() + BaseCollector.__init__( + collector, temp_config_dir + ) # Call mocked BaseCollector.__init__ + + # Manually set attributes specific to MongoDBAtlasCollector + collector.api_config = collector.config["MongoDBAtlas"] + collector.digestauth = HTTPDigestAuth( + username=collector.api_config["PUBLIC_API_KEY"], + password=collector.api_config["PRIVATE_API_KEY"], + ) + collector.collection_config = collector.config["Collection"] + collector.mongosess = MagicMock() + collector.log = MagicMock() + return collector + + +def test_mongodb_atlas_collector_init( + mongodb_atlas_collector, mock_config, temp_config_dir +): + 
config_file_path = os.path.join(temp_config_dir, "mongodbatlas.yaml") + assert os.path.exists(config_file_path) + assert mongodb_atlas_collector.config == mock_config + assert mongodb_atlas_collector.project_dir == temp_config_dir + assert mongodb_atlas_collector.api_config == mock_config["MongoDBAtlas"] + assert isinstance(mongodb_atlas_collector.digestauth, HTTPDigestAuth) + assert ( + mongodb_atlas_collector.digestauth.username == mock_config["MongoDBAtlas"]["PUBLIC_API_KEY"] + ) + assert ( + mongodb_atlas_collector.digestauth.password == mock_config["MongoDBAtlas"]["PRIVATE_API_KEY"] + ) + assert hasattr(mongodb_atlas_collector, "mongosess") + + +def test_get_current_dir(mongodb_atlas_collector): + expected_dir = os.path.dirname(os.path.abspath(__file__)) + assert mongodb_atlas_collector.get_current_dir() == expected_dir + + +def test_get_cluster_name(mongodb_atlas_collector): + assert ( + mongodb_atlas_collector._get_cluster_name("cluster1-shard-00-01") == "cluster1" + ) + assert ( + mongodb_atlas_collector._get_cluster_name("cluster2-shard-00-02") == "cluster2" + ) + + +def test_get_user_provided_cluster_name(mongodb_atlas_collector): + assert mongodb_atlas_collector._get_user_provided_cluster_name() == [ + "cluster1-shard-00-00.abc123.mongodb.net", + "cluster2-shard-00-00.xyz789.mongodb.net", + ] + + mongodb_atlas_collector.collection_config.pop("Clusters") + assert mongodb_atlas_collector._get_user_provided_cluster_name() == [] + + +@pytest.mark.skip() +def test_getpaginateddata(mongodb_atlas_collector): + url = "https://test.com/api" + kwargs = {"auth": mongodb_atlas_collector.digestauth, "params": {"pageNum": 1}} + + with patch("main.ClientMixin.make_request") as mock_make_request: + mock_make_request.side_effect = [ + (True, {"results": [{"id": 1}, {"id": 2}]}), + (True, {"results": [{"id": 3}]}), + (True, {"results": []}), + ] + + result = mongodb_atlas_collector.getpaginateddata(url, **kwargs) + + assert len(result) == 2 + assert result[0]["results"] == [{"id": 1}, {"id": 2}] + assert result[1]["results"] == [{"id": 3}] + + assert mock_make_request.call_count == 3 + mock_make_request.assert_has_calls( + [ + call( + url, + method="get", + session=mongodb_atlas_collector.mongosess, + logger=mongodb_atlas_collector.log, + TIMEOUT=30, + MAX_RETRY=3, + BACKOFF_FACTOR=0.3, + auth=mongodb_atlas_collector.digestauth, + params={"pageNum": 1}, + ), + call( + url, + method="get", + session=mongodb_atlas_collector.mongosess, + logger=mongodb_atlas_collector.log, + TIMEOUT=30, + MAX_RETRY=3, + BACKOFF_FACTOR=0.3, + auth=mongodb_atlas_collector.digestauth, + params={"pageNum": 2}, + ), + call( + url, + method="get", + session=mongodb_atlas_collector.mongosess, + logger=mongodb_atlas_collector.log, + TIMEOUT=30, + MAX_RETRY=3, + BACKOFF_FACTOR=0.3, + auth=mongodb_atlas_collector.digestauth, + params={"pageNum": 3}, + ), + ] + ) + + +@patch("main.MongoDBAtlasCollector.getpaginateddata") +def test_get_all_processes_from_project(mock_getpaginateddata, mongodb_atlas_collector): + mock_data = [ + { + "results": [ + { + "id": "process1", + "hostname": "cluster1-shard-00-00.abc123.mongodb.net", + "userAlias": "Cluster1-shard-00-00", + }, + { + "id": "process2", + "hostname": "cluster2-shard-00-00.xyz789.mongodb.net", + "userAlias": "Cluster2-shard-00-00", + }, + ] + } + ] + mock_getpaginateddata.return_value = mock_data + + process_ids, hostnames, cluster_mapping = ( + mongodb_atlas_collector._get_all_processes_from_project() + ) + + assert process_ids == ["process1", "process2"] + assert 
set(hostnames) == { + "cluster1-shard-00-00.abc123.mongodb.net", + "cluster2-shard-00-00.xyz789.mongodb.net", + } + assert cluster_mapping == {"cluster1": "Cluster1", "cluster2": "Cluster2"} + + expected_url = f"{mongodb_atlas_collector.api_config['BASE_URL']}/groups/{mongodb_atlas_collector.api_config['PROJECT_ID']}/processes" + expected_kwargs = { + "auth": mongodb_atlas_collector.digestauth, + "params": { + "itemsPerPage": mongodb_atlas_collector.api_config["PAGINATION_LIMIT"] + }, + } + mock_getpaginateddata.assert_called_once_with(expected_url, **expected_kwargs) + + +@patch("main.MongoDBAtlasCollector.getpaginateddata") +def test_get_all_processes_from_project_with_user_provided_clusters( + mock_getpaginateddata, mongodb_atlas_collector +): + mock_data = [ + { + "results": [ + { + "id": "process1", + "hostname": "cluster1-shard-00-00.abc123.mongodb.net", + "userAlias": "Cluster1-shard-00-00", + }, + { + "id": "process2", + "hostname": "cluster2-shard-00-00.xyz789.mongodb.net", + "userAlias": "Cluster2-shard-00-00", + }, + { + "id": "process3", + "hostname": "cluster3-shard-00-00.def456.mongodb.net", + "userAlias": "Cluster3-shard-00-00", + }, + ] + } + ] + mock_getpaginateddata.return_value = mock_data + + process_ids, hostnames, cluster_mapping = ( + mongodb_atlas_collector._get_all_processes_from_project() + ) + + assert process_ids == ["process1", "process2", "process3"] + assert set(hostnames) == { + "cluster1-shard-00-00.abc123.mongodb.net", + "cluster2-shard-00-00.xyz789.mongodb.net", + "cluster3-shard-00-00.def456.mongodb.net", + } + assert cluster_mapping == {"cluster1": "Cluster1", "cluster2": "Cluster2"} + + expected_url = f"{mongodb_atlas_collector.api_config['BASE_URL']}/groups/{mongodb_atlas_collector.api_config['PROJECT_ID']}/processes" + expected_kwargs = { + "auth": mongodb_atlas_collector.digestauth, + "params": { + "itemsPerPage": mongodb_atlas_collector.api_config["PAGINATION_LIMIT"] + }, + } + mock_getpaginateddata.assert_called_once_with(expected_url, **expected_kwargs) + + +@patch("main.MongoDBAtlasCollector.getpaginateddata") +def test_get_all_disks_from_host(mock_getpaginateddata, mongodb_atlas_collector): + mock_data = [ + { + "results": [ + {"partitionName": "disk1"}, + {"partitionName": "disk2"}, + ] + }, + { + "results": [ + {"partitionName": "disk2"}, + {"partitionName": "disk3"}, + ] + }, + ] + mock_getpaginateddata.return_value = mock_data + + process_ids = ["process1", "process2"] + disks = mongodb_atlas_collector._get_all_disks_from_host(process_ids) + + assert set(disks) == {"disk1", "disk2", "disk3"} + + expected_calls = [ + call( + f"{mongodb_atlas_collector.api_config['BASE_URL']}/groups/{mongodb_atlas_collector.api_config['PROJECT_ID']}/processes/{process_id}/disks", + auth=mongodb_atlas_collector.digestauth, + params={ + "itemsPerPage": mongodb_atlas_collector.api_config["PAGINATION_LIMIT"] + }, + ) + for process_id in process_ids + ] + mock_getpaginateddata.assert_has_calls(expected_calls) + + +@patch("main.MongoDBAtlasCollector._get_all_databases") +@patch("main.get_current_timestamp") +def test_set_database_names( + mock_get_current_timestamp, mock_get_all_databases, mongodb_atlas_collector +): + mock_get_all_databases.return_value = ["db1", "db2", "db3"] + mock_get_current_timestamp.return_value = 1234567890 + + process_ids = ["process1", "process2"] + mongodb_atlas_collector._set_database_names(process_ids) + + mock_get_all_databases.assert_called_once_with(process_ids) + mock_get_current_timestamp.assert_called_once_with(milliseconds=True) 
+ + mongodb_atlas_collector.kvstore.set.assert_called_once_with( + "database_names", + { + "last_set_date": 1234567890, + "values": ["db1", "db2", "db3"], + }, + ) + + +@pytest.fixture +def mock_get_current_timestamp(): + with patch("main.get_current_timestamp") as mock: + mock.return_value = 1627776000000 # Example timestamp + yield mock + + +def test_set_processes(mongodb_atlas_collector, mock_get_current_timestamp): + mongodb_atlas_collector._get_all_processes_from_project = MagicMock( + return_value=( + ["process1", "process2"], + ["host1", "host2"], + {"cluster1": ["process1"], "cluster2": ["process2"]}, + ) + ) + + mongodb_atlas_collector._set_processes() + + mongodb_atlas_collector.kvstore.set.assert_has_calls( + [ + call( + "processes", + { + "last_set_date": 1627776000000, + "process_ids": ["process1", "process2"], + "hostnames": ["host1", "host2"], + }, + ), + call( + "cluster_mapping", + { + "last_set_date": 1627776000000, + "values": {"cluster1": ["process1"], "cluster2": ["process2"]}, + }, + ), + ], + any_order=True, + ) + + +def test_set_disk_names(mongodb_atlas_collector, mock_get_current_timestamp): + # Mock the _get_all_disks_from_host method + mongodb_atlas_collector._get_all_disks_from_host = MagicMock( + return_value={"process1": ["disk1", "disk2"], "process2": ["disk3", "disk4"]} + ) + + # Call the method + process_ids = ["process1", "process2"] + mongodb_atlas_collector._set_disk_names(process_ids) + + # Assert that kvstore.set was called with the correct arguments + mongodb_atlas_collector.kvstore.set.assert_called_once_with( + "disk_names", + { + "last_set_date": 1627776000000, + "values": {"process1": ["disk1", "disk2"], "process2": ["disk3", "disk4"]}, + }, + ) + + +def test_set_processes_empty_results( + mongodb_atlas_collector, mock_get_current_timestamp +): + mongodb_atlas_collector._get_all_processes_from_project = MagicMock( + return_value=([], [], {}) + ) + mongodb_atlas_collector._set_processes() + mongodb_atlas_collector.kvstore.set.assert_has_calls( + [ + call( + "processes", + {"last_set_date": 1627776000000, "process_ids": [], "hostnames": []}, + ), + call("cluster_mapping", {"last_set_date": 1627776000000, "values": {}}), + ], + any_order=True, + ) + + +def test_set_processes_large_number( + mongodb_atlas_collector, mock_get_current_timestamp +): + large_process_ids = [f"process{i}" for i in range(1000)] + large_hostnames = [f"host{i}" for i in range(1000)] + large_cluster_mapping = {f"cluster{i}": [f"process{i}"] for i in range(1000)} + mongodb_atlas_collector._get_all_processes_from_project = MagicMock( + return_value=(large_process_ids, large_hostnames, large_cluster_mapping) + ) + mongodb_atlas_collector._set_processes() + mongodb_atlas_collector.kvstore.set.assert_has_calls( + [ + call( + "processes", + { + "last_set_date": 1627776000000, + "process_ids": large_process_ids, + "hostnames": large_hostnames, + }, + ), + call( + "cluster_mapping", + {"last_set_date": 1627776000000, "values": large_cluster_mapping}, + ), + ], + any_order=True, + ) + + +def test_set_processes_exception(mongodb_atlas_collector, mock_get_current_timestamp): + mongodb_atlas_collector._get_all_processes_from_project = MagicMock( + side_effect=Exception("API Error") + ) + with pytest.raises(Exception): + mongodb_atlas_collector._set_processes() + assert not mongodb_atlas_collector.kvstore.set.called + + +def test_set_disk_names_empty_results( + mongodb_atlas_collector, mock_get_current_timestamp +): + mongodb_atlas_collector._get_all_disks_from_host = 
MagicMock(return_value={}) + mongodb_atlas_collector._set_disk_names([]) + mongodb_atlas_collector.kvstore.set.assert_called_once_with( + "disk_names", {"last_set_date": 1627776000000, "values": {}} + ) + + +def test_set_disk_names_some_empty(mongodb_atlas_collector, mock_get_current_timestamp): + mongodb_atlas_collector._get_all_disks_from_host = MagicMock( + return_value={ + "process1": ["disk1", "disk2"], + "process2": [], + "process3": ["disk3"], + } + ) + mongodb_atlas_collector._set_disk_names(["process1", "process2", "process3"]) + mongodb_atlas_collector.kvstore.set.assert_called_once_with( + "disk_names", + { + "last_set_date": 1627776000000, + "values": { + "process1": ["disk1", "disk2"], + "process2": [], + "process3": ["disk3"], + }, + }, + ) + + +def test_set_disk_names_large_number( + mongodb_atlas_collector, mock_get_current_timestamp +): + large_disks = {f"process{i}": [f"disk{j}" for j in range(100)] for i in range(100)} + mongodb_atlas_collector._get_all_disks_from_host = MagicMock( + return_value=large_disks + ) + mongodb_atlas_collector._set_disk_names([f"process{i}" for i in range(100)]) + mongodb_atlas_collector.kvstore.set.assert_called_once_with( + "disk_names", {"last_set_date": 1627776000000, "values": large_disks} + ) + + +def test_set_disk_names_exception(mongodb_atlas_collector, mock_get_current_timestamp): + mongodb_atlas_collector._get_all_disks_from_host = MagicMock( + side_effect=Exception("API Error") + ) + with pytest.raises(Exception): + mongodb_atlas_collector._set_disk_names(["process1", "process2"]) + assert not mongodb_atlas_collector.kvstore.set.called + + +def test_get_database_names_initial_fetch( + mongodb_atlas_collector, mock_get_current_timestamp +): + mongodb_atlas_collector.kvstore.has_key = MagicMock(return_value=False) + mongodb_atlas_collector._get_process_names = MagicMock( + return_value=(["process1", "process2"], None) + ) + mongodb_atlas_collector._set_database_names = MagicMock() + mongodb_atlas_collector.kvstore.get = MagicMock( + return_value={"last_set_date": 1627776000000, "values": ["db1", "db2"]} + ) + + # Execute + result = mongodb_atlas_collector._get_database_names() + + # Assert + assert result == ["db1", "db2"] + mongodb_atlas_collector._get_process_names.assert_called_once() + mongodb_atlas_collector._set_database_names.assert_called_once_with( + ["process1", "process2"] + ) + mongodb_atlas_collector.kvstore.get.assert_called_with("database_names") + + +def test_get_database_names_refresh( + mongodb_atlas_collector, mock_get_current_timestamp +): + # Setup + mongodb_atlas_collector.kvstore.has_key = MagicMock(return_value=True) + mongodb_atlas_collector._get_process_names = MagicMock( + return_value=(["process1", "process2"], None) + ) + mongodb_atlas_collector._set_database_names = MagicMock() + mongodb_atlas_collector.kvstore.get = MagicMock( + side_effect=[ + {"last_set_date": 1627772400000, "values": []}, # Old data + {"last_set_date": 1627776000000, "values": ["db1", "db2"]}, # New data + ] + ) + mongodb_atlas_collector.DATA_REFRESH_TIME = 3600000 # 1 hour + + # Execute + result = mongodb_atlas_collector._get_database_names() + + # Assert + assert result == ["db1", "db2"] + assert mongodb_atlas_collector._get_process_names.call_count == 1 + assert mongodb_atlas_collector._set_database_names.call_count == 1 + assert mongodb_atlas_collector.kvstore.get.call_count == 2 + + +def test_get_disk_names_initial_fetch( + mongodb_atlas_collector, mock_get_current_timestamp +): + mongodb_atlas_collector.kvstore.has_key = 
MagicMock(return_value=False) + mongodb_atlas_collector._get_process_names = MagicMock( + return_value=(["process1", "process2"], None) + ) + mongodb_atlas_collector._set_disk_names = MagicMock() + mongodb_atlas_collector.kvstore.get = MagicMock( + return_value={ + "last_set_date": 1627776000000, + "values": {"process1": ["disk1", "disk2"], "process2": ["disk3"]}, + } + ) + + # Execute + result = mongodb_atlas_collector._get_disk_names() + + # Assert + assert result == {"process1": ["disk1", "disk2"], "process2": ["disk3"]} + mongodb_atlas_collector._get_process_names.assert_called_once() + mongodb_atlas_collector._set_disk_names.assert_called_once_with( + ["process1", "process2"] + ) + mongodb_atlas_collector.kvstore.get.assert_called_with("disk_names") + + +def test_get_disk_names_refresh(mongodb_atlas_collector, mock_get_current_timestamp): + # Setup + mongodb_atlas_collector.kvstore.has_key = MagicMock(return_value=True) + mongodb_atlas_collector._get_process_names = MagicMock( + return_value=(["process1", "process2"], None) + ) + mongodb_atlas_collector._set_disk_names = MagicMock() + mongodb_atlas_collector.kvstore.get = MagicMock( + side_effect=[ + {"last_set_date": 1627772400000, "values": {}}, # Old data + { + "last_set_date": 1627776000000, + "values": {"process1": ["disk1", "disk2"], "process2": ["disk3"]}, + }, # New data + ] + ) + mongodb_atlas_collector.DATA_REFRESH_TIME = 3600000 # 1 hour + + # Execute + result = mongodb_atlas_collector._get_disk_names() + + # Assert + assert result == {"process1": ["disk1", "disk2"], "process2": ["disk3"]} + assert mongodb_atlas_collector._get_process_names.call_count == 1 + assert mongodb_atlas_collector._set_disk_names.call_count == 1 + assert mongodb_atlas_collector.kvstore.get.call_count == 2 + + +def test_get_database_names_no_refresh_needed( + mongodb_atlas_collector, mock_get_current_timestamp +): + # Setup + mongodb_atlas_collector.kvstore.has_key = MagicMock(return_value=True) + mongodb_atlas_collector.kvstore.get = MagicMock( + return_value={ + "last_set_date": 1627775000000, # 1000 seconds ago + "values": ["db1", "db2"], + } + ) + mongodb_atlas_collector.DATA_REFRESH_TIME = 3600000 # 1 hour + mongodb_atlas_collector._get_process_names = MagicMock() + mongodb_atlas_collector._set_database_names = MagicMock() + + # Execute + result = mongodb_atlas_collector._get_database_names() + + # Assert + assert result == ["db1", "db2"] + mongodb_atlas_collector._get_process_names.assert_not_called() + mongodb_atlas_collector._set_database_names.assert_not_called() + assert mongodb_atlas_collector.kvstore.get.call_count == 2 + mongodb_atlas_collector.kvstore.get.assert_has_calls( + [call("database_names"), call("database_names")] + ) + + +def test_get_disk_names_no_refresh_needed( + mongodb_atlas_collector, mock_get_current_timestamp +): + # Setup + mongodb_atlas_collector.kvstore.has_key = MagicMock(return_value=True) + mongodb_atlas_collector.kvstore.get = MagicMock( + return_value={ + "last_set_date": 1627775000000, # 1000 seconds ago + "values": {"process1": ["disk1", "disk2"], "process2": ["disk3"]}, + } + ) + mongodb_atlas_collector.DATA_REFRESH_TIME = 3600000 # 1 hour + mongodb_atlas_collector._get_process_names = MagicMock() + mongodb_atlas_collector._set_disk_names = MagicMock() + + # Execute + result = mongodb_atlas_collector._get_disk_names() + + # Assert + assert result == {"process1": ["disk1", "disk2"], "process2": ["disk3"]} + mongodb_atlas_collector._get_process_names.assert_not_called() + 
+    mongodb_atlas_collector._set_disk_names.assert_not_called()
+    assert mongodb_atlas_collector.kvstore.get.call_count == 2
+    mongodb_atlas_collector.kvstore.get.assert_has_calls(
+        [call("disk_names"), call("disk_names")]
+    )
diff --git a/sumomongodbatlascollector/test_mongo_db_api.py b/sumomongodbatlascollector/test_mongo_db_api.py
new file mode 100644
index 0000000..499130f
--- /dev/null
+++ b/sumomongodbatlascollector/test_mongo_db_api.py
@@ -0,0 +1,107 @@
+import pytest
+from unittest.mock import MagicMock, patch
+# from datetime import datetime, timedelta
+# import time
+from requests.auth import HTTPDigestAuth
+
+from sumoappclient.sumoclient.base import BaseAPI
+# from sumoappclient.common.utils import get_current_timestamp
+from api import MongoDBAPI
+
+
+class ConcreteMongoDBAPI(MongoDBAPI):
+    MOVING_WINDOW_DELTA = 0.001
+
+    def get_key(self):
+        return "test_key"
+
+    def save_state(self, *args, **kwargs):
+        pass
+
+    def get_state(self):
+        return {}
+
+    def build_fetch_params(self):
+        return {}
+
+    def build_send_params(self):
+        return {}
+
+    def transform_data(self, content):
+        return content
+
+    def fetch(self):
+        return []
+
+
+@pytest.fixture
+def mongodb_api():
+    kvstore = MagicMock()
+    config = {
+        "MongoDBAtlas": {
+            "PUBLIC_API_KEY": "public_key",
+            "PRIVATE_API_KEY": "private_key",
+            "Collection": {
+                "MAX_REQUEST_WINDOW_LENGTH": 900,
+                "MIN_REQUEST_WINDOW_LENGTH": 60,
+            },
+        },
+        "Collection": {
+            "END_TIME_EPOCH_OFFSET_SECONDS": 60,
+            "TIMEOUT": 300,
+            "BACKFILL_DAYS": 7,
+            "ENVIRONMENT": "aws",
+        },
+        "Logging": {},
+        "SumoLogic": {},
+    }
+    return ConcreteMongoDBAPI(kvstore, config)
+
+
+def test_init(mongodb_api):
+    assert isinstance(mongodb_api.digestauth, HTTPDigestAuth)
+    assert mongodb_api.digestauth.username == "public_key"
+    assert mongodb_api.digestauth.password == "private_key"
+    assert mongodb_api.MAX_REQUEST_WINDOW_LENGTH == 3600
+    assert mongodb_api.MIN_REQUEST_WINDOW_LENGTH == 60
+    assert isinstance(mongodb_api, MongoDBAPI)
+    assert isinstance(mongodb_api, BaseAPI)
+
+
+@pytest.mark.skip()
+@patch("sumoappclient.common.utils.get_current_timestamp")
+def test_get_window(mock_get_current_timestamp, mongodb_api):
+    mock_get_current_timestamp.return_value = 1000000
+    last_time_epoch = 999000
+    with patch("time.sleep") as mock_sleep:
+        start, end = mongodb_api.get_window(last_time_epoch)
+        print(f"Case 1 - Start: {start}, End: {end}")
+        assert start == 999000.001
+        assert end == 999940
+        mock_sleep.assert_not_called()
+
+    # Test case 2: Window initially too small, requires sleep
+    mock_get_current_timestamp.side_effect = [1000000, 1000060]
+    last_time_epoch = 999999
+    with patch("time.sleep") as mock_sleep:
+        start, end = mongodb_api.get_window(last_time_epoch)
+        print(f"Case 2 - Start: {start}, End: {end}")
+        assert start == 999999.001
+        assert end == 1000000
+        mock_sleep.assert_called_once_with(60)
+
+    # Test case 3: Window exceeds MAX_REQUEST_WINDOW_LENGTH
+    mock_get_current_timestamp.return_value = 1005000
+    last_time_epoch = 1000000
+    with patch("time.sleep") as mock_sleep:
+        start, end = mongodb_api.get_window(last_time_epoch)
+        print(f"Case 3 - Start: {start}, End: {end}")
+        assert start == 1000000.001
+        assert end == 1003600.001  # start + MAX_REQUEST_WINDOW_LENGTH
+        mock_sleep.assert_not_called()
+
+    print(f"MIN_REQUEST_WINDOW_LENGTH: {mongodb_api.MIN_REQUEST_WINDOW_LENGTH}")
+    print(f"MAX_REQUEST_WINDOW_LENGTH: {mongodb_api.MAX_REQUEST_WINDOW_LENGTH}")
+    print(
+        f"END_TIME_EPOCH_OFFSET_SECONDS: {mongodb_api.collection_config['END_TIME_EPOCH_OFFSET_SECONDS']}"
+    )
diff --git a/sumomongodbatlascollector/time_and_memory_tracker.py b/sumomongodbatlascollector/time_and_memory_tracker.py
new file mode 100644
index 0000000..6c6e1cf
--- /dev/null
+++ b/sumomongodbatlascollector/time_and_memory_tracker.py
@@ -0,0 +1,64 @@
+import time
+import psutil
+from typing import List, Dict, Any, Optional
+
+
+class TimeAndMemoryTracker:
+    def __init__(self, activate: bool = False):
+        self._stack: List[Dict[str, Any]] = []
+        self.activate = activate
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        while self._stack:
+            self.end()
+
+    def start(self, operation_name: str) -> str:
+        if not self.activate:
+            return ""
+        operation_name = operation_name.lower()
+        entry = {
+            "operation_name": operation_name,
+            "start_time": time.time(),
+            "start_memory": psutil.Process().memory_info().rss,
+        }
+        self._stack.append(entry)
+        return self._format_start_message(entry)
+
+    def end(self, operation_name: Optional[str] = None) -> str:
+        if not self.activate:
+            return ""
+        if self._stack:
+            exit_time = time.time()
+            exit_memory = psutil.Process().memory_info().rss
+            entry = self._stack[-1]
+
+            if operation_name:
+                operation_name = operation_name.lower()
+                if entry["operation_name"] != operation_name:
+                    raise ValueError(
+                        f"Attempting to end '{operation_name}' but the current operation is '{entry['operation_name']}'"
+                    )
+
+            self._stack.pop()
+            return self._format_end_message(entry, exit_time, exit_memory)
+        return ""
+
+    def _format_start_message(self, entry: Dict[str, Any]) -> str:
+        return (
+            f"Starting {entry['operation_name']} at {entry['start_time']:.2f}, "
+            f"initial memory: {entry['start_memory'] / 1024 / 1024:.2f} MB"
+        )
+
+    def _format_end_message(
+        self, entry: Dict[str, Any], exit_time: float, exit_memory: int
+    ) -> str:
+        execution_time = exit_time - entry["start_time"]
+        memory_used = exit_memory - entry["start_memory"]
+        return (
+            f"{entry['operation_name']} completed in {execution_time:.2f} seconds, "
+            f"used {memory_used / 1024 / 1024:.2f} MB, "
+            f"start time: {entry['start_time']:.2f}, end time: {exit_time:.2f}, final memory: {exit_memory / 1024 / 1024:.2f} MB"
+        )
diff --git a/template.yaml b/template.yaml
index 3f72667..f0f536a 100644
--- a/template.yaml
+++ b/template.yaml
@@ -28,6 +28,9 @@ Outputs:
       Fn::GetAtt:
       - MongoDBAtlasFunction
      - Arn
+  MongoDBAtlasTable:
+    Description: MongoDBAtlasTable DynamoDB Table Name
+    Value: !Ref MongoDBAtlasTableResource
 Parameters:
   HttpLogsEndpoint:
     Type: String
@@ -68,9 +71,32 @@ Resources:
           Type: Schedule
       Handler: main.main
       MemorySize: 256
-      Policies:
-      - AmazonDynamoDBFullAccess
+      Policies:
+        - Version: "2012-10-17"
+          Statement:
+            - Effect: Allow
+              Action:
+                - dynamodb:PutItem
+                - dynamodb:UpdateItem
+                - dynamodb:GetItem
+                - dynamodb:Scan
+                - dynamodb:Query
+              Resource: !GetAtt MongoDBAtlasTableResource.Arn
       Runtime: python3.11
       Timeout: 900
     Type: AWS::Serverless::Function
+
+  MongoDBAtlasTableResource:
+    Type: AWS::DynamoDB::Table
+    Properties:
+      TableName: !Sub ${AWS::StackName}-mongodbatlastable
+      AttributeDefinitions:
+        - AttributeName: id
+          AttributeType: S
+      KeySchema:
+        - AttributeName: id
+          KeyType: HASH
+      ProvisionedThroughput:
+        ReadCapacityUnits: 5
+        WriteCapacityUnits: 5
 Transform: AWS::Serverless-2016-10-31
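
Usage note: the sketch below shows one way collector code might wrap a unit of work with the new TimeAndMemoryTracker (it requires psutil, which the module already imports). This is a minimal, illustrative example only: the flat import mirrors the style used in the new test files (run from inside sumomongodbatlascollector/), and the "fetch_logs" operation name, logger setup, and do_some_work() placeholder are assumptions, not part of this change.

    import logging

    from time_and_memory_tracker import TimeAndMemoryTracker

    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger("mongodb-atlas-collector")


    def do_some_work():
        # Stand-in workload so the sketch runs end to end.
        _ = [i * i for i in range(1_000_000)]


    def fetch_logs_with_tracking():
        # activate defaults to False, which turns start()/end() into no-ops,
        # so instrumentation can stay in place without overhead.
        with TimeAndMemoryTracker(activate=True) as tracker:
            log.info(tracker.start("fetch_logs"))
            do_some_work()
            # end() must be called with the same (case-insensitive) name passed
            # to start(); a mismatched name raises ValueError.
            log.info(tracker.end("fetch_logs"))


    if __name__ == "__main__":
        fetch_logs_with_tracking()

Any operations still on the tracker's stack when the with-block exits are closed out by __exit__, so a forgotten end() does not leak entries.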