Add support for states in client and CLI tool
States can now be requested via the client and are provided through the
flat iterator. They can be identified by their category: `state`,
`warning`, or `error`. Each individual state is provided as its own
sample.

Support for states is also added to the CLI tool via the `--states`
flag. Requests for states without any metrics are not yet supported by
the service.

Signed-off-by: cwasicki <[email protected]>
cwasicki committed Sep 13, 2024
1 parent ab3f06e commit 4c2856e
Showing 2 changed files with 54 additions and 2 deletions.
9 changes: 9 additions & 0 deletions src/frequenz/client/reporting/__main__.py
@@ -34,6 +34,11 @@ def main() -> None:
help="List of metrics to process",
required=True,
)
parser.add_argument(
"--states",
action="store_true",
help="Include states in the output",
)
parser.add_argument(
"--start",
type=datetime.fromisoformat,
@@ -66,6 +71,7 @@ def main() -> None:
args.start,
args.end,
args.resolution,
states=args.states,
service_address=args.url,
key=args.key,
fmt=args.format,
@@ -81,6 +87,7 @@ async def run(
start_dt: datetime,
end_dt: datetime,
resolution: int,
states: bool,
service_address: str,
key: str,
fmt: str,
@@ -94,6 +101,7 @@ async def run(
start_dt: start datetime
end_dt: end datetime
resolution: resampling resolution in sec
states: include states in the output
service_address: service address
key: API key
fmt: output format
@@ -120,6 +128,7 @@ def data_iter() -> AsyncIterator[MetricSample]:
start_dt=start_dt,
end_dt=end_dt,
resolution=resolution,
include_states=states,
)

if fmt == "iter":
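
For illustration, running the tool with the new flag might look like the following. The `--url`, `--key`, `--end`, `--resolution`, and `--format` option names are inferred from the `args.*` attributes above, the component-selection flags are omitted because they are not visible in this hunk, and the metric name is only an example:

    python -m frequenz.client.reporting --url <URL> --key <KEY> \
        --metrics AC_ACTIVE_POWER \
        --start 2024-09-01T00:00:00 --end 2024-09-02T00:00:00 \
        --resolution 900 --format iter --states

At least one metric must still be requested; states-only requests are not yet supported by the service.
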
47 changes: 45 additions & 2 deletions src/frequenz/client/reporting/_client.py
@@ -4,7 +4,7 @@
"""Client for requests to the Reporting API."""

from collections import namedtuple
from collections.abc import AsyncIterator, Iterator
from collections.abc import AsyncIterator, Iterable, Iterator
from dataclasses import dataclass
from datetime import datetime
from typing import cast
@@ -15,6 +15,7 @@
from frequenz.api.common.v1.microgrid.microgrid_pb2 import (
MicrogridComponentIDs as PBMicrogridComponentIDs,
)
from frequenz.api.reporting.v1.reporting_pb2 import IncludeOptions as PBIncludeOptions
from frequenz.api.reporting.v1.reporting_pb2 import (
ReceiveMicrogridComponentsDataStreamRequest as PBReceiveMicrogridComponentsDataStreamRequest,
)
@@ -56,7 +57,10 @@ def is_empty(self) -> bool:
"""
if not self._data_pb.components:
return True
if not self._data_pb.components[0].metric_samples:
if (
not self._data_pb.components[0].metric_samples
and not self._data_pb.components[0].states
):
return True
return False

@@ -94,6 +98,26 @@ def __iter__(self) -> Iterator[MetricSample]:
metric=met,
value=value,
)
for state in cdata.states:
ts = state.sampled_at.ToDatetime()
for name, category in {
"state": state.states,
"warning": state.warnings,
"error": state.errors,
}.items():
# Skip if the category is not present
if not isinstance(category, Iterable):
continue
# Each category can have multiple states
# that are provided as individual samples
for s in category:
yield MetricSample(
timestamp=ts,
microgrid_id=mid,
component_id=cid,
metric=name,
value=s,
)


class ReportingApiClient(BaseApiClient[ReportingStub, grpcaio.Channel]):
@@ -120,6 +144,7 @@ async def list_single_component_data(
start_dt: datetime,
end_dt: datetime,
resolution: int | None,
include_states: bool = False,
) -> AsyncIterator[MetricSample]:
"""Iterate over the data for a single metric.
@@ -130,6 +155,7 @@ async def list_single_component_data(
start_dt: The start date and time.
end_dt: The end date and time.
resolution: The resampling resolution for the data, represented in seconds.
include_states: Whether to include the state data.
Yields:
A named tuple with the following fields:
@@ -142,6 +168,7 @@
start_dt=start_dt,
end_dt=end_dt,
resolution=resolution,
include_states=include_states,
):
for entry in batch:
yield entry
@@ -155,6 +182,7 @@ async def list_microgrid_components_data(
start_dt: datetime,
end_dt: datetime,
resolution: int | None,
include_states: bool = False,
) -> AsyncIterator[MetricSample]:
"""Iterate over the data for multiple microgrids and components.
@@ -165,6 +193,7 @@ async def list_microgrid_components_data(
start_dt: The start date and time.
end_dt: The end date and time.
resolution: The resampling resolution for the data, represented in seconds.
include_states: Whether to include the state data.
Yields:
A named tuple with the following fields:
@@ -180,11 +209,13 @@ async def list_microgrid_components_data(
start_dt=start_dt,
end_dt=end_dt,
resolution=resolution,
include_states=include_states,
):
for entry in batch:
yield entry

# pylint: disable=too-many-arguments
# pylint: disable=too-many-locals
async def _list_microgrid_components_data_batch(
self,
*,
@@ -193,6 +224,7 @@ async def _list_microgrid_components_data_batch(
start_dt: datetime,
end_dt: datetime,
resolution: int | None,
include_states: bool = False,
) -> AsyncIterator[ComponentsDataBatch]:
"""Iterate over the component data batches in the stream.
@@ -205,6 +237,7 @@ async def _list_microgrid_components_data_batch(
start_dt: The start date and time.
end_dt: The end date and time.
resolution: The resampling resolution for the data, represented in seconds.
include_states: Whether to include the state data.
Yields:
A ComponentsDataBatch object of microgrid components data.
@@ -224,9 +257,19 @@ def dt2ts(dt: datetime) -> PBTimestamp:
end=dt2ts(end_dt),
)

incl_states = (
PBIncludeOptions.FilterOption.FILTER_OPTION_INCLUDE
if include_states
else PBIncludeOptions.FilterOption.FILTER_OPTION_EXCLUDE
)
include_options = PBIncludeOptions(
states=incl_states,
)

stream_filter = PBReceiveMicrogridComponentsDataStreamRequest.StreamFilter(
time_filter=time_filter,
resampling_options=PBResamplingOptions(resolution=resolution),
include_options=include_options,
)

metrics_pb = [metric.to_proto() for metric in metrics]
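
For illustration, a minimal sketch of consuming state samples through the flat iterator with the new `include_states` option. The client constructor arguments, the `microgrid_components` parameter name, the example IDs, and the `Metric` import path are assumptions not shown in this diff; `include_states` and the `state`/`warning`/`error` category names come from this change:

    import asyncio
    from datetime import datetime, timedelta, timezone

    from frequenz.client.common.metric import Metric  # assumed import path
    from frequenz.client.reporting import ReportingApiClient


    async def show_states() -> None:
        # Constructor arguments are assumed; they mirror the CLI's --url/--key options.
        client = ReportingApiClient(
            service_address="grpc://reporting.example.com", key="MY_API_KEY"
        )
        end = datetime.now(timezone.utc)
        start = end - timedelta(hours=1)
        async for sample in client.list_microgrid_components_data(
            microgrid_components=[(1, [100])],  # assumed parameter name, example IDs
            metrics=[Metric.AC_ACTIVE_POWER],  # states without metrics are not yet supported
            start_dt=start,
            end_dt=end,
            resolution=900,
            include_states=True,
        ):
            # State samples reuse the category name as the `metric` field.
            if sample.metric in ("state", "warning", "error"):
                print(sample)


    asyncio.run(show_states())

State samples are yielded through the same iterator as the metric samples, so filtering on the category names is enough to separate them.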
