Skip to content

Commit

Permalink
Logic to use internal ES instance
Browse files Browse the repository at this point in the history
Signed-off-by: Vishnu Challa <[email protected]>
  • Loading branch information
Vishnu Challa committed Feb 14, 2024
1 parent 373baab commit 09f6cb0
Show file tree
Hide file tree
Showing 7 changed files with 227 additions and 51 deletions.
6 changes: 2 additions & 4 deletions backend/app/api/v1/commons/hce.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,11 @@ async def getData(start_datetime: date, end_datetime: date, configpath: str):
}
}
}
query['query']['bool']['filter']['range']['date']['lte'] = str(end_datetime)
query['query']['bool']['filter']['range']['date']['gte'] = str(start_datetime)

es = ElasticService(configpath=configpath)
response = await es.post(query)
response = await es.post(query=query, start_date=start_datetime, end_date=end_datetime, timestamp_field='date')
await es.close()
tasks = [item['_source'] for item in response["hits"]["hits"]]
tasks = [item['_source'] for item in response]
jobs = pd.json_normalize(tasks)
jobs[['group']] = jobs[['group']].fillna(0)
jobs.fillna('', inplace=True)
Expand Down
6 changes: 2 additions & 4 deletions backend/app/api/v1/commons/ocp.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,11 @@ async def getData(start_datetime: date, end_datetime: date, configpath: str):
}
}
}
query['query']['bool']['filter']['range']['timestamp']['lte'] = str(end_datetime)
query['query']['bool']['filter']['range']['timestamp']['gte'] = str(start_datetime)

es = ElasticService(configpath=configpath)
response = await es.post(query)
response = await es.post(query=query, start_date=start_datetime, end_date=end_datetime, timestamp_field='timestamp')
await es.close()
tasks = [item['_source'] for item in response["hits"]["hits"]]
tasks = [item['_source'] for item in response]
jobs = pd.json_normalize(tasks)
if len(jobs) == 0:
return jobs
Expand Down
8 changes: 4 additions & 4 deletions backend/app/api/v1/commons/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from app.services.search import ElasticService

async def getMetadata(uuid: str, configpath: str) :
async def getMetadata(uuid: str, configpath: str, timestamp_field: str) :
query = {
"query": {
"query_string": {
Expand All @@ -11,7 +11,7 @@ async def getMetadata(uuid: str, configpath: str) :
}
print(query)
es = ElasticService(configpath=configpath)
response = await es.post(query)
response = await es.post(query=query)
await es.close()
meta = [item['_source'] for item in response["hits"]["hits"]]
return meta[0]
meta = [item['_source'] for item in response]
return meta[0]
18 changes: 9 additions & 9 deletions backend/app/api/v1/endpoints/ocp/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def parseCPUResults(data: dict):
@router.get('/api/v1/ocp/graph/{uuid}')
async def graph(uuid: str):
index = ""
meta = await getMetadata(uuid, 'ocp.elasticsearch')
meta = await getMetadata(uuid, 'ocp.elasticsearch', 'timestamp')
print(meta)
metrics = []
if meta["benchmark"] == "k8s-netperf" :
Expand Down Expand Up @@ -218,9 +218,9 @@ async def jobSummary(uuids: list):
}
print(query)
es = ElasticService(configpath="ocp.elasticsearch",index=index)
response = await es.post(query)
response = await es.post(query=query)
await es.close()
runs = [item['_source'] for item in response["hits"]["hits"]]
runs = [item['_source'] for item in response]
return runs

async def processBurner(data: dict) :
Expand Down Expand Up @@ -344,9 +344,9 @@ async def getBurnerResults(uuid: str, uuids: list, index: str ):
}
print(query)
es = ElasticService(configpath="ocp.elasticsearch",index=index)
response = await es.post(query)
response = await es.post(query=query)
await es.close()
runs = [item['_source'] for item in response["hits"]["hits"]]
runs = [item['_source'] for item in response]
return runs

async def getResults(uuid: str, uuids: list, index: str ):
Expand All @@ -364,9 +364,9 @@ async def getResults(uuid: str, uuids: list, index: str ):
}
print(query)
es = ElasticService(configpath="ocp.elasticsearch",index=index)
response = await es.post(query)
response = await es.post(query=query)
await es.close()
runs = [item['_source'] for item in response["hits"]["hits"]]
runs = [item['_source'] for item in response]
return runs

async def getMatchRuns(meta: dict, workerCount: False):
Expand Down Expand Up @@ -414,9 +414,9 @@ async def getMatchRuns(meta: dict, workerCount: False):

