diff --git a/src/frequenz/client/reporting/__main__.py b/src/frequenz/client/reporting/__main__.py index 003e3f5..fd7c427 100644 --- a/src/frequenz/client/reporting/__main__.py +++ b/src/frequenz/client/reporting/__main__.py @@ -34,6 +34,11 @@ def main() -> None: help="List of metrics to process", required=True, ) + parser.add_argument( + "--states", + action="store_true", + help="Include states in the output", + ) parser.add_argument( "--start", type=datetime.fromisoformat, @@ -66,6 +71,7 @@ def main() -> None: args.start, args.end, args.resolution, + states=args.states, service_address=args.url, key=args.key, fmt=args.format, @@ -81,6 +87,7 @@ async def run( start_dt: datetime, end_dt: datetime, resolution: int, + states: bool, service_address: str, key: str, fmt: str, @@ -94,6 +101,7 @@ async def run( start_dt: start datetime end_dt: end datetime resolution: resampling resolution in sec + states: include states in the output service_address: service address key: API key fmt: output format @@ -120,6 +128,7 @@ def data_iter() -> AsyncIterator[MetricSample]: start_dt=start_dt, end_dt=end_dt, resolution=resolution, + include_states=states, ) if fmt == "iter": diff --git a/src/frequenz/client/reporting/_client.py b/src/frequenz/client/reporting/_client.py index 2037e0b..68af00e 100644 --- a/src/frequenz/client/reporting/_client.py +++ b/src/frequenz/client/reporting/_client.py @@ -15,6 +15,7 @@ from frequenz.api.common.v1.microgrid.microgrid_pb2 import ( MicrogridComponentIDs as PBMicrogridComponentIDs, ) +from frequenz.api.reporting.v1.reporting_pb2 import IncludeOptions as PBIncludeOptions from frequenz.api.reporting.v1.reporting_pb2 import ( ReceiveMicrogridComponentsDataStreamRequest as PBReceiveMicrogridComponentsDataStreamRequest, ) @@ -56,7 +57,10 @@ def is_empty(self) -> bool: """ if not self._data_pb.components: return True - if not self._data_pb.components[0].metric_samples: + if ( + not self._data_pb.components[0].metric_samples + and not self._data_pb.components[0].states + ): return True return False @@ -94,6 +98,21 @@ def __iter__(self) -> Iterator[MetricSample]: metric=met, value=value, ) + for state in cdata.states: + ts = state.sampled_at.ToDatetime() + for name, category in { + "state": state.states, + "warning": state.warnings, + "error": state.errors, + }.items(): + for s in category: + yield MetricSample( + timestamp=ts, + microgrid_id=mid, + component_id=cid, + metric=name, + value=s, + ) class ReportingApiClient(BaseApiClient[ReportingStub, grpcaio.Channel]): @@ -120,6 +139,7 @@ async def list_single_component_data( start_dt: datetime, end_dt: datetime, resolution: int | None, + include_states: bool = False, ) -> AsyncIterator[MetricSample]: """Iterate over the data for a single metric. @@ -130,6 +150,7 @@ async def list_single_component_data( start_dt: The start date and time. end_dt: The end date and time. resolution: The resampling resolution for the data, represented in seconds. + include_states: Whether to include the state data. Yields: A named tuple with the following fields: @@ -142,6 +163,7 @@ async def list_single_component_data( start_dt=start_dt, end_dt=end_dt, resolution=resolution, + include_states=include_states, ): for entry in batch: yield entry @@ -155,6 +177,7 @@ async def list_microgrid_components_data( start_dt: datetime, end_dt: datetime, resolution: int | None, + include_states: bool = False, ) -> AsyncIterator[MetricSample]: """Iterate over the data for multiple microgrids and components. @@ -165,6 +188,7 @@ async def list_microgrid_components_data( start_dt: The start date and time. end_dt: The end date and time. resolution: The resampling resolution for the data, represented in seconds. + include_states: Whether to include the state data. Yields: A named tuple with the following fields: @@ -180,11 +204,13 @@ async def list_microgrid_components_data( start_dt=start_dt, end_dt=end_dt, resolution=resolution, + include_states=include_states, ): for entry in batch: yield entry # pylint: disable=too-many-arguments + # pylint: disable=too-many-locals async def _list_microgrid_components_data_batch( self, *, @@ -193,6 +219,7 @@ async def _list_microgrid_components_data_batch( start_dt: datetime, end_dt: datetime, resolution: int | None, + include_states: bool = False, ) -> AsyncIterator[ComponentsDataBatch]: """Iterate over the component data batches in the stream. @@ -205,6 +232,7 @@ async def _list_microgrid_components_data_batch( start_dt: The start date and time. end_dt: The end date and time. resolution: The resampling resolution for the data, represented in seconds. + include_states: Whether to include the state data. Yields: A ComponentsDataBatch object of microgrid components data. @@ -224,9 +252,19 @@ def dt2ts(dt: datetime) -> PBTimestamp: end=dt2ts(end_dt), ) + incl_states = ( + PBIncludeOptions.FilterOption.FILTER_OPTION_INCLUDE + if include_states + else PBIncludeOptions.FilterOption.FILTER_OPTION_EXCLUDE + ) + include_options = PBIncludeOptions( + states=incl_states, + ) + stream_filter = PBReceiveMicrogridComponentsDataStreamRequest.StreamFilter( time_filter=time_filter, resampling_options=PBResamplingOptions(resolution=resolution), + include_options=include_options, ) metrics_pb = [metric.to_proto() for metric in metrics]