Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Websocket server with ASGI and PostgreSQL LISTEN/NOTIFY #2414

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ dev = [
"isort==5.13.2",
]
test = [
"daphne==4.1.2",
"factory-boy==3.3.1",
"playwright>=1.39",
"pytest==8.3.4",
Expand All @@ -70,10 +71,7 @@ s3 = [
"django-storages[s3]==1.14.4",
]
sync = [
"channels==4.2.0",
"daphne==4.1.2",
"pydantic==2.10.3",
"websockets==13.1",
]

[project.scripts]
Expand Down
19 changes: 12 additions & 7 deletions umap/asgi.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
import os

from channels.routing import ProtocolTypeRouter
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "umap.settings")

from django.core.asgi import get_asgi_application

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "umap.settings")
from .sync.app import application as ws_application

# Initialize Django ASGI application early to ensure the AppRegistry
# is populated before importing code that may import ORM models.
django_asgi_app = get_asgi_application()

application = ProtocolTypeRouter(
{
"http": django_asgi_app,
}
)

async def application(scope, receive, send):
if scope["type"] == "http":
await django_asgi_app(scope, receive, send)
elif scope["type"] == "websocket":
await ws_application(scope, receive, send)
else:
raise NotImplementedError(f"Unknown scope type {scope['type']}")
23 changes: 0 additions & 23 deletions umap/management/commands/run_websocket_server.py

This file was deleted.

