Skip to content

Commit

Permalink
implement websocket channel using for monitor report 's comments and …
Browse files Browse the repository at this point in the history
…new created report.
  • Loading branch information
pphetra committed Aug 27, 2022
1 parent 52163a8 commit cb4c29c
Show file tree
Hide file tree
Showing 12 changed files with 215 additions and 8 deletions.
19 changes: 19 additions & 0 deletions common/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from typing import Union
from django.db import models
from django.http import parse_cookie
from graphql_jwt.utils import jwt_decode

from common.types import AdminFieldValidationProblem


Expand Down Expand Up @@ -41,3 +44,19 @@ def check_and_get(name: str, value: str, entity: models.Model):
def camel(snake_str):
first, *others = snake_str.split("_")
return "".join([first.lower(), *map(str.title, others)])


def extract_jwt_payload_from_asgi_scope(scope):
username = None
authority_id = None
for name, value in scope.get("headers", []):
if name == b"cookie":
cookies = parse_cookie(value.decode("latin1"))
token = cookies["JWT"]
payload = jwt_decode(token)
username = payload["username"]
authority_id = payload["authority_id"]
return {
"username": username,
"authority_id": authority_id,
}
20 changes: 19 additions & 1 deletion podd_api/asgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,26 @@

import os

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter

from django.core.asgi import get_asgi_application

import reports.routing
import threads.routing

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "podd_api.settings")

application = get_asgi_application()
django_asgi_app = get_asgi_application()

application = ProtocolTypeRouter(
{
"http": django_asgi_app,
"websocket": AuthMiddlewareStack(
URLRouter(
reports.routing.websocket_urlpatterns
+ threads.routing.websocket_urlpatterns
)
),
}
)
1 change: 0 additions & 1 deletion podd_api/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@
},
]

WSGI_APPLICATION = "podd_api.wsgi.application"
ASGI_APPLICATION = "podd_api.asgi.application"
CHANNEL_LAYERS = {
"default": {
Expand Down
40 changes: 40 additions & 0 deletions reports/consumers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from channels.exceptions import DenyConnection
from channels.generic.websocket import AsyncWebsocketConsumer

from common.utils import extract_jwt_payload_from_asgi_scope


def new_report_group_name(authority_id):
return f"rp_{authority_id}"


class NewReportConsumers(AsyncWebsocketConsumer):
def __init__(self, *args, **kwargs):
super().__init__(args, kwargs)
self.username = None
self.authority_id = None
self.group_name = None

async def connect(self):
self.authority_id = self.scope["url_route"]["kwargs"]["authority_id"]
self.group_name = new_report_group_name(self.authority_id)
payload = extract_jwt_payload_from_asgi_scope(self.scope)
self.username = payload["username"]

if self.username:
await self.channel_layer.group_add(
self.group_name,
self.channel_name,
)
await self.accept()
else:
raise DenyConnection("invalid token")

async def disconnect(self, code):
await self.channel_layer.group_discard(
self.group_name,
self.channel_name,
)

async def new_report(self, event):
await self.send(text_data=event["text"])
10 changes: 8 additions & 2 deletions reports/models/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ class IncidentReport(AbstractIncidentReport):
null=True,
)

@property
def gps_location_str(self):
if self.gps_location:
return f"{self.gps_location.x},{self.gps_location.y}"
else:
return ""

def render_data_context(self):
return {
"data": self.data,
Expand All @@ -112,9 +119,8 @@ def resolve_relevant_authorities_by_area(self):
found_authorities = Authority.objects.filter(
area__contains=self.gps_location
)
for authority in found_authorities:
self.relevant_authorities.add(authority)
if found_authorities:
self.relevant_authorities.add(*found_authorities)
self.relevant_authority_resolved = True
self.save(update_fields=("relevant_authority_resolved",))

Expand Down
9 changes: 9 additions & 0 deletions reports/routing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from django.urls import re_path

from . import consumers

websocket_urlpatterns = [
re_path(
r"ws/reports/(?P<authority_id>\w+)/$", consumers.NewReportConsumers.as_asgi()
),
]
5 changes: 1 addition & 4 deletions reports/schema/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,7 @@ class Meta:
}

def resolve_gps_location(self, info):
if self.gps_location:
return f"{self.gps_location.x},{self.gps_location.y}"
else:
return ""
return self.gps_location_str

def resolve_images(self, info):
return self.images.all()
Expand Down
35 changes: 35 additions & 0 deletions reports/signals.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import json

import channels
from asgiref.sync import async_to_sync
from django.db.models.signals import m2m_changed
from django.dispatch import receiver

from accounts.models import Authority
from reports.consumers import new_report_group_name
from reports.models import IncidentReport


@receiver(
m2m_changed,
sender=IncidentReport.relevant_authorities.through,
dispatch_id="report_signal_to_ws",
)
def on_create_report(sender, instance, action, reverse, model, pk_set, **kwargs):
if action == "post_add":
report_id = instance.id
for authority_id in pk_set:
for authority in Authority.objects.get(pk=authority_id).all_inherits_up():
group_name = new_report_group_name(authority.id)
channel_layer = channels.layers.get_channel_layer()
async_to_sync(channel_layer.group_send)(
group_name,
{
"type": "new.report",
"text": json.dumps(
{
"report_id": report_id,
}
),
},
)
3 changes: 3 additions & 0 deletions threads/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@
class ThreadConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "threads"

def ready(self):
from . import signals
45 changes: 45 additions & 0 deletions threads/consumers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import json

from channels.exceptions import DenyConnection
from channels.generic.websocket import AsyncWebsocketConsumer
from django.http import parse_cookie
from graphql_jwt.utils import get_payload, jwt_decode

from common.utils import extract_jwt_payload_from_asgi_scope


def new_comment_group_name(thread_id):
return f"cm_{thread_id}"


class NewCommentConsumers(AsyncWebsocketConsumer):
def __init__(self, *args, **kwargs):
self.username = None
self.authority_id = None
self.group_name = None
super().__init__(*args, **kwargs)

async def connect(self):
thread_id = self.scope["url_route"]["kwargs"]["thread_id"]
self.group_name = new_comment_group_name(thread_id)
payload = extract_jwt_payload_from_asgi_scope(self.scope)
self.username = payload["username"]
self.authority_id = payload["authority_id"]

if self.username:
await self.channel_layer.group_add(
self.group_name,
self.channel_name,
)
await self.accept()
else:
raise DenyConnection("invalid token")

async def disconnect(self, code):
await self.channel_layer.group_discard(
self.group_name,
self.channel_name,
)

async def update_comment(self, event):
await self.send(text_data=event["text"])
9 changes: 9 additions & 0 deletions threads/routing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from django.urls import re_path

from . import consumers

websocket_urlpatterns = [
re_path(
r"ws/comments/(?P<thread_id>\w+)/$", consumers.NewCommentConsumers.as_asgi()
),
]
27 changes: 27 additions & 0 deletions threads/signals.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import json

import channels.layers
from asgiref.sync import async_to_sync
from django.db.models.signals import post_save
from django.dispatch import receiver

from threads.consumers import new_comment_group_name
from threads.models import Comment


@receiver(post_save, sender=Comment, dispatch_uid="comment_signal_to_ws")
def on_comment_update(sender, instance, **kwargs):
thread_id = instance.thread_id
group_name = new_comment_group_name(thread_id)
channel_layer = channels.layers.get_channel_layer()
async_to_sync(channel_layer.group_send)(
group_name,
{
"type": "update.comment",
"text": json.dumps(
{
"thread_id": thread_id,
}
),
},
)

0 comments on commit cb4c29c

Please sign in to comment.