Skip to content

Commit

Permalink
Add support for requesting multiple metrics (frequenz-floss#26)
Browse files Browse the repository at this point in the history
This add support to request multiple metrics in a single request. The
limitation to query data only from a single component is still in place.
Moreover the distinct return types when querying single or multiple
metrics is removed.

In addition to that, this turns the example client into a command line
tool that can be used to extract data from the reporting API and print
it to stdout. Besides option for selecting the data, the service address
and the display format can be changed via command line.
  • Loading branch information
cwasicki authored Mar 23, 2024
2 parents 9b830c8 + 5aec06d commit 90daf45
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 110 deletions.
8 changes: 5 additions & 3 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
## Summary

This release introduces the initial version of the Reporting API client with support for
retrieving single metric historical data for a single component.
retrieving historical metrics data for a single component.

## Upgrading

Expand All @@ -12,16 +12,18 @@ retrieving single metric historical data for a single component.
## New Features

* Introducing the initial version of the Reporting API client, streamlined for
retrieving single metric historical data for a single component. It incorporates
retrieving historical metrics data for a single component. It incorporates
pagination handling and utilizes a wrapper data class that retains the raw
protobuf response while offering transformation capabilities limited here
to generators of structured data representation via named tuples.

* Current limitations include a single metric focus with plans for extensibility,
* Current limitations include a single component focus with plans for extensibility,
ongoing development for states and bounds integration, as well as support for
service-side features like resampling, streaming, and formula aggregations.

* Code examples are provided to guide users through the basic usage of the client.
The example client is a simple command-line tool that retrieves historical metrics
data for a single component and prints it to the console.


## Bug Fixes
Expand Down
240 changes: 142 additions & 98 deletions examples/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import asyncio
from datetime import datetime
from pprint import pprint
from typing import AsyncGenerator
from typing import AsyncIterator

import pandas as pd
from frequenz.client.common.metric import Metric
Expand All @@ -18,122 +18,166 @@
from frequenz.client.reporting._client import MetricSample


# pylint: disable=too-many-locals
async def main(microgrid_id: int, component_id: int) -> None:
def main() -> None:
"""Parse arguments and run the client."""
parser = argparse.ArgumentParser()
parser.add_argument(
"--url",
type=str,
help="URL of the Reporting service",
default="localhost:50051",
)
parser.add_argument("--mid", type=int, help="Microgrid ID", required=True)
parser.add_argument("--cid", type=int, help="Component ID", required=True)
parser.add_argument(
"--metrics",
type=str,
nargs="+",
choices=[e.name for e in Metric],
help="List of metrics to process",
required=True,
)
parser.add_argument(
"--start",
type=datetime.fromisoformat,
help="Start datetime in YYYY-MM-DDTHH:MM:SS format",
required=True,
)
parser.add_argument(
"--end",
type=datetime.fromisoformat,
help="End datetime in YYYY-MM-DDTHH:MM:SS format",
required=True,
)
parser.add_argument("--psize", type=int, help="Page size", default=100)
parser.add_argument(
"--display", choices=["iter", "df", "dict"], help="Display format", default="df"
)
args = parser.parse_args()
asyncio.run(
run(
args.mid,
args.cid,
args.metrics,
args.start,
args.end,
page_size=args.psize,
service_address=args.url,
display=args.display,
)
)


# pylint: disable=too-many-arguments
async def run(
microgrid_id: int,
component_id: int,
metric_names: list[str],
start_dt: datetime,
end_dt: datetime,
page_size: int,
service_address: str,
display: str,
) -> None:
"""Test the ReportingClient.
Args:
microgrid_id: int
component_id: int
microgrid_id: microgrid ID
component_id: component ID
metric_names: list of metric names
start_dt: start datetime
end_dt: end datetime
page_size: page size
service_address: service address
display: display format
Raises:
ValueError: if display format is invalid
"""
service_address = "localhost:50051"
client = ReportingClient(service_address)

microgrid_components = [(microgrid_id, [component_id])]
metrics = [
Metric.DC_POWER,
Metric.DC_CURRENT,
]

start_dt = datetime.fromisoformat("2023-11-21T12:00:00.00+00:00")
end_dt = datetime.fromisoformat("2023-11-21T12:01:00.00+00:00")

page_size = 10

print("########################################################")
print("Iterate over single metric generator")

async for sample in client.iterate_single_metric(
microgrid_id=microgrid_id,
component_id=component_id,
metric=metrics[0],
start_dt=start_dt,
end_dt=end_dt,
page_size=page_size,
):
print("Received:", sample)

###########################################################################
#
# The following code is experimental and demonstrates potential future
# usage of the ReportingClient.
#
###########################################################################

async def components_data_iter() -> AsyncGenerator[MetricSample, None]:
"""Iterate over components data.
Yields:
Single metric sample
metrics = [Metric[mn] for mn in metric_names]

def data_iter() -> AsyncIterator[MetricSample]:
"""Iterate over single metric.
Just a wrapper around the client method for readability.
Returns:
Iterator over single metric samples
"""
# pylint: disable=protected-access
async for page in client._iterate_components_data_pages(
microgrid_components=microgrid_components,
return client.iterate_single_component(
microgrid_id=microgrid_id,
component_id=component_id,
metrics=metrics,
start_dt=start_dt,
end_dt=end_dt,
page_size=page_size,
):
for entry in page.iterate_metric_samples():
yield entry

async def components_data_dict(
components_data_iter: AsyncGenerator[MetricSample, None]
) -> dict[int, dict[int, dict[datetime, dict[Metric, float]]]]:
"""Convert components data iterator into a single dict.
The nesting structure is:
{
microgrid_id: {
component_id: {
timestamp: {
metric: value
}
)

if display == "iter":
print("########################################################")
print("Iterate over single metric generator")
async for sample in data_iter():
print(sample)

elif display == "dict":
print("########################################################")
print("Dumping all data as a single dict")
dct = await iter_to_dict(data_iter())
pprint(dct)

elif display == "df":
print("########################################################")
print("Turn data into a pandas DataFrame")
data = [cd async for cd in data_iter()]
df = pd.DataFrame(data).set_index("timestamp")
# Set option to display all rows
pd.set_option("display.max_rows", None)
pprint(df)

else:
raise ValueError(f"Invalid display format: {display}")

return


async def iter_to_dict(
components_data_iter: AsyncIterator[MetricSample],
) -> dict[int, dict[int, dict[datetime, dict[Metric, float]]]]:
"""Convert components data iterator into a single dict.
The nesting structure is:
{
microgrid_id: {
component_id: {
timestamp: {
metric: value
}
}
}
}
Args:
components_data_iter: async generator
Returns:
Single dict with with all components data
"""
ret: dict[int, dict[int, dict[datetime, dict[Metric, float]]]] = {}

async for ts, mid, cid, met, value in components_data_iter:
if mid not in ret:
ret[mid] = {}
if cid not in ret[mid]:
ret[mid][cid] = {}
if ts not in ret[mid][cid]:
ret[mid][cid][ts] = {}

ret[mid][cid][ts][met] = value
Args:
components_data_iter: async generator
return ret
Returns:
Single dict with with all components data
"""
ret: dict[int, dict[int, dict[datetime, dict[Metric, float]]]] = {}

print("########################################################")
print("Iterate over generator")
async for msample in components_data_iter():
print("Received:", msample)
async for ts, mid, cid, met, value in components_data_iter:
if mid not in ret:
ret[mid] = {}
if cid not in ret[mid]:
ret[mid][cid] = {}
if ts not in ret[mid][cid]:
ret[mid][cid][ts] = {}

print("########################################################")
print("Dumping all data as a single dict")
dct = await components_data_dict(components_data_iter())
pprint(dct)
ret[mid][cid][ts][met] = value

print("########################################################")
print("Turn data into a pandas DataFrame")
data = [cd async for cd in components_data_iter()]
df = pd.DataFrame(data).set_index("timestamp")
pprint(df)
return ret


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("microgrid_id", type=int, help="Microgrid ID")
parser.add_argument("component_id", type=int, help="Component ID")

args = parser.parse_args()
asyncio.run(main(args.microgrid_id, args.component_id))
main()
15 changes: 6 additions & 9 deletions src/frequenz/client/reporting/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@

# pylint: enable=no-name-in-module

Sample = namedtuple("Sample", ["timestamp", "value"])
"""Type for a sample of a time series."""

MetricSample = namedtuple(
"MetricSample", ["timestamp", "microgrid_id", "component_id", "metric", "value"]
)
Expand Down Expand Up @@ -120,22 +117,22 @@ def __init__(self, service_address: str):
self._stub = ReportingStub(self._grpc_channel)

# pylint: disable=too-many-arguments
async def iterate_single_metric(
async def iterate_single_component(
self,
*,
microgrid_id: int,
component_id: int,
metric: Metric,
metrics: Metric | list[Metric],
start_dt: datetime,
end_dt: datetime,
page_size: int = 1000,
) -> AsyncIterator[Sample]:
) -> AsyncIterator[MetricSample]:
"""Iterate over the data for a single metric.
Args:
microgrid_id: The microgrid ID.
component_id: The component ID.
metric: The metric name.
metrics: The metric name or list of metric names.
start_dt: The start date and time.
end_dt: The end date and time.
page_size: The page size.
Expand All @@ -147,13 +144,13 @@ async def iterate_single_metric(
"""
async for page in self._iterate_components_data_pages(
microgrid_components=[(microgrid_id, [component_id])],
metrics=[metric],
metrics=[metrics] if isinstance(metrics, Metric) else metrics,
start_dt=start_dt,
end_dt=end_dt,
page_size=page_size,
):
for entry in page.iterate_metric_samples():
yield Sample(timestamp=entry.timestamp, value=entry.value)
yield entry

# pylint: disable=too-many-arguments
async def _iterate_components_data_pages(
Expand Down

0 comments on commit 90daf45

Please sign in to comment.