From 0a330ef3ad9c1dbbb1beec9aff7a14dff31e9c62 Mon Sep 17 00:00:00 2001 From: MVarshini Date: Wed, 4 Dec 2024 12:00:31 +0530 Subject: [PATCH] PANDA-690 --- backend/app/api/api.py | 22 +- .../app/api/v1/commons/example_responses.py | 97 ++- backend/app/api/v1/commons/hasher.py | 24 +- backend/app/api/v1/commons/hce.py | 25 +- backend/app/api/v1/commons/ocm.py | 27 +- backend/app/api/v1/commons/ocp.py | 61 +- backend/app/api/v1/commons/quay.py | 42 +- backend/app/api/v1/commons/telco.py | 74 +- backend/app/api/v1/commons/utils.py | 16 +- backend/app/api/v1/endpoints/cpt/cptJobs.py | 86 +- backend/app/api/v1/endpoints/cpt/maps/hce.py | 17 +- backend/app/api/v1/endpoints/cpt/maps/ocm.py | 2 +- backend/app/api/v1/endpoints/cpt/maps/ocp.py | 2 +- backend/app/api/v1/endpoints/cpt/maps/quay.py | 2 +- .../app/api/v1/endpoints/cpt/maps/telco.py | 2 +- backend/app/api/v1/endpoints/jira/jira.py | 7 +- backend/app/api/v1/endpoints/ocm/ocmJobs.py | 55 +- backend/app/api/v1/endpoints/ocp/graph.py | 454 +++++----- backend/app/api/v1/endpoints/ocp/ocpJobs.py | 58 +- backend/app/api/v1/endpoints/ocp/results.py | 29 +- .../app/api/v1/endpoints/quay/quayGraphs.py | 326 +++---- backend/app/api/v1/endpoints/quay/quayJobs.py | 60 +- .../app/api/v1/endpoints/telco/telcoGraphs.py | 804 ++++++++++-------- .../app/api/v1/endpoints/telco/telcoJobs.py | 60 +- backend/app/async_util.py | 3 +- backend/app/config.py | 8 +- backend/app/main.py | 49 +- backend/app/services/jira_svc.py | 15 +- backend/app/services/search.py | 286 +++++-- backend/app/services/splunk.py | 48 +- 30 files changed, 1506 insertions(+), 1255 deletions(-) diff --git a/backend/app/api/api.py b/backend/app/api/api.py index b74b8ad4..22ce1ceb 100644 --- a/backend/app/api/api.py +++ b/backend/app/api/api.py @@ -16,26 +16,26 @@ router = APIRouter() # OCP endpoints -router.include_router(ocpJobs.router, tags=['ocp']) -router.include_router(results.router, tags=['ocp']) -router.include_router(graph.router, tags=['ocp.graphs']) +router.include_router(ocpJobs.router, tags=["ocp"]) +router.include_router(results.router, tags=["ocp"]) +router.include_router(graph.router, tags=["ocp.graphs"]) # CPT endpoints -router.include_router(cptJobs.router, tags=['cpt']) +router.include_router(cptJobs.router, tags=["cpt"]) # Quay endpoints -router.include_router(quayJobs.router, tags=['quay']) -router.include_router(quayGraphs.router, tags=['quay']) +router.include_router(quayJobs.router, tags=["quay"]) +router.include_router(quayGraphs.router, tags=["quay"]) # Telco endpoints -router.include_router(telcoJobs.router, tags=['telco']) -router.include_router(telcoGraphs.router, tags=['telco']) +router.include_router(telcoJobs.router, tags=["telco"]) +router.include_router(telcoGraphs.router, tags=["telco"]) # Jira endpoints -router.include_router(jira.router, tags=['jira']) +router.include_router(jira.router, tags=["jira"]) # Horreum endpoint -router.include_router(horreum.router, tags=['horreum']) +router.include_router(horreum.router, tags=["horreum"]) # OCM endpoint -router.include_router(ocmJobs.router, tags=['ocm']) +router.include_router(ocmJobs.router, tags=["ocm"]) diff --git a/backend/app/api/v1/commons/example_responses.py b/backend/app/api/v1/commons/example_responses.py index 46f0d5fa..1008b16b 100644 --- a/backend/app/api/v1/commons/example_responses.py +++ b/backend/app/api/v1/commons/example_responses.py @@ -1,22 +1,26 @@ def response_200(example): return { - "content": { - "application/json": { - "example": example, - } - }, + "content": { + 
"application/json": { + "example": example, } + }, + } + def response_422(): return { - "content": { - "application/json": { - "example": {"error": "invalid date format, start_date must be less than end_date"}, - } + "content": { + "application/json": { + "example": { + "error": "invalid date format, start_date must be less than end_date" }, } + }, + } -ocp_response_example ={ + +ocp_response_example = { "startDate": "2023-09-20", "endDate": "2023-09-20", "results": [ @@ -47,7 +51,7 @@ def response_422(): "startDate": "2023-09-20T02:14:07Z", "endDate": "2023-09-20T03:41:48Z", "timestamp": "2023-09-20T02:14:07Z", - "shortVersion": "4.14" + "shortVersion": "4.14", }, { "ciSystem": "PROW", @@ -76,12 +80,12 @@ def response_422(): "startDate": "2023-09-20T07:19:00Z", "endDate": "2023-09-20T07:28:42Z", "timestamp": "2023-09-20T07:19:00Z", - "shortVersion": "4.13" + "shortVersion": "4.13", }, - ] + ], } -quay_response_example ={ +quay_response_example = { "startDate": "2023-09-20", "endDate": "2023-09-20", "results": [ @@ -114,12 +118,12 @@ def response_422(): "startDate": "2023-09-20T02:14:07Z", "endDate": "2023-09-20T03:41:48Z", "timestamp": "2023-09-20T02:14:07Z", - "shortVersion": "4.14" + "shortVersion": "4.14", }, - ] + ], } -telco_response_example ={ +telco_response_example = { "startDate": "2023-09-20", "endDate": "2023-09-20", "results": [ @@ -138,50 +142,55 @@ def response_422(): "endDate": "2024-05-16 20:41:48+00:00", "buildUrl": "https://ci-jenkins-xxx.com/job/your-tests/532", "jobStatus": "success", - "jobDuration": 3720 + "jobDuration": 3720, }, - ] + ], } + def ocp_200_response(): return response_200(ocp_response_example) + def quay_200_response(): return response_200(quay_response_example) + def telco_200_response(): return response_200(telco_response_example) -cpt_response_example ={ - "startDate": "2023-11-18", - "endDate": "2023-11-23", - "results": [ + +cpt_response_example = { + "startDate": "2023-11-18", + "endDate": "2023-11-23", + "results": [ { - "ciSystem": "PROW", - "uuid": "f6d084d5-b154-4108-b4f7-165094ccc838", - "releaseStream": "Nightly", - "jobStatus": "success", - "buildUrl": "https://ci..org/view/1726571333392797696", - "startDate": "2023-11-20T13:16:34Z", - "endDate": "2023-11-20T13:28:48Z", - "product": "ocp", - "version": "4.13", - "testName": "cluster-density-v2" + "ciSystem": "PROW", + "uuid": "f6d084d5-b154-4108-b4f7-165094ccc838", + "releaseStream": "Nightly", + "jobStatus": "success", + "buildUrl": "https://ci..org/view/1726571333392797696", + "startDate": "2023-11-20T13:16:34Z", + "endDate": "2023-11-20T13:28:48Z", + "product": "ocp", + "version": "4.13", + "testName": "cluster-density-v2", }, { - "ciSystem": "JENKINS", - "uuid": "5b729011-3b4d-4ec4-953d-6881ac9da505", - "releaseStream": "Stable", - "jobStatus": "success", - "buildUrl": "https://ci..org/view/1726571333392797696", - "startDate": "2023-11-20T13:16:30Z", - "endDate": "2023-11-20T13:30:40Z", - "product": "ocp", - "version": "4.14", - "testName": "node-density-heavy" + "ciSystem": "JENKINS", + "uuid": "5b729011-3b4d-4ec4-953d-6881ac9da505", + "releaseStream": "Stable", + "jobStatus": "success", + "buildUrl": "https://ci..org/view/1726571333392797696", + "startDate": "2023-11-20T13:16:30Z", + "endDate": "2023-11-20T13:30:40Z", + "product": "ocp", + "version": "4.14", + "testName": "node-density-heavy", }, - ] + ], } + def cpt_200_response(): return response_200(cpt_response_example) diff --git a/backend/app/api/v1/commons/hasher.py b/backend/app/api/v1/commons/hasher.py index 
7fe241b9..f9403856 100644 --- a/backend/app/api/v1/commons/hasher.py +++ b/backend/app/api/v1/commons/hasher.py @@ -2,40 +2,42 @@ import hashlib from cryptography.fernet import Fernet -symmetric_encryptor = b'k3tGwuK6O59c0SEMmnIeJUEpTN5kuxibPy8Q8VfYC6A=' +symmetric_encryptor = b"k3tGwuK6O59c0SEMmnIeJUEpTN5kuxibPy8Q8VfYC6A=" + def hash_encrypt_json(json_data): # Serialize the JSON data to a string json_str = str(json_data) - + # Generate an MD5 hash of the JSON string hash_digest = hashlib.md5(json_str.encode()).hexdigest() - + # Compress the JSON string compressed_data = zlib.compress(json_str.encode()) - + cipher = Fernet(symmetric_encryptor) - + # Encrypt the compressed JSON string encrypted_data = cipher.encrypt(compressed_data) - + return hash_digest, encrypted_data + def decrypt_unhash_json(hash_digest, encrypted_data): cipher = Fernet(symmetric_encryptor) - + # Decrypt the encrypted JSON data decompressed_data = cipher.decrypt(encrypted_data) - + # Decompress the decrypted data decompressed_json_str = zlib.decompress(decompressed_data).decode() - + # Verify hash digest calculated_hash = hashlib.md5(decompressed_json_str.encode()).hexdigest() if calculated_hash != hash_digest: raise ValueError("Hash digest does not match") - + # Deserialize the JSON string back to JSON data json_data = eval(decompressed_json_str) - + return json_data diff --git a/backend/app/api/v1/commons/hce.py b/backend/app/api/v1/commons/hce.py index 701f0c2e..cbef5192 100644 --- a/backend/app/api/v1/commons/hce.py +++ b/backend/app/api/v1/commons/hce.py @@ -5,26 +5,21 @@ async def getData(start_datetime: date, end_datetime: date, configpath: str): query = { - "query": { - "bool": { - "filter": { - "range": { - "date": { - "format": "yyyy-MM-dd" - } - } - } - } - } + "query": {"bool": {"filter": {"range": {"date": {"format": "yyyy-MM-dd"}}}}} } es = ElasticService(configpath=configpath) - response = await es.post(query=query, start_date=start_datetime, end_date=end_datetime, timestamp_field='date') + response = await es.post( + query=query, + start_date=start_datetime, + end_date=end_datetime, + timestamp_field="date", + ) await es.close() - tasks = [item['_source'] for item in response] + tasks = [item["_source"] for item in response] jobs = pd.json_normalize(tasks) - jobs[['group']] = jobs[['group']].fillna(0) - jobs.fillna('', inplace=True) + jobs[["group"]] = jobs[["group"]].fillna(0) + jobs.fillna("", inplace=True) if len(jobs) == 0: return jobs return jobs diff --git a/backend/app/api/v1/commons/ocm.py b/backend/app/api/v1/commons/ocm.py index 25870789..e9d6f588 100644 --- a/backend/app/api/v1/commons/ocm.py +++ b/backend/app/api/v1/commons/ocm.py @@ -7,36 +7,35 @@ async def getData(start_datetime: date, end_datetime: date, configpath: str): query = { "query": { "bool": { - "filter": { - "range": { - "metrics.earliest": { - "format": "yyyy-MM-dd" - } - } - } + "filter": {"range": {"metrics.earliest": {"format": "yyyy-MM-dd"}}} } } } es = ElasticService(configpath=configpath) - response = await es.post(query=query, start_date=start_datetime, end_date=end_datetime, timestamp_field='metrics.earliest') + response = await es.post( + query=query, + start_date=start_datetime, + end_date=end_datetime, + timestamp_field="metrics.earliest", + ) await es.close() - tasks = [item['_source'] for item in response] + tasks = [item["_source"] for item in response] jobs = pd.json_normalize(tasks) if len(jobs) == 0: return jobs - if 'buildUrl' not in jobs.columns: + if "buildUrl" not in jobs.columns: 
jobs.insert(len(jobs.columns), "buildUrl", "") - if 'ciSystem' not in jobs.columns: + if "ciSystem" not in jobs.columns: jobs.insert(len(jobs.columns), "ciSystem", "") - jobs.fillna('', inplace=True) - jobs['jobStatus'] = jobs.apply(convertJobStatus, axis=1) + jobs.fillna("", inplace=True) + jobs["jobStatus"] = jobs.apply(convertJobStatus, axis=1) return jobs def fillCiSystem(row): - currDate = datetime.strptime(row["metrics.earliest"][:26], '%Y-%m-%dT%H:%M:%S.%f') + currDate = datetime.strptime(row["metrics.earliest"][:26], "%Y-%m-%dT%H:%M:%S.%f") if currDate > datetime(2024, 6, 24): return "Jenkins" else: diff --git a/backend/app/api/v1/commons/ocp.py b/backend/app/api/v1/commons/ocp.py index b5e8b48a..759eabe7 100644 --- a/backend/app/api/v1/commons/ocp.py +++ b/backend/app/api/v1/commons/ocp.py @@ -7,47 +7,54 @@ async def getData(start_datetime: date, end_datetime: date, configpath: str): query = { "query": { - "bool": { - "filter": { - "range": { - "timestamp": { - "format": "yyyy-MM-dd" - } - } - } - } + "bool": {"filter": {"range": {"timestamp": {"format": "yyyy-MM-dd"}}}} } } es = ElasticService(configpath=configpath) - response = await es.post(query=query, start_date=start_datetime, end_date=end_datetime, timestamp_field='timestamp') + response = await es.post( + query=query, + start_date=start_datetime, + end_date=end_datetime, + timestamp_field="timestamp", + ) await es.close() - tasks = [item['_source'] for item in response] + tasks = [item["_source"] for item in response] jobs = pd.json_normalize(tasks) if len(jobs) == 0: return jobs - jobs[['masterNodesCount', 'workerNodesCount', - 'infraNodesCount', 'totalNodesCount']] = jobs[['masterNodesCount', 'workerNodesCount', 'infraNodesCount', 'totalNodesCount']].fillna(0) - jobs.fillna('', inplace=True) - jobs[['ipsec', 'fips', 'encrypted', - 'publish', 'computeArch', 'controlPlaneArch']] = jobs[['ipsec', 'fips', 'encrypted', - 'publish', 'computeArch', 'controlPlaneArch']].replace(r'^\s*$', "N/A", regex=True) - jobs['encryptionType'] = jobs.apply(fillEncryptionType, axis=1) - jobs['benchmark'] = jobs.apply(utils.updateBenchmark, axis=1) - jobs['platform'] = jobs.apply(utils.clasifyAWSJobs, axis=1) - jobs['jobType'] = jobs.apply(utils.jobType, axis=1) - jobs['isRehearse'] = jobs.apply(utils.isRehearse, axis=1) - jobs['jobStatus'] = jobs.apply(utils.updateStatus, axis=1) - jobs['build'] = jobs.apply(utils.getBuild, axis=1) - - cleanJobs = jobs[jobs['platform'] != ""] + jobs[ + ["masterNodesCount", "workerNodesCount", "infraNodesCount", "totalNodesCount"] + ] = jobs[ + ["masterNodesCount", "workerNodesCount", "infraNodesCount", "totalNodesCount"] + ].fillna( + 0 + ) + jobs.fillna("", inplace=True) + jobs[ + ["ipsec", "fips", "encrypted", "publish", "computeArch", "controlPlaneArch"] + ] = jobs[ + ["ipsec", "fips", "encrypted", "publish", "computeArch", "controlPlaneArch"] + ].replace( + r"^\s*$", "N/A", regex=True + ) + jobs["encryptionType"] = jobs.apply(fillEncryptionType, axis=1) + jobs["benchmark"] = jobs.apply(utils.updateBenchmark, axis=1) + jobs["platform"] = jobs.apply(utils.clasifyAWSJobs, axis=1) + jobs["jobType"] = jobs.apply(utils.jobType, axis=1) + jobs["isRehearse"] = jobs.apply(utils.isRehearse, axis=1) + jobs["jobStatus"] = jobs.apply(utils.updateStatus, axis=1) + jobs["build"] = jobs.apply(utils.getBuild, axis=1) + + cleanJobs = jobs[jobs["platform"] != ""] jbs = cleanJobs - jbs['shortVersion'] = jbs['ocpVersion'].str.slice(0, 4) + jbs["shortVersion"] = jbs["ocpVersion"].str.slice(0, 4) return jbs + def 
fillEncryptionType(row): if row["encrypted"] == "N/A": return "N/A" diff --git a/backend/app/api/v1/commons/quay.py b/backend/app/api/v1/commons/quay.py index 0f936852..acee5649 100644 --- a/backend/app/api/v1/commons/quay.py +++ b/backend/app/api/v1/commons/quay.py @@ -7,33 +7,35 @@ async def getData(start_datetime: date, end_datetime: date, configpath: str): query = { "query": { - "bool": { - "filter": { - "range": { - "timestamp": { - "format": "yyyy-MM-dd" - } - } - } - } + "bool": {"filter": {"range": {"timestamp": {"format": "yyyy-MM-dd"}}}} } } es = ElasticService(configpath=configpath) - response = await es.post(query=query, start_date=start_datetime, end_date=end_datetime, timestamp_field='timestamp') + response = await es.post( + query=query, + start_date=start_datetime, + end_date=end_datetime, + timestamp_field="timestamp", + ) await es.close() - tasks = [item['_source'] for item in response] + tasks = [item["_source"] for item in response] jobs = pd.json_normalize(tasks) if len(jobs) == 0: return jobs - jobs[['masterNodesCount', 'workerNodesCount', - 'infraNodesCount', 'totalNodesCount']] = jobs[['masterNodesCount', 'workerNodesCount', 'infraNodesCount', 'totalNodesCount']].fillna(0) - jobs.fillna('', inplace=True) - jobs['benchmark'] = jobs.apply(utils.updateBenchmark, axis=1) - jobs['platform'] = jobs.apply(utils.clasifyAWSJobs, axis=1) - jobs['jobStatus'] = jobs.apply(utils.updateStatus, axis=1) - jobs['build'] = jobs.apply(utils.getBuild, axis=1) - jobs['shortVersion'] = jobs['ocpVersion'].str.slice(0, 4) + jobs[ + ["masterNodesCount", "workerNodesCount", "infraNodesCount", "totalNodesCount"] + ] = jobs[ + ["masterNodesCount", "workerNodesCount", "infraNodesCount", "totalNodesCount"] + ].fillna( + 0 + ) + jobs.fillna("", inplace=True) + jobs["benchmark"] = jobs.apply(utils.updateBenchmark, axis=1) + jobs["platform"] = jobs.apply(utils.clasifyAWSJobs, axis=1) + jobs["jobStatus"] = jobs.apply(utils.updateStatus, axis=1) + jobs["build"] = jobs.apply(utils.getBuild, axis=1) + jobs["shortVersion"] = jobs["ocpVersion"].str.slice(0, 4) - return jobs[jobs['platform'] != ""] + return jobs[jobs["platform"] != ""] diff --git a/backend/app/api/v1/commons/telco.py b/backend/app/api/v1/commons/telco.py index 6f4f9a8d..e9d47ac4 100644 --- a/backend/app/api/v1/commons/telco.py +++ b/backend/app/api/v1/commons/telco.py @@ -9,10 +9,18 @@ async def getData(start_datetime: date, end_datetime: date, configpath: str): - test_types = ["oslat", "cyclictest", "cpu_util", "deployment", "ptp", "reboot", "rfc-2544"] + test_types = [ + "oslat", + "cyclictest", + "cpu_util", + "deployment", + "ptp", + "reboot", + "rfc-2544", + ] cfg = config.get_config() try: - jenkins_url = cfg.get('telco.config.job_url') + jenkins_url = cfg.get("telco.config.job_url") except Exception as e: print(f"Error reading telco configuration: {e}") test_type_execution_times = { @@ -25,44 +33,54 @@ async def getData(start_datetime: date, end_datetime: date, configpath: str): "rfc-2544": 5580, } query = { - "earliest_time": "{}T00:00:00".format(start_datetime.strftime('%Y-%m-%d')), - "latest_time": "{}T23:59:59".format(end_datetime.strftime('%Y-%m-%d')), - "output_mode": "json" + "earliest_time": "{}T00:00:00".format(start_datetime.strftime("%Y-%m-%d")), + "latest_time": "{}T23:59:59".format(end_datetime.strftime("%Y-%m-%d")), + "output_mode": "json", } - searchList = ' OR '.join(['test_type="{}"'.format(test_type) for test_type in test_types]) + searchList = " OR ".join( + ['test_type="{}"'.format(test_type) for test_type in 
test_types] + ) splunk = SplunkService(configpath=configpath) response = await splunk.query(query=query, searchList=searchList) mapped_list = [] for each_response in response: - end_timestamp = int(each_response['timestamp']) - test_data = each_response['data'] + end_timestamp = int(each_response["timestamp"]) + test_data = each_response["data"] threshold = await telcoGraphs.process_json(test_data, True) hash_digest, encrypted_data = hasher.hash_encrypt_json(each_response) - execution_time_seconds = test_type_execution_times.get(test_data['test_type'], 0) + execution_time_seconds = test_type_execution_times.get( + test_data["test_type"], 0 + ) start_timestamp = end_timestamp - execution_time_seconds start_time_utc = datetime.fromtimestamp(start_timestamp, tz=timezone.utc) end_time_utc = datetime.fromtimestamp(end_timestamp, tz=timezone.utc) - kernel = test_data['kernel'] if 'kernel' in test_data else "Undefined" + kernel = test_data["kernel"] if "kernel" in test_data else "Undefined" - mapped_list.append({ - "uuid": hash_digest, - "encryptedData": encrypted_data.decode('utf-8'), - "ciSystem": "Jenkins", - "benchmark": test_data['test_type'], - "kernel": kernel, - "shortVersion": test_data['ocp_version'], - "ocpVersion": test_data['ocp_build'], - "releaseStream": utils.getReleaseStream({'releaseStream': test_data['ocp_build']}), - "nodeName": test_data['node_name'], - "cpu": test_data['cpu'], - 'formal': test_data['formal'], - "startDate": str(start_time_utc), - "endDate": str(end_time_utc), - "buildUrl": jenkins_url + "/" + str(test_data['cluster_artifacts']['ref']['jenkins_build']), - "jobStatus": "failure" if (threshold != 0) else "success", - "jobDuration": execution_time_seconds, - }) + mapped_list.append( + { + "uuid": hash_digest, + "encryptedData": encrypted_data.decode("utf-8"), + "ciSystem": "Jenkins", + "benchmark": test_data["test_type"], + "kernel": kernel, + "shortVersion": test_data["ocp_version"], + "ocpVersion": test_data["ocp_build"], + "releaseStream": utils.getReleaseStream( + {"releaseStream": test_data["ocp_build"]} + ), + "nodeName": test_data["node_name"], + "cpu": test_data["cpu"], + "formal": test_data["formal"], + "startDate": str(start_time_utc), + "endDate": str(end_time_utc), + "buildUrl": jenkins_url + + "/" + + str(test_data["cluster_artifacts"]["ref"]["jenkins_build"]), + "jobStatus": "failure" if (threshold != 0) else "success", + "jobDuration": execution_time_seconds, + } + ) jobs = pd.json_normalize(mapped_list) if len(jobs) == 0: diff --git a/backend/app/api/v1/commons/utils.py b/backend/app/api/v1/commons/utils.py index 6985e9e7..517cc420 100644 --- a/backend/app/api/v1/commons/utils.py +++ b/backend/app/api/v1/commons/utils.py @@ -1,21 +1,16 @@ from app.services.search import ElasticService -async def getMetadata(uuid: str, configpath: str) : - query = { - "query": { - "query_string": { - "query": ( - f'uuid: "{uuid}"') - } - } - } + +async def getMetadata(uuid: str, configpath: str): + query = {"query": {"query_string": {"query": (f'uuid: "{uuid}"')}}} print(query) es = ElasticService(configpath=configpath) response = await es.post(query=query) await es.close() - meta = [item['_source'] for item in response] + meta = [item["_source"] for item in response] return meta[0] + def updateStatus(job): return job["jobStatus"].lower() @@ -51,6 +46,7 @@ def getBuild(job): ocpVersion = job["ocpVersion"] return ocpVersion.replace(releaseStream, "") + def getReleaseStream(row): if row["releaseStream"].__contains__("fast"): return "Fast" diff --git 
a/backend/app/api/v1/endpoints/cpt/cptJobs.py b/backend/app/api/v1/endpoints/cpt/cptJobs.py index fab46138..6c9a4089 100644 --- a/backend/app/api/v1/endpoints/cpt/cptJobs.py +++ b/backend/app/api/v1/endpoints/cpt/cptJobs.py @@ -17,27 +17,39 @@ router = APIRouter() products = { - "ocp": ocpMapper, - "quay": quayMapper, - "hce": hceMapper, - "telco": telcoMapper, - "ocm": ocmMapper, - } + "ocp": ocpMapper, + "quay": quayMapper, + "hce": hceMapper, + "telco": telcoMapper, + "ocm": ocmMapper, +} -@router.get('/api/v1/cpt/jobs', - summary="Returns a job list from all the products.", - description="Returns a list of jobs in the specified dates. \ +@router.get( + "/api/v1/cpt/jobs", + summary="Returns a job list from all the products.", + description="Returns a list of jobs in the specified dates. \ If not dates are provided the API will default the values. \ `startDate`: will be set to the day of the request minus 5 days.\ `endDate`: will be set to the day of the request.", - responses={ - 200: cpt_200_response(), - 422: response_422(), - },) -async def jobs(start_date: date = Query(None, description="Start date for searching jobs, format: 'YYYY-MM-DD'", examples=["2020-11-10"]), - end_date: date = Query(None, description="End date for searching jobs, format: 'YYYY-MM-DD'", examples=["2020-11-15"]), - pretty: bool = Query(False, description="Output contet in pretty format.")): + responses={ + 200: cpt_200_response(), + 422: response_422(), + }, +) +async def jobs( + start_date: date = Query( + None, + description="Start date for searching jobs, format: 'YYYY-MM-DD'", + examples=["2020-11-10"], + ), + end_date: date = Query( + None, + description="End date for searching jobs, format: 'YYYY-MM-DD'", + examples=["2020-11-15"], + ), + pretty: bool = Query(False, description="Output contet in pretty format."), +): if start_date is None: start_date = datetime.utcnow().date() start_date = start_date - timedelta(days=5) @@ -46,11 +58,19 @@ async def jobs(start_date: date = Query(None, description="Start date for search end_date = datetime.utcnow().date() if start_date > end_date: - return Response(content=json.dumps({'error': "invalid date format, start_date must be less than end_date"}), status_code=422) + return Response( + content=json.dumps( + {"error": "invalid date format, start_date must be less than end_date"} + ), + status_code=422, + ) results_df = pd.DataFrame() with ProcessPoolExecutor(max_workers=cpu_count()) as executor: - futures = {executor.submit(fetch_product, product, start_date, end_date): product for product in products} + futures = { + executor.submit(fetch_product, product, start_date, end_date): product + for product in products + } for future in as_completed(futures): product = futures[future] try: @@ -60,14 +80,14 @@ async def jobs(start_date: date = Query(None, description="Start date for search print(f"Error fetching data for product {product}: {e}") response = { - 'startDate': start_date.__str__(), - 'endDate': end_date.__str__(), - 'results': results_df.to_dict('records') + "startDate": start_date.__str__(), + "endDate": end_date.__str__(), + "results": results_df.to_dict("records"), } if pretty: json_str = json.dumps(response, indent=4) - return Response(content=json_str, media_type='application/json') + return Response(content=json_str, media_type="application/json") jsonstring = json.dumps(response) return jsonstring @@ -76,7 +96,25 @@ async def jobs(start_date: date = Query(None, description="Start date for search async def fetch_product_async(product, start_date, 
end_date): try: df = await products[product](start_date, end_date) - return df.loc[:, ["ciSystem", "uuid", "releaseStream", "jobStatus", "buildUrl", "startDate", "endDate", "product", "version", "testName"]] if len(df) != 0 else df + return ( + df.loc[ + :, + [ + "ciSystem", + "uuid", + "releaseStream", + "jobStatus", + "buildUrl", + "startDate", + "endDate", + "product", + "version", + "testName", + ], + ] + if len(df) != 0 + else df + ) except ConnectionError: print("Connection Error in mapper for product " + product) except Exception as e: @@ -85,4 +123,4 @@ async def fetch_product_async(product, start_date, end_date): def fetch_product(product, start_date, end_date): - return asyncio.run(fetch_product_async(product, start_date, end_date)) \ No newline at end of file + return asyncio.run(fetch_product_async(product, start_date, end_date)) diff --git a/backend/app/api/v1/endpoints/cpt/maps/hce.py b/backend/app/api/v1/endpoints/cpt/maps/hce.py index c4d6fa17..7f9a8636 100644 --- a/backend/app/api/v1/endpoints/cpt/maps/hce.py +++ b/backend/app/api/v1/endpoints/cpt/maps/hce.py @@ -1,6 +1,7 @@ from ....commons.hce import getData from datetime import date + ################################################################ # This will return a DataFrame from HCE required by the CPT # endpoint, it contians the following columns: @@ -16,15 +17,19 @@ # "testName" ################################################################ async def hceMapper(start_datetime: date, end_datetime: date): - df = await getData(start_datetime, end_datetime, f'hce.elasticsearch') + df = await getData(start_datetime, end_datetime, f"hce.elasticsearch") if len(df) == 0: return df df["releaseStream"] = "Nightly" df["ciSystem"] = "Jenkins" df["testName"] = df["product"] + ":" + df["test"] - df["product"] = df["group"] - df["jobStatus"] = df['result'].apply(lambda x: "SUCCESS" if x == 'PASS' else "FAILURE") - df["version"] = df['version'].apply(lambda x: x if len(x.split(":")) == 1 else x.split(":")[1][:7]) + df["product"] = df["group"] + df["jobStatus"] = df["result"].apply( + lambda x: "SUCCESS" if x == "PASS" else "FAILURE" + ) + df["version"] = df["version"].apply( + lambda x: x if len(x.split(":")) == 1 else x.split(":")[1][:7] + ) df["uuid"] = df["result_id"] df["buildUrl"] = df["link"] df["startDate"] = df["date"] @@ -34,5 +39,7 @@ async def hceMapper(start_datetime: date, end_datetime: date): def dropColumns(df): - df = df.drop(columns=["group","test","result","result_id","link","date","release"]) + df = df.drop( + columns=["group", "test", "result", "result_id", "link", "date", "release"] + ) return df diff --git a/backend/app/api/v1/endpoints/cpt/maps/ocm.py b/backend/app/api/v1/endpoints/cpt/maps/ocm.py index f53bd13e..3f38bfd0 100644 --- a/backend/app/api/v1/endpoints/cpt/maps/ocm.py +++ b/backend/app/api/v1/endpoints/cpt/maps/ocm.py @@ -6,7 +6,7 @@ # This will return a DataFrame from OCM required by the CPT endpoint ################################################################ async def ocmMapper(start_datetime: date, end_datetime: date): - df = await getData(start_datetime, end_datetime, f'ocm.elasticsearch') + df = await getData(start_datetime, end_datetime, f"ocm.elasticsearch") if len(df) == 0: return df df.insert(len(df.columns), "product", "ocm") diff --git a/backend/app/api/v1/endpoints/cpt/maps/ocp.py b/backend/app/api/v1/endpoints/cpt/maps/ocp.py index 69a3649d..af20220b 100644 --- a/backend/app/api/v1/endpoints/cpt/maps/ocp.py +++ b/backend/app/api/v1/endpoints/cpt/maps/ocp.py @@ -7,7 +7,7 @@ # 
This will return a DataFrame from OCP required by the CPT endpoint ################################################################ async def ocpMapper(start_datetime: date, end_datetime: date): - df = await getData(start_datetime, end_datetime, f'ocp.elasticsearch') + df = await getData(start_datetime, end_datetime, f"ocp.elasticsearch") if len(df) == 0: return df df.insert(len(df.columns), "product", "ocp") diff --git a/backend/app/api/v1/endpoints/cpt/maps/quay.py b/backend/app/api/v1/endpoints/cpt/maps/quay.py index 9eea25b1..c0c61e8f 100644 --- a/backend/app/api/v1/endpoints/cpt/maps/quay.py +++ b/backend/app/api/v1/endpoints/cpt/maps/quay.py @@ -6,7 +6,7 @@ # This will return a DataFrame from Quay required by the CPT endpoint ##################################################################### async def quayMapper(start_datetime: date, end_datetime: date): - df = await getData(start_datetime, end_datetime, f'quay.elasticsearch') + df = await getData(start_datetime, end_datetime, f"quay.elasticsearch") if len(df) == 0: return df df.insert(len(df.columns), "product", "quay") diff --git a/backend/app/api/v1/endpoints/cpt/maps/telco.py b/backend/app/api/v1/endpoints/cpt/maps/telco.py index 51bb2d41..e169d410 100644 --- a/backend/app/api/v1/endpoints/cpt/maps/telco.py +++ b/backend/app/api/v1/endpoints/cpt/maps/telco.py @@ -7,7 +7,7 @@ # This will return a DataFrame from Telco required by the CPT endpoint ##################################################################### async def telcoMapper(start_datetime: date, end_datetime: date): - df = await getData(start_datetime, end_datetime, f'telco.splunk') + df = await getData(start_datetime, end_datetime, f"telco.splunk") if len(df) == 0: return df df.insert(len(df.columns), "product", "telco") diff --git a/backend/app/api/v1/endpoints/jira/jira.py b/backend/app/api/v1/endpoints/jira/jira.py index b86a6860..f23cd6b0 100644 --- a/backend/app/api/v1/endpoints/jira/jira.py +++ b/backend/app/api/v1/endpoints/jira/jira.py @@ -3,12 +3,11 @@ router = APIRouter() + @router.get( - '/api/v1/jira', + "/api/v1/jira", summary="Query Jira Issues", ) -async def query( - q: str = Query(None, description="Jira query language string") -): +async def query(q: str = Query(None, description="Jira query language string")): jira = JiraService() return jira.jql(q) diff --git a/backend/app/api/v1/endpoints/ocm/ocmJobs.py b/backend/app/api/v1/endpoints/ocm/ocmJobs.py index bc76b7eb..de89285a 100644 --- a/backend/app/api/v1/endpoints/ocm/ocmJobs.py +++ b/backend/app/api/v1/endpoints/ocm/ocmJobs.py @@ -9,19 +9,31 @@ router = APIRouter() -@router.get('/api/v1/ocm/jobs', - summary="Returns a job list", - description="Returns a list of jobs in the specified dates. \ +@router.get( + "/api/v1/ocm/jobs", + summary="Returns a job list", + description="Returns a list of jobs in the specified dates. \ If not dates are provided the API will default the values. 
\ `startDate`: will be set to the day of the request minus 5 days.\ `endDate`: will be set to the day of the request.", - responses={ - 200: ocp_200_response(), - 422: response_422(), - },) -async def jobs(start_date: date = Query(None, description="Start date for searching jobs, format: 'YYYY-MM-DD'", examples=["2020-11-10"]), - end_date: date = Query(None, description="End date for searching jobs, format: 'YYYY-MM-DD'", examples=["2020-11-15"]), - pretty: bool = Query(False, description="Output contet in pretty format.")): + responses={ + 200: ocp_200_response(), + 422: response_422(), + }, +) +async def jobs( + start_date: date = Query( + None, + description="Start date for searching jobs, format: 'YYYY-MM-DD'", + examples=["2020-11-10"], + ), + end_date: date = Query( + None, + description="End date for searching jobs, format: 'YYYY-MM-DD'", + examples=["2020-11-15"], + ), + pretty: bool = Query(False, description="Output contet in pretty format."), +): if start_date is None: start_date = datetime.utcnow().date() start_date = start_date - timedelta(days=5) @@ -30,26 +42,31 @@ async def jobs(start_date: date = Query(None, description="Start date for search end_date = datetime.utcnow().date() if start_date > end_date: - return Response(content=json.dumps({'error': "invalid date format, start_date must be less than end_date"}), status_code=422) + return Response( + content=json.dumps( + {"error": "invalid date format, start_date must be less than end_date"} + ), + status_code=422, + ) - results = await getData(start_date, end_date, 'ocm.elasticsearch') + results = await getData(start_date, end_date, "ocm.elasticsearch") if len(results) >= 1: response = { - 'startDate': start_date.__str__(), - 'endDate': end_date.__str__(), - 'results': results.to_dict('records') + "startDate": start_date.__str__(), + "endDate": end_date.__str__(), + "results": results.to_dict("records"), } else: response = { - 'startDate': start_date.__str__(), - 'endDate': end_date.__str__(), - 'results': [] + "startDate": start_date.__str__(), + "endDate": end_date.__str__(), + "results": [], } if pretty: json_str = json.dumps(response, indent=4) - return Response(content=json_str, media_type='application/json') + return Response(content=json_str, media_type="application/json") jsonstring = json.dumps(response) return jsonstring diff --git a/backend/app/api/v1/endpoints/ocp/graph.py b/backend/app/api/v1/endpoints/ocp/graph.py index f3e090a0..eb05252e 100644 --- a/backend/app/api/v1/endpoints/ocp/graph.py +++ b/backend/app/api/v1/endpoints/ocp/graph.py @@ -1,4 +1,4 @@ -from datetime import datetime,timedelta +from datetime import datetime, timedelta from app.api.v1.commons.utils import getMetadata import trio import semver @@ -13,39 +13,42 @@ """ """ -@router.get('/api/v1/ocp/graph/trend/{version}/{count}/{benchmark}') -async def trend(benchmark: str,count: int,version: str): + + +@router.get("/api/v1/ocp/graph/trend/{version}/{count}/{benchmark}") +async def trend(benchmark: str, count: int, version: str): index = "ripsaw-kube-burner*" meta = {} - meta['benchmark'] = benchmark + meta["benchmark"] = benchmark # Instance types for self-managed. 
- if count > 50 : - meta['masterNodesType'] = "m6a.4xlarge" - meta['workerNodesType'] = "m5.xlarge" + if count > 50: + meta["masterNodesType"] = "m6a.4xlarge" + meta["workerNodesType"] = "m5.xlarge" else: - meta['masterNodesType'] = "m6a.xlarge" - meta['workerNodesType'] = "m6a.xlarge" - meta['masterNodesCount'] = 3 - meta['workerNodesCount'] = count - meta['platform'] = "AWS" + meta["masterNodesType"] = "m6a.xlarge" + meta["workerNodesType"] = "m6a.xlarge" + meta["masterNodesCount"] = 3 + meta["workerNodesCount"] = count + meta["platform"] = "AWS" # Query the up coming release data - meta['ocpVersion']= version - current_uuids = await getMatchRuns(meta,True) - if len(current_uuids) < 1 : + meta["ocpVersion"] = version + current_uuids = await getMatchRuns(meta, True) + if len(current_uuids) < 1: return [] current_jobs = await jobSummary(current_uuids) current_ids = jobFilter(current_jobs, current_jobs) # Capture result data - oData = await getBurnerResults("",current_ids,index) + oData = await getBurnerResults("", current_ids, index) odf = pd.json_normalize(oData) - columns = ['timestamp', 'quantileName','metricName', 'P99'] + columns = ["timestamp", "quantileName", "metricName", "P99"] odf = pd.DataFrame(odf, columns=columns) - odf = odf.sort_values(by=['timestamp']) - current = {'y' : (odf['P99']/1000).to_list(), - 'x' : odf['timestamp'].to_list(), - 'name' : version + ' results - PodLatency: Ready 99th%tile ( seconds )', - 'type' : 'scatter', + odf = odf.sort_values(by=["timestamp"]) + current = { + "y": (odf["P99"] / 1000).to_list(), + "x": odf["timestamp"].to_list(), + "name": version + " results - PodLatency: Ready 99th%tile ( seconds )", + "type": "scatter", } return [current] @@ -54,104 +57,124 @@ async def trend(benchmark: str,count: int,version: str): diff_cpu - Will accept the version, prev_version , count, benchmark and namespace to diff trend CPU data. """ -@router.get('/api/v1/ocp/graph/trend/{version}/{prev_version}/{count}/{benchmark}/cpu/{namespace}') -async def diff_cpu(namespace: str, benchmark: str, count: int, version: str, prev_version: str): - aTrend = await trend_cpu(namespace,benchmark,count,version) - bTrend = await trend_cpu(namespace,benchmark,count,prev_version) - return [aTrend[0],bTrend[0]] + + +@router.get( + "/api/v1/ocp/graph/trend/{version}/{prev_version}/{count}/{benchmark}/cpu/{namespace}" +) +async def diff_cpu( + namespace: str, benchmark: str, count: int, version: str, prev_version: str +): + aTrend = await trend_cpu(namespace, benchmark, count, version) + bTrend = await trend_cpu(namespace, benchmark, count, prev_version) + return [aTrend[0], bTrend[0]] + """ trend_cpu - Will accept the version, count, benchmark and namespace to trend CPU data. """ -@router.get('/api/v1/ocp/graph/trend/{version}/{count}/{benchmark}/cpu/{namespace}') + + +@router.get("/api/v1/ocp/graph/trend/{version}/{count}/{benchmark}/cpu/{namespace}") async def trend_cpu(namespace: str, benchmark: str, count: int, version: str): index = "ripsaw-kube-burner*" meta = {} - meta['benchmark'] = benchmark - meta['masterNodesCount'] = 3 + meta["benchmark"] = benchmark + meta["masterNodesCount"] = 3 # Instance types for self-managed. 
- if count > 50 : - meta['masterNodesType'] = "m6a.4xlarge" - meta['workerNodesType'] = "m5.xlarge" - elif count <= 24 and count > 20 : - meta['masterNodesType'] = "m6a.xlarge" - meta['workerNodesType'] = "m6a.xlarge" - else : - meta['masterNodesType'] = "m5.2large" - meta['workerNodesType'] = "m5.xlarge" - meta['workerNodesCount'] = count - meta['platform'] = "AWS" - meta['ocpVersion']= version - current_uuids = await getMatchRuns(meta,True) + if count > 50: + meta["masterNodesType"] = "m6a.4xlarge" + meta["workerNodesType"] = "m5.xlarge" + elif count <= 24 and count > 20: + meta["masterNodesType"] = "m6a.xlarge" + meta["workerNodesType"] = "m6a.xlarge" + else: + meta["masterNodesType"] = "m5.2large" + meta["workerNodesType"] = "m5.xlarge" + meta["workerNodesCount"] = count + meta["platform"] = "AWS" + meta["ocpVersion"] = version + current_uuids = await getMatchRuns(meta, True) # Query the current release data. current_jobs = await jobSummary(current_uuids) - current_ids = jobFilter(current_jobs,current_jobs) - result = await getBurnerCPUResults(current_ids, namespace,index) + current_ids = jobFilter(current_jobs, current_jobs) + result = await getBurnerCPUResults(current_ids, namespace, index) result = parseCPUResults(result) cdf = pd.json_normalize(result) - cdf = cdf.sort_values(by=['timestamp']) - current = {'y' : cdf['cpu_avg'].to_list(), - 'x' : cdf['timestamp'].to_list(), - 'name' : version+' results - '+namespace+' avg CPU usage - for benchmark '+benchmark, - 'type' : 'scatter', + cdf = cdf.sort_values(by=["timestamp"]) + current = { + "y": cdf["cpu_avg"].to_list(), + "x": cdf["timestamp"].to_list(), + "name": version + + " results - " + + namespace + + " avg CPU usage - for benchmark " + + benchmark, + "type": "scatter", } return [current] + def parseCPUResults(data: dict): res = [] - stamps = data['aggregations']['time']['buckets'] - cpu = data['aggregations']['uuid']['buckets'] - for stamp in stamps : + stamps = data["aggregations"]["time"]["buckets"] + cpu = data["aggregations"]["uuid"]["buckets"] + for stamp in stamps: dat = {} - dat['uuid'] = stamp['key'] - dat['timestamp'] = stamp['time']['value_as_string'] - acpu = next(item for item in cpu if item["key"] == stamp['key']) - dat['cpu_avg'] = acpu['cpu']['value'] + dat["uuid"] = stamp["key"] + dat["timestamp"] = stamp["time"]["value_as_string"] + acpu = next(item for item in cpu if item["key"] == stamp["key"]) + dat["cpu_avg"] = acpu["cpu"]["value"] res.append(dat) return res -@router.get('/api/v1/ocp/graph/{uuid}') + +@router.get("/api/v1/ocp/graph/{uuid}") async def graph(uuid: str): index = "" - meta = await getMetadata(uuid, 'ocp.elasticsearch') + meta = await getMetadata(uuid, "ocp.elasticsearch") print(meta) metrics = [] - if meta["benchmark"] == "k8s-netperf" : - uuids = await getMatchRuns(meta,False) + if meta["benchmark"] == "k8s-netperf": + uuids = await getMatchRuns(meta, False) print(uuids) index = "k8s-netperf" - oData = await getResults(uuid,uuids,index) - cData = await getResults(uuid,[uuid],index) + oData = await getResults(uuid, uuids, index) + cData = await getResults(uuid, [uuid], index) oMetrics = await processNetperf(oData) oMetrics = oMetrics.reset_index() nMetrics = await processNetperf(cData) nMetrics = nMetrics.reset_index() - x=[] - y=[] + x = [] + y = [] for index, row in oMetrics.iterrows(): - test = "{}-{}".format(row['profile'], row['messageSize']) - value = "{}".format(row['throughput']) + test = "{}-{}".format(row["profile"], row["messageSize"]) + value = "{}".format(row["throughput"]) 
x.append(value) y.append(test) - old = {'y' : x, - 'x' : y, - 'name' : 'Previous results average', - 'type' : 'bar', - 'orientation' : 'v'} - x=[] - y=[] + old = { + "y": x, + "x": y, + "name": "Previous results average", + "type": "bar", + "orientation": "v", + } + x = [] + y = [] for index, row in nMetrics.iterrows(): - test = "{}-{}".format(row['profile'], row['messageSize']) - value = "{}".format(row['throughput']) + test = "{}-{}".format(row["profile"], row["messageSize"]) + value = "{}".format(row["throughput"]) x.append(value) y.append(test) - new = {'y' : x, - 'x' : y, - 'name' : 'Current results average', - 'type' : 'bar', - 'orientation' : 'v'} + new = { + "y": x, + "x": y, + "name": "Current results average", + "type": "bar", + "orientation": "v", + } metrics.append(old) metrics.append(new) @@ -161,267 +184,272 @@ async def graph(uuid: str): data = await getResults(uuid, uuids, index) else: index = "ripsaw-kube-burner*" - uuids = await getMatchRuns(meta,True) + uuids = await getMatchRuns(meta, True) # We need to look at the jobSummary to ensure all UUIDs have similar iteration count. job = await jobSummary([uuid]) jobs = await jobSummary(uuids) - ids = jobFilter(job,jobs) + ids = jobFilter(job, jobs) - oData = await getBurnerResults(uuid,ids,index) + oData = await getBurnerResults(uuid, ids, index) oMetrics = await processBurner(oData) oMetrics = oMetrics.reset_index() - cData = await getBurnerResults(uuid,[uuid],index) + cData = await getBurnerResults(uuid, [uuid], index) nMetrics = await processBurner(cData) nMetrics = nMetrics.reset_index() - x=[] - y=[] + x = [] + y = [] for index, row in oMetrics.iterrows(): test = "PodLatency-p99" - value = row['P99'] - x.append(int(value)/1000) + value = row["P99"] + x.append(int(value) / 1000) y.append(test) - old = {'y' : x, - 'x' : y, - 'name' : 'Previous results p99', - 'type' : 'bar', - 'orientation' : 'v'} - x=[] - y=[] + old = { + "y": x, + "x": y, + "name": "Previous results p99", + "type": "bar", + "orientation": "v", + } + x = [] + y = [] for index, row in nMetrics.iterrows(): test = "PodLatency-p99" - value = row['P99'] - x.append(int(value)/1000) + value = row["P99"] + x.append(int(value) / 1000) y.append(test) - new = {'y' : x, - 'x' : y, - 'name' : 'Current results P99', - 'type' : 'bar', - 'orientation' : 'v'} + new = { + "y": x, + "x": y, + "name": "Current results P99", + "type": "bar", + "orientation": "v", + } metrics.append(old) metrics.append(new) return metrics + async def jobSummary(uuids: list): index = "ripsaw-kube-burner*" - ids = "\" OR uuid: \"".join(uuids) + ids = '" OR uuid: "'.join(uuids) query = { "query": { "query_string": { - "query": ( - f'( uuid: \"{ids}\" )' - f' AND metricName: "jobSummary"' - ) + "query": (f'( uuid: "{ids}" )' f' AND metricName: "jobSummary"') } } } print(query) - es = ElasticService(configpath="ocp.elasticsearch",index=index) + es = ElasticService(configpath="ocp.elasticsearch", index=index) response = await es.post(query=query) await es.close() - runs = [item['_source'] for item in response] + runs = [item["_source"] for item in response] return runs -async def processBurner(data: dict) : + +async def processBurner(data: dict): pprint.pprint(data) df = pd.json_normalize(data) filterDF = burnerFilter(df) - ptile = filterDF.groupby(['quantileName'])['P99'].quantile([.99]) + ptile = filterDF.groupby(["quantileName"])["P99"].quantile([0.99]) return ptile -async def processNetperf(data: dict) : + +async def processNetperf(data: dict): pprint.pprint(data) df = pd.json_normalize(data) 
filterDF = netperfFilter(df) - tput = filterDF.groupby(['profile','messageSize'])['throughput'].mean() + tput = filterDF.groupby(["profile", "messageSize"])["throughput"].mean() return tput + def jobFilter(pdata: dict, data: dict): - columns = ['uuid','jobConfig.jobIterations'] + columns = ["uuid", "jobConfig.jobIterations"] pdf = pd.json_normalize(pdata) pick_df = pd.DataFrame(pdf, columns=columns) - iterations = pick_df.iloc[0]['jobConfig.jobIterations'] + iterations = pick_df.iloc[0]["jobConfig.jobIterations"] df = pd.json_normalize(data) ndf = pd.DataFrame(df, columns=columns) - ids_df = ndf.loc[df['jobConfig.jobIterations'] == iterations ] - return ids_df['uuid'].to_list() + ids_df = ndf.loc[df["jobConfig.jobIterations"] == iterations] + return ids_df["uuid"].to_list() -def burnerFilter(data: dict) : + +def burnerFilter(data: dict): # # Filter out aspects of the test to norm results # pprint.pprint(data) - columns = ['quantileName','metricName', 'P99'] + columns = ["quantileName", "metricName", "P99"] ndf = pd.DataFrame(data, columns=columns) return ndf + def netperfFilter(df): # # Filter out aspects of the test to norm results # - columns = ['profile','hostNetwork','parallelism','service','acrossAZ','samples', - 'messageSize','throughput','test'] + columns = [ + "profile", + "hostNetwork", + "parallelism", + "service", + "acrossAZ", + "samples", + "messageSize", + "throughput", + "test", + ] ndf = pd.DataFrame(df, columns=columns) - hnfilter = df[ (ndf.hostNetwork == True) ].index + hnfilter = df[(ndf.hostNetwork == True)].index hnd = ndf.drop(hnfilter) - sfilter = hnd[ (hnd.service == True)].index + sfilter = hnd[(hnd.service == True)].index sdf = hnd.drop(sfilter) - azfilter = sdf[ (sdf.acrossAZ == True)].index + azfilter = sdf[(sdf.acrossAZ == True)].index adf = sdf.drop(azfilter) - d = adf[ (adf.parallelism == 1) ] - d = d[d.profile.str.contains('TCP_STREAM')] + d = adf[(adf.parallelism == 1)] + d = d[d.profile.str.contains("TCP_STREAM")] return d -async def getBurnerCPUResults(uuids: list, namespace: str, index: str ): - ids = "\" OR uuid: \"".join(uuids) + +async def getBurnerCPUResults(uuids: list, namespace: str, index: str): + ids = '" OR uuid: "'.join(uuids) print(ids) query = { "size": 0, "aggs": { "time": { - "terms": { - "field": "uuid.keyword" + "terms": {"field": "uuid.keyword"}, + "aggs": {"time": {"avg": {"field": "timestamp"}}}, }, - "aggs": { - "time": { - "avg": { - "field": "timestamp"} - } - } - }, - "uuid": { - "terms": { - "field": "uuid.keyword" + "uuid": { + "terms": {"field": "uuid.keyword"}, + "aggs": {"cpu": {"avg": {"field": "value"}}}, }, - "aggs": { - "cpu": { - "avg": { - "field": "value" - } - } - } - } }, "query": { "bool": { - "must": [{ - "query_string": { - "query": ( - f'( uuid: \"{ids}\" )' - f' AND metricName: "containerCPU"' - f' AND labels.namespace.keyword: {namespace}' - ) + "must": [ + { + "query_string": { + "query": ( + f'( uuid: "{ids}" )' + f' AND metricName: "containerCPU"' + f" AND labels.namespace.keyword: {namespace}" + ) + } } - }] + ] } - } + }, } print(query) - es = ElasticService(configpath="ocp.elasticsearch",index=index) - runs = await es.post(query,size=0) + es = ElasticService(configpath="ocp.elasticsearch", index=index) + runs = await es.post(query, size=0) await es.close() return runs -async def getBurnerResults(uuid: str, uuids: list, index: str ): - if len(uuids) > 1 : - if len(uuid) > 0 : + +async def getBurnerResults(uuid: str, uuids: list, index: str): + if len(uuids) > 1: + if len(uuid) > 0: uuids.remove(uuid) - if 
len(uuids) < 1 : + if len(uuids) < 1: return [] - ids = "\" OR uuid: \"".join(uuids) + ids = '" OR uuid: "'.join(uuids) print(ids) query = { "query": { "query_string": { "query": ( - f'( uuid: \"{ids}\" )' + f'( uuid: "{ids}" )' f' AND metricName: "podLatencyQuantilesMeasurement"' f' AND quantileName: "Ready"' - ) + ) } } } print(query) - es = ElasticService(configpath="ocp.elasticsearch",index=index) + es = ElasticService(configpath="ocp.elasticsearch", index=index) response = await es.post(query=query) await es.close() - runs = [item['_source'] for item in response] + runs = [item["_source"] for item in response] return runs -async def getResults(uuid: str, uuids: list, index: str ): - if len(uuids) > 1 : + +async def getResults(uuid: str, uuids: list, index: str): + if len(uuids) > 1: uuids.remove(uuid) - ids = "\" OR uuid: \"".join(uuids) + ids = '" OR uuid: "'.join(uuids) print(ids) - query = { - "query": { - "query_string": { - "query": ( - f'(uuid: \"{ids}\")') - } - } - } + query = {"query": {"query_string": {"query": (f'(uuid: "{ids}")')}}} print(query) - es = ElasticService(configpath="ocp.elasticsearch",index=index) + es = ElasticService(configpath="ocp.elasticsearch", index=index) response = await es.post(query=query) await es.close() - runs = [item['_source'] for item in response] + runs = [item["_source"] for item in response] return runs + async def getMatchRuns(meta: dict, workerCount: False): index = "perf_scale_ci" version = meta["ocpVersion"][:4] query = { "query": { "bool": { - "must": [{ - "query_string": { - "query": ( - f'benchmark: "{meta["benchmark"]}$"' - f' AND workerNodesType: "{meta["workerNodesType"]}"' - f' AND masterNodesType: "{meta["masterNodesType"]}"' - f' AND platform: "{meta["platform"]}"' - f' AND ocpVersion: {version}*' - f' AND jobStatus: success' - ) - } - }] + "must": [ + { + "query_string": { + "query": ( + f'benchmark: "{meta["benchmark"]}$"' + f' AND workerNodesType: "{meta["workerNodesType"]}"' + f' AND masterNodesType: "{meta["masterNodesType"]}"' + f' AND platform: "{meta["platform"]}"' + f" AND ocpVersion: {version}*" + f" AND jobStatus: success" + ) + } + } + ] } } } - if workerCount : + if workerCount: query = { - "query": { - "bool": { - "must": [{ - "query_string": { - "query": ( - f'benchmark: "{meta["benchmark"]}$"' - f' AND workerNodesType: "{meta["workerNodesType"]}"' - f' AND masterNodesType: "{meta["masterNodesType"]}"' - f' AND masterNodesCount: "{meta["masterNodesCount"]}"' - f' AND workerNodesCount: "{meta["workerNodesCount"]}"' - f' AND platform: "{meta["platform"]}"' - f' AND ocpVersion: {version}*' - f' AND jobStatus: success' - ) + "query": { + "bool": { + "must": [ + { + "query_string": { + "query": ( + f'benchmark: "{meta["benchmark"]}$"' + f' AND workerNodesType: "{meta["workerNodesType"]}"' + f' AND masterNodesType: "{meta["masterNodesType"]}"' + f' AND masterNodesCount: "{meta["masterNodesCount"]}"' + f' AND workerNodesCount: "{meta["workerNodesCount"]}"' + f' AND platform: "{meta["platform"]}"' + f" AND ocpVersion: {version}*" + f" AND jobStatus: success" + ) + } + } + ] } - }] } } - } print(query) es = ElasticService(configpath="ocp.elasticsearch") response = await es.post(query=query) await es.close() - runs = [item['_source'] for item in response] + runs = [item["_source"] for item in response] uuids = [] - for run in runs : + for run in runs: uuids.append(run["uuid"]) return uuids + """ [ { 'y' : ["4,13","4.14"], diff --git a/backend/app/api/v1/endpoints/ocp/ocpJobs.py b/backend/app/api/v1/endpoints/ocp/ocpJobs.py 
index ab56705a..561d4ea2 100644 --- a/backend/app/api/v1/endpoints/ocp/ocpJobs.py +++ b/backend/app/api/v1/endpoints/ocp/ocpJobs.py @@ -9,20 +9,31 @@ router = APIRouter() - -@router.get('/api/v1/ocp/jobs', - summary="Returns a job list", - description="Returns a list of jobs in the specified dates. \ +@router.get( + "/api/v1/ocp/jobs", + summary="Returns a job list", + description="Returns a list of jobs in the specified dates. \ If not dates are provided the API will default the values. \ `startDate`: will be set to the day of the request minus 5 days.\ `endDate`: will be set to the day of the request.", - responses={ - 200: ocp_200_response(), - 422: response_422(), - },) -async def jobs(start_date: date = Query(None, description="Start date for searching jobs, format: 'YYYY-MM-DD'", examples=["2020-11-10"]), - end_date: date = Query(None, description="End date for searching jobs, format: 'YYYY-MM-DD'", examples=["2020-11-15"]), - pretty: bool = Query(False, description="Output contet in pretty format.")): + responses={ + 200: ocp_200_response(), + 422: response_422(), + }, +) +async def jobs( + start_date: date = Query( + None, + description="Start date for searching jobs, format: 'YYYY-MM-DD'", + examples=["2020-11-10"], + ), + end_date: date = Query( + None, + description="End date for searching jobs, format: 'YYYY-MM-DD'", + examples=["2020-11-15"], + ), + pretty: bool = Query(False, description="Output contet in pretty format."), +): if start_date is None: start_date = datetime.utcnow().date() start_date = start_date - timedelta(days=5) @@ -31,26 +42,31 @@ async def jobs(start_date: date = Query(None, description="Start date for search end_date = datetime.utcnow().date() if start_date > end_date: - return Response(content=json.dumps({'error': "invalid date format, start_date must be less than end_date"}), status_code=422) + return Response( + content=json.dumps( + {"error": "invalid date format, start_date must be less than end_date"} + ), + status_code=422, + ) - results = await getData(start_date, end_date, 'ocp.elasticsearch') + results = await getData(start_date, end_date, "ocp.elasticsearch") if len(results) >= 1: response = { - 'startDate': start_date.__str__(), - 'endDate': end_date.__str__(), - 'results': results.to_dict('records') + "startDate": start_date.__str__(), + "endDate": end_date.__str__(), + "results": results.to_dict("records"), } - else : + else: response = { - 'startDate': start_date.__str__(), - 'endDate': end_date.__str__(), - 'results': [] + "startDate": start_date.__str__(), + "endDate": end_date.__str__(), + "results": [], } if pretty: json_str = json.dumps(response, indent=4) - return Response(content=json_str, media_type='application/json') + return Response(content=json_str, media_type="application/json") jsonstring = json.dumps(response) return jsonstring diff --git a/backend/app/api/v1/endpoints/ocp/results.py b/backend/app/api/v1/endpoints/ocp/results.py index 6b94d70d..09d101e2 100644 --- a/backend/app/api/v1/endpoints/ocp/results.py +++ b/backend/app/api/v1/endpoints/ocp/results.py @@ -6,22 +6,25 @@ router = APIRouter() -@router.get('/api/v1/ocp/jobs/{ci}/{job_id}', - summary="Returns the details of a specified Job.") +@router.get( + "/api/v1/ocp/jobs/{ci}/{job_id}", summary="Returns the details of a specified Job." 
+) async def results_for_job( - ci: str = Path(..., description="Name of the CI system tha tthe job belongs to.", examples=["PROW", "JENKINS"]), - job_id: str = Path(..., description="Unique identifier of the Jon, normally the UUID.", examples=["8b671d0b-8638-4423-b453-cc54b1caf529"])): - query = { - "query": { - "query_string": { - "query": ( - f'uuid: "{job_id}"') - } - } - } + ci: str = Path( + ..., + description="Name of the CI system tha tthe job belongs to.", + examples=["PROW", "JENKINS"], + ), + job_id: str = Path( + ..., + description="Unique identifier of the Jon, normally the UUID.", + examples=["8b671d0b-8638-4423-b453-cc54b1caf529"], + ), +): + query = {"query": {"query_string": {"query": (f'uuid: "{job_id}"')}}} es = ElasticService(configpath="ocp.elasticsearch") response = await es.post(query=query) await es.close() - tasks = [item['_source'] for item in response] + tasks = [item["_source"] for item in response] return tasks diff --git a/backend/app/api/v1/endpoints/quay/quayGraphs.py b/backend/app/api/v1/endpoints/quay/quayGraphs.py index 63c9e1a5..907f3075 100644 --- a/backend/app/api/v1/endpoints/quay/quayGraphs.py +++ b/backend/app/api/v1/endpoints/quay/quayGraphs.py @@ -9,11 +9,11 @@ router = APIRouter() -@router.get('/api/v1/quay/graph/{uuid}') +@router.get("/api/v1/quay/graph/{uuid}") async def graph(uuid: str): api_index = "quay-vegeta-results" image_push_pull_index = "quay-push-pull" - meta = await getMetadata(uuid, 'quay.elasticsearch') + meta = await getMetadata(uuid, "quay.elasticsearch") uuids = await getMatchRuns(meta) prevApiData = await getQuayMetrics(uuids, api_index) prevImagesData = await getImageMetrics(uuids, image_push_pull_index) @@ -33,251 +33,177 @@ async def graph(uuid: str): prevX, prevY = [], [] currX, currY = [], [] - for (pKey, pValue), (cKey, cValue) in zip(prevApiResults.items(), currentApiResults.items()): + for (pKey, pValue), (cKey, cValue) in zip( + prevApiResults.items(), currentApiResults.items() + ): prevX.append(pKey) currX.append(cKey) prevY.append(pValue) currY.append(cValue) prev = { - 'x': prevX, - 'y': prevY, - 'name': 'Previous API status codes', - 'type': 'bar', - 'orientation': 'v' + "x": prevX, + "y": prevY, + "name": "Previous API status codes", + "type": "bar", + "orientation": "v", } curr = { - 'x': currX, - 'y': currY, - 'name': 'Current API status codes', - 'type': 'bar', - 'orientation': 'v' + "x": currX, + "y": currY, + "name": "Current API status codes", + "type": "bar", + "orientation": "v", } apiResults.append(prev) apiResults.append(curr) prev = { - 'x': ['success_count', 'failure_count'], - 'y': [prevImagesResults['success_count'], prevImagesResults['failure_count']], - 'name': 'Previous images status count', - 'type': 'bar', - 'orientation': 'v' + "x": ["success_count", "failure_count"], + "y": [prevImagesResults["success_count"], prevImagesResults["failure_count"]], + "name": "Previous images status count", + "type": "bar", + "orientation": "v", } curr = { - 'x': ['success_count', 'failure_count'], - 'y': [currentImagesResults['success_count'], currentImagesResults['failure_count']], - 'name': 'Current images status count', - 'type': 'bar', - 'orientation': 'v' + "x": ["success_count", "failure_count"], + "y": [ + currentImagesResults["success_count"], + currentImagesResults["failure_count"], + ], + "name": "Current images status count", + "type": "bar", + "orientation": "v", } imageResults.append(prev) imageResults.append(curr) prev = { - 'x': ['api_latency', 'image_push_pull_latency'], - 'y': 
[(prevApiData['aggregations']['latency']['value'])/1000, prevImagesResults['latency'] * 1000], - 'name': 'Previous latencies', - 'type': 'bar', - 'orientation': 'v' + "x": ["api_latency", "image_push_pull_latency"], + "y": [ + (prevApiData["aggregations"]["latency"]["value"]) / 1000, + prevImagesResults["latency"] * 1000, + ], + "name": "Previous latencies", + "type": "bar", + "orientation": "v", } curr = { - 'x': ['api_latency', 'image_push_pull_latency'], - 'y': [(currentApiData['aggregations']['latency']['value'])/1000, currentImagesResults['latency'] * 1000], - 'name': 'Current latencies', - 'type': 'bar', - 'orientation': 'v' + "x": ["api_latency", "image_push_pull_latency"], + "y": [ + (currentApiData["aggregations"]["latency"]["value"]) / 1000, + currentImagesResults["latency"] * 1000, + ], + "name": "Current latencies", + "type": "bar", + "orientation": "v", } latencyResults.append(prev) latencyResults.append(curr) return { - 'apiResults': apiResults, - 'imageResults': imageResults, - 'latencyResults': latencyResults + "apiResults": apiResults, + "imageResults": imageResults, + "latencyResults": latencyResults, } def safe_add(source, output, key, target_key): - value = source.get(key, {}).get('value', 0.0) + value = source.get(key, {}).get("value", 0.0) if value is not None: output[target_key] += value async def parseApiResults(data: dict): - resultData = {'status_code_0': 0.0, 'status_code_2XX': 0.0, 'status_code_4XX': 0.0, 'status_code_5XX': 0.0} - safe_add(data['aggregations'], resultData, 'status_codes.0', 'status_code_0') - safe_add(data['aggregations'], resultData, 'status_codes.200', 'status_code_2XX') - safe_add(data['aggregations'], resultData, 'status_codes.201', 'status_code_2XX') - safe_add(data['aggregations'], resultData, 'status_codes.204', 'status_code_2XX') - safe_add(data['aggregations'], resultData, 'status_codes.400', 'status_code_4XX') - safe_add(data['aggregations'], resultData, 'status_codes.401', 'status_code_4XX') - safe_add(data['aggregations'], resultData, 'status_codes.403', 'status_code_4XX') - safe_add(data['aggregations'], resultData, 'status_codes.404', 'status_code_4XX') - safe_add(data['aggregations'], resultData, 'status_codes.405', 'status_code_4XX') - safe_add(data['aggregations'], resultData, 'status_codes.408', 'status_code_4XX') - safe_add(data['aggregations'], resultData, 'status_codes.500', 'status_code_5XX') - safe_add(data['aggregations'], resultData, 'status_codes.502', 'status_code_5XX') - safe_add(data['aggregations'], resultData, 'status_codes.503', 'status_code_5XX') - safe_add(data['aggregations'], resultData, 'status_codes.504', 'status_code_5XX') + resultData = { + "status_code_0": 0.0, + "status_code_2XX": 0.0, + "status_code_4XX": 0.0, + "status_code_5XX": 0.0, + } + safe_add(data["aggregations"], resultData, "status_codes.0", "status_code_0") + safe_add(data["aggregations"], resultData, "status_codes.200", "status_code_2XX") + safe_add(data["aggregations"], resultData, "status_codes.201", "status_code_2XX") + safe_add(data["aggregations"], resultData, "status_codes.204", "status_code_2XX") + safe_add(data["aggregations"], resultData, "status_codes.400", "status_code_4XX") + safe_add(data["aggregations"], resultData, "status_codes.401", "status_code_4XX") + safe_add(data["aggregations"], resultData, "status_codes.403", "status_code_4XX") + safe_add(data["aggregations"], resultData, "status_codes.404", "status_code_4XX") + safe_add(data["aggregations"], resultData, "status_codes.405", "status_code_4XX") + 
safe_add(data["aggregations"], resultData, "status_codes.408", "status_code_4XX") + safe_add(data["aggregations"], resultData, "status_codes.500", "status_code_5XX") + safe_add(data["aggregations"], resultData, "status_codes.502", "status_code_5XX") + safe_add(data["aggregations"], resultData, "status_codes.503", "status_code_5XX") + safe_add(data["aggregations"], resultData, "status_codes.504", "status_code_5XX") return resultData async def parseImageResults(data: dict): - totals = {'latency': 0.0, 'success_count': 0.0, 'failure_count': 0.0} - datapoints = data['aggregations']['uuid']['buckets'] + totals = {"latency": 0.0, "success_count": 0.0, "failure_count": 0.0} + datapoints = data["aggregations"]["uuid"]["buckets"] for each in datapoints: - safe_add(each, totals, 'latency', 'latency') - safe_add(each, totals, 'success_count', 'success_count') - safe_add(each, totals, 'failure_count', 'failure_count') - totals['latency'] /= len(datapoints) - totals['success_count'] /= len(datapoints) - totals['failure_count'] /= len(datapoints) + safe_add(each, totals, "latency", "latency") + safe_add(each, totals, "success_count", "success_count") + safe_add(each, totals, "failure_count", "failure_count") + totals["latency"] /= len(datapoints) + totals["success_count"] /= len(datapoints) + totals["failure_count"] /= len(datapoints) return totals async def getImageMetrics(uuids: list, index: str): - ids = "\" OR uuid: \"".join(uuids) + ids = '" OR uuid: "'.join(uuids) query = { "size": 0, "aggs": { "uuid": { - "terms": { - "field": "uuid.keyword" - }, - "aggs": { - "latency": { - "avg": { - "field": "elapsed_time" - } + "terms": {"field": "uuid.keyword"}, + "aggs": { + "latency": {"avg": {"field": "elapsed_time"}}, + "success_count": {"sum": {"field": "success_count"}}, + "failure_count": {"sum": {"field": "failure_count"}}, }, - "success_count": { - "sum": { - "field": "success_count" - } - }, - "failure_count": { - "sum": { - "field": "failure_count" - } - } - } } }, "query": { - "bool": { - "must": [{ - "query_string": { - "query": ( - f'( uuid: \"{ids}\" )' - ) - } - }] - } - } + "bool": {"must": [{"query_string": {"query": (f'( uuid: "{ids}" )')}}]} + }, } print(query) - es = ElasticService(configpath="quay.elasticsearch",index=index) - results = await es.post(query,size=0) + es = ElasticService(configpath="quay.elasticsearch", index=index) + results = await es.post(query, size=0) await es.close() return results async def getQuayMetrics(uuids: list, index: str): - ids = "\" OR uuid: \"".join(uuids) + ids = '" OR uuid: "'.join(uuids) query = { "size": 0, "aggs": { - "latency": { - "avg": { - "field": "req_latency" - } - }, - "status_codes.0": { - "avg": { - "field": "status_codes.0" - } - }, - "status_codes.200": { - "avg": { - "field": "status_codes.200" - } - }, - "status_codes.201": { - "avg": { - "field": "status_codes.201" - } - }, - "status_codes.204": { - "avg": { - "field": "status_codes.204" - } - }, - "status_codes.400": { - "avg": { - "field": "status_codes.400" - } - }, - "status_codes.401": { - "avg": { - "field": "status_codes.401" - } - }, - "status_codes.403": { - "avg": { - "field": "status_codes.403" - } - }, - "status_codes.404": { - "avg": { - "field": "status_codes.404" - } - }, - "status_codes.405": { - "avg": { - "field": "status_codes.405" - } - }, - "status_codes.408": { - "avg": { - "field": "status_codes.408" - } - }, - "status_codes.500": { - "avg": { - "field": "status_codes.500" - } - }, - "status_codes.502": { - "avg": { - "field": "status_codes.502" - } - }, - 
"status_codes.503": { - "avg": { - "field": "status_codes.503" - } - }, - "status_codes.504": { - "avg": { - "field": "status_codes.504" - } - } + "latency": {"avg": {"field": "req_latency"}}, + "status_codes.0": {"avg": {"field": "status_codes.0"}}, + "status_codes.200": {"avg": {"field": "status_codes.200"}}, + "status_codes.201": {"avg": {"field": "status_codes.201"}}, + "status_codes.204": {"avg": {"field": "status_codes.204"}}, + "status_codes.400": {"avg": {"field": "status_codes.400"}}, + "status_codes.401": {"avg": {"field": "status_codes.401"}}, + "status_codes.403": {"avg": {"field": "status_codes.403"}}, + "status_codes.404": {"avg": {"field": "status_codes.404"}}, + "status_codes.405": {"avg": {"field": "status_codes.405"}}, + "status_codes.408": {"avg": {"field": "status_codes.408"}}, + "status_codes.500": {"avg": {"field": "status_codes.500"}}, + "status_codes.502": {"avg": {"field": "status_codes.502"}}, + "status_codes.503": {"avg": {"field": "status_codes.503"}}, + "status_codes.504": {"avg": {"field": "status_codes.504"}}, }, "query": { - "bool": { - "must": [{ - "query_string": { - "query": ( - f'( uuid: \"{ids}\" )' - ) - } - }] - } - } + "bool": {"must": [{"query_string": {"query": (f'( uuid: "{ids}" )')}}]} + }, } print(query) - es = ElasticService(configpath="quay.elasticsearch",index=index) - results = await es.post(query,size=0) + es = ElasticService(configpath="quay.elasticsearch", index=index) + results = await es.post(query, size=0) await es.close() return results @@ -287,22 +213,24 @@ async def getMatchRuns(meta: dict): query = { "query": { "bool": { - "must": [{ - "query_string": { - "query": ( - f'benchmark: "{meta["benchmark"]}$"' - f' AND hitSize: "{meta["hitSize"]}"' - f' AND concurrency: "{meta["concurrency"]}"' - f' AND imagePushPulls: "{meta["imagePushPulls"]}"' - f' AND workerNodesType: "{meta["workerNodesType"]}"' - f' AND masterNodesType: "{meta["masterNodesType"]}"' - f' AND masterNodesCount: "{meta["masterNodesCount"]}"' - f' AND workerNodesCount: "{meta["workerNodesCount"]}"' - f' AND releaseStream: "{meta["releaseStream"]}"' - f' AND jobStatus: success' - ) - } - }] + "must": [ + { + "query_string": { + "query": ( + f'benchmark: "{meta["benchmark"]}$"' + f' AND hitSize: "{meta["hitSize"]}"' + f' AND concurrency: "{meta["concurrency"]}"' + f' AND imagePushPulls: "{meta["imagePushPulls"]}"' + f' AND workerNodesType: "{meta["workerNodesType"]}"' + f' AND masterNodesType: "{meta["masterNodesType"]}"' + f' AND masterNodesCount: "{meta["masterNodesCount"]}"' + f' AND workerNodesCount: "{meta["workerNodesCount"]}"' + f' AND releaseStream: "{meta["releaseStream"]}"' + f" AND jobStatus: success" + ) + } + } + ] } } } @@ -311,8 +239,8 @@ async def getMatchRuns(meta: dict): es = ElasticService(configpath="quay.elasticsearch") response = await es.post(query=query) await es.close() - runs = [item['_source'] for item in response] + runs = [item["_source"] for item in response] uuids = [] - for run in runs : + for run in runs: uuids.append(run["uuid"]) - return uuids \ No newline at end of file + return uuids diff --git a/backend/app/api/v1/endpoints/quay/quayJobs.py b/backend/app/api/v1/endpoints/quay/quayJobs.py index e3e8bf22..b141b107 100644 --- a/backend/app/api/v1/endpoints/quay/quayJobs.py +++ b/backend/app/api/v1/endpoints/quay/quayJobs.py @@ -9,20 +9,31 @@ router = APIRouter() - -@router.get('/api/v1/quay/jobs', - summary="Returns a job list", - description="Returns a list of jobs in the specified dates. 
\ +@router.get( + "/api/v1/quay/jobs", + summary="Returns a job list", + description="Returns a list of jobs in the specified dates. \ If not dates are provided the API will default the values. \ `startDate`: will be set to the day of the request minus 5 days.\ `endDate`: will be set to the day of the request.", - responses={ - 200: quay_200_response(), - 422: response_422(), - },) -async def jobs(start_date: date = Query(None, description="Start date for searching jobs, format: 'YYYY-MM-DD'", examples=["2020-11-10"]), - end_date: date = Query(None, description="End date for searching jobs, format: 'YYYY-MM-DD'", examples=["2020-11-15"]), - pretty: bool = Query(False, description="Output contet in pretty format.")): + responses={ + 200: quay_200_response(), + 422: response_422(), + }, +) +async def jobs( + start_date: date = Query( + None, + description="Start date for searching jobs, format: 'YYYY-MM-DD'", + examples=["2020-11-10"], + ), + end_date: date = Query( + None, + description="End date for searching jobs, format: 'YYYY-MM-DD'", + examples=["2020-11-15"], + ), + pretty: bool = Query(False, description="Output contet in pretty format."), +): if start_date is None: start_date = datetime.utcnow().date() start_date = start_date - timedelta(days=5) @@ -31,26 +42,31 @@ async def jobs(start_date: date = Query(None, description="Start date for search end_date = datetime.utcnow().date() if start_date > end_date: - return Response(content=json.dumps({'error': "invalid date format, start_date must be less than end_date"}), status_code=422) + return Response( + content=json.dumps( + {"error": "invalid date format, start_date must be less than end_date"} + ), + status_code=422, + ) - results = await getData(start_date, end_date, 'quay.elasticsearch') + results = await getData(start_date, end_date, "quay.elasticsearch") - if len(results) >= 1 : + if len(results) >= 1: response = { - 'startDate': start_date.__str__(), - 'endDate': end_date.__str__(), - 'results': results.to_dict('records') + "startDate": start_date.__str__(), + "endDate": end_date.__str__(), + "results": results.to_dict("records"), } - else : + else: response = { - 'startDate': start_date.__str__(), - 'endDate': end_date.__str__(), - 'results': [] + "startDate": start_date.__str__(), + "endDate": end_date.__str__(), + "results": [], } if pretty: json_str = json.dumps(response, indent=4) - return Response(content=json_str, media_type='application/json') + return Response(content=json_str, media_type="application/json") jsonstring = json.dumps(response) return jsonstring diff --git a/backend/app/api/v1/endpoints/telco/telcoGraphs.py b/backend/app/api/v1/endpoints/telco/telcoGraphs.py index 41b90cc1..1242460b 100644 --- a/backend/app/api/v1/endpoints/telco/telcoGraphs.py +++ b/backend/app/api/v1/endpoints/telco/telcoGraphs.py @@ -3,401 +3,453 @@ router = APIRouter() + @router.get("/api/v1/telco/graph/{uuid}/{encryptedData}") async def graph(uuid: str, encryptedData: str): - bytesData = encryptedData.encode("utf-8") - decrypted_data = hasher.decrypt_unhash_json(uuid, bytesData) - json_data = decrypted_data["data"] - return await process_json(json_data, False) + bytesData = encryptedData.encode("utf-8") + decrypted_data = hasher.decrypt_unhash_json(uuid, bytesData) + json_data = decrypted_data["data"] + return await process_json(json_data, False) + async def process_json(json_data: dict, is_row: bool): - function_mapper = { - "ptp": process_ptp, - "oslat": process_oslat, - "reboot": process_reboot, - "cpu_util": process_cpu_util, - 
"rfc-2544": process_rfc_2544, - "cyclictest": process_cyclictest, - "deployment": process_deployment, - } - mapped_function = function_mapper.get(json_data["test_type"]) - return mapped_function(json_data, is_row) + function_mapper = { + "ptp": process_ptp, + "oslat": process_oslat, + "reboot": process_reboot, + "cpu_util": process_cpu_util, + "rfc-2544": process_rfc_2544, + "cyclictest": process_cyclictest, + "deployment": process_deployment, + } + mapped_function = function_mapper.get(json_data["test_type"]) + return mapped_function(json_data, is_row) + def process_ptp(json_data: str, is_row: bool): - nic = json_data["nic"] - ptp4l_max_offset = json_data.get("ptp4l_max_offset", 0) - if "mellanox" in nic.lower(): - defined_offset_threshold = 200 - else: - defined_offset_threshold = 100 - minus_offset = 0 - if ptp4l_max_offset > defined_offset_threshold: - minus_offset = ptp4l_max_offset - defined_offset_threshold - - if is_row: - return minus_offset - else: - return { - "ptp": [ - { - "name": "Data Points", - "x": ["-inf", "ptp4l_max_offset", "inf"], - "y": [0, ptp4l_max_offset, 0], - "mode": "markers", - "marker": { - "size": 10, - }, - "error_y": { - "type": "data", - "symmetric": "false", - "array": [0, 0, 0], - "arrayminus": [0, minus_offset, 0] - }, - - }, - { - "name": "Threshold", - "x": ["-inf", "ptp4l_max_offset", "inf"], - "y": [defined_offset_threshold, defined_offset_threshold, defined_offset_threshold], - "mode": "lines", - "line": { - "dash": 'dot', - "width": 3, - }, - "marker": { - "size": 15, - }, - "type": "scatter", - } - ] - } + nic = json_data["nic"] + ptp4l_max_offset = json_data.get("ptp4l_max_offset", 0) + if "mellanox" in nic.lower(): + defined_offset_threshold = 200 + else: + defined_offset_threshold = 100 + minus_offset = 0 + if ptp4l_max_offset > defined_offset_threshold: + minus_offset = ptp4l_max_offset - defined_offset_threshold + + if is_row: + return minus_offset + else: + return { + "ptp": [ + { + "name": "Data Points", + "x": ["-inf", "ptp4l_max_offset", "inf"], + "y": [0, ptp4l_max_offset, 0], + "mode": "markers", + "marker": { + "size": 10, + }, + "error_y": { + "type": "data", + "symmetric": "false", + "array": [0, 0, 0], + "arrayminus": [0, minus_offset, 0], + }, + }, + { + "name": "Threshold", + "x": ["-inf", "ptp4l_max_offset", "inf"], + "y": [ + defined_offset_threshold, + defined_offset_threshold, + defined_offset_threshold, + ], + "mode": "lines", + "line": { + "dash": "dot", + "width": 3, + }, + "marker": { + "size": 15, + }, + "type": "scatter", + }, + ] + } + def process_reboot(json_data: str, is_row: bool): - max_minutes = 0.0 - avg_minutes = 0.0 - minus_max_minutes = 0.0 - minus_avg_minutes = 0.0 - defined_threshold = 20 - reboot_type = json_data["reboot_type"] - for each_iteration in json_data["Iterations"]: - max_minutes = max(max_minutes, each_iteration.get("total_minutes", 0)) - avg_minutes += each_iteration.get("total_minutes", 0) - avg_minutes /= len(json_data["Iterations"]) - if max_minutes > defined_threshold: - minus_max_minutes = max_minutes - defined_threshold - if avg_minutes > defined_threshold: - minus_avg_minutes = avg_minutes - defined_threshold - - if is_row: - return 1 if (minus_avg_minutes != 0 or minus_max_minutes != 0) else 0 - else: - return { - "reboot": [ - { - "name": "Data Points", - "x": [reboot_type + "_" + "max_minutes", reboot_type + "_" + "avg_minutes"], - "y": [max_minutes, avg_minutes], - "mode": "markers", - "marker": { - "size": 10, - }, - "error_y": { - "type": "data", - "symmetric": "false", - "array": 
[0, 0], - "arrayminus": [minus_max_minutes, minus_avg_minutes] - }, - "type": "scatter", - }, - { - "name": "Threshold", - "x": [reboot_type + "_" + "max_minutes", reboot_type + "_" + "avg_minutes"], - "y": [defined_threshold, defined_threshold], - "mode": "lines", - "marker": { - "size": 15, - }, - "line": { - "dash": "dot", - "width": 3, - }, - "type": "scatter", - } - ] - } + max_minutes = 0.0 + avg_minutes = 0.0 + minus_max_minutes = 0.0 + minus_avg_minutes = 0.0 + defined_threshold = 20 + reboot_type = json_data["reboot_type"] + for each_iteration in json_data["Iterations"]: + max_minutes = max(max_minutes, each_iteration.get("total_minutes", 0)) + avg_minutes += each_iteration.get("total_minutes", 0) + avg_minutes /= len(json_data["Iterations"]) + if max_minutes > defined_threshold: + minus_max_minutes = max_minutes - defined_threshold + if avg_minutes > defined_threshold: + minus_avg_minutes = avg_minutes - defined_threshold + + if is_row: + return 1 if (minus_avg_minutes != 0 or minus_max_minutes != 0) else 0 + else: + return { + "reboot": [ + { + "name": "Data Points", + "x": [ + reboot_type + "_" + "max_minutes", + reboot_type + "_" + "avg_minutes", + ], + "y": [max_minutes, avg_minutes], + "mode": "markers", + "marker": { + "size": 10, + }, + "error_y": { + "type": "data", + "symmetric": "false", + "array": [0, 0], + "arrayminus": [minus_max_minutes, minus_avg_minutes], + }, + "type": "scatter", + }, + { + "name": "Threshold", + "x": [ + reboot_type + "_" + "max_minutes", + reboot_type + "_" + "avg_minutes", + ], + "y": [defined_threshold, defined_threshold], + "mode": "lines", + "marker": { + "size": 15, + }, + "line": { + "dash": "dot", + "width": 3, + }, + "type": "scatter", + }, + ] + } + def process_cpu_util(json_data: str, is_row: bool): - total_max_cpu = 0.0 - total_avg_cpu = 0.0 - minus_max_cpu = 0.0 - minus_avg_cpu = 0.0 - defined_threshold = 3.0 - for each_scenario in json_data["scenarios"]: - if each_scenario["scenario_name"] == "steadyworkload": - for each_type in each_scenario["types"]: - if each_type["type_name"] == "total": - total_max_cpu = each_type.get("max_cpu", 0) - break - total_avg_cpu = each_scenario.get("avg_cpu_total", 0) - break - if total_max_cpu > defined_threshold: - minus_max_cpu = total_max_cpu - defined_threshold - if total_avg_cpu > defined_threshold: - minus_avg_cpu = total_avg_cpu - defined_threshold - - if is_row: - return 1 if (minus_avg_cpu != 0 or minus_max_cpu != 0) else 0 - else: - return { - "cpu_util": [ - { - "name": "Data Points", - "x": ["total_max_cpu", "total_avg_cpu"], - "y": [total_max_cpu, total_avg_cpu], - "mode": "markers", - "marker": { - "size": 10, - }, - "error_y": { - "type": "data", - "symmetric": "false", - "array": [0, 0], - "arrayminus": [minus_max_cpu, minus_avg_cpu] - }, - "type": "scatter", - }, - { - "name": "Threshold", - "x": ["total_max_cpu", "total_avg_cpu"], - "y": [defined_threshold, defined_threshold], - "mode": "lines", - "marker": { - "size": 15, - }, - "line": { - "dash": "dot", - "width": 3, - }, - "type": "scatter", - } - ] - } + total_max_cpu = 0.0 + total_avg_cpu = 0.0 + minus_max_cpu = 0.0 + minus_avg_cpu = 0.0 + defined_threshold = 3.0 + for each_scenario in json_data["scenarios"]: + if each_scenario["scenario_name"] == "steadyworkload": + for each_type in each_scenario["types"]: + if each_type["type_name"] == "total": + total_max_cpu = each_type.get("max_cpu", 0) + break + total_avg_cpu = each_scenario.get("avg_cpu_total", 0) + break + if total_max_cpu > defined_threshold: + minus_max_cpu = 
total_max_cpu - defined_threshold + if total_avg_cpu > defined_threshold: + minus_avg_cpu = total_avg_cpu - defined_threshold + + if is_row: + return 1 if (minus_avg_cpu != 0 or minus_max_cpu != 0) else 0 + else: + return { + "cpu_util": [ + { + "name": "Data Points", + "x": ["total_max_cpu", "total_avg_cpu"], + "y": [total_max_cpu, total_avg_cpu], + "mode": "markers", + "marker": { + "size": 10, + }, + "error_y": { + "type": "data", + "symmetric": "false", + "array": [0, 0], + "arrayminus": [minus_max_cpu, minus_avg_cpu], + }, + "type": "scatter", + }, + { + "name": "Threshold", + "x": ["total_max_cpu", "total_avg_cpu"], + "y": [defined_threshold, defined_threshold], + "mode": "lines", + "marker": { + "size": 15, + }, + "line": { + "dash": "dot", + "width": 3, + }, + "type": "scatter", + }, + ] + } + def process_rfc_2544(json_data: str, is_row: bool): - max_delay = json_data.get("max_delay", 0) - defined_delay_threshold = 30.0 - minus_max_delay = 0.0 - if max_delay > defined_delay_threshold: - minus_max_delay = max_delay - defined_delay_threshold - - if is_row: - return minus_max_delay - else: - return { - "rfc-2544": [ - { - "x": ["-inf", "max_delay", "inf"], - "y": [0, max_delay, 0], - "mode": "markers", - "marker": { - "size": 10, - }, - "name": "Data Points", - "error_y": { - "type": "data", - "symmetric": "false", - "array": [0, 0, 0], - "arrayminus": [0, minus_max_delay, 0] - }, - "type": "scatter", - }, - { - "x": ["-inf", "max_delay", "inf"], - "y": [defined_delay_threshold, defined_delay_threshold, defined_delay_threshold], - "name": "Threshold", - "mode": "lines", - "marker": { - "size": 15, - }, - "line": { - "dash": "dot", - "width": 3, - }, - "type": "scatter" - } - ] - } - + max_delay = json_data.get("max_delay", 0) + defined_delay_threshold = 30.0 + minus_max_delay = 0.0 + if max_delay > defined_delay_threshold: + minus_max_delay = max_delay - defined_delay_threshold + + if is_row: + return minus_max_delay + else: + return { + "rfc-2544": [ + { + "x": ["-inf", "max_delay", "inf"], + "y": [0, max_delay, 0], + "mode": "markers", + "marker": { + "size": 10, + }, + "name": "Data Points", + "error_y": { + "type": "data", + "symmetric": "false", + "array": [0, 0, 0], + "arrayminus": [0, minus_max_delay, 0], + }, + "type": "scatter", + }, + { + "x": ["-inf", "max_delay", "inf"], + "y": [ + defined_delay_threshold, + defined_delay_threshold, + defined_delay_threshold, + ], + "name": "Threshold", + "mode": "lines", + "marker": { + "size": 15, + }, + "line": { + "dash": "dot", + "width": 3, + }, + "type": "scatter", + }, + ] + } + + def process_oslat(json_data: str, is_row: bool): - result = get_oslat_or_cyclictest(json_data, is_row) - return result if is_row else { "oslat": result } + result = get_oslat_or_cyclictest(json_data, is_row) + return result if is_row else {"oslat": result} + def process_cyclictest(json_data: str, is_row: bool): - result = get_oslat_or_cyclictest(json_data, is_row) - return result if is_row else { "cyclictest": result } + result = get_oslat_or_cyclictest(json_data, is_row) + return result if is_row else {"cyclictest": result} + def process_deployment(json_data: str, is_row: bool): - total_minutes = json_data.get("total_minutes", 0) - reboot_count = json_data.get("reboot_count", 0) - defined_total_minutes_threshold = 180 - defined_total_reboot_count = 3 - minus_total_minutes = 0.0 - minus_total_reboot_count = 0.0 - if total_minutes > defined_total_minutes_threshold: - minus_total_minutes = total_minutes - defined_total_minutes_threshold - if reboot_count > 
defined_total_reboot_count: - minus_total_reboot_count = reboot_count - defined_total_reboot_count - - if is_row: - return 1 if (minus_total_minutes != 0 or minus_total_reboot_count != 0) else 0 - else: - return { - "deployment": { - "total_minutes": [ - { - "name": "Data Points", - "x": ["-inf", "total_minutes", "inf"], - "y": [0, total_minutes, 0], - "mode": "markers", - "marker": { - "size": 10, - }, - "error_y": { - "type": "data", - "symmetric": "false", - "array": [0, 0, 0], - "arrayminus": [0, minus_total_minutes, 0] - }, - "type": "scatter", - }, - { - "name": "Threshold", - "x": ["-inf", "total_minutes", "inf"], - "y": [defined_total_minutes_threshold, defined_total_minutes_threshold, defined_total_minutes_threshold], - "mode": "lines", - "marker": { - "size": 15, - }, - "line": { - "dash": "dot", - "width": 3, - }, - "type": "scatter", - } - ], - "total_reboot_count": [ - { - "name": "Data Points", - "x": ["-inf", "reboot_count", "inf"], - "y": [0, reboot_count, 0], - "mode": "markers", - "marker": { - "size": 10, - }, - "error_y": { - "type": "data", - "symmetric": "false", - "array": [0, 0, 0], - "arrayminus": [0, minus_total_reboot_count, 0] - }, - "type": "scatter", - }, - { - "name": "Threshold", - "x": ["-inf", "reboot_count", "inf"], - "y": [defined_total_reboot_count, defined_total_reboot_count, defined_total_reboot_count], - "mode": "lines", - "marker": { - "size": 15, - }, - "line": { - "dash": "dot", - "width": 3, - }, - "type": "scatter", - } - ] - } - } + total_minutes = json_data.get("total_minutes", 0) + reboot_count = json_data.get("reboot_count", 0) + defined_total_minutes_threshold = 180 + defined_total_reboot_count = 3 + minus_total_minutes = 0.0 + minus_total_reboot_count = 0.0 + if total_minutes > defined_total_minutes_threshold: + minus_total_minutes = total_minutes - defined_total_minutes_threshold + if reboot_count > defined_total_reboot_count: + minus_total_reboot_count = reboot_count - defined_total_reboot_count + + if is_row: + return 1 if (minus_total_minutes != 0 or minus_total_reboot_count != 0) else 0 + else: + return { + "deployment": { + "total_minutes": [ + { + "name": "Data Points", + "x": ["-inf", "total_minutes", "inf"], + "y": [0, total_minutes, 0], + "mode": "markers", + "marker": { + "size": 10, + }, + "error_y": { + "type": "data", + "symmetric": "false", + "array": [0, 0, 0], + "arrayminus": [0, minus_total_minutes, 0], + }, + "type": "scatter", + }, + { + "name": "Threshold", + "x": ["-inf", "total_minutes", "inf"], + "y": [ + defined_total_minutes_threshold, + defined_total_minutes_threshold, + defined_total_minutes_threshold, + ], + "mode": "lines", + "marker": { + "size": 15, + }, + "line": { + "dash": "dot", + "width": 3, + }, + "type": "scatter", + }, + ], + "total_reboot_count": [ + { + "name": "Data Points", + "x": ["-inf", "reboot_count", "inf"], + "y": [0, reboot_count, 0], + "mode": "markers", + "marker": { + "size": 10, + }, + "error_y": { + "type": "data", + "symmetric": "false", + "array": [0, 0, 0], + "arrayminus": [0, minus_total_reboot_count, 0], + }, + "type": "scatter", + }, + { + "name": "Threshold", + "x": ["-inf", "reboot_count", "inf"], + "y": [ + defined_total_reboot_count, + defined_total_reboot_count, + defined_total_reboot_count, + ], + "mode": "lines", + "marker": { + "size": 15, + }, + "line": { + "dash": "dot", + "width": 3, + }, + "type": "scatter", + }, + ], + } + } + def get_oslat_or_cyclictest(json_data: str, is_row: bool): - min_number_of_nines = 10000 - max_latency = 0 - minus_max_latency = 0 - 
defined_latency_threshold = 20 - defined_number_of_nines_threshold = 100 - for each_test_unit in json_data["test_units"]: - max_latency = max(max_latency, each_test_unit.get("max_latency", 0)) - min_number_of_nines = min(min_number_of_nines, each_test_unit.get("number_of_nines", 0)) - if max_latency > defined_latency_threshold: - minus_max_latency = max_latency - defined_latency_threshold - - if is_row: - return 1 if ((min_number_of_nines - defined_number_of_nines_threshold) != 0 or minus_max_latency != 0) else 0 - else: - return { + min_number_of_nines = 10000 + max_latency = 0 + minus_max_latency = 0 + defined_latency_threshold = 20 + defined_number_of_nines_threshold = 100 + for each_test_unit in json_data["test_units"]: + max_latency = max(max_latency, each_test_unit.get("max_latency", 0)) + min_number_of_nines = min( + min_number_of_nines, each_test_unit.get("number_of_nines", 0) + ) + if max_latency > defined_latency_threshold: + minus_max_latency = max_latency - defined_latency_threshold + + if is_row: + return ( + 1 + if ( + (min_number_of_nines - defined_number_of_nines_threshold) != 0 + or minus_max_latency != 0 + ) + else 0 + ) + else: + return { "number_of_nines": [ - { - "name": "Data Points", - "x": ["-inf", "min_number_of_nines", "inf"], - "y": [0, min_number_of_nines, 0], - "mode": "markers", - "marker": { - "size": 10, - }, - "error_y": { - "type": "data", - "symmetric": "false", - "array": [0, 0, 0], - "arrayminus": [0, min_number_of_nines - defined_number_of_nines_threshold, 0] - }, - "type": "scatter", - }, - { - "name": "Threshold", - "x": ["-inf", "min_number_of_nines", "inf"], - "y": [defined_number_of_nines_threshold, defined_number_of_nines_threshold, defined_number_of_nines_threshold], - "mode": "lines", - "marker": { - "size": 15, - }, - "line": { - "dash": "dot", - "width": 3, - }, - "type": "scatter", - } + { + "name": "Data Points", + "x": ["-inf", "min_number_of_nines", "inf"], + "y": [0, min_number_of_nines, 0], + "mode": "markers", + "marker": { + "size": 10, + }, + "error_y": { + "type": "data", + "symmetric": "false", + "array": [0, 0, 0], + "arrayminus": [ + 0, + min_number_of_nines - defined_number_of_nines_threshold, + 0, + ], + }, + "type": "scatter", + }, + { + "name": "Threshold", + "x": ["-inf", "min_number_of_nines", "inf"], + "y": [ + defined_number_of_nines_threshold, + defined_number_of_nines_threshold, + defined_number_of_nines_threshold, + ], + "mode": "lines", + "marker": { + "size": 15, + }, + "line": { + "dash": "dot", + "width": 3, + }, + "type": "scatter", + }, ], "max_latency": [ - { - "name": "Data Points", - "x": ["-inf", "max_latency", "inf"], - "y": [0, max_latency, 0], - "mode": "markers", - "marker": { - "size": 10, - }, - "error_y": { - "type": "data", - "symmetric": "false", - "array": [0, 0, 0], - "arrayminus": [0, minus_max_latency, 0] - }, - "type": "scatter", - }, - { - "name": "Threshold", - "x": ["-inf", "max_latency", "inf"], - "y": [defined_latency_threshold, defined_latency_threshold, defined_latency_threshold], - "mode": "lines", - "marker": { - "size": 15, - }, - "line": { - "dash": "dot", - "width": 3, - }, - "type": "scatter", - } - ] - } + { + "name": "Data Points", + "x": ["-inf", "max_latency", "inf"], + "y": [0, max_latency, 0], + "mode": "markers", + "marker": { + "size": 10, + }, + "error_y": { + "type": "data", + "symmetric": "false", + "array": [0, 0, 0], + "arrayminus": [0, minus_max_latency, 0], + }, + "type": "scatter", + }, + { + "name": "Threshold", + "x": ["-inf", "max_latency", "inf"], + "y": [ + 
defined_latency_threshold, + defined_latency_threshold, + defined_latency_threshold, + ], + "mode": "lines", + "marker": { + "size": 15, + }, + "line": { + "dash": "dot", + "width": 3, + }, + "type": "scatter", + }, + ], + } diff --git a/backend/app/api/v1/endpoints/telco/telcoJobs.py b/backend/app/api/v1/endpoints/telco/telcoJobs.py index fc2044de..32f6a041 100644 --- a/backend/app/api/v1/endpoints/telco/telcoJobs.py +++ b/backend/app/api/v1/endpoints/telco/telcoJobs.py @@ -9,20 +9,31 @@ router = APIRouter() - -@router.get('/api/v1/telco/jobs', - summary="Returns a job list", - description="Returns a list of jobs in the specified dates. \ +@router.get( + "/api/v1/telco/jobs", + summary="Returns a job list", + description="Returns a list of jobs in the specified dates. \ If not dates are provided the API will default the values. \ `startDate`: will be set to the day of the request minus 5 days.\ `endDate`: will be set to the day of the request.", - responses={ - 200: telco_200_response(), - 422: response_422(), - },) -async def jobs(start_date: date = Query(None, description="Start date for searching jobs, format: 'YYYY-MM-DD'", examples=["2020-11-10"]), - end_date: date = Query(None, description="End date for searching jobs, format: 'YYYY-MM-DD'", examples=["2020-11-15"]), - pretty: bool = Query(False, description="Output content in pretty format.")): + responses={ + 200: telco_200_response(), + 422: response_422(), + }, +) +async def jobs( + start_date: date = Query( + None, + description="Start date for searching jobs, format: 'YYYY-MM-DD'", + examples=["2020-11-10"], + ), + end_date: date = Query( + None, + description="End date for searching jobs, format: 'YYYY-MM-DD'", + examples=["2020-11-15"], + ), + pretty: bool = Query(False, description="Output content in pretty format."), +): if start_date is None: start_date = datetime.utcnow().date() start_date = start_date - timedelta(days=5) @@ -31,26 +42,31 @@ async def jobs(start_date: date = Query(None, description="Start date for search end_date = datetime.utcnow().date() if start_date > end_date: - return Response(content=json.dumps({'error': "invalid date format, start_date must be less than end_date"}), status_code=422) + return Response( + content=json.dumps( + {"error": "invalid date format, start_date must be less than end_date"} + ), + status_code=422, + ) - results = await getData(start_date, end_date, 'telco.splunk') + results = await getData(start_date, end_date, "telco.splunk") - if len(results) >= 1 : + if len(results) >= 1: response = { - 'startDate': start_date.__str__(), - 'endDate': end_date.__str__(), - 'results': results.to_dict('records') + "startDate": start_date.__str__(), + "endDate": end_date.__str__(), + "results": results.to_dict("records"), } - else : + else: response = { - 'startDate': start_date.__str__(), - 'endDate': end_date.__str__(), - 'results': [] + "startDate": start_date.__str__(), + "endDate": end_date.__str__(), + "results": [], } if pretty: json_str = json.dumps(response, indent=4) - return Response(content=json_str, media_type='application/json') + return Response(content=json_str, media_type="application/json") jsonstring = json.dumps(response) return jsonstring diff --git a/backend/app/async_util.py b/backend/app/async_util.py index dcdecc3b..17466fbc 100644 --- a/backend/app/async_util.py +++ b/backend/app/async_util.py @@ -15,7 +15,8 @@ def done_callback(trio_main_outcome): done_fut.set_result(trio_main_outcome) trio.lowlevel.start_guest_run( - trio_fn, *args, + trio_fn, + *args, 
run_sync_soon_threadsafe=run_sync_soon_threadsafe, done_callback=done_callback, host_uses_signal_set_wakeup_fd=True diff --git a/backend/app/config.py b/backend/app/config.py index 8578a841..e3cc78f8 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -2,7 +2,7 @@ def get_config(): - v.set_config_name('ocpperf') - v.add_config_path('.') - v.read_in_config() - return v + v.set_config_name("ocpperf") + v.add_config_path(".") + v.read_in_config() + return v diff --git a/backend/app/main.py b/backend/app/main.py index e25f9d10..c755c610 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -15,24 +15,23 @@ def render(self, content: typing.Any) -> bytes: return orjson.dumps(content) -origins = [ - "http://localhost:3000", - "localhost:3000" -] - -app = FastAPI(default_response_class=ORJSONResponse, - docs_url="/docs", - redoc_url=None, - title="CPT-Dashboard API Documentation", - version="0.0.1", - contact={ - "name": "OCP PerfScale Jedi", - "url": "https://redhat.enterprise.slack.com/archives/C05CDC19ZKJ", - }, - license_info={ - "name": "Apache 2.0", - "url": "https://www.apache.org/licenses/LICENSE-2.0", - }) +origins = ["http://localhost:3000", "localhost:3000"] + +app = FastAPI( + default_response_class=ORJSONResponse, + docs_url="/docs", + redoc_url=None, + title="CPT-Dashboard API Documentation", + version="0.0.1", + contact={ + "name": "OCP PerfScale Jedi", + "url": "https://redhat.enterprise.slack.com/archives/C05CDC19ZKJ", + }, + license_info={ + "name": "Apache 2.0", + "url": "https://www.apache.org/licenses/LICENSE-2.0", + }, +) app.add_middleware( CORSMiddleware, @@ -42,15 +41,17 @@ def render(self, content: typing.Any) -> bytes: allow_headers=["*"], ) -routes_to_reroute = ['/'] +routes_to_reroute = ["/"] + -@app.middleware('http') +@app.middleware("http") async def some_middleware(request: Request, call_next): if request.url.path in routes_to_reroute: - request.scope['path'] = '/docs' - headers = dict(request.scope['headers']) - headers[b'custom-header'] = b'my custom header' - request.scope['headers'] = [(k, v) for k, v in headers.items()] + request.scope["path"] = "/docs" + headers = dict(request.scope["headers"]) + headers[b"custom-header"] = b"my custom header" + request.scope["headers"] = [(k, v) for k, v in headers.items()] return await call_next(request) + app.include_router(router) diff --git a/backend/app/services/jira_svc.py b/backend/app/services/jira_svc.py index 5f1f5a50..72e88df3 100644 --- a/backend/app/services/jira_svc.py +++ b/backend/app/services/jira_svc.py @@ -7,18 +7,11 @@ class JiraService: def __init__(self, configpath="jira"): self.cfg = config.get_config() - self.url = self.cfg.get(configpath+'.url') - self.pat = self.cfg.get(configpath+'.personal_access_token') - self.svc = Jira( - url=self.url, - token=self.pat - ) + self.url = self.cfg.get(configpath + ".url") + self.pat = self.cfg.get(configpath + ".personal_access_token") + self.svc = Jira(url=self.url, token=self.pat) def jql(self, query: str, fields="*all", expand=None, validate_query=None): return self.svc.jql( - jql=query, - fields=fields, - expand=expand, - validate_query=validate_query + jql=query, fields=fields, expand=expand, validate_query=validate_query ) - \ No newline at end of file diff --git a/backend/app/services/search.py b/backend/app/services/search.py index f0a98ac1..61a501b5 100644 --- a/backend/app/services/search.py +++ b/backend/app/services/search.py @@ -5,152 +5,228 @@ import bisect import re - + class ElasticService: # todo add bulkhead pattern # todo 
add error message for unauthorized user - def __init__(self,configpath="",index=""): + def __init__(self, configpath="", index=""): """Init method.""" cfg = config.get_config() - self.new_es, self.new_index, self.new_index_prefix = self.initialize_es(cfg, configpath, index) + self.new_es, self.new_index, self.new_index_prefix = self.initialize_es( + cfg, configpath, index + ) self.prev_es = None - if cfg.get(configpath + '.internal'): - self.prev_es, self.prev_index, self.prev_index_prefix = self.initialize_es(cfg, configpath + '.internal', index) + if cfg.get(configpath + ".internal"): + self.prev_es, self.prev_index, self.prev_index_prefix = self.initialize_es( + cfg, configpath + ".internal", index + ) def initialize_es(self, config, path, index): """Initializes es client using the configuration""" - url = config.get(path+'.url') + url = config.get(path + ".url") esUser = None index_prefix = "" if index == "": - indice = config.get(path+'.indice') + indice = config.get(path + ".indice") else: indice = index - if config.is_set(path+'.prefix'): - index_prefix = config.get(path+'.prefix') - if config.is_set(path+'.username') and \ - config.is_set(path+'.password'): - esUser = config.get(path+'.username') - esPass = config.get(path+'.password') - if esUser : + if config.is_set(path + ".prefix"): + index_prefix = config.get(path + ".prefix") + if config.is_set(path + ".username") and config.is_set(path + ".password"): + esUser = config.get(path + ".username") + esPass = config.get(path + ".password") + if esUser: es = AsyncElasticsearch( - url, - use_ssl=False, - verify_certs=False, - http_auth=(esUser,esPass) + url, use_ssl=False, verify_certs=False, http_auth=(esUser, esPass) ) else: es = AsyncElasticsearch(url, verify_certs=False) return es, indice, index_prefix - async def post(self, query, indice=None, size=10000, start_date=None, end_date=None, timestamp_field=None): + async def post( + self, + query, + indice=None, + size=10000, + start_date=None, + end_date=None, + timestamp_field=None, + ): """Runs a query and returns the results""" if size == 0: """Handles aggregation queries logic""" if self.prev_es: - self.prev_index = self.prev_index_prefix + (self.prev_index if indice is None else indice) + self.prev_index = self.prev_index_prefix + ( + self.prev_index if indice is None else indice + ) return await self.prev_es.search( - index=self.prev_index+"*", - body=jsonable_encoder(query), - size=size) + index=self.prev_index + "*", body=jsonable_encoder(query), size=size + ) else: - self.new_index = self.new_index_prefix + (self.new_index if indice is None else indice) + self.new_index = self.new_index_prefix + ( + self.new_index if indice is None else indice + ) return await self.new_es.search( - index=self.new_index+"*", - body=jsonable_encoder(query), - size=size) + index=self.new_index + "*", body=jsonable_encoder(query), size=size + ) else: """Handles queries that require data from ES docs""" if timestamp_field: """Handles queries that have a timestamp field. 
Queries from both new and archive instances""" if self.prev_es: - self.prev_index = self.prev_index_prefix + (self.prev_index if indice is None else indice) + self.prev_index = self.prev_index_prefix + ( + self.prev_index if indice is None else indice + ) today = datetime.today().date() seven_days_ago = today - timedelta(days=7) if start_date and start_date > seven_days_ago: previous_results = [] else: - new_end_date = min(end_date, seven_days_ago) if end_date else seven_days_ago - query['query']['bool']['filter']['range'][timestamp_field]['lte'] = str(new_end_date) + new_end_date = ( + min(end_date, seven_days_ago) + if end_date + else seven_days_ago + ) + query["query"]["bool"]["filter"]["range"][timestamp_field][ + "lte" + ] = str(new_end_date) if start_date: - query['query']['bool']['filter']['range'][timestamp_field]['gte'] = str(start_date) + query["query"]["bool"]["filter"]["range"][timestamp_field][ + "gte" + ] = str(start_date) if start_date is None: response = await self.prev_es.search( - index=self.prev_index+"*", + index=self.prev_index + "*", body=jsonable_encoder(query), - size=size) - previous_results = response['hits']['hits'] + size=size, + ) + previous_results = response["hits"]["hits"] else: - previous_results = await self.scan_indices(self.prev_es, self.prev_index, query, timestamp_field, start_date, new_end_date, size) + previous_results = await self.scan_indices( + self.prev_es, + self.prev_index, + query, + timestamp_field, + start_date, + new_end_date, + size, + ) if self.prev_es and self.new_es: - self.new_index = self.new_index_prefix + (self.new_index if indice is None else indice) + self.new_index = self.new_index_prefix + ( + self.new_index if indice is None else indice + ) today = datetime.today().date() seven_days_ago = today - timedelta(days=7) if end_date and end_date < seven_days_ago: new_results = [] else: - new_start_date = max(start_date, seven_days_ago) if start_date else seven_days_ago - query['query']['bool']['filter']['range'][timestamp_field]['gte'] = str(new_start_date) + new_start_date = ( + max(start_date, seven_days_ago) + if start_date + else seven_days_ago + ) + query["query"]["bool"]["filter"]["range"][timestamp_field][ + "gte" + ] = str(new_start_date) if end_date: - query['query']['bool']['filter']['range'][timestamp_field]['lte'] = str(end_date) + query["query"]["bool"]["filter"]["range"][timestamp_field][ + "lte" + ] = str(end_date) if end_date is None: response = await self.new_es.search( - index=self.new_index+"*", + index=self.new_index + "*", body=jsonable_encoder(query), - size=size) - new_results = response['hits']['hits'] + size=size, + ) + new_results = response["hits"]["hits"] else: - new_results = await self.scan_indices(self.new_es, self.new_index, query, timestamp_field, new_start_date, end_date, size) + new_results = await self.scan_indices( + self.new_es, + self.new_index, + query, + timestamp_field, + new_start_date, + end_date, + size, + ) return await self.remove_duplicates(previous_results + new_results) else: if start_date and end_date: - query['query']['bool']['filter']['range'][timestamp_field]['gte'] = str(start_date) - query['query']['bool']['filter']['range'][timestamp_field]['lte'] = str(end_date) - return await self.scan_indices(self.new_es, self.new_index, query, timestamp_field, start_date, end_date, size) + query["query"]["bool"]["filter"]["range"][timestamp_field][ + "gte" + ] = str(start_date) + query["query"]["bool"]["filter"]["range"][timestamp_field][ + "lte" + ] = str(end_date) + return await 
self.scan_indices( + self.new_es, + self.new_index, + query, + timestamp_field, + start_date, + end_date, + size, + ) else: response = await self.new_es.search( - index=self.new_index+"*", + index=self.new_index + "*", body=jsonable_encoder(query), - size=size) - return response['hits']['hits'] + size=size, + ) + return response["hits"]["hits"] else: """Handles queries that do not have a timestamp field""" previous_results = [] if self.prev_es: - self.prev_index = self.prev_index_prefix + (self.prev_index if indice is None else indice) + self.prev_index = self.prev_index_prefix + ( + self.prev_index if indice is None else indice + ) response = await self.prev_es.search( - index=self.prev_index+"*", + index=self.prev_index + "*", body=jsonable_encoder(query), - size=size) - previous_results = response['hits']['hits'] - self.new_index = self.new_index_prefix + (self.new_index if indice is None else indice) + size=size, + ) + previous_results = response["hits"]["hits"] + self.new_index = self.new_index_prefix + ( + self.new_index if indice is None else indice + ) response = await self.new_es.search( - index=self.new_index+"*", - body=jsonable_encoder(query), - size=size) - new_results = response['hits']['hits'] + index=self.new_index + "*", body=jsonable_encoder(query), size=size + ) + new_results = response["hits"]["hits"] return await self.remove_duplicates(previous_results + new_results) - async def scan_indices(self, es_client, indice, query, timestamp_field, start_date, end_date, size): + async def scan_indices( + self, es_client, indice, query, timestamp_field, start_date, end_date, size + ): """Scans results only from es indexes relevant to a query""" indices = await self.get_indices_from_alias(es_client, indice) if not indices: indices = [indice] sorted_index_list = SortedIndexList() for index in indices: - sorted_index_list.insert(IndexTimestamp(index, await self.get_timestamps(es_client, index, timestamp_field))) - filtered_indices = sorted_index_list.get_indices_in_given_range(start_date, end_date) + sorted_index_list.insert( + IndexTimestamp( + index, await self.get_timestamps(es_client, index, timestamp_field) + ) + ) + filtered_indices = sorted_index_list.get_indices_in_given_range( + start_date, end_date + ) results = [] for each_index in filtered_indices: - query['query']['bool']['filter']['range'][timestamp_field]['lte'] = str(min(end_date, each_index.timestamps[1])) - query['query']['bool']['filter']['range'][timestamp_field]['gte'] = str(max(start_date, each_index.timestamps[0])) + query["query"]["bool"]["filter"]["range"][timestamp_field]["lte"] = str( + min(end_date, each_index.timestamps[1]) + ) + query["query"]["bool"]["filter"]["range"][timestamp_field]["gte"] = str( + max(start_date, each_index.timestamps[0]) + ) response = await es_client.search( - index=each_index.index, - body=jsonable_encoder(query), - size=size) - results.extend(response['hits']['hits']) + index=each_index.index, body=jsonable_encoder(query), size=size + ) + results.extend(response["hits"]["hits"]) return await self.remove_duplicates(results) - + async def remove_duplicates(self, all_results): seen = set() filtered_results = [] @@ -168,35 +244,38 @@ async def get_timestamps(self, es_client, index, timestamp_field): query = { "size": 0, "aggs": { - "min_timestamp": { - "min": { - "field": timestamp_field - } - }, - "max_timestamp": { - "max": { - "field": timestamp_field - } - } - } + "min_timestamp": {"min": {"field": timestamp_field}}, + "max_timestamp": {"max": {"field": timestamp_field}}, + }, 
} - response = await es_client.search( - index=index, - body=query - ) + response = await es_client.search(index=index, body=query) min_timestamp = response["aggregations"]["min_timestamp"]["value_as_string"] max_timestamp = response["aggregations"]["max_timestamp"]["value_as_string"] - return [datetime.strptime(datetime.strptime(min_timestamp, "%Y-%m-%dT%H:%M:%S.%fZ").strftime("%Y-%m-%d"), "%Y-%m-%d").date(), - datetime.strptime(datetime.strptime(max_timestamp, "%Y-%m-%dT%H:%M:%S.%fZ").strftime("%Y-%m-%d"), "%Y-%m-%d").date()] + return [ + datetime.strptime( + datetime.strptime(min_timestamp, "%Y-%m-%dT%H:%M:%S.%fZ").strftime( + "%Y-%m-%d" + ), + "%Y-%m-%d", + ).date(), + datetime.strptime( + datetime.strptime(max_timestamp, "%Y-%m-%dT%H:%M:%S.%fZ").strftime( + "%Y-%m-%d" + ), + "%Y-%m-%d", + ).date(), + ] async def get_indices_from_alias(self, es_client, alias): """Get indexes that match an alias""" try: indexes = [] response = await es_client.indices.get_alias(alias) - index_prefixes = [re.sub(r'-\d+$', '', index) for index in list(response.keys())] + index_prefixes = [ + re.sub(r"-\d+$", "", index) for index in list(response.keys()) + ] for each_prefix in index_prefixes: - response = await es_client.indices.get(each_prefix + '*', format='json') + response = await es_client.indices.get(each_prefix + "*", format="json") indexes.extend(list(response.keys())) result_set = [alias] if len(indexes) == 0 else indexes return list(set(result_set)) @@ -210,8 +289,10 @@ async def close(self): if self.prev_es is not None: await self.prev_es.close() + class IndexTimestamp: """Custom class to store and index with its start and end timestamps""" + def __init__(self, index, timestamps): self.index = index self.timestamps = timestamps @@ -219,8 +300,10 @@ def __init__(self, index, timestamps): def __lt__(self, other): return self.timestamps[0] < other.timestamps[0] + class SortedIndexList: """Custom class to sort indexes based on their start timestamps""" + def __init__(self): self.indices = [] @@ -228,12 +311,31 @@ def insert(self, index_timestamps): bisect.insort(self.indices, index_timestamps) def get_indices_in_given_range(self, start_date, end_date): - return [index_timestamp for index_timestamp in self.indices if (( - start_date and end_date and start_date <= index_timestamp.timestamps[0] and index_timestamp.timestamps[1] <= end_date) - or (start_date and index_timestamp.timestamps[0] < start_date and start_date <= index_timestamp.timestamps[1]) - or (end_date and index_timestamp.timestamps[1] > end_date and index_timestamp.timestamps[0] <= end_date))] + return [ + index_timestamp + for index_timestamp in self.indices + if ( + ( + start_date + and end_date + and start_date <= index_timestamp.timestamps[0] + and index_timestamp.timestamps[1] <= end_date + ) + or ( + start_date + and index_timestamp.timestamps[0] < start_date + and start_date <= index_timestamp.timestamps[1] + ) + or ( + end_date + and index_timestamp.timestamps[1] > end_date + and index_timestamp.timestamps[0] <= end_date + ) + ) + ] + -def flatten_dict(d, parent_key='', sep='.'): +def flatten_dict(d, parent_key="", sep="."): """Method to flatten a ES doc for comparing duplicates""" items = [] for k, v in d.items(): diff --git a/backend/app/services/splunk.py b/backend/app/services/splunk.py index aea7efb0..f8f07272 100644 --- a/backend/app/services/splunk.py +++ b/backend/app/services/splunk.py @@ -8,7 +8,7 @@ class SplunkService: Class to integrate splunk python client """ - def __init__(self,configpath="", index=""): + def 
__init__(self, configpath="", index=""): """ Initialize splunk client with provided config details @@ -19,20 +19,20 @@ def __init__(self,configpath="", index=""): try: cfg = config.get_config() if index == "": - self.indice = cfg.get(configpath+'.indice') + self.indice = cfg.get(configpath + ".indice") else: self.indice = index self.service = client.connect( - host=cfg.get(configpath+'.host'), - port=cfg.get(configpath+'.port'), - username=cfg.get(configpath+'.username'), - password=cfg.get(configpath+'.password') + host=cfg.get(configpath + ".host"), + port=cfg.get(configpath + ".port"), + username=cfg.get(configpath + ".username"), + password=cfg.get(configpath + ".password"), ) except Exception as e: print(f"Error connecting to splunk: {e}") return None - async def query(self, query, searchList='', max_results=10000): + async def query(self, query, searchList="", max_results=10000): """ Query data from splunk server using splunk lib sdk @@ -43,35 +43,41 @@ async def query(self, query, searchList='', max_results=10000): query["count"] = max_results # If additional search parameters are provided, include those in searchindex - searchindex = "search index={} {}".format(self.indice, searchList) if searchList else "search index={}".format(self.indice) + searchindex = ( + "search index={} {}".format(self.indice, searchList) + if searchList + else "search index={}".format(self.indice) + ) try: oneshotsearch_results = self.service.jobs.oneshot(searchindex, **query) except Exception as e: - print('Error querying splunk: {}'.format(e)) + print("Error querying splunk: {}".format(e)) return None # Get the results and display them using the JSONResultsReader res_array = [] async for record in self._stream_results(oneshotsearch_results): try: - res_array.append({ - 'data': orjson.loads(record['_raw']), - 'host': record['host'], - 'source': record['source'], - 'sourcetype': record['sourcetype'], - 'bucket': record['_bkt'], - 'serial': record['_serial'], - 'timestamp': record['_indextime'] - }) + res_array.append( + { + "data": orjson.loads(record["_raw"]), + "host": record["host"], + "source": record["source"], + "sourcetype": record["sourcetype"], + "bucket": record["_bkt"], + "serial": record["_serial"], + "timestamp": record["_indextime"], + } + ) except Exception as e: - print(f'Error on including Splunk record query in results array: {e}') + print(f"Error on including Splunk record query in results array: {e}") return res_array - + async def _stream_results(self, oneshotsearch_results): for record in results.JSONResultsReader(oneshotsearch_results): yield record - + async def close(self): """Closes splunk client connections""" await self.service.logout()
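
Reviewer note: the reformatted /jobs endpoints for OCP, Quay, and Telco above all repeat the same date handling — default start_date to the request day minus 5 days, default end_date to the request day, and return a 422 when start_date > end_date. A minimal standalone sketch of that shared pattern follows; the helper name resolve_date_window is illustrative only and is not part of this patch.

    from datetime import date, datetime, timedelta
    from typing import Optional, Tuple
    import json


    def resolve_date_window(
        start_date: Optional[date], end_date: Optional[date]
    ) -> Tuple[Optional[Tuple[date, date]], Optional[str]]:
        # Default the window the same way the jobs endpoints do.
        if start_date is None:
            start_date = datetime.utcnow().date() - timedelta(days=5)
        if end_date is None:
            end_date = datetime.utcnow().date()
        if start_date > end_date:
            # Same error payload the endpoints serialize with status 422.
            error = json.dumps(
                {"error": "invalid date format, start_date must be less than end_date"}
            )
            return None, error
        return (start_date, end_date), None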
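
Reviewer note: in services/search.py, ElasticService.post splits a timestamped query at roughly seven days before today, sending the older part of the range to the archive ("internal") instance and the newer part to the current instance. A simplified sketch of just that window split, assuming both a start and an end date are supplied (split_window is an illustrative name, not part of the patch):

    from datetime import date, timedelta
    from typing import Optional, Tuple

    Window = Optional[Tuple[date, date]]


    def split_window(start_date: date, end_date: date, today: date) -> Tuple[Window, Window]:
        # Archive instance serves data older than seven days; current instance the rest.
        seven_days_ago = today - timedelta(days=7)
        archive: Window = None
        current: Window = None
        if start_date <= seven_days_ago:
            archive = (start_date, min(end_date, seven_days_ago))
        if end_date >= seven_days_ago:
            current = (max(start_date, seven_days_ago), end_date)
        return archive, current


    # Example: a window queried on 2024-12-04 splits into an archive slice
    # (2024-11-24 .. 2024-11-27) and a current slice (2024-11-27 .. 2024-12-04).
    print(split_window(date(2024, 11, 24), date(2024, 12, 4), date(2024, 12, 4)))

The boundary day can appear in both slices, which is why the service deduplicates the combined hits afterwards via remove_duplicates.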