Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add room storage for Django Channels Consumers #94

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
0e0f7cc
refactor(django_channels): move to its own folder to not be confused …
cacosandon Mar 21, 2024
725f12f
refactor(django_channels): rename consumer file to yjs_consumer
cacosandon Mar 21, 2024
459b04a
feat(django_channels): add base room storage for each consumer
cacosandon Mar 21, 2024
a59b7a1
feat(yroom_storage): add Redis storage as an example
cacosandon Mar 21, 2024
a6171a9
chore: optional redis as dependency for django channels Redis Room
cacosandon Mar 21, 2024
7b3adf4
feat(yjs_consumer): add optional room storage to consumer
cacosandon Mar 21, 2024
8967b69
feat(yjs_consumer): add new docs for YjsConsumer
cacosandon Mar 21, 2024
7af0fa9
feat(yroom_storage): new docs and rename persist_document to save_sna…
cacosandon Mar 21, 2024
ecfd04d
docs: move django channels related stuff to its own folder
cacosandon Mar 21, 2024
415f52c
feat(yjs_consumer): already disconnected if there is no room name
cacosandon Mar 21, 2024
5a3101f
refactor(yoom_storage): correct use of throttle instead of debounce
cacosandon Mar 21, 2024
da5cf22
refactor: apply pre_commit linter and fix issues
cacosandon Mar 27, 2024
65a51d0
refactor(yjs_consumer): remove unnecessary docstring because of inter…
cacosandon Mar 27, 2024
60f713c
refactor: create EMPTY_MESSAGE constant
cacosandon Mar 27, 2024
c57dfb9
docs(yjs_consumer.py): update comments and example in YjsConsumer cla…
cacosandon Mar 27, 2024
214af21
refactor(yroom_storage): move _apply_update_to_snapshot to RedisYRoom…
cacosandon Mar 27, 2024
3ff089e
refactor(yroom_storage): move get_snapshot_from_database to Abstract …
cacosandon Mar 27, 2024
d3e5c26
feat(django_channels): reexport on __init__
cacosandon Mar 27, 2024
80eb54c
feat(yroom_storage): just receive room_name as init params
cacosandon Mar 27, 2024
4847ded
style: use 100 as file line length
cacosandon Mar 27, 2024
8814228
Merge pull request #1 from its-dart/feat/add-room-storage
cacosandon Mar 27, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions docs/reference/Django_Channels.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
## Consumer

::: ypy_websocket.django_channels.yjs_consumer.YjsConsumer

## Storage

### BaseYRoomStorage
::: ypy_websocket.django_channels.yroom_storage.BaseYRoomStorage

### RedisYRoomStorage
::: ypy_websocket.django_channels.yroom_storage.RedisYRoomStorage
1 change: 0 additions & 1 deletion docs/reference/Django_Channels_consumer.md

This file was deleted.

2 changes: 1 addition & 1 deletion mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ nav:
- reference/WebSocket_provider.md
- reference/WebSocket_server.md
- reference/ASGI_server.md
- reference/Django_Channels_consumer.md
- reference/Django_Channels.md
- reference/WebSocket.md
- reference/Room.md
- reference/Store.md
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ docs = [
django = [
"channels",
]
redis = [
"redis",
]

[project.urls]
Homepage = "https://github.com/y-crdt/ypy-websocket"
Expand Down
2 changes: 2 additions & 0 deletions ypy_websocket/django_channels/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .yjs_consumer import YjsConsumer
from .yroom_storage import BaseYRoomStorage, RedisYRoomStorage
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,19 @@
from typing import TypedDict

import y_py as Y
from channels.generic.websocket import AsyncWebsocketConsumer # type: ignore
from channels.generic.websocket import AsyncWebsocketConsumer

from .websocket import Websocket
from .yutils import YMessageType, process_sync_message, sync
from ypy_websocket.django_channels.yroom_storage import BaseYRoomStorage

from ..websocket import Websocket
from ..yutils import (
EMPTY_UPDATE,
YMessageType,
YSyncMessageType,
process_sync_message,
read_message,
sync,
)

logger = getLogger(__name__)

Expand Down Expand Up @@ -70,14 +79,17 @@ class YjsConsumer(AsyncWebsocketConsumer):
In particular,

- Override `make_room_name` to customize the room name.
- Override `make_ydoc` to initialize the YDoc. This is useful to initialize it with data
from your database, or to add observers to it).
- Override `make_room_storage` to initialize the room storage. Create your own storage class
by subclassing `BaseYRoomStorage` and implementing the methods.
- Override `connect` to do custom validation (like auth) on connect,
but be sure to call `await super().connect()` in the end.
- Call `group_send_message` to send a message to an entire group/room.
- Call `send_message` to send a message to a single client, although this is not recommended.

