From e4e3cf106e297785a7372a8268087c2fde9facf9 Mon Sep 17 00:00:00 2001 From: David Natingga Date: Thu, 29 Feb 2024 08:07:31 +0100 Subject: [PATCH] Extend the client to stream historical weather forecast data. Add examples to demonstrate: - How to stream live data with the client. - How to stream the historical data with the client. --- examples/stream_hist_forecast.py | 61 +++++++++++++++++++++++++++ examples/stream_live_forecast.py | 56 ++++++++++++++++++++++++ py/frequenz/client/weather/_client.py | 46 ++++++++++++++++++++ 3 files changed, 163 insertions(+) create mode 100644 examples/stream_hist_forecast.py create mode 100644 examples/stream_live_forecast.py diff --git a/examples/stream_hist_forecast.py b/examples/stream_hist_forecast.py new file mode 100644 index 0000000..3f40905 --- /dev/null +++ b/examples/stream_hist_forecast.py @@ -0,0 +1,61 @@ +""" +Streams historical Germany wind forecast data. + +Example run: +PYTHONPATH=py python examples/stream_hist_forecast.py "localhost:50051" + +License: MIT +Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +""" + +import asyncio +import datetime +import sys + +import grpc.aio as grpcaio +from frequenz.client.weather._client import Client +from frequenz.client.weather._types import ForecastFeature, Location + +service_address = sys.argv[1] + + +async def main(service_address): + + client = Client( + grpcaio.insecure_channel(service_address), # or secure channel with credentials + service_address, + ) + + features = [ + ForecastFeature.V_WIND_COMPONENT_100_METRE, + ForecastFeature.U_WIND_COMPONENT_100_METRE, + ] + + locations = [ + Location( + latitude=52.5, + longitude=13.4, + country_code="DE", + ), + ] + + now = datetime.datetime.utcnow() + start = now - datetime.timedelta(days=30) + end = now + datetime.timedelta(days=7) + + stream = await client.stream_hist_forecast( + features=features, locations=locations, start=start, end=end + ) + + async for fc in stream: + print(fc) + print(fc.to_ndarray_vlf()) + print( + fc.to_ndarray_vlf( + features=[ForecastFeature.U_WIND_COMPONENT_100_METRE], + ) + ) + + +asyncio.run(main(service_address)) diff --git a/examples/stream_live_forecast.py b/examples/stream_live_forecast.py new file mode 100644 index 0000000..1864038 --- /dev/null +++ b/examples/stream_live_forecast.py @@ -0,0 +1,56 @@ +""" +Streams live Germany wind forecast data. + +Example run: +PYTHONPATH=py python examples/stream_live_forecast.py "localhost:50051" + +License: MIT +Copyright © 2024 Frequenz Energy-as-a-Service GmbH +""" + +import asyncio +import sys + +import grpc.aio as grpcaio +from frequenz.client.weather._client import Client +from frequenz.client.weather._types import ForecastFeature, Location + +service_address = sys.argv[1] + + +async def main(service_address): + + client = Client( + grpcaio.insecure_channel(service_address), # or secure channel with credentials + service_address, + ) + + features = [ + ForecastFeature.V_WIND_COMPONENT_100_METRE, + ForecastFeature.U_WIND_COMPONENT_100_METRE, + ] + + locations = [ + Location( + latitude=52.5, + longitude=13.4, + country_code="DE", + ), + ] + + stream = await client.stream_live_forecast( + features=features, + locations=locations, + ) + + async for fc in stream: + print(fc) + print(fc.to_ndarray_vlf()) + print( + fc.to_ndarray_vlf( + features=[ForecastFeature.U_WIND_COMPONENT_100_METRE], + ) + ) + + +asyncio.run(main(service_address)) diff --git a/py/frequenz/client/weather/_client.py b/py/frequenz/client/weather/_client.py index 1a4456f..10d56c9 100644 --- a/py/frequenz/client/weather/_client.py +++ b/py/frequenz/client/weather/_client.py @@ -3,10 +3,14 @@ """The Weather Forecast API client.""" +from datetime import datetime + import grpc +from frequenz.api.common.pagination import pagination_params_pb2 from frequenz.api.weather import weather_pb2, weather_pb2_grpc from frequenz.channels import Receiver from frequenz.client.base.grpc_streaming_helper import GrpcStreamingHelper +from google.protobuf import timestamp_pb2 from ._types import ForecastFeature, Forecasts, Location @@ -58,3 +62,45 @@ async def stream_live_forecast( Forecasts.from_pb, ) return self._streams[stream_key].new_receiver() + + async def stream_hist_forecast( + self, + locations: list[Location], + features: list[ForecastFeature], + start: datetime, + end: datetime, + ) -> Receiver[Forecasts]: + """Stream historical weather forecast data. + + Args: + locations: locations to stream data for. + features: features to stream data for. + start: start of the time range to stream data for. + end: end of the time range to stream data for. + + Returns: + A channel receiver for weather forecast data. + """ + stream_key = tuple(tuple(locations) + tuple(features)) + + start_ts = timestamp_pb2.Timestamp() + start_ts.FromDatetime(start) + end_ts = timestamp_pb2.Timestamp() + end_ts.FromDatetime(end) + pagination_params = pagination_params_pb2.PaginationParams() + + if stream_key not in self._streams: + self._streams[stream_key] = GrpcStreamingHelper( + f"weather-forecast-{stream_key}", + lambda: self._stub.GetHistoricalWeatherForecast( # type:ignore + weather_pb2.GetHistoricalWeatherForecastRequest( + locations=(location.to_pb() for location in locations), + features=(feature.value for feature in features), + start_ts=start_ts, + end_ts=end_ts, + pagination_params=pagination_params, + ) + ), + Forecasts.from_pb, + ) + return self._streams[stream_key].new_receiver()