Skip to content

Commit

Permalink
Cleanup OpenSearch connections
Browse files Browse the repository at this point in the history
Add a Crucible `close` method, and use a FastAPI yield dependency to ensure
every API connection is closed cleanly.
  • Loading branch information
dbutenhof committed Oct 4, 2024
1 parent ad48906 commit 5671d77
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 30 deletions.
80 changes: 50 additions & 30 deletions backend/app/api/v1/endpoints/ilab/ilab.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from datetime import datetime, timedelta, timezone
from typing import Annotated, Any, Optional

from fastapi import APIRouter, Query
from fastapi import APIRouter, Depends, HTTPException, Query, status

from app.services.crucible_svc import CrucibleService, Graph, GraphList

Expand All @@ -26,6 +26,22 @@ def example_error(message: str) -> dict[str, Any]:
return example_response({"message": message})


def crucible_svc():
crucible = None
try:
crucible = CrucibleService(CONFIGPATH)
yield crucible
except Exception as e:
print(f"Error opening {CONFIGPATH}: {str(e)!r}")
raise HTTPException(
status.HTTP_502_BAD_GATEWAY,
f"Crucible service is not available: {str(e)!r}",
)
finally:
if crucible:
crucible.close()


@router.get(
"/api/v1/ilab/runs/filters",
summary="Returns possible filters",
Expand Down Expand Up @@ -66,8 +82,7 @@ def example_error(message: str) -> dict[str, Any]:
)
},
)
async def run_filters():
crucible = CrucibleService(CONFIGPATH)
async def run_filters(crucible: Annotated[CrucibleService, Depends(crucible_svc)]):
return crucible.get_run_filters()


Expand Down Expand Up @@ -125,6 +140,7 @@ async def run_filters():
},
)
async def runs(
crucible: Annotated[CrucibleService, Depends(crucible_svc)],
start_date: Annotated[
Optional[str],
Query(description="Start time for search", examples=["2020-11-10"]),
Expand All @@ -151,7 +167,6 @@ async def runs(
Query(description="Page offset to start", examples=[10]),
] = 0,
):
crucible = CrucibleService(CONFIGPATH)
if start_date is None and end_date is None:
now = datetime.now(timezone.utc)
start = now - timedelta(days=30)
Expand All @@ -174,8 +189,7 @@ async def runs(
400: example_error("Parameter error"),
},
)
async def tags(run: str):
crucible = CrucibleService(CONFIGPATH)
async def tags(crucible: Annotated[CrucibleService, Depends(crucible_svc)], run: str):
return crucible.get_tags(run)


Expand Down Expand Up @@ -209,8 +223,7 @@ async def tags(run: str):
400: example_error("Parameter error"),
},
)
async def params(run: str):
crucible = CrucibleService(CONFIGPATH)
async def params(crucible: Annotated[CrucibleService, Depends(crucible_svc)], run: str):
return crucible.get_params(run)


Expand Down Expand Up @@ -242,8 +255,9 @@ async def params(run: str):
400: example_error("Parameter error"),
},
)
async def iterations(run: str):
crucible = CrucibleService(CONFIGPATH)
async def iterations(
crucible: Annotated[CrucibleService, Depends(crucible_svc)], run: str
):
return crucible.get_iterations(run)


Expand Down Expand Up @@ -275,8 +289,9 @@ async def iterations(run: str):
400: example_error("Parameter error"),
},
)
async def run_samples(run: str):
crucible = CrucibleService(CONFIGPATH)
async def run_samples(
crucible: Annotated[CrucibleService, Depends(crucible_svc)], run: str
):
return crucible.get_samples(run)


Expand Down Expand Up @@ -312,8 +327,9 @@ async def run_samples(run: str):
400: example_error("Parameter error"),
},
)
async def run_periods(run: str):
crucible = CrucibleService(CONFIGPATH)
async def run_periods(
crucible: Annotated[CrucibleService, Depends(crucible_svc)], run: str
):
return crucible.get_periods(run)


Expand Down Expand Up @@ -345,8 +361,9 @@ async def run_periods(run: str):
400: example_error("Parameter error"),
},
)
async def iteration_samples(iteration: str):
crucible = CrucibleService(CONFIGPATH)
async def iteration_samples(
crucible: Annotated[CrucibleService, Depends(crucible_svc)], iteration: str
):
return crucible.get_samples(iteration=iteration)