print(query)
es = ElasticService(configpath="ocp.elasticsearch")
response = await es.post(query)
response = await es.post(query=query)
await es.close()
runs = [item['_source'] for item in response["hits"]["hits"]]
runs = [item['_source'] for item in response]
uuids = []
for run in runs :
uuids.append(run["uuid"])
Expand Down
4 changes: 2 additions & 2 deletions backend/app/api/v1/endpoints/ocp/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ async def results_for_job(
}

es = ElasticService(configpath="ocp.elasticsearch")
response = await es.post(query)
response = await es.post(query=query)
await es.close()
tasks = [item['_source'] for item in response["hits"]["hits"]]
tasks = [item['_source'] for item in response]
return tasks
6 changes: 3 additions & 3 deletions backend/app/api/v1/endpoints/quay/quayGraphs.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
async def graph(uuid: str):
api_index = "quay-vegeta-results"
image_push_pull_index = "quay-push-pull"
meta = await getMetadata(uuid, 'quay.elasticsearch')
meta = await getMetadata(uuid, 'quay.elasticsearch', 'timestamp')
uuids = await getMatchRuns(meta)
prevApiData = await getQuayMetrics(uuids, api_index)
prevImagesData = await getImageMetrics(uuids, image_push_pull_index)
Expand Down Expand Up @@ -308,9 +308,9 @@ async def getMatchRuns(meta: dict):

print(query)
es = ElasticService(configpath="quay.elasticsearch")
response = await es.post(query)
response = await es.post(query=query)
await es.close()
runs = [item['_source'] for item in response["hits"]["hits"]]
runs = [item['_source'] for item in response]
uuids = []
for run in runs :
uuids.append(run["uuid"])
Expand Down
230 changes: 205 additions & 25 deletions backend/app/services/search.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,222 @@
from elasticsearch import AsyncElasticsearch
from fastapi.encoders import jsonable_encoder

from datetime import datetime, timedelta
from app import config
import bisect


class ElasticService:
# todo add bulkhead pattern
# todo add error message for unauthorized user

