From c363278bb952ed1075b03202d6a629efbb65d1f5 Mon Sep 17 00:00:00 2001 From: Matthias Veit Date: Mon, 23 Sep 2024 09:38:57 +0200 Subject: [PATCH] [feat] Endpoint to request timeseries --- fixbackend/inventory/inventory_router.py | 15 +++++++++++++++ fixbackend/inventory/inventory_schemas.py | 14 ++++++++++++-- fixbackend/inventory/inventory_service.py | 9 +++++---- 3 files changed, 32 insertions(+), 6 deletions(-) diff --git a/fixbackend/inventory/inventory_router.py b/fixbackend/inventory/inventory_router.py index 1347a989..e6fd687b 100644 --- a/fixbackend/inventory/inventory_router.py +++ b/fixbackend/inventory/inventory_router.py @@ -35,6 +35,8 @@ InventorySummaryRead, HistoryTimelineRequest, AggregateRequest, + TimeseriesRequest, + Scatters, ) from fixbackend.streaming_response import streaming_response, StreamOnSuccessResponse from fixbackend.workspaces.dependencies import UserWorkspaceDependency @@ -395,4 +397,17 @@ async def workspace_info( buckets_size_bytes_progress=info.buckets_size_bytes_progress, ) + @router.post("/timeseries", tags=["timeseries"]) + async def timeseries(graph_db: CurrentGraphDbDependency, ts: TimeseriesRequest) -> Scatters: + return await inventory().timeseries_scattered( + graph_db, + name=ts.name, + start=ts.start, + end=ts.end, + group=ts.group, + filter_group=ts.filter_group, + granularity=ts.granularity, + aggregation=ts.aggregation, + ) + return router diff --git a/fixbackend/inventory/inventory_schemas.py b/fixbackend/inventory/inventory_schemas.py index cbd764f4..8cfaf6ea 100644 --- a/fixbackend/inventory/inventory_schemas.py +++ b/fixbackend/inventory/inventory_schemas.py @@ -13,7 +13,7 @@ # along with this program. If not, see . from datetime import timedelta, datetime from enum import StrEnum -from typing import List, Dict, Optional, Literal, Tuple, Union, Any +from typing import List, Dict, Optional, Literal, Tuple, Union, Any, Set from urllib.parse import urlencode from fixcloudutils.types import Json @@ -74,7 +74,7 @@ class VulnerabilitiesChanged(BaseModel): class Scatter(BaseModel): - group_name: str + group_name: str = Field(exclude=True) group: Dict[str, Optional[str]] values: Dict[datetime, float] attributes: Dict[str, Any] = Field(default_factory=dict) @@ -249,3 +249,13 @@ class InventorySummaryRead(BaseModel): databases_bytes_progress: Tuple[int, int] buckets_objects_progress: Tuple[int, int] buckets_size_bytes_progress: Tuple[int, int] + + +class TimeseriesRequest(BaseModel): + name: str + start: datetime + end: datetime + granularity: timedelta + group: Optional[Set[str]] = None + filter_group: Optional[List[str]] = None + aggregation: Optional[str] = None diff --git a/fixbackend/inventory/inventory_service.py b/fixbackend/inventory/inventory_service.py index 820ca462..b8938075 100644 --- a/fixbackend/inventory/inventory_service.py +++ b/fixbackend/inventory/inventory_service.py @@ -704,12 +704,13 @@ async def timeseries_scattered( aggregation=aggregation, ) as cursor: async for entry in cursor: - if (atstr := entry.get("at")) and (group := entry.get("group")) and (v := entry.get("v")): - at = parse_utc_str(str(atstr)) - group_name = "::".join(f"{k}={v}" for k, v in sorted(group.items())) + if (at_str := entry.get("at")) and (v := entry.get("v")): + at = parse_utc_str(str(at_str)) + groups: Json = entry.get("group") or {} + group_name = "::".join(f"{k}={v}" for k, v in sorted(groups.items())) if groups else "all" ats.add(at) points = {at: v} - scatter = Scatter(group_name=group_name, group=group, values=points) + scatter = Scatter(group_name=group_name, group=groups, values=points) if existing := scatters.get(scatter.group_name): existing.values.update(scatter.values) else: