diff --git a/pyproject.toml b/pyproject.toml index b380ab0c3..8e1637deb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -54,6 +54,7 @@ dev = [ "isort==5.13.2", ] test = [ + "daphne==4.1.2", "factory-boy==3.3.1", "playwright>=1.39", "pytest==8.3.4", @@ -70,10 +71,7 @@ s3 = [ "django-storages[s3]==1.14.4", ] sync = [ - "channels==4.2.0", - "daphne==4.1.2", "pydantic==2.10.3", - "websockets==13.1", ] [project.scripts] diff --git a/umap/asgi.py b/umap/asgi.py index 2ca12ddce..47d69a931 100644 --- a/umap/asgi.py +++ b/umap/asgi.py @@ -1,15 +1,20 @@ import os -from channels.routing import ProtocolTypeRouter +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "umap.settings") + from django.core.asgi import get_asgi_application -os.environ.setdefault("DJANGO_SETTINGS_MODULE", "umap.settings") +from .sync.app import application as ws_application + # Initialize Django ASGI application early to ensure the AppRegistry # is populated before importing code that may import ORM models. django_asgi_app = get_asgi_application() -application = ProtocolTypeRouter( - { - "http": django_asgi_app, - } -) + +async def application(scope, receive, send): + if scope["type"] == "http": + await django_asgi_app(scope, receive, send) + elif scope["type"] == "websocket": + await ws_application(scope, receive, send) + else: + raise NotImplementedError(f"Unknown scope type {scope['type']}") diff --git a/umap/management/commands/run_websocket_server.py b/umap/management/commands/run_websocket_server.py deleted file mode 100644 index 2cb2db891..000000000 --- a/umap/management/commands/run_websocket_server.py +++ /dev/null @@ -1,23 +0,0 @@ -from django.conf import settings -from django.core.management.base import BaseCommand - -from umap import websocket_server - - -class Command(BaseCommand): - help = "Run the websocket server" - - def add_arguments(self, parser): - parser.add_argument( - "--host", - help="The server host to bind to.", - default=settings.WEBSOCKET_BACK_HOST, - ) - parser.add_argument( - "--port", - help="The server port to bind to.", - default=settings.WEBSOCKET_BACK_PORT, - ) - - def handle(self, *args, **options): - websocket_server.run(options["host"], options["port"]) diff --git a/umap/settings/base.py b/umap/settings/base.py index f47ad2361..e89f17af2 100644 --- a/umap/settings/base.py +++ b/umap/settings/base.py @@ -129,6 +129,7 @@ "django.contrib.gis", "django_probes", "umap", + "umap.sync", "social_django", # See https://github.com/peopledoc/django-agnocomplete/commit/26eda2dfa4a2f8a805ca2ea19a0c504b9d773a1c # Django does not find the app config in the default place, so the app is not loaded diff --git a/umap/static/umap/js/modules/sync/engine.js b/umap/static/umap/js/modules/sync/engine.js index 212c2528e..c9b321673 100644 --- a/umap/static/umap/js/modules/sync/engine.js +++ b/umap/static/umap/js/modules/sync/engine.js @@ -77,7 +77,7 @@ export class SyncEngine { start(authToken) { this.transport = new WebSocketTransport( - this._umap.properties.websocketURI, + Utils.template(this._umap.properties.websocketURI, { id: this._umap.id }), authToken, this ) @@ -125,7 +125,7 @@ export class SyncEngine { if (this.offline) return if (this.transport) { - this.transport.send('OperationMessage', message) + this.transport.send('OperationMessage', { sender: this.uuid, ...message }) } } @@ -177,6 +177,7 @@ export class SyncEngine { * @param {Object} payload */ onOperationMessage(payload) { + if (payload.sender === this.uuid) return this._operations.storeRemoteOperations([payload]) this._applyOperation(payload) } @@ -484,7 +485,7 @@ export class Operations { return ( Utils.deepEqual(local.subject, remote.subject) && Utils.deepEqual(local.metadata, remote.metadata) && - (!shouldCheckKey || (shouldCheckKey && local.key == remote.key)) + (!shouldCheckKey || (shouldCheckKey && local.key === remote.key)) ) } } diff --git a/umap/static/umap/js/modules/sync/websocket.js b/umap/static/umap/js/modules/sync/websocket.js index 26c99f26e..0a80f98f5 100644 --- a/umap/static/umap/js/modules/sync/websocket.js +++ b/umap/static/umap/js/modules/sync/websocket.js @@ -21,6 +21,10 @@ export class WebSocketTransport { } } + this.websocket.onerror = (error) => { + console.log('WS ERROR', error) + } + this.ensureOpen = setInterval(() => { if (this.websocket.readyState !== WebSocket.OPEN) { this.websocket.close() @@ -34,6 +38,7 @@ export class WebSocketTransport { // See https://making.close.com/posts/reliable-websockets/ for more details. this.pingInterval = setInterval(() => { if (this.websocket.readyState === WebSocket.OPEN) { + console.log('sending ping') this.websocket.send('ping') this.pongReceived = false setTimeout(() => { @@ -63,6 +68,7 @@ export class WebSocketTransport { } close() { + console.log('Closing') this.receiver.closeRequested = true this.websocket.close() } diff --git a/umap/sync/__init__.py b/umap/sync/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/umap/sync/app.py b/umap/sync/app.py new file mode 100644 index 000000000..cf526722a --- /dev/null +++ b/umap/sync/app.py @@ -0,0 +1,45 @@ +import uuid + +from django.urls.resolvers import RoutePattern + +ws_pattern = RoutePattern("/ws/sync/") + + +async def application(scope, receive, send): + from .models import Peer + + matched = ws_pattern.match(scope["path"]) + print(matched) + if not matched: + print("Wrong path") + return + _, _, kwargs = matched + + map_id = kwargs["map_id"] + room_id = f"room{map_id}" + peer = Peer(uuid=uuid.uuid4(), name="FooBar", room_id=room_id) + print(peer) + peer._send = send + while True: + event = await receive() + print("EVENT", event) + + if event["type"] == "websocket.connect": + try: + print("Let's accept") + await send({"type": "websocket.accept"}) + await peer.connect() + except ValueError: + await send({"type": "websocket.close"}) + + if event["type"] == "websocket.disconnect": + print("Closing", event) + await peer.disconnect() + print("Closed") + break + + if event["type"] == "websocket.receive": + if event["text"] == "ping": + await send({"type": "websocket.send", "text": "pong"}) + else: + await peer.receive(event["text"]) diff --git a/umap/sync/apps.py b/umap/sync/apps.py new file mode 100644 index 000000000..b4c9c694f --- /dev/null +++ b/umap/sync/apps.py @@ -0,0 +1,6 @@ +from django.apps import AppConfig + + +class UmapConfig(AppConfig): + name = "umap.sync" + verbose_name = "uMap Sync" diff --git a/umap/sync/migrations/0001_initial.py b/umap/sync/migrations/0001_initial.py new file mode 100644 index 000000000..a8893dfac --- /dev/null +++ b/umap/sync/migrations/0001_initial.py @@ -0,0 +1,23 @@ +# Generated by Django 5.1.4 on 2024-12-27 16:14 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + initial = True + + dependencies = [] + + operations = [ + migrations.CreateModel( + name="Peer", + fields=[ + ( + "uuid", + models.UUIDField(primary_key=True, serialize=False, unique=True), + ), + ("name", models.CharField(max_length=200)), + ("room_id", models.CharField(max_length=200)), + ], + ), + ] diff --git a/umap/sync/migrations/__init__.py b/umap/sync/migrations/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/umap/sync/models.py b/umap/sync/models.py new file mode 100644 index 000000000..18a40076a --- /dev/null +++ b/umap/sync/models.py @@ -0,0 +1,150 @@ +import asyncio +import logging + +import psycopg +from django.core.signing import TimestampSigner +from django.db import connection, models +from psycopg import sql +from pydantic import ValidationError + +from .payloads import ( + JoinRequest, + JoinResponse, + ListPeersResponse, + OperationMessage, + PeerMessage, + Request, +) + + +class Peer(models.Model): + uuid = models.UUIDField(unique=True, primary_key=True) + name = models.CharField(max_length=200) + room_id = models.CharField(max_length=200) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.is_authenticated = False + + async def get_peers(self): + qs = Peer.objects.filter(room_id=self.room_id).values_list("uuid", flat=True) + peers = [] + async for peer in qs: + peers.append(peer) + return peers + + async def listen_public(self): + # We need a dedicated connection for the LISTEN + aconn = await psycopg.AsyncConnection.connect( + **self.connection_params, + autocommit=True, + ) + async with aconn: + async with aconn.cursor() as acursor: + await acursor.execute( + sql.SQL("LISTEN {chan}").format( + chan=sql.Identifier(str(self.room_id)) + ) + ) + print("LISTEN", self.room_id) + gen = aconn.notifies() + async for notify in gen: + await self.send(notify.payload) + + async def listen_private(self): + aconn = await psycopg.AsyncConnection.connect( + **self.connection_params, + autocommit=True, + ) + async with aconn: + async with aconn.cursor() as acursor: + await acursor.execute( + sql.SQL("LISTEN {chan}").format(chan=sql.Identifier(str(self.uuid))) + ) + print("LISTEN", self.uuid) + gen = aconn.notifies() + async for notify in gen: + await self.send(notify.payload) + + async def connect(self): + # Join room for this map + connection_params = connection.get_connection_params() + connection_params.pop("cursor_factory") + self.connection_params = connection_params + self.connection = await psycopg.AsyncConnection.connect( + **connection_params, + autocommit=True, + ) + + async def listen(self): + asyncio.create_task(self.listen_public()) + asyncio.create_task(self.listen_private()) + + async def disconnect(self): + await self.adelete() + await self.send_peers_list() + + async def send_peers_list(self): + message = ListPeersResponse(peers=await self.get_peers()) + await self.broadcast(message.model_dump_json()) + + async def broadcast(self, message): + print("BROADCASTING", message) + # Send to all channels (including sender!) + async with self.connection.cursor() as cursor: + await cursor.execute( + sql.SQL("NOTIFY {chan}, {message}").format( + chan=sql.Identifier(str(self.room_id)), + message=message, + ) + ) + + async def send_to(self, peer_id, message): + print("SEND TO", peer_id, message) + # Send to one given channel + async with self.connection.cursor() as cursor: + await cursor.execute( + sql.SQL("NOTIFY {chan}, {message}").format( + chan=sql.Identifier(str(peer_id)), message=message + ) + ) + + async def receive(self, text_data): + if not self.is_authenticated: + print("AUTHENTICATING", self.uuid) + message = JoinRequest.model_validate_json(text_data) + signed = TimestampSigner().unsign_object(message.token, max_age=30) + user, map_id, permissions = signed.values() + if "edit" not in permissions: + return await self.disconnect() + await self.asave() + await self.listen() + response = JoinResponse(uuid=str(self.uuid), peers=await self.get_peers()) + await self.send(response.model_dump_json()) + await self.send_peers_list() + self.is_authenticated = True + return + + if text_data == "ping": + return await self.send("pong") + + try: + incoming = Request.model_validate_json(text_data) + except ValidationError as error: + message = ( + f"An error occurred when receiving the following message: {text_data!r}" + ) + logging.error(message, error) + else: + match incoming.root: + # Broadcast all operation messages to connected peers + case OperationMessage(): + await self.broadcast(text_data) + + # Send peer messages to the proper peer + case PeerMessage(): + await self.send_to(incoming.root.recipient, text_data) + + async def send(self, text): + print("SEND", text) + await self._send({"type": "websocket.send", "text": text}) diff --git a/umap/sync/payloads.py b/umap/sync/payloads.py new file mode 100644 index 000000000..6a15a3f1d --- /dev/null +++ b/umap/sync/payloads.py @@ -0,0 +1,47 @@ +from typing import Literal, Optional, Union + +from pydantic import BaseModel, Field, RootModel + + +class JoinRequest(BaseModel): + kind: Literal["JoinRequest"] = "JoinRequest" + token: str + + +class OperationMessage(BaseModel): + """Message sent from one peer to all the others""" + + kind: Literal["OperationMessage"] = "OperationMessage" + verb: Literal["upsert", "update", "delete"] + subject: Literal["map", "datalayer", "feature"] + metadata: Optional[dict] = None + key: Optional[str] = None + + +class PeerMessage(BaseModel): + """Message sent from a specific peer to another one""" + + kind: Literal["PeerMessage"] = "PeerMessage" + sender: str + recipient: str + # The message can be whatever the peers want. It's not checked by the server. + message: dict + + +class Request(RootModel): + """Any message coming from the websocket should be one of these, and will be rejected otherwise.""" + + root: Union[PeerMessage, OperationMessage] = Field(discriminator="kind") + + +class JoinResponse(BaseModel): + """Server response containing the list of peers""" + + kind: Literal["JoinResponse"] = "JoinResponse" + peers: list + uuid: str + + +class ListPeersResponse(BaseModel): + kind: Literal["ListPeersResponse"] = "ListPeersResponse" + peers: list diff --git a/umap/tests/integration/conftest.py b/umap/tests/integration/conftest.py index 4601a7093..d72886f87 100644 --- a/umap/tests/integration/conftest.py +++ b/umap/tests/integration/conftest.py @@ -1,12 +1,12 @@ import os import re -import subprocess -import time -from pathlib import Path import pytest +from daphne.testing import DaphneProcess from playwright.sync_api import expect +from umap.asgi import application + from ..base import mock_tiles @@ -67,23 +67,17 @@ def do_login(user, **kwargs): return do_login -@pytest.fixture -def websocket_server(): - # Find the test-settings, and put them in the current environment - settings_path = (Path(__file__).parent.parent / "settings.py").absolute().as_posix() - os.environ["UMAP_SETTINGS"] = settings_path - - ds_proc = subprocess.Popen( - [ - "umap", - "run_websocket_server", - ], - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - ) - time.sleep(2) - # Ensure it started properly before yielding - assert not ds_proc.poll(), ds_proc.stdout.read().decode("utf-8") - yield ds_proc - # Shut it down at the end of the pytest session - ds_proc.terminate() +@pytest.fixture(scope="function") +def asgi_live_server(request, settings): + request.getfixturevalue("transactional_db") + server = DaphneProcess("localhost", lambda: application) + server.start() + server.ready.wait() + port = server.port.value + settings.WEBSOCKET_FRONT_URI = f"ws://localhost:{port}/ws/sync/{{id}}/" + server.url = f"http://localhost:{port}" + + yield server + + server.terminate() + server.join() diff --git a/umap/tests/integration/test_websocket_sync.py b/umap/tests/integration/test_websocket_sync.py index c5e56e893..949a081e6 100644 --- a/umap/tests/integration/test_websocket_sync.py +++ b/umap/tests/integration/test_websocket_sync.py @@ -12,7 +12,7 @@ @pytest.mark.xdist_group(name="websockets") def test_websocket_connection_can_sync_markers( - new_page, live_server, websocket_server, tilelayer + new_page, live_server, asgi_live_server, tilelayer ): map = MapFactory(name="sync", edit_status=Map.ANONYMOUS) map.settings["properties"]["syncEnabled"] = True @@ -80,7 +80,7 @@ def test_websocket_connection_can_sync_markers( @pytest.mark.xdist_group(name="websockets") def test_websocket_connection_can_sync_polygons( - context, live_server, websocket_server, tilelayer + context, live_server, asgi_live_server, tilelayer ): map = MapFactory(name="sync", edit_status=Map.ANONYMOUS) map.settings["properties"]["syncEnabled"] = True @@ -164,7 +164,7 @@ def test_websocket_connection_can_sync_polygons( @pytest.mark.xdist_group(name="websockets") def test_websocket_connection_can_sync_map_properties( - new_page, live_server, websocket_server, tilelayer + new_page, live_server, asgi_live_server, tilelayer ): map = MapFactory(name="sync", edit_status=Map.ANONYMOUS) map.settings["properties"]["syncEnabled"] = True @@ -196,7 +196,7 @@ def test_websocket_connection_can_sync_map_properties( @pytest.mark.xdist_group(name="websockets") def test_websocket_connection_can_sync_datalayer_properties( - new_page, live_server, websocket_server, tilelayer + new_page, live_server, asgi_live_server, tilelayer ): map = MapFactory(name="sync", edit_status=Map.ANONYMOUS) map.settings["properties"]["syncEnabled"] = True @@ -225,7 +225,7 @@ def test_websocket_connection_can_sync_datalayer_properties( @pytest.mark.xdist_group(name="websockets") def test_websocket_connection_can_sync_cloned_polygons( - context, live_server, websocket_server, tilelayer + context, live_server, asgi_live_server, tilelayer ): map = MapFactory(name="sync", edit_status=Map.ANONYMOUS) map.settings["properties"]["syncEnabled"] = True @@ -288,7 +288,7 @@ def test_websocket_connection_can_sync_cloned_polygons( @pytest.mark.xdist_group(name="websockets") def test_websocket_connection_can_sync_late_joining_peer( - new_page, live_server, websocket_server, tilelayer + new_page, live_server, asgi_live_server, tilelayer ): map = MapFactory(name="sync", edit_status=Map.ANONYMOUS) map.settings["properties"]["syncEnabled"] = True @@ -349,7 +349,7 @@ def test_websocket_connection_can_sync_late_joining_peer( @pytest.mark.xdist_group(name="websockets") -def test_should_sync_datalayers(new_page, live_server, websocket_server, tilelayer): +def test_should_sync_datalayers(new_page, live_server, asgi_live_server, tilelayer): map = MapFactory(name="sync", edit_status=Map.ANONYMOUS) map.settings["properties"]["syncEnabled"] = True map.save() @@ -422,7 +422,7 @@ def test_should_sync_datalayers(new_page, live_server, websocket_server, tilelay @pytest.mark.xdist_group(name="websockets") def test_create_and_sync_map( - new_page, live_server, websocket_server, tilelayer, login, user + new_page, live_server, asgi_live_server, tilelayer, login, user ): # Create a syncable map with peerA peerA = login(user, prefix="Page A") diff --git a/umap/tests/test_websocket_server.py b/umap/tests/test_websocket_server.py deleted file mode 100644 index 62bc93e92..000000000 --- a/umap/tests/test_websocket_server.py +++ /dev/null @@ -1,22 +0,0 @@ -from umap.websocket_server import OperationMessage, PeerMessage, Request, ServerRequest - - -def test_messages_are_parsed_correctly(): - server = Request.model_validate(dict(kind="Server", action="list-peers")).root - assert type(server) is ServerRequest - - operation = Request.model_validate( - dict( - kind="OperationMessage", - verb="upsert", - subject="map", - metadata={}, - key="key", - ) - ).root - assert type(operation) is OperationMessage - - peer_message = Request.model_validate( - dict(kind="PeerMessage", sender="Alice", recipient="Bob", message={}) - ).root - assert type(peer_message) is PeerMessage diff --git a/umap/websocket_server.py b/umap/websocket_server.py deleted file mode 100644 index 6483d6489..000000000 --- a/umap/websocket_server.py +++ /dev/null @@ -1,202 +0,0 @@ -#!/usr/bin/env python - -import asyncio -import logging -import uuid -from collections import defaultdict -from typing import Literal, Optional, Union - -import websockets -from django.conf import settings -from django.core.signing import TimestampSigner -from pydantic import BaseModel, Field, RootModel, ValidationError -from websockets import WebSocketClientProtocol -from websockets.server import serve - - -class Connections: - def __init__(self) -> None: - self._connections: set[WebSocketClientProtocol] = set() - self._ids: dict[WebSocketClientProtocol, str] = dict() - - def join(self, websocket: WebSocketClientProtocol) -> str: - self._connections.add(websocket) - _id = str(uuid.uuid4()) - self._ids[websocket] = _id - return _id - - def leave(self, websocket: WebSocketClientProtocol) -> None: - self._connections.remove(websocket) - del self._ids[websocket] - - def get(self, id) -> WebSocketClientProtocol: - # use an iterator to stop iterating as soon as we found - return next(k for k, v in self._ids.items() if v == id) - - def get_id(self, websocket: WebSocketClientProtocol): - return self._ids[websocket] - - def get_other_peers( - self, websocket: WebSocketClientProtocol - ) -> set[WebSocketClientProtocol]: - return self._connections - {websocket} - - def get_all_peers(self) -> set[WebSocketClientProtocol]: - return self._connections - - -# Contains the list of websocket connections handled by this process. -# It's a mapping of map_id to a set of the active websocket connections -CONNECTIONS: defaultdict[int, Connections] = defaultdict(Connections) - - -class JoinRequest(BaseModel): - kind: Literal["JoinRequest"] = "JoinRequest" - token: str - - -class OperationMessage(BaseModel): - """Message sent from one peer to all the others""" - - kind: Literal["OperationMessage"] = "OperationMessage" - verb: Literal["upsert", "update", "delete"] - subject: Literal["map", "datalayer", "feature"] - metadata: Optional[dict] = None - key: Optional[str] = None - - -class PeerMessage(BaseModel): - """Message sent from a specific peer to another one""" - - kind: Literal["PeerMessage"] = "PeerMessage" - sender: str - recipient: str - # The message can be whatever the peers want. It's not checked by the server. - message: dict - - -class ServerRequest(BaseModel): - """A request towards the server""" - - kind: Literal["Server"] = "Server" - action: Literal["list-peers"] - - -class Request(RootModel): - """Any message coming from the websocket should be one of these, and will be rejected otherwise.""" - - root: Union[ServerRequest, PeerMessage, OperationMessage] = Field( - discriminator="kind" - ) - - -class JoinResponse(BaseModel): - """Server response containing the list of peers""" - - kind: Literal["JoinResponse"] = "JoinResponse" - peers: list - uuid: str - - -class ListPeersResponse(BaseModel): - kind: Literal["ListPeersResponse"] = "ListPeersResponse" - peers: list - - -async def join_and_listen( - map_id: int, permissions: list, user: str | int, websocket: WebSocketClientProtocol -): - """Join a "room" with other connected peers, and wait for messages.""" - logging.debug(f"{user} joined room #{map_id}") - connections: Connections = CONNECTIONS[map_id] - _id: str = connections.join(websocket) - - # Assign an ID to the joining peer and return it the list of connected peers. - peers: list[WebSocketClientProtocol] = [ - connections.get_id(p) for p in connections.get_all_peers() - ] - response = JoinResponse(uuid=_id, peers=peers) - await websocket.send(response.model_dump_json()) - - # Notify all other peers of the new list of connected peers. - message = ListPeersResponse(peers=peers) - websockets.broadcast( - connections.get_other_peers(websocket), message.model_dump_json() - ) - - try: - async for raw_message in websocket: - if raw_message == "ping": - await websocket.send("pong") - continue - - # recompute the peers list at the time of message-sending. - # as doing so beforehand would miss new connections - other_peers = connections.get_other_peers(websocket) - try: - incoming = Request.model_validate_json(raw_message) - except ValidationError as e: - error = f"An error occurred when receiving the following message: {raw_message!r}" - logging.error(error, e) - else: - match incoming.root: - # Broadcast all operation messages to connected peers - case OperationMessage(): - websockets.broadcast(other_peers, raw_message) - - # Send peer messages to the proper peer - case PeerMessage(recipient=_id): - peer = connections.get(_id) - if peer: - await peer.send(raw_message) - - finally: - # On disconnect, remove the connection from the pool - connections.leave(websocket) - - # TODO: refactor this in a separate method. - # Notify all other peers of the new list of connected peers. - peers = [connections.get_id(p) for p in connections.get_all_peers()] - message = ListPeersResponse(peers=peers) - websockets.broadcast( - connections.get_other_peers(websocket), message.model_dump_json() - ) - - -async def handler(websocket: WebSocketClientProtocol): - """Main WebSocket handler. - - Check if the permission is granted and let the peer enter a room. - """ - raw_message = await websocket.recv() - - # The first event should always be 'join' - message: JoinRequest = JoinRequest.model_validate_json(raw_message) - signed = TimestampSigner().unsign_object(message.token, max_age=30) - user, map_id, permissions = signed.values() - - # Check if permissions for this map have been granted by the server - if "edit" in signed["permissions"]: - await join_and_listen(map_id, permissions, user, websocket) - - -def run(host: str, port: int): - if not settings.WEBSOCKET_ENABLED: - msg = ( - "WEBSOCKET_ENABLED should be set to True to run the WebSocket Server. " - "See the documentation at " - "https://docs.umap-project.org/en/stable/config/settings/#websocket_enabled " - "for more information." - ) - print(msg) - exit(1) - - async def _serve(): - async with serve(handler, host, port): - logging.debug(f"Waiting for connections on {host}:{port}") - await asyncio.Future() # run forever - - try: - asyncio.run(_serve()) - except KeyboardInterrupt: - print("Closing WebSocket server")