Skip to content

Commit

Permalink
Rework example client
Browse files Browse the repository at this point in the history
Signed-off-by: cwasicki <[email protected]>
  • Loading branch information
cwasicki committed Mar 22, 2024
1 parent adc0b5e commit b125855
Showing 1 changed file with 145 additions and 99 deletions.
244 changes: 145 additions & 99 deletions examples/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import asyncio
from datetime import datetime
from pprint import pprint
from typing import AsyncGenerator
from typing import AsyncIterator

import pandas as pd
from frequenz.client.common.metric import Metric
Expand All @@ -18,122 +18,168 @@
from frequenz.client.reporting._client import MetricSample


# pylint: disable=too-many-locals
async def main(microgrid_id: int, component_id: int) -> None:
def main() -> None:
"""Parse arguments and run the client."""
parser = argparse.ArgumentParser()
parser.add_argument(
"--url",
type=str,
help="URL of the Reporting service",
default="localhost:50051",
)
parser.add_argument("--mid", type=int, help="Microgrid ID", required=True)
parser.add_argument("--cid", type=int, help="Component ID", required=True)
parser.add_argument(
"--metrics",
type=str,
nargs="+",
choices=[e.name for e in Metric],
help="List of metrics to process",
required=True,
)
parser.add_argument(
"--start",
type=datetime.fromisoformat,
help="Start datetime in YYYY-MM-DDTHH:MM:SS format",
required=True,
)
parser.add_argument(
"--end",
type=datetime.fromisoformat,
help="End datetime in YYYY-MM-DDTHH:MM:SS format",
required=True,
)
parser.add_argument("--psize", type=int, help="Page size", default=100)
parser.add_argument(
"--display", choices=["iter", "df", "dict"], help="Display format", default="df"
)
args = parser.parse_args()
asyncio.run(
run(
args.mid,
args.cid,
args.metrics,
args.start,
args.end,
page_size=args.psize,
service_address=args.url,
display=args.display,
)
)


# pylint: disable=too-many-arguments
async def run(
microgrid_id: int,
component_id: int,
metric_names: list[str],
start_dt: datetime,
end_dt: datetime,
page_size: int,
service_address: str,
display: str,
) -> None:
"""Test the ReportingClient.
Args:
microgrid_id: int
component_id: int
microgrid_id: microgrid ID
component_id: component ID
metric_names: list of metric names
start_dt: start datetime
end_dt: end datetime
page_size: page size
service_address: service address
display: display format
Raises:
ValueError: if display format is invalid
"""
service_address = "localhost:50051"
client = ReportingClient(service_address)

microgrid_components = [(microgrid_id, [component_id])]
metrics = [
Metric.DC_POWER,
Metric.DC_CURRENT,
]

start_dt = datetime.fromisoformat("2023-11-21T12:00:00.00+00:00")
end_dt = datetime.fromisoformat("2023-11-21T12:01:00.00+00:00")

page_size = 10

print("########################################################")
print("Iterate over single metric generator")

async for sample in client.iterate_single_metric(
microgrid_id=microgrid_id,
component_id=component_id,
metric=metrics[0],
start_dt=start_dt,
end_dt=end_dt,
page_size=page_size,
):
print("Received:", sample)

###########################################################################
#
# The following code is experimental and demonstrates potential future
# usage of the ReportingClient.
#
###########################################################################

async def components_data_iter() -> AsyncGenerator[MetricSample, None]:
"""Iterate over components data.
Yields:
Single metric sample
metrics = [Metric[mn] for mn in metric_names]

assert len(metrics) == 1, "Only single metric is supported"

def data_iter() -> AsyncIterator[MetricSample]:
"""Iterate over single metric.
Just a wrapper around the client method for readability.
Returns:
Iterator over single metric samples
"""
# pylint: disable=protected-access
async for page in client._iterate_components_data_pages(
microgrid_components=microgrid_components,
metrics=metrics,
return client.iterate_single_metric(
microgrid_id=microgrid_id,
component_id=component_id,
metric=metrics[0],
start_dt=start_dt,
end_dt=end_dt,
page_size=page_size,
):
for entry in page.iterate_metric_samples():
yield entry

async def components_data_dict(
components_data_iter: AsyncGenerator[MetricSample, None]
) -> dict[int, dict[int, dict[datetime, dict[Metric, float]]]]:
"""Convert components data iterator into a single dict.
The nesting structure is:
{
microgrid_id: {
component_id: {
timestamp: {
metric: value
}
)

if display == "iter":
print("########################################################")
print("Iterate over single metric generator")
async for sample in data_iter():
print(sample)

elif display == "dict":
print("########################################################")
print("Dumping all data as a single dict")
dct = await iter_to_dict(data_iter())
pprint(dct)

elif display == "df":
print("########################################################")
print("Turn data into a pandas DataFrame")
data = [cd async for cd in data_iter()]
df = pd.DataFrame(data).set_index("timestamp")
# Set option to display all rows
pd.set_option("display.max_rows", None)
pprint(df)

else:
raise ValueError(f"Invalid display format: {display}")

return


async def iter_to_dict(
components_data_iter: AsyncIterator[MetricSample],
) -> dict[int, dict[int, dict[datetime, dict[Metric, float]]]]:
"""Convert components data iterator into a single dict.
The nesting structure is:
{
microgrid_id: {
component_id: {
timestamp: {
metric: value
}
}
}
}
Args:
components_data_iter: async generator
Returns:
Single dict with with all components data
"""
ret: dict[int, dict[int, dict[datetime, dict[Metric, float]]]] = {}

async for ts, mid, cid, met, value in components_data_iter:
if mid not in ret:
ret[mid] = {}
if cid not in ret[mid]:
ret[mid][cid] = {}
if ts not in ret[mid][cid]:
ret[mid][cid][ts] = {}

ret[mid][cid][ts][met] = value
Args:
components_data_iter: async generator
return ret
Returns:
Single dict with with all components data
"""
ret: dict[int, dict[int, dict[datetime, dict[Metric, float]]]] = {}

print("########################################################")
print("Iterate over generator")
async for msample in components_data_iter():
print("Received:", msample)
async for ts, mid, cid, met, value in components_data_iter:
if mid not in ret:
ret[mid] = {}
if cid not in ret[mid]:
ret[mid][cid] = {}
if ts not in ret[mid][cid]:
ret[mid][cid][ts] = {}

print("########################################################")
print("Dumping all data as a single dict")
dct = await components_data_dict(components_data_iter())
pprint(dct)
ret[mid][cid][ts][met] = value

print("########################################################")
print("Turn data into a pandas DataFrame")
data = [cd async for cd in components_data_iter()]
df = pd.DataFrame(data).set_index("timestamp")
pprint(df)
return ret


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("microgrid_id", type=int, help="Microgrid ID")
parser.add_argument("component_id", type=int, help="Component ID")

args = parser.parse_args()
asyncio.run(main(args.microgrid_id, args.component_id))
main()

0 comments on commit b125855

Please sign in to comment.