1 change: 1 addition & 0 deletions umap/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@
"django.contrib.gis",
"django_probes",
"umap",
"umap.sync",
"social_django",
# See https://github.com/peopledoc/django-agnocomplete/commit/26eda2dfa4a2f8a805ca2ea19a0c504b9d773a1c
# Django does not find the app config in the default place, so the app is not loaded
Expand Down
7 changes: 4 additions & 3 deletions umap/static/umap/js/modules/sync/engine.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ export class SyncEngine {

start(authToken) {
this.transport = new WebSocketTransport(
this._umap.properties.websocketURI,
Utils.template(this._umap.properties.websocketURI, { id: this._umap.id }),
authToken,
this
)
Expand Down Expand Up @@ -125,7 +125,7 @@ export class SyncEngine {

if (this.offline) return
if (this.transport) {
this.transport.send('OperationMessage', message)
this.transport.send('OperationMessage', { sender: this.uuid, ...message })
}
}

Expand Down Expand Up @@ -177,6 +177,7 @@ export class SyncEngine {
* @param {Object} payload
*/
onOperationMessage(payload) {
if (payload.sender === this.uuid) return
this._operations.storeRemoteOperations([payload])
this._applyOperation(payload)
}
Expand Down Expand Up @@ -484,7 +485,7 @@ export class Operations {
return (
Utils.deepEqual(local.subject, remote.subject) &&
Utils.deepEqual(local.metadata, remote.metadata) &&
(!shouldCheckKey || (shouldCheckKey && local.key == remote.key))
(!shouldCheckKey || (shouldCheckKey && local.key === remote.key))
)
}
}
Expand Down
6 changes: 6 additions & 0 deletions umap/static/umap/js/modules/sync/websocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ export class WebSocketTransport {
}
}

this.websocket.onerror = (error) => {
console.log('WS ERROR', error)
}

this.ensureOpen = setInterval(() => {
if (this.websocket.readyState !== WebSocket.OPEN) {
this.websocket.close()
Expand All @@ -34,6 +38,7 @@ export class WebSocketTransport {
// See https://making.close.com/posts/reliable-websockets/ for more details.
this.pingInterval = setInterval(() => {
if (this.websocket.readyState === WebSocket.OPEN) {
console.log('sending ping')
this.websocket.send('ping')
this.pongReceived = false
setTimeout(() => {
Expand Down Expand Up @@ -63,6 +68,7 @@ export class WebSocketTransport {
}

close() {
console.log('Closing')
this.receiver.closeRequested = true
this.websocket.close()
}
Expand Down
Empty file added umap/sync/__init__.py
Empty file.
45 changes: 45 additions & 0 deletions umap/sync/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import uuid

from django.urls.resolvers import RoutePattern

ws_pattern = RoutePattern("/ws/sync/<str:map_id>")


async def application(scope, receive, send):
from .models import Peer

matched = ws_pattern.match(scope["path"])
print(matched)
if not matched:
print("Wrong path")
return
_, _, kwargs = matched

map_id = kwargs["map_id"]
room_id = f"room{map_id}"
peer = Peer(uuid=uuid.uuid4(), name="FooBar", room_id=room_id)
print(peer)
peer._send = send
while True:
event = await receive()
print("EVENT", event)

if event["type"] == "websocket.connect":
try:
print("Let's accept")
await send({"type": "websocket.accept"})
await peer.connect()
except ValueError:
await send({"type": "websocket.close"})

if event["type"] == "websocket.disconnect":
print("Closing", event)
await peer.disconnect()
print("Closed")
break

if event["type"] == "websocket.receive":
if event["text"] == "ping":
await send({"type": "websocket.send", "text": "pong"})
else:
await peer.receive(event["text"])
6 changes: 6 additions & 0 deletions umap/sync/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from django.apps import AppConfig


class UmapConfig(AppConfig):
name = "umap.sync"
verbose_name = "uMap Sync"
23 changes: 23 additions & 0 deletions umap/sync/migrations/0001_initial.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Generated by Django 5.1.4 on 2024-12-27 16:14

from django.db import migrations, models


class Migration(migrations.Migration):
initial = True

dependencies = []

operations = [
migrations.CreateModel(
name="Peer",
fields=[
(
"uuid",
models.UUIDField(primary_key=True, serialize=False, unique=True),
),
("name", models.CharField(max_length=200)),
("room_id", models.CharField(max_length=200)),
],
),
]
Empty file.
150 changes: 150 additions & 0 deletions umap/sync/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import asyncio
import logging

import psycopg
from django.core.signing import TimestampSigner
from django.db import connection, models
from psycopg import sql
from pydantic import ValidationError

from .payloads import (
JoinRequest,
JoinResponse,
ListPeersResponse,
OperationMessage,
PeerMessage,
Request,
)


class Peer(models.Model):
uuid = models.UUIDField(unique=True, primary_key=True)
name = models.CharField(max_length=200)
room_id = models.CharField(max_length=200)

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.is_authenticated = False

async def get_peers(self):
qs = Peer.objects.filter(room_id=self.room_id).values_list("uuid", flat=True)
peers = []
async for peer in qs:
peers.append(peer)
return peers

async def listen_public(self):
# We need a dedicated connection for the LISTEN
aconn = await psycopg.AsyncConnection.connect(
**self.connection_params,
autocommit=True,
)
async with aconn:
async with aconn.cursor() as acursor:
await acursor.execute(
sql.SQL("LISTEN {chan}").format(
chan=sql.Identifier(str(self.room_id))
)
)
print("LISTEN", self.room_id)
gen = aconn.notifies()
async for notify in gen:
await self.send(notify.payload)

async def listen_private(self):
aconn = await psycopg.AsyncConnection.connect(
**self.connection_params,
autocommit=True,
)
async with aconn:
async with aconn.cursor() as acursor:
await acursor.execute(
sql.SQL("LISTEN {chan}").format(chan=sql.Identifier(str(self.uuid)))
)
print("LISTEN", self.uuid)
gen = aconn.notifies()
async for notify in gen:
await self.send(notify.payload)

async def connect(self):
# Join room for this map
connection_params = connection.get_connection_params()
connection_params.pop("cursor_factory")
self.connection_params = connection_params
self.connection = await psycopg.AsyncConnection.connect(
**connection_params,
autocommit=True,
)

async def listen(self):
asyncio.create_task(self.listen_public())
asyncio.create_task(self.listen_private())

async def disconnect(self):
await self.adelete()
await self.send_peers_list()

async def send_peers_list(self):
message = ListPeersResponse(peers=await self.get_peers())
await self.broadcast(message.model_dump_json())

async def broadcast(self, message):
print("BROADCASTING", message)
# Send to all channels (including sender!)
async with self.connection.cursor() as cursor:
await cursor.execute(
sql.SQL("NOTIFY {chan}, {message}").format(
chan=sql.Identifier(str(self.room_id)),
message=message,
)
)

async def send_to(self, peer_id, message):
print("SEND TO", peer_id, message)
# Send to one given channel
async with self.connection.cursor() as cursor:
await cursor.execute(
sql.SQL("NOTIFY {chan}, {message}").format(
chan=sql.Identifier(str(peer_id)), message=message
)
)

async def receive(self, text_data):
if not self.is_authenticated:
print("AUTHENTICATING", self.uuid)
message = JoinRequest.model_validate_json(text_data)
signed = TimestampSigner().unsign_object(message.token, max_age=30)
user, map_id, permissions = signed.values()
if "edit" not in permissions:
return await self.disconnect()
await self.asave()
await self.listen()
response = JoinResponse(uuid=str(self.uuid), peers=await self.get_peers())
await self.send(response.model_dump_json())
await self.send_peers_list()
self.is_authenticated = True
return

if text_data == "ping":
return await self.send("pong")

try:
incoming = Request.model_validate_json(text_data)
except ValidationError as error:
message = (
f"An error occurred when receiving the following message: {text_data!r}"
)
logging.error(message, error)
else:
match incoming.root:
# Broadcast all operation messages to connected peers
case OperationMessage():
await self.broadcast(text_data)

# Send peer messages to the proper peer
case PeerMessage():
await self.send_to(incoming.root.recipient, text_data)

async def send(self, text):
print("SEND", text)
await self._send({"type": "websocket.send", "text": text})
Loading
Loading