Skip to content

Commit

Permalink
Use streaming endpoint in list_microgrid_components_data
Browse files Browse the repository at this point in the history
The streaming endpoint supports requests for historical data. This
replaces the list endpoint with the streaming endpoint. The names in the
client are unchanged for now so as not to break user code. They might be
revised when live streaming is also supported by the service.

The list endpoint will most likely be deprecated and removed at some point.

Signed-off-by: cwasicki <[email protected]>
  • Loading branch information
cwasicki committed Jun 13, 2024
1 parent d01570b commit e6da30c
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 129 deletions.
2 changes: 2 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

## Upgrading

* The client now uses the streaming endpoint for historical data requests. The page size parameter has been removed.

## New Features

## Bug Fixes
Expand Down
4 changes: 0 additions & 4 deletions examples/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ def main() -> None:
args.start,
args.end,
args.resolution,
page_size=args.psize,
service_address=args.url,
key=args.key,
display=args.display,
Expand All @@ -85,7 +84,6 @@ async def run(
start_dt: datetime,
end_dt: datetime,
resolution: int,
page_size: int,
service_address: str,
key: str,
display: str,
Expand All @@ -99,7 +97,6 @@ async def run(
start_dt: start datetime
end_dt: end datetime
resolution: resampling resolution in sec
page_size: page size
service_address: service address
key: API key
display: display format
Expand All @@ -126,7 +123,6 @@ def data_iter() -> AsyncIterator[MetricSample]:
start_dt=start_dt,
end_dt=end_dt,
resolution=resolution,
page_size=page_size,
)

if display == "iter":
Expand Down
166 changes: 52 additions & 114 deletions src/frequenz/client/reporting/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,19 @@
from collections import namedtuple
from dataclasses import dataclass
from datetime import datetime
from typing import Any, AsyncIterator, Awaitable, Iterator, Type, cast
from typing import Any, AsyncIterator, Iterator, Type

import grpc.aio as grpcaio

# pylint: disable=no-name-in-module
from frequenz.api.common.v1.metrics.metric_sample_pb2 import Metric as PBMetric
from frequenz.api.common.v1.microgrid.microgrid_pb2 import (
MicrogridComponentIDs as PBMicrogridComponentIDs,
)
from frequenz.api.common.v1.pagination.pagination_params_pb2 import (
PaginationParams as PBPaginationParams,
)
from frequenz.api.reporting.v1.reporting_pb2 import (
ListMicrogridComponentsDataRequest as PBListMicrogridComponentsDataRequest,
ReceiveMicrogridComponentsDataStreamRequest as PBReceiveMicrogridComponentsDataStreamRequest,
)
from frequenz.api.reporting.v1.reporting_pb2 import (
ListMicrogridComponentsDataResponse as PBListMicrogridComponentsDataResponse,
ReceiveMicrogridComponentsDataStreamResponse as PBReceiveMicrogridComponentsDataStreamResponse,
)
from frequenz.api.reporting.v1.reporting_pb2 import (
ResamplingOptions as PBResamplingOptions,
Expand All @@ -46,28 +42,26 @@


@dataclass(frozen=True)
class ComponentsDataPage:
"""A page of microgrid components data returned by the Reporting service."""
class ComponentsDataBatch:
"""A batch of components data for a single microgrid returned by the Reporting service."""

_data_pb: PBListMicrogridComponentsDataResponse
_data_pb: PBReceiveMicrogridComponentsDataStreamResponse
"""The underlying protobuf message."""

def is_empty(self) -> bool:
"""Check if the page contains valid data.
"""Check if the batch contains valid data.
Returns:
True if the page contains no valid data.
True if the batch contains no valid data.
"""
if not self._data_pb.microgrids:
return True
if not self._data_pb.microgrids[0].components:
if not self._data_pb.components:
return True
if not self._data_pb.microgrids[0].components[0].metric_samples:
if not self._data_pb.components[0].metric_samples:
return True
return False

def __iter__(self) -> Iterator[MetricSample]:
"""Get generator that iterates over all values in the page.
"""Get generator that iterates over all values in the batch.
Note: So far only `SimpleMetricSample` in the `MetricSampleVariant`
message is supported.
Expand All @@ -82,34 +76,24 @@ def __iter__(self) -> Iterator[MetricSample]:
* value: The metric value.
"""
data = self._data_pb
for mdata in data.microgrids:
mid = mdata.microgrid_id
for cdata in mdata.components:
cid = cdata.component_id
for msample in cdata.metric_samples:
ts = msample.sampled_at.ToDatetime()
met = Metric.from_proto(msample.metric).name
value = (
msample.value.simple_metric.value
if msample.value.simple_metric
else None
)
yield MetricSample(
timestamp=ts,
microgrid_id=mid,
component_id=cid,
metric=met,
value=value,
)

@property
def next_page_token(self) -> str | None:
"""Get the token for the next page of data.
Returns:
The token for the next page of data.
"""
return self._data_pb.pagination_info.next_page_token
mid = data.microgrid_id
for cdata in data.components:
cid = cdata.component_id
for msample in cdata.metric_samples:
ts = msample.sampled_at.ToDatetime()
met = Metric.from_proto(msample.metric).name
value = (
msample.value.simple_metric.value
if msample.value.simple_metric
else None
)
yield MetricSample(
timestamp=ts,
microgrid_id=mid,
component_id=cid,
metric=met,
value=value,
)


class ReportingApiClient:
Expand All @@ -136,7 +120,6 @@ async def list_single_component_data(
start_dt: datetime,
end_dt: datetime,
resolution: int | None,
page_size: int = 1000,
) -> AsyncIterator[MetricSample]:
"""Iterate over the data for a single metric.
Expand All @@ -147,22 +130,20 @@ async def list_single_component_data(
start_dt: The start date and time.
end_dt: The end date and time.
resolution: The resampling resolution for the data, represented in seconds.
page_size: The page size.
Yields:
A named tuple with the following fields:
* timestamp: The timestamp of the metric sample.
* value: The metric value.
"""
async for page in self._list_microgrid_components_data_pages(
async for batch in self._list_microgrid_components_data_batch(
microgrid_components=[(microgrid_id, [component_id])],
metrics=[metrics] if isinstance(metrics, Metric) else metrics,
start_dt=start_dt,
end_dt=end_dt,
resolution=resolution,
page_size=page_size,
):
for entry in page:
for entry in batch:
yield entry

# pylint: disable=too-many-arguments
Expand All @@ -174,7 +155,6 @@ async def list_microgrid_components_data(
start_dt: datetime,
end_dt: datetime,
resolution: int | None,
page_size: int = 1000,
) -> AsyncIterator[MetricSample]:
"""Iterate over the data for multiple microgrids and components.
Expand All @@ -185,7 +165,6 @@ async def list_microgrid_components_data(
start_dt: The start date and time.
end_dt: The end date and time.
resolution: The resampling resolution for the data, represented in seconds.
page_size: The page size.
Yields:
A named tuple with the following fields:
Expand All @@ -195,31 +174,29 @@ async def list_microgrid_components_data(
* timestamp: The timestamp of the metric sample.
* value: The metric value.
"""
async for page in self._list_microgrid_components_data_pages(
async for batch in self._list_microgrid_components_data_batch(
microgrid_components=microgrid_components,
metrics=[metrics] if isinstance(metrics, Metric) else metrics,
start_dt=start_dt,
end_dt=end_dt,
resolution=resolution,
page_size=page_size,
):
for entry in page:
for entry in batch:
yield entry

# pylint: disable=too-many-arguments
async def _list_microgrid_components_data_pages(
async def _list_microgrid_components_data_batch(
self,
*,
microgrid_components: list[tuple[int, list[int]]],
metrics: list[Metric],
start_dt: datetime,
end_dt: datetime,
resolution: int | None,
page_size: int,
) -> AsyncIterator[ComponentsDataPage]:
"""Iterate over the pages of microgrid components data.
) -> AsyncIterator[ComponentsDataBatch]:
"""Iterate over the component data batches in the stream.
Note: This does not yet support resampling or aggregating the data. It
Note: This does not yet support aggregating the data. It
also does not yet support fetching bound and state data.
Args:
Expand All @@ -228,10 +205,9 @@ async def _list_microgrid_components_data_pages(
start_dt: The start date and time.
end_dt: The end date and time.
resolution: The resampling resolution for the data, represented in seconds.
page_size: The page size.
Yields:
A ComponentsDataPage object of microgrid components data.
A ComponentsDataBatch object of microgrid components data.
"""
microgrid_components_pb = [
PBMicrogridComponentIDs(microgrid_id=mid, component_ids=cids)
Expand All @@ -248,71 +224,33 @@ def dt2ts(dt: datetime) -> PBTimestamp:
end=dt2ts(end_dt),
)

list_filter = PBListMicrogridComponentsDataRequest.ListFilter(
list_filter = PBReceiveMicrogridComponentsDataStreamRequest.StreamFilter(
time_filter=time_filter,
resampling_options=PBResamplingOptions(resolution=resolution),
)

metrics_pb = [metric.to_proto() for metric in metrics]

page_token = None

while True:
pagination_params = PBPaginationParams(
page_size=page_size, page_token=page_token
)
request = PBReceiveMicrogridComponentsDataStreamRequest(
microgrid_components=microgrid_components_pb,
metrics=metrics_pb,
filter=list_filter,
)

response = await self._fetch_page(
microgrid_components=microgrid_components_pb,
metrics=metrics_pb,
list_filter=list_filter,
pagination_params=pagination_params,
try:
stream = self._stub.ReceiveMicrogridComponentsDataStream(
request, metadata=self._metadata
)
if not response or response.is_empty():
break

yield response

page_token = response.next_page_token
if not page_token:
break
async for response in stream:
if not response:
break

async def _fetch_page(
self,
*,
microgrid_components: list[PBMicrogridComponentIDs],
metrics: list[PBMetric.ValueType],
list_filter: PBListMicrogridComponentsDataRequest.ListFilter,
pagination_params: PBPaginationParams,
) -> ComponentsDataPage | None:
"""Fetch a single page of microgrid components data.
Args:
microgrid_components: A list of microgrid components.
metrics: A list of metrics.
list_filter: A list filter.
pagination_params: A pagination params.
yield ComponentsDataBatch(response)

Returns:
A ComponentsDataPage object of microgrid components data.
"""
try:
request = PBListMicrogridComponentsDataRequest(
microgrid_components=microgrid_components,
metrics=metrics,
filter=list_filter,
pagination_params=pagination_params,
)
response = await cast(
Awaitable[PBListMicrogridComponentsDataResponse],
self._stub.ListMicrogridComponentsData(
request, metadata=self._metadata
),
)
except grpcaio.AioRpcError as e:
print(f"RPC failed: {e}")
return None
return ComponentsDataPage(response)
return

async def close(self) -> None:
"""Close the client and cancel any pending requests immediately."""
Expand Down
21 changes: 10 additions & 11 deletions tests/test_client_reporting.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import pytest

from frequenz.client.reporting import ReportingApiClient
from frequenz.client.reporting._client import ComponentsDataPage
from frequenz.client.reporting._client import ComponentsDataBatch


@pytest.fixture
Expand All @@ -25,19 +25,18 @@ async def test_client_initialization(mock_channel: MagicMock) -> None:
mock_channel.assert_called_once_with("localhost:50051")


def test_components_data_page_is_empty_true() -> None:
def test_components_data_batch_is_empty_true() -> None:
"""Test that the is_empty method returns True when the page is empty."""
data_pb = MagicMock()
data_pb.microgrids = []
page = ComponentsDataPage(_data_pb=data_pb)
assert page.is_empty() is True
data_pb.components = []
batch = ComponentsDataBatch(_data_pb=data_pb)
assert batch.is_empty() is True


def test_components_data_page_is_empty_false() -> None:
def test_components_data_batch_is_empty_false() -> None:
"""Test that the is_empty method returns False when the page is not empty."""
data_pb = MagicMock()
data_pb.microgrids = [MagicMock()]
data_pb.microgrids[0].components = [MagicMock()]
data_pb.microgrids[0].components[0].metric_samples = [MagicMock()]
page = ComponentsDataPage(_data_pb=data_pb)
assert page.is_empty() is False
data_pb.components = [MagicMock()]
data_pb.components[0].metric_samples = [MagicMock()]
batch = ComponentsDataBatch(_data_pb=data_pb)
assert batch.is_empty() is False

0 comments on commit e6da30c

Please sign in to comment.