From 09f6cb0d5d266e68d0b0f9a75944c793571c958f Mon Sep 17 00:00:00 2001 From: Vishnu Challa Date: Tue, 13 Feb 2024 23:16:04 -0500 Subject: [PATCH] Logic to use internal ES instance Signed-off-by: Vishnu Challa --- backend/app/api/v1/commons/hce.py | 6 +- backend/app/api/v1/commons/ocp.py | 6 +- backend/app/api/v1/commons/utils.py | 8 +- backend/app/api/v1/endpoints/ocp/graph.py | 18 +- backend/app/api/v1/endpoints/ocp/results.py | 4 +- .../app/api/v1/endpoints/quay/quayGraphs.py | 6 +- backend/app/services/search.py | 230 ++++++++++++++++-- 7 files changed, 227 insertions(+), 51 deletions(-) diff --git a/backend/app/api/v1/commons/hce.py b/backend/app/api/v1/commons/hce.py index ee53cbd5..701f0c2e 100644 --- a/backend/app/api/v1/commons/hce.py +++ b/backend/app/api/v1/commons/hce.py @@ -17,13 +17,11 @@ async def getData(start_datetime: date, end_datetime: date, configpath: str): } } } - query['query']['bool']['filter']['range']['date']['lte'] = str(end_datetime) - query['query']['bool']['filter']['range']['date']['gte'] = str(start_datetime) es = ElasticService(configpath=configpath) - response = await es.post(query) + response = await es.post(query=query, start_date=start_datetime, end_date=end_datetime, timestamp_field='date') await es.close() - tasks = [item['_source'] for item in response["hits"]["hits"]] + tasks = [item['_source'] for item in response] jobs = pd.json_normalize(tasks) jobs[['group']] = jobs[['group']].fillna(0) jobs.fillna('', inplace=True) diff --git a/backend/app/api/v1/commons/ocp.py b/backend/app/api/v1/commons/ocp.py index 965babd4..6959f15f 100644 --- a/backend/app/api/v1/commons/ocp.py +++ b/backend/app/api/v1/commons/ocp.py @@ -17,13 +17,11 @@ async def getData(start_datetime: date, end_datetime: date, configpath: str): } } } - query['query']['bool']['filter']['range']['timestamp']['lte'] = str(end_datetime) - query['query']['bool']['filter']['range']['timestamp']['gte'] = str(start_datetime) es = ElasticService(configpath=configpath) - response = await es.post(query) + response = await es.post(query=query, start_date=start_datetime, end_date=end_datetime, timestamp_field='timestamp') await es.close() - tasks = [item['_source'] for item in response["hits"]["hits"]] + tasks = [item['_source'] for item in response] jobs = pd.json_normalize(tasks) if len(jobs) == 0: return jobs diff --git a/backend/app/api/v1/commons/utils.py b/backend/app/api/v1/commons/utils.py index 912e7eaf..1b7a2769 100644 --- a/backend/app/api/v1/commons/utils.py +++ b/backend/app/api/v1/commons/utils.py @@ -1,6 +1,6 @@ from app.services.search import ElasticService -async def getMetadata(uuid: str, configpath: str) : +async def getMetadata(uuid: str, configpath: str, timestamp_field: str) : query = { "query": { "query_string": { @@ -11,7 +11,7 @@ async def getMetadata(uuid: str, configpath: str) : } print(query) es = ElasticService(configpath=configpath) - response = await es.post(query) + response = await es.post(query=query) await es.close() - meta = [item['_source'] for item in response["hits"]["hits"]] - return meta[0] + meta = [item['_source'] for item in response] + return meta[0] \ No newline at end of file diff --git a/backend/app/api/v1/endpoints/ocp/graph.py b/backend/app/api/v1/endpoints/ocp/graph.py index 3728f0f1..bdb0f809 100644 --- a/backend/app/api/v1/endpoints/ocp/graph.py +++ b/backend/app/api/v1/endpoints/ocp/graph.py @@ -115,7 +115,7 @@ def parseCPUResults(data: dict): @router.get('/api/v1/ocp/graph/{uuid}') async def graph(uuid: str): index = "" - meta = await getMetadata(uuid, 'ocp.elasticsearch') + meta = await getMetadata(uuid, 'ocp.elasticsearch', 'timestamp') print(meta) metrics = [] if meta["benchmark"] == "k8s-netperf" : @@ -218,9 +218,9 @@ async def jobSummary(uuids: list): } print(query) es = ElasticService(configpath="ocp.elasticsearch",index=index) - response = await es.post(query) + response = await es.post(query=query) await es.close() - runs = [item['_source'] for item in response["hits"]["hits"]] + runs = [item['_source'] for item in response] return runs async def processBurner(data: dict) : @@ -344,9 +344,9 @@ async def getBurnerResults(uuid: str, uuids: list, index: str ): } print(query) es = ElasticService(configpath="ocp.elasticsearch",index=index) - response = await es.post(query) + response = await es.post(query=query) await es.close() - runs = [item['_source'] for item in response["hits"]["hits"]] + runs = [item['_source'] for item in response] return runs async def getResults(uuid: str, uuids: list, index: str ): @@ -364,9 +364,9 @@ async def getResults(uuid: str, uuids: list, index: str ): } print(query) es = ElasticService(configpath="ocp.elasticsearch",index=index) - response = await es.post(query) + response = await es.post(query=query) await es.close() - runs = [item['_source'] for item in response["hits"]["hits"]] + runs = [item['_source'] for item in response] return runs async def getMatchRuns(meta: dict, workerCount: False): @@ -414,9 +414,9 @@ async def getMatchRuns(meta: dict, workerCount: False): print(query) es = ElasticService(configpath="ocp.elasticsearch") - response = await es.post(query) + response = await es.post(query=query) await es.close() - runs = [item['_source'] for item in response["hits"]["hits"]] + runs = [item['_source'] for item in response] uuids = [] for run in runs : uuids.append(run["uuid"]) diff --git a/backend/app/api/v1/endpoints/ocp/results.py b/backend/app/api/v1/endpoints/ocp/results.py index f296b21b..6b94d70d 100644 --- a/backend/app/api/v1/endpoints/ocp/results.py +++ b/backend/app/api/v1/endpoints/ocp/results.py @@ -21,7 +21,7 @@ async def results_for_job( } es = ElasticService(configpath="ocp.elasticsearch") - response = await es.post(query) + response = await es.post(query=query) await es.close() - tasks = [item['_source'] for item in response["hits"]["hits"]] + tasks = [item['_source'] for item in response] return tasks diff --git a/backend/app/api/v1/endpoints/quay/quayGraphs.py b/backend/app/api/v1/endpoints/quay/quayGraphs.py index a68df0f7..866e3d02 100644 --- a/backend/app/api/v1/endpoints/quay/quayGraphs.py +++ b/backend/app/api/v1/endpoints/quay/quayGraphs.py @@ -13,7 +13,7 @@ async def graph(uuid: str): api_index = "quay-vegeta-results" image_push_pull_index = "quay-push-pull" - meta = await getMetadata(uuid, 'quay.elasticsearch') + meta = await getMetadata(uuid, 'quay.elasticsearch', 'timestamp') uuids = await getMatchRuns(meta) prevApiData = await getQuayMetrics(uuids, api_index) prevImagesData = await getImageMetrics(uuids, image_push_pull_index) @@ -308,9 +308,9 @@ async def getMatchRuns(meta: dict): print(query) es = ElasticService(configpath="quay.elasticsearch") - response = await es.post(query) + response = await es.post(query=query) await es.close() - runs = [item['_source'] for item in response["hits"]["hits"]] + runs = [item['_source'] for item in response] uuids = [] for run in runs : uuids.append(run["uuid"]) diff --git a/backend/app/services/search.py b/backend/app/services/search.py index 72508c12..0ea5e87f 100644 --- a/backend/app/services/search.py +++ b/backend/app/services/search.py @@ -1,42 +1,222 @@ from elasticsearch import AsyncElasticsearch from fastapi.encoders import jsonable_encoder - +from datetime import datetime, timedelta from app import config +import bisect - + class ElasticService: # todo add bulkhead pattern # todo add error message for unauthorized user - def __init__(self,configpath="",index=""): - self.cfg = config.get_config() - self.url = self.cfg.get(configpath+'.url') - self.esUser = None + def initialize_es(self, config, path, index): + url = config.get(path+'.url') + esUser = None if index == "": - self.indice = self.cfg.get(configpath+'.indice') + indice = config.get(path+'.indice') else: - self.indice = index - if self.cfg.is_set(configpath+'.username') and \ - self.cfg.is_set(configpath+'.password'): - self.esUser = self.cfg.get(configpath+'.username') - self.esPass = self.cfg.get(configpath+'.password') - if self.esUser : - self.es = AsyncElasticsearch( - self.url, + indice = index + if config.is_set(path+'.username') and \ + config.is_set(path+'.password'): + esUser = config.get(path+'.username') + esPass = config.get(path+'.password') + if esUser : + es = AsyncElasticsearch( + url, use_ssl=False, verify_certs=False, - http_auth=(self.esUser,self.esPass) + http_auth=(esUser,esPass) ) else: - self.es = AsyncElasticsearch(self.url, verify_certs=False) + es = AsyncElasticsearch(url, verify_certs=False) + return es, indice + + def __init__(self,configpath="",index=""): + cfg = config.get_config() + self.new_es, self.new_index = self.initialize_es(cfg, configpath, index) + self.prev_es = None + if cfg.get(configpath + '.internal'): + self.prev_es, self.prev_index = self.initialize_es(cfg, configpath + '.internal', index) + + async def scan_indices(self, es_client, indice, query, timestamp_field, start_date, end_date, size): + indices = await self.get_indices_from_alias(es_client, indice) + if not indices: + indices = [indice] + sorted_index_list = SortedIndexList() + for index in indices: + sorted_index_list.insert(IndexTimestamp(index, await self.get_timestamps(es_client, index, timestamp_field))) + filtered_indices = sorted_index_list.get_indices_in_given_range(start_date, end_date) + results = [] + for each_index in filtered_indices: + query['query']['bool']['filter']['range'][timestamp_field]['lte'] = str(min(end_date, each_index.timestamps[1])) + query['query']['bool']['filter']['range'][timestamp_field]['gte'] = str(max(start_date, each_index.timestamps[0])) + response = await es_client.search( + index=each_index.index, + body=jsonable_encoder(query), + size=size) + results.extend(response['hits']['hits']) + return results + + async def post(self, query, indice=None, size=10000, start_date=None, end_date=None, timestamp_field=None): + if size == 0: + if self.prev_es: + self.prev_index = 'ospst-' + self.prev_index if indice is None else indice + return await self.prev_es.search( + index=self.prev_index, + body=jsonable_encoder(query), + size=size) + else: + self.new_index = self.new_index if indice is None else indice + return await self.new_es.search( + index=self.new_index, + body=jsonable_encoder(query), + size=size) + else: + if timestamp_field: + if self.prev_es: + self.prev_index = 'ospst-' + self.prev_index if indice is None else indice + today = datetime.today().date() + seven_days_ago = today - timedelta(days=7) + if start_date and start_date > seven_days_ago: + previous_results = [] + else: + end_date = min(end_date, seven_days_ago) if end_date else seven_days_ago + query['query']['bool']['filter']['range'][timestamp_field]['lte'] = str(end_date) + if start_date: + query['query']['bool']['filter']['range'][timestamp_field]['gte'] = str(start_date) + if start_date is None: + response = await self.prev_es.search( + index=self.prev_index, + body=jsonable_encoder(query), + size=size) + previous_results = response['hits']['hits'] + else: + previous_results = await self.scan_indices(self.prev_es, self.prev_index, query, timestamp_field, start_date, end_date, size) + if self.prev_es and self.new_es: + self.new_index = self.new_index if indice is None else indice + today = datetime.today().date() + seven_days_ago = today - timedelta(days=7) + if end_date and end_date < seven_days_ago: + new_results = [] + else: + start_date = max(start_date, seven_days_ago) if start_date else seven_days_ago + query['query']['bool']['filter']['range'][timestamp_field]['gte'] = str(start_date) + if end_date: + query['query']['bool']['filter']['range'][timestamp_field]['lte'] = str(end_date) + if end_date is None: + response = await self.new_es.search( + index=self.new_index, + body=jsonable_encoder(query), + size=size) + new_results = response['hits']['hits'] + else: + new_results = await self.scan_indices(self.new_es, self.new_index, query, timestamp_field, start_date, end_date, size) + return previous_results + new_results + else: + if start_date and end_date: + query['query']['bool']['filter']['range'][timestamp_field]['gte'] = str(start_date) + query['query']['bool']['filter']['range'][timestamp_field]['lte'] = str(end_date) + return await self.scan_indices(self.new_es, self.new_index, query, timestamp_field, start_date, end_date, size) + else: + response = await self.new_es.search( + index=self.new_index, + body=jsonable_encoder(query), + size=size) + return response['hits']['hits'] + else: + previous_results = [] + if self.prev_es: + self.prev_index = 'ospst-' + self.prev_index if indice is None else indice + response = await self.prev_es.search( + index=self.prev_index, + body=jsonable_encoder(query), + size=size) + previous_results = response['hits']['hits'] + self.new_index = self.new_index if indice is None else indice + response = await self.new_es.search( + index=self.new_index, + body=jsonable_encoder(query), + size=size) + new_results = response['hits']['hits'] + seen = set() + results = [] + all_results = previous_results + new_results + for each_result in all_results: + flat_doc = flatten_dict(each_result) + if tuple(sorted(flat_doc.items())) in seen: + continue + else: + results.append(each_result) + seen.add(tuple(sorted(flat_doc.items()))) + return results + - async def post(self, query, indice=None,size=10000): - if indice is None: - indice = self.indice - return await self.es.search( - index=indice, - body=jsonable_encoder(query), - size=size) + async def get_timestamps(self, es_client, index, timestamp_field): + query = { + "size": 0, + "aggs": { + "min_timestamp": { + "min": { + "field": timestamp_field + } + }, + "max_timestamp": { + "max": { + "field": timestamp_field + } + } + } + } + response = await es_client.search( + index=index, + body=query + ) + min_timestamp = response["aggregations"]["min_timestamp"]["value_as_string"] + max_timestamp = response["aggregations"]["max_timestamp"]["value_as_string"] + return [datetime.strptime(datetime.strptime(min_timestamp, "%Y-%m-%dT%H:%M:%S.%fZ").strftime("%Y-%m-%d"), "%Y-%m-%d").date(), + datetime.strptime(datetime.strptime(max_timestamp, "%Y-%m-%dT%H:%M:%S.%fZ").strftime("%Y-%m-%d"), "%Y-%m-%d").date()] + + async def get_indices_from_alias(self, es_client, alias): + try: + response = await es_client.indices.get_alias(alias) + return list(response.keys()) + except Exception as e: + print(f"Error retrieving indices for alias '{alias}': {e}") + return [] async def close(self): - await self.es.close() + await self.new_es.close() + if self.prev_es is not None: + await self.prev_es.close() + +class IndexTimestamp: + def __init__(self, index, timestamps): + self.index = index + self.timestamps = timestamps + + def __lt__(self, other): + return self.timestamps[0] < other.timestamps[0] + +class SortedIndexList: + def __init__(self): + self.indices = [] + + def insert(self, index_timestamps): + bisect.insort(self.indices, index_timestamps) + + def get_indices_in_given_range(self, start_date, end_date): + return [index_timestamp for index_timestamp in self.indices if (start_date and index_timestamp.timestamps[0] <= start_date <= index_timestamp.timestamps[1]) + or (end_date and index_timestamp.timestamps[0] <= end_date <= index_timestamp.timestamps[1])] + +def flatten_dict(d, parent_key='', sep='.'): + items = [] + for k, v in d.items(): + new_key = parent_key + sep + k if parent_key else k + if isinstance(v, dict): + items.extend(flatten_dict(v, new_key, sep=sep).items()) + elif isinstance(v, list): + for i, val in enumerate(v): + items.extend(flatten_dict({str(i): val}, new_key, sep=sep).items()) + else: + items.append((new_key, v)) + return dict(items) \ No newline at end of file