Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to frequenz-client-base v0.8.0 #140

Merged
merged 4 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@

## Upgrading

<!-- Here goes notes on how to upgrade from previous versions, including deprecations and what they should be replaced with -->
- The minimum required version of `frequenz-client-base` is now `v0.8.0`.

- The `Client` now expects gRPC URLs to be [this format](https://frequenz-floss.github.io/frequenz-client-base-python/latest/reference/frequenz/client/base/channel/#frequenz.client.base.channel.parse_grpc_uri) required by the `BaseApiClient`.

## New Features

Expand Down
2 changes: 0 additions & 2 deletions examples/iterate_hist_forecast.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import datetime
import sys

import grpc.aio as grpcaio
from frequenz.client.weather._client import Client
from frequenz.client.weather._types import ForecastData, ForecastFeature, Location

Expand All @@ -28,7 +27,6 @@ async def main(service_address: str) -> None:
given in a form of a host followed by a colon and a port.
"""
client = Client(
grpcaio.insecure_channel(service_address), # or secure channel with credentials
service_address,
)

Expand Down
2 changes: 0 additions & 2 deletions examples/stream_live_forecast.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import asyncio
import sys

import grpc.aio as grpcaio
from frequenz.client.weather._client import Client
from frequenz.client.weather._types import ForecastFeature, Location

Expand All @@ -26,7 +25,6 @@ async def main(service_address: str) -> None:
given in a form of a host followed by a colon and a port.
"""
client = Client(
grpcaio.insecure_channel(service_address), # or secure channel with credentials
service_address,
)

Expand Down
47 changes: 38 additions & 9 deletions py/frequenz/client/weather/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,65 @@

"""The Weather Forecast API client."""

from __future__ import annotations

from datetime import datetime

import grpc
from frequenz.api.weather import weather_pb2, weather_pb2_grpc
from frequenz.channels import Receiver
from frequenz.client.base.channel import ChannelOptions
from frequenz.client.base.client import BaseApiClient
from frequenz.client.base.exception import ClientNotConnected
from frequenz.client.base.streaming import GrpcStreamBroadcaster

from ._historical_forecast_iterator import HistoricalForecastIterator
from ._types import ForecastFeature, Forecasts, Location


class Client:
class Client(BaseApiClient[weather_pb2_grpc.WeatherForecastServiceStub]):
"""Weather forecast client."""

def __init__(self, grpc_channel: grpc.aio.Channel, svc_addr: str) -> None:
def __init__(
self,
server_url: str,
*,
connect: bool = True,
channel_defaults: ChannelOptions = ChannelOptions(),
) -> None:
"""Initialize the client.

Args:
grpc_channel: gRPC channel to use for communication with the API.
svc_addr: Address of the service to connect to.
server_url: The URL of the server to connect to.
connect: Whether to connect to the server as soon as a client instance is
created. If `False`, the client will not connect to the server until
[connect()][frequenz.client.base.client.BaseApiClient.connect] is
called.
channel_defaults: Default options for the gRPC channel.
"""
self._svc_addr = svc_addr
self._stub = weather_pb2_grpc.WeatherForecastServiceStub(grpc_channel)
super().__init__(
server_url,
weather_pb2_grpc.WeatherForecastServiceStub,
connect=connect,
channel_defaults=channel_defaults,
)
self._streams: dict[
tuple[Location | ForecastFeature, ...],
GrpcStreamBroadcaster[
weather_pb2.ReceiveLiveWeatherForecastResponse, Forecasts
],
] = {}

@property
def stub(self) -> weather_pb2_grpc.WeatherForecastServiceAsyncStub:
"""The gRPC stub for the API."""
if self.channel is None or self._stub is None:
raise ClientNotConnected(server_url=self.server_url, operation="stub")
# This type: ignore is needed because we need to cast the sync stub to
# the async stub, but we can't use cast because the async stub doesn't
# actually exists to the eyes of the interpreter, it only exists for the
# type-checker, so it can only be used for type hints.
return self._stub # type: ignore

async def stream_live_forecast(
self,
locations: list[Location],
Expand All @@ -52,7 +81,7 @@ async def stream_live_forecast(
if stream_key not in self._streams:
self._streams[stream_key] = GrpcStreamBroadcaster(
f"weather-forecast-{stream_key}",
lambda: self._stub.ReceiveLiveWeatherForecast( # type:ignore
lambda: self.stub.ReceiveLiveWeatherForecast(
weather_pb2.ReceiveLiveWeatherForecastRequest(
locations=(location.to_pb() for location in locations),
features=(feature.value for feature in features),
Expand Down Expand Up @@ -80,4 +109,4 @@ def hist_forecast_iterator(
Returns:
A channel receiver for weather forecast data.
"""
return HistoricalForecastIterator(self._stub, locations, features, start, end)
return HistoricalForecastIterator(self.stub, locations, features, start, end)
10 changes: 6 additions & 4 deletions py/frequenz/client/weather/_historical_forecast_iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@

"""The Historical Forecast Iterator."""

from __future__ import annotations

from collections.abc import AsyncIterator
from datetime import datetime
from typing import AsyncIterator, List

from frequenz.api.common.v1.pagination import pagination_params_pb2
from frequenz.api.weather import weather_pb2, weather_pb2_grpc
Expand All @@ -22,7 +24,7 @@ class HistoricalForecastIterator(AsyncIterator[HistoricalForecasts]):

def __init__( # pylint: disable=too-many-arguments
self,
stub: weather_pb2_grpc.WeatherForecastServiceStub,
stub: weather_pb2_grpc.WeatherForecastServiceAsyncStub,
locations: list[Location],
features: list[ForecastFeature],
start: datetime,
Expand All @@ -48,7 +50,7 @@ def __init__( # pylint: disable=too-many-arguments
self.end_ts = timestamp_pb2.Timestamp()
self.end_ts.FromDatetime(end)

self.location_forecasts: List[weather_pb2.LocationForecast] = []
self.location_forecasts: list[weather_pb2.LocationForecast] = []
self.page_token: str | None = None
self.page_size = page_size

Expand Down Expand Up @@ -78,7 +80,7 @@ async def __anext__(self) -> HistoricalForecasts:
pagination_params.page_token = self.page_token

response: weather_pb2.GetHistoricalWeatherForecastResponse = (
await self._stub.GetHistoricalWeatherForecast( # type:ignore
await self._stub.GetHistoricalWeatherForecast(
weather_pb2.GetHistoricalWeatherForecastRequest(
locations=(location.to_pb() for location in self.locations),
features=(feature.value for feature in self.features),
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ dependencies = [
"frequenz-api-common >= 0.6.0, < 0.7.0",
"googleapis-common-protos >= 1.56.4, < 2",
"frequenz-channels >= 1.0.0, < 2",
"frequenz-client-base >= 0.6.0, < 0.7",
"frequenz-client-base >= 0.8.0, < 0.9",
"numpy >= 1.24.2, < 2",
# We can't widen beyond 6 because of protobuf cross-version runtime guarantees
# https://protobuf.dev/support/cross-version-runtime-guarantee/#major
Expand Down
Loading