From 5671d7765664417bc188c8c0bcd181259fcfe7a7 Mon Sep 17 00:00:00 2001 From: David Butenhof Date: Fri, 4 Oct 2024 08:44:31 -0400 Subject: [PATCH] Cleanup OpenSearch connections Add a Crucible `close` method, and use a FastAPI yield dependency to ensure every API connection is closed cleanly. --- backend/app/api/v1/endpoints/ilab/ilab.py | 80 ++++++++++++++--------- backend/app/services/crucible_svc.py | 6 ++ 2 files changed, 56 insertions(+), 30 deletions(-) diff --git a/backend/app/api/v1/endpoints/ilab/ilab.py b/backend/app/api/v1/endpoints/ilab/ilab.py index e49b8c6c..d83290a8 100644 --- a/backend/app/api/v1/endpoints/ilab/ilab.py +++ b/backend/app/api/v1/endpoints/ilab/ilab.py @@ -8,7 +8,7 @@ from datetime import datetime, timedelta, timezone from typing import Annotated, Any, Optional -from fastapi import APIRouter, Query +from fastapi import APIRouter, Depends, HTTPException, Query, status from app.services.crucible_svc import CrucibleService, Graph, GraphList @@ -26,6 +26,22 @@ def example_error(message: str) -> dict[str, Any]: return example_response({"message": message}) +def crucible_svc(): + crucible = None + try: + crucible = CrucibleService(CONFIGPATH) + yield crucible + except Exception as e: + print(f"Error opening {CONFIGPATH}: {str(e)!r}") + raise HTTPException( + status.HTTP_502_BAD_GATEWAY, + f"Crucible service is not available: {str(e)!r}", + ) + finally: + if crucible: + crucible.close() + + @router.get( "/api/v1/ilab/runs/filters", summary="Returns possible filters", @@ -66,8 +82,7 @@ def example_error(message: str) -> dict[str, Any]: ) }, ) -async def run_filters(): - crucible = CrucibleService(CONFIGPATH) +async def run_filters(crucible: Annotated[CrucibleService, Depends(crucible_svc)]): return crucible.get_run_filters() @@ -125,6 +140,7 @@ async def run_filters(): }, ) async def runs( + crucible: Annotated[CrucibleService, Depends(crucible_svc)], start_date: Annotated[ Optional[str], Query(description="Start time for search", examples=["2020-11-10"]), @@ -151,7 +167,6 @@ async def runs( Query(description="Page offset to start", examples=[10]), ] = 0, ): - crucible = CrucibleService(CONFIGPATH) if start_date is None and end_date is None: now = datetime.now(timezone.utc) start = now - timedelta(days=30) @@ -174,8 +189,7 @@ async def runs( 400: example_error("Parameter error"), }, ) -async def tags(run: str): - crucible = CrucibleService(CONFIGPATH) +async def tags(crucible: Annotated[CrucibleService, Depends(crucible_svc)], run: str): return crucible.get_tags(run) @@ -209,8 +223,7 @@ async def tags(run: str): 400: example_error("Parameter error"), }, ) -async def params(run: str): - crucible = CrucibleService(CONFIGPATH) +async def params(crucible: Annotated[CrucibleService, Depends(crucible_svc)], run: str): return crucible.get_params(run) @@ -242,8 +255,9 @@ async def params(run: str): 400: example_error("Parameter error"), }, ) -async def iterations(run: str): - crucible = CrucibleService(CONFIGPATH) +async def iterations( + crucible: Annotated[CrucibleService, Depends(crucible_svc)], run: str +): return crucible.get_iterations(run) @@ -275,8 +289,9 @@ async def iterations(run: str): 400: example_error("Parameter error"), }, ) -async def run_samples(run: str): - crucible = CrucibleService(CONFIGPATH) +async def run_samples( + crucible: Annotated[CrucibleService, Depends(crucible_svc)], run: str +): return crucible.get_samples(run) @@ -312,8 +327,9 @@ async def run_samples(run: str): 400: example_error("Parameter error"), }, ) -async def run_periods(run: str): - crucible = CrucibleService(CONFIGPATH) +async def run_periods( + crucible: Annotated[CrucibleService, Depends(crucible_svc)], run: str +): return crucible.get_periods(run) @@ -345,8 +361,9 @@ async def run_periods(run: str): 400: example_error("Parameter error"), }, ) -async def iteration_samples(iteration: str): - crucible = CrucibleService(CONFIGPATH) +async def iteration_samples( + crucible: Annotated[CrucibleService, Depends(crucible_svc)], iteration: str +): return crucible.get_samples(iteration=iteration) @@ -393,8 +410,9 @@ async def iteration_samples(iteration: str): 400: example_error("Parameter error"), }, ) -async def timeline(run: str): - crucible = CrucibleService(CONFIGPATH) +async def timeline( + crucible: Annotated[CrucibleService, Depends(crucible_svc)], run: str +): return crucible.get_timeline(run) @@ -432,8 +450,9 @@ async def timeline(run: str): 400: example_error("Parameter error"), }, ) -async def metrics(run: str): - crucible = CrucibleService(CONFIGPATH) +async def metrics( + crucible: Annotated[CrucibleService, Depends(crucible_svc)], run: str +): return crucible.get_metrics_list(run) @@ -455,6 +474,7 @@ async def metrics(run: str): }, ) async def metric_breakouts( + crucible: Annotated[CrucibleService, Depends(crucible_svc)], run: str, metric: str, name: Annotated[ @@ -472,7 +492,6 @@ async def metric_breakouts( ), ] = None, ): - crucible = CrucibleService(CONFIGPATH) return crucible.get_metric_breakouts(run, metric, names=name, periods=period) @@ -512,6 +531,7 @@ async def metric_breakouts( }, ) async def metric_data( + crucible: Annotated[CrucibleService, Depends(crucible_svc)], run: str, metric: str, name: Annotated[ @@ -532,7 +552,6 @@ async def metric_data( bool, Query(description="Allow aggregation of metrics") ] = False, ): - crucible = CrucibleService(CONFIGPATH) return crucible.get_metrics_data( run, metric, names=name, periods=period, aggregate=aggregate ) @@ -567,6 +586,7 @@ async def metric_data( }, ) async def metric_summary( + crucible: Annotated[CrucibleService, Depends(crucible_svc)], run: str, metric: str, name: Annotated[ @@ -584,7 +604,6 @@ async def metric_summary( ), ] = None, ): - crucible = CrucibleService(CONFIGPATH) return crucible.get_metrics_summary(run, metric, names=name, periods=period) @@ -654,8 +673,9 @@ async def metric_summary( ), }, ) -async def metric_graph_body(graphs: GraphList): - crucible = CrucibleService(CONFIGPATH) +async def metric_graph_body( + crucible: Annotated[CrucibleService, Depends(crucible_svc)], graphs: GraphList +): return crucible.get_metrics_graph(graphs) @@ -712,6 +732,7 @@ async def metric_graph_body(graphs: GraphList): }, ) async def metric_graph_param( + crucible: Annotated[CrucibleService, Depends(crucible_svc)], run: str, metric: str, aggregate: Annotated[ @@ -733,7 +754,6 @@ async def metric_graph_param( ] = None, title: Annotated[Optional[str], Query(description="Title for graph")] = None, ): - crucible = CrucibleService(CONFIGPATH) return crucible.get_metrics_graph( GraphList( run=run, @@ -776,8 +796,7 @@ async def metric_graph_param( ), }, ) -async def info(): - crucible = CrucibleService(CONFIGPATH) +async def info(crucible: Annotated[CrucibleService, Depends(crucible_svc)]): return crucible.info @@ -807,6 +826,7 @@ async def info(): 400: example_error("Index name 'foo' doesn't exist"), }, ) -async def fields(index: str): - crucible = CrucibleService(CONFIGPATH) +async def fields( + crucible: Annotated[CrucibleService, Depends(crucible_svc)], index: str +): return crucible.get_fields(index=index) diff --git a/backend/app/services/crucible_svc.py b/backend/app/services/crucible_svc.py index 1edb76f7..8abe157a 100644 --- a/backend/app/services/crucible_svc.py +++ b/backend/app/services/crucible_svc.py @@ -608,6 +608,12 @@ def _search( ) return value + def close(self): + """Close the OpenSearch connection""" + if self.elastic: + self.elastic.close() + self.elastic = None + def search( self, index: str,