Skip to content

Commit

Permalink
wip(sync): POC of using Redis for pubsub
Browse files Browse the repository at this point in the history
Co-authored-by: David Larlet <[email protected]>
  • Loading branch information
yohanboniface and davidbgk committed Jan 13, 2025
1 parent c69dd4d commit 3d75445
Show file tree
Hide file tree
Showing 8 changed files with 165 additions and 185 deletions.
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,6 @@ format_css=true
blank_line_after_tag="load,extends"
line_break_after_multiline_tag=true

[lint]
# Disable autoremove of unused import.
unfixable = ["F401"]
3 changes: 2 additions & 1 deletion umap/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@
"django.contrib.gis",
"django_probes",
"umap",
"umap.sync",
"social_django",
# See https://github.com/peopledoc/django-agnocomplete/commit/26eda2dfa4a2f8a805ca2ea19a0c504b9d773a1c
# Django does not find the app config in the default place, so the app is not loaded
Expand Down Expand Up @@ -344,3 +343,5 @@
WEBSOCKET_BACK_HOST = env("WEBSOCKET_BACK_HOST", default="localhost")
WEBSOCKET_BACK_PORT = env.int("WEBSOCKET_BACK_PORT", default=8001)
WEBSOCKET_FRONT_URI = env("WEBSOCKET_FRONT_URI", default="ws://localhost:8001")

REDIS_URL = "redis://localhost:6379"
45 changes: 45 additions & 0 deletions umap/settings/local_s3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from umap.settings.local import *

DATABASES = {
"default": {
"ENGINE": "django.contrib.gis.db.backends.postgis",
"NAME": "umaps3",
}
}

STORAGES = {
"default": {
"BACKEND": "storages.backends.s3.S3Storage",
"OPTIONS": {
"access_key": "OScTD3CClCcO54Ax2DLz",
"secret_key": "eK9tfPRHoFh0nKLkZpJJoC4RJS1ptGfko3iBBd5k",
"bucket_name": "umap-default",
"region_name": "eu",
"endpoint_url": "http://127.0.0.1:9000",
},
},
"data": {
"BACKEND": "umap.storage.s3.S3DataStorage",
"OPTIONS": {
"access_key": "OScTD3CClCcO54Ax2DLz",
"secret_key": "eK9tfPRHoFh0nKLkZpJJoC4RJS1ptGfko3iBBd5k",
"bucket_name": "umap",
"region_name": "eu",
"endpoint_url": "http://127.0.0.1:9000",
},
},
"staticfiles": {
"BACKEND": "storages.backends.s3.S3Storage",
"OPTIONS": {
"access_key": "OScTD3CClCcO54Ax2DLz",
"secret_key": "eK9tfPRHoFh0nKLkZpJJoC4RJS1ptGfko3iBBd5k",
"bucket_name": "umap-staticfiles",
"region_name": "eu",
"endpoint_url": "http://127.0.0.1:9000",
"default_acl": "public-read",
# "querystring_auth": False,
},
},
}

# STATIC_URL = "http://127.0.0.1:9000/umap-staticfiles/"
120 changes: 115 additions & 5 deletions umap/sync/app.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,26 @@
import asyncio
import logging
import uuid

import redis.asyncio as redis
from django.conf import settings
from django.core.signing import TimestampSigner
from django.urls.resolvers import RoutePattern
from pydantic import ValidationError

from .payloads import (
JoinRequest,
JoinResponse,
ListPeersResponse,
OperationMessage,
PeerMessage,
Request,
)

ws_pattern = RoutePattern("/ws/sync/<str:map_id>")


async def application(scope, receive, send):
from .models import Peer

matched = ws_pattern.match(scope["path"])
print(matched)
if not matched:
Expand All @@ -16,8 +29,7 @@ async def application(scope, receive, send):
_, _, kwargs = matched

