From 4c2856ee796f70c19f0820b37155cad59c2075f9 Mon Sep 17 00:00:00 2001 From: cwasicki <126617870+cwasicki@users.noreply.github.com> Date: Fri, 13 Sep 2024 18:57:41 +0200 Subject: [PATCH] Add support for states in client and cli tool States can now be requested via the client and are provided through the flat iterator. They can be identified via their category `state`, `warning` and `error`, respectively. Each individual state is provided as its own sample. Support for states is also added to the CLI tool via the `--states` flag. As of now requests for states without any metrics are not yet supported by the service. Signed-off-by: cwasicki <126617870+cwasicki@users.noreply.github.com> --- src/frequenz/client/reporting/__main__.py | 9 +++++ src/frequenz/client/reporting/_client.py | 47 ++++++++++++++++++++++- 2 files changed, 54 insertions(+), 2 deletions(-) diff --git a/src/frequenz/client/reporting/__main__.py b/src/frequenz/client/reporting/__main__.py index 003e3f5..fd7c427 100644 --- a/src/frequenz/client/reporting/__main__.py +++ b/src/frequenz/client/reporting/__main__.py @@ -34,6 +34,11 @@ def main() -> None: help="List of metrics to process", required=True, ) + parser.add_argument( + "--states", + action="store_true", + help="Include states in the output", + ) parser.add_argument( "--start", type=datetime.fromisoformat, @@ -66,6 +71,7 @@ def main() -> None: args.start, args.end, args.resolution, + states=args.states, service_address=args.url, key=args.key, fmt=args.format, @@ -81,6 +87,7 @@ async def run( start_dt: datetime, end_dt: datetime, resolution: int, + states: bool, service_address: str, key: str, fmt: str, @@ -94,6 +101,7 @@ async def run( start_dt: start datetime end_dt: end datetime resolution: resampling resolution in sec + states: include states in the output service_address: service address key: API key fmt: output format @@ -120,6 +128,7 @@ def data_iter() -> AsyncIterator[MetricSample]: start_dt=start_dt, end_dt=end_dt, resolution=resolution, + include_states=states, ) if fmt == "iter": diff --git a/src/frequenz/client/reporting/_client.py b/src/frequenz/client/reporting/_client.py index 2037e0b..85bf178 100644 --- a/src/frequenz/client/reporting/_client.py +++ b/src/frequenz/client/reporting/_client.py @@ -4,7 +4,7 @@ """Client for requests to the Reporting API.""" from collections import namedtuple -from collections.abc import AsyncIterator, Iterator +from collections.abc import AsyncIterator, Iterable, Iterator from dataclasses import dataclass from datetime import datetime from typing import cast @@ -15,6 +15,7 @@ from frequenz.api.common.v1.microgrid.microgrid_pb2 import ( MicrogridComponentIDs as PBMicrogridComponentIDs, ) +from frequenz.api.reporting.v1.reporting_pb2 import IncludeOptions as PBIncludeOptions from frequenz.api.reporting.v1.reporting_pb2 import ( ReceiveMicrogridComponentsDataStreamRequest as PBReceiveMicrogridComponentsDataStreamRequest, ) @@ -56,7 +57,10 @@ def is_empty(self) -> bool: """ if not self._data_pb.components: return True - if not self._data_pb.components[0].metric_samples: + if ( + not self._data_pb.components[0].metric_samples + and not self._data_pb.components[0].states + ): return True return False @@ -94,6 +98,26 @@ def __iter__(self) -> Iterator[MetricSample]: metric=met, value=value, ) + for state in cdata.states: + ts = state.sampled_at.ToDatetime() + for name, category in { + "state": state.states, + "warning": state.warnings, + "error": state.errors, + }.items(): + # Skip if the category is not present + if not isinstance(category, Iterable): + continue + # Each category can have multiple states + # that are provided as individual samples + for s in category: + yield MetricSample( + timestamp=ts, + microgrid_id=mid, + component_id=cid, + metric=name, + value=s, + ) class ReportingApiClient(BaseApiClient[ReportingStub, grpcaio.Channel]): @@ -120,6 +144,7 @@ async def list_single_component_data( start_dt: datetime, end_dt: datetime, resolution: int | None, + include_states: bool = False, ) -> AsyncIterator[MetricSample]: """Iterate over the data for a single metric. @@ -130,6 +155,7 @@ async def list_single_component_data( start_dt: The start date and time. end_dt: The end date and time. resolution: The resampling resolution for the data, represented in seconds. + include_states: Whether to include the state data. Yields: A named tuple with the following fields: @@ -142,6 +168,7 @@ async def list_single_component_data( start_dt=start_dt, end_dt=end_dt, resolution=resolution, + include_states=include_states, ): for entry in batch: yield entry @@ -155,6 +182,7 @@ async def list_microgrid_components_data( start_dt: datetime, end_dt: datetime, resolution: int | None, + include_states: bool = False, ) -> AsyncIterator[MetricSample]: """Iterate over the data for multiple microgrids and components. @@ -165,6 +193,7 @@ async def list_microgrid_components_data( start_dt: The start date and time. end_dt: The end date and time. resolution: The resampling resolution for the data, represented in seconds. + include_states: Whether to include the state data. Yields: A named tuple with the following fields: @@ -180,11 +209,13 @@ async def list_microgrid_components_data( start_dt=start_dt, end_dt=end_dt, resolution=resolution, + include_states=include_states, ): for entry in batch: yield entry # pylint: disable=too-many-arguments + # pylint: disable=too-many-locals async def _list_microgrid_components_data_batch( self, *, @@ -193,6 +224,7 @@ async def _list_microgrid_components_data_batch( start_dt: datetime, end_dt: datetime, resolution: int | None, + include_states: bool = False, ) -> AsyncIterator[ComponentsDataBatch]: """Iterate over the component data batches in the stream. @@ -205,6 +237,7 @@ async def _list_microgrid_components_data_batch( start_dt: The start date and time. end_dt: The end date and time. resolution: The resampling resolution for the data, represented in seconds. + include_states: Whether to include the state data. Yields: A ComponentsDataBatch object of microgrid components data. @@ -224,9 +257,19 @@ def dt2ts(dt: datetime) -> PBTimestamp: end=dt2ts(end_dt), ) + incl_states = ( + PBIncludeOptions.FilterOption.FILTER_OPTION_INCLUDE + if include_states + else PBIncludeOptions.FilterOption.FILTER_OPTION_EXCLUDE + ) + include_options = PBIncludeOptions( + states=incl_states, + ) + stream_filter = PBReceiveMicrogridComponentsDataStreamRequest.StreamFilter( time_filter=time_filter, resampling_options=PBResamplingOptions(resolution=resolution), + include_options=include_options, ) metrics_pb = [metric.to_proto() for metric in metrics]