Skip to content

Commit

Permalink
Fix handling of SSE connections during gracious teardown of the server
Browse files Browse the repository at this point in the history
  • Loading branch information
touilleMan committed Jan 6, 2025
1 parent 697403f commit 680e81d
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 2 deletions.
3 changes: 1 addition & 2 deletions server/parsec/asgi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,7 @@ def should_exit(self, value: bool) -> None:
if self._should_exit:
app = cast(AsgiApp, self.config.app)
backend = cast(Backend, app.state.backend)
for client in backend.events._registered_clients.values():
client.cancel_scope.cancel()
backend.events.stop()


async def serve_parsec_asgi_app(
Expand Down
5 changes: 5 additions & 0 deletions server/parsec/asgi/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,11 @@ async def _run_session(self, scope, receive, send) -> None:
api_version=self.settled_api_version,
**self.headers,
)
case SseAPiEventsListenBadOutcome.STOPPED:
# Listening to events in not possible since the server is stopping.
# This is because otherwise the ASGI server would wait pointlessly
# for our task to stop until the gracious shutdown timeout is reached.
raise HTTPException(status_code=503)


@rpc_router.post("/authenticated/{raw_organization_id}/tos")
Expand Down
21 changes: 21 additions & 0 deletions server/parsec/components/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,12 @@ class SseAPiEventsListenBadOutcome(BadOutcomeEnum):
ORGANIZATION_EXPIRED = auto()
AUTHOR_NOT_FOUND = auto()
AUTHOR_REVOKED = auto()
STOPPED = auto()


class BaseEventsComponent:
def __init__(self, config: BackendConfig, event_bus: EventBus):
self._stopped = False
self._event_bus = event_bus
# Key is `id(client_ctx)`
self._registered_clients: dict[int, RegisteredClient] = {}
Expand All @@ -256,6 +258,18 @@ def __init__(self, config: BackendConfig, event_bus: EventBus):
# Note we don't have a `__del__` to disconnect from the event bus: the lifetime
# of this component is basically equivalent of the one of the event bus anyway

def stop(self) -> None:
"""
Close all listening SSE clients and refuse any new one.
This is needed to allow graceful shutdown of the the ASGI server, since otherwise
the server will wait pointlessly for the SSE connection to finish until the graceful
timeout is reached.
"""
self._stopped = True
for client in self._registered_clients.values():
client.cancel_scope.cancel()

def _on_event(self, event: Event) -> None:
match event:
# Events to be dispatched to the listening clients
Expand Down Expand Up @@ -394,6 +408,13 @@ def _collect_realm_changes(event: Event):
profile=user_profile,
cancel_scope=cancel_scope,
)

# Note it is vital that there in no await between the check of `self._stopped`
# and the registration of the client !
# Otherwise the registration may occur after `BaseEventsComponent.stop()` has
# been called.
if self._stopped:
return SseAPiEventsListenBadOutcome.STOPPED
self._registered_clients[id(client_ctx)] = registered

# Finally populate the event channel with the event that have been missed
Expand Down

0 comments on commit 680e81d

Please sign in to comment.