map_id = kwargs["map_id"]
room_id = f"room{map_id}"
peer = Peer(uuid=uuid.uuid4(), name="FooBar", room_id=room_id)
peer = Peer(uuid=uuid.uuid4(), map_id=map_id)
print(peer)
peer._send = send
while True:
Expand All @@ -27,8 +39,10 @@ async def application(scope, receive, send):
if event["type"] == "websocket.connect":
try:
print("Let's accept")
await send({"type": "websocket.accept"})
await peer.connect()
print("After connect")
await send({"type": "websocket.accept"})
print("After accept")
except ValueError:
await send({"type": "websocket.close"})

Expand All @@ -43,3 +57,99 @@ async def application(scope, receive, send):
await send({"type": "websocket.send", "text": "pong"})
else:
await peer.receive(event["text"])


class Peer:
def __init__(self, uuid, map_id, username=None):
self.uuid = uuid
self.user_id = f"user:{uuid}"
self.username = username or ""
self.room_id = f"umap:{map_id}"
self.is_authenticated = False

async def get_peers(self):
peers = await self.client.hgetall(self.room_id)
# Send only ids for now (values are client names).
return peers.keys()

async def listen_to_channel(self, channel_name):
async def reader(pubsub):
await pubsub.subscribe(channel_name)
while True:
try:
message = await pubsub.get_message(ignore_subscribe_messages=True)
except Exception as err:
print(err)
break
if message is not None:
if message["data"].decode() == "STOP":
break
await self.send(message["data"].decode())

async with self.client.pubsub() as pubsub:
asyncio.create_task(reader(pubsub))

async def listen(self):
await self.listen_to_channel(self.room_id)
await self.listen_to_channel(self.user_id)

async def connect(self):
self.client = redis.from_url(settings.REDIS_URL)

async def disconnect(self):
await self.client.hdel(self.room_id, self.user_id)
await self.send_peers_list()
await self.client.aclose()
await self.client.publish(self.room_id, "STOP")
await self.client.publish(self.user_id, "STOP")

async def send_peers_list(self):
message = ListPeersResponse(peers=await self.get_peers())
await self.broadcast(message.model_dump_json())

async def broadcast(self, message):
print("BROADCASTING", message)
# Send to all channels (including sender!)
await self.client.publish(self.room_id, message)

async def send_to(self, peer_id, message):
print("SEND TO", peer_id, message)
# Send to one given channel
await self.client.publish(peer_id, message)

async def receive(self, text_data):
if not self.is_authenticated:
print("AUTHENTICATING", self.uuid)
message = JoinRequest.model_validate_json(text_data)
signed = TimestampSigner().unsign_object(message.token, max_age=30)
user, room_id, permissions = signed.values()
if "edit" not in permissions:
return await self.disconnect()
await self.client.hset(self.room_id, self.user_id, self.username)
await self.listen()
response = JoinResponse(uuid=str(self.uuid), peers=await self.get_peers())
await self.send(response.model_dump_json())
await self.send_peers_list()
self.is_authenticated = True
return

try:
incoming = Request.model_validate_json(text_data)
except ValidationError as error:
message = (
f"An error occurred when receiving the following message: {text_data!r}"
)
logging.error(message, error)
else:
match incoming.root:
# Broadcast all operation messages to connected peers
case OperationMessage():
await self.broadcast(text_data)

# Send peer messages to the proper peer
case PeerMessage():
await self.send_to(incoming.root.recipient, text_data)

async def send(self, text):
print("SEND", text)
await self._send({"type": "websocket.send", "text": text})
6 changes: 0 additions & 6 deletions umap/sync/apps.py

This file was deleted.

23 changes: 0 additions & 23 deletions umap/sync/migrations/0001_initial.py

This file was deleted.

Empty file removed umap/sync/migrations/__init__.py
Empty file.
150 changes: 0 additions & 150 deletions umap/sync/models.py

This file was deleted.

0 comments on commit 3d75445

Please sign in to comment.