From f4b6ef29a57a7ac38f57e086ebbd9d6ee3ffaeed Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?H=C3=A5kon=20V=2E=20Treider?=
Date: Fri, 24 Jan 2025 15:21:19 +0100
Subject: [PATCH] Add minDatapoint and maxDatapoint aggregate support

---
 cognite/client/_api/datapoint_tasks.py    |  52 ++++++++-
 cognite/client/data_classes/datapoints.py | 127 ++++++++++++++++++++-
 2 files changed, 175 insertions(+), 4 deletions(-)

diff --git a/cognite/client/_api/datapoint_tasks.py b/cognite/client/_api/datapoint_tasks.py
index b0ee758249..546d15bcc2 100644
--- a/cognite/client/_api/datapoint_tasks.py
+++ b/cognite/client/_api/datapoint_tasks.py
@@ -32,10 +32,15 @@
 from cognite.client.data_classes.data_modeling.ids import NodeId
 from cognite.client.data_classes.datapoints import (
     _INT_AGGREGATES,
+    _OBJECT_AGGREGATES,
     Aggregate,
     Datapoints,
     DatapointsArray,
     DatapointsQuery,
+    MaxDatapoint,
+    MaxDatapointWithStatus,
+    MinDatapoint,
+    MinDatapointWithStatus,
     _DatapointsPayloadItem,
 )
 from cognite.client.utils._auxiliary import exactly_one_is_not_none, is_finite, is_unlimited
@@ -255,8 +260,6 @@ def _verify_options_and_categorize_query(query: DatapointsQuery) -> bool:
             raise ValueError("When passing `aggregates`, argument `granularity` is also required.")
         elif query.include_outside_points is True:
             raise ValueError("'Include outside points' is not supported for aggregates.")
-        elif query.include_status is True:
-            raise ValueError("'Include status' is not supported for aggregates.")
         return False

     @staticmethod
@@ -356,6 +359,28 @@ def nullable_raw_dp(dp: DatapointRaw) -> float | str:
         # We pretend like float is always returned to not break every dps annot. in the entire SDK..
         return dp.value if not dp.nullValue else None  # type: ignore [return-value]

+    # minDatapoint and maxDatapoint are also objects in the response. The proto lookups don't fail,
+    # so we must be careful to attach status codes only when they were requested.
+ @staticmethod + def min_datapoint(dp: NumericDatapoint) -> MinDatapoint: + return MinDatapoint(dp.timestamp, dp.value) + + @staticmethod + def max_datapoint(dp: NumericDatapoint) -> MaxDatapoint: + return MaxDatapoint(dp.timestamp, dp.value) + + @staticmethod + def min_datapoint_with_status(dp: NumericDatapoint) -> MinDatapointWithStatus: + return MinDatapointWithStatus( + dp.timestamp, dp.value, DpsUnpackFns.status_code(dp), DpsUnpackFns.status_symbol(dp) + ) + + @staticmethod + def max_datapoint_with_status(dp: NumericDatapoint) -> MaxDatapointWithStatus: + return MaxDatapointWithStatus( + dp.timestamp, dp.value, DpsUnpackFns.status_code(dp), DpsUnpackFns.status_symbol(dp) + ) + # --------------- # # Above are functions that operate on single elements # Below are functions that operate on containers @@ -442,6 +467,23 @@ def extract_aggregates_numpy( # An aggregate is missing, fallback to slower `getattr`: return np.array([tuple(getattr(dp, agg, math.nan) for agg in aggregates) for dp in dps], dtype=np.float64) + @staticmethod + def extract_fn_min_or_max_dp( + aggregate: Literal["minDatapoint", "maxDatapoint"], + include_status: bool, + ) -> Callable[[NumericDatapoint], MinDatapoint | MinDatapointWithStatus | MaxDatapoint | MaxDatapointWithStatus]: + match aggregate, include_status: + case "minDatapoint", False: + return DpsUnpackFns.min_datapoint + case "maxDatapoint", False: + return DpsUnpackFns.max_datapoint + case "minDatapoint", True: + return DpsUnpackFns.min_datapoint_with_status + case "maxDatapoint", True: + return DpsUnpackFns.max_datapoint_with_status + case _: + raise ValueError(f"Unsupported {aggregate=} and/or {include_status=}") + def ensure_int(val: float, change_nan_to: int = 0) -> int: if math.isnan(val): @@ -1050,7 +1092,8 @@ def _set_aggregate_vars(self, aggs_camel_case: list[str], use_numpy: bool) -> No # would yield the correct type...! 
self.all_aggregates = aggs_camel_case self.int_aggs = _INT_AGGREGATES.intersection(aggs_camel_case) - self.float_aggs = set(aggs_camel_case).difference(self.int_aggs) + self.object_aggs = _OBJECT_AGGREGATES.intersection(aggs_camel_case) + self.float_aggs = set(aggs_camel_case).difference(self.int_aggs).difference(self.object_aggs) self.agg_unpack_fn = DpsUnpackFns.custom_from_aggregates(self.all_aggregates) self.first_agg, *others = self.all_aggregates @@ -1099,6 +1142,9 @@ def _get_result(self) -> Datapoints | DatapointsArray: else: aggs_iter = create_aggregates_list_from_dps_container(self.dps_data) lst_dct.update(dict(zip(self.all_aggregates, aggs_iter))) + for agg in self.object_aggs: + unpack_fn = DpsUnpackFns.extract_fn_min_or_max_dp(agg, self.query.include_status) + lst_dct[agg] = list(map(unpack_fn, lst_dct[agg])) for agg in self.int_aggs: # Need to do an extra NaN-aware int-conversion because protobuf (as opposed to json) returns double: lst_dct[agg] = list(map(ensure_int, lst_dct[agg])) diff --git a/cognite/client/data_classes/datapoints.py b/cognite/client/data_classes/datapoints.py index e6e93b6405..b4a7e72deb 100644 --- a/cognite/client/data_classes/datapoints.py +++ b/cognite/client/data_classes/datapoints.py @@ -15,6 +15,7 @@ Any, ClassVar, Literal, + NoReturn, TypedDict, overload, ) @@ -37,6 +38,7 @@ from cognite.client.utils._text import ( convert_all_keys_to_camel_case, convert_all_keys_to_snake_case, + iterable_to_case, to_camel_case, to_snake_case, ) @@ -78,11 +80,14 @@ "duration_uncertain", "interpolation", "max", + "max_datapoint", "min", + "min_datapoint", "step_interpolation", "sum", "total_variation", ] +_OBJECT_AGGREGATES: frozenset[Literal["maxDatapoint", "minDatapoint"]] = frozenset({"maxDatapoint", "minDatapoint"}) _INT_AGGREGATES = frozenset( { "count", @@ -116,6 +121,96 @@ class StatusCode(IntEnum): Bad = 0x80000000 # aka 1 << 31 aka 2147483648 +@dataclass(slots=True, frozen=True) +class MinDatapoint: + timestamp: int + value: float + + @property + def status_code(self) -> NoReturn: + raise AttributeError( + "'MinDatapoint' object has no attribute 'status_code'. Tip: fetch using `include_status=True`." + ) + + @property + def status_symbol(self) -> NoReturn: + raise AttributeError( + "'MinDatapoint' object has no attribute 'status_symbol'. Tip: fetch using `include_status=True`." + ) + + @classmethod + def load(cls, dct: dict[str, Any]) -> Self: + return cls(dct["timestamp"], dct["value"]) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return {"timestamp": self.timestamp, "value": self.value} + + +@dataclass(slots=True, frozen=True) +class MinDatapointWithStatus(MinDatapoint): + timestamp: int + value: float + status_code: int + status_symbol: str + + @classmethod + def load(cls, dct: dict[str, Any]) -> Self: + return cls(dct["timestamp"], dct["value"], dct["statusCode"], dct["statusSymbol"]) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return { + "timestamp": self.timestamp, + "value": self.value, + "statusCode" if camel_case else "status_code": self.status_code, + "statusSymbol" if camel_case else "status_symbol": self.status_symbol, + } + + +@dataclass(slots=True, frozen=True) +class MaxDatapoint: + timestamp: int + value: float + + @property + def status_code(self) -> NoReturn: + raise AttributeError( + "'MaxDatapoint' object has no attribute 'status_code'. Tip: fetch using `include_status=True`." 
+ ) + + @property + def status_symbol(self) -> NoReturn: + raise AttributeError( + "'MaxDatapoint' object has no attribute 'status_symbol'. Tip: fetch using `include_status=True`." + ) + + @classmethod + def load(cls, dct: dict[str, Any]) -> Self: + return cls(dct["timestamp"], dct["value"]) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return {"timestamp": self.timestamp, "value": self.value} + + +@dataclass(slots=True, frozen=True) +class MaxDatapointWithStatus(MaxDatapoint): + timestamp: int + value: float + status_code: int + status_symbol: str + + @classmethod + def load(cls, dct: dict[str, Any]) -> Self: + return cls(dct["timestamp"], dct["value"], dct["statusCode"], dct["statusSymbol"]) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return { + "timestamp": self.timestamp, + "value": self.value, + "statusCode" if camel_case else "status_code": self.status_code, + "statusSymbol" if camel_case else "status_symbol": self.status_symbol, + } + + class _DatapointsPayloadItem(TypedDict, total=False): # No field required start: int @@ -386,7 +481,9 @@ class Datapoint(CogniteResource): value (str | float | None): The raw data value. Can be string or numeric. average (float | None): The time-weighted average value in the aggregate interval. max (float | None): The maximum value in the aggregate interval. + max_datapoint (MaxDatapoint | MaxDatapointWithStatus | None): No description. min (float | None): The minimum value in the aggregate interval. + min_datapoint (MinDatapoint | MinDatapointWithStatus | None): No description. count (int | None): The number of raw datapoints in the aggregate interval. sum (float | None): The sum of the raw datapoints in the aggregate interval. interpolation (float | None): The interpolated value at the beginning of the aggregate interval. 
@@ -411,7 +508,9 @@ def __init__( value: str | float | None = None, average: float | None = None, max: float | None = None, + max_datapoint: MaxDatapoint | MaxDatapointWithStatus | None = None, min: float | None = None, + min_datapoint: MinDatapoint | MinDatapointWithStatus | None = None, count: int | None = None, sum: float | None = None, interpolation: float | None = None, @@ -433,7 +532,9 @@ def __init__( self.value = value self.average = average self.max = max + self.max_datapoint = max_datapoint self.min = min + self.min_datapoint = min_datapoint self.count = count self.sum = sum self.interpolation = interpolation @@ -468,6 +569,10 @@ def to_pandas(self, camel_case: bool = False) -> pandas.DataFrame: # type: igno pd = local_import("pandas") dumped = self.dump(camel_case=camel_case) + for key in iterable_to_case(["min_datapoint", "max_datapoint"], camel_case): + if dp := dumped.get(key): + dumped[key] = [dp] # make pandas treat this dict as a scalar value + timestamp = dumped.pop("timestamp") tz = convert_tz_for_pandas(self.timezone) return pd.DataFrame(dumped, index=[pd.Timestamp(timestamp, unit="ms", tz=tz)]) @@ -475,6 +580,12 @@ def to_pandas(self, camel_case: bool = False) -> pandas.DataFrame: # type: igno @classmethod def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: instance = super()._load(resource, cognite_client=cognite_client) + if isinstance(instance.max_datapoint, dict): + dp_cls = MaxDatapointWithStatus if "statusCode" in instance.max_datapoint else MaxDatapoint + instance.max_datapoint = dp_cls.load(instance.max_datapoint) + if isinstance(instance.min_datapoint, dict): + dp_cls = MinDatapointWithStatus if "statusCode" in instance.min_datapoint else MinDatapoint + instance.min_datapoint = dp_cls.load(instance.min_datapoint) if isinstance(instance.timezone, str): with contextlib.suppress(ValueError): # Dont fail load if invalid instance.timezone = parse_str_timezone(instance.timezone) @@ -484,6 +595,10 @@ def dump(self, camel_case: bool = True, include_timezone: bool = True) -> dict[s dumped = super().dump(camel_case=camel_case) # Keep value even if None (bad status codes support missing): dumped["value"] = self.value # TODO: What if Datapoint represents one or more aggregates? + if self.max_datapoint: + dumped["maxDatapoint" if camel_case else "max_datapoint"] = self.max_datapoint.dump(camel_case) + if self.min_datapoint: + dumped["minDatapoint" if camel_case else "min_datapoint"] = self.min_datapoint.dump(camel_case) if include_timezone: if self.timezone is not None: dumped["timezone"] = convert_timezone_to_str(self.timezone) @@ -509,7 +624,9 @@ def __init__( value: NumpyFloat64Array | NumpyObjArray | None = None, average: NumpyFloat64Array | None = None, max: NumpyFloat64Array | None = None, + max_datapoint: NumpyObjArray | None = None, min: NumpyFloat64Array | None = None, + min_datapoint: NumpyObjArray | None = None, count: NumpyInt64Array | None = None, sum: NumpyFloat64Array | None = None, interpolation: NumpyFloat64Array | None = None, @@ -542,7 +659,9 @@ def __init__( self.value = value self.average = average self.max = max + self.max_datapoint = max_datapoint self.min = min + self.min_datapoint = min_datapoint self.count = count self.sum = sum self.interpolation = interpolation @@ -872,7 +991,9 @@ class Datapoints(CogniteResource): value (SequenceNotStr[str] | Sequence[float] | None): The raw data values. Can be string or numeric. average (list[float] | None): The time-weighted average values per aggregate interval. 
         max (list[float] | None): The maximum values per aggregate interval.
+        max_datapoint (list[MaxDatapoint] | list[MaxDatapointWithStatus] | None): The complete datapoints attaining the maximum value per aggregate interval. Includes status information when fetched with include_status=True.
         min (list[float] | None): The minimum values per aggregate interval.
+        min_datapoint (list[MinDatapoint] | list[MinDatapointWithStatus] | None): The complete datapoints attaining the minimum value per aggregate interval. Includes status information when fetched with include_status=True.
         count (list[int] | None): The number of raw datapoints per aggregate interval.
         sum (list[float] | None): The sum of the raw datapoints per aggregate interval.
         interpolation (list[float] | None): The interpolated values at the beginning of each the aggregate interval.
@@ -906,7 +1027,9 @@ def __init__(
         value: SequenceNotStr[str] | Sequence[float] | None = None,
         average: list[float] | None = None,
         max: list[float] | None = None,
+        max_datapoint: list[MaxDatapoint] | list[MaxDatapointWithStatus] | None = None,
         min: list[float] | None = None,
+        min_datapoint: list[MinDatapoint] | list[MinDatapointWithStatus] | None = None,
         count: list[int] | None = None,
         sum: list[float] | None = None,
         interpolation: list[float] | None = None,
@@ -937,7 +1060,9 @@
         self.value = value
         self.average = average
         self.max = max
+        self.max_datapoint = max_datapoint
         self.min = min
+        self.min_datapoint = min_datapoint
         self.count = count
         self.sum = sum
         self.interpolation = interpolation
@@ -1207,16 +1332,16 @@ def __get_datapoint_objects(self) -> list[Datapoint]:
             return self.__datapoint_objects
         fields = self._get_non_empty_data_fields(get_error=False)
         new_dps_objects = []
         for i in range(len(self)):
             dp_args: dict[str, Any] = {"timezone": self.timezone}
             for attr, value in fields:
-                dp_args[attr] = value[i]
+                dp_args[to_camel_case(attr)] = value[i]
             if self.status_code is not None:
                 dp_args.update(
                     statusCode=self.status_code[i],
                     statusSymbol=self.status_symbol[i],  # type: ignore [index]
                 )
             new_dps_objects.append(Datapoint.load(dp_args))
         self.__datapoint_objects = new_dps_objects
         return self.__datapoint_objects
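Reviewer note (not part of the patch): a quick usage sketch of what this change enables. The
retrieve call below mirrors the SDK's existing datapoints API; the time series external id is
hypothetical, and exact parameter spellings should be verified against the released SDK:

    # Fetch the new object aggregates. include_status=True is now allowed for aggregates
    # (the old ValueError guard is removed above) and yields Min/MaxDatapointWithStatus.
    from cognite.client import CogniteClient

    client = CogniteClient()  # assumes credentials are already configured
    dps = client.time_series.data.retrieve(
        external_id="my-ts",  # hypothetical time series
        start="30d-ago",
        end="now",
        granularity="1d",
        aggregates=["min_datapoint", "max_datapoint"],
        include_status=True,
    )
    for min_dp in dps.min_datapoint:
        print(min_dp.timestamp, min_dp.value, min_dp.status_symbol)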
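The new dataclasses are self-contained, so their load/dump round-trip and the guard-rail
properties can be sanity-checked without a client. A minimal sketch, using only the classes and
import path defined in this patch:

    from cognite.client.data_classes.datapoints import MinDatapoint, MinDatapointWithStatus

    dp = MinDatapoint.load({"timestamp": 1700000000000, "value": 3.14})
    assert dp.dump() == {"timestamp": 1700000000000, "value": 3.14}
    try:
        _ = dp.status_code  # plain variant raises, with a hint to pass include_status=True
    except AttributeError as err:
        print(err)

    dp_s = MinDatapointWithStatus.load(
        {"timestamp": 1700000000000, "value": 3.14, "statusCode": 0, "statusSymbol": "Good"}
    )
    print(dp_s.dump(camel_case=False))
    # -> {'timestamp': 1700000000000, 'value': 3.14, 'status_code': 0, 'status_symbol': 'Good'}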
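Datapoint.to_pandas wraps a dumped min/max datapoint in a one-element list so pandas stores the
dict as a single cell instead of exploding its keys into columns. Illustrative only; the
constructor arguments are taken from this patch and pandas must be installed:

    from cognite.client.data_classes import Datapoint
    from cognite.client.data_classes.datapoints import MaxDatapoint

    dp = Datapoint(
        timestamp=1700000000000,
        average=1.5,
        max_datapoint=MaxDatapoint(1700000000123, 9.9),
    )
    df = dp.to_pandas()  # one row; the max_datapoint cell holds {'timestamp': ..., 'value': 9.9}
    print(df)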