Skip to content

Commit

Permalink
Convert a page of historical location forecast data into a flattened …
Browse files Browse the repository at this point in the history
…numpy array for easier use (frequenz-floss#89)

- Enable the conversion of protobuf LocationForecast list to a flattened
numpy
  array.
- Enhance the historical data retrieval example with conversion to
  a pandas dataframe, sample output:
```
(sdk_venv22) david@david-desktop:~/projects/frequenz/source/frequenz-api-weather$ PYTHONPATH=py python examples/iterate_hist_forecast.py "localhost:50051"
            creation_ts latitude longitude         valid_at_ts                                     feature      value
0   2022-06-27 12:00:00     52.5      13.4 2022-06-27 13:00:00  ForecastFeature.U_WIND_COMPONENT_100_METRE  -2.629823
1   2022-06-27 12:00:00     52.5      13.4 2022-06-27 13:00:00  ForecastFeature.V_WIND_COMPONENT_100_METRE   2.316034
2   2022-06-27 12:00:00     52.5      13.4 2022-06-27 14:00:00  ForecastFeature.U_WIND_COMPONENT_100_METRE  -2.439865
3   2022-06-27 12:00:00     52.5      13.4 2022-06-27 14:00:00  ForecastFeature.V_WIND_COMPONENT_100_METRE   1.965858
4   2022-06-27 12:00:00     52.5      13.4 2022-06-27 15:00:00  ForecastFeature.U_WIND_COMPONENT_100_METRE    -1.6119
..                  ...      ...       ...                 ...                                         ...        ...
337 2023-12-18 00:00:00     52.5      13.4 2023-12-18 22:00:00  ForecastFeature.V_WIND_COMPONENT_100_METRE   4.606584
338 2023-12-18 00:00:00     52.5      13.4 2023-12-18 23:00:00  ForecastFeature.U_WIND_COMPONENT_100_METRE  11.596823
339 2023-12-18 00:00:00     52.5      13.4 2023-12-18 23:00:00  ForecastFeature.V_WIND_COMPONENT_100_METRE   4.560944
340 2023-12-18 00:00:00     52.5      13.4 2023-12-19 00:00:00  ForecastFeature.U_WIND_COMPONENT_100_METRE  11.380841
341 2023-12-18 00:00:00     52.5      13.4 2023-12-19 00:00:00  ForecastFeature.V_WIND_COMPONENT_100_METRE   4.240217

[342 rows x 6 columns]
```

Closes frequenz-floss#88.
  • Loading branch information
david-natingga-frequenz authored Apr 10, 2024
2 parents 196a7a8 + ad41562 commit 603065e
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 32 deletions.
22 changes: 16 additions & 6 deletions examples/iterate_hist_forecast.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import grpc.aio as grpcaio
from frequenz.client.weather._client import Client
from frequenz.client.weather._types import ForecastFeature, Location
from frequenz.client.weather._types import ForecastData, ForecastFeature, Location

_service_address = sys.argv[1]

Expand Down Expand Up @@ -53,11 +53,21 @@ async def main(service_address: str) -> None:
features=features, locations=locations, start=start, end=end
)

async for location_forecast in location_forecast_iterator:
for forecasts in location_forecast.forecasts:
print("Timestamp:", forecasts.valid_at_ts)
for feature_forecast in forecasts.features:
print(feature_forecast)
rows: list[ForecastData] = []
async for forecasts in location_forecast_iterator:
# You can work directly with the protobuf object forecasts.
# Here we choose to flatten into a numpy array instead.
_rows: list[ForecastData] = forecasts.flatten()
rows.extend(_rows)

# Optionally, you can construct a pandas dataframe from the data.
# pylint: disable=import-outside-toplevel, import-error
import pandas as pd # type: ignore[import]

# pylint: enable=import-outside-toplevel, import-error

df = pd.DataFrame(rows)
print(df)


asyncio.run(main(_service_address))
61 changes: 35 additions & 26 deletions py/frequenz/client/weather/_historical_forecast_iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@
"""The Historical Forecast Iterator."""

from datetime import datetime
from typing import Any, AsyncIterator, List
from typing import AsyncIterator, List

from frequenz.api.common.v1.pagination import pagination_params_pb2
from frequenz.api.weather import weather_pb2, weather_pb2_grpc
from google.protobuf import timestamp_pb2

from ._types import ForecastFeature, Location
from ._types import ForecastFeature, HistoricalForecasts, Location

PAGE_SIZE = 20
DEFAULT_PAGE_SIZE = 20
EMPTY_PAGE_TOKEN = ""


class HistoricalForecastIterator(AsyncIterator[weather_pb2.LocationForecast]):
# pylint: disable=too-many-instance-attributes
class HistoricalForecastIterator(AsyncIterator[HistoricalForecasts]):
"""An iterator over historical weather forecasts."""

