Skip to content

Commit

Permalink
Add support for states in client
Browse files Browse the repository at this point in the history
  • Loading branch information
cwasicki committed Sep 13, 2024
1 parent be1bb45 commit 7f5cca8
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 1 deletion.
9 changes: 9 additions & 0 deletions src/frequenz/client/reporting/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ def main() -> None:
help="List of metrics to process",
required=True,
)
parser.add_argument(
"--states",
action="store_true",
help="Include states in the output",
)
parser.add_argument(
"--start",
type=datetime.fromisoformat,
Expand Down Expand Up @@ -66,6 +71,7 @@ def main() -> None:
args.start,
args.end,
args.resolution,
states=args.states,
service_address=args.url,
key=args.key,
fmt=args.format,
Expand All @@ -81,6 +87,7 @@ async def run(
start_dt: datetime,
end_dt: datetime,
resolution: int,
states: bool,
service_address: str,
key: str,
fmt: str,
Expand All @@ -94,6 +101,7 @@ async def run(
start_dt: start datetime
end_dt: end datetime
resolution: resampling resolution in sec
states: include states in the output
service_address: service address
key: API key
fmt: output format
Expand All @@ -120,6 +128,7 @@ def data_iter() -> AsyncIterator[MetricSample]:
start_dt=start_dt,
end_dt=end_dt,
resolution=resolution,
include_states=states,
)

if fmt == "iter":
Expand Down
40 changes: 39 additions & 1 deletion src/frequenz/client/reporting/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from frequenz.api.common.v1.microgrid.microgrid_pb2 import (
MicrogridComponentIDs as PBMicrogridComponentIDs,
)
from frequenz.api.reporting.v1.reporting_pb2 import IncludeOptions as PBIncludeOptions
from frequenz.api.reporting.v1.reporting_pb2 import (
ReceiveMicrogridComponentsDataStreamRequest as PBReceiveMicrogridComponentsDataStreamRequest,
)
Expand Down Expand Up @@ -56,7 +57,10 @@ def is_empty(self) -> bool:
"""
if not self._data_pb.components:
return True
if not self._data_pb.components[0].metric_samples:
if (
not self._data_pb.components[0].metric_samples
and not self._data_pb.components[0].states
):
return True
return False

Expand Down Expand Up @@ -94,6 +98,21 @@ def __iter__(self) -> Iterator[MetricSample]:
metric=met,
value=value,
)
for state in cdata.states:
ts = state.sampled_at.ToDatetime()
for name, category in {
"state": state.states,
"warning": state.warnings,
"error": state.errors,
}.items():
for s in category:
yield MetricSample(
timestamp=ts,
microgrid_id=mid,
component_id=cid,
metric=name,
value=s,
)


class ReportingApiClient(BaseApiClient[ReportingStub, grpcaio.Channel]):
Expand All @@ -120,6 +139,7 @@ async def list_single_component_data(
start_dt: datetime,
end_dt: datetime,
resolution: int | None,
include_states: bool = False,
) -> AsyncIterator[MetricSample]:
"""Iterate over the data for a single metric.
Expand All @@ -130,6 +150,7 @@ async def list_single_component_data(
start_dt: The start date and time.
end_dt: The end date and time.
resolution: The resampling resolution for the data, represented in seconds.
include_states: Whether to include the state data.
Yields:
A named tuple with the following fields:
Expand All @@ -142,6 +163,7 @@ async def list_single_component_data(
start_dt=start_dt,
end_dt=end_dt,
resolution=resolution,
include_states=include_states,
):
for entry in batch:
yield entry
Expand All @@ -155,6 +177,7 @@ async def list_microgrid_components_data(
start_dt: datetime,
end_dt: datetime,
resolution: int | None,
include_states: bool = False,
) -> AsyncIterator[MetricSample]:
"""Iterate over the data for multiple microgrids and components.
Expand All @@ -165,6 +188,7 @@ async def list_microgrid_components_data(
start_dt: The start date and time.
end_dt: The end date and time.
resolution: The resampling resolution for the data, represented in seconds.
include_states: Whether to include the state data.
Yields:
A named tuple with the following fields:
Expand All @@ -180,11 +204,13 @@ async def list_microgrid_components_data(
start_dt=start_dt,
end_dt=end_dt,
resolution=resolution,
include_states=include_states,
):
for entry in batch:
yield entry

# pylint: disable=too-many-arguments
# pylint: disable=too-many-locals
async def _list_microgrid_components_data_batch(
self,
*,
Expand All @@ -193,6 +219,7 @@ async def _list_microgrid_components_data_batch(
start_dt: datetime,
end_dt: datetime,
resolution: int | None,
include_states: bool = False,
) -> AsyncIterator[ComponentsDataBatch]:
"""Iterate over the component data batches in the stream.
Expand All @@ -205,6 +232,7 @@ async def _list_microgrid_components_data_batch(
start_dt: The start date and time.
end_dt: The end date and time.
resolution: The resampling resolution for the data, represented in seconds.
include_states: Whether to include the state data.
Yields:
A ComponentsDataBatch object of microgrid components data.
Expand All @@ -224,9 +252,19 @@ def dt2ts(dt: datetime) -> PBTimestamp:
end=dt2ts(end_dt),
)

incl_states = (
PBIncludeOptions.FilterOption.FILTER_OPTION_INCLUDE
if include_states
else PBIncludeOptions.FilterOption.FILTER_OPTION_EXCLUDE
)
include_options = PBIncludeOptions(
states=incl_states,
)

stream_filter = PBReceiveMicrogridComponentsDataStreamRequest.StreamFilter(
time_filter=time_filter,
resampling_options=PBResamplingOptions(resolution=resolution),
include_options=include_options,
)

metrics_pb = [metric.to_proto() for metric in metrics]
Expand Down

0 comments on commit 7f5cca8

Please sign in to comment.