Skip to content

Commit

Permalink
Drop betterproto/grpclib (and upgrade client-base) (#80)
Browse files Browse the repository at this point in the history
Sadly the `betterproto` community is completely unresponsive when asking
about the health of the project, and its main dependency, `grpclib`, is
in maintenance mode and the author says this project only existed
because `grpcio` didn't support `async`, and this is why it doesn't have
a purpose anymore.

Because of this, we go back to using `grpcio` and the Google's
`protobuf` generator.

This PR partially reverts these commits:

* Migrate the client code to use `betterproto`
(e159417)
* Fix component tests to use `betterproto`
(b5b52d5)
* Fix client test to use `betterproto`
(b3efe71)

It preserves the changes to tests to use mocks instead of a fake but
full gRPC server, and adapts the new code to use the `grpcio` library
and the generated code from the traditional Google's `protobuf`
compiler.

It also bumps the `frequenz-client-base` dependency to 0.6.0, as the
version being used only supported `grpclib` for parsing URLs.

We also start using the new `exceptions` module from
`frequenz-clien-base` to avoid having to port the local copy to use
`grpcio`, but we don't use other new features in the new release in this
PR.

Fixes #76.
  • Loading branch information
llucax authored Aug 28, 2024
2 parents 64c6192 + da3db46 commit 8d1c8a8
Show file tree
Hide file tree
Showing 13 changed files with 603 additions and 1,278 deletions.
4 changes: 3 additions & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@

## Upgrading

<!-- Here goes notes on how to upgrade from previous versions, including deprecations and what they should be replaced with -->
- This release stops using `betterproto` and `grpclib` as backend for gRPC and goes back to using Google's `grpcio` and `protobuf`.

If your code was using `betterproto` and `grpclib`, it now needs to be ported to use `grpcio` and `protobuf` too. Remember to also remove any `betterproto` and `grpclib` dependencies from your project.

## New Features

Expand Down
13 changes: 9 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH

[build-system]
requires = ["setuptools == 68.1.0", "setuptools_scm[toml] == 7.1.0"]
requires = [
"setuptools == 68.1.0",
"setuptools_scm[toml] == 7.1.0",
"frequenz-repo-config[lib] == 0.9.1",
]
build-backend = "setuptools.build_meta"

[project]
Expand Down Expand Up @@ -32,10 +36,11 @@ classifiers = [
]
requires-python = ">= 3.11, < 4"
dependencies = [
"betterproto == 2.0.0b6",
"frequenz-api-microgrid >= 0.15.3, < 0.16.0",
"frequenz-channels >= 1.0.0-rc1, < 2.0.0",
"frequenz-client-base[grpclib] >= 0.4.0, < 0.5",
"frequenz-microgrid-betterproto >= 0.15.3.1, < 0.16",
"frequenz-client-base >= 0.6.0, < 0.7",
"grpcio >= 1.54.2, < 2",
"protobuf >= 4.21.6, < 6",
"timezonefinder >= 6.2.0, < 7",
"typing-extensions >= 4.5.0, < 5",
]
Expand Down
6 changes: 4 additions & 2 deletions src/frequenz/client/microgrid/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@
)
from ._connection import Connection
from ._exception import (
ClientError,
ApiClientError,
ClientNotConnected,
DataLoss,
EntityAlreadyExists,
EntityNotFound,
Expand All @@ -65,12 +66,13 @@

__all__ = [
"ApiClient",
"ApiClientError",
"BatteryComponentState",
"BatteryData",
"BatteryError",
"BatteryErrorCode",
"BatteryRelayState",
"ClientError",
"ClientNotConnected",
"Component",
"ComponentCategory",
"ComponentData",
Expand Down
154 changes: 93 additions & 61 deletions src/frequenz/client/microgrid/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,34 @@

import asyncio
import logging
from collections.abc import Callable, Iterable, Set
from typing import Any, TypeVar
from collections.abc import AsyncIterator, Awaitable, Callable, Iterable, Set
from typing import Any, TypeVar, cast

import grpc.aio

# pylint: disable=no-name-in-module
from frequenz.api.common.components_pb2 import ComponentCategory as PbComponentCategory
from frequenz.api.common.metrics_pb2 import Bounds as PbBounds
from frequenz.api.microgrid.microgrid_pb2 import ComponentData as PbComponentData
from frequenz.api.microgrid.microgrid_pb2 import ComponentFilter as PbComponentFilter
from frequenz.api.microgrid.microgrid_pb2 import ComponentIdParam as PbComponentIdParam
from frequenz.api.microgrid.microgrid_pb2 import ComponentList as PbComponentList
from frequenz.api.microgrid.microgrid_pb2 import ConnectionFilter as PbConnectionFilter
from frequenz.api.microgrid.microgrid_pb2 import ConnectionList as PbConnectionList
from frequenz.api.microgrid.microgrid_pb2 import (
MicrogridMetadata as PbMicrogridMetadata,
)
from frequenz.api.microgrid.microgrid_pb2 import SetBoundsParam as PbSetBoundsParam
from frequenz.api.microgrid.microgrid_pb2 import (
SetPowerActiveParam as PbSetPowerActiveParam,
)
from frequenz.api.microgrid.microgrid_pb2_grpc import MicrogridStub

import grpclib
import grpclib.client
from betterproto.lib.google import protobuf as pb_google
# pylint: enable=no-name-in-module
from frequenz.channels import Receiver
from frequenz.client.base import channel, retry, streaming
from frequenz.microgrid.betterproto.frequenz.api import microgrid as pb_microgrid
from frequenz.microgrid.betterproto.frequenz.api.common import (
components as pb_components,
)
from frequenz.microgrid.betterproto.frequenz.api.common import metrics as pb_metrics
from google.protobuf.empty_pb2 import Empty # pylint: disable=no-name-in-module
from google.protobuf.timestamp_pb2 import Timestamp # pylint: disable=no-name-in-module

from ._component import (
Component,
Expand All @@ -35,7 +50,7 @@
)
from ._connection import Connection
from ._constants import RECEIVER_MAX_SIZE
from ._exception import ClientError
from ._exception import ApiClientError
from ._metadata import Location, Metadata

DEFAULT_GRPC_CALL_TIMEOUT = 60.0
Expand Down Expand Up @@ -72,7 +87,7 @@ def __init__(
self._server_url = server_url
"""The location of the microgrid API server as a URL."""

self.api = pb_microgrid.MicrogridStub(channel.parse_grpc_uri(server_url))
self.api = MicrogridStub(channel.parse_grpc_uri(server_url))
"""The gRPC stub for the microgrid API."""

self._broadcasters: dict[int, streaming.GrpcStreamBroadcaster[Any, Any]] = {}
Expand All @@ -90,25 +105,29 @@ async def components(self) -> Iterable[Component]:
Iterator whose elements are all the components in the microgrid.
Raises:
ClientError: If the are any errors communicating with the Microgrid API,
ApiClientError: If the are any errors communicating with the Microgrid API,
most likely a subclass of
[GrpcError][frequenz.client.microgrid.GrpcError].
"""
try:
component_list = await self.api.list_components(
pb_microgrid.ComponentFilter(),
timeout=int(DEFAULT_GRPC_CALL_TIMEOUT),
# grpc.aio is missing types and mypy thinks this is not awaitable,
# but it is
component_list = await cast(
Awaitable[PbComponentList],
self.api.ListComponents(
PbComponentFilter(),
timeout=int(DEFAULT_GRPC_CALL_TIMEOUT),
),
)
except grpclib.GRPCError as grpc_error:
raise ClientError.from_grpc_error(
except grpc.aio.AioRpcError as grpc_error:
raise ApiClientError.from_grpc_error(
server_url=self._server_url,
operation="list_components",
operation="ListComponents",
grpc_error=grpc_error,
) from grpc_error

components_only = filter(
lambda c: c.category
is not pb_components.ComponentCategory.COMPONENT_CATEGORY_SENSOR,
lambda c: c.category is not PbComponentCategory.COMPONENT_CATEGORY_SENSOR,
component_list.components,
)
result: Iterable[Component] = map(
Expand All @@ -132,13 +151,16 @@ async def metadata(self) -> Metadata:
Returns:
the microgrid metadata.
"""
microgrid_metadata: pb_microgrid.MicrogridMetadata | None = None
microgrid_metadata: PbMicrogridMetadata | None = None
try:
microgrid_metadata = await self.api.get_microgrid_metadata(
pb_google.Empty(),
timeout=int(DEFAULT_GRPC_CALL_TIMEOUT),
microgrid_metadata = await cast(
Awaitable[PbMicrogridMetadata],
self.api.GetMicrogridMetadata(
Empty(),
timeout=int(DEFAULT_GRPC_CALL_TIMEOUT),
),
)
except grpclib.GRPCError:
except grpc.aio.AioRpcError:
_logger.exception("The microgrid metadata is not available.")

if not microgrid_metadata:
Expand Down Expand Up @@ -170,25 +192,28 @@ async def connections(
Microgrid connections matching the provided start and end filters.
Raises:
ClientError: If the are any errors communicating with the Microgrid API,
ApiClientError: If the are any errors communicating with the Microgrid API,
most likely a subclass of
[GrpcError][frequenz.client.microgrid.GrpcError].
"""
connection_filter = pb_microgrid.ConnectionFilter(
starts=list(starts), ends=list(ends)
)
connection_filter = PbConnectionFilter(starts=starts, ends=ends)
try:
valid_components, all_connections = await asyncio.gather(
self.components(),
self.api.list_connections(
connection_filter,
timeout=int(DEFAULT_GRPC_CALL_TIMEOUT),
# grpc.aio is missing types and mypy thinks this is not
# awaitable, but it is
cast(
Awaitable[PbConnectionList],
self.api.ListConnections(
connection_filter,
timeout=int(DEFAULT_GRPC_CALL_TIMEOUT),
),
),
)
except grpclib.GRPCError as grpc_error:
raise ClientError.from_grpc_error(
except grpc.aio.AioRpcError as grpc_error:
raise ApiClientError.from_grpc_error(
server_url=self._server_url,
operation="list_connections",
operation="ListConnections",
grpc_error=grpc_error,
) from grpc_error
# Filter out the components filtered in `components` method.
Expand All @@ -212,7 +237,7 @@ async def _new_component_data_receiver(
*,
component_id: int,
expected_category: ComponentCategory,
transform: Callable[[pb_microgrid.ComponentData], _ComponentDataT],
transform: Callable[[PbComponentData], _ComponentDataT],
maxsize: int,
) -> Receiver[_ComponentDataT]:
"""Return a new broadcaster receiver for a given `component_id`.
Expand All @@ -239,8 +264,13 @@ async def _new_component_data_receiver(
if broadcaster is None:
broadcaster = streaming.GrpcStreamBroadcaster(
f"raw-component-data-{component_id}",
lambda: self.api.stream_component_data(
pb_microgrid.ComponentIdParam(id=component_id)
# We need to cast here because grpc says StreamComponentData is
# a grpc.CallIterator[PbComponentData] which is not an AsyncIterator,
# but it is a grpc.aio.UnaryStreamCall[..., PbComponentData], which it
# is.
lambda: cast(
AsyncIterator[PbComponentData],
self.api.StreamComponentData(PbComponentIdParam(id=component_id)),
),
transform,
retry_strategy=self._retry_strategy,
Expand Down Expand Up @@ -389,21 +419,22 @@ async def set_power(self, component_id: int, power_w: float) -> None:
power_w: power to set for the component.
Raises:
ClientError: If the are any errors communicating with the Microgrid API,
ApiClientError: If the are any errors communicating with the Microgrid API,
most likely a subclass of
[GrpcError][frequenz.client.microgrid.GrpcError].
"""
try:
await self.api.set_power_active(
pb_microgrid.SetPowerActiveParam(
component_id=component_id, power=power_w
await cast(
Awaitable[Empty],
self.api.SetPowerActive(
PbSetPowerActiveParam(component_id=component_id, power=power_w),
timeout=int(DEFAULT_GRPC_CALL_TIMEOUT),
),
timeout=int(DEFAULT_GRPC_CALL_TIMEOUT),
)
except grpclib.GRPCError as grpc_error:
raise ClientError.from_grpc_error(
except grpc.aio.AioRpcError as grpc_error:
raise ApiClientError.from_grpc_error(
server_url=self._server_url,
operation="set_power_active",
operation="SetPowerActive",
grpc_error=grpc_error,
) from grpc_error

Expand All @@ -413,7 +444,7 @@ async def set_bounds(
lower: float,
upper: float,
) -> None:
"""Send `SetBoundsParam`s received from a channel to the Microgrid service.
"""Send `PbSetBoundsParam`s received from a channel to the Microgrid service.
Args:
component_id: ID of the component to set bounds for.
Expand All @@ -423,7 +454,7 @@ async def set_bounds(
Raises:
ValueError: when upper bound is less than 0, or when lower bound is
greater than 0.
ClientError: If the are any errors communicating with the Microgrid API,
ApiClientError: If the are any errors communicating with the Microgrid API,
most likely a subclass of
[GrpcError][frequenz.client.microgrid.GrpcError].
"""
Expand All @@ -432,21 +463,22 @@ async def set_bounds(
if lower > 0:
raise ValueError(f"Lower bound {lower} must be less than or equal to 0.")

target_metric = (
pb_microgrid.SetBoundsParamTargetMetric.TARGET_METRIC_POWER_ACTIVE
)
target_metric = PbSetBoundsParam.TargetMetric.TARGET_METRIC_POWER_ACTIVE
try:
await self.api.add_inclusion_bounds(
pb_microgrid.SetBoundsParam(
component_id=component_id,
target_metric=target_metric,
bounds=pb_metrics.Bounds(lower=lower, upper=upper),
await cast(
Awaitable[Timestamp],
self.api.AddInclusionBounds(
PbSetBoundsParam(
component_id=component_id,
target_metric=target_metric,
bounds=PbBounds(lower=lower, upper=upper),
),
timeout=int(DEFAULT_GRPC_CALL_TIMEOUT),
),
timeout=int(DEFAULT_GRPC_CALL_TIMEOUT),
)
except grpclib.GRPCError as grpc_error:
raise ClientError.from_grpc_error(
except grpc.aio.AioRpcError as grpc_error:
raise ApiClientError.from_grpc_error(
server_url=self._server_url,
operation="add_inclusion_bounds",
operation="AddInclusionBounds",
grpc_error=grpc_error,
) from grpc_error
Loading

0 comments on commit 8d1c8a8

Please sign in to comment.