Skip to content

Commit

Permalink
add websocket notifications about users that joined the workspaces
Browse files Browse the repository at this point in the history
  • Loading branch information
meln1k committed Dec 7, 2023
1 parent 27328d5 commit b9c8a6b
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 10 deletions.
33 changes: 30 additions & 3 deletions fixbackend/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,16 @@ async def setup_teardown_application(_: FastAPI) -> AsyncIterator[None]:
domain_event_publisher = deps.add(SN.domain_event_sender, DomainEventPublisherImpl(fixbackend_events))
workspace_repo = deps.add(
SN.workspace_repo,
WorkspaceRepositoryImpl(session_maker, graph_db_access, domain_event_publisher),
WorkspaceRepositoryImpl(
session_maker,
graph_db_access,
domain_event_publisher,
RedisPubSubPublisher(
redis=readwrite_redis,
channel="workspaces",
publisher_name="workspace_service",
),
),
)
deps.add(
SN.invitation_repository,
Expand Down Expand Up @@ -250,7 +259,16 @@ async def setup_teardown_dispatcher(_: FastAPI) -> AsyncIterator[None]:
domain_event_publisher = deps.add(SN.domain_event_sender, DomainEventPublisherImpl(fixbackend_events))
workspace_repo = deps.add(
SN.workspace_repo,
WorkspaceRepositoryImpl(session_maker, db_access, domain_event_publisher),
WorkspaceRepositoryImpl(
session_maker,
db_access,
domain_event_publisher,
RedisPubSubPublisher(
redis=rw_redis,
channel="workspaces",
publisher_name="workspace_service",
),
),
)

cloud_accounts_redis_publisher = RedisPubSubPublisher(
Expand Down Expand Up @@ -312,7 +330,16 @@ async def setup_teardown_billing(_: FastAPI) -> AsyncIterator[None]:
metering_repo = deps.add(SN.metering_repo, MeteringRepository(session_maker))
workspace_repo = deps.add(
SN.workspace_repo,
WorkspaceRepositoryImpl(session_maker, graph_db_access, domain_event_publisher),
WorkspaceRepositoryImpl(
session_maker,
graph_db_access,
domain_event_publisher,
RedisPubSubPublisher(
redis=readwrite_redis,
channel="workspaces",
publisher_name="workspace_service",
),
),
)
subscription_repo = deps.add(SN.subscription_repo, SubscriptionRepository(session_maker))
aws_marketplace = deps.add(
Expand Down
4 changes: 2 additions & 2 deletions fixbackend/workspaces/invitation_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ async def invite_user(
subject = f"FIX Cloud {inviter.email} has invited you to FIX workspace"
invite_link = f"{accept_invite_base_url}?token={token}"
text = (
f"{inviter.email} has invited you to join the workspace {workspace.name}."
"Please click on the link below to accept the invitation."
f"{inviter.email} has invited you to join the workspace {workspace.name}. "
"Please click on the link below to accept the invitation. \n\n"
f"{invite_link}"
)
await self.notification_service.send_email(to=invitee_email, subject=subject, text=text, html=None)
Expand Down
4 changes: 4 additions & 0 deletions fixbackend/workspaces/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from typing import Annotated, Optional, Sequence

from fastapi import Depends
from fixcloudutils.redis.pub_sub import RedisPubSubPublisher
from sqlalchemy import select, or_
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
Expand Down Expand Up @@ -72,10 +73,12 @@ def __init__(
session_maker: AsyncSessionMaker,
graph_db_access_manager: GraphDatabaseAccessManager,
domain_event_sender: DomainEventPublisher,
pubsub_publisher: RedisPubSubPublisher,
) -> None:
self.session_maker = session_maker
self.graph_db_access_manager = graph_db_access_manager
self.domain_event_sender = domain_event_sender
self.pubsub_publisher = pubsub_publisher

async def create_workspace(self, name: str, slug: str, owner: User) -> Workspace:
async with self.session_maker() as session:
Expand Down Expand Up @@ -163,6 +166,7 @@ async def add_to_workspace(self, workspace_id: WorkspaceId, user_id: UserId) ->

event = UserJoinedWorkspace(workspace_id, user_id)
await self.domain_event_sender.publish(event)
await self.pubsub_publisher.publish(event.kind, event.to_json(), f"tenant-events::{event.workspace_id}")

async def remove_from_workspace(self, workspace_id: WorkspaceId, user_id: UserId) -> None:
async with self.session_maker() as session:
Expand Down
8 changes: 4 additions & 4 deletions tests/fixbackend/cloud_accounts/service_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ async def list_all_discovered_accounts(self) -> List[CloudAccount]:
)


class OrganizationServiceMock(WorkspaceRepositoryImpl):
class WorkspaceServiceMock(WorkspaceRepositoryImpl):
# noinspection PyMissingConstructor
def __init__(self) -> None:
pass
Expand Down Expand Up @@ -197,8 +197,8 @@ def repository() -> CloudAccountRepositoryMock:


@pytest.fixture
def organization_repository() -> OrganizationServiceMock:
return OrganizationServiceMock()
def organization_repository() -> WorkspaceServiceMock:
return WorkspaceServiceMock()


@pytest.fixture
Expand All @@ -218,7 +218,7 @@ def account_setup_helper() -> AwsAccountSetupHelperMock:

@pytest.fixture
def service(
organization_repository: OrganizationServiceMock,
organization_repository: WorkspaceServiceMock,
repository: CloudAccountRepositoryMock,
pubsub_publisher: RedisPubSubPublisherMock,
domain_sender: DomainEventSenderMock,
Expand Down
27 changes: 26 additions & 1 deletion tests/fixbackend/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from datetime import datetime, timezone
from typing import Any, AsyncIterator, Awaitable, Callable, Dict, Iterator, List, Sequence, Tuple, Optional
from unittest.mock import patch
from attrs import frozen

import pytest
from alembic.command import upgrade as alembic_upgrade
Expand Down Expand Up @@ -61,6 +62,7 @@
from fixbackend.workspaces.invitation_repository import InvitationRepository, InvitationRepositoryImpl
from fixbackend.workspaces.models import Workspace
from fixbackend.workspaces.repository import WorkspaceRepository, WorkspaceRepositoryImpl
from fixcloudutils.redis.pub_sub import RedisPubSubPublisher

DATABASE_URL = "mysql+aiomysql://[email protected]:3306/fixbackend-testdb"
# only used to create/drop the database
Expand Down Expand Up @@ -435,13 +437,36 @@ async def domain_event_sender() -> InMemoryDomainEventPublisher:
return InMemoryDomainEventPublisher()


@frozen
class PubSubMessage:
kind: str
message: Json
channel: Optional[str]


class InMemoryRedisPubSubPublisher(RedisPubSubPublisher):
def __init__(self) -> None:
self.events: List[PubSubMessage] = []

async def publish(self, kind: str, message: Json, channel: Optional[str] = None) -> None:
self.events.append(PubSubMessage(kind, message, channel))


@pytest.fixture
def pubsub_publisher() -> InMemoryRedisPubSubPublisher:
return InMemoryRedisPubSubPublisher()


@pytest.fixture
async def workspace_repository(
async_session_maker: AsyncSessionMaker,
graph_database_access_manager: GraphDatabaseAccessManager,
domain_event_sender: DomainEventPublisher,
pubsub_publisher: InMemoryRedisPubSubPublisher,
) -> WorkspaceRepository:
return WorkspaceRepositoryImpl(async_session_maker, graph_database_access_manager, domain_event_sender)
return WorkspaceRepositoryImpl(
async_session_maker, graph_database_access_manager, domain_event_sender, pubsub_publisher
)


@pytest.fixture
Expand Down

0 comments on commit b9c8a6b

Please sign in to comment.