From b125855efe63ca5dfb6a9d3dec814bd12f04f0bf Mon Sep 17 00:00:00 2001 From: cwasicki <126617870+cwasicki@users.noreply.github.com> Date: Mon, 18 Mar 2024 22:47:01 +0100 Subject: [PATCH] Rework example client Signed-off-by: cwasicki <126617870+cwasicki@users.noreply.github.com> --- examples/client.py | 244 +++++++++++++++++++++++++++------------------ 1 file changed, 145 insertions(+), 99 deletions(-) diff --git a/examples/client.py b/examples/client.py index 7759d40..62d64a9 100644 --- a/examples/client.py +++ b/examples/client.py @@ -7,7 +7,7 @@ import asyncio from datetime import datetime from pprint import pprint -from typing import AsyncGenerator +from typing import AsyncIterator import pandas as pd from frequenz.client.common.metric import Metric @@ -18,122 +18,168 @@ from frequenz.client.reporting._client import MetricSample -# pylint: disable=too-many-locals -async def main(microgrid_id: int, component_id: int) -> None: +def main() -> None: + """Parse arguments and run the client.""" + parser = argparse.ArgumentParser() + parser.add_argument( + "--url", + type=str, + help="URL of the Reporting service", + default="localhost:50051", + ) + parser.add_argument("--mid", type=int, help="Microgrid ID", required=True) + parser.add_argument("--cid", type=int, help="Component ID", required=True) + parser.add_argument( + "--metrics", + type=str, + nargs="+", + choices=[e.name for e in Metric], + help="List of metrics to process", + required=True, + ) + parser.add_argument( + "--start", + type=datetime.fromisoformat, + help="Start datetime in YYYY-MM-DDTHH:MM:SS format", + required=True, + ) + parser.add_argument( + "--end", + type=datetime.fromisoformat, + help="End datetime in YYYY-MM-DDTHH:MM:SS format", + required=True, + ) + parser.add_argument("--psize", type=int, help="Page size", default=100) + parser.add_argument( + "--display", choices=["iter", "df", "dict"], help="Display format", default="df" + ) + args = parser.parse_args() + asyncio.run( + run( + args.mid, + args.cid, + args.metrics, + args.start, + args.end, + page_size=args.psize, + service_address=args.url, + display=args.display, + ) + ) + + +# pylint: disable=too-many-arguments +async def run( + microgrid_id: int, + component_id: int, + metric_names: list[str], + start_dt: datetime, + end_dt: datetime, + page_size: int, + service_address: str, + display: str, +) -> None: """Test the ReportingClient. Args: - microgrid_id: int - component_id: int + microgrid_id: microgrid ID + component_id: component ID + metric_names: list of metric names + start_dt: start datetime + end_dt: end datetime + page_size: page size + service_address: service address + display: display format + + Raises: + ValueError: if display format is invalid """ - service_address = "localhost:50051" client = ReportingClient(service_address) - microgrid_components = [(microgrid_id, [component_id])] - metrics = [ - Metric.DC_POWER, - Metric.DC_CURRENT, - ] - - start_dt = datetime.fromisoformat("2023-11-21T12:00:00.00+00:00") - end_dt = datetime.fromisoformat("2023-11-21T12:01:00.00+00:00") - - page_size = 10 - - print("########################################################") - print("Iterate over single metric generator") - - async for sample in client.iterate_single_metric( - microgrid_id=microgrid_id, - component_id=component_id, - metric=metrics[0], - start_dt=start_dt, - end_dt=end_dt, - page_size=page_size, - ): - print("Received:", sample) - - ########################################################################### - # - # The following code is experimental and demonstrates potential future - # usage of the ReportingClient. - # - ########################################################################### - - async def components_data_iter() -> AsyncGenerator[MetricSample, None]: - """Iterate over components data. - - Yields: - Single metric sample + metrics = [Metric[mn] for mn in metric_names] + + assert len(metrics) == 1, "Only single metric is supported" + + def data_iter() -> AsyncIterator[MetricSample]: + """Iterate over single metric. + + Just a wrapper around the client method for readability. + + Returns: + Iterator over single metric samples """ - # pylint: disable=protected-access - async for page in client._iterate_components_data_pages( - microgrid_components=microgrid_components, - metrics=metrics, + return client.iterate_single_metric( + microgrid_id=microgrid_id, + component_id=component_id, + metric=metrics[0], start_dt=start_dt, end_dt=end_dt, page_size=page_size, - ): - for entry in page.iterate_metric_samples(): - yield entry - - async def components_data_dict( - components_data_iter: AsyncGenerator[MetricSample, None] - ) -> dict[int, dict[int, dict[datetime, dict[Metric, float]]]]: - """Convert components data iterator into a single dict. - - The nesting structure is: - { - microgrid_id: { - component_id: { - timestamp: { - metric: value - } + ) + + if display == "iter": + print("########################################################") + print("Iterate over single metric generator") + async for sample in data_iter(): + print(sample) + + elif display == "dict": + print("########################################################") + print("Dumping all data as a single dict") + dct = await iter_to_dict(data_iter()) + pprint(dct) + + elif display == "df": + print("########################################################") + print("Turn data into a pandas DataFrame") + data = [cd async for cd in data_iter()] + df = pd.DataFrame(data).set_index("timestamp") + # Set option to display all rows + pd.set_option("display.max_rows", None) + pprint(df) + + else: + raise ValueError(f"Invalid display format: {display}") + + return + + +async def iter_to_dict( + components_data_iter: AsyncIterator[MetricSample], +) -> dict[int, dict[int, dict[datetime, dict[Metric, float]]]]: + """Convert components data iterator into a single dict. + + The nesting structure is: + { + microgrid_id: { + component_id: { + timestamp: { + metric: value } } } + } - Args: - components_data_iter: async generator - - Returns: - Single dict with with all components data - """ - ret: dict[int, dict[int, dict[datetime, dict[Metric, float]]]] = {} - - async for ts, mid, cid, met, value in components_data_iter: - if mid not in ret: - ret[mid] = {} - if cid not in ret[mid]: - ret[mid][cid] = {} - if ts not in ret[mid][cid]: - ret[mid][cid][ts] = {} - - ret[mid][cid][ts][met] = value + Args: + components_data_iter: async generator - return ret + Returns: + Single dict with with all components data + """ + ret: dict[int, dict[int, dict[datetime, dict[Metric, float]]]] = {} - print("########################################################") - print("Iterate over generator") - async for msample in components_data_iter(): - print("Received:", msample) + async for ts, mid, cid, met, value in components_data_iter: + if mid not in ret: + ret[mid] = {} + if cid not in ret[mid]: + ret[mid][cid] = {} + if ts not in ret[mid][cid]: + ret[mid][cid][ts] = {} - print("########################################################") - print("Dumping all data as a single dict") - dct = await components_data_dict(components_data_iter()) - pprint(dct) + ret[mid][cid][ts][met] = value - print("########################################################") - print("Turn data into a pandas DataFrame") - data = [cd async for cd in components_data_iter()] - df = pd.DataFrame(data).set_index("timestamp") - pprint(df) + return ret if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("microgrid_id", type=int, help="Microgrid ID") - parser.add_argument("component_id", type=int, help="Component ID") - - args = parser.parse_args() - asyncio.run(main(args.microgrid_id, args.component_id)) + main()