Skip to content

Commit

Permalink
Extend the client to stream historical weather forecast data.
Browse files Browse the repository at this point in the history
Add examples to demonstrate:
- How to stream live data with the client.
- How to stream the historical data with the client.
  • Loading branch information
david-natingga-frequenz committed Feb 29, 2024
1 parent 7fc2504 commit e4e3cf1
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 0 deletions.
61 changes: 61 additions & 0 deletions examples/stream_hist_forecast.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""
Streams historical Germany wind forecast data.
Example run:
PYTHONPATH=py python examples/stream_hist_forecast.py "localhost:50051"
License: MIT
Copyright © 2024 Frequenz Energy-as-a-Service GmbH
"""

import asyncio
import datetime
import sys

import grpc.aio as grpcaio
from frequenz.client.weather._client import Client
from frequenz.client.weather._types import ForecastFeature, Location

service_address = sys.argv[1]


async def main(service_address):

client = Client(
grpcaio.insecure_channel(service_address), # or secure channel with credentials
service_address,
)

features = [
ForecastFeature.V_WIND_COMPONENT_100_METRE,
ForecastFeature.U_WIND_COMPONENT_100_METRE,
]

locations = [
Location(
latitude=52.5,
longitude=13.4,
country_code="DE",
),
]

now = datetime.datetime.utcnow()
start = now - datetime.timedelta(days=30)
end = now + datetime.timedelta(days=7)

stream = await client.stream_hist_forecast(
features=features, locations=locations, start=start, end=end
)

async for fc in stream:
print(fc)
print(fc.to_ndarray_vlf())
print(
fc.to_ndarray_vlf(
features=[ForecastFeature.U_WIND_COMPONENT_100_METRE],
)
)


asyncio.run(main(service_address))
56 changes: 56 additions & 0 deletions examples/stream_live_forecast.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
"""
Streams live Germany wind forecast data.
Example run:
PYTHONPATH=py python examples/stream_live_forecast.py "localhost:50051"
License: MIT
Copyright © 2024 Frequenz Energy-as-a-Service GmbH
"""

import asyncio
import sys

import grpc.aio as grpcaio
from frequenz.client.weather._client import Client
from frequenz.client.weather._types import ForecastFeature, Location

service_address = sys.argv[1]


async def main(service_address):

client = Client(
grpcaio.insecure_channel(service_address), # or secure channel with credentials
service_address,
)

features = [
ForecastFeature.V_WIND_COMPONENT_100_METRE,
ForecastFeature.U_WIND_COMPONENT_100_METRE,
]

locations = [
Location(
latitude=52.5,
longitude=13.4,
country_code="DE",
),
]

stream = await client.stream_live_forecast(
features=features,
locations=locations,
)

async for fc in stream:
print(fc)
print(fc.to_ndarray_vlf())
print(
fc.to_ndarray_vlf(
features=[ForecastFeature.U_WIND_COMPONENT_100_METRE],
)
)


asyncio.run(main(service_address))
46 changes: 46 additions & 0 deletions py/frequenz/client/weather/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@

"""The Weather Forecast API client."""

from datetime import datetime

import grpc
from frequenz.api.common.pagination import pagination_params_pb2
from frequenz.api.weather import weather_pb2, weather_pb2_grpc
from frequenz.channels import Receiver
from frequenz.client.base.grpc_streaming_helper import GrpcStreamingHelper
from google.protobuf import timestamp_pb2

from ._types import ForecastFeature, Forecasts, Location

Expand Down Expand Up @@ -58,3 +62,45 @@ async def stream_live_forecast(
Forecasts.from_pb,
)
return self._streams[stream_key].new_receiver()

async def stream_hist_forecast(
self,
locations: list[Location],
features: list[ForecastFeature],
start: datetime,
end: datetime,
) -> Receiver[Forecasts]:
"""Stream historical weather forecast data.
Args:
locations: locations to stream data for.
features: features to stream data for.
start: start of the time range to stream data for.
end: end of the time range to stream data for.
Returns:
A channel receiver for weather forecast data.
"""
stream_key = tuple(tuple(locations) + tuple(features))

start_ts = timestamp_pb2.Timestamp()
start_ts.FromDatetime(start)
end_ts = timestamp_pb2.Timestamp()
end_ts.FromDatetime(end)
pagination_params = pagination_params_pb2.PaginationParams()

if stream_key not in self._streams:
self._streams[stream_key] = GrpcStreamingHelper(
f"weather-forecast-{stream_key}",
lambda: self._stub.GetHistoricalWeatherForecast( # type:ignore
weather_pb2.GetHistoricalWeatherForecastRequest(
locations=(location.to_pb() for location in locations),
features=(feature.value for feature in features),
start_ts=start_ts,
end_ts=end_ts,
pagination_params=pagination_params,
)
),
Forecasts.from_pb,
)
return self._streams[stream_key].new_receiver()

0 comments on commit e4e3cf1

Please sign in to comment.