diff --git a/fmatch/matcher.py b/fmatch/matcher.py index 839fe56..7711833 100644 --- a/fmatch/matcher.py +++ b/fmatch/matcher.py @@ -1,26 +1,26 @@ """ metadata matcher """ -#pylint: disable = invalid-name + +# pylint: disable = invalid-name, invalid-unary-operand-type import os import sys import logging -# pylint: disable=import-error -from elasticsearch7 import Elasticsearch -# pylint: disable=import-error -from elasticsearch.exceptions import NotFoundError -# pylint: disable=import-error -import pandas as pd +# pylint: disable=import-error +from elasticsearch import Elasticsearch +# pylint: disable=import-error +import pandas as pd +from elasticsearch_dsl import Search, Q class Matcher: - """ Matcher - """ + """Matcher""" - - def __init__(self, index="perf_scale_ci", level=logging.INFO, ES_URL=os.getenv("ES_SERVER")): + def __init__( + self, index="perf_scale_ci", level=logging.INFO, ES_URL=os.getenv("ES_SERVER") + ): self.index = index self.es_url = ES_URL self.search_size = 10000 @@ -29,7 +29,8 @@ def __init__(self, index="perf_scale_ci", level=logging.INFO, ES_URL=os.getenv(" handler = logging.StreamHandler(sys.stdout) handler.setLevel(level) formatter = logging.Formatter( - '%(asctime)s - %(name)s - %(levelname)s - %(message)s') + "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + ) handler.setFormatter(formatter) self.logger.addHandler(handler) # We can set the ES logging higher if we want additional debugging @@ -38,133 +39,103 @@ def __init__(self, index="perf_scale_ci", level=logging.INFO, ES_URL=os.getenv(" self.data = None def get_metadata_by_uuid(self, uuid, index=None): - """ get_metadata_by_uuid + """Returns back metadata when uuid is given + + Args: + uuid (str): uuid of the run + index (str, optional): index to be searched in. Defaults to None. + + Returns: + _type_: _description_ """ if index is None: index = self.index - query = { - "query": { - "match": { - "uuid": uuid - } - } - } + query = Q("match", uuid=uuid) result = {} - try: - self.logger.info("Executing query against index %s",index) - result = self.es.search(index=index, body=query) - hits = result.get('hits', {}).get('hits', []) - if hits: - result = dict(hits[0]['_source']) - except NotFoundError: - print(f"UUID {uuid} not found in index {index}") + s = Search(using=self.es, index=index).query(query) + res = self.query_index(index, s) + hits = res.hits.hits + if hits: + result = dict(hits[0].to_dict()["_source"]) return result - def query_index(self, index, query): - """ generic query function + def query_index(self, index, search): + """generic query function Args: index (str): _description_ - uuids (list): _description_ - query (str) : Query to make against ES + search (Search) : Search object with query """ - self.logger.info("Executing query against index=%s",index) - self.logger.debug("Executing query \r\n%s",query) - return self.es.search(index=index, body=query) + self.logger.info("Executing query against index=%s", index) + self.logger.debug("Executing query \r\n%s", search.to_dict()) + return search.execute() def get_uuid_by_metadata(self, meta, index=None): - """ get_uuid_by_metadata - """ + """get_uuid_by_metadata""" if index is None: index = self.index version = meta["ocpVersion"][:4] - query = { - "query": { - "bool": { - "must": [ - { - "query_string": { - "query": ' AND '.join([ - f'{field}: "{value}"' if isinstance( - value, str) else f'{field}: {value}' - for field, value in meta.items() if field != "ocpVersion" - ]) + - f' AND ocpVersion: {version}* AND jobStatus: success' - } - } - ] - } - }, - "size": self.search_size - } - result = self.query_index(index, query) - hits = result.get('hits', {}).get('hits', []) - uuids = [hit['_source']['uuid'] for hit in hits] + query = Q( + "bool", + must=[ + Q( + "match", **{field: str(value)} + ) # if isinstance(value, str) else Q('terms', **{field: value}) + for field, value in meta.items() + if field not in "ocpVersion benchmark" + ] + + ([Q("match", **{"benchmark.keyword": meta["benchmark"]})] + if "benchmark" in meta else []), + filter=[ + Q("wildcard", ocpVersion=f"{version}*"), + Q("match", jobStatus="success"), + ], + ) + s = Search(using=self.es, index=index).query(query).extra(size=self.search_size) + result = self.query_index(index, s) + hits = result.hits.hits + uuids = [hit.to_dict()["_source"]["uuid"] for hit in hits] return uuids - def match_k8s_netperf(self, uuids): - """_summary_ - - Args: - uuids (_type_): _description_ - """ - index = "k8s-netperf" - ids = "\" OR uuid: \"".join(uuids) - query = { - "query": { - "query_string": { - "query": ( - f'( uuid: \"{ids}\" )' - ) - } - }, - "size": self.search_size - } - result = self.query_index(index, query) - runs = [item['_source'] for item in result["hits"]["hits"]] - return runs - def match_kube_burner(self, uuids): - """ match kube burner runs + """match kube burner runs Args: uuids (list): list of uuids Returns: list : list of runs """ index = "ripsaw-kube-burner*" - ids = "\" OR uuid: \"".join(uuids) - query = { - "query": { - "query_string": { - "query": ( - f'( uuid: \"{ids}\" )' - f' AND metricName: "jobSummary"' - f' AND NOT (jobConfig.name: "garbage-collection")' - ) - } - }, - "size": self.search_size - } - result = self.query_index(index, query) - runs = [item['_source'] for item in result["hits"]["hits"]] + query = Q( + "bool", + filter=[ + Q("terms", **{"uuid.keyword": uuids}), + Q("match", metricName="jobSummary"), + ~Q("match", **{"jobConfig.name": "garbage-collection"}), + ], + ) + search = ( + Search(using=self.es, index=index).query(query).extra(size=self.search_size) + ) + result = self.query_index(index, search) + runs = [item.to_dict()["_source"] for item in result.hits.hits] return runs def filter_runs(self, pdata, data): - """ filter out runs with different jobIterations + """filter out runs with different jobIterations Args: pdata (_type_): _description_ data (_type_): _description_ Returns: _type_: _description_ """ - columns = ['uuid', 'jobConfig.jobIterations'] + columns = ["uuid", "jobConfig.jobIterations"] pdf = pd.json_normalize(pdata) pick_df = pd.DataFrame(pdf, columns=columns) - iterations = pick_df.iloc[0]['jobConfig.jobIterations'] + iterations = pick_df.iloc[0]["jobConfig.jobIterations"] df = pd.json_normalize(data) ndf = pd.DataFrame(df, columns=columns) - ids_df = ndf.loc[df['jobConfig.jobIterations'] == iterations] - return ids_df['uuid'].to_list() + ids_df = ndf.loc[df["jobConfig.jobIterations"] == iterations] + return ids_df["uuid"].to_list() def getResults(self, uuid: str, uuids: list, index_str: str, metrics: dict): """ @@ -181,253 +152,104 @@ def getResults(self, uuid: str, uuids: list, index_str: str, metrics: dict): """ if len(uuids) > 1 and uuid in uuids: uuids.remove(uuid) - ids = '" OR uuid: "'.join(uuids) - metric_string = "" - for k, v in metrics.items(): - if k == "not": - for not_list in v: - for k1, v1 in not_list.items(): - # f' AND NOT (jobConfig.name: "garbage-collection")' - if isinstance(v1,str): - v1 = f'"{v1}"' - metric_string += f" AND NOT ({k1}: {v1})" - elif isinstance(v,str) and not k in ['name','metric_of_interest']: - if v != "*": - v = f'"{v}"' - metric_string += f" AND {k}: {v}" - query = { - "query": {"query_string": {"query": (f'( uuid: "{ids}" )' + metric_string)}}, - "size": self.search_size - } - result = self.query_index(index_str, query) - runs = [item['_source'] for item in result["hits"]["hits"]] - return runs - - def burner_results(self, uuid, uuids, index): - """ kube burner podReadyLatency - Args: - uuid (_type_): _description_ - uuids (_type_): _description_ - index (_type_): _description_ - Returns: - _type_: _description_ - """ - if len(uuids) >= 1: - if len(uuid) > 0: - uuids.remove(uuid) - if len(uuids) < 1: - return [] - ids = "\" OR uuid: \"".join(uuids) - query = { - "query": { - "query_string": { - "query": ( - f'( uuid: \"{ids}\" )' - f' AND metricName: "podLatencyQuantilesMeasurement"' - f' AND quantileName: "Ready"' - f' AND NOT (jobConfig.name: "garbage-collection")' - ) - } - }, - "size": self.search_size - } - result = self.query_index(index, query) - runs = [item['_source'] for item in result["hits"]["hits"]] + metric_queries = [] + not_queries = [ + ~Q("match", **{not_item_key: not_item_value}) + for not_item_key, not_item_value in metrics["not"].items() + ] + metric_queries = [ + Q("match", **{metric_key: f'"{metric_value}"'}) + for metric_key, metric_value in metrics.items() + if metric_key not in ["name", "metric_of_interest", "not"] + ] + metric_query = Q("bool", must=metric_queries + not_queries) + query = Q( + "bool", + must=[ + Q("terms", **{"uuid.keyword": uuids}), + metric_query, + ], + ) + search = ( + Search(using=self.es, index=index_str) + .query(query) + .extra(size=self.search_size) + ) + result = self.query_index(index_str, search) + runs = [item.to_dict()["_source"] for item in result.hits.hits] return runs def get_agg_metric_query(self, uuids, index, metrics): - """ burner_metric_query will query for specific metrics data. + """burner_metric_query will query for specific metrics data. Args: uuids (list): List of uuids index (str): ES/OS Index to query from metrics (dict): metrics defined in es index metrics """ - ids = "\" OR uuid: \"".join(uuids) - metric_string = "" - metric_of_interest = metrics['metric_of_interest'] - for k, v in metrics.items(): - if k == "agg": - agg_value = v["value"] - agg_type = v["agg_type"] - - elif k == "not": - for not_list in v: - for k1, v1 in not_list.items(): - # f' AND NOT (jobConfig.name: "garbage-collection")' - if isinstance(v1,str): - v1 = f'"{v1}"' - metric_string += f" AND NOT ({k1}: {v1})" - elif isinstance(v,str) and not k in ['name', 'metric_of_interest']: - if v != "*": - v = f'"{v}"' - metric_string += f" AND {k}: {v}" - query = { - "aggs": { - "time": { - "terms": { - "field": "uuid.keyword", - "size": self.search_size - }, - "aggs": { - "time": { - "avg": { - "field": "timestamp"} - } - } - }, - "uuid": { - "terms": { - "field": "uuid.keyword", - "size": self.search_size - }, - "aggs": { - agg_value: { - agg_type: { - "field": metric_of_interest - } - } - } - } - }, - "query": { - "bool": { - "must": [{ - "query_string": { - "query": ( - f'( uuid: \"{ids}\" )' + - metric_string - ) - } - }] - } - }, - "size": self.search_size - } - runs = self.query_index(index, query) - self.logger.debug("Run details %s", str(runs)) - data = self.parse_agg_results(runs, agg_value, agg_type) + metric_queries = [] + not_queries = [ + ~Q("match", **{not_item_key: not_item_value}) + for not_item in metrics.get("not", []) + for not_item_key, not_item_value in not_item.items() + ] + metric_queries = [ + Q("match", **{metric_key: f'"{metric_value}"'}) + for metric_key, metric_value in metrics.items() + if metric_key not in ["name", "metric_of_interest", "not", "agg"] + ] + metric_query = Q("bool", must=metric_queries + not_queries) + query = Q( + "bool", + must=[ + Q("terms", **{"uuid.keyword": uuids}), + metric_query, + ], + ) + search = ( + Search(using=self.es, index=index).query(query).extra(size=self.search_size) + ) + agg_value = metrics["agg"]["value"] + agg_type = metrics["agg"]["agg_type"] + search.aggs.bucket( + "time", "terms", field="uuid.keyword", size=self.search_size + ).metric("time", "avg", field="timestamp") + search.aggs.bucket( + "uuid", "terms", field="uuid.keyword", size=self.search_size + ).metric(agg_value, agg_type, field=metrics["metric_of_interest"]) + result = self.query_index(index, search) + data = self.parse_agg_results(result, agg_value, agg_type) return data - def burner_metric_query(self, uuids, namespace, index, metricName): - """ burner_metric_query will query for specific metricName data. - - Args: - uuids (list): List of uuids - namespace (str): namespace we are interested in - index (str): ES/OS Index to query from - metricName (str): metricName defined in kube-burner metrics - """ - ids = "\" OR uuid: \"".join(uuids) - query = { - "aggs": { - "time": { - "terms": { - "field": "uuid.keyword", - "size": self.search_size - }, - "aggs": { - "time": { - "avg": { - "field": "timestamp"} - } - } - }, - "uuid": { - "terms": { - "field": "uuid.keyword", - "size": self.search_size - }, - "aggs": { - "cpu": { - "avg": { - "field": "value" - } - } - } - } - }, - "query": { - "bool": { - "must": [{ - "query_string": { - "query": ( - f'( uuid: \"{ids}\" )' - f' AND metricName: {metricName}' - f' AND labels.namespace.keyword: {namespace}' - ) - } - }] - } - }, - "size": self.search_size - } - runs = self.query_index(index, query) - data = self.parse_burner_cpu_results(runs) - return data - - def burner_cpu_results(self, uuids, namespace, index): - """ kube burner CPU aggregated results for a namespace - Args: - uuids (_type_): _description_ - namespace (_type_): _description_ - index (_type_): _description_ - Returns: - _type_: _description_ - """ - return self.burner_metric_query(uuids, namespace, index, "containerCPU") - - def burner_mem_results(self, uuids, namespace, index): - """ kube burner memory aggregated results for a namespace - Args: - uuids (_type_): _description_ - namespace (_type_): _description_ - index (_type_): _description_ - Returns: - _type_: _description_ - """ - return self.burner_metric_query(uuids, namespace, index, "containerMemory") - - def parse_burner_cpu_results(self, data: dict): - """ parse out CPU data from kube-burner query - Args: - data (dict): _description_ - Returns: - _type_: _description_ - """ - res = [] - stamps = data['aggregations']['time']['buckets'] - cpu = data['aggregations']['uuid']['buckets'] - for stamp in stamps: - dat = {} - dat['uuid'] = stamp['key'] - dat['timestamp'] = stamp['time']['value_as_string'] - acpu = next(item for item in cpu if item["key"] == stamp['key']) - dat['cpu_avg'] = acpu['cpu']['value'] - res.append(dat) - return res - def parse_agg_results(self, data: dict, agg_value, agg_type): - """ parse out CPU data from kube-burner query + """parse out CPU data from kube-burner query Args: - data (dict): _description_ + data (dict): Aggregated data from Elasticsearch DSL query + agg_value (str): Aggregation value field name + agg_type (str): Aggregation type (e.g., 'avg', 'sum', etc.) Returns: - _type_: _description_ + list: List of parsed results """ res = [] - stamps = data['aggregations']['time']['buckets'] - agg_buckets = data['aggregations']['uuid']['buckets'] + stamps = data.aggregations.time.buckets + agg_buckets = data.aggregations.uuid.buckets + for stamp in stamps: dat = {} - dat['uuid'] = stamp['key'] - dat['timestamp'] = stamp['time']['value_as_string'] - agg_values = next(item for item in agg_buckets if item["key"] == stamp['key']) - dat[agg_value + '_' + agg_type] = agg_values[agg_value]["value"] + dat["uuid"] = stamp.key + dat["timestamp"] = stamp.time.value_as_string + agg_values = next( + (item for item in agg_buckets if item.key == stamp.key), None + ) + if agg_values: + dat[agg_value + "_" + agg_type] = agg_values[agg_value].value + else: + dat[agg_value + "_" + agg_type] = None res.append(dat) return res def convert_to_df(self, data, columns=None): - """ convert to a dataframe + """convert to a dataframe Args: data (_type_): _description_ columns (_type_, optional): _description_. Defaults to None. @@ -435,13 +257,13 @@ def convert_to_df(self, data, columns=None): _type_: _description_ """ odf = pd.json_normalize(data) - odf = odf.sort_values(by=['timestamp']) + odf = odf.sort_values(by=["timestamp"]) if columns is not None: odf = pd.DataFrame(odf, columns=columns) return odf def save_results(self, df, csv_file_path="output.csv", columns=None): - """ write results to CSV + """write results to CSV Args: df (_type_): _description_ csv_file_path (str, optional): _description_. Defaults to "output.csv". diff --git a/fmatch/requirements.txt b/fmatch/requirements.txt index 2777deb..74ccdbd 100644 --- a/fmatch/requirements.txt +++ b/fmatch/requirements.txt @@ -8,8 +8,8 @@ coverage==7.4.0 dill==0.3.7 docutils==0.20.1 elastic-transport==8.11.0 -elasticsearch==8.11.1 -elasticsearch7==7.13.0 +elasticsearch==7.13.0 +elasticsearch-dsl==7.4.1 idna==3.6 importlib-metadata==7.0.1 iniconfig==2.0.0 @@ -21,7 +21,7 @@ mccabe==0.7.0 mdurl==0.1.2 more-itertools==10.2.0 nh3==0.2.15 -numpy==1.24.0 +numpy==1.26.3 packaging==23.2 pandas==2.1.4 pip-name==1.0.2 diff --git a/fmatch/test_fmatch.py b/fmatch/test_fmatch.py index 07f6682..9cb42f4 100644 --- a/fmatch/test_fmatch.py +++ b/fmatch/test_fmatch.py @@ -1,53 +1,60 @@ """ test_fmatch """ + import sys # pylint: disable=import-error import pandas as pd + # pylint: disable=import-error from matcher import Matcher -match = Matcher(index='perf_scale_ci') +match = Matcher(index="perf_scale_ci") +res=match.get_metadata_by_uuid("b4afc724-f175-44d1-81ff-a8255fea034f",'perf_scale_ci') meta = {} -meta['benchmark'] = "cluster-density-v2" -meta['masterNodesType'] = "m6a.xlarge" -meta['workerNodesType'] = "m6a.xlarge" -meta['platform'] = "AWS" -meta['masterNodesCount'] = 3 -meta['workerNodesCount'] = 24 -meta['jobStatus'] = "success" -meta['ocpVersion'] = '4.15' -meta['networkType'] = "OVNKubernetes" -meta['encrypted'] = "true" -meta['ipsec'] = "false" -meta['fips'] = "false" +meta["masterNodesType"] = "m6a.xlarge" +meta["workerNodesType"] = "m6a.xlarge" +meta["platform"] = "AWS" +meta["masterNodesCount"] = 3 +meta["workerNodesCount"] = 24 +meta["jobStatus"] = "success" +meta["ocpVersion"] = "4.15" +meta["networkType"] = "OVNKubernetes" +meta["benchmark"] = "cluster-density-v2" +# meta['encrypted'] = "true" +# meta['ipsec'] = "false" +# meta['fips'] = "false" uuids = match.get_uuid_by_metadata(meta) if len(uuids) == 0: print("No UUID present for given metadata") sys.exit() runs = match.match_kube_burner(uuids) -ids = match.filter_runs(runs, runs) -podl = match.burner_results("", ids, "ripsaw-kube-burner*") - -kapi_cpu = match.burner_cpu_results( - ids, "openshift-kube-apiserver", "ripsaw-kube-burner*") -ovn_cpu = match.burner_cpu_results( - ids, "openshift-ovn-kubernetes", "ripsaw-kube-burner*") -etcd_cpu = match.burner_cpu_results( - ids, "openshift-etcd", "ripsaw-kube-burner*") -ovn_mem = match.burner_mem_results( - ids, "openshift-ovn-kubernetes", "ripsaw-kube-burner*") +ids = match.filter_runs(runs, runs) +podl_metrics = { + "name": "podReadyLatency", + "metricName": "podLatencyQuantilesMeasurement", + "quantileName": "Ready", + "metric_of_interest": "P99", + "not": {"jobConfig.name": "garbage-collection"}, +} +podl = match.getResults("", ids, "ripsaw-kube-burner*",metrics=podl_metrics) +kapi_metrics = { + "name": "apiserverCPU", + "metricName": "containerCPU", + "labels.namespace": "openshift-kube-apiserver", + "metric_of_interest": "value", + "agg": {"value": "cpu", "agg_type": "avg"}, +} +kapi_cpu = match.get_agg_metric_query(ids, "ripsaw-kube-burner*", metrics=kapi_metrics) podl_df = match.convert_to_df( podl, columns=['uuid', 'timestamp', 'quantileName', 'P99']) kapi_cpu_df = match.convert_to_df(kapi_cpu) merge_df = pd.merge(kapi_cpu_df, podl_df, on="uuid") match.save_results(merge_df, "merged.csv", [ "uuid", "timestamp_x", "cpu_avg", "P99"]) -match.save_results(kapi_cpu_df, "CPUavg24.csv") -match.save_results(podl_df, "podlatency24.csv") df = pd.read_csv("merged.csv") ls = df["uuid"].to_list() diff --git a/fmatch/tests/test_matcher.py b/fmatch/tests/test_matcher.py index 0722a47..ba1f4cc 100644 --- a/fmatch/tests/test_matcher.py +++ b/fmatch/tests/test_matcher.py @@ -1,21 +1,22 @@ """ Unit Test file for fmatch.py """ -#pylint: disable = redefined-outer-name -#pylint: disable = missing-function-docstring -#pylint: disable = import-error + +# pylint: disable = redefined-outer-name +# pylint: disable = missing-function-docstring +# pylint: disable = import-error, duplicate-code import os from unittest.mock import patch -from elasticsearch.exceptions import NotFoundError +from elasticsearch_dsl import Search +from elasticsearch_dsl.response import Response import pytest import pandas as pd -#pylint: disable = import-error +# pylint: disable = import-error from fmatch.matcher import Matcher - @pytest.fixture def matcher_instance(): sample_output = { @@ -26,12 +27,13 @@ def matcher_instance(): ] } } - with patch('fmatch.matcher.Elasticsearch') as mock_es: + with patch("fmatch.matcher.Elasticsearch") as mock_es: mock_es_instance = mock_es.return_value mock_es_instance.search.return_value = sample_output match = Matcher(index="perf-scale-ci") return match + def test_get_metadata_by_uuid_found(matcher_instance): uuid = "test_uuid" result = matcher_instance.get_metadata_by_uuid(uuid) @@ -39,22 +41,10 @@ def test_get_metadata_by_uuid_found(matcher_instance): assert result == expected -def test_get_metadata_by_uuid_not_found(matcher_instance): - def raise_exception(): - raise NotFoundError(404, "index_not_found_exception", "no such index [sample]") - - # Mock Elasticsearch response for testing NotFoundError - matcher_instance.es.search = lambda *args, **kwargs: raise_exception() - uuid = "nonexistent_uuid" - result = matcher_instance.get_metadata_by_uuid(uuid=uuid, index="sample index") - expected = {} - assert result == expected - - def test_query_index(matcher_instance): - index = "test_uuid" - query = "test_query" - result = matcher_instance.query_index(index, query) + index = "test_index" + search = Search(using=matcher_instance.es, index=index) + result = matcher_instance.query_index(index, search) expected = { "hits": { "hits": [ @@ -72,22 +62,16 @@ def test_get_uuid_by_metadata(matcher_instance): "hits": [{"_source": {"uuid": "uuid1"}}, {"_source": {"uuid": "uuid2"}}] } } - meta = {"field1": "value1", "ocpVersion": "4.15"} + meta = { + "field1": "value1", + "ocpVersion": "4.15", + } result = matcher_instance.get_uuid_by_metadata(meta) expected = ["uuid1", "uuid2"] assert result == expected -def test_match_k8s_netperf(matcher_instance): - result = matcher_instance.match_k8s_netperf(["uuid1"]) - expected = [ - {"uuid": "uuid1", "field1": "value1"}, - {"uuid": "uuid2", "field1": "value2"}, - ] - assert result == expected - - -def test_match_kube_bruner(matcher_instance): +def test_match_kube_burner(matcher_instance): result = matcher_instance.match_kube_burner(["uuid1"]) expected = [ {"uuid": "uuid1", "field1": "value1"}, @@ -96,32 +80,6 @@ def test_match_kube_bruner(matcher_instance): assert result == expected -def test_burner_results(matcher_instance): - result = matcher_instance.burner_results( - "uuid1", ["uuid1", "uuid1"], "sample_index" - ) - expected = [ - {"uuid": "uuid1", "field1": "value1"}, - {"uuid": "uuid2", "field1": "value2"}, - ] - assert result == expected - - -def test_burner_results_single_element(matcher_instance): - result = matcher_instance.burner_results("uuid1", ["uuid1"], "sample_index") - expected = [] - assert result == expected - - -def test_burner_cpu_results(matcher_instance): - matcher_instance.parse_burner_cpu_results = lambda *args, **kwargs: {} - result = matcher_instance.burner_cpu_results( - ["uuid1", "uuid2"], "sample_namespace", "sample_index" - ) - expected = {} - assert result == expected - - def test_filter_runs(matcher_instance): mock_data = [ { @@ -185,63 +143,108 @@ def test_filter_runs(matcher_instance): assert result == expected -def test_parse_burner_cpu_results(matcher_instance): - mock_data = {"aggregations": {"time": {"buckets": []}, "uuid": {"buckets": []}}} - mock_data["aggregations"]["time"]["buckets"] = [ - { - "key": "90189fbf-7181-4129-8ca5-3cc8d656b595", - "doc_count": 1110, +def test_getResults(matcher_instance): + test_uuid = "uuid1" + test_uuids = ["uuid1", "uuid2"] + test_metrics = { + "name": "podReadyLatency", + "metricName": "podLatencyQuantilesMeasurement", + "quantileName": "Ready", + "metric_of_interest": "P99", + "not": {"jobConfig.name": "garbage-collection"}, + } + result = matcher_instance.getResults( + test_uuid, test_uuids, "test_index", test_metrics + ) + expected = [ + {"uuid": "uuid1", "field1": "value1"}, + {"uuid": "uuid2", "field1": "value2"}, + ] + assert result == expected + + +def test_get_agg_metric_query(matcher_instance): + test_uuids = ["uuid1", "uuid2"] + test_metrics = { + "name": "apiserverCPU", + "metricName": "containerCPU", + "labels.namespace": "openshift-kube-apiserver", + "metric_of_interest": "value", + "agg": {"value": "cpu", "agg_type": "avg"}, + } + data_dict = { + "aggregations": { "time": { - "value": 1705349944941.3918, - "value_as_string": "2024-01-15T20:19:04.941Z", + "buckets": [ + { + "key": "uuid1", + "time": {"value_as_string": "2024-02-09T12:00:00"}, + }, + { + "key": "uuid2", + "time": {"value_as_string": "2024-02-09T13:00:00"}, + }, + ] + }, + "uuid": { + "buckets": [ + {"key": "uuid1", "cpu": {"value": 42}}, + {"key": "uuid2", "cpu": {"value": 56}}, + ] }, } - ] - mock_data["aggregations"]["uuid"]["buckets"] = [ - { - "key": "90189fbf-7181-4129-8ca5-3cc8d656b595", - "doc_count": 1110, - "cpu": {"value": 10.818089329872935}, - } - ] + } expected = [ - { - "uuid": "90189fbf-7181-4129-8ca5-3cc8d656b595", - "timestamp": "2024-01-15T20:19:04.941Z", - "cpu_avg": 10.818089329872935, - } + {"uuid": "uuid1", "timestamp": "2024-02-09T12:00:00", "cpu_avg": 42}, + {"uuid": "uuid2", "timestamp": "2024-02-09T13:00:00", "cpu_avg": 56}, ] - result = matcher_instance.parse_burner_cpu_results(mock_data) + matcher_instance.query_index = lambda *args, **kwargs: Response( + response=data_dict, search=data_dict + ) + + result = matcher_instance.get_agg_metric_query( + test_uuids, "test_index", test_metrics + ) assert result == expected -def test_parse_agg_results(matcher_instance): - mock_data = {"aggregations": {"time": {"buckets": []}, "uuid": {"buckets": []}}} - mock_data["aggregations"]["time"]["buckets"] = [ - { - "key": "90189fbf-7181-4129-8ca5-3cc8d656b595", - "doc_count": 1110, +def test_get_agg_metric_query_no_agg_values(matcher_instance): + test_uuids = ["uuid1", "uuid2"] + test_metrics = { + "name": "apiserverCPU", + "metricName": "containerCPU", + "labels.namespace": "openshift-kube-apiserver", + "metric_of_interest": "value", + "agg": {"value": "cpu", "agg_type": "avg"}, + } + data_dict = { + "aggregations": { "time": { - "value": 1705349944941.3918, - "value_as_string": "2024-01-15T20:19:04.941Z", + "buckets": [ + { + "key": "uuid1", + "time": {"value_as_string": "2024-02-09T12:00:00"}, + }, + { + "key": "uuid2", + "time": {"value_as_string": "2024-02-09T13:00:00"}, + }, + ] }, + "uuid": {"buckets": []}, } - ] - mock_data["aggregations"]["uuid"]["buckets"] = [ - { - "key": "90189fbf-7181-4129-8ca5-3cc8d656b595", - "doc_count": 1110, - "cpu": {"value": 10.818089329872935}, - } - ] + } expected = [ - { - "uuid": "90189fbf-7181-4129-8ca5-3cc8d656b595", - "timestamp": "2024-01-15T20:19:04.941Z", - "cpu_avg": 10.818089329872935, - } + {"uuid": "uuid1", "timestamp": "2024-02-09T12:00:00", "cpu_avg": None}, + {"uuid": "uuid2", "timestamp": "2024-02-09T13:00:00", "cpu_avg": None}, ] - result = matcher_instance.parse_agg_results(mock_data, "cpu", "avg") + matcher_instance.query_index = lambda *args, **kwargs: Response( + response=data_dict, search=data_dict + ) + + result = matcher_instance.get_agg_metric_query( + test_uuids, "test_index", test_metrics + ) assert result == expected diff --git a/setup.py b/setup.py index 853777c..4dcc2ef 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ from setuptools import setup, find_packages -VERSION = '0.0.4' +VERSION = '0.0.5' DESCRIPTION = 'Common package for matching runs with provided metadata' # pylint: disable= line-too-long LONG_DESCRIPTION = "A package that allows to match metadata and get runs and create csv files with queried metrics" @@ -19,7 +19,7 @@ long_description_content_type="text/x-rst", long_description=LONG_DESCRIPTION, packages=find_packages(), - install_requires=['elasticsearch7==7.13.0', 'elasticsearch', 'pyyaml','pandas'], + install_requires=['elasticsearch==7.13.0', 'elasticsearch-dsl', 'pyyaml','pandas'], keywords=['python', 'matching', 'red hat', 'perf-scale', 'matcher', 'orion'], classifiers=[ "Development Status :: 1 - Planning",