Skip to content

Commit

Permalink
Merge pull request #16 from rpadn/add_get_peristent_subscription_details
Browse files Browse the repository at this point in the history
feat(client): add support for reading persistent subscription details
  • Loading branch information
betaboon authored May 10, 2024
2 parents 8584643 + 8aca140 commit 146b5e1
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 2 deletions.
4 changes: 4 additions & 0 deletions eventstoredb/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
from eventstoredb.client.delete_persistent_subscription_to_stream.mixin import (
DeletePersistentSubscriptionToStreamMixin,
)
from eventstoredb.client.get_persistent_subscription_details.mixin import (
GetPersistentSubscriptionDetailsMixin,
)
from eventstoredb.client.read_all.mixin import ReadAllMixin
from eventstoredb.client.read_stream.mixin import ReadStreamMixin
from eventstoredb.client.subscribe_to_all.mixin import SubscribeToAllMixin
Expand Down Expand Up @@ -47,6 +50,7 @@ class Client(
CreatePersistentSubscriptionToAllMixin,
UpdatePersistentSubscriptionToAllMixin,
DeletePersistentSubscriptionToAllMixin,
GetPersistentSubscriptionDetailsMixin,
SubscribeToPersistentSubscriptionToAllMixin,
):
def __init__(self, options: ClientOptions | str) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,4 @@


@dataclass
class DeletePersistentSubscriptionToStreamOptions:
...
class DeletePersistentSubscriptionToStreamOptions: ...
Empty file.
48 changes: 48 additions & 0 deletions eventstoredb/client/get_persistent_subscription_details/grpc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from eventstoredb.client.get_persistent_subscription_details.types import SubscriptionDetails
from eventstoredb.generated.event_store.client import Empty, StreamIdentifier
from eventstoredb.generated.event_store.client.persistent_subscriptions import (
GetInfoReq,
GetInfoReqOptions,
GetInfoResp,
)

if TYPE_CHECKING:
from eventstoredb.client.get_persistent_subscription_details.types import (
GetPersistentSubscriptionDetailsOptions,
)


def create_get_persistent_subscription_details_request(
stream_name: str,
group_name: str,
options: GetPersistentSubscriptionDetailsOptions,
) -> GetInfoReq:
request_options = GetInfoReqOptions()

if stream_name == "$all":
request_options.all = Empty()
else:
request_options.stream_identifier = StreamIdentifier(stream_name.encode())
request_options.group_name = group_name

return GetInfoReq(options=request_options)


def convert_get_persistent_subscription_details_response(
response: GetInfoResp,
) -> SubscriptionDetails:
info = response.subscription_info
return SubscriptionDetails(
event_source=info.event_source,
group_name=info.group_name,
status=info.status,
last_known_event_position=info.last_known_event_position,
last_checkpointed_event_position=info.last_checkpointed_event_position,
start_from=info.start_from,
total_in_flight_messages=info.total_in_flight_messages,
outstanding_messages_count=info.outstanding_messages_count,
)
44 changes: 44 additions & 0 deletions eventstoredb/client/get_persistent_subscription_details/mixin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from __future__ import annotations

from grpclib.exceptions import GRPCError

from eventstoredb.client.create_persistent_subscription_to_stream.grpc import (
convert_grpc_error_to_exception,
)
from eventstoredb.client.get_persistent_subscription_details.grpc import (
convert_get_persistent_subscription_details_response,
create_get_persistent_subscription_details_request,
)
from eventstoredb.client.get_persistent_subscription_details.types import (
GetPersistentSubscriptionDetailsOptions,
SubscriptionDetails,
)
from eventstoredb.client.protocol import ClientProtocol
from eventstoredb.generated.event_store.client.persistent_subscriptions import (
PersistentSubscriptionsStub,
)


class GetPersistentSubscriptionDetailsMixin(ClientProtocol):
async def get_persistent_subscription_details(
self,
stream_name: str,
group_name: str,
options: GetPersistentSubscriptionDetailsOptions | None = None,
) -> SubscriptionDetails:
if options is None:
options = GetPersistentSubscriptionDetailsOptions()

client = PersistentSubscriptionsStub(channel=self.channel)
request = create_get_persistent_subscription_details_request(
stream_name=stream_name,
group_name=group_name,
options=options,
)

try:
response = await client.get_info(get_info_req=request)
except GRPCError as e:
raise convert_grpc_error_to_exception(e) # noqa: B904,TRY200

return convert_get_persistent_subscription_details_response(response)
19 changes: 19 additions & 0 deletions eventstoredb/client/get_persistent_subscription_details/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class GetPersistentSubscriptionDetailsOptions: ...


@dataclass
class SubscriptionDetails:
event_source: str
group_name: str
status: str
last_known_event_position: str
last_checkpointed_event_position: str
start_from: str
total_in_flight_messages: int
outstanding_messages_count: int
62 changes: 62 additions & 0 deletions tests/test_get_persistent_subscription_details.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import re

from eventstoredb import Client
from eventstoredb.client.get_persistent_subscription_details.types import SubscriptionDetails


class RegexMatcher: # noqa: PLW1641
def __init__(self, pattern: str) -> None:
self._regex = re.compile(pattern)

def __eq__(self, actual: str) -> bool: # type: ignore
return bool(self._regex.match(actual))


async def test_get_persistent_subscription_to_all_details(
eventstoredb_client: Client,
group_name: str,
) -> None:
await eventstoredb_client.create_persistent_subscription_to_all(group_name=group_name)

details = await eventstoredb_client.get_persistent_subscription_details(
stream_name="$all",
group_name=group_name,
)

assert details == SubscriptionDetails(
event_source="$all",
group_name=group_name,
status="Live",
last_known_event_position=RegexMatcher(r"C:\d+/P:\d+"), # type: ignore
last_checkpointed_event_position="",
start_from="C:0/P:0",
total_in_flight_messages=0,
outstanding_messages_count=0,
)


async def test_get_persistent_subscription_to_stream_details(
eventstoredb_client: Client,
stream_name: str,
group_name: str,
) -> None:
await eventstoredb_client.create_persistent_subscription_to_stream(
stream_name=stream_name,
group_name=group_name,
)

details = await eventstoredb_client.get_persistent_subscription_details(
stream_name=stream_name,
group_name=group_name,
)

assert details == SubscriptionDetails(
event_source=stream_name,
group_name=group_name,
status="Live",
last_known_event_position="",
last_checkpointed_event_position="",
start_from="0",
total_in_flight_messages=0,
outstanding_messages_count=0,
)

0 comments on commit 146b5e1

Please sign in to comment.