
Commit

WIP-commit with print-statements, yeeeaaah
haakonvt committed Jan 28, 2025
1 parent f884759 commit f4b6ef2
Showing 3 changed files with 180 additions and 4 deletions.
52 changes: 49 additions & 3 deletions cognite/client/_api/datapoint_tasks.py
@@ -32,10 +32,15 @@
from cognite.client.data_classes.data_modeling.ids import NodeId
from cognite.client.data_classes.datapoints import (
_INT_AGGREGATES,
_OBJECT_AGGREGATES,
Aggregate,
Datapoints,
DatapointsArray,
DatapointsQuery,
MaxDatapoint,
MaxDatapointWithStatus,
MinDatapoint,
MinDatapointWithStatus,
_DatapointsPayloadItem,
)
from cognite.client.utils._auxiliary import exactly_one_is_not_none, is_finite, is_unlimited
@@ -255,8 +260,6 @@ def _verify_options_and_categorize_query(query: DatapointsQuery) -> bool:
raise ValueError("When passing `aggregates`, argument `granularity` is also required.")
elif query.include_outside_points is True:
raise ValueError("'Include outside points' is not supported for aggregates.")
elif query.include_status is True:
raise ValueError("'Include status' is not supported for aggregates.")
return False

@staticmethod
Expand Down Expand Up @@ -356,6 +359,28 @@ def nullable_raw_dp(dp: DatapointRaw) -> float | str:
# We pretend float is always returned, to avoid breaking every dps annotation in the entire SDK...
return dp.value if not dp.nullValue else None # type: ignore [return-value]

# minDatapoint and maxDatapoint are also objects in the response. The proto lookups don't fail,
# so we must be very careful to only attach status codes if requested.
@staticmethod
def min_datapoint(dp: NumericDatapoint) -> MinDatapoint:
return MinDatapoint(dp.timestamp, dp.value)

@staticmethod
def max_datapoint(dp: NumericDatapoint) -> MaxDatapoint:
return MaxDatapoint(dp.timestamp, dp.value)

@staticmethod
def min_datapoint_with_status(dp: NumericDatapoint) -> MinDatapointWithStatus:
return MinDatapointWithStatus(
dp.timestamp, dp.value, DpsUnpackFns.status_code(dp), DpsUnpackFns.status_symbol(dp)
)

@staticmethod
def max_datapoint_with_status(dp: NumericDatapoint) -> MaxDatapointWithStatus:
return MaxDatapointWithStatus(
dp.timestamp, dp.value, DpsUnpackFns.status_code(dp), DpsUnpackFns.status_symbol(dp)
)

# --------------- #
# Above are functions that operate on single elements
# Below are functions that operate on containers
@@ -442,6 +467,23 @@ def extract_aggregates_numpy(
# An aggregate is missing, fallback to slower `getattr`:
return np.array([tuple(getattr(dp, agg, math.nan) for agg in aggregates) for dp in dps], dtype=np.float64)

@staticmethod
def extract_fn_min_or_max_dp(
aggregate: Literal["minDatapoint", "maxDatapoint"],
include_status: bool,
) -> Callable[[NumericDatapoint], MinDatapoint | MinDatapointWithStatus | MaxDatapoint | MaxDatapointWithStatus]:
match aggregate, include_status:
case "minDatapoint", False:
return DpsUnpackFns.min_datapoint
case "maxDatapoint", False:
return DpsUnpackFns.max_datapoint
case "minDatapoint", True:
return DpsUnpackFns.min_datapoint_with_status
case "maxDatapoint", True:
return DpsUnpackFns.max_datapoint_with_status
case _:
raise ValueError(f"Unsupported {aggregate=} and/or {include_status=}")


def ensure_int(val: float, change_nan_to: int = 0) -> int:
if math.isnan(val):
@@ -1050,7 +1092,8 @@ def _set_aggregate_vars(self, aggs_camel_case: list[str], use_numpy: bool) -> None:
# would yield the correct type...!
self.all_aggregates = aggs_camel_case
self.int_aggs = _INT_AGGREGATES.intersection(aggs_camel_case)
self.float_aggs = set(aggs_camel_case).difference(self.int_aggs)
self.object_aggs = _OBJECT_AGGREGATES.intersection(aggs_camel_case)
self.float_aggs = set(aggs_camel_case).difference(self.int_aggs).difference(self.object_aggs)

self.agg_unpack_fn = DpsUnpackFns.custom_from_aggregates(self.all_aggregates)
self.first_agg, *others = self.all_aggregates
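
The three-way split above partitions the requested aggregates so each group gets its own unpacking. A small self-contained sketch of the same set arithmetic (the INT_AGGS subset here is illustrative; only the object set matches the SDK's real _OBJECT_AGGREGATES):

INT_AGGS = frozenset({"count"})  # illustrative subset of _INT_AGGREGATES
OBJECT_AGGS = frozenset({"maxDatapoint", "minDatapoint"})

aggs_camel_case = ["average", "count", "minDatapoint"]
int_aggs = INT_AGGS.intersection(aggs_camel_case)           # {'count'}
object_aggs = OBJECT_AGGS.intersection(aggs_camel_case)     # {'minDatapoint'}
float_aggs = set(aggs_camel_case) - int_aggs - object_aggs  # {'average'}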
@@ -1099,6 +1142,9 @@ def _get_result(self) -> Datapoints | DatapointsArray:
else:
aggs_iter = create_aggregates_list_from_dps_container(self.dps_data)
lst_dct.update(dict(zip(self.all_aggregates, aggs_iter)))
for agg in self.object_aggs:
unpack_fn = DpsUnpackFns.extract_fn_min_or_max_dp(agg, self.query.include_status)
lst_dct[agg] = list(map(unpack_fn, lst_dct[agg]))
for agg in self.int_aggs:
# Need to do an extra NaN-aware int-conversion because protobuf (as opposed to json) returns double:
lst_dct[agg] = list(map(ensure_int, lst_dct[agg]))
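
ensure_int (defined earlier in this diff, truncated by the fold) is what makes the int-conversion NaN-aware; a sketch of the conversion, assuming the truncated body simply falls through to int(val):

import math

def ensure_int(val: float, change_nan_to: int = 0) -> int:
    if math.isnan(val):
        return change_nan_to  # empty interval: protobuf hands back NaN, we want 0
    return int(val)

assert ensure_int(3.0) == 3
assert ensure_int(math.nan) == 0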
131 changes: 130 additions & 1 deletion cognite/client/data_classes/datapoints.py
@@ -15,6 +15,7 @@
Any,
ClassVar,
Literal,
NoReturn,
TypedDict,
overload,
)
@@ -37,6 +38,7 @@
from cognite.client.utils._text import (
convert_all_keys_to_camel_case,
convert_all_keys_to_snake_case,
iterable_to_case,
to_camel_case,
to_snake_case,
)
@@ -78,11 +80,14 @@
"duration_uncertain",
"interpolation",
"max",
"max_datapoint",
"min",
"min_datapoint",
"step_interpolation",
"sum",
"total_variation",
]
_OBJECT_AGGREGATES: frozenset[Literal["maxDatapoint", "minDatapoint"]] = frozenset({"maxDatapoint", "minDatapoint"})
_INT_AGGREGATES = frozenset(
{
"count",
@@ -116,6 +121,96 @@ class StatusCode(IntEnum):
Bad = 0x80000000 # aka 1 << 31 aka 2147483648


@dataclass(slots=True, frozen=True)
class MinDatapoint:
timestamp: int
value: float

@property
def status_code(self) -> NoReturn:
raise AttributeError(
"'MinDatapoint' object has no attribute 'status_code'. Tip: fetch using `include_status=True`."
)

@property
def status_symbol(self) -> NoReturn:
raise AttributeError(
"'MinDatapoint' object has no attribute 'status_symbol'. Tip: fetch using `include_status=True`."
)

@classmethod
def load(cls, dct: dict[str, Any]) -> Self:
return cls(dct["timestamp"], dct["value"])

def dump(self, camel_case: bool = True) -> dict[str, Any]:
return {"timestamp": self.timestamp, "value": self.value}


@dataclass(slots=True, frozen=True)
class MinDatapointWithStatus(MinDatapoint):
timestamp: int
value: float
status_code: int
status_symbol: str

@classmethod
def load(cls, dct: dict[str, Any]) -> Self:
return cls(dct["timestamp"], dct["value"], dct["statusCode"], dct["statusSymbol"])

def dump(self, camel_case: bool = True) -> dict[str, Any]:
return {
"timestamp": self.timestamp,
"value": self.value,
"statusCode" if camel_case else "status_code": self.status_code,
"statusSymbol" if camel_case else "status_symbol": self.status_symbol,
}


@dataclass(slots=True, frozen=True)
class MaxDatapoint:
timestamp: int
value: float

@property
def status_code(self) -> NoReturn:
raise AttributeError(
"'MaxDatapoint' object has no attribute 'status_code'. Tip: fetch using `include_status=True`."
)

@property
def status_symbol(self) -> NoReturn:
raise AttributeError(
"'MaxDatapoint' object has no attribute 'status_symbol'. Tip: fetch using `include_status=True`."
)

@classmethod
def load(cls, dct: dict[str, Any]) -> Self:
return cls(dct["timestamp"], dct["value"])

def dump(self, camel_case: bool = True) -> dict[str, Any]:
return {"timestamp": self.timestamp, "value": self.value}


@dataclass(slots=True, frozen=True)
class MaxDatapointWithStatus(MaxDatapoint):
timestamp: int
value: float
status_code: int
status_symbol: str

@classmethod
def load(cls, dct: dict[str, Any]) -> Self:
return cls(dct["timestamp"], dct["value"], dct["statusCode"], dct["statusSymbol"])

def dump(self, camel_case: bool = True) -> dict[str, Any]:
return {
"timestamp": self.timestamp,
"value": self.value,
"statusCode" if camel_case else "status_code": self.status_code,
"statusSymbol" if camel_case else "status_symbol": self.status_symbol,
}
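
A round-trip sketch for the new dataclasses defined above (editor's illustration with hypothetical values):

payload = {"timestamp": 1700000000000, "value": 1.23, "statusCode": 0, "statusSymbol": "Good"}
dp = MinDatapointWithStatus.load(payload)
assert dp.dump(camel_case=True) == payload

plain = MinDatapoint.load({"timestamp": 1700000000000, "value": 1.23})
try:
    plain.status_code  # the property exists only to raise a helpful error
except AttributeError as err:
    print(err)  # ... Tip: fetch using `include_status=True`.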


class _DatapointsPayloadItem(TypedDict, total=False):
# No field required
start: int
@@ -386,7 +481,9 @@ class Datapoint(CogniteResource):
value (str | float | None): The raw data value. Can be string or numeric.
average (float | None): The time-weighted average value in the aggregate interval.
max (float | None): The maximum value in the aggregate interval.
max_datapoint (MaxDatapoint | MaxDatapointWithStatus | None): The datapoint with the maximum value in the aggregate interval, with status information if fetched with include_status=True.
min (float | None): The minimum value in the aggregate interval.
min_datapoint (MinDatapoint | MinDatapointWithStatus | None): The datapoint with the minimum value in the aggregate interval, with status information if fetched with include_status=True.
count (int | None): The number of raw datapoints in the aggregate interval.
sum (float | None): The sum of the raw datapoints in the aggregate interval.
interpolation (float | None): The interpolated value at the beginning of the aggregate interval.
@@ -411,7 +508,9 @@ def __init__(
value: str | float | None = None,
average: float | None = None,
max: float | None = None,
max_datapoint: MaxDatapoint | MaxDatapointWithStatus | None = None,
min: float | None = None,
min_datapoint: MinDatapoint | MinDatapointWithStatus | None = None,
count: int | None = None,
sum: float | None = None,
interpolation: float | None = None,
@@ -433,7 +532,9 @@ def __init__(
self.value = value
self.average = average
self.max = max
self.max_datapoint = max_datapoint
self.min = min
self.min_datapoint = min_datapoint
self.count = count
self.sum = sum
self.interpolation = interpolation
@@ -468,13 +569,23 @@ def to_pandas(self, camel_case: bool = False) -> pandas.DataFrame:  # type: ignore
pd = local_import("pandas")

dumped = self.dump(camel_case=camel_case)
for key in iterable_to_case(["min_datapoint", "max_datapoint"], camel_case):
if dp := dumped.get(key):
dumped[key] = [dp] # make pandas treat this dict as a scalar value

timestamp = dumped.pop("timestamp")
tz = convert_tz_for_pandas(self.timezone)
return pd.DataFrame(dumped, index=[pd.Timestamp(timestamp, unit="ms", tz=tz)])
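
The one-element list wrapping above works around how pandas expands a bare dict; a self-contained sketch of the quirk (hypothetical values):

import pandas as pd

dp_dict = {"timestamp": 1700000000000, "value": 1.23}
# pd.DataFrame({"minDatapoint": dp_dict}) would treat the dict keys as the index;
# wrapping it in a one-element list stores the whole dict as a single object cell:
df = pd.DataFrame({"minDatapoint": [dp_dict]}, index=[pd.Timestamp(1700000000000, unit="ms")])
print(df.iloc[0, 0])  # {'timestamp': 1700000000000, 'value': 1.23}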

@classmethod
def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self:
instance = super()._load(resource, cognite_client=cognite_client)
if isinstance(instance.max_datapoint, dict):
dp_cls = MaxDatapointWithStatus if "statusCode" in instance.max_datapoint else MaxDatapoint
instance.max_datapoint = dp_cls.load(instance.max_datapoint)
if isinstance(instance.min_datapoint, dict):
dp_cls = MinDatapointWithStatus if "statusCode" in instance.min_datapoint else MinDatapoint
instance.min_datapoint = dp_cls.load(instance.min_datapoint)
if isinstance(instance.timezone, str):
with contextlib.suppress(ValueError): # Dont fail load if invalid
instance.timezone = parse_str_timezone(instance.timezone)
@@ -484,6 +595,10 @@ def dump(self, camel_case: bool = True, include_timezone: bool = True) -> dict[str, Any]:
dumped = super().dump(camel_case=camel_case)
# Keep value even if None (datapoints with bad status codes may have a missing value):
dumped["value"] = self.value # TODO: What if Datapoint represents one or more aggregates?
if self.max_datapoint:
dumped["maxDatapoint" if camel_case else "max_datapoint"] = self.max_datapoint.dump(camel_case)
if self.min_datapoint:
dumped["minDatapoint" if camel_case else "min_datapoint"] = self.min_datapoint.dump(camel_case)
if include_timezone:
if self.timezone is not None:
dumped["timezone"] = convert_timezone_to_str(self.timezone)
Expand All @@ -509,7 +624,9 @@ def __init__(
value: NumpyFloat64Array | NumpyObjArray | None = None,
average: NumpyFloat64Array | None = None,
max: NumpyFloat64Array | None = None,
max_datapoint: NumpyObjArray | None = None,
min: NumpyFloat64Array | None = None,
min_datapoint: NumpyObjArray | None = None,
count: NumpyInt64Array | None = None,
sum: NumpyFloat64Array | None = None,
interpolation: NumpyFloat64Array | None = None,
@@ -542,7 +659,9 @@ def __init__(
self.value = value
self.average = average
self.max = max
self.max_datapoint = max_datapoint
self.min = min
self.min_datapoint = min_datapoint
self.count = count
self.sum = sum
self.interpolation = interpolation
@@ -872,7 +991,9 @@ class Datapoints(CogniteResource):
value (SequenceNotStr[str] | Sequence[float] | None): The raw data values. Can be string or numeric.
average (list[float] | None): The time-weighted average values per aggregate interval.
max (list[float] | None): The maximum values per aggregate interval.
max_datapoint (list[MaxDatapoint] | list[MaxDatapointWithStatus] | None): The datapoints with the maximum value per aggregate interval, with status information if fetched with include_status=True.
min (list[float] | None): The minimum values per aggregate interval.
min_datapoint (list[MinDatapoint] | list[MinDatapointWithStatus] | None): The datapoints with the minimum value per aggregate interval, with status information if fetched with include_status=True.
count (list[int] | None): The number of raw datapoints per aggregate interval.
sum (list[float] | None): The sum of the raw datapoints per aggregate interval.
interpolation (list[float] | None): The interpolated values at the beginning of each aggregate interval.
@@ -906,7 +1027,9 @@ def __init__(
value: SequenceNotStr[str] | Sequence[float] | None = None,
average: list[float] | None = None,
max: list[float] | None = None,
max_datapoint: list[MaxDatapoint] | list[MaxDatapointWithStatus] | None = None,
min: list[float] | None = None,
min_datapoint: list[MinDatapoint] | list[MinDatapointWithStatus] | None = None,
count: list[int] | None = None,
sum: list[float] | None = None,
interpolation: list[float] | None = None,
@@ -937,7 +1060,9 @@ def __init__(
self.value = value
self.average = average
self.max = max
self.max_datapoint = max_datapoint
self.min = min
self.min_datapoint = min_datapoint
self.count = count
self.sum = sum
self.interpolation = interpolation
@@ -1207,16 +1332,20 @@ def __get_datapoint_objects(self) -> list[Datapoint]:
return self.__datapoint_objects
fields = self._get_non_empty_data_fields(get_error=False)
new_dps_objects = []
print("IN __get_datapoint_objects")
for i in range(len(self)):
dp_args: dict[str, Any] = {"timezone": self.timezone}
for attr, value in fields:
dp_args[attr] = value[i]
dp_args[to_camel_case(attr)] = value[i]
if self.status_code is not None:
dp_args.update(
statusCode=self.status_code[i],
statusSymbol=self.status_symbol[i], # type: ignore [index]
)
print(i, dp_args)
new_dps_objects.append(Datapoint.load(dp_args))
print(Datapoint.load(dp_args))
print()
self.__datapoint_objects = new_dps_objects
return self.__datapoint_objects

1 change: 1 addition & 0 deletions cognite/client/utils/_auxiliary.py
@@ -79,6 +79,7 @@ def fast_dict_load(
for camel_attr, value in item.items():
try:
setattr(instance, accepted[camel_attr], value)
# print(f"--- Set {camel_attr} ")
except KeyError:
pass
return instance
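
The loop above silently drops keys the resource doesn't accept, via the KeyError raised by the accepted-lookup. A self-contained sketch of that pattern (all names hypothetical):

accepted = {"statusCode": "status_code"}  # camelCase key -> snake_case attribute

class Resource:
    status_code = None

instance = Resource()
for camel_attr, value in {"statusCode": 0, "unknownKey": 1}.items():
    try:
        setattr(instance, accepted[camel_attr], value)
    except KeyError:
        pass  # unknown key: skip it, exactly like fast_dict_load
assert instance.status_code == 0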
