diff --git a/.github/workflows/unit_test.yml b/.github/workflows/unit_test.yml
index 1ece302..6f2bcdc 100644
--- a/.github/workflows/unit_test.yml
+++ b/.github/workflows/unit_test.yml
@@ -1,6 +1,8 @@
 name: Run pytest on Pull Request
 
-on: [push]
+on:
+  push:
+  pull_request:
 
 jobs:
   test:
diff --git a/fmatch/logrus.py b/fmatch/logrus.py
new file mode 100644
index 0000000..879e1cf
--- /dev/null
+++ b/fmatch/logrus.py
@@ -0,0 +1,49 @@
+"""
+Logger as a common package
+"""
+
+import logging
+import sys
+
+
+class SingletonLogger:
+    """Singleton logger to set logging at one single place
+
+    Returns:
+        _type_: _description_
+    """
+
+    instance = {}
+
+    def __new__(cls, debug: int, name: str):
+        if (not cls.instance) or name not in cls.instance:
+            cls.instance[name] = cls._initialize_logger(debug,name)
+        return cls.instance[name]
+
+    @staticmethod
+    def _initialize_logger(debug: int, name: str) -> logging.Logger:
+        level = debug # if debug else logging.INFO
+        logger = logging.getLogger(name)
+        logger.propagate=False
+        if not logger.hasHandlers():
+            logger.setLevel(level)
+            handler = logging.StreamHandler(sys.stdout)
+            handler.setLevel(level)
+            formatter = logging.Formatter(
+                "%(asctime)s - %(name)-10s - %(levelname)s - file: %(filename)s - line: %(lineno)d - %(message)s" # pylint: disable = line-too-long
+            )
+            handler.setFormatter(formatter)
+            logger.addHandler(handler)
+        return logger
+
+    @classmethod
+    def getLogger(cls, name:str) -> logging.Logger:
+        """Return logger in instance
+
+        Args:
+            name (str): name of the logger
+
+        Returns:
+            logging.Logger: logger
+        """
+        return cls.instance.get(name, None)
diff --git a/fmatch/matcher.py b/fmatch/matcher.py
index d5dae64..482f939 100644
--- a/fmatch/matcher.py
+++ b/fmatch/matcher.py
@@ -1,10 +1,11 @@
 """
 metadata matcher
 """
-# pylint: disable = invalid-name, invalid-unary-operand-type
+# pylint: disable = invalid-name, invalid-unary-operand-type, no-member
 import os
-import sys
 import logging
+from datetime import datetime
+from typing import List, Dict, Any
 
 # pylint: disable=import-error
 from elasticsearch import Elasticsearch
@@ -13,35 +14,28 @@
 # pylint: disable=import-error
 import pandas as pd
 from elasticsearch_dsl import Search, Q
+from elasticsearch_dsl.response import Response
+from fmatch.logrus import SingletonLogger
 
 
 class Matcher:
     """Matcher"""
 
     def __init__(
-        self, index="ospst-perf-scale-ci",
-        level=logging.INFO,
-        ES_URL=os.getenv("ES_SERVER"),
-        verify_certs=True
+        self,
+        index: str ="ospst-perf-scale-ci",
+        level: int =logging.INFO,
+        ES_URL: str =os.getenv("ES_SERVER"),
+        verify_certs: bool =True,
     ):
         self.index = index
         self.es_url = ES_URL
         self.search_size = 10000
-        self.logger = logging.getLogger("Matcher")
-        self.logger.setLevel(level)
-        handler = logging.StreamHandler(sys.stdout)
-        handler.setLevel(level)
-        formatter = logging.Formatter(
-            "%(asctime)s [%(name)s:%(filename)s:%(lineno)d] %(levelname)s: %(message)s"
-        )
-        handler.setFormatter(formatter)
-        self.logger.addHandler(handler)
-        # We can set the ES logging higher if we want additional debugging
-        logging.getLogger("elasticsearch").setLevel(logging.WARN)
+        self.logger = SingletonLogger(debug=level, name="Matcher")
         self.es = Elasticsearch([self.es_url], timeout=30, verify_certs=verify_certs)
         self.data = None
 
-    def get_metadata_by_uuid(self, uuid, index=None):
+    def get_metadata_by_uuid(self, uuid: str, index: str = None) -> dict:
         """Returns back metadata when uuid is given
 
         Args:
@@ -62,7 +56,7 @@ def get_metadata_by_uuid(self, uuid, index=None):
         result = dict(hits[0].to_dict()["_source"])
         return result
 
-    def query_index(self, index, search):
+    def query_index(self, index: str, search: Search) -> Response:
         """generic query function
 
         Args:
@@ -73,33 +67,50 @@
         self.logger.debug("Executing query \r\n%s", search.to_dict())
         return search.execute()
 
-    def get_uuid_by_metadata(self, meta, index=None):
+    def get_uuid_by_metadata(
+        self, meta: Dict[str, Any], index: str = None, lookback_date: datetime = None
+    ) -> List[Dict[str, str]]:
         """get_uuid_by_metadata"""
         if index is None:
             index = self.index
         version = meta["ocpVersion"][:4]
+
+        must_clause = [
+            (
+                Q("match", **{field: str(value)})
+                if isinstance(value, str)
+                else Q("match", **{field: value})
+            )
+            for field, value in meta.items()
+            if field not in "ocpVersion"
+        ]
+
+        filter_clause = [
+            Q("wildcard", ocpVersion=f"{version}*"),
+            Q("match", jobStatus="success"),
+        ]
+        if isinstance(lookback_date, datetime):
+            lookback_date = lookback_date.strftime("%Y-%m-%dT%H:%M:%SZ")
+        if lookback_date:
+            filter_clause.append(Q("range", timestamp={"gt": lookback_date}))
         query = Q(
             "bool",
-            must=[
-                Q(
-                    "match", **{field: str(value)}
-                ) if isinstance(value, str) else Q('match', **{field: value})
-                for field, value in meta.items()
-                if field not in "ocpVersion"
-            ],
-            filter=[
-                Q("wildcard", ocpVersion=f"{version}*"),
-                Q("match", jobStatus="success"),
-            ],
+            must=must_clause,
+            filter=filter_clause,
         )
         s = Search(using=self.es, index=index).query(query).extra(size=self.search_size)
         result = self.query_index(index, s)
         hits = result.hits.hits
-        uuids_docs = [{ "uuid":hit.to_dict()["_source"]["uuid"],
-                       "buildUrl":hit.to_dict()["_source"]["buildUrl"]} for hit in hits]
+        uuids_docs = [
+            {
+                "uuid": hit.to_dict()["_source"]["uuid"],
+                "buildUrl": hit.to_dict()["_source"]["buildUrl"],
+            }
+            for hit in hits
+        ]
         return uuids_docs
 
-    def match_kube_burner(self, uuids, index):
+    def match_kube_burner(self, uuids: List[str], index: str) -> List[Dict[str, Any]]:
         """match kube burner runs
         Args:
             uuids (list): list of uuids
@@ -121,7 +132,7 @@ def match_kube_burner(self, uuids, index):
         runs = [item.to_dict()["_source"] for item in result.hits.hits]
         return runs
 
-    def filter_runs(self, pdata, data):
+    def filter_runs(self, pdata: Dict[Any, Any], data: Dict[Any, Any]) -> List[str]:
         """filter out runs with different jobIterations
         Args:
             pdata (_type_): _description_
@@ -138,7 +149,9 @@ def filter_runs(self, pdata, data):
         ids_df = ndf.loc[df["jobConfig.jobIterations"] == iterations]
         return ids_df["uuid"].to_list()
 
-    def getResults(self, uuid: str, uuids: list, index_str: str, metrics: dict):
+    def getResults(
+        self, uuid: str, uuids: List[str], index_str: str, metrics: Dict[str, Any]
+    ) -> Dict[Any, Any]:
         """
         Get results of elasticsearch data query based on uuid(s) and defined metrics
 
@@ -156,7 +169,7 @@ def getResults(self, uuid: str, uuids: list, index_str: str, metrics: dict):
         metric_queries = []
         not_queries = [
             ~Q("match", **{not_item_key: not_item_value})
-            for not_item_key, not_item_value in metrics.get("not",{}).items()
+            for not_item_key, not_item_value in metrics.get("not", {}).items()
         ]
         metric_queries = [
             Q("match", **{metric_key: metric_value})
@@ -180,7 +193,9 @@ def getResults(self, uuid: str, uuids: list, index_str: str, metrics: dict):
         runs = [item.to_dict()["_source"] for item in result.hits.hits]
         return runs
 
-    def get_agg_metric_query(self, uuids, index, metrics):
+    def get_agg_metric_query(
+        self, uuids: List[str], index: str, metrics: Dict[str, Any]
+    ):
         """burner_metric_query will query for specific metrics data.
 
         Args:
@@ -222,7 +237,9 @@ def get_agg_metric_query(self, uuids, index, metrics):
         data = self.parse_agg_results(result, agg_value, agg_type)
         return data
 
-    def parse_agg_results(self, data: dict, agg_value, agg_type):
+    def parse_agg_results(
+        self, data: Dict[Any, Any], agg_value: str, agg_type: str
+    ) -> List[Dict[Any, Any]]:
         """parse out CPU data from kube-burner query
         Args:
             data (dict): Aggregated data from Elasticsearch DSL query
@@ -249,7 +266,9 @@ def parse_agg_results(self, data: dict, agg_value, agg_type):
             res.append(dat)
         return res
 
-    def convert_to_df(self, data, columns=None):
+    def convert_to_df(
+        self, data: Dict[Any, Any], columns: List[str] = None
+    ) -> pd.DataFrame:
         """convert to a dataframe
         Args:
             data (_type_): _description_
@@ -263,7 +282,12 @@ def convert_to_df(self, data, columns=None):
         odf = pd.DataFrame(odf, columns=columns)
         return odf
 
-    def save_results(self, df, csv_file_path="output.csv", columns=None):
+    def save_results(
+        self,
+        df: pd.DataFrame,
+        csv_file_path: str = "output.csv",
+        columns: List[str] = None,
+    ) -> None:
         """write results to CSV
         Args:
             df (_type_): _description_
diff --git a/fmatch/test_fmatch.py b/fmatch/test_fmatch.py
index a63437b..3d665c5 100644
--- a/fmatch/test_fmatch.py
+++ b/fmatch/test_fmatch.py
@@ -2,24 +2,31 @@
 test_fmatch
 """
 
+from datetime import datetime
 import sys
+import warnings
 
 # pylint: disable=import-error
 import pandas as pd
 
 # pylint: disable=import-error
 from matcher import Matcher
 
-match = Matcher(index="perf_scale_ci")
-res=match.get_metadata_by_uuid("b4afc724-f175-44d1-81ff-a8255fea034f",'perf_scale_ci')
+warnings.filterwarnings("ignore", message="Unverified HTTPS request.*")
+warnings.filterwarnings(
+    "ignore", category=UserWarning, message=".*Connecting to.*verify_certs=False.*"
+)
+
+match = Matcher(index="perf_scale_ci*", verify_certs=False)
+res=match.get_metadata_by_uuid("b4afc724-f175-44d1-81ff-a8255fea034f",'perf_scale_ci*')
 meta = {}
 meta["masterNodesType"] = "m6a.xlarge"
 meta["workerNodesType"] = "m6a.xlarge"
 meta["platform"] = "AWS"
 meta["masterNodesCount"] = 3
-meta["workerNodesCount"] = 24
+meta["workerNodesCount"] = 6
 meta["jobStatus"] = "success"
-meta["ocpVersion"] = "4.15"
+meta["ocpVersion"] = "4.17"
 meta["networkType"] = "OVNKubernetes"
 meta["benchmark.keyword"] = "cluster-density-v2"
 # meta['encrypted'] = "true"
@@ -27,10 +34,15 @@
 # meta['fips'] = "false"
 
 uuids = match.get_uuid_by_metadata(meta)
+print("All uuids",len(uuids))
+date= datetime.strptime("2024-07-01T13:46:24Z","%Y-%m-%dT%H:%M:%SZ")
+uuids2= match.get_uuid_by_metadata(meta,lookback_date=date)
+print("lookback uuids",len(uuids2))
+uuids2 = match.get_uuid_by_metadata(meta)
 if len(uuids) == 0:
     print("No UUID present for given metadata")
     sys.exit()
-runs = match.match_kube_burner(uuids)
+runs = match.match_kube_burner(uuids,"ripsaw-kube-burner*")
 ids = match.filter_runs(runs, runs)
 
 podl_metrics = {
diff --git a/fmatch/tests/test_matcher.py b/fmatch/tests/test_matcher.py
index 44a527a..95a6462 100644
--- a/fmatch/tests/test_matcher.py
+++ b/fmatch/tests/test_matcher.py
@@ -7,6 +7,7 @@
 # pylint: disable = import-error, duplicate-code
 import os
 from unittest.mock import patch
+import datetime
 
 from elasticsearch_dsl import Search
 from elasticsearch_dsl.response import Response
@@ -76,6 +77,29 @@ def test_get_uuid_by_metadata(matcher_instance):
                                        "buildUrl":"buildUrl1"}]
     assert result == expected
 
+def test_get_uuid_by_metadata_lookback(matcher_instance):
+    matcher_instance.es.search = lambda *args, **kwargs: {
+        "hits": {
+            "hits": [{"_source": {"uuid": "uuid1",
+                                  "buildUrl":"buildUrl1",
+                                  "timestamp":"2024-07-10T13:46:24Z"}},
+                     {"_source": {"uuid": "uuid2",
+                                  "buildUrl":"buildUrl1",
+                                  "timestamp":"2024-07-08T13:46:24Z"}}]
+        }
+    }
+    meta = {
+        "field1": "value1",
+        "ocpVersion": "4.15",
+    }
+    date= datetime.datetime.strptime("2024-07-07T13:46:24Z","%Y-%m-%dT%H:%M:%SZ")
+    result = matcher_instance.get_uuid_by_metadata(meta=meta, lookback_date=date)
+    expected= [{"uuid": "uuid1",
+                "buildUrl":"buildUrl1"},
+               {"uuid": "uuid2",
+                "buildUrl":"buildUrl1"}]
+    assert result == expected
+
 
 def test_match_kube_burner(matcher_instance):
     result = matcher_instance.match_kube_burner(["uuid1"],index="ospst-*")
diff --git a/setup.py b/setup.py
index 6e44201..07e6db9 100644
--- a/setup.py
+++ b/setup.py
@@ -4,7 +4,7 @@
 
 from setuptools import setup, find_packages
 
-VERSION = '0.0.7'
+VERSION = '0.0.8'
 DESCRIPTION = 'Common package for matching runs with provided metadata'
 # pylint: disable= line-too-long
 LONG_DESCRIPTION = "A package that allows to match metadata and get runs and create csv files with queried metrics"
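
Usage sketch for the new `SingletonLogger` added in `fmatch/logrus.py`: repeated construction with the same `name` is expected to return the cached `logging.Logger`, so handlers are attached only once and log lines are not duplicated. The class and method names come straight from the diff; the log message below is illustrative.

```python
import logging

from fmatch.logrus import SingletonLogger

# First call for a given name creates and configures the logger.
log_a = SingletonLogger(debug=logging.INFO, name="Matcher")

# Later calls with the same name return the cached instance,
# so no duplicate handlers are added.
log_b = SingletonLogger(debug=logging.DEBUG, name="Matcher")
assert log_a is log_b

# getLogger() only looks up an already-created logger (None if absent).
assert SingletonLogger.getLogger("Matcher") is log_a

log_a.info("configured once, reused everywhere")
```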
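A hedged sketch of the new `lookback_date` parameter on `Matcher.get_uuid_by_metadata()`: when a `datetime` is passed, a `range` filter (`timestamp > lookback_date`) is added to the query, so only newer runs are returned. The index name and metadata values are illustrative, the import path assumes the installed package (the `test_fmatch.py` script above uses `from matcher import Matcher`), and `ES_SERVER` must point at a reachable Elasticsearch instance for this to actually run.

```python
from datetime import datetime

from fmatch.matcher import Matcher  # assumed package import path

# verify_certs=False mirrors the test script above; ES_SERVER is read from the environment.
match = Matcher(index="perf_scale_ci*", verify_certs=False)

meta = {
    "platform": "AWS",
    "workerNodesCount": 6,
    "ocpVersion": "4.17",
    "networkType": "OVNKubernetes",
}

# Without lookback_date the behaviour is unchanged: all matching runs.
all_uuids = match.get_uuid_by_metadata(meta)

# With lookback_date, only runs newer than the given datetime are returned;
# the matcher serializes it as "%Y-%m-%dT%H:%M:%SZ" before building the range filter.
since = datetime.strptime("2024-07-01T13:46:24Z", "%Y-%m-%dT%H:%M:%SZ")
recent_uuids = match.get_uuid_by_metadata(meta, lookback_date=since)

print(len(all_uuids), "total;", len(recent_uuids), "since", since.isoformat())
```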