def __init__( # pylint: disable=too-many-arguments
Expand All @@ -26,6 +27,7 @@ def __init__( # pylint: disable=too-many-arguments
features: list[ForecastFeature],
start: datetime,
end: datetime,
page_size: int = DEFAULT_PAGE_SIZE,
) -> None:
"""Initialize the iterator.
Expand All @@ -35,6 +37,7 @@ def __init__( # pylint: disable=too-many-arguments
features: Features to get historical weather forecasts for.
start: Start of the time range to get historical weather forecasts for.
end: End of the time range to get historical weather forecasts for.
page_size: The number of historical weather forecasts to get per page.
"""
self._stub = stub
self.locations = locations
Expand All @@ -46,7 +49,8 @@ def __init__( # pylint: disable=too-many-arguments
self.end_ts.FromDatetime(end)

self.location_forecasts: List[weather_pb2.LocationForecast] = []
self.page_token = None
self.page_token: str | None = None
self.page_size = page_size

def __aiter__(self) -> "HistoricalForecastIterator":
"""Return the iterator.
Expand All @@ -56,7 +60,7 @@ def __aiter__(self) -> "HistoricalForecastIterator":
"""
return self

async def __anext__(self) -> weather_pb2.LocationForecast:
async def __anext__(self) -> HistoricalForecasts:
"""Get the next historical weather forecast.
Returns:
Expand All @@ -65,29 +69,34 @@ async def __anext__(self) -> weather_pb2.LocationForecast:
Raises:
StopAsyncIteration: If there are no more historical weather forecasts.
"""
if len(self.location_forecasts) == 0 and self.page_token == EMPTY_PAGE_TOKEN:
if self.page_token == EMPTY_PAGE_TOKEN:
raise StopAsyncIteration

if self.location_forecasts is None or len(self.location_forecasts) == 0:
pagination_params = pagination_params_pb2.PaginationParams()
pagination_params.page_size = PAGE_SIZE
if self.page_token is not None:
pagination_params.page_token = self.page_token

response: Any = (
await self._stub.GetHistoricalWeatherForecast( # type:ignore
weather_pb2.GetHistoricalWeatherForecastRequest(
locations=(location.to_pb() for location in self.locations),
features=(feature.value for feature in self.features),
start_ts=self.start_ts,
end_ts=self.end_ts,
pagination_params=pagination_params,
)
pagination_params = pagination_params_pb2.PaginationParams()
pagination_params.page_size = self.page_size
if self.page_token is not None:
pagination_params.page_token = self.page_token

response: weather_pb2.GetHistoricalWeatherForecastResponse = (
await self._stub.GetHistoricalWeatherForecast( # type:ignore
weather_pb2.GetHistoricalWeatherForecastRequest(
locations=(location.to_pb() for location in self.locations),
features=(feature.value for feature in self.features),
start_ts=self.start_ts,
end_ts=self.end_ts,
pagination_params=pagination_params,
)
)
)

self.page_token = response.pagination_info.next_page_token
self.location_forecasts = response.location_forecasts
if (
response.pagination_info is None
or response.pagination_info.next_page_token is None
):
raise StopAsyncIteration

self.page_token = response.pagination_info.next_page_token
if len(response.location_forecasts) == 0:
raise StopAsyncIteration

location_forecast: weather_pb2.LocationForecast = self.location_forecasts.pop(0)
return location_forecast
return HistoricalForecasts.from_pb(response)
82 changes: 82 additions & 0 deletions py/frequenz/client/weather/_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import enum
import logging
import typing
from collections import namedtuple
from dataclasses import dataclass

import numpy as np
Expand Down Expand Up @@ -281,3 +282,84 @@ def to_ndarray_vlf(
raise RuntimeError("Error processing forecast data") from e

return array


ForecastData = namedtuple(
"ForecastData",
["creation_ts", "latitude", "longitude", "validity_ts", "feature", "value"],
)


@dataclass(frozen=True)
class HistoricalForecasts:
"""Historical weather forecast data."""

_forecasts_pb: weather_pb2.GetHistoricalWeatherForecastResponse

@classmethod
def from_pb(
cls, forecasts: weather_pb2.GetHistoricalWeatherForecastResponse
) -> HistoricalForecasts:
"""Convert a protobuf Forecast message to Forecast object.
Args:
forecasts: protobuf message with historical forecast data.
Returns:
Forecast object corresponding to the protobuf message.
"""
return cls(_forecasts_pb=forecasts)

def flatten(
self,
) -> list[ForecastData]:
"""Flatten a Forecast object to a list of named tuples of data.
Returns:
List of named tuples with the flattened forecast data.
Raises:
ValueError: If the forecasts data is missing or invalid.
"""
# check for empty forecasts data
if not self._forecasts_pb.location_forecasts:
raise ValueError("Forecast data is missing or invalid.")

return flatten(list(self._forecasts_pb.location_forecasts))


def flatten(
location_forecasts: list[weather_pb2.LocationForecast],
) -> list[ForecastData]:
"""Flatten a Forecast object to a list of named tuples of data.
Each tuple contains the following data:
- creation timestamp
- latitude
- longitude
- validity timestamp
- feature
- forecast value
Args:
location_forecasts: The location forecasts to flatten.
Returns:
List of named tuples with the flattened forecast data.
"""
data = []
for location_forecast in location_forecasts:
for forecasts in location_forecast.forecasts:
for feature_forecast in forecasts.features:
# Create and append an instance of the named tuple instead of a plain tuple
data.append(
ForecastData(
creation_ts=location_forecast.creation_ts.ToDatetime(),
latitude=location_forecast.location.latitude,
longitude=location_forecast.location.longitude,
validity_ts=forecasts.valid_at_ts.ToDatetime(),
feature=ForecastFeature(feature_forecast.feature),
value=feature_forecast.value,
)
)

return data

0 comments on commit 603065e

Please sign in to comment.