From 885c5fa424499709c0d5b7bd5f3f77d239abf600 Mon Sep 17 00:00:00 2001 From: Yohan Boniface Date: Fri, 27 Dec 2024 18:09:27 +0100 Subject: [PATCH] wip(sync): websocket server with ASGI and PostgreSQL LISTEN/NOTIFY --- umap/asgi.py | 22 ++- umap/settings/base.py | 2 +- umap/static/umap/js/modules/sync/websocket.js | 9 +- umap/sync/app.py | 45 ++++++ umap/sync/apps.py | 6 + umap/sync/consumers.py | 86 ----------- umap/sync/migrations/0001_initial.py | 24 +++ umap/sync/migrations/__init__.py | 0 umap/sync/models.py | 146 ++++++++++++++++++ 9 files changed, 239 insertions(+), 101 deletions(-) create mode 100644 umap/sync/app.py create mode 100644 umap/sync/apps.py delete mode 100644 umap/sync/consumers.py create mode 100644 umap/sync/migrations/0001_initial.py create mode 100644 umap/sync/migrations/__init__.py create mode 100644 umap/sync/models.py diff --git a/umap/asgi.py b/umap/asgi.py index 1f9d618a7..47d69a931 100644 --- a/umap/asgi.py +++ b/umap/asgi.py @@ -1,22 +1,20 @@ import os -from channels.routing import ProtocolTypeRouter, URLRouter -from channels.security.websocket import AllowedHostsOriginValidator +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "umap.settings") + from django.core.asgi import get_asgi_application -from django.urls import re_path -from .sync import consumers +from .sync.app import application as ws_application -os.environ.setdefault("DJANGO_SETTINGS_MODULE", "umap.settings") # Initialize Django ASGI application early to ensure the AppRegistry # is populated before importing code that may import ORM models. django_asgi_app = get_asgi_application() -urlpatterns = (re_path(r"ws/sync/(?P\w+)/$", consumers.SyncConsumer.as_asgi()),) -application = ProtocolTypeRouter( - { - "http": django_asgi_app, - "websocket": AllowedHostsOriginValidator(URLRouter(urlpatterns)), - } -) +async def application(scope, receive, send): + if scope["type"] == "http": + await django_asgi_app(scope, receive, send) + elif scope["type"] == "websocket": + await ws_application(scope, receive, send) + else: + raise NotImplementedError(f"Unknown scope type {scope['type']}") diff --git a/umap/settings/base.py b/umap/settings/base.py index fedefe883..e89f17af2 100644 --- a/umap/settings/base.py +++ b/umap/settings/base.py @@ -129,6 +129,7 @@ "django.contrib.gis", "django_probes", "umap", + "umap.sync", "social_django", # See https://github.com/peopledoc/django-agnocomplete/commit/26eda2dfa4a2f8a805ca2ea19a0c504b9d773a1c # Django does not find the app config in the default place, so the app is not loaded @@ -343,4 +344,3 @@ WEBSOCKET_BACK_HOST = env("WEBSOCKET_BACK_HOST", default="localhost") WEBSOCKET_BACK_PORT = env.int("WEBSOCKET_BACK_PORT", default=8001) WEBSOCKET_FRONT_URI = env("WEBSOCKET_FRONT_URI", default="ws://localhost:8001") -CHANNEL_LAYERS = {"default": {"BACKEND": "channels.layers.InMemoryChannelLayer"}} diff --git a/umap/static/umap/js/modules/sync/websocket.js b/umap/static/umap/js/modules/sync/websocket.js index accdbcc39..0a80f98f5 100644 --- a/umap/static/umap/js/modules/sync/websocket.js +++ b/umap/static/umap/js/modules/sync/websocket.js @@ -6,7 +6,7 @@ export class WebSocketTransport { constructor(webSocketURI, authToken, messagesReceiver) { this.receiver = messagesReceiver - this.websocket = new WebSocket(`${webSocketURI}`) + this.websocket = new WebSocket(webSocketURI) this.websocket.onopen = () => { this.send('JoinRequest', { token: authToken }) @@ -21,6 +21,10 @@ export class WebSocketTransport { } } + this.websocket.onerror = (error) => { + console.log('WS ERROR', error) + } + this.ensureOpen = setInterval(() => { if (this.websocket.readyState !== WebSocket.OPEN) { this.websocket.close() @@ -34,6 +38,7 @@ export class WebSocketTransport { // See https://making.close.com/posts/reliable-websockets/ for more details. this.pingInterval = setInterval(() => { if (this.websocket.readyState === WebSocket.OPEN) { + console.log('sending ping') this.websocket.send('ping') this.pongReceived = false setTimeout(() => { @@ -48,7 +53,6 @@ export class WebSocketTransport { } onMessage(wsMessage) { - console.log(wsMessage) if (wsMessage.data === 'pong') { this.pongReceived = true } else { @@ -64,6 +68,7 @@ export class WebSocketTransport { } close() { + console.log('Closing') this.receiver.closeRequested = true this.websocket.close() } diff --git a/umap/sync/app.py b/umap/sync/app.py new file mode 100644 index 000000000..04482d2d1 --- /dev/null +++ b/umap/sync/app.py @@ -0,0 +1,45 @@ +import uuid + +from django.urls.resolvers import RoutePattern + +ws_pattern = RoutePattern("/ws/sync/") + + +async def application(scope, receive, send): + from .models import Peer + + matched = ws_pattern.match(scope["path"]) + print(matched) + if not matched: + print("Wrong path") + return + _, _, kwargs = matched + + map_id = kwargs["map_id"] + room_id = f"room{map_id}" + peer = await Peer.objects.acreate(uuid=uuid.uuid4(), name="FooBar", room_id=room_id) + print(peer) + peer._send = send + while True: + event = await receive() + print("EVENT", event) + + if event["type"] == "websocket.connect": + try: + print("Let's accept") + await send({"type": "websocket.accept"}) + await peer.connect() + except ValueError: + await send({"type": "websocket.close"}) + + if event["type"] == "websocket.disconnect": + print("Closing", event) + await peer.disconnect() + print("Closed") + break + + if event["type"] == "websocket.receive": + if event["text"] == "ping": + await send({"type": "websocket.send", "text": "pong"}) + else: + await peer.receive(event["text"]) diff --git a/umap/sync/apps.py b/umap/sync/apps.py new file mode 100644 index 000000000..b4c9c694f --- /dev/null +++ b/umap/sync/apps.py @@ -0,0 +1,6 @@ +from django.apps import AppConfig + + +class UmapConfig(AppConfig): + name = "umap.sync" + verbose_name = "uMap Sync" diff --git a/umap/sync/consumers.py b/umap/sync/consumers.py deleted file mode 100644 index dc722279a..000000000 --- a/umap/sync/consumers.py +++ /dev/null @@ -1,86 +0,0 @@ -import logging - -from channels.generic.websocket import AsyncWebsocketConsumer -from django.core.signing import TimestampSigner -from pydantic import ValidationError - -from .payloads import ( - JoinRequest, - JoinResponse, - ListPeersResponse, - OperationMessage, - PeerMessage, - Request, -) - - -class SyncConsumer(AsyncWebsocketConsumer): - @property - def peers(self): - return self.channel_layer.groups[self.map_id].keys() - - async def connect(self): - self.map_id = self.scope["url_route"]["kwargs"]["map_id"] - - # Join room group - await self.channel_layer.group_add(self.map_id, self.channel_name) - - self.is_authenticated = False - await self.accept() - - async def disconnect(self, close_code): - await self.channel_layer.group_discard(self.map_id, self.channel_name) - await self.send_peers_list() - - async def send_peers_list(self): - message = ListPeersResponse(peers=self.peers) - await self.broadcast(message.model_dump_json()) - - async def broadcast(self, message): - # Send to all channels (including sender!) - await self.channel_layer.group_send( - self.map_id, {"message": message, "type": "on_message"} - ) - - async def send_to(self, channel, message): - # Send to one given channel - await self.channel_layer.send( - channel, {"message": message, "type": "on_message"} - ) - - async def on_message(self, event): - # Send to self channel - await self.send(event["message"]) - - async def receive(self, text_data): - if not self.is_authenticated: - message = JoinRequest.model_validate_json(text_data) - signed = TimestampSigner().unsign_object(message.token, max_age=30) - user, map_id, permissions = signed.values() - if "edit" not in permissions: - return await self.disconnect() - response = JoinResponse(uuid=self.channel_name, peers=self.peers) - await self.send(response.model_dump_json()) - await self.send_peers_list() - self.is_authenticated = True - return - - if text_data == "ping": - return await self.send("pong") - - try: - incoming = Request.model_validate_json(text_data) - except ValidationError as error: - message = ( - f"An error occurred when receiving the following message: {text_data!r}" - ) - logging.error(message, error) - else: - match incoming.root: - # Broadcast all operation messages to connected peers - case OperationMessage(): - await self.broadcast(text_data) - - # Send peer messages to the proper peer - case PeerMessage(): - await self.send_to(incoming.root.recipient, text_data) diff --git a/umap/sync/migrations/0001_initial.py b/umap/sync/migrations/0001_initial.py new file mode 100644 index 000000000..d02c425c5 --- /dev/null +++ b/umap/sync/migrations/0001_initial.py @@ -0,0 +1,24 @@ +# Generated by Django 5.1.4 on 2024-12-27 16:14 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + initial = True + + dependencies = [] + + operations = [ + migrations.CreateModel( + name="Peer", + fields=[ + ( + "uuid", + models.UUIDField(primary_key=True, serialize=False, unique=True), + ), + ("name", models.CharField(max_length=200)), + ("room_id", models.CharField(max_length=200)), + ], + ), + ] diff --git a/umap/sync/migrations/__init__.py b/umap/sync/migrations/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/umap/sync/models.py b/umap/sync/models.py new file mode 100644 index 000000000..bf24878c5 --- /dev/null +++ b/umap/sync/models.py @@ -0,0 +1,146 @@ +import asyncio +import logging + +import psycopg +from django.core.signing import TimestampSigner +from django.db import connection, models +from pydantic import ValidationError +from psycopg import sql + +from .payloads import ( + JoinRequest, + JoinResponse, + ListPeersResponse, + OperationMessage, + PeerMessage, + Request, +) + + +class Peer(models.Model): + uuid = models.UUIDField(unique=True, primary_key=True) + name = models.CharField(max_length=200) + room_id = models.CharField(max_length=200) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.is_authenticated = False + + async def get_peers(self): + qs = Peer.objects.filter(room_id=self.room_id).values_list("uuid", flat=True) + peers = [] + async for peer in qs: + peers.append(peer) + return peers + + async def listen_public(self): + # We need a dedicated connection for the LISTEN + aconn = await psycopg.AsyncConnection.connect( + **self.connection_params, + autocommit=True, + ) + async with aconn: + async with aconn.cursor() as acursor: + await acursor.execute( + sql.SQL("LISTEN {chan}").format( + chan=sql.Identifier(str(self.room_id)) + ) + ) + print("LISTEN", self.room_id) + gen = aconn.notifies() + async for notify in gen: + await self.send(notify.payload) + + async def listen_private(self): + aconn = await psycopg.AsyncConnection.connect( + **self.connection_params, + autocommit=True, + ) + async with aconn: + async with aconn.cursor() as acursor: + await acursor.execute( + sql.SQL("LISTEN {chan}").format(chan=sql.Identifier(str(self.uuid))) + ) + print("LISTEN", self.uuid) + gen = aconn.notifies() + async for notify in gen: + await self.send(notify.payload) + + async def connect(self): + # Join room for this map + connection_params = connection.get_connection_params() + connection_params.pop("cursor_factory") + self.connection_params = connection_params + self.connection = await psycopg.AsyncConnection.connect( + **connection_params, + autocommit=True, + ) + asyncio.create_task(self.listen_public()) + asyncio.create_task(self.listen_private()) + + async def disconnect(self): + await self.adelete() + await self.send_peers_list() + + async def send_peers_list(self): + message = ListPeersResponse(peers=await self.get_peers()) + await self.broadcast(message.model_dump_json()) + + async def broadcast(self, message): + print("BROADCASTING", message) + # Send to all channels (including sender!) + async with self.connection.cursor() as cursor: + await cursor.execute( + sql.SQL("NOTIFY {chan}, {message}").format( + chan=sql.Identifier(str(self.room_id)), + message=message, + ) + ) + + async def send_to(self, peer_id, message): + print("SEND TO", peer_id, message) + # Send to one given channel + async with self.connection.cursor() as cursor: + await cursor.execute( + sql.SQL("NOTIFY {chan}, {message}").format( + chan=sql.Identifier(str(peer_id)), message=message + ) + ) + + async def receive(self, text_data): + if not self.is_authenticated: + print("AUTHENTICATING", self.uuid) + message = JoinRequest.model_validate_json(text_data) + signed = TimestampSigner().unsign_object(message.token, max_age=30) + user, map_id, permissions = signed.values() + if "edit" not in permissions: + return await self.disconnect() + response = JoinResponse(uuid=str(self.uuid), peers=await self.get_peers()) + await self.send(response.model_dump_json()) + await self.send_peers_list() + self.is_authenticated = True + return + + if text_data == "ping": + return await self.send("pong") + + try: + incoming = Request.model_validate_json(text_data) + except ValidationError as error: + message = ( + f"An error occurred when receiving the following message: {text_data!r}" + ) + logging.error(message, error) + else: + match incoming.root: + # Broadcast all operation messages to connected peers + case OperationMessage(): + await self.broadcast(text_data) + + # Send peer messages to the proper peer + case PeerMessage(): + await self.send_to(incoming.root.recipient, text_data) + + async def send(self, text): + print("SEND", text) + await self._send({"type": "websocket.send", "text": text})