diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index a55ff76..e13753d 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -3,7 +3,7 @@ ## Summary This release introduces the initial version of the Reporting API client with support for -retrieving single metric historical data for a single component. +retrieving historical metrics data for a single component. ## Upgrading @@ -12,16 +12,18 @@ retrieving single metric historical data for a single component. ## New Features * Introducing the initial version of the Reporting API client, streamlined for -retrieving single metric historical data for a single component. It incorporates +retrieving historical metrics data for a single component. It incorporates pagination handling and utilizes a wrapper data class that retains the raw protobuf response while offering transformation capabilities limited here to generators of structured data representation via named tuples. -* Current limitations include a single metric focus with plans for extensibility, +* Current limitations include a single component focus with plans for extensibility, ongoing development for states and bounds integration, as well as support for service-side features like resampling, streaming, and formula aggregations. * Code examples are provided to guide users through the basic usage of the client. +The example client is a simple command-line tool that retrieves historical metrics +data for a single component and prints it to the console. ## Bug Fixes diff --git a/examples/client.py b/examples/client.py index 7759d40..ba7e5c3 100644 --- a/examples/client.py +++ b/examples/client.py @@ -7,7 +7,7 @@ import asyncio from datetime import datetime from pprint import pprint -from typing import AsyncGenerator +from typing import AsyncIterator import pandas as pd from frequenz.client.common.metric import Metric @@ -18,122 +18,166 @@ from frequenz.client.reporting._client import MetricSample -# pylint: disable=too-many-locals -async def main(microgrid_id: int, component_id: int) -> None: +def main() -> None: + """Parse arguments and run the client.""" + parser = argparse.ArgumentParser() + parser.add_argument( + "--url", + type=str, + help="URL of the Reporting service", + default="localhost:50051", + ) + parser.add_argument("--mid", type=int, help="Microgrid ID", required=True) + parser.add_argument("--cid", type=int, help="Component ID", required=True) + parser.add_argument( + "--metrics", + type=str, + nargs="+", + choices=[e.name for e in Metric], + help="List of metrics to process", + required=True, + ) + parser.add_argument( + "--start", + type=datetime.fromisoformat, + help="Start datetime in YYYY-MM-DDTHH:MM:SS format", + required=True, + ) + parser.add_argument( + "--end", + type=datetime.fromisoformat, + help="End datetime in YYYY-MM-DDTHH:MM:SS format", + required=True, + ) + parser.add_argument("--psize", type=int, help="Page size", default=100) + parser.add_argument( + "--display", choices=["iter", "df", "dict"], help="Display format", default="df" + ) + args = parser.parse_args() + asyncio.run( + run( + args.mid, + args.cid, + args.metrics, + args.start, + args.end, + page_size=args.psize, + service_address=args.url, + display=args.display, + ) + ) + + +# pylint: disable=too-many-arguments +async def run( + microgrid_id: int, + component_id: int, + metric_names: list[str], + start_dt: datetime, + end_dt: datetime, + page_size: int, + service_address: str, + display: str, +) -> None: """Test the ReportingClient. Args: - microgrid_id: int - component_id: int + microgrid_id: microgrid ID + component_id: component ID + metric_names: list of metric names + start_dt: start datetime + end_dt: end datetime + page_size: page size + service_address: service address + display: display format + + Raises: + ValueError: if display format is invalid """ - service_address = "localhost:50051" client = ReportingClient(service_address) - microgrid_components = [(microgrid_id, [component_id])] - metrics = [ - Metric.DC_POWER, - Metric.DC_CURRENT, - ] - - start_dt = datetime.fromisoformat("2023-11-21T12:00:00.00+00:00") - end_dt = datetime.fromisoformat("2023-11-21T12:01:00.00+00:00") - - page_size = 10 - - print("########################################################") - print("Iterate over single metric generator") - - async for sample in client.iterate_single_metric( - microgrid_id=microgrid_id, - component_id=component_id, - metric=metrics[0], - start_dt=start_dt, - end_dt=end_dt, - page_size=page_size, - ): - print("Received:", sample) - - ########################################################################### - # - # The following code is experimental and demonstrates potential future - # usage of the ReportingClient. - # - ########################################################################### - - async def components_data_iter() -> AsyncGenerator[MetricSample, None]: - """Iterate over components data. - - Yields: - Single metric sample + metrics = [Metric[mn] for mn in metric_names] + + def data_iter() -> AsyncIterator[MetricSample]: + """Iterate over single metric. + + Just a wrapper around the client method for readability. + + Returns: + Iterator over single metric samples """ - # pylint: disable=protected-access - async for page in client._iterate_components_data_pages( - microgrid_components=microgrid_components, + return client.iterate_single_component( + microgrid_id=microgrid_id, + component_id=component_id, metrics=metrics, start_dt=start_dt, end_dt=end_dt, page_size=page_size, - ): - for entry in page.iterate_metric_samples(): - yield entry - - async def components_data_dict( - components_data_iter: AsyncGenerator[MetricSample, None] - ) -> dict[int, dict[int, dict[datetime, dict[Metric, float]]]]: - """Convert components data iterator into a single dict. - - The nesting structure is: - { - microgrid_id: { - component_id: { - timestamp: { - metric: value - } + ) + + if display == "iter": + print("########################################################") + print("Iterate over single metric generator") + async for sample in data_iter(): + print(sample) + + elif display == "dict": + print("########################################################") + print("Dumping all data as a single dict") + dct = await iter_to_dict(data_iter()) + pprint(dct) + + elif display == "df": + print("########################################################") + print("Turn data into a pandas DataFrame") + data = [cd async for cd in data_iter()] + df = pd.DataFrame(data).set_index("timestamp") + # Set option to display all rows + pd.set_option("display.max_rows", None) + pprint(df) + + else: + raise ValueError(f"Invalid display format: {display}") + + return + + +async def iter_to_dict( + components_data_iter: AsyncIterator[MetricSample], +) -> dict[int, dict[int, dict[datetime, dict[Metric, float]]]]: + """Convert components data iterator into a single dict. + + The nesting structure is: + { + microgrid_id: { + component_id: { + timestamp: { + metric: value } } } + } - Args: - components_data_iter: async generator - - Returns: - Single dict with with all components data - """ - ret: dict[int, dict[int, dict[datetime, dict[Metric, float]]]] = {} - - async for ts, mid, cid, met, value in components_data_iter: - if mid not in ret: - ret[mid] = {} - if cid not in ret[mid]: - ret[mid][cid] = {} - if ts not in ret[mid][cid]: - ret[mid][cid][ts] = {} - - ret[mid][cid][ts][met] = value + Args: + components_data_iter: async generator - return ret + Returns: + Single dict with with all components data + """ + ret: dict[int, dict[int, dict[datetime, dict[Metric, float]]]] = {} - print("########################################################") - print("Iterate over generator") - async for msample in components_data_iter(): - print("Received:", msample) + async for ts, mid, cid, met, value in components_data_iter: + if mid not in ret: + ret[mid] = {} + if cid not in ret[mid]: + ret[mid][cid] = {} + if ts not in ret[mid][cid]: + ret[mid][cid][ts] = {} - print("########################################################") - print("Dumping all data as a single dict") - dct = await components_data_dict(components_data_iter()) - pprint(dct) + ret[mid][cid][ts][met] = value - print("########################################################") - print("Turn data into a pandas DataFrame") - data = [cd async for cd in components_data_iter()] - df = pd.DataFrame(data).set_index("timestamp") - pprint(df) + return ret if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("microgrid_id", type=int, help="Microgrid ID") - parser.add_argument("component_id", type=int, help="Component ID") - - args = parser.parse_args() - asyncio.run(main(args.microgrid_id, args.component_id)) + main() diff --git a/src/frequenz/client/reporting/_client.py b/src/frequenz/client/reporting/_client.py index 772c10b..ee0a8dd 100644 --- a/src/frequenz/client/reporting/_client.py +++ b/src/frequenz/client/reporting/_client.py @@ -31,9 +31,6 @@ # pylint: enable=no-name-in-module -Sample = namedtuple("Sample", ["timestamp", "value"]) -"""Type for a sample of a time series.""" - MetricSample = namedtuple( "MetricSample", ["timestamp", "microgrid_id", "component_id", "metric", "value"] ) @@ -120,22 +117,22 @@ def __init__(self, service_address: str): self._stub = ReportingStub(self._grpc_channel) # pylint: disable=too-many-arguments - async def iterate_single_metric( + async def iterate_single_component( self, *, microgrid_id: int, component_id: int, - metric: Metric, + metrics: Metric | list[Metric], start_dt: datetime, end_dt: datetime, page_size: int = 1000, - ) -> AsyncIterator[Sample]: + ) -> AsyncIterator[MetricSample]: """Iterate over the data for a single metric. Args: microgrid_id: The microgrid ID. component_id: The component ID. - metric: The metric name. + metrics: The metric name or list of metric names. start_dt: The start date and time. end_dt: The end date and time. page_size: The page size. @@ -147,13 +144,13 @@ async def iterate_single_metric( """ async for page in self._iterate_components_data_pages( microgrid_components=[(microgrid_id, [component_id])], - metrics=[metric], + metrics=[metrics] if isinstance(metrics, Metric) else metrics, start_dt=start_dt, end_dt=end_dt, page_size=page_size, ): for entry in page.iterate_metric_samples(): - yield Sample(timestamp=entry.timestamp, value=entry.value) + yield entry # pylint: disable=too-many-arguments async def _iterate_components_data_pages(