Making more generic match changes based on metrics yaml (#16)
* adding more generic matcher

* adding ignore of 'metric'

* ignore name instead of metric
paigerube14 authored Jan 30, 2024
1 parent 6b19079 commit 7798450
Showing 3 changed files with 161 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .github/workflows/pylint.yml
@@ -20,4 +20,4 @@ jobs:
          pip install pylint
      - name: Analysing the code with pylint
        run: |
-          pylint -d C0103 $(git ls-files '*.py')
+          pylint -d C0103 -d R0914 $(git ls-files '*.py')
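
R0914 is pylint's too-many-locals check; disabling it presumably accommodates the query-building methods added below, which hold many local variables.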
130 changes: 130 additions & 0 deletions fmatch/matcher.py
@@ -166,6 +166,43 @@ def filter_runs(self, pdata, data):
        ids_df = ndf.loc[df['jobConfig.jobIterations'] == iterations]
        return ids_df['uuid'].to_list()

    def getResults(self, uuid: str, uuids: list, index_str: str, metrics: dict):
        """
        Get results of an Elasticsearch data query based on uuid(s) and the defined metrics
        Args:
            uuid (str): uuid of the current run, excluded from the comparison set
            uuids (list): list of uuids to match against
            index_str (str): ES/OS index to query from
            metrics (dict): metric definition loaded from the metrics yaml
        Returns:
            list: source documents returned by the query
        """
        if len(uuids) > 1 and uuid in uuids:
            uuids.remove(uuid)
        ids = '" OR uuid: "'.join(uuids)
        metric_string = ""
        for k, v in metrics.items():
            if k == "not":
                # each entry becomes an exclusion clause,
                # e.g. ' AND NOT (jobConfig.name: "garbage-collection")'
                for not_list in v:
                    for k1, v1 in not_list.items():
                        if isinstance(v1, str):
                            v1 = f'"{v1}"'
                        metric_string += f" AND NOT ({k1}: {v1})"
            elif isinstance(v, str) and k not in ['name', 'metric_of_interest']:
                if v != "*":
                    v = f'"{v}"'
                metric_string += f" AND {k}: {v}"
        query = {
            "query": {"query_string": {"query": (f'( uuid: "{ids}" )' + metric_string)}},
            "size": self.search_size
        }
        result = self.query_index(index_str, query)
        runs = [item['_source'] for item in result["hits"]["hits"]]
        return runs
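
A minimal usage sketch: the metrics entry below is hypothetical, shaped like an entry from the metrics yaml this commit targets, and the instance name, index, and uuids are placeholders.

    # hypothetical metrics-yaml entry after loading; 'name' and
    # 'metric_of_interest' are skipped when building the query string,
    # and each 'not' entry becomes an AND NOT (...) clause
    metrics = {
        "name": "podReadyLatency",
        "metricName": "podLatencyQuantilesMeasurement",
        "quantileName": "Ready",
        "metric_of_interest": "P99",
        "not": [{"jobConfig.name": "garbage-collection"}],
    }
    runs = matcher.getResults("base-uuid", ["uuid-1", "uuid-2"],
                              "ripsaw-kube-burner*", metrics)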

    def burner_results(self, uuid, uuids, index):
        """ kube burner podReadyLatency
        Args:
@@ -198,6 +235,80 @@ def burner_results(self, uuid, uuids, index):
        runs = [item['_source'] for item in result["hits"]["hits"]]
        return runs

    def get_agg_metric_query(self, uuids, index, metrics):
        """ get_agg_metric_query will query for aggregated metrics data.
        Args:
            uuids (list): List of uuids
            index (str): ES/OS index to query from
            metrics (dict): metric definition loaded from the metrics yaml
        """
        ids = '" OR uuid: "'.join(uuids)
        metric_string = ""
        metric_of_interest = metrics['metric_of_interest']
        for k, v in metrics.items():
            if k == "agg":
                # aggregated metric entries are expected to carry an 'agg' block
                agg_value = v["value"]
                agg_type = v["agg_type"]
            elif k == "not":
                # each entry becomes an exclusion clause,
                # e.g. ' AND NOT (jobConfig.name: "garbage-collection")'
                for not_list in v:
                    for k1, v1 in not_list.items():
                        if isinstance(v1, str):
                            v1 = f'"{v1}"'
                        metric_string += f" AND NOT ({k1}: {v1})"
            elif isinstance(v, str) and k not in ['name', 'metric_of_interest']:
                if v != "*":
                    v = f'"{v}"'
                metric_string += f" AND {k}: {v}"
        query = {
            "aggs": {
                "time": {
                    "terms": {
                        "field": "uuid.keyword",
                        "size": self.search_size
                    },
                    "aggs": {
                        "time": {
                            "avg": {
                                "field": "timestamp"}
                        }
                    }
                },
                "uuid": {
                    "terms": {
                        "field": "uuid.keyword",
                        "size": self.search_size
                    },
                    "aggs": {
                        agg_value: {
                            agg_type: {
                                "field": metric_of_interest
                            }
                        }
                    }
                }
            },
            "query": {
                "bool": {
                    "must": [{
                        "query_string": {
                            "query": (
                                f'( uuid: "{ids}" )' +
                                metric_string
                            )
                        }
                    }]
                }
            },
            "size": self.search_size
        }
        runs = self.query_index(index, query)
        self.logger.debug("Run details %s", str(runs))
        data = self.parse_agg_results(runs, agg_value, agg_type)
        return data
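
A sketch of the dict shape get_agg_metric_query expects, with an 'agg' block; the values below are hypothetical, chosen to match the test data further down.

    # hypothetical entry: agg.value names the output field and agg.agg_type
    # is the ES aggregation applied to metric_of_interest per uuid
    metrics = {
        "name": "nodeCPU",
        "metric_of_interest": "value",
        "agg": {"value": "cpu", "agg_type": "avg"},
    }
    data = matcher.get_agg_metric_query(["uuid-1", "uuid-2"],
                                        "ripsaw-kube-burner*", metrics)
    # -> [{'uuid': 'uuid-1', 'timestamp': '...', 'cpu_avg': ...}, ...]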

    def burner_metric_query(self, uuids, namespace, index, metricName):
        """ burner_metric_query will query for specific metricName data.
@@ -296,6 +407,25 @@ def parse_burner_cpu_results(self, data: dict):
            res.append(dat)
        return res

    def parse_agg_results(self, data: dict, agg_value, agg_type):
        """ parse out aggregated metric data from the get_agg_metric_query response
        Args:
            data (dict): raw ES/OS response containing 'time' and 'uuid' aggregations
            agg_value (str): name of the aggregated field, e.g. 'cpu'
            agg_type (str): aggregation type applied, e.g. 'avg'
        Returns:
            list: one dict per uuid with its timestamp and aggregated value
        """
        res = []
        stamps = data['aggregations']['time']['buckets']
        agg_buckets = data['aggregations']['uuid']['buckets']
        for stamp in stamps:
            dat = {}
            dat['uuid'] = stamp['key']
            dat['timestamp'] = stamp['time']['value_as_string']
            # pair each timestamp bucket with the uuid bucket of the same key
            agg_values = next(item for item in agg_buckets if item["key"] == stamp['key'])
            dat[agg_value + '_' + agg_type] = agg_values[agg_value]["value"]
            res.append(dat)
        return res

    def convert_to_df(self, data, columns=None):
        """ convert to a dataframe
        Args:
30 changes: 30 additions & 0 deletions fmatch/tests/test_matcher.py
@@ -215,6 +215,36 @@ def test_parse_burner_cpu_results(matcher_instance):
    assert result == expected


def test_parse_agg_results(matcher_instance):
    mock_data = {"aggregations": {"time": {"buckets": []}, "uuid": {"buckets": []}}}
    mock_data["aggregations"]["time"]["buckets"] = [
        {
            "key": "90189fbf-7181-4129-8ca5-3cc8d656b595",
            "doc_count": 1110,
            "time": {
                "value": 1705349944941.3918,
                "value_as_string": "2024-01-15T20:19:04.941Z",
            },
        }
    ]
    mock_data["aggregations"]["uuid"]["buckets"] = [
        {
            "key": "90189fbf-7181-4129-8ca5-3cc8d656b595",
            "doc_count": 1110,
            "cpu": {"value": 10.818089329872935},
        }
    ]
    expected = [
        {
            "uuid": "90189fbf-7181-4129-8ca5-3cc8d656b595",
            "timestamp": "2024-01-15T20:19:04.941Z",
            "cpu_avg": 10.818089329872935,
        }
    ]
    result = matcher_instance.parse_agg_results(mock_data, "cpu", "avg")
    assert result == expected
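
The mock response above mirrors the ES/OS aggregation shape get_agg_metric_query produces. Assuming pytest as the runner (the matcher_instance fixture implies it), the test can be run on its own with: pytest fmatch/tests/test_matcher.py -k parse_agg_results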


def test_convert_to_df(matcher_instance):
    mock_data = [
        {
