From a1394d6632e2e0336bee8005045e10a9c6e57706 Mon Sep 17 00:00:00 2001 From: David Butenhof Date: Wed, 16 Oct 2024 18:06:44 -0400 Subject: [PATCH] Add new Crucible backend service This encapsulates substantial logic to encapsulate interpretation of the Crucible Common Data Model OpenSearch schema for the use of CPT dashboard API components. By itself, it does nothing. --- backend/app/services/crucible_readme.md | 164 ++ backend/app/services/crucible_svc.py | 1952 +++++++++++++++++++++++ 2 files changed, 2116 insertions(+) create mode 100644 backend/app/services/crucible_readme.md create mode 100644 backend/app/services/crucible_svc.py diff --git a/backend/app/services/crucible_readme.md b/backend/app/services/crucible_readme.md new file mode 100644 index 00000000..0ac2e71f --- /dev/null +++ b/backend/app/services/crucible_readme.md @@ -0,0 +1,164 @@ +Crucible divides data across a set of OpenSearch (or ElasticSearch) indices, +each with a specific document mapping. CDM index names include a "root" name +(like "run") with a versioned prefix, like "cdmv7dev-run". + +Crucible timestamps are integers in "millisecond-from-the-epoch" format. + +The Crucible CDM hierarchy is roughly: + +- RUN (an instrumented benchmark run) + - TAG (metadata) + - ITERATION (a benchmark interval) + - PARAM (execution parameters) + - SAMPLE + - PERIOD (time range where data is recorded) + - METRIC_DESC (description of a specific recorded metric) + - METRIC_DATA (a specific recorded data point) + +OpenSearch doesn't support the concept of a SQL "join", but many of the indices +contain documents that could be considered a static "join" with parent documents +for convenience. For example, each `iteration` document contains a copy of it's +parent `run` document, while the `period` document contains copies of its parent +`sample`, `iteration`, and `run` documents. This means, for example, that it's +possible to make a single query returning all `period` documents for specific +iteration number of a specific run. + +
+
RUN
this contains the basic information about a performance run, including a + generated UUID, begin and end timestamps, a benchmark name, a user name and + email, the (host/directory) "source" of the indexed data (which is usable on + the controler's local file system), plus host and test harness names.
+
TAG
this contains information about a general purpose "tag" to associate some + arbitrary context with a run, for example software versions, hardware, or + other metadata. This can be considered a SQL JOIN with the run document, + adding a tag UUID, name, and value.
+
ITERATION
this contains basic information about a performance run iteration, + including the iteration UUID, number, the primary (benchmark) metric associated + with the iteration, plus the primary "period" of the iteration, and the + iteration status.
+
PARAM
this defines a key/value pair specifying behavior of the benchmark + script for an iteration. Parameters are iteration-specific, but parameters that + don't vary between iterations are often represented as run parameters.
+
SAMPLE
this contains basic information about a sample of an iteration, + including a sample UUID and sample number, along with a "path" for sample data + and a sample status.
+
PERIOD
this contains basic information about a period during which data is + collected within a sample, including the period UUID, name, and begin and end + timestamps. A set of periods can be "linked" through a "prev_id" field.
+
METRIC_DESC
this contains descriptive data about a specific series + of metric values within a specific period of a run, including the metric UUID, + the metric "class", type, and source, along with a set of "names" (key/value + pairs) defining the metric breakout details that narrow down a specific source and + type. For example source:mpstat, type:Busy-CPU data is broken down by package, cpu, + core, and other breakouts which can be isolated or aggregated for data reporting.
+
METRIC_DATA
this describes a specific data point, sampled over a specified + duration with a fixed begin and end timestamp, plus a floating point value. + Each is tied to a specific metric_desc UUID value. Depending on the varied + semantics of metric_desc breakouts, it's often valid to aggregate these + across a set of relatead metric_desc IDs, based on source and type, for + example to get aggregate CPU load across all modes, cores, or across all + modes within a core. This service allows arbitrary aggregation within a + given metric source and type, but by default will attempt to direct the + caller to specifying a set of breakouts that result in a single metric_desc + ID.
+
+ +The `crucible_svc` allows CPT project APIs to access a Crucible CDM backing +store to find information about runs, tags, params, iterations, samples, +periods, plus various ways to expose and aggregate metric data both for +primary benchmarks and non-periodic tools. + +The `get_runs` API is the primary entry point, returning an object that +supports filtering, sorting, and pagination of the Crucible run data decorated +with useful iteration, tag, and parameter data. + +The metrics data APIs (data, breakouts, summary, and graph) now allow +filtering by the metric "name" data. This allows "drilling down" through +the non-periodic "tool data". For example, IO data is per-disk, CPU +information is broken down by core and package. You can now aggregate +all global data (e.g., total system CPU), or filter by breakout names to +select by CPU, mode (usr, sys, irq), etc. + +For example, to return `Busy-CPU` ("type") graph data from the `mpstat` +("source") tool for system mode on one core, you might query: + +``` +/api/v1/ilab/runs//graph/mpstat::Busy-CPU?name=core=12,package=1,num=77,type=sys +``` + +If you make a `graph`, `data`, or `summary` query that doesn't translate +to a unique metric, and don't select aggregation, you'll get a diagnostic +message identifying possible additional filters. For example, with +`type=sys` removed, that same query will show the available values for +the `type` breakout name: + +``` +{ + "detail": [ + { + "message": "More than one metric (5) probably means you should add filters", + "names": { + "type": [ + "guest", + "irq", + "soft", + "sys", + "usr" + ] + }, + "periods": [] + } + ] +} +``` + +This capability can be used to build an interactive exploratory UI to +allow displaying breakout details. The `get_metrics` API will show all +recorded metrics, along with information the names and values used in +those. Metrics that show "names" with more than one value will need to be +filtered to produce meaningful summaries or graphs. + +You can instead aggregate metrics across breakouts using the `?aggregate` +query parameter, like `GET /api/v1/ilab/runs//graph/mpstat::Busy-CPU?aggregate` +which will aggregate all CPU busy data for the system. + +Normally you'll want to display data based on sample periods, for example the +primary period of an iteration, using `?period=`. This will +implicitly constrain the metric data based on the period ID associated with +the `metric_desc` document *and* the begin/end time period of the selected +periods. Normally, a benchmark will will separate iterations because each is +run with a different parameter value, and the default graph labeling will +look for a set of distinct parameters not used by other iterations: for +example, `mpstat::Busy-CPU (batch-size=16)`. + +The `get_breakouts` API can be used to explore the namespace recorded for that +metric in the specified run. For example, + +``` +GET /api/v1/ilab/runs//breakouts/sar-net::packets-sec?name=direction=rx +{ + "label": "sar-net::packets-sec", + "source": "sar-net", + "type": "packets-sec", + "class": [], + "names": { + "dev": [ + "lo", + "eno12409", + "eno12399" + ], + "type": [ + "physical", + "virtual" + ] + } +} +``` + +The `get_filters` API reports all the tag and param filter tags and +values for the runs. These can be used for the `filters` query parameter +on `get_runs` to restrict the set of runs reported; for example, +`/api/v1/ilab/runs?filter=param:workflow=sdg` shows only runs with the param +arg `workflow` set to the value `sdg`. You can search for a subset of the +string value using the operator "~" instead of "=". For example, +`?filter=param:user~user` will match `user` values of "A user" or "The user". diff --git a/backend/app/services/crucible_svc.py b/backend/app/services/crucible_svc.py new file mode 100644 index 00000000..51f81345 --- /dev/null +++ b/backend/app/services/crucible_svc.py @@ -0,0 +1,1952 @@ +"""Service to pull data from a Crucible CDM OpenSearch data store + +A set of helper methods to enable a project API to easily process data from a +Crucible controller's OpenSearch data backend. + +This includes paginated, filtered, and sorted lists of benchmark runs, along +access to the associated Crucible documents such as iterations, samples, and +periods. Metric data can be accessed by breakout names, or aggregated by +breakout subsets or collection periods as either raw data points, statistical +aggregate, or Plotly graph format for UI display. +""" + +import time +from collections import defaultdict +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import Any, Iterator, Optional, Tuple, Union + +from elasticsearch import AsyncElasticsearch, NotFoundError +from fastapi import HTTPException, status +from pydantic import BaseModel + +from app import config + + +class Graph(BaseModel): + """Describe a single graph + + This represents a JSON object provided by a caller through the get_graph + API to describe a specific metric graph. + + The default title (if the field is omitted) is the metric label with a + suffix denoting breakout values selected, any unique parameter values + in a selected iteration, and (if multiple runs are selected in any Graph + list) an indication of the run index. For example, + "mpstat::Busy-CPU [core=2,type=usr] (batch-size=16)". + + Fields: + metric: the metric label, "ilab::train-samples-sec" + aggregate: True to aggregate unspecified breakouts + color: CSS color string ("green" or "#008000") + names: Lock in breakouts + periods: Select metrics for specific test period(s) + run: Override the default run ID from GraphList + title: Provide a title for the graph. The default is a generated title + """ + + metric: str + aggregate: bool = False + color: Optional[str] = None + names: Optional[list[str]] = None + periods: Optional[list[str]] = None + run: Optional[str] = None + title: Optional[str] = None + + +class GraphList(BaseModel): + """Describe a set of overlaid graphs + + This represents a JSON object provided by a caller through the get_graph + API to introduce a set of constrained metrics to be graphed. The "run + ID" here provides a default for the embedded Graph objects, and can be + omitted if all Graph objects specify a run ID. (This is most useful to + select a set of graphs all for a single run ID.) + + Fields: + run: Specify the (default) run ID + name: Specify a name for the set of graphs + graphs: a list of Graph objects + """ + + run: Optional[str] = None + name: str + graphs: list[Graph] + + +@dataclass +class Point: + """Graph point + + Record the start & end timestamp and value of a metric data point + """ + + begin: int + end: int + value: float + + +colors = [ + "black", + "aqua", + "blue", + "fuschia", + "gray", + "green", + "maroon", + "navy", + "olive", + "teal", + "silver", + "lightskyblue", + "mediumspringgreen", + "mistyrose", + "darkgoldenrod", + "cadetblue", + "chocolate", + "coral", + "brown", + "bisque", + "deeppink", + "sienna", +] + + +@dataclass +class Term: + namespace: str + key: str + value: str + + +class Parser: + """Help parsing filter expressions.""" + + def __init__(self, term: str): + """Construct an instance to help parse query parameter expressions + + These consist of a sequence of tokens separated by delimiters. Each + token may be quoted to allow matching against strings with spaces. + + For example, `param:name="A string"` + + Args: + term: A filter expression to parse + """ + self.buffer = term + self.context = term + self.offset = 0 + + def _next_token( + self, delimiters: list[str] = [], optional: bool = False + ) -> Tuple[str, Union[str, None]]: + """Extract the next token from an expression + + Tokens may be quoted; the quotes are removed. for example, the two + expressions `'param':"workflow"='"sdg"'` and `param:workflow:sdg` are + identical. + + Args: + delimiters: a list of delimiter characters + optional: whether the terminating delimiter is optional + + Returns: + A tuple consisting of the token and the delimiter (or None if + parsing reached the end of the expression and the delimiter was + optional) + """ + + @dataclass + class Quote: + open: int + quote: str + + quoted: list[Quote] = [] + next_char = None + token = "" + first_quote = None + for o in range(len(self.buffer)): + next_char = self.buffer[o] + if next_char in delimiters and not quoted: + self.buffer = self.buffer[o + 1 :] + self.offset += o + 1 + break + elif next_char in ('"', "'"): + if o == 0: + first_quote = next_char + if quoted and quoted[-1].quote == next_char: + quoted.pop() + else: + quoted.append(Quote(o, next_char)) + token += next_char + else: + next_char = None + if quoted: + q = quoted[-1] + c = self.context + i = q.open + self.offset + annotated = c[:i] + "[" + c[i] + "]" + c[i + 1 :] + raise HTTPException( + status.HTTP_400_BAD_REQUEST, f"Unterminated quote at {annotated!r}" + ) + + # If delimiters are specified, and not optional, then we didn't + # find one, and that's an error. + if not optional and delimiters: + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + f"Missing delimiter from {','.join(delimiters)} after {token!r}", + ) + self.buffer = "" + self.offset = len(self.context) + return (token, next_char) if not first_quote else (token[1:-1], next_char) + + +class CommonParams: + """Help with sorting out parameters + + Parameter values are associated with iterations, but often a set of + parameters is common across all iterations of a run, and that set can + provide useful context. + + This helps to filter out identical parameters across a set of + iterations. + """ + + def __init__(self): + self.common: dict[str, Any] = {} + self.omit = set() + + def add(self, params: dict[str, Any]): + """Add a new iteration into the param set + + Mark all parameter keys which don't appear in all iterations, or which + have different values in at least one iteration, to be omitted from the + merged "common" param set. + + Args: + params: the param dictionary of an iteration + """ + if not self.common: + self.common.update(params) + else: + for k, v in self.common.items(): + if k not in self.omit and (k not in params or v != params[k]): + self.omit.add(k) + + def render(self) -> dict[str, Any]: + """Return a new param set with only common params""" + return {k: v for k, v in self.common.items() if k not in self.omit} + + +class CrucibleService: + """Support convenient generalized access to Crucible data + + This implements access to the "v7" Crucible "Common Data Model" through + OpenSearch queries. + """ + + # OpenSearch massive limit on hits in a single query + BIGQUERY = 262144 + + # Define the 'run' document fields that support general filtering via + # `?filter=:` + # + # TODO: this excludes 'desc', which isn't used by the ilab runs, and needs + # different treatment as it's a text field rather than a term. It's not an + # immediate priority for ilab, but may be important for general use. + RUN_FILTERS = ("benchmark", "email", "name", "source", "harness", "host") + + # Define the keywords for sorting. + DIRECTIONS = ("asc", "desc") + FIELDS = ( + "begin", + "benchmark", + "desc", + "email", + "end", + "harness", + "host", + "id", + "name", + "source", + ) + + def __init__(self, configpath: str = "crucible"): + """Initialize a Crucible CDM (OpenSearch) connection. + + Generally the `configpath` should be scoped, like `ilab.crucible` so + that multiple APIs based on access to distinct Crucible controllers can + coexist. + + Initialization includes making an "info" call to confirm and record the + server response. + + Args: + configpath: The Vyper config path (e.g., "ilab.crucible") + """ + self.cfg = config.get_config() + self.user = self.cfg.get(configpath + ".username") + self.password = self.cfg.get(configpath + ".password") + self.auth = (self.user, self.password) if self.user or self.password else None + self.url = self.cfg.get(configpath + ".url") + self.elastic = AsyncElasticsearch(self.url, basic_auth=self.auth) + + @staticmethod + def _get_index(root: str) -> str: + return "cdmv7dev-" + root + + @staticmethod + def _split_list(alist: Optional[list[str]] = None) -> list[str]: + """Split a list of parameters + + For simplicity, the APIs supporting "list" query parameters allow + each element in the list to be a comma-separated list of strings. + For example, ["a", "b", "c"] is logically the same as ["a,b,c"]. + + This method normalizes the second form into first to simplify life for + consumers. + + Args: + alist: list of names or name lists + + Returns: + A simple list of options + """ + l: list[str] = [] + if alist: + for n in alist: + l.extend(n.split(",")) + return l + + @staticmethod + def _normalize_date(value: Optional[Union[int, str, datetime]]) -> int: + """Normalize date parameters + + The Crucible data model stores dates as string representations of an + integer "millseconds-from-epoch" value. To allow flexibility, this + Crucible service allows incoming dates to be specified as ISO-format + strings, as integers, or as the stringified integer. + + That is, "2024-09-12 18:29:35.123000+00:00", "1726165775123", and + 1726165775123 are identical. + + Args: + value: Representation of a date-time value + + Returns: + The integer milliseconds-from-epoch equivalent + """ + try: + if isinstance(value, int): + return value + elif isinstance(value, datetime): + return int(value.timestamp() * 1000.0) + elif isinstance(value, str): + try: + return int(value) + except ValueError: + pass + try: + d = datetime.fromisoformat(value) + return int(d.timestamp() * 1000.0) + except ValueError: + pass + except Exception as e: + print(f"normalizing {type(value).__name__} {value} failed with {str(e)}") + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + f"Date representation {value} is not a date string or timestamp", + ) + + @staticmethod + def _hits( + payload: dict[str, Any], fields: Optional[list[str]] = None + ) -> Iterator[dict[str, Any]]: + """Helper to iterate through OpenSearch query matches + + Iteratively yields the "_source" of each hit. As a convenience, can + yield a sub-object of "_source" ... for example, specifying the + optional "fields" as ["metric_desc", "id"] will yield the equivalent of + hit["_source"]["metric_desc"]["id"] + + Args: + payload: OpenSearch reponse payload + fields: Optional sub-fields of "_source" + + Returns: + Yields each object from the "greatest hits" list + """ + if "hits" not in payload: + raise HTTPException( + status_code=500, detail=f"Attempt to iterate hits for {payload}" + ) + hits = payload.get("hits", {}).get("hits", []) + for h in hits: + source = h["_source"] + if fields: + for f in fields: + source = source[f] + yield source + + @staticmethod + def _aggs(payload: dict[str, Any], aggregation: str) -> Iterator[dict[str, Any]]: + """Helper to access OpenSearch aggregations + + Iteratively yields the name and value of each aggregation returned + by an OpenSearch query. This can also be used for nested aggregations + by specifying an aggregation object. + + Args: + payload: A JSON dict containing an "aggregations" field + + Returns: + Yields each aggregation from an aggregation bucket list + """ + if "aggregations" not in payload: + raise HTTPException( + status_code=500, + detail=f"Attempt to iterate missing aggregations for {payload}", + ) + aggs = payload["aggregations"] + if aggregation not in aggs: + raise HTTPException( + status_code=500, + detail=f"Attempt to iterate missing aggregation {aggregation} for {payload}", + ) + for agg in aggs[aggregation]["buckets"]: + yield agg + + @staticmethod + def _format_timestamp(timestamp: Union[str, int]) -> str: + """Convert stringified integer milliseconds-from-epoch to ISO date""" + try: + ts = int(timestamp) + except Exception as e: + print(f"ERROR: invalid {timestamp!r}: {str(e)!r}") + ts = 0 + return str(datetime.fromtimestamp(ts / 1000.00, timezone.utc)) + + @classmethod + def _format_data(cls, data: dict[str, Any]) -> dict[str, Any]: + """Helper to format a "metric_data" object + + Crucible stores the date, duration, and value as strings, so this + converts them to more useful values. The end timestamp is converted + to an ISO date-time string; the duration and value to floating point + numbers. + + Args: + data: a "metric_data" object + + Returns: + A neatly formatted "metric_data" object + """ + return { + "begin": cls._format_timestamp(data["begin"]), + "end": cls._format_timestamp(data["end"]), + "duration": int(data["duration"]) / 1000, + "value": float(data["value"]), + } + + @classmethod + def _format_period(cls, period: dict[str, Any]) -> dict[str, Any]: + """Helper to format a "period" object + + Crucible stores the date values as stringified integers, so this + converts the begin and end timestamps to ISO date-time strings. + + Args: + period: a "period" object + + Returns: + A neatly formatted "period" object + """ + return { + "begin": cls._format_timestamp(timestamp=period["begin"]), + "end": cls._format_timestamp(period["end"]), + "id": period["id"], + "name": period["name"], + } + + @classmethod + def _build_filter_options(cls, filter: Optional[list[str]] = None) -> Tuple[ + Optional[list[dict[str, Any]]], + Optional[list[dict[str, Any]]], + Optional[list[dict[str, Any]]], + ]: + """Build filter terms for tag and parameter filter terms + + Each term has the form ":". Any term + may be quoted: quotes are stripped and ignored. (This is generally only + useful on the to include spaces.) + + We support three namespaces: + param: Match against param index arg/val + tag: Match against tag index name/val + run: Match against run index fields + + We support two operators: + =: Exact match + ~: Partial match + + Args: + filter: list of filter terms like "param:key=value" + + Returns: + A set of OpenSearch filter object lists to detect missing + and matching documents for params, tags, and run fields. For + example, to select param:batch-size=12 results in the + following param filter list: + + [ + {' + dis_max': { + 'queries': [ + { + 'bool': { + 'must': [ + {'term': {'param.arg': 'batch-size'}}, + {'term': {'param.val': '12'}} + ] + } + } + ] + } + } + ] + """ + terms = defaultdict(list) + for term in cls._split_list(filter): + p = Parser(term) + namespace, _ = p._next_token([":"]) + key, operation = p._next_token(["=", "~"]) + value, _ = p._next_token() + if operation == "~": + value = f".*{value}.*" + matcher = "regexp" + else: + matcher = "term" + if namespace in ("param", "tag"): + if namespace == "param": + key_field = "param.arg" + value_field = "param.val" + else: + key_field = "tag.name" + value_field = "tag.val" + terms[namespace].append( + { + "bool": { + "must": [ + {"term": {key_field: key}}, + {matcher: {value_field: value}}, + ] + } + } + ) + elif namespace == "run": + terms[namespace].append({matcher: {f"run.{key}": value}}) + else: + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + f"unknown filter namespace {namespace!r}", + ) + param_filter = None + tag_filter = None + if "param" in terms: + param_filter = [{"dis_max": {"queries": terms["param"]}}] + if "tag" in terms: + tag_filter = [{"dis_max": {"queries": terms["tag"]}}] + return param_filter, tag_filter, terms.get("run") + + @classmethod + def _build_name_filters( + cls, namelist: Optional[list[str]] = None + ) -> list[dict[str, Any]]: + """Build filter terms for metric breakout names + + for example, "cpu=10" filters for metric data descriptors where the + breakout name "cpu" exists and has a value of 10. + + Args: + namelist: list of possibly comma-separated list values + + Returns: + A list of filters to match breakout terms + """ + names: list[str] = cls._split_list(namelist) + filters = [] + for e in names: + try: + n, v = e.split("=", maxsplit=1) + except ValueError: + raise HTTPException( + status.HTTP_400_BAD_REQUEST, f"Filter item {e} must be '='" + ) + filters.append({"term": {f"metric_desc.names.{n}": v}}) + return filters + + @classmethod + def _build_period_filters( + cls, periodlist: Optional[list[str]] = None + ) -> list[dict[str, Any]]: + """Build period filters + + Generate metric_desc filter terms to match against a list of period IDs. + + Note that not all metric descriptions are periodic, and we don't want + these filters to exclude them -- so the filter will exclude only + documents that have a period and don't match. (That is, we won't drop + any non-periodic metrics. We expect those to be filtered by timestamp + instead.) + + Args: + period: list of possibly comma-separated period IDs + + Returns: + A filter term that requires a period.id match only for metric_desc + documents with a period. + """ + pl: list[str] = cls._split_list(periodlist) + if pl: + return [ + { + "dis_max": { + "queries": [ + {"bool": {"must_not": {"exists": {"field": "period"}}}}, + {"terms": {"period.id": pl}}, + ] + } + } + ] + else: + return [] + + @classmethod + def _build_metric_filters( + cls, + run: str, + metric: str, + names: Optional[list[str]] = None, + periods: Optional[list[str]] = None, + ) -> list[dict[str, Any]]: + """Helper for filtering metric descriptions + + We normally filter by run, metric "label", and optionally by breakout + names and periods. This encapsulates the filter construction. + + Args: + run: run ID + metric: metric label (ilab::sdg-samples-sec) + names: list of "name=value" filters + periods: list of period IDs + + Returns: + A list of OpenSearch filter expressions + """ + msource, mtype = metric.split("::") + return ( + [ + {"term": {"run.id": run}}, + {"term": {"metric_desc.source": msource}}, + {"term": {"metric_desc.type": mtype}}, + ] + + cls._build_name_filters(names) + + cls._build_period_filters(periods) + ) + + @classmethod + def _build_sort_terms(cls, sorters: Optional[list[str]]) -> list[dict[str, str]]: + """Build sort term list + + Sorters may reference any native `run` index field and must specify + either "asc"(ending) or "desc"(ending) sort order. Any number of + sorters may be combined, like ["name:asc,benchmark:desc", "end:desc"] + + Args: + sorters: list of : sort terms + + Returns: + list of OpenSearch sort terms + """ + if sorters: + sort_terms = [] + for s in sorters: + key, dir = s.split(":", maxsplit=1) + if dir not in cls.DIRECTIONS: + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + f"Sort direction {dir!r} must be one of {','.join(DIRECTIONS)}", + ) + if key not in cls.FIELDS: + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + f"Sort key {key!r} must be one of {','.join(FIELDS)}", + ) + sort_terms.append({f"run.{key}": dir}) + else: + sort_terms = [{"run.begin": "asc"}] + return sort_terms + + async def _search( + self, index: str, query: Optional[dict[str, Any]] = None, **kwargs + ) -> dict[str, Any]: + """Issue an OpenSearch query + + Args: + index: The "base" CDM index name, e.g., "run", "metric_desc" + query: An OpenSearch query object + kwargs: Additional OpenSearch parameters + + Returns: + The OpenSearch response payload (JSON dict) + """ + idx = self._get_index(index) + start = time.time() + value = await self.elastic.search(index=idx, body=query, **kwargs) + print( + f"QUERY on {idx} took {time.time() - start} seconds, " + f"hits: {value.get('hits', {}).get('total')}" + ) + return value + + async def close(self): + """Close the OpenSearch connection""" + if self.elastic: + await self.elastic.close() + self.elastic = None + + async def search( + self, + index: str, + filters: Optional[list[dict[str, Any]]] = None, + aggregations: Optional[dict[str, Any]] = None, + sort: Optional[list[dict[str, str]]] = None, + source: Optional[str] = None, + size: Optional[int] = None, + offset: Optional[int] = None, + **kwargs, + ) -> dict[str, Any]: + """OpenSearch query helper + + Combine index, filters, aggregations, sort, and pagination options + into an OpenSearch query. + + Args: + index: "root" CDM index name ("run", "metric_desc", ...) + filters: list of JSON dict filter terms {"term": {"name": "value}} + aggregations: list of JSON dict aggregations {"name": {"term": "name"}} + sort: list of JSON dict sort terms ("name": "asc") + size: The number of hits to return; defaults to "very large" + offset: The number of hits to skip, for pagination + kwargs: Additional OpenSearch options + + Returns: + The OpenSearch response + """ + f = filters if filters else [] + query = { + "size": self.BIGQUERY if size is None else size, + "query": {"bool": {"filter": f}}, + } + if sort: + query.update({"sort": sort}) + if source: + query.update({"_source": source}) + if offset: + query.update({"from": offset}) + if aggregations: + query.update({"aggs": aggregations}) + return await self._search(index, query, **kwargs) + + async def _get_metric_ids( + self, + run: str, + metric: str, + namelist: Optional[list[str]] = None, + periodlist: Optional[list[str]] = None, + aggregate: bool = False, + ) -> list[str]: + """Generate a list of matching metric_desc IDs + + Given a specific run and metric name, and a set of breakout filters, + returns a list of metric desc IDs that match. + + If a single ID is required to produce a consistent metric, and the + supplied filters produce more than one without aggregation, raise a + 422 HTTP error (UNPROCESSABLE CONTENT) with a response body showing + the unsatisfied breakouts (name and available values). + + Args: + run: run ID + metric: combined metric name (e.g., sar-net::packets-sec) + namelist: a list of breakout filters like "type=physical" + periodlist: a list of period IDs + aggregate: if True, allow multiple metric IDs + + Returns: + A list of matching metric_desc ID value(s) + """ + filters = self._build_metric_filters(run, metric, namelist, periodlist) + metrics = await self.search( + "metric_desc", + filters=filters, + ignore_unavailable=True, + ) + if len(metrics["hits"]["hits"]) < 1: + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + ( + f"No matches for {metric}" + f"{('+' + ','.join(namelist) if namelist else '')}" + ), + ) + ids = [h["metric_desc"]["id"] for h in self._hits(metrics)] + if len(ids) < 2 or aggregate: + return ids + + # If we get here, the client asked for breakout data that doesn't + # resolve to a single metric stream, and didn't specify aggregation. + # Offer some help. + names = defaultdict(set) + periods = set() + response = { + "message": f"More than one metric ({len(ids)}) means " + "you should add breakout filters or aggregate." + } + for m in self._hits(metrics): + if "period" in m: + periods.add(m["period"]["id"]) + for n, v in m["metric_desc"]["names"].items(): + names[n].add(v) + + # We want to help filter a consistent summary, so only show those + # breakout names with more than one value. + response["names"] = {n: sorted(v) for n, v in names.items() if v and len(v) > 1} + response["periods"] = list(periods) + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=response + ) + + async def _build_timestamp_range_filters( + self, periods: Optional[list[str]] = None + ) -> list[dict[str, Any]]: + """Create a timestamp range filter + + This extracts the begin and end timestamps from the list of periods and + builds a timestamp filter range to select documents on or after the + earliest begin timestamp and on or before the latest end timestamp. + + Args: + periods: a list of CDM period IDs + + Returns: + Constructs a range filter for the earliest begin timestamp and the + latest end timestamp among the specified periods. + """ + if periods: + ps = self._split_list(periods) + matches = await self.search( + "period", filters=[{"terms": {"period.id": ps}}] + ) + start = min([int(h) for h in self._hits(matches, ["period", "begin"])]) + end = max([int(h) for h in self._hits(matches, ["period", "end"])]) + return [ + {"range": {"metric_data.begin": {"gte": str(start)}}}, + {"range": {"metric_data.end": {"lte": str(end)}}}, + ] + else: + return [] + + async def _get_run_ids( + self, index: str, filters: Optional[list[dict[str, Any]]] = None + ) -> set[str]: + """Return a set of run IDs matching a filter + + Documents in the specified index must have "run.id" fields. Returns + a set of unique run IDs matched by the filter in the specified index. + + Args: + index: root CDM index name + filters: a list of OpenSearch filter terms + + Returns: + a set of unique run ID values + """ + filtered = await self.search( + index, source="run.id", filters=filters, ignore_unavailable=True + ) + print(f"HITS: {filtered['hits']['hits']}") + return set([x for x in self._hits(filtered, ["run", "id"])]) + + async def get_run_filters(self) -> dict[str, dict[str, list[str]]]: + """Return possible tag and filter terms + + Return a description of tag and param filter terms meaningful + across all datasets. TODO: we should support date-range and benchmark + filtering. Consider supporting all `run` API filtering, which would + allow adjusting the filter popups to drop options no longer relevant + to a given set. + + { + "param": { + {"gpus": [4", "8"]} + } + } + + Returns: + A two-level JSON dict; the first level is the namespace (param or + tag), the second level key is the param/tag/field name and its value + is the set of values defined for that key. + """ + tags = self.search( + "tag", + size=0, + aggregations={ + "key": { + "terms": {"field": "tag.name", "size": self.BIGQUERY}, + "aggs": { + "values": {"terms": {"field": "tag.val", "size": self.BIGQUERY}} + }, + } + }, + ignore_unavailable=True, + ) + params = await self.search( + "param", + size=0, + aggregations={ + "key": { + "terms": {"field": "param.arg", "size": self.BIGQUERY}, + "aggs": { + "values": { + "terms": {"field": "param.val", "size": self.BIGQUERY} + } + }, + } + }, + ignore_unavailable=True, + ) + aggs = { + k: {"terms": {"field": f"run.{k}", "size": self.BIGQUERY}} + for k in self.RUN_FILTERS + } + runs = await self.search( + "run", + size=0, + aggregations=aggs, + ) + result = defaultdict(lambda: defaultdict(lambda: set())) + for p in self._aggs(params, "key"): + for v in p["values"]["buckets"]: + result["param"][p["key"]].add(v["key"]) + for t in self._aggs(tags, "key"): + for v in t["values"]["buckets"]: + result["tag"][t["key"]].add(v["key"]) + for name in self.RUN_FILTERS: + for f in self._aggs(runs, name): + result["run"][name].add(f["key"]) + return {s: {k: list(v) for k, v in keys.items()} for s, keys in result.items()} + + async def get_runs( + self, + filter: Optional[list[str]] = None, + start: Optional[Union[int, str, datetime]] = None, + end: Optional[Union[int, str, datetime]] = None, + offset: int = 0, + sort: Optional[list[str]] = None, + size: Optional[int] = None, + **kwargs, + ) -> dict[str, Any]: + """Return matching Crucible runs + + Filtered and sorted list of runs. + + { + "sort": [], + "startDate": "2024-01-01T05:00:00+00:00", + "size": 1, + "offset": 0, + "results": [ + { + "begin": "1722878906342", + "benchmark": "ilab", + "email": "A@email", + "end": "1722880503544", + "id": "4e1d2c3c-b01c-4007-a92d-23a561af2c11", + "name": "\"A User\"", + "source": "node.example.com//var/lib/crucible/run/ilab--2024-08-05_17:17:13_UTC--4e1d2c3c-b01c-4007-a92d-23a561af2c11", + "tags": { + "topology": "none" + }, + "iterations": [ + { + "iteration": 1, + "primary_metric": "ilab::train-samples-sec", + "primary_period": "measurement", + "status": "pass", + "params": { + "cpu-offload-pin-memory": "1", + "model": "/home/models/granite-7b-lab/", + "data-path": "/home/data/training/knowledge_data.jsonl", + "cpu-offload-optimizer": "1", + "nnodes": "1", + "nproc-per-node": "4", + "num-runavg-samples": "2" + } + } + ], + "primary_metrics": [ + "ilab::train-samples-sec" + ], + "status": "pass", + "params": { + "cpu-offload-pin-memory": "1", + "model": "/home/models/granite-7b-lab/", + "data-path": "/home/data/training/knowledge_data.jsonl", + "cpu-offload-optimizer": "1", + "nnodes": "1", + "nproc-per-node": "4", + "num-runavg-samples": "2" + }, + "begin_date": "2024-08-05 17:28:26.342000+00:00", + "end_date": "2024-08-05 17:55:03.544000+00:00" + } + ], + "count": 1, + "total": 15, + "next_offset": 1 + } + + Args: + start: Include runs starting at timestamp + end: Include runs ending no later than timestamp + filter: List of tag/param filter terms (parm:key=value) + sort: List of sort terms (column:) + size: Include up to runs in output + offset: Use size/from pagination instead of search_after + + Returns: + JSON object with "results" list and "housekeeping" fields + """ + + # We need to remove runs which don't match against 'tag' or 'param' + # filter terms. The CDM schema doesn't make it possible to do this in + # one shot. Instead, we run queries against the param and tag indices + # separately, producing a list of run IDs which we'll exclude from the + # final collection. + # + # If there are no matches, we can exit early. (TODO: should this be an + # error, or just a success with an empty list?) + results = {} + filters = [] + sorters = self._split_list(sort) + results["sort"] = sorters + sort_terms = self._build_sort_terms(sorters) + param_filters, tag_filters, run_filters = self._build_filter_options(filter) + if run_filters: + filters.extend(run_filters) + if start or end: + s = None + e = None + if start: + s = self._normalize_date(start) + results["startDate"] = datetime.fromtimestamp( + s / 1000.0, tz=timezone.utc + ) + if end: + e = self._normalize_date(end) + results["endDate"] = datetime.fromtimestamp(e / 1000.0, tz=timezone.utc) + + if s and e and s > e: + raise HTTPException( + status_code=422, + detail={ + "error": "Invalid date format, start_date must be less than end_date" + }, + ) + cond = {} + if s: + cond["gte"] = str(s) + if e: + cond["lte"] = str(e) + filters.append({"range": {"run.begin": cond}}) + if size: + results["size"] = size + results["offset"] = offset if offset is not None else 0 + + # In order to filter by param or tag values, we need to produce a list + # of matching RUN IDs from each index. We'll then drop any RUN ID that's + # not on both lists. + if tag_filters: + tagids = await self._get_run_ids("tag", tag_filters) + if param_filters: + paramids = await self._get_run_ids("param", param_filters) + + # If it's obvious we can't produce any matches at this point, exit. + if (tag_filters and len(tagids) == 0) or (param_filters and len(paramids) == 0): + results.update({"results": [], "count": 0, "total": 0}) + return results + + hits = await self.search( + "run", + size=size, + offset=offset, + sort=sort_terms, + filters=filters, + **kwargs, + ignore_unavailable=True, + ) + rawiterations = await self.search("iteration", ignore_unavailable=True) + rawtags = await self.search("tag", ignore_unavailable=True) + rawparams = await self.search("param", ignore_unavailable=True) + + iterations = defaultdict(list) + tags = defaultdict(defaultdict) + params = defaultdict(defaultdict) + run_params = defaultdict(list) + + for i in self._hits(rawiterations): + iterations[i["run"]["id"]].append(i["iteration"]) + + # Organize tags by run ID + for t in self._hits(rawtags): + tags[t["run"]["id"]][t["tag"]["name"]] = t["tag"]["val"] + + # Organize params by iteration ID + for p in self._hits(rawparams): + run_params[p["run"]["id"]].append(p) + params[p["iteration"]["id"]][p["param"]["arg"]] = p["param"]["val"] + + runs = {} + for h in self._hits(hits): + run = h["run"] + rid = run["id"] + + # Filter the runs by our tag and param queries + if param_filters and rid not in paramids: + continue + + if tag_filters and rid not in tagids: + continue + + # Collect unique runs: the status is "fail" if any iteration for + # that run ID failed. + runs[rid] = run + run["tags"] = tags.get(rid, {}) + run["iterations"] = [] + run["primary_metrics"] = set() + common = CommonParams() + for i in iterations.get(rid, []): + iparams = params.get(i["id"], {}) + if "status" not in run: + run["status"] = i["status"] + else: + if i["status"] != "pass": + run["status"] = i["status"] + common.add(iparams) + run["primary_metrics"].add(i["primary-metric"]) + run["iterations"].append( + { + "iteration": i["num"], + "primary_metric": i["primary-metric"], + "primary_period": i["primary-period"], + "status": i["status"], + "params": iparams, + } + ) + run["params"] = common.render() + try: + run["begin_date"] = self._format_timestamp(run["begin"]) + run["end_date"] = self._format_timestamp(run["end"]) + except KeyError as e: + print(f"Missing 'run' key {str(e)} in {run}") + run["begin_date"] = self._format_timestamp("0") + run["end_date"] = self._format_timestamp("0") + + count = len(runs) + total = hits["hits"]["total"]["value"] + results.update( + { + "results": list(runs.values()), + "count": count, + "total": total, + } + ) + if size and (offset + count < total): + results["next_offset"] = offset + size + return results + + async def get_tags(self, run: str, **kwargs) -> dict[str, str]: + """Return the set of tags associated with a run + + Args: + run: run ID + + Returns: + JSON dict with "tag" keys showing each value + """ + tags = await self.search( + index="tag", + filters=[{"term": {"run.id": run}}], + **kwargs, + ignore_unavailable=True, + ) + return {t["name"]: t["val"] for t in self._hits(tags, ["tag"])} + + async def get_params( + self, run: Optional[str] = None, iteration: Optional[str] = None, **kwargs + ) -> dict[str, dict[str, str]]: + """Return the set of parameters for a run or iteration + + Parameters are technically associated with an iteration, but can be + aggregated for a run. This will return a set of parameters for each + iteration; plus, if a "run" was specified, a filtered list of param + values that are common across all iterations. + + Args: + run: run ID + iteration: iteration ID + kwargs: additional OpenSearch keywords + + Returns: + JSON dict of param values by iteration (plus "common" if by run ID) + """ + if not run and not iteration: + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + "A params query requires either a run or iteration ID", + ) + match = {"run.id" if run else "iteration.id": run if run else iteration} + params = await self.search( + index="param", + filters=[{"term": match}], + **kwargs, + ignore_unavailable=True, + ) + response = defaultdict(defaultdict) + for param in self._hits(params): + iter = param["iteration"]["id"] + arg = param["param"]["arg"] + val = param["param"]["val"] + if response.get(iter) and response.get(iter).get(arg): + print(f"Duplicate param {arg} for iteration {iter}") + response[iter][arg] = val + + # Filter out all parameter values that don't exist in all or which have + # different values. + if run: + common = CommonParams() + for params in response.values(): + common.add(params) + response["common"] = common.render() + return response + + async def get_iterations(self, run: str, **kwargs) -> list[dict[str, Any]]: + """Return a list of iterations for a run + + Args: + run: run ID + kwargs: additional OpenSearch keywords + + Returns: + A list of iteration documents + """ + iterations = await self.search( + index="iteration", + filters=[{"term": {"run.id": run}}], + sort=[{"iteration.num": "asc"}], + **kwargs, + ignore_unavailable=True, + ) + return [i["iteration"] for i in self._hits(iterations)] + + async def get_samples( + self, run: Optional[str] = None, iteration: Optional[str] = None, **kwargs + ): + """Return a list of samples for a run or iteration + + Args: + run: run ID + iteration: iteration ID + kwargs: additional OpenSearch keywords + + Returns: + A list of sample documents. + """ + if not run and not iteration: + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + "A sample query requires either a run or iteration ID", + ) + match = {"run.id" if run else "iteration.id": run if run else iteration} + hits = await self.search( + index="sample", + filters=[{"term": match}], + **kwargs, + ignore_unavailable=True, + ) + samples = [] + for s in self._hits(hits): + print(f"SAMPLE's ITERATION {s['iteration']}") + sample = s["sample"] + sample["iteration"] = s["iteration"]["num"] + sample["primary_metric"] = s["iteration"]["primary-metric"] + sample["status"] = s["iteration"]["status"] + samples.append(sample) + return samples + + async def get_periods( + self, + run: Optional[str] = None, + iteration: Optional[str] = None, + sample: Optional[str] = None, + **kwargs, + ): + """Return a list of periods associated with a run, an iteration, or a + sample + + The "period" document is normalized to represent timestamps using ISO + strings. + + Args: + run: run ID + iteration: iteration ID + sample: sample ID + kwargs: additional OpenSearch parameters + + Returns: + a list of normalized period documents + """ + if not any((run, iteration, sample)): + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + "A period query requires a run, iteration, or sample ID", + ) + match = None + if sample: + match = {"sample.id": sample} + elif iteration: + match = {"iteration.id": iteration} + else: + match = {"run.id": run} + periods = await self.search( + index="period", + filters=[{"term": match}], + sort=[{"period.begin": "asc"}], + **kwargs, + ignore_unavailable=True, + ) + body = [] + for h in self._hits(periods): + period = self._format_period(period=h["period"]) + period["iteration"] = h["iteration"]["num"] + period["sample"] = h["sample"]["num"] + period["primary_metric"] = h["iteration"]["primary-metric"] + period["status"] = h["iteration"]["status"] + body.append(period) + return body + + async def get_timeline(self, run: str, **kwargs) -> dict[str, Any]: + """Report the relative timeline of a run + + With nested object lists, show runs to iterations to samples to + periods. + + Args: + run: run ID + kwargs: additional OpenSearch parameters + """ + itr = await self.search( + index="iteration", + filters=[{"term": {"run.id": run}}], + **kwargs, + ignore_unavailable=True, + ) + sam = await self.search( + index="sample", + filters=[{"term": {"run.id": run}}], + **kwargs, + ignore_unavailable=True, + ) + per = await self.search( + index="period", + filters=[{"term": {"run.id": run}}], + **kwargs, + ignore_unavailable=True, + ) + samples = defaultdict(list) + periods = defaultdict(list) + + for s in self._hits(sam): + samples[s["iteration"]["id"]].append(s) + for p in self._hits(per): + periods[p["sample"]["id"]].append(p) + + iterations = [] + robj = {"id": run, "iterations": iterations} + body = {"run": robj} + for i in self._hits(itr): + if "begin" not in robj: + robj["begin"] = self._format_timestamp(i["run"]["begin"]) + robj["end"] = self._format_timestamp(i["run"]["end"]) + iteration = i["iteration"] + iterations.append(iteration) + iteration["samples"] = [] + for s in samples.get(iteration["id"], []): + sample = s["sample"] + sample["periods"] = [] + for pr in periods.get(sample["id"], []): + period = self._format_period(pr["period"]) + sample["periods"].append(period) + iteration["samples"].append(sample) + return body + + async def get_metrics_list(self, run: str, **kwargs) -> dict[str, Any]: + """Return a list of metrics available for a run + + Each run may have multiple performance metrics stored. This API allows + retrieving a sorted list of the metrics available for a given run, with + the "names" selection criteria available for each and, for "periodic" + (benchmark) metrics, the defined periods for which data was gathered. + + { + "ilab::train-samples-sec": { + "periods": [{"id": , "name": "measurement"}], + "breakouts": {"benchmark-group" ["unknown"], ...} + }, + "iostat::avg-queue-length": { + "periods": [], + "breakouts": {"benchmark-group": ["unknown"], ...}, + }, + ... + } + + Args: + run: run ID + + Returns: + List of metrics available for the run + """ + hits = await self.search( + index="metric_desc", + filters=[{"term": {"run.id": run}}], + ignore_unavailable=True, + **kwargs, + ) + met = {} + for h in self._hits(hits): + desc = h["metric_desc"] + name = desc["source"] + "::" + desc["type"] + if name in met: + record = met[name] + else: + record = {"periods": [], "breakouts": defaultdict(set)} + met[name] = record + if "period" in h: + record["periods"].append(h["period"]["id"]) + for n, v in desc["names"].items(): + record["breakouts"][n].add(v) + return met + + async def get_metric_breakouts( + self, + run: str, + metric: str, + names: Optional[list[str]] = None, + periods: Optional[list[str]] = None, + ) -> dict[str, Any]: + """Help explore available metric breakouts + + Args: + run: run ID + metric: metric label (e.g., "mpstat::Busy-CPU") + names: list of name filters ("cpu=3") + periods: list of period IDs + + Returns: + A description of all breakout names and values, which can be + specified to narrow down metrics returns by the data, summary, and + graph APIs. + + { + "label": "mpstat::Busy-CPU", + "class": [ + "throughput" + ], + "type": "Busy-CPU", + "source": "mpstat", + "breakouts": { + "num": [ + "8", + "72" + ], + "thread": [ + 0, + 1 + ] + } + } + """ + start = time.time() + filters = self._build_metric_filters(run, metric, names, periods) + metric_name = metric + ("" if not names else ("+" + ",".join(names))) + metrics = await self.search( + "metric_desc", + filters=filters, + ignore_unavailable=True, + ) + if len(metrics["hits"]["hits"]) < 1: + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + f"Metric name {metric_name} not found for run {run}", + ) + classes = set() + response = {"label": metric, "class": classes} + breakouts = defaultdict(set) + pl = set() + for m in self._hits(metrics): + desc = m["metric_desc"] + response["type"] = desc["type"] + response["source"] = desc["source"] + if desc.get("class"): + classes.add(desc["class"]) + if "period" in m: + pl.add(m["period"]["id"]) + for n, v in desc["names"].items(): + breakouts[n].add(v) + # We want to help filter a consistent summary, so only show those + # names with more than one value. + if len(pl) > 1: + response["periods"] = pl + response["breakouts"] = {n: v for n, v in breakouts.items() if len(v) > 1} + duration = time.time() - start + print(f"Processing took {duration} seconds") + return response + + async def get_metrics_data( + self, + run: str, + metric: str, + names: Optional[list[str]] = None, + periods: Optional[list[str]] = None, + aggregate: bool = False, + ) -> list[Any]: + """Return a list of metric data + + The "aggregate" option allows aggregating various metrics across + breakout streams and periods: be careful, as this is meaningful only if + the breakout streams are sufficiently related. + + Args: + run: run ID + metric: metric label (e.g., "mpstat::Busy-CPU") + names: list of name filters ("cpu=3") + periods: list of period IDs + aggregate: aggregate multiple metric data streams + + Returns: + A sequence of data samples, showing the aggregate sample along with + the duration and end timestamp of each sample interval. + + [ + { + "begin": "2024-08-22 20:03:23.028000+00:00", + "end": "2024-08-22 20:03:37.127000+00:00", + "duration": 14.1, + "value": 9.35271216694379 + }, + { + "begin": "2024-08-22 20:03:37.128000+00:00", + "end": "2024-08-22 20:03:51.149000+00:00", + "duration": 14.022, + "value": 9.405932330557683 + }, + { + "begin": "2024-08-22 20:03:51.150000+00:00", + "end": "2024-08-22 20:04:05.071000+00:00", + "duration": 13.922, + "value": 9.478773265522682 + } + ] + """ + start = time.time() + ids = await self._get_metric_ids( + run, metric, names, periodlist=periods, aggregate=aggregate + ) + + # If we're searching by periods, filter metric data by the period + # timestamp range rather than just relying on the metric desc IDs as + # we also want to filter non-periodic tool data. + filters = [{"terms": {"metric_desc.id": ids}}] + filters.extend(await self._build_timestamp_range_filters(periods)) + + response = [] + if len(ids) > 1: + # Find the minimum sample interval of the selected metrics + aggdur = await self.search( + "metric_data", + size=0, + filters=filters, + aggregations={"duration": {"stats": {"field": "metric_data.duration"}}}, + ) + interval = int(aggdur["aggregations"]["duration"]["min"]) + data = await self.search( + index="metric_data", + size=0, + filters=filters, + aggregations={ + "interval": { + "histogram": {"field": "metric_data.end", "interval": interval}, + "aggs": {"value": {"sum": {"field": "metric_data.value"}}}, + } + }, + ) + for h in self._aggs(data, "interval"): + response.append( + { + "begin": self._format_timestamp(h["key"] - interval), + "end": self._format_timestamp(h["key"]), + "value": h["value"]["value"], + "duration": interval / 1000.0, + } + ) + else: + data = await self.search("metric_data", filters=filters) + for h in self._hits(data, ["metric_data"]): + response.append(self._format_data(h)) + response.sort(key=lambda a: a["end"]) + duration = time.time() - start + print(f"Processing took {duration} seconds") + return response + + async def get_metrics_summary( + self, + run: str, + metric: str, + names: Optional[list[str]] = None, + periods: Optional[list[str]] = None, + ) -> dict[str, Any]: + """Return a statistical summary of metric data + + Provides a statistical summary of selected data samples. + + Args: + run: run ID + metric: metric label (e.g., "mpstat::Busy-CPU") + names: list of name filters ("cpu=3") + periods: list of period IDs + + Returns: + A statistical summary of the selected metric data + + { + "count": 71, + "min": 0.0, + "max": 0.3296, + "avg": 0.02360704225352113, + "sum": 1.676self.BIGQUERY00000001 + } + """ + start = time.time() + ids = await self._get_metric_ids(run, metric, names, periodlist=periods) + filters = [{"terms": {"metric_desc.id": ids}}] + filters.extend(self._build_timestamp_range_filters(periods)) + data = await self.search( + "metric_data", + size=0, + filters=filters, + aggregations={"score": {"stats": {"field": "metric_data.value"}}}, + ) + duration = time.time() - start + print(f"Processing took {duration} seconds") + return data["aggregations"]["score"] + + async def _graph_title( + self, + run_id: str, + run_id_list: list[str], + graph: Graph, + params_by_run: dict[str, Any], + periods_by_run: dict[str, Any], + ) -> str: + """Compute a default title for a graph + + Use the period, breakout name selections, run list, and iteration + parameters to construct a meaningful name for a graph. + + For example, "ilab::sdg-samples-sec (batch-size=4) {run 1}", or + "mpstat::Busy-CPU [cpu=4]" + + Args: + run_id: the Crucible run ID + run_id_list: ordered list of run IDs in our list of graphs + graph: the current Graph object + params_by_run: initially empty dict used to cache parameters + periods_by_run: initially empty dict used to cache periods + + Returns: + A string title + """ + names = graph.names + metric = graph.metric + if run_id not in params_by_run: + # Gather iteration parameters outside the loop for help in + # generating useful labels. + all_params = await self.search( + "param", filters=[{"term": {"run.id": run_id}}] + ) + collector = defaultdict(defaultdict) + for h in self._hits(all_params): + collector[h["iteration"]["id"]][h["param"]["arg"]] = h["param"]["val"] + params_by_run[run_id] = collector + else: + collector = params_by_run[run_id] + + if run_id not in periods_by_run: + periods = await self.search( + "period", filters=[{"term": {"run.id": run_id}}] + ) + iteration_periods = defaultdict(set) + for p in self._hits(periods): + iteration_periods[p["iteration"]["id"]].add(p["period"]["id"]) + periods_by_run[run_id] = iteration_periods + else: + iteration_periods = periods_by_run[run_id] + + # We can easily end up with multiple graphs across distinct + # periods or iterations, so we want to be able to provide some + # labeling to the graphs. We do this by looking for unique + # iteration parameters values, since the iteration number and + # period name aren't useful by themselves. + name_suffix = "" + if graph.periods: + iteration = None + for i, pset in iteration_periods.items(): + if set(graph.periods) <= pset: + iteration = i + break + + # If the period(s) we're graphing resolve to a single + # iteration in a run with multiple iterations, then we can + # try to find a unique title suffix based on distinct param + # values for that iteration. + if iteration and len(collector) > 1: + unique = collector[iteration].copy() + for i, params in collector.items(): + if i != iteration: + for p in list(unique.keys()): + if p in params and unique[p] == params[p]: + del unique[p] + if unique: + name_suffix = ( + " (" + ",".join([f"{p}={v}" for p, v in unique.items()]) + ")" + ) + + if len(run_id_list) > 1: + name_suffix += f" {{run {run_id_list.index(run_id) + 1}}}" + + options = (" [" + ",".join(names) + "]") if names else "" + return metric + options + name_suffix + + async def get_metrics_graph(self, graphdata: GraphList) -> dict[str, Any]: + """Return metrics data for a run + + Each run may have multiple performance metrics stored. This API allows + retrieving graphable time-series representation of a metric over the + period of the run, in the format defined by Plotly as configuration + settings plus an x value array and a y value array. + + { + "data": [ + { + "x": [ + "2024-08-27 09:16:27.371000", + ... + ], + "y": [ + 10.23444312132161, + ... + ], + "name": "Metric ilab::train-samples-sec", + "type": "scatter", + "mode": "line", + "marker": {"color": "black"}, + "labels": {"x": "sample timestamp", "y": "samples / second"} + } + ] + "layout": { + "width": 1500, + "yaxis": { + "title": "mpstat::Busy-CPU core=2,package=0,num=112,type=usr", + "color": "black" + } + } + } + + Args: + graphdata: A GraphList object + + Returns: + A Plotly object with layout + """ + start = time.time() + graphlist = [] + default_run_id = graphdata.run + layout: dict[str, Any] = {"width": "1500"} + axes = {} + yaxis = None + cindex = 0 + params_by_run = {} + periods_by_run = {} + + # Construct a de-duped ordered list of run IDs, starting with the + # default. + run_id_list = [] + if default_run_id: + run_id_list.append(default_run_id) + for g in graphdata.graphs: + if g.run and g.run not in run_id_list: + run_id_list.append(g.run) + + if len(run_id_list) < len(graphdata.graphs) and not default_run_id: + raise HTTPException( + status.HTTP_400_BAD_REQUEST, "each graph request must have a run ID" + ) + + for g in graphdata.graphs: + run_id = g.run if g.run else default_run_id + names = g.names + metric: str = g.metric + + # The caller can provide a title for each graph; but, if not, we + # journey down dark overgrown pathways to fabricate a default with + # reasonable context, including unique iteration parameters, + # breakdown selections, and which run provided the data. + if g.title: + title = g.title + else: + title = await self._graph_title( + run_id, run_id_list, g, params_by_run, periods_by_run + ) + + ids = await self._get_metric_ids( + run_id, + metric, + names, + periodlist=g.periods, + aggregate=g.aggregate, + ) + filters = [{"terms": {"metric_desc.id": ids}}] + filters.extend(await self._build_timestamp_range_filters(g.periods)) + y_max = 0.0 + points: list[Point] = [] + + # If we're pulling multiple breakouts, e.g., total CPU across modes + # or cores, we want to aggregate by timestamp interval. Sample + # timstamps don't necessarily align, so the "histogram" aggregation + # normalizes within the interval (based on the minimum actual + # interval duration). + if len(ids) > 1: + # Find the minimum sample interval of the selected metrics + aggdur = await self.search( + "metric_data", + size=0, + filters=filters, + aggregations={ + "duration": {"stats": {"field": "metric_data.duration"}} + }, + ) + interval = int(aggdur["aggregations"]["duration"]["min"]) + data = await self.search( + index="metric_data", + size=0, + filters=filters, + aggregations={ + "interval": { + "histogram": { + "field": "metric_data.begin", + "interval": interval, + }, + "aggs": {"value": {"sum": {"field": "metric_data.value"}}}, + } + }, + ) + for h in self._aggs(data, "interval"): + begin = int(h["key"]) + end = begin + interval - 1 + points.append(Point(begin, end, float(h["value"]["value"]))) + else: + data = await self.search("metric_data", filters=filters) + for h in self._hits(data, ["metric_data"]): + points.append( + Point(int(h["begin"]), int(h["end"]), float(h["value"])) + ) + + # Sort the graph points by timestamp so that Ploty will draw nice + # lines. We graph both the "begin" and "end" timestamp of each + # sample against the value to more clearly show the sampling + # interval. + x = [] + y = [] + + for p in sorted(points, key=lambda a: a.begin): + x.extend( + [self._format_timestamp(p.begin), self._format_timestamp(p.end)] + ) + y.extend([p.value, p.value]) + y_max = max(y_max, p.value) + + if g.color: + color = g.color + else: + color = colors[cindex] + cindex += 1 + if cindex >= len(colors): + cindex = 0 + graphitem = { + "x": x, + "y": y, + "name": title, + "type": "scatter", + "mode": "line", + "marker": {"color": color}, + "labels": { + "x": "sample timestamp", + "y": "samples / second", + }, + } + + # Y-axis scaling and labeling is divided by benchmark label; + # so store each we've created to reuse. (E.g., if we graph + # 5 different mpstat::Busy-CPU periods, they'll share a single + # Y axis.) + if metric in axes: + yref = axes[metric] + else: + if yaxis: + name = f"yaxis{yaxis}" + yref = f"y{yaxis}" + yaxis += 1 + layout[name] = { + "title": metric, + "color": color, + "autorange": True, + "anchor": "free", + "autoshift": True, + "overlaying": "y", + } + else: + name = "yaxis" + yref = "y" + yaxis = 2 + layout[name] = { + "title": metric, + "color": color, + } + axes[metric] = yref + graphitem["yaxis"] = yref + graphlist.append(graphitem) + duration = time.time() - start + print(f"Processing took {duration} seconds") + return {"data": graphlist, "layout": layout}