diff --git a/.github/workflows/pylint.yml b/.github/workflows/pylint.yml
index e4876ed..875454a 100644
--- a/.github/workflows/pylint.yml
+++ b/.github/workflows/pylint.yml
@@ -20,4 +20,4 @@ jobs:
         pip install pylint
     - name: Analysing the code with pylint
       run: |
-        pylint -d C0103 $(git ls-files '*.py')
+        pylint -d C0103 -d R0914 $(git ls-files '*.py')
diff --git a/fmatch/matcher.py b/fmatch/matcher.py
index aec9b76..839fe56 100644
--- a/fmatch/matcher.py
+++ b/fmatch/matcher.py
@@ -166,6 +166,43 @@ def filter_runs(self, pdata, data):
         ids_df = ndf.loc[df['jobConfig.jobIterations'] == iterations]
         return ids_df['uuid'].to_list()
 
+    def getResults(self, uuid: str, uuids: list, index_str: str, metrics: dict):
+        """
+        Get results of an Elasticsearch data query based on uuid(s) and defined metrics
+
+        Args:
+            uuid (str): uuid of the current run
+            uuids (list): list of uuids to fetch results for
+            index_str (str): ES/OS index to query
+            metrics (dict): metrics defined in the es index metrics
+
+        Returns:
+            list: Resulting data from query
+        """
+        if len(uuids) > 1 and uuid in uuids:
+            uuids.remove(uuid)
+        ids = '" OR uuid: "'.join(uuids)
+        metric_string = ""
+        for k, v in metrics.items():
+            if k == "not":
+                for not_list in v:
+                    for k1, v1 in not_list.items():
+                        # e.g. ' AND NOT (jobConfig.name: "garbage-collection")'
+                        if isinstance(v1, str):
+                            v1 = f'"{v1}"'
+                        metric_string += f" AND NOT ({k1}: {v1})"
+            elif isinstance(v, str) and k not in ['name', 'metric_of_interest']:
+                if v != "*":
+                    v = f'"{v}"'
+                metric_string += f" AND {k}: {v}"
+        query = {
+            "query": {"query_string": {"query": (f'( uuid: "{ids}" )' + metric_string)}},
+            "size": self.search_size
+        }
+        result = self.query_index(index_str, query)
+        runs = [item['_source'] for item in result["hits"]["hits"]]
+        return runs
+
     def burner_results(self, uuid, uuids, index):
         """ kube burner podReadyLatency
         Args:
@@ -198,6 +235,80 @@ def burner_results(self, uuid, uuids, index):
         runs = [item['_source'] for item in result["hits"]["hits"]]
         return runs
 
+    def get_agg_metric_query(self, uuids, index, metrics):
+        """ get_agg_metric_query will query and aggregate specific metrics data.
+
+        Args:
+            uuids (list): List of uuids
+            index (str): ES/OS Index to query from
+            metrics (dict): metrics defined in es index metrics
+        """
+        ids = '" OR uuid: "'.join(uuids)
+        metric_string = ""
+        metric_of_interest = metrics['metric_of_interest']
+        for k, v in metrics.items():
+            if k == "agg":
+                agg_value = v["value"]
+                agg_type = v["agg_type"]
+
+            elif k == "not":
+                for not_list in v:
+                    for k1, v1 in not_list.items():
+                        # e.g. ' AND NOT (jobConfig.name: "garbage-collection")'
+                        if isinstance(v1, str):
+                            v1 = f'"{v1}"'
+                        metric_string += f" AND NOT ({k1}: {v1})"
+            elif isinstance(v, str) and k not in ['name', 'metric_of_interest']:
+                if v != "*":
+                    v = f'"{v}"'
+                metric_string += f" AND {k}: {v}"
+        query = {
+            "aggs": {
+                "time": {
+                    "terms": {
+                        "field": "uuid.keyword",
+                        "size": self.search_size
+                    },
+                    "aggs": {
+                        "time": {
+                            "avg": {
+                                "field": "timestamp"}
+                        }
+                    }
+                },
+                "uuid": {
+                    "terms": {
+                        "field": "uuid.keyword",
+                        "size": self.search_size
+                    },
+                    "aggs": {
+                        agg_value: {
+                            agg_type: {
+                                "field": metric_of_interest
+                            }
+                        }
+                    }
+                }
+            },
+            "query": {
+                "bool": {
+                    "must": [{
+                        "query_string": {
+                            "query": (
+                                f'( uuid: "{ids}" )' +
+                                metric_string
+                            )
+                        }
+                    }]
+                }
+            },
+            "size": self.search_size
+        }
+        runs = self.query_index(index, query)
+        self.logger.debug("Run details %s", str(runs))
+        data = self.parse_agg_results(runs, agg_value, agg_type)
+        return data
+
     def burner_metric_query(self, uuids, namespace, index, metricName):
         """
         burner_metric_query will query for specific metricName data.
@@ -296,6 +407,25 @@ def parse_burner_cpu_results(self, data: dict):
         res.append(dat)
         return res
 
+    def parse_agg_results(self, data: dict, agg_value, agg_type):
+        """ parse out aggregated metric data from a kube-burner query
+        Args:
+            data (dict): aggregated results from the ES/OS query
+        Returns:
+            list: dicts with uuid, timestamp and the aggregated value
+        """
+        res = []
+        stamps = data['aggregations']['time']['buckets']
+        agg_buckets = data['aggregations']['uuid']['buckets']
+        for stamp in stamps:
+            dat = {}
+            dat['uuid'] = stamp['key']
+            dat['timestamp'] = stamp['time']['value_as_string']
+            agg_values = next(item for item in agg_buckets if item["key"] == stamp['key'])
+            dat[agg_value + '_' + agg_type] = agg_values[agg_value]["value"]
+            res.append(dat)
+        return res
+
     def convert_to_df(self, data, columns=None):
         """ convert to a dataframe
         Args:
diff --git a/fmatch/tests/test_matcher.py b/fmatch/tests/test_matcher.py
index f9e744f..0722a47 100644
--- a/fmatch/tests/test_matcher.py
+++ b/fmatch/tests/test_matcher.py
@@ -215,6 +215,36 @@ def test_parse_burner_cpu_results(matcher_instance):
     assert result == expected
 
 
+def test_parse_agg_results(matcher_instance):
+    mock_data = {"aggregations": {"time": {"buckets": []}, "uuid": {"buckets": []}}}
+    mock_data["aggregations"]["time"]["buckets"] = [
+        {
+            "key": "90189fbf-7181-4129-8ca5-3cc8d656b595",
+            "doc_count": 1110,
+            "time": {
+                "value": 1705349944941.3918,
+                "value_as_string": "2024-01-15T20:19:04.941Z",
+            },
+        }
+    ]
+    mock_data["aggregations"]["uuid"]["buckets"] = [
+        {
+            "key": "90189fbf-7181-4129-8ca5-3cc8d656b595",
+            "doc_count": 1110,
+            "cpu": {"value": 10.818089329872935},
+        }
+    ]
+    expected = [
+        {
+            "uuid": "90189fbf-7181-4129-8ca5-3cc8d656b595",
+            "timestamp": "2024-01-15T20:19:04.941Z",
+            "cpu_avg": 10.818089329872935,
+        }
+    ]
+    result = matcher_instance.parse_agg_results(mock_data, "cpu", "avg")
+    assert result == expected
+
+
 def test_convert_to_df(matcher_instance):
     mock_data = [
         {
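
For reviewers, a minimal usage sketch of the new aggregation path (not part of the patch). The `metrics` shape below, with its `agg`, `not`, and `metric_of_interest` keys, is inferred from what `get_agg_metric_query` iterates over; the `Matcher` construction, index name, and metric names are assumptions for illustration only.

```python
# Sketch only: assumes a reachable ES/OS instance and that Matcher is
# constructed roughly as in the existing fmatch code.
from fmatch.matcher import Matcher

match = Matcher(index="ripsaw-kube-burner")  # hypothetical index name

# A metrics entry shaped the way get_agg_metric_query() consumes it:
metrics = {
    "name": "podCPU",                                    # skipped when building the filter
    "metricName": "containerCPU",                        # -> AND metricName: "containerCPU"
    "metric_of_interest": "value",                       # field the ES aggregation runs on
    "not": [{"jobConfig.name": "garbage-collection"}],   # -> AND NOT (...) clause
    "agg": {"value": "cpu", "agg_type": "avg"},          # output key becomes "cpu_avg"
}

uuids = ["90189fbf-7181-4129-8ca5-3cc8d656b595"]

# Returns rows shaped like the expected list in test_parse_agg_results:
# [{"uuid": ..., "timestamp": ..., "cpu_avg": ...}]
rows = match.get_agg_metric_query(uuids, "ripsaw-kube-burner", metrics)
df = match.convert_to_df(rows, columns=["uuid", "timestamp", "cpu_avg"])
```

Note that `get_agg_metric_query` assumes the `metrics` dict contains an `agg` entry; without one, `agg_value`/`agg_type` are never bound and the final `parse_agg_results` call would raise a `NameError`.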