A full example of a custom consumer showcasing all of these options is:
A full example of a custom consumer showcasing all of these options is below. The example also
includes an example function `propagate_document_update_from_external` that demonstrates how to
send a message to all connected clients from an external source (like a Celery job).

```py
import y_py as Y
from asgiref.sync import async_to_sync
Expand All @@ -87,45 +99,57 @@ class YjsConsumer(AsyncWebsocketConsumer):


class DocConsumer(YjsConsumer):
def make_room_storage(self) -> BaseYRoomStorage:
# Modify the room storage here

return RedisYRoomStorage(self.room_name)

def make_room_name(self) -> str:
# modify the room name here
return self.scope["url_route"]["kwargs"]["room"]
# Modify the room name here

async def make_ydoc(self) -> Y.YDoc:
doc = Y.YDoc()
# fill doc with data from DB here
doc.observe_after_transaction(self.on_update_event)
return doc
return self.scope["url_route"]["kwargs"]["room"]

async def connect(self):
user = self.scope["user"]

if user is None or user.is_anonymous:
await self.close()
return
await super().connect()

def on_update_event(self, event):
# process event here
...
await super().connect()

async def doc_update(self, update_wrapper):
async def propagate_document_update(self, update_wrapper):
update = update_wrapper["update"]
Y.apply_update(self.ydoc, update)
await self.group_send_message(create_update_message(update))

await self.send(create_update_message(update))

def send_doc_update(room_name, update):
layer = get_channel_layer()
async_to_sync(layer.group_send)(room_name, {"type": "doc_update", "update": update})
```

async def propagate_document_update_from_external(room_name, update):
channel_layer = get_channel_layer()

await channel_layer.group_send(
room_name,
{"type": "propagate_document_update", "update": update},
)
```
"""

def __init__(self):
super().__init__()
self.room_name = None
self.ydoc = None
self._websocket_shim = None
self.room_name: str | None = None
self.ydoc: Y.YDoc | None = None
self.room_storage: BaseYRoomStorage | None = None
self._websocket_shim: _WebsocketShim | None = None

def make_room_storage(self) -> BaseYRoomStorage | None:
"""Make the room storage for a new channel to persist the YDoc permanently.

Defaults to not using any (just broadcast updates between consumers).

Example:
self.room_storage = RedisYRoomStorage(self.room_name)
"""
return None

def make_room_name(self) -> str:
"""Make the room name for a new channel.
Expand All @@ -137,23 +161,20 @@ def make_room_name(self) -> str:
"""
return self.scope["url_route"]["kwargs"]["room"]

async def make_ydoc(self) -> Y.YDoc:
"""Make the YDoc for a new channel.

Override to customize the YDoc when a channel is created
(useful to initialize it with data from your database, or to add observers to it).
async def _make_ydoc(self) -> Y.YDoc:
if self.room_storage:
return await self.room_storage.get_document()

Returns:
The YDoc for a new channel. Defaults to a new empty YDoc.
"""
return Y.YDoc()

def _make_websocket_shim(self, path: str) -> _WebsocketShim:
return _WebsocketShim(path, self.group_send_message)

async def connect(self) -> None:
self.room_name = self.make_room_name()
self.ydoc = await self.make_ydoc()
self.room_storage = self.make_room_storage()

self.ydoc = await self._make_ydoc()
self._websocket_shim = self._make_websocket_shim(self.scope["path"])

await self.channel_layer.group_add(self.room_name, self.channel_name)
Expand All @@ -162,14 +183,32 @@ async def connect(self) -> None:
await sync(self.ydoc, self._websocket_shim, logger)

async def disconnect(self, code) -> None:
if self.room_storage:
await self.room_storage.close()

if not self.room_name:
return

await self.channel_layer.group_discard(self.room_name, self.channel_name)

async def receive(self, text_data=None, bytes_data=None):
if bytes_data is None:
return

await self.group_send_message(bytes_data)

if bytes_data[0] != YMessageType.SYNC:
return

# If it's an update message, apply it to the storage document
if self.room_storage and bytes_data[1] == YSyncMessageType.SYNC_UPDATE:
update = read_message(bytes_data[2:])

if update != EMPTY_UPDATE:
await self.room_storage.update_document(update)

return

await process_sync_message(bytes_data[1:], self.ydoc, self._websocket_shim, logger)

class WrappedMessage(TypedDict):
Expand Down
Loading