Expand Down Expand Up @@ -393,8 +410,9 @@ async def iteration_samples(iteration: str):
400: example_error("Parameter error"),
},
)
async def timeline(run: str):
crucible = CrucibleService(CONFIGPATH)
async def timeline(
crucible: Annotated[CrucibleService, Depends(crucible_svc)], run: str
):
return crucible.get_timeline(run)


Expand Down Expand Up @@ -432,8 +450,9 @@ async def timeline(run: str):
400: example_error("Parameter error"),
},
)
async def metrics(run: str):
crucible = CrucibleService(CONFIGPATH)
async def metrics(
crucible: Annotated[CrucibleService, Depends(crucible_svc)], run: str
):
return crucible.get_metrics_list(run)


Expand All @@ -455,6 +474,7 @@ async def metrics(run: str):
},
)
async def metric_breakouts(
crucible: Annotated[CrucibleService, Depends(crucible_svc)],
run: str,
metric: str,
name: Annotated[
Expand All @@ -472,7 +492,6 @@ async def metric_breakouts(
),
] = None,
):
crucible = CrucibleService(CONFIGPATH)
return crucible.get_metric_breakouts(run, metric, names=name, periods=period)


Expand Down Expand Up @@ -512,6 +531,7 @@ async def metric_breakouts(
},
)
async def metric_data(
crucible: Annotated[CrucibleService, Depends(crucible_svc)],
run: str,
metric: str,
name: Annotated[
Expand All @@ -532,7 +552,6 @@ async def metric_data(
bool, Query(description="Allow aggregation of metrics")
] = False,
):
crucible = CrucibleService(CONFIGPATH)
return crucible.get_metrics_data(
run, metric, names=name, periods=period, aggregate=aggregate
)
Expand Down Expand Up @@ -567,6 +586,7 @@ async def metric_data(
},
)
async def metric_summary(
crucible: Annotated[CrucibleService, Depends(crucible_svc)],
run: str,
metric: str,
name: Annotated[
Expand All @@ -584,7 +604,6 @@ async def metric_summary(
),
] = None,
):
crucible = CrucibleService(CONFIGPATH)
return crucible.get_metrics_summary(run, metric, names=name, periods=period)


Expand Down Expand Up @@ -654,8 +673,9 @@ async def metric_summary(
),
},
)
async def metric_graph_body(graphs: GraphList):
crucible = CrucibleService(CONFIGPATH)
async def metric_graph_body(
crucible: Annotated[CrucibleService, Depends(crucible_svc)], graphs: GraphList
):
return crucible.get_metrics_graph(graphs)


Expand Down Expand Up @@ -712,6 +732,7 @@ async def metric_graph_body(graphs: GraphList):
},
)
async def metric_graph_param(
crucible: Annotated[CrucibleService, Depends(crucible_svc)],
run: str,
metric: str,
aggregate: Annotated[
Expand All @@ -733,7 +754,6 @@ async def metric_graph_param(
] = None,
title: Annotated[Optional[str], Query(description="Title for graph")] = None,
):
crucible = CrucibleService(CONFIGPATH)
return crucible.get_metrics_graph(
GraphList(
run=run,
Expand Down Expand Up @@ -776,8 +796,7 @@ async def metric_graph_param(
),
},
)
async def info():
crucible = CrucibleService(CONFIGPATH)
async def info(crucible: Annotated[CrucibleService, Depends(crucible_svc)]):
return crucible.info


Expand Down Expand Up @@ -807,6 +826,7 @@ async def info():
400: example_error("Index name 'foo' doesn't exist"),
},
)
async def fields(index: str):
crucible = CrucibleService(CONFIGPATH)
async def fields(
crucible: Annotated[CrucibleService, Depends(crucible_svc)], index: str
):
return crucible.get_fields(index=index)
6 changes: 6 additions & 0 deletions backend/app/services/crucible_svc.py
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,12 @@ def _search(
)
return value

def close(self):
"""Close the OpenSearch connection"""
if self.elastic:
self.elastic.close()
self.elastic = None

def search(
self,
index: str,
Expand Down

0 comments on commit 5671d77

Please sign in to comment.