def __init__(self,configpath="",index=""):
self.cfg = config.get_config()
self.url = self.cfg.get(configpath+'.url')
self.esUser = None
def initialize_es(self, config, path, index):
url = config.get(path+'.url')
esUser = None
if index == "":
self.indice = self.cfg.get(configpath+'.indice')
indice = config.get(path+'.indice')
else:
self.indice = index
if self.cfg.is_set(configpath+'.username') and \
self.cfg.is_set(configpath+'.password'):
self.esUser = self.cfg.get(configpath+'.username')
self.esPass = self.cfg.get(configpath+'.password')
if self.esUser :
self.es = AsyncElasticsearch(
self.url,
indice = index
if config.is_set(path+'.username') and \
config.is_set(path+'.password'):
esUser = config.get(path+'.username')
esPass = config.get(path+'.password')
if esUser :
es = AsyncElasticsearch(
url,
use_ssl=False,
verify_certs=False,
http_auth=(self.esUser,self.esPass)
http_auth=(esUser,esPass)
)
else:
self.es = AsyncElasticsearch(self.url, verify_certs=False)
es = AsyncElasticsearch(url, verify_certs=False)
return es, indice

def __init__(self,configpath="",index=""):
cfg = config.get_config()
self.new_es, self.new_index = self.initialize_es(cfg, configpath, index)
self.prev_es = None
if cfg.get(configpath + '.internal'):
self.prev_es, self.prev_index = self.initialize_es(cfg, configpath + '.internal', index)

async def scan_indices(self, es_client, indice, query, timestamp_field, start_date, end_date, size):
    """Search every concrete index behind *indice* whose data window overlaps
    [start_date, end_date] and return the combined raw hits.

    The alias is first resolved to its concrete indices (falling back to the
    raw name), each index's [min, max] timestamp window is fetched, and the
    query's range filter is clamped per index before searching it.
    """
    resolved = await self.get_indices_from_alias(es_client, indice) or [indice]
    ordered = SortedIndexList()
    for name in resolved:
        window = await self.get_timestamps(es_client, name, timestamp_field)
        ordered.insert(IndexTimestamp(name, window))
    hits = []
    range_filter = query['query']['bool']['filter']['range'][timestamp_field]
    for candidate in ordered.get_indices_in_given_range(start_date, end_date):
        # Clamp the requested window to the slice of data this index holds.
        range_filter['lte'] = str(min(end_date, candidate.timestamps[1]))
        range_filter['gte'] = str(max(start_date, candidate.timestamps[0]))
        response = await es_client.search(
            index=candidate.index,
            body=jsonable_encoder(query),
            size=size)
        hits.extend(response['hits']['hits'])
    return hits

async def post(self, query, indice=None, size=10000, start_date=None, end_date=None, timestamp_field=None):
    """Run *query* and return matching hits, possibly merging two clusters.

    ``prev_es`` (configured internal instance, indices prefixed ``ospst-``)
    is presumed to hold data older than roughly seven days, while
    ``new_es`` covers the recent window — TODO confirm against the
    deployment.  With ``timestamp_field`` set, the requested date range is
    split across the two instances at ``seven_days_ago``; without it, both
    are queried as-is and duplicate documents are dropped.

    :param query: ES query dict; must contain
        ``query.bool.filter.range[timestamp_field]`` when date filtering is used.
    :param indice: explicit index override (skips the ``ospst-`` prefixing).
    :param size: page size; ``size == 0`` means an aggregation-only request
        served by a single search call.
    :param start_date / end_date: optional date bounds.
    :param timestamp_field: document field holding the timestamp.
    :return: list of raw hit dicts, or the full ES response when ``size == 0``.
    """
    # Resolve target indices into locals instead of rewriting
    # self.prev_index / self.new_index: the previous in-place
    # "'ospst-' + self.prev_index" assignment stacked one more prefix on
    # every post() call made through the same service instance.
    prev_index = None
    if self.prev_es:
        prev_index = ('ospst-' + self.prev_index) if indice is None else indice
    new_index = self.new_index if indice is None else indice

    if size == 0:
        # Aggregation-only request: one search suffices; prefer the
        # internal instance when configured.
        if self.prev_es:
            return await self.prev_es.search(
                index=prev_index,
                body=jsonable_encoder(query),
                size=size)
        return await self.new_es.search(
            index=new_index,
            body=jsonable_encoder(query),
            size=size)

    if timestamp_field:
        today = datetime.today().date()
        seven_days_ago = today - timedelta(days=7)
        if self.prev_es:
            # --- internal instance: data older than ~7 days ---
            if start_date and start_date > seven_days_ago:
                previous_results = []
            else:
                # Clamp into a local so the caller's end_date still bounds
                # the new-ES window below (the original clobbered end_date
                # here, truncating the recent window to seven_days_ago).
                prev_end = min(end_date, seven_days_ago) if end_date else seven_days_ago
                query['query']['bool']['filter']['range'][timestamp_field]['lte'] = str(prev_end)
                if start_date:
                    query['query']['bool']['filter']['range'][timestamp_field]['gte'] = str(start_date)
                if start_date is None:
                    response = await self.prev_es.search(
                        index=prev_index,
                        body=jsonable_encoder(query),
                        size=size)
                    previous_results = response['hits']['hits']
                else:
                    previous_results = await self.scan_indices(
                        self.prev_es, prev_index, query, timestamp_field,
                        start_date, prev_end, size)
            # --- current instance: the most recent ~7 days ---
            if end_date and end_date < seven_days_ago:
                new_results = []
            else:
                new_start = max(start_date, seven_days_ago) if start_date else seven_days_ago
                query['query']['bool']['filter']['range'][timestamp_field]['gte'] = str(new_start)
                if end_date:
                    query['query']['bool']['filter']['range'][timestamp_field]['lte'] = str(end_date)
                if end_date is None:
                    response = await self.new_es.search(
                        index=new_index,
                        body=jsonable_encoder(query),
                        size=size)
                    new_results = response['hits']['hits']
                else:
                    new_results = await self.scan_indices(
                        self.new_es, new_index, query, timestamp_field,
                        new_start, end_date, size)
            return previous_results + new_results
        # Single (current) instance configured.
        if start_date and end_date:
            query['query']['bool']['filter']['range'][timestamp_field]['gte'] = str(start_date)
            query['query']['bool']['filter']['range'][timestamp_field]['lte'] = str(end_date)
            return await self.scan_indices(
                self.new_es, new_index, query, timestamp_field,
                start_date, end_date, size)
        response = await self.new_es.search(
            index=new_index,
            body=jsonable_encoder(query),
            size=size)
        return response['hits']['hits']

    # No timestamp field: query both instances as-is and de-duplicate,
    # comparing documents by their flattened key/value pairs.
    previous_results = []
    if self.prev_es:
        response = await self.prev_es.search(
            index=prev_index,
            body=jsonable_encoder(query),
            size=size)
        previous_results = response['hits']['hits']
    response = await self.new_es.search(
        index=new_index,
        body=jsonable_encoder(query),
        size=size)
    new_results = response['hits']['hits']
    seen = set()
    results = []
    for each_result in previous_results + new_results:
        fingerprint = tuple(sorted(flatten_dict(each_result).items()))
        if fingerprint not in seen:
            seen.add(fingerprint)
            results.append(each_result)
    return results


async def post(self, query, indice=None,size=10000):
if indice is None:
indice = self.indice
return await self.es.search(
index=indice,
body=jsonable_encoder(query),
size=size)
async def get_timestamps(self, es_client, index, timestamp_field):
    """Return ``[min_date, max_date]`` covered by *index*.

    Runs a size-0 min/max aggregation on *timestamp_field* and converts the
    returned ISO timestamps to ``datetime.date`` objects.

    :param es_client: async Elasticsearch client to query.
    :param index: concrete index name.
    :param timestamp_field: document field holding the timestamp.
    """
    query = {
        "size": 0,
        "aggs": {
            "min_timestamp": {
                "min": {
                    "field": timestamp_field
                }
            },
            "max_timestamp": {
                "max": {
                    "field": timestamp_field
                }
            }
        }
    }
    response = await es_client.search(
        index=index,
        body=query
    )
    min_timestamp = response["aggregations"]["min_timestamp"]["value_as_string"]
    max_timestamp = response["aggregations"]["max_timestamp"]["value_as_string"]
    # NOTE(review): assumes ES renders the aggregation values as
    # "%Y-%m-%dT%H:%M:%S.%fZ" (sub-second precision, trailing 'Z') —
    # confirm against the index mapping's date format.
    fmt = "%Y-%m-%dT%H:%M:%S.%fZ"
    # Parsing once and taking .date() replaces the original redundant
    # parse -> strftime("%Y-%m-%d") -> parse -> .date() round trip.
    return [datetime.strptime(min_timestamp, fmt).date(),
            datetime.strptime(max_timestamp, fmt).date()]

async def get_indices_from_alias(self, es_client, alias):
    """Resolve *alias* to the list of concrete index names behind it.

    Any lookup failure is logged to stdout and yields an empty list, so
    callers can fall back to using the alias name directly.
    """
    try:
        alias_map = await es_client.indices.get_alias(alias)
    except Exception as e:
        print(f"Error retrieving indices for alias '{alias}': {e}")
        return []
    return [*alias_map]

async def close(self):
    """Close the active Elasticsearch client connection(s)."""
    # NOTE(review): dropped the stale `await self.es.close()` — no `es`
    # attribute is ever assigned in this class (only new_es / prev_es),
    # so that call would raise AttributeError.
    await self.new_es.close()
    if self.prev_es is not None:
        await self.prev_es.close()

class IndexTimestamp:
    """Pairs an index name with its [earliest, latest] data-window dates.

    Instances order by the start of their window, so they can be kept in a
    bisect-maintained sorted list.
    """

    def __init__(self, index, timestamps):
        self.index = index            # concrete index name
        self.timestamps = timestamps  # [window start, window end]

    def __lt__(self, other):
        own_start, their_start = self.timestamps[0], other.timestamps[0]
        return own_start < their_start

class SortedIndexList:
    """Keeps IndexTimestamp entries ordered by the start of their window."""

    def __init__(self):
        self.indices = []

    def insert(self, index_timestamps):
        # Relies on IndexTimestamp.__lt__ (orders by window start).
        bisect.insort(self.indices, index_timestamps)

    def get_indices_in_given_range(self, start_date, end_date):
        """Return entries whose data window overlaps [start_date, end_date].

        The previous test only kept windows *containing* one of the two
        endpoints, which silently dropped indices lying entirely inside the
        requested range; a standard interval-overlap check fixes that.
        A missing bound (None) leaves that side unbounded; with neither
        bound given, nothing matches (as before).
        """
        if start_date is None and end_date is None:
            return []
        return [entry for entry in self.indices
                if (start_date is None or entry.timestamps[1] >= start_date)
                and (end_date is None or entry.timestamps[0] <= end_date)]

def flatten_dict(d, parent_key='', sep='.'):
    """Flatten nested dicts/lists into a single-level dict.

    Nested dict keys are joined with *sep*; list elements are keyed by their
    position, e.g. ``{'a': [{'b': 1}]}`` -> ``{'a.0.b': 1}``.
    """
    flat = {}
    for key, value in d.items():
        path = sep.join((parent_key, key)) if parent_key else key
        if isinstance(value, dict):
            flat.update(flatten_dict(value, path, sep=sep))
        elif isinstance(value, list):
            for position, element in enumerate(value):
                flat.update(flatten_dict({str(position): element}, path, sep=sep))
        else:
            flat[path] = value
    return flat

0 comments on commit 09f6cb0

Please sign in to comment.