From b4a8f70c66c8ca779167cab7601b068c1b8585de Mon Sep 17 00:00:00 2001 From: Tumblingman <73558578+tumblingman@users.noreply.github.com> Date: Wed, 20 Nov 2024 14:17:50 +0400 Subject: [PATCH 01/13] Fix instance data loss during DELETE actions in ModelObserver's database_event --- djangochannelsrestframework/observer/model_observer.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/djangochannelsrestframework/observer/model_observer.py b/djangochannelsrestframework/observer/model_observer.py index d9bd7c5..69f238c 100644 --- a/djangochannelsrestframework/observer/model_observer.py +++ b/djangochannelsrestframework/observer/model_observer.py @@ -103,6 +103,9 @@ def post_delete_receiver(self, instance: Model, **kwargs): self.database_event(instance, Action.DELETE) def database_event(self, instance: Model, action: Action): + if action == Action.DELETE: + self.post_change_receiver(instance, action) + return connection = transaction.get_connection() From 097dae269496f7b665e159c094d37366fbf2b022 Mon Sep 17 00:00:00 2001 From: Tumblingman <73558578+tumblingman@users.noreply.github.com> Date: Tue, 26 Nov 2024 19:00:14 +0400 Subject: [PATCH 02/13] Refactor ModelObserver to prepare messages before transaction commit --- .../observer/model_observer.py | 62 +++++++++---------- 1 file changed, 28 insertions(+), 34 deletions(-) diff --git a/djangochannelsrestframework/observer/model_observer.py b/djangochannelsrestframework/observer/model_observer.py index 69f238c..44f98ad 100644 --- a/djangochannelsrestframework/observer/model_observer.py +++ b/djangochannelsrestframework/observer/model_observer.py @@ -103,9 +103,10 @@ def post_delete_receiver(self, instance: Model, **kwargs): self.database_event(instance, Action.DELETE) def database_event(self, instance: Model, action: Action): - if action == Action.DELETE: - self.post_change_receiver(instance, action) - return + """ + Handles database events and prepares messages for sending on commit. + """ + messages = list(self.prepare_messages(instance, action)) connection = transaction.get_connection() @@ -113,17 +114,16 @@ def database_event(self, instance: Model, action: Action): if len(connection.savepoint_ids) > 0: warnings.warn( "Model observation with save points is unsupported and will" - " result in unexpected beauvoir.", + " result in unexpected behavior.", UnsupportedWarning, ) - connection.on_commit(partial(self.post_change_receiver, instance, action)) + connection.on_commit(partial(self.send_prepared_messages, messages)) - def post_change_receiver(self, instance: Model, action: Action, **kwargs): + def prepare_messages(self, instance: Model, action: Action, **kwargs): """ - Triggers the old_binding to possibly send to its group. + Prepares messages for sending based on the given action and instance. """ - if action == Action.CREATE: old_group_names = set() else: @@ -136,37 +136,31 @@ def post_change_receiver(self, instance: Model, action: Action, **kwargs): self.get_observer_state(instance).current_groups = new_group_names - # if post delete, new_group_names should be [] + yield from self.generate_messages(instance, old_group_names, new_group_names, action, **kwargs) - # Django DDP had used the ordering of DELETE, UPDATE then CREATE for good reasons. - self.send_messages( - instance, old_group_names - new_group_names, Action.DELETE, **kwargs - ) - # the object has been updated so that its groups are not the same. - self.send_messages( - instance, old_group_names & new_group_names, Action.UPDATE, **kwargs - ) - - # - self.send_messages( - instance, new_group_names - old_group_names, Action.CREATE, **kwargs - ) + def generate_messages(self, instance: Model, old_group_names: Set[str], new_group_names: Set[str], action: Action, **kwargs): + """ + Generates messages for the given group names and action. + """ + for group_name in old_group_names - new_group_names: + yield {**self.serialize(instance, Action.DELETE, **kwargs), "group": group_name} - def send_messages( - self, instance: Model, group_names: Set[str], action: Action, **kwargs - ): - if not group_names: - return - message = self.serialize(instance, action, **kwargs) - channel_layer = get_channel_layer() + for group_name in old_group_names & new_group_names: + yield {**self.serialize(instance, Action.UPDATE, **kwargs), "group": group_name} - for group_name in group_names: - message_to_send = deepcopy(message) + for group_name in new_group_names - old_group_names: + yield {**self.serialize(instance, Action.CREATE, **kwargs), "group": group_name} - # Include the group name in the message being sent - message_to_send["group"] = group_name + def send_prepared_messages(self, messages): + """ + Sends prepared messages to the channel layer. + """ + if not messages: + return - async_to_sync(channel_layer.group_send)(group_name, message_to_send) + channel_layer = get_channel_layer() + for message in messages: + async_to_sync(channel_layer.group_send)(message["group"], deepcopy(message)) def group_names(self, *args, **kwargs): # one channel for all updates. From 0051ffb8e02db965a2f52f12f90c989ca3a30481 Mon Sep 17 00:00:00 2001 From: Tumblingman <73558578+tumblingman@users.noreply.github.com> Date: Tue, 26 Nov 2024 21:36:12 +0400 Subject: [PATCH 03/13] Fix test warnings by refining test models and communicator management --- setup.cfg | 1 + tests/communicator.py | 70 ++ tests/models.py | 3 + tests/test_consumer.py | 252 +++--- tests/test_decorators.py | 2 +- tests/test_django_view_consumer.py | 92 +-- tests/test_generic_consumer.py | 833 ++++++++++---------- tests/test_model_observer.py | 606 +++++++------- tests/test_model_with_custom_pk_observer.py | 74 +- tests/test_observer.py | 422 ++++------ tests/test_permission.py | 66 +- 11 files changed, 1163 insertions(+), 1258 deletions(-) create mode 100644 tests/communicator.py diff --git a/setup.cfg b/setup.cfg index 8ffb8ad..998ed03 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1 +1,2 @@ [tool:pytest] +asyncio_default_fixture_loop_scope = session diff --git a/tests/communicator.py b/tests/communicator.py new file mode 100644 index 0000000..d02b5b9 --- /dev/null +++ b/tests/communicator.py @@ -0,0 +1,70 @@ +import asyncio +from contextlib import asynccontextmanager +from typing import Awaitable + +from asgiref.timeout import timeout as async_timeout + +from channels.testing import WebsocketCommunicator + + +class Communicator(WebsocketCommunicator): + """ + Custom communicator class for WebSocket communication in tests. + + This override resolves an issue where the default ApplicationCommunicator + cancels the application future unnecessarily when a timeout occurs + while waiting for output. This behavior disrupts subsequent attempts + to receive output or reconnect, causing the application to raise + CancelledError. + + The `receive_output` method is modified to: + - Avoid cancelling the application future when a timeout occurs. + - Ensure that the method can be called repeatedly without causing + failures in subsequent operations. + + This makes the communicator more flexible for tests where output + availability is uncertain, allowing patterns like: + + outputs = [] + while True: + try: + outputs.append(await communicator.receive_output()) + except asyncio.TimeoutError: + break + """ + async def receive_output(self, timeout=1): + if self.future.done(): + self.future.result() # Ensure exceptions are re-raised if future is complete + try: + async with async_timeout(timeout): # Wait for output with a timeout + return await self.output_queue.get() + except asyncio.TimeoutError as e: + if self.future.done(): # Re-check the state of the future after the timeout + self.future.result() + raise e # Propagate the timeout exception + + +@asynccontextmanager +async def connected_communicator(consumer, path: str = "/testws/") -> Awaitable[Communicator]: + """ + Asynchronous context manager for managing WebSocket communicator lifecycle. + + This utility simplifies tests involving WebSocket communication by: + - Initializing and connecting a Communicator instance for the given consumer and path. + - Ensuring the connection is properly established, raising an assertion error if not. + - Guaranteeing cleanup by disconnecting the communicator upon exiting the context. + + Example usage: + + async with connected_communicator(TestConsumer) as communicator: + await communicator.send_json_to({"key": "value"}) + response = await communicator.receive_json_from() + assert response == {"key": "value"} + """ + communicator = Communicator(consumer, path) + connected, _ = await communicator.connect() + try: + assert connected, "Failed to connect to WebSocket" + yield communicator + finally: + await communicator.disconnect() diff --git a/tests/models.py b/tests/models.py index 0fab57f..40bb242 100644 --- a/tests/models.py +++ b/tests/models.py @@ -3,11 +3,14 @@ class TestModel(models.Model): """Simple model to test with.""" + __test__ = False # Prevent pytest from collecting this as a test class name = models.CharField(max_length=255) class TestModelWithCustomPK(models.Model): """Simple model with custom primary key to test with.""" + __test__ = False # Prevent pytest from collecting this as a test class name = models.CharField(max_length=255, primary_key=True) + description = models.CharField(max_length=255, null=True, blank=True) diff --git a/tests/test_consumer.py b/tests/test_consumer.py index f059238..fe85735 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -1,7 +1,7 @@ import asyncio import pytest -from channels.testing import WebsocketCommunicator +from tests.communicator import connected_communicator from rest_framework.exceptions import Throttled from djangochannelsrestframework.decorators import action, detached @@ -26,41 +26,35 @@ def test_sync_action(self, pk=None, **kwargs): return {"pk": pk, "sync": True}, 200 # Test a normal connection - communicator = WebsocketCommunicator(AConsumer(), "/testws/") + async with connected_communicator(AConsumer()) as communicator: - connected, _ = await communicator.connect() + await communicator.send_json_to( + {"action": "test_async_action", "pk": 2, "request_id": 1} + ) - assert connected + response = await communicator.receive_json_from() - await communicator.send_json_to( - {"action": "test_async_action", "pk": 2, "request_id": 1} - ) + assert response == { + "errors": [], + "data": {"pk": 2}, + "action": "test_async_action", + "response_status": 200, + "request_id": 1, + } - response = await communicator.receive_json_from() + await communicator.send_json_to( + {"action": "test_sync_action", "pk": 3, "request_id": 10} + ) - assert response == { - "errors": [], - "data": {"pk": 2}, - "action": "test_async_action", - "response_status": 200, - "request_id": 1, - } + response = await communicator.receive_json_from() - await communicator.send_json_to( - {"action": "test_sync_action", "pk": 3, "request_id": 10} - ) - - response = await communicator.receive_json_from() - - assert response == { - "errors": [], - "data": {"pk": 3, "sync": True}, - "action": "test_sync_action", - "response_status": 200, - "request_id": 10, - } - - await communicator.disconnect() + assert response == { + "errors": [], + "data": {"pk": 3, "sync": True}, + "action": "test_sync_action", + "response_status": 200, + "request_id": 10, + } @pytest.mark.django_db(transaction=True) @@ -81,31 +75,25 @@ async def detached_test_method(self): await self.send_json({"waited": 1}) # Test a normal connection - communicator = WebsocketCommunicator(AConsumer(), "/testws/") - - connected, _ = await communicator.connect() - - assert connected + async with connected_communicator(AConsumer()) as communicator: - await communicator.send_json_to( - {"action": "test_async_action", "pk": 2, "request_id": 1} - ) + await communicator.send_json_to( + {"action": "test_async_action", "pk": 2, "request_id": 1} + ) - response = await communicator.receive_json_from() + response = await communicator.receive_json_from() - assert response == { - "errors": [], - "data": {"pk": 2}, - "action": "test_async_action", - "response_status": 200, - "request_id": 1, - } + assert response == { + "errors": [], + "data": {"pk": 2}, + "action": "test_async_action", + "response_status": 200, + "request_id": 1, + } - response = await communicator.receive_json_from(timeout=2) + response = await communicator.receive_json_from(timeout=2) - assert response == {"waited": 1} - - await communicator.disconnect() + assert response == {"waited": 1} @pytest.mark.django_db(transaction=True) @@ -129,27 +117,21 @@ async def detached_test_method(self): raise # Test a normal connection - communicator = WebsocketCommunicator(AConsumer(), "/testws/") - - connected, _ = await communicator.connect() - - assert connected - - await communicator.send_json_to( - {"action": "test_async_action", "pk": 2, "request_id": 1} - ) + async with connected_communicator(AConsumer()) as communicator: - response = await communicator.receive_json_from() + await communicator.send_json_to( + {"action": "test_async_action", "pk": 2, "request_id": 1} + ) - assert response == { - "errors": [], - "data": {"pk": 2}, - "action": "test_async_action", - "response_status": 200, - "request_id": 1, - } + response = await communicator.receive_json_from() - await communicator.disconnect() + assert response == { + "errors": [], + "data": {"pk": 2}, + "action": "test_async_action", + "response_status": 200, + "request_id": 1, + } assert len(errors) == 1 @@ -171,43 +153,37 @@ async def test_async_action(self, pk=None, **kwargs): return {"pk": pk}, 200 # Test a normal connection - communicator = WebsocketCommunicator(AConsumer(), "/testws/") + async with connected_communicator(AConsumer()) as communicator: - connected, _ = await communicator.connect() + await communicator.send_json_to( + {"action": "test_detached_async_action", "pk": 2, "request_id": 1} + ) - assert connected + await communicator.send_json_to( + {"action": "test_async_action", "pk": 3, "request_id": 2} + ) - await communicator.send_json_to( - {"action": "test_detached_async_action", "pk": 2, "request_id": 1} - ) + response = await communicator.receive_json_from() - await communicator.send_json_to( - {"action": "test_async_action", "pk": 3, "request_id": 2} - ) + assert response == { + "errors": [], + "data": {"pk": 3}, + "action": "test_async_action", + "response_status": 200, + "request_id": 2, + } - response = await communicator.receive_json_from() + event.set() - assert response == { - "errors": [], - "data": {"pk": 3}, - "action": "test_async_action", - "response_status": 200, - "request_id": 2, - } + response = await communicator.receive_json_from() - event.set() - - response = await communicator.receive_json_from() - - assert response == { - "errors": [], - "data": {"pk": 2}, - "action": "test_detached_async_action", - "response_status": 200, - "request_id": 1, - } - - await communicator.disconnect() + assert response == { + "errors": [], + "data": {"pk": 2}, + "action": "test_detached_async_action", + "response_status": 200, + "request_id": 1, + } @pytest.mark.django_db(transaction=True) @@ -227,43 +203,37 @@ async def test_async_action(self, pk=None, **kwargs): return {"pk": pk}, 200 # Test a normal connection - communicator = WebsocketCommunicator(AConsumer(), "/testws/") - - connected, _ = await communicator.connect() - - assert connected + async with connected_communicator(AConsumer()) as communicator: - await communicator.send_json_to( - {"action": "test_detached_async_action", "pk": 2, "request_id": 1} - ) + await communicator.send_json_to( + {"action": "test_detached_async_action", "pk": 2, "request_id": 1} + ) - await communicator.send_json_to( - {"action": "test_async_action", "pk": 3, "request_id": 2} - ) + await communicator.send_json_to( + {"action": "test_async_action", "pk": 3, "request_id": 2} + ) - response = await communicator.receive_json_from() + response = await communicator.receive_json_from() - assert response == { - "errors": [], - "data": {"pk": 3}, - "action": "test_async_action", - "response_status": 200, - "request_id": 2, - } + assert response == { + "errors": [], + "data": {"pk": 3}, + "action": "test_async_action", + "response_status": 200, + "request_id": 2, + } - event.set() + event.set() - response = await communicator.receive_json_from() + response = await communicator.receive_json_from() - assert response == { - "data": None, - "action": "test_detached_async_action", - "response_status": 429, - "errors": ["Request was throttled."], - "request_id": 1, - } - - await communicator.disconnect() + assert response == { + "data": None, + "action": "test_detached_async_action", + "response_status": 429, + "errors": ["Request was throttled."], + "request_id": 1, + } @pytest.mark.django_db(transaction=True) @@ -278,22 +248,16 @@ async def test_action(self, pk=None, **kwargs): return {}, 200 # Test a normal connection - communicator = WebsocketCommunicator(AConsumer(), "/testws/") - - connected, _ = await communicator.connect() - - assert connected - - await communicator.send_json_to({"pk": 2, "request_id": 1}) + async with connected_communicator(AConsumer()) as communicator: - response = await communicator.receive_json_from() + await communicator.send_json_to({"pk": 2, "request_id": 1}) - assert response == { - "errors": ["Unable to find action in message body."], - "data": None, - "action": None, - "response_status": 405, - "request_id": 1, - } + response = await communicator.receive_json_from() - await communicator.disconnect() + assert response == { + "errors": ["Unable to find action in message body."], + "data": None, + "action": None, + "response_status": 405, + "request_id": 1, + } diff --git a/tests/test_decorators.py b/tests/test_decorators.py index 7b8d9f1..08ce1fe 100644 --- a/tests/test_decorators.py +++ b/tests/test_decorators.py @@ -1,5 +1,5 @@ import pytest -from django.db import transaction, connection, connections +from django.db import connections from djangochannelsrestframework.decorators import action diff --git a/tests/test_django_view_consumer.py b/tests/test_django_view_consumer.py index a0302d8..031ef9c 100644 --- a/tests/test_django_view_consumer.py +++ b/tests/test_django_view_consumer.py @@ -1,11 +1,10 @@ import pytest -from channels.testing import WebsocketCommunicator -from django.http import QueryDict from rest_framework import viewsets from rest_framework.response import Response from rest_framework.views import APIView from djangochannelsrestframework.consumers import view_as_consumer +from tests.communicator import connected_communicator @pytest.mark.django_db(transaction=True) @@ -20,26 +19,21 @@ def get(self, request, format=None): return Response(["test1", "test2"]) # Test a normal connection - communicator = WebsocketCommunicator( - view_as_consumer(TestView.as_view()), "/testws/" - ) + async with connected_communicator(view_as_consumer(TestView.as_view())) as communicator: - connected, _ = await communicator.connect() - assert connected + await communicator.send_json_to({"action": "retrieve", "request_id": 1}) - await communicator.send_json_to({"action": "retrieve", "request_id": 1}) + response = await communicator.receive_json_from() - response = await communicator.receive_json_from() + assert "TestView-get" in results - assert "TestView-get" in results - - assert response == { - "errors": [], - "data": ["test1", "test2"], - "action": "retrieve", - "response_status": 200, - "request_id": 1, - } + assert response == { + "errors": [], + "data": ["test1", "test2"], + "action": "retrieve", + "response_status": 200, + "request_id": 1, + } @pytest.mark.django_db(transaction=True) @@ -54,28 +48,23 @@ def get(self, request, format=None): return Response(self.request.GET) # Test a normal connection - communicator = WebsocketCommunicator( - view_as_consumer(TestView.as_view()), "/testws/" - ) - - connected, _ = await communicator.connect() - assert connected + async with connected_communicator(view_as_consumer(TestView.as_view())) as communicator: - await communicator.send_json_to( - {"action": "retrieve", "request_id": 1, "query": {"value": 1, "othervalue": 42}} - ) + await communicator.send_json_to( + {"action": "retrieve", "request_id": 1, "query": {"value": 1, "othervalue": 42}} + ) - response = await communicator.receive_json_from() + response = await communicator.receive_json_from() - assert "TestView-get" in results + assert "TestView-get" in results - assert response == { - "errors": [], - "data": {"value": 1, "othervalue": 42}, - "action": "retrieve", - "response_status": 200, - "request_id": 1, - } + assert response == { + "errors": [], + "data": {"value": 1, "othervalue": 42}, + "action": "retrieve", + "response_status": 200, + "request_id": 1, + } @pytest.mark.django_db(transaction=True) @@ -90,25 +79,20 @@ def retrieve(self, request, pk, *args, **kwargs): return Response(self.request.GET) # Test a normal connection - communicator = WebsocketCommunicator( - view_as_consumer(TestView.as_view({"get": "retrieve"})), "/testws/" - ) - - connected, _ = await communicator.connect() - assert connected + async with connected_communicator(view_as_consumer(TestView.as_view({"get": "retrieve"}))) as communicator: - await communicator.send_json_to( - {"action": "retrieve", "request_id": 1, "parameters": {"pk": 42}} - ) + await communicator.send_json_to( + {"action": "retrieve", "request_id": 1, "parameters": {"pk": 42}} + ) - response = await communicator.receive_json_from() + response = await communicator.receive_json_from() - assert results["TestView-retrieve"] == 42 + assert results["TestView-retrieve"] == 42 - assert response == { - "errors": [], - "data": {}, - "action": "retrieve", - "response_status": 200, - "request_id": 1, - } + assert response == { + "errors": [], + "data": {}, + "action": "retrieve", + "response_status": 200, + "request_id": 1, + } diff --git a/tests/test_generic_consumer.py b/tests/test_generic_consumer.py index d486f48..519e0a7 100644 --- a/tests/test_generic_consumer.py +++ b/tests/test_generic_consumer.py @@ -1,6 +1,6 @@ import pytest from channels.db import database_sync_to_async -from channels.testing import WebsocketCommunicator +from tests.communicator import connected_communicator from django.contrib.auth import get_user_model from rest_framework import serializers from djangochannelsrestframework.pagination import WebsocketLimitOffsetPagination @@ -43,54 +43,45 @@ def test_sync_action(self, pk=None, **kwargs): return s.data, 200 # Test a normal connection - communicator = WebsocketCommunicator(AConsumer(), "/testws/") - connected, _ = await communicator.connect() - assert connected + async with connected_communicator(AConsumer()) as communicator: - await communicator.send_json_to( - {"action": "test_sync_action", "pk": 2, "request_id": 1} - ) + await communicator.send_json_to( + {"action": "test_sync_action", "pk": 2, "request_id": 1} + ) - response = await communicator.receive_json_from() + response = await communicator.receive_json_from() - assert response == { - "action": "test_sync_action", - "errors": ["Not found"], - "response_status": 404, - "request_id": 1, - "data": None, - } - - user = await database_sync_to_async(get_user_model().objects.create)( - username="test1", email="test@example.com" - ) - - pk = user.id - - assert await database_sync_to_async(get_user_model().objects.filter(pk=pk).exists)() + assert response == { + "action": "test_sync_action", + "errors": ["Not found"], + "response_status": 404, + "request_id": 1, + "data": None, + } - await communicator.disconnect() + user = await database_sync_to_async(get_user_model().objects.create)( + username="test1", email="test@example.com" + ) - communicator = WebsocketCommunicator(AConsumer(), "/testws/") - connected, _ = await communicator.connect() + pk = user.id - assert connected + assert await database_sync_to_async(get_user_model().objects.filter(pk=pk).exists)() - await communicator.send_json_to( - {"action": "test_sync_action", "pk": pk, "request_id": 2} - ) + async with connected_communicator(AConsumer()) as communicator: - response = await communicator.receive_json_from() + await communicator.send_json_to( + {"action": "test_sync_action", "pk": pk, "request_id": 2} + ) - assert response == { - "action": "test_sync_action", - "errors": [], - "response_status": 200, - "request_id": 2, - "data": {"email": "test@example.com", "id": 1, "username": "test1"}, - } + response = await communicator.receive_json_from() - await communicator.disconnect() + assert response == { + "action": "test_sync_action", + "errors": [], + "response_status": 200, + "request_id": 2, + "data": {"email": "test@example.com", "id": 1, "username": "test1"}, + } @pytest.mark.django_db(transaction=True) @@ -112,31 +103,29 @@ class AConsumer(CreateModelMixin, GenericAsyncAPIConsumer): assert not await database_sync_to_async(get_user_model().objects.all().exists)() # Test a normal connection - communicator = WebsocketCommunicator(AConsumer(), "/testws/") - connected, _ = await communicator.connect() - assert connected + async with connected_communicator(AConsumer()) as communicator: - await communicator.send_json_to( - { - "action": "create", - "data": {"username": "test101", "email": "42@example.com"}, - "request_id": 1, - } - ) + await communicator.send_json_to( + { + "action": "create", + "data": {"username": "test101", "email": "42@example.com"}, + "request_id": 1, + } + ) - response = await communicator.receive_json_from() - user = await database_sync_to_async(get_user_model().objects.all().first)() + response = await communicator.receive_json_from() + user = await database_sync_to_async(get_user_model().objects.all().first)() - assert user - pk = user.id + assert user + pk = user.id - assert response == { - "action": "create", - "errors": [], - "response_status": 201, - "request_id": 1, - "data": {"email": "42@example.com", "id": pk, "username": "test101"}, - } + assert response == { + "action": "create", + "errors": [], + "response_status": 201, + "request_id": 1, + "data": {"email": "42@example.com", "id": pk, "username": "test101"}, + } @pytest.mark.django_db(transaction=True) @@ -158,43 +147,41 @@ class AConsumer(ListModelMixin, GenericAsyncAPIConsumer): assert not await database_sync_to_async(get_user_model().objects.all().exists)() # Test a normal connection - communicator = WebsocketCommunicator(AConsumer(), "/testws/") - connected, _ = await communicator.connect() - assert connected + async with connected_communicator(AConsumer()) as communicator: - await communicator.send_json_to({"action": "list", "request_id": 1}) + await communicator.send_json_to({"action": "list", "request_id": 1}) - response = await communicator.receive_json_from() + response = await communicator.receive_json_from() - assert response == { - "action": "list", - "errors": [], - "response_status": 200, - "request_id": 1, - "data": [], - } + assert response == { + "action": "list", + "errors": [], + "response_status": 200, + "request_id": 1, + "data": [], + } - u1 = await database_sync_to_async(get_user_model().objects.create)( - username="test1", email="42@example.com" - ) - u2 = await database_sync_to_async(get_user_model().objects.create)( - username="test2", email="45@example.com" - ) + u1 = await database_sync_to_async(get_user_model().objects.create)( + username="test1", email="42@example.com" + ) + u2 = await database_sync_to_async(get_user_model().objects.create)( + username="test2", email="45@example.com" + ) - await communicator.send_json_to({"action": "list", "request_id": 1}) + await communicator.send_json_to({"action": "list", "request_id": 1}) - response = await communicator.receive_json_from() + response = await communicator.receive_json_from() - assert response == { - "action": "list", - "errors": [], - "response_status": 200, - "request_id": 1, - "data": [ - {"email": "42@example.com", "id": u1.id, "username": "test1"}, - {"email": "45@example.com", "id": u2.id, "username": "test2"}, - ], - } + assert response == { + "action": "list", + "errors": [], + "response_status": 200, + "request_id": 1, + "data": [ + {"email": "42@example.com", "id": u1.id, "username": "test1"}, + {"email": "45@example.com", "id": u2.id, "username": "test2"}, + ], + } @pytest.mark.django_db(transaction=True) @@ -222,86 +209,84 @@ class AConsumer(PaginatedModelListMixin, GenericAsyncAPIConsumer): assert not await database_sync_to_async(get_user_model().objects.all().exists)() # Test a normal connection - communicator = WebsocketCommunicator(AConsumer(), "/testws/") - connected, _ = await communicator.connect() - assert connected - - await communicator.send_json_to({"action": "list", "request_id": 1}) - - response = await communicator.receive_json_from() - - assert response == { - "action": "list", - "errors": [], - "response_status": 200, - "request_id": 1, - "data": { - "results": [], - "count": 0, - "limit": 1, - "offset": 0, - }, - } - - u1 = await database_sync_to_async(get_user_model().objects.create)( - username="test1", email="42@example.com" - ) - u2 = await database_sync_to_async(get_user_model().objects.create)( - username="test2", email="45@example.com" - ) - - await communicator.send_json_to( - { + async with connected_communicator(AConsumer()) as communicator: + + await communicator.send_json_to({"action": "list", "request_id": 1}) + + response = await communicator.receive_json_from() + + assert response == { "action": "list", + "errors": [], + "response_status": 200, "request_id": 1, + "data": { + "results": [], + "count": 0, + "limit": 1, + "offset": 0, + }, } - ) - - response = await communicator.receive_json_from() - - assert response == { - "action": "list", - "errors": [], - "response_status": 200, - "request_id": 1, - "data": { - "count": 2, - "limit": 1, - "offset": 0, - "results": [ - {"email": "42@example.com", "id": u1.id, "username": "test1"}, - ], - }, - } - await communicator.send_json_to({"action": "list", "request_id": 1, "offset": 1}) - - response = await communicator.receive_json_from() - - assert response == { - "action": "list", - "errors": [], - "response_status": 200, - "request_id": 1, - "data": { - "count": 2, - "limit": 1, - "offset": 1, - "results": [ - {"email": "45@example.com", "id": u2.id, "username": "test2"}, - ], - }, - } - await communicator.send_json_to({"action": "list", "request_id": 1, "offset": 2}) - response = await communicator.receive_json_from() + u1 = await database_sync_to_async(get_user_model().objects.create)( + username="test1", email="42@example.com" + ) + u2 = await database_sync_to_async(get_user_model().objects.create)( + username="test2", email="45@example.com" + ) + + await communicator.send_json_to( + { + "action": "list", + "request_id": 1, + } + ) - assert response == { - "action": "list", - "errors": [], - "response_status": 200, - "request_id": 1, - "data": {"count": 2, "limit": 1, "offset": 2, "results": []}, - } + response = await communicator.receive_json_from() + + assert response == { + "action": "list", + "errors": [], + "response_status": 200, + "request_id": 1, + "data": { + "count": 2, + "limit": 1, + "offset": 0, + "results": [ + {"email": "42@example.com", "id": u1.id, "username": "test1"}, + ], + }, + } + await communicator.send_json_to({"action": "list", "request_id": 1, "offset": 1}) + + response = await communicator.receive_json_from() + + assert response == { + "action": "list", + "errors": [], + "response_status": 200, + "request_id": 1, + "data": { + "count": 2, + "limit": 1, + "offset": 1, + "results": [ + {"email": "45@example.com", "id": u2.id, "username": "test2"}, + ], + }, + } + await communicator.send_json_to({"action": "list", "request_id": 1, "offset": 2}) + + response = await communicator.receive_json_from() + + assert response == { + "action": "list", + "errors": [], + "response_status": 200, + "request_id": 1, + "data": {"count": 2, "limit": 1, "offset": 2, "results": []}, + } @pytest.mark.django_db(transaction=True) @@ -331,77 +316,75 @@ class AConsumer( assert not await database_sync_to_async(get_user_model().objects.all().exists)() # Test a normal connection - communicator = WebsocketCommunicator(AConsumer(), "/testws/") - connected, _ = await communicator.connect() - assert connected - - u1 = await database_sync_to_async(get_user_model().objects.create)( - username="test1", email="42@example.com" - ) - u2 = await database_sync_to_async(get_user_model().objects.create)( - username="test2", email="45@example.com" - ) - u3 = await database_sync_to_async(get_user_model().objects.create)( - username="test3", email="46@example.com" - ) - - await communicator.send_json_to( - { + async with connected_communicator(AConsumer()) as communicator: + + u1 = await database_sync_to_async(get_user_model().objects.create)( + username="test1", email="42@example.com" + ) + u2 = await database_sync_to_async(get_user_model().objects.create)( + username="test2", email="45@example.com" + ) + u3 = await database_sync_to_async(get_user_model().objects.create)( + username="test3", email="46@example.com" + ) + + await communicator.send_json_to( + { + "action": "list", + "request_id": 1, + } + ) + + response = await communicator.receive_json_from() + + assert response == { "action": "list", + "errors": [], + "response_status": 200, "request_id": 1, + "data": { + "count": 3, + "limit": 1, + "offset": 0, + "results": [ + {"email": "42@example.com", "id": u1.id, "username": "test1"}, + ], + }, + } + + response = await communicator.receive_json_from() + + assert response == { + "action": "list", + "errors": [], + "response_status": 200, + "request_id": 1, + "data": { + "count": 3, + "limit": 1, + "offset": 1, + "results": [ + {"email": "45@example.com", "id": u2.id, "username": "test2"}, + ], + }, + } + + response = await communicator.receive_json_from() + + assert response == { + "action": "list", + "errors": [], + "response_status": 200, + "request_id": 1, + "data": { + "count": 3, + "limit": 1, + "offset": 2, + "results": [ + {"email": "46@example.com", "id": u3.id, "username": "test3"}, + ], + }, } - ) - - response = await communicator.receive_json_from() - - assert response == { - "action": "list", - "errors": [], - "response_status": 200, - "request_id": 1, - "data": { - "count": 3, - "limit": 1, - "offset": 0, - "results": [ - {"email": "42@example.com", "id": u1.id, "username": "test1"}, - ], - }, - } - - response = await communicator.receive_json_from() - - assert response == { - "action": "list", - "errors": [], - "response_status": 200, - "request_id": 1, - "data": { - "count": 3, - "limit": 1, - "offset": 1, - "results": [ - {"email": "45@example.com", "id": u2.id, "username": "test2"}, - ], - }, - } - - response = await communicator.receive_json_from() - - assert response == { - "action": "list", - "errors": [], - "response_status": 200, - "request_id": 1, - "data": { - "count": 3, - "limit": 1, - "offset": 2, - "results": [ - {"email": "46@example.com", "id": u3.id, "username": "test3"}, - ], - }, - } @pytest.mark.django_db(transaction=True) @@ -423,58 +406,56 @@ class AConsumer(RetrieveModelMixin, GenericAsyncAPIConsumer): assert not await database_sync_to_async(get_user_model().objects.all().exists)() # Test a normal connection - communicator = WebsocketCommunicator(AConsumer(), "/testws/") - connected, _ = await communicator.connect() - assert connected - - await communicator.send_json_to({"action": "retrieve", "pk": 100, "request_id": 1}) - - response = await communicator.receive_json_from() - - assert response == { - "action": "retrieve", - "errors": ["Not found"], - "response_status": 404, - "request_id": 1, - "data": None, - } - - u1 = await database_sync_to_async(get_user_model().objects.create)( - username="test1", email="42@example.com" - ) - u2 = await database_sync_to_async(get_user_model().objects.create)( - username="test2", email="45@example.com" - ) - - # lookup a pk that is not there - await communicator.send_json_to( - {"action": "retrieve", "pk": u1.id - 1, "request_id": 1} - ) - - response = await communicator.receive_json_from() - - assert response == { - "action": "retrieve", - "errors": ["Not found"], - "response_status": 404, - "request_id": 1, - "data": None, - } - - # lookup up u1 - await communicator.send_json_to( - {"action": "retrieve", "pk": u1.id, "request_id": 1} - ) - - response = await communicator.receive_json_from() - - assert response == { - "action": "retrieve", - "errors": [], - "response_status": 200, - "request_id": 1, - "data": {"email": "42@example.com", "id": u1.id, "username": "test1"}, - } + async with connected_communicator(AConsumer()) as communicator: + + await communicator.send_json_to({"action": "retrieve", "pk": 100, "request_id": 1}) + + response = await communicator.receive_json_from() + + assert response == { + "action": "retrieve", + "errors": ["Not found"], + "response_status": 404, + "request_id": 1, + "data": None, + } + + u1 = await database_sync_to_async(get_user_model().objects.create)( + username="test1", email="42@example.com" + ) + u2 = await database_sync_to_async(get_user_model().objects.create)( + username="test2", email="45@example.com" + ) + + # lookup a pk that is not there + await communicator.send_json_to( + {"action": "retrieve", "pk": u1.id - 1, "request_id": 1} + ) + + response = await communicator.receive_json_from() + + assert response == { + "action": "retrieve", + "errors": ["Not found"], + "response_status": 404, + "request_id": 1, + "data": None, + } + + # lookup up u1 + await communicator.send_json_to( + {"action": "retrieve", "pk": u1.id, "request_id": 1} + ) + + response = await communicator.receive_json_from() + + assert response == { + "action": "retrieve", + "errors": [], + "response_status": 200, + "request_id": 1, + "data": {"email": "42@example.com", "id": u1.id, "username": "test1"}, + } @pytest.mark.django_db(transaction=True) @@ -496,60 +477,58 @@ class AConsumer(UpdateModelMixin, GenericAsyncAPIConsumer): assert not await database_sync_to_async(get_user_model().objects.all().exists)() # Test a normal connection - communicator = WebsocketCommunicator(AConsumer(), "/testws/") - connected, _ = await communicator.connect() - assert connected + async with connected_communicator(AConsumer()) as communicator: + + await communicator.send_json_to( + { + "action": "update", + "pk": 100, + "data": {"username": "test101", "email": "42@example.com"}, + "request_id": 1, + } + ) + + response = await communicator.receive_json_from() - await communicator.send_json_to( - { + assert response == { "action": "update", - "pk": 100, - "data": {"username": "test101", "email": "42@example.com"}, + "errors": ["Not found"], + "response_status": 404, "request_id": 1, + "data": None, } - ) - - response = await communicator.receive_json_from() - - assert response == { - "action": "update", - "errors": ["Not found"], - "response_status": 404, - "request_id": 1, - "data": None, - } - - u1 = await database_sync_to_async(get_user_model().objects.create)( - username="test1", email="42@example.com" - ) - await database_sync_to_async(get_user_model().objects.create)( - username="test2", email="45@example.com" - ) - - await communicator.send_json_to( - { + + u1 = await database_sync_to_async(get_user_model().objects.create)( + username="test1", email="42@example.com" + ) + await database_sync_to_async(get_user_model().objects.create)( + username="test2", email="45@example.com" + ) + + await communicator.send_json_to( + { + "action": "update", + "pk": u1.id, + "data": { + "username": "test101", + }, + "request_id": 2, + } + ) + + response = await communicator.receive_json_from() + + assert response == { "action": "update", - "pk": u1.id, - "data": { - "username": "test101", - }, + "errors": [], + "response_status": 200, "request_id": 2, + "data": {"email": "42@example.com", "id": u1.id, "username": "test101"}, } - ) - - response = await communicator.receive_json_from() - assert response == { - "action": "update", - "errors": [], - "response_status": 200, - "request_id": 2, - "data": {"email": "42@example.com", "id": u1.id, "username": "test101"}, - } - - u1 = await database_sync_to_async(get_user_model().objects.get)(id=u1.id) - assert u1.username == "test101" - assert u1.email == "42@example.com" + u1 = await database_sync_to_async(get_user_model().objects.get)(id=u1.id) + assert u1.username == "test101" + assert u1.email == "42@example.com" @pytest.mark.django_db(transaction=True) @@ -571,60 +550,58 @@ class AConsumer(PatchModelMixin, GenericAsyncAPIConsumer): assert not await database_sync_to_async(get_user_model().objects.all().exists)() # Test a normal connection - communicator = WebsocketCommunicator(AConsumer(), "/testws/") - connected, _ = await communicator.connect() - assert connected + async with connected_communicator(AConsumer()) as communicator: + + await communicator.send_json_to( + { + "action": "patch", + "pk": 100, + "data": {"username": "test101", "email": "42@example.com"}, + "request_id": 1, + } + ) - await communicator.send_json_to( - { + response = await communicator.receive_json_from() + + assert response == { "action": "patch", - "pk": 100, - "data": {"username": "test101", "email": "42@example.com"}, + "errors": ["Not found"], + "response_status": 404, "request_id": 1, + "data": None, } - ) - - response = await communicator.receive_json_from() - - assert response == { - "action": "patch", - "errors": ["Not found"], - "response_status": 404, - "request_id": 1, - "data": None, - } - - u1 = await database_sync_to_async(get_user_model().objects.create)( - username="test1", email="42@example.com" - ) - await database_sync_to_async(get_user_model().objects.create)( - username="test2", email="45@example.com" - ) - - await communicator.send_json_to( - { + + u1 = await database_sync_to_async(get_user_model().objects.create)( + username="test1", email="42@example.com" + ) + await database_sync_to_async(get_user_model().objects.create)( + username="test2", email="45@example.com" + ) + + await communicator.send_json_to( + { + "action": "patch", + "pk": u1.id, + "data": { + "email": "00@example.com", + }, + "request_id": 2, + } + ) + + response = await communicator.receive_json_from() + + assert response == { "action": "patch", - "pk": u1.id, - "data": { - "email": "00@example.com", - }, + "errors": [], + "response_status": 200, "request_id": 2, + "data": {"email": "00@example.com", "id": u1.id, "username": "test1"}, } - ) - - response = await communicator.receive_json_from() - - assert response == { - "action": "patch", - "errors": [], - "response_status": 200, - "request_id": 2, - "data": {"email": "00@example.com", "id": u1.id, "username": "test1"}, - } - u1 = await database_sync_to_async(get_user_model().objects.get)(id=u1.id) - assert u1.username == "test1" - assert u1.email == "00@example.com" + u1 = await database_sync_to_async(get_user_model().objects.get)(id=u1.id) + assert u1.username == "test1" + assert u1.email == "00@example.com" @pytest.mark.django_db(transaction=True) @@ -646,55 +623,53 @@ class AConsumer(DeleteModelMixin, GenericAsyncAPIConsumer): assert not await database_sync_to_async(get_user_model().objects.all().exists)() # Test a normal connection - communicator = WebsocketCommunicator(AConsumer(), "/testws/") - connected, _ = await communicator.connect() - assert connected - - await communicator.send_json_to({"action": "delete", "pk": 100, "request_id": 1}) - - response = await communicator.receive_json_from() - - assert response == { - "action": "delete", - "errors": ["Not found"], - "response_status": 404, - "request_id": 1, - "data": None, - } - - u1 = await database_sync_to_async(get_user_model().objects.create)( - username="test1", email="42@example.com" - ) - await database_sync_to_async(get_user_model().objects.create)( - username="test2", email="45@example.com" - ) - - await communicator.send_json_to( - {"action": "delete", "pk": u1.id - 1, "request_id": 1} - ) - - response = await communicator.receive_json_from() - - assert response == { - "action": "delete", - "errors": ["Not found"], - "response_status": 404, - "request_id": 1, - "data": None, - } - - await communicator.send_json_to({"action": "delete", "pk": u1.id, "request_id": 1}) - - response = await communicator.receive_json_from() - - assert response == { - "action": "delete", - "errors": [], - "response_status": 204, - "request_id": 1, - "data": None, - } - - assert not await database_sync_to_async( - get_user_model().objects.filter(id=u1.id).exists - )() + async with connected_communicator(AConsumer()) as communicator: + + await communicator.send_json_to({"action": "delete", "pk": 100, "request_id": 1}) + + response = await communicator.receive_json_from() + + assert response == { + "action": "delete", + "errors": ["Not found"], + "response_status": 404, + "request_id": 1, + "data": None, + } + + u1 = await database_sync_to_async(get_user_model().objects.create)( + username="test1", email="42@example.com" + ) + await database_sync_to_async(get_user_model().objects.create)( + username="test2", email="45@example.com" + ) + + await communicator.send_json_to( + {"action": "delete", "pk": u1.id - 1, "request_id": 1} + ) + + response = await communicator.receive_json_from() + + assert response == { + "action": "delete", + "errors": ["Not found"], + "response_status": 404, + "request_id": 1, + "data": None, + } + + await communicator.send_json_to({"action": "delete", "pk": u1.id, "request_id": 1}) + + response = await communicator.receive_json_from() + + assert response == { + "action": "delete", + "errors": [], + "response_status": 204, + "request_id": 1, + "data": None, + } + + assert not await database_sync_to_async( + get_user_model().objects.filter(id=u1.id).exists + )() diff --git a/tests/test_model_observer.py b/tests/test_model_observer.py index 57ab561..86ce7d8 100644 --- a/tests/test_model_observer.py +++ b/tests/test_model_observer.py @@ -1,11 +1,9 @@ -import asyncio - import pytest from channels import DEFAULT_CHANNEL_LAYER from channels.db import database_sync_to_async from channels.layers import channel_layers -from channels.testing import WebsocketCommunicator -from django.contrib.auth import get_user_model, user_logged_in +from tests.communicator import connected_communicator +from django.contrib.auth import get_user_model from rest_framework import serializers from djangochannelsrestframework.decorators import action @@ -56,123 +54,119 @@ async def update_username(self, pk=None, username=None, **kwargs): assert not await database_sync_to_async(get_user_model().objects.all().exists)() # Test a normal connection - communicator = WebsocketCommunicator(TestConsumer(), "/testws/") - connected, _ = await communicator.connect() - assert connected + async with connected_communicator(TestConsumer()) as communicator: - await communicator.send_json_to({"action": "retrieve", "pk": 100, "request_id": 1}) + await communicator.send_json_to({"action": "retrieve", "pk": 100, "request_id": 1}) - response = await communicator.receive_json_from() + response = await communicator.receive_json_from() - assert response == { - "action": "retrieve", - "errors": ["Not found"], - "response_status": 404, - "request_id": 1, - "data": None, - } + assert response == { + "action": "retrieve", + "errors": ["Not found"], + "response_status": 404, + "request_id": 1, + "data": None, + } - u1 = await database_sync_to_async(get_user_model().objects.create)( - username="test1", email="42@example.com" - ) - u2 = await database_sync_to_async(get_user_model().objects.create)( - username="test2", email="45@example.com" - ) - - # lookup a pk that is not there - await communicator.send_json_to( - {"action": "retrieve", "pk": u1.id - 1, "request_id": 1} - ) - - response = await communicator.receive_json_from() - - assert response == { - "action": "retrieve", - "errors": ["Not found"], - "response_status": 404, - "request_id": 1, - "data": None, - } + u1 = await database_sync_to_async(get_user_model().objects.create)( + username="test1", email="42@example.com" + ) + u2 = await database_sync_to_async(get_user_model().objects.create)( + username="test2", email="45@example.com" + ) - # lookup up u1 - await communicator.send_json_to( - {"action": "retrieve", "pk": u1.id, "request_id": 1} - ) + # lookup a pk that is not there + await communicator.send_json_to( + {"action": "retrieve", "pk": u1.id - 1, "request_id": 1} + ) - response = await communicator.receive_json_from() + response = await communicator.receive_json_from() - assert response == { - "action": "retrieve", - "errors": [], - "response_status": 200, - "request_id": 1, - "data": {"email": "42@example.com", "id": u1.id, "username": "test1"}, - } + assert response == { + "action": "retrieve", + "errors": ["Not found"], + "response_status": 404, + "request_id": 1, + "data": None, + } - # lookup up u1 - await communicator.send_json_to( - {"action": "subscribe_instance", "pk": u1.id, "request_id": 4} - ) + # lookup up u1 + await communicator.send_json_to( + {"action": "retrieve", "pk": u1.id, "request_id": 1} + ) - response = await communicator.receive_json_from() + response = await communicator.receive_json_from() - assert response == { - "action": "subscribe_instance", - "errors": [], - "response_status": 201, - "request_id": 4, - "data": None, - } + assert response == { + "action": "retrieve", + "errors": [], + "response_status": 200, + "request_id": 1, + "data": {"email": "42@example.com", "id": u1.id, "username": "test1"}, + } - u3 = await database_sync_to_async(get_user_model().objects.create)( - username="test3", email="46@example.com" - ) + # lookup up u1 + await communicator.send_json_to( + {"action": "subscribe_instance", "pk": u1.id, "request_id": 4} + ) - # lookup up u1 - await communicator.send_json_to( - { - "action": "update_username", - "pk": u1.id, - "username": "thenewname", - "request_id": 5, + response = await communicator.receive_json_from() + + assert response == { + "action": "subscribe_instance", + "errors": [], + "response_status": 201, + "request_id": 4, + "data": None, } - ) - response = await communicator.receive_json_from() + u3 = await database_sync_to_async(get_user_model().objects.create)( + username="test3", email="46@example.com" + ) - assert response == { - "action": "update_username", - "errors": [], - "response_status": 200, - "request_id": 5, - "data": {"pk": u1.id}, - } + # lookup up u1 + await communicator.send_json_to( + { + "action": "update_username", + "pk": u1.id, + "username": "thenewname", + "request_id": 5, + } + ) - response = await communicator.receive_json_from() + response = await communicator.receive_json_from() - assert response == { - "action": "update", - "errors": [], - "response_status": 200, - "request_id": 4, - "data": {"email": "42@example.com", "id": u1.id, "username": "thenewname"}, - } + assert response == { + "action": "update_username", + "errors": [], + "response_status": 200, + "request_id": 5, + "data": {"pk": u1.id}, + } + + response = await communicator.receive_json_from() - u1_pk = u1.pk + assert response == { + "action": "update", + "errors": [], + "response_status": 200, + "request_id": 4, + "data": {"email": "42@example.com", "id": u1.id, "username": "thenewname"}, + } - await database_sync_to_async(u1.delete)() + u1_pk = u1.pk - response = await communicator.receive_json_from() + await database_sync_to_async(u1.delete)() - assert response == { - "action": "delete", - "errors": [], - "response_status": 204, - "request_id": 4, - "data": {"pk": u1_pk}, - } + response = await communicator.receive_json_from() - await communicator.disconnect() + assert response == { + "action": "delete", + "errors": [], + "response_status": 204, + "request_id": 4, + "data": {"pk": u1_pk}, + } @pytest.mark.django_db(transaction=True) @@ -222,59 +216,53 @@ async def update_username(self, pk=None, name=None, **kwargs): assert not await database_sync_to_async(get_user_model().objects.all().exists)() # Test a normal connection - communicator1 = WebsocketCommunicator(TestOtherConsumer(), "/testws/") - connected, _ = await communicator1.connect() - assert connected + async with connected_communicator(TestOtherConsumer()) as communicator1: + async with connected_communicator(TestUserConsumer()) as communicator2: - # Test a normal connection - communicator2 = WebsocketCommunicator(TestUserConsumer(), "/testws/") - connected, _ = await communicator2.connect() - assert connected - - u1 = await database_sync_to_async(get_user_model().objects.create)( - username="test1", email="42@example.com" - ) - t1 = await database_sync_to_async(TestModel.objects.create)(name="test2") - - await communicator1.send_json_to( - {"action": "subscribe_instance", "pk": t1.id, "request_id": 4} - ) - - response = await communicator1.receive_json_from() - - assert response == { - "action": "subscribe_instance", - "errors": [], - "response_status": 201, - "request_id": 4, - "data": None, - } + u1 = await database_sync_to_async(get_user_model().objects.create)( + username="test1", email="42@example.com" + ) + t1 = await database_sync_to_async(TestModel.objects.create)(name="test2") - await communicator2.send_json_to( - {"action": "subscribe_instance", "pk": u1.id, "request_id": 4} - ) + await communicator1.send_json_to( + {"action": "subscribe_instance", "pk": t1.id, "request_id": 4} + ) - response = await communicator2.receive_json_from() + response = await communicator1.receive_json_from() - assert response == { - "action": "subscribe_instance", - "errors": [], - "response_status": 201, - "request_id": 4, - "data": None, - } + assert response == { + "action": "subscribe_instance", + "errors": [], + "response_status": 201, + "request_id": 4, + "data": None, + } + + await communicator2.send_json_to( + {"action": "subscribe_instance", "pk": u1.id, "request_id": 4} + ) + + response = await communicator2.receive_json_from() - # update the user + assert response == { + "action": "subscribe_instance", + "errors": [], + "response_status": 201, + "request_id": 4, + "data": None, + } - u1.username = "no not a value" + # update the user - await database_sync_to_async(u1.save)() + u1.username = "no not a value" - # user is updated - await communicator2.receive_json_from() + await database_sync_to_async(u1.save)() - # test model is not - assert await communicator1.receive_nothing() + # user is updated + assert await communicator2.receive_json_from() + + # test model is not + assert await communicator1.receive_nothing() @pytest.mark.django_db(transaction=True) @@ -309,97 +297,93 @@ async def update_username(self, pk=None, username=None, **kwargs): assert not await database_sync_to_async(get_user_model().objects.all().exists)() # Test a normal connection - communicator = WebsocketCommunicator(TestConsumerUnsubscribe(), "/testws/") - connected, _ = await communicator.connect() - assert connected - - u1 = await database_sync_to_async(get_user_model().objects.create)( - username="test1", email="42@example.com" - ) - - # lookup up u1 - await communicator.send_json_to( - {"action": "subscribe_instance", "pk": u1.id, "request_id": 4} - ) - - response = await communicator.receive_json_from() - assert await communicator.receive_nothing() - - assert response == { - "action": "subscribe_instance", - "errors": [], - "response_status": 201, - "request_id": 4, - "data": None, - } + async with connected_communicator(TestConsumerUnsubscribe()) as communicator: - await communicator.send_json_to( - { - "action": "update_username", - "pk": u1.id, - "username": "thenewname", - "request_id": 5, + u1 = await database_sync_to_async(get_user_model().objects.create)( + username="test1", email="42@example.com" + ) + + # lookup up u1 + await communicator.send_json_to( + {"action": "subscribe_instance", "pk": u1.id, "request_id": 4} + ) + + response = await communicator.receive_json_from() + assert await communicator.receive_nothing() + + assert response == { + "action": "subscribe_instance", + "errors": [], + "response_status": 201, + "request_id": 4, + "data": None, } - ) - - a = await communicator.receive_json_from() - - b = await communicator.receive_json_from() - - assert { - "action": "update_username", - "errors": [], - "response_status": 200, - "request_id": 5, - "data": {"pk": u1.id}, - } in [a, b] - - assert { - "action": "update", - "errors": [], - "response_status": 200, - "request_id": 4, - "data": {"email": "42@example.com", "id": u1.pk, "username": "thenewname"}, - } in [a, b] - - # unsubscribe - # lookup up u1 - - await communicator.send_json_to( - {"action": "unsubscribe_instance", "pk": u1.id, "request_id": 4} - ) - - response = await communicator.receive_json_from() - - assert response == { - "action": "unsubscribe_instance", - "errors": [], - "response_status": 204, - "request_id": 4, - "data": None, - } - assert await communicator.receive_nothing() - await communicator.send_json_to( - { + await communicator.send_json_to( + { + "action": "update_username", + "pk": u1.id, + "username": "thenewname", + "request_id": 5, + } + ) + + a = await communicator.receive_json_from() + + b = await communicator.receive_json_from() + + assert { "action": "update_username", - "pk": u1.id, - "username": "thenewname", + "errors": [], + "response_status": 200, "request_id": 5, - } - ) + "data": {"pk": u1.id}, + } in [a, b] + + assert { + "action": "update", + "errors": [], + "response_status": 200, + "request_id": 4, + "data": {"email": "42@example.com", "id": u1.pk, "username": "thenewname"}, + } in [a, b] + + # unsubscribe + # lookup up u1 + + await communicator.send_json_to( + {"action": "unsubscribe_instance", "pk": u1.id, "request_id": 4} + ) - response = await communicator.receive_json_from() + response = await communicator.receive_json_from() - assert response == { - "action": "update_username", - "errors": [], - "response_status": 200, - "request_id": 5, - "data": {"pk": u1.id}, - } + assert response == { + "action": "unsubscribe_instance", + "errors": [], + "response_status": 204, + "request_id": 4, + "data": None, + } + assert await communicator.receive_nothing() + + await communicator.send_json_to( + { + "action": "update_username", + "pk": u1.id, + "username": "thenewname", + "request_id": 5, + } + ) - await communicator.disconnect() + response = await communicator.receive_json_from() + + assert response == { + "action": "update_username", + "errors": [], + "response_status": 200, + "request_id": 5, + "data": {"pk": u1.id}, + } @pytest.mark.django_db(transaction=True) @@ -436,108 +420,104 @@ async def update_username(self, pk=None, username=None, **kwargs): assert not await database_sync_to_async(get_user_model().objects.all().exists)() # Test a normal connection - communicator = WebsocketCommunicator(TestConsumerMultipleSubs(), "/testws/") - connected, _ = await communicator.connect() - assert connected - - u1 = await database_sync_to_async(get_user_model().objects.create)( - username="test1", email="42@example.com" - ) - - u2 = await database_sync_to_async(get_user_model().objects.create)( - username="test2", email="45@example.com" - ) - - # Subscribe to instance user 1 - await communicator.send_json_to( - {"action": "subscribe_instance", "pk": u1.id, "request_id": 4} - ) - - response = await communicator.receive_json_from() - - assert response == { - "action": "subscribe_instance", - "errors": [], - "response_status": 201, - "request_id": 4, - "data": None, - } + async with connected_communicator(TestConsumerMultipleSubs()) as communicator: + + u1 = await database_sync_to_async(get_user_model().objects.create)( + username="test1", email="42@example.com" + ) - # Subscribe to instance user 2 - await communicator.send_json_to( - {"action": "subscribe_instance", "pk": u2.id, "request_id": 5} - ) + u2 = await database_sync_to_async(get_user_model().objects.create)( + username="test2", email="45@example.com" + ) - response = await communicator.receive_json_from() + # Subscribe to instance user 1 + await communicator.send_json_to( + {"action": "subscribe_instance", "pk": u1.id, "request_id": 4} + ) - assert response == { - "action": "subscribe_instance", - "errors": [], - "response_status": 201, - "request_id": 5, - "data": None, - } + response = await communicator.receive_json_from() - # lookup up u1 - await communicator.send_json_to( - { - "action": "update_username", - "pk": u1.id, - "username": "new name", - "request_id": 10, + assert response == { + "action": "subscribe_instance", + "errors": [], + "response_status": 201, + "request_id": 4, + "data": None, } - ) - response = await communicator.receive_json_from() + # Subscribe to instance user 2 + await communicator.send_json_to( + {"action": "subscribe_instance", "pk": u2.id, "request_id": 5} + ) - assert response == { - "action": "update_username", - "errors": [], - "response_status": 200, - "request_id": 10, - "data": {"pk": u1.id}, - } + response = await communicator.receive_json_from() - response = await communicator.receive_json_from() + assert response == { + "action": "subscribe_instance", + "errors": [], + "response_status": 201, + "request_id": 5, + "data": None, + } - assert response == { - "action": "update", - "errors": [], - "response_status": 200, - "request_id": 4, - "data": {"email": "42@example.com", "id": u1.id, "username": "new name"}, - } + # lookup up u1 + await communicator.send_json_to( + { + "action": "update_username", + "pk": u1.id, + "username": "new name", + "request_id": 10, + } + ) - assert await communicator.receive_nothing() + response = await communicator.receive_json_from() - # Update U2 - await communicator.send_json_to( - { + assert response == { "action": "update_username", - "pk": u2.id, - "username": "the new name 2", - "request_id": 11, + "errors": [], + "response_status": 200, + "request_id": 10, + "data": {"pk": u1.id}, } - ) - response = await communicator.receive_json_from() + response = await communicator.receive_json_from() - assert response == { - "action": "update_username", - "errors": [], - "response_status": 200, - "request_id": 11, - "data": {"pk": u2.id}, - } + assert response == { + "action": "update", + "errors": [], + "response_status": 200, + "request_id": 4, + "data": {"email": "42@example.com", "id": u1.id, "username": "new name"}, + } - response = await communicator.receive_json_from() + assert await communicator.receive_nothing() - assert response == { - "action": "update", - "errors": [], - "response_status": 200, - "request_id": 5, - "data": {"email": "45@example.com", "id": u2.id, "username": "the new name 2"}, - } + # Update U2 + await communicator.send_json_to( + { + "action": "update_username", + "pk": u2.id, + "username": "the new name 2", + "request_id": 11, + } + ) + + response = await communicator.receive_json_from() + + assert response == { + "action": "update_username", + "errors": [], + "response_status": 200, + "request_id": 11, + "data": {"pk": u2.id}, + } + + response = await communicator.receive_json_from() - await communicator.disconnect() + assert response == { + "action": "update", + "errors": [], + "response_status": 200, + "request_id": 5, + "data": {"email": "45@example.com", "id": u2.id, "username": "the new name 2"}, + } diff --git a/tests/test_model_with_custom_pk_observer.py b/tests/test_model_with_custom_pk_observer.py index 1f90938..72f4e74 100644 --- a/tests/test_model_with_custom_pk_observer.py +++ b/tests/test_model_with_custom_pk_observer.py @@ -1,11 +1,8 @@ from typing import Dict -import asyncio import pytest -from channels import DEFAULT_CHANNEL_LAYER from channels.db import database_sync_to_async -from channels.layers import channel_layers -from channels.testing import WebsocketCommunicator +from tests.communicator import connected_communicator from rest_framework import serializers from djangochannelsrestframework.decorators import action @@ -30,7 +27,7 @@ async def test_subscription_create_notification(settings): class TestSerializer(serializers.ModelSerializer): class Meta: model = TestModelWithCustomPK - fields = ("name",) + fields = ("name", "description") class TestConsumer(GenericAsyncAPIConsumer): @@ -55,25 +52,48 @@ async def subscribe_to_all_changes(self, request_id, **kwargs): await self.model_change.subscribe(request_id=request_id) # connect - communicator = WebsocketCommunicator(TestConsumer(), "/testws/") - connected, _ = await communicator.connect() - assert connected - - # subscribe - subscription_id = 1 - await communicator.send_json_to( - {"action": "subscribe_to_all_changes", "request_id": subscription_id} - ) - - # create an instance - created_instance = await database_sync_to_async( - TestModelWithCustomPK.objects.create - )(name="some_unique_name") - - # check the response - response = await communicator.receive_json_from() - assert response == { - "action": "create", - "request_id": subscription_id, - "data": TestSerializer(created_instance).data, - } + async with connected_communicator(TestConsumer()) as communicator: + + # subscribe + subscription_id = 1 + await communicator.send_json_to( + {"action": "subscribe_to_all_changes", "request_id": subscription_id} + ) + + # create an instance + created_instance = await database_sync_to_async( + TestModelWithCustomPK.objects.create + )(name="some_unique_name") + + # check the response + response = await communicator.receive_json_from() + assert response == { + "action": "create", + "request_id": subscription_id, + "data": TestSerializer(created_instance).data, + } + + # update the instance + created_instance.description = "some description" + await database_sync_to_async(created_instance.save)() + + # check the response + response = await communicator.receive_json_from() + assert response == { + "action": "update", + "request_id": subscription_id, + "data": TestSerializer(created_instance).data, + } + + deleted_instance_data = TestSerializer(created_instance).data + + # delete the instance + await database_sync_to_async(created_instance.delete)() + + # check the response + response = await communicator.receive_json_from() + assert response == { + "action": "delete", + "request_id": subscription_id, + "data": deleted_instance_data, + } diff --git a/tests/test_observer.py b/tests/test_observer.py index 4c64157..13ef303 100644 --- a/tests/test_observer.py +++ b/tests/test_observer.py @@ -5,7 +5,7 @@ from channels import DEFAULT_CHANNEL_LAYER from channels.db import database_sync_to_async from channels.layers import channel_layers -from channels.testing import WebsocketCommunicator +from tests.communicator import connected_communicator from django.contrib.auth import user_logged_in, get_user_model from django.db import transaction from django.utils.text import slugify @@ -40,25 +40,19 @@ async def accept(self, **kwargs): async def handle_user_logged_in(self, message, observer=None, **kwargs): await self.send_json({"message": message, "observer": observer is not None}) - communicator = WebsocketCommunicator(TestConsumer(), "/testws/") + async with connected_communicator(TestConsumer()) as communicator: - connected, _ = await communicator.connect() + user = await database_sync_to_async(get_user_model().objects.create)( + username="test", email="test@example.com" + ) - assert connected + await database_sync_to_async(user_logged_in.send)( + sender=user.__class__, request=None, user=user + ) - user = await database_sync_to_async(get_user_model().objects.create)( - username="test", email="test@example.com" - ) - - await database_sync_to_async(user_logged_in.send)( - sender=user.__class__, request=None, user=user - ) - - response = await communicator.receive_json_from() + response = await communicator.receive_json_from() - assert {"message": {}, "observer": True} == response - - await communicator.disconnect() + assert {"message": {}, "observer": True} == response @pytest.mark.django_db(transaction=True) @@ -86,25 +80,19 @@ async def user_change_observer_wrapper( ): await self.send_json(dict(body=message, action=action, type=message_type)) - communicator = WebsocketCommunicator(TestConsumer(), "/testws/") - - connected, _ = await communicator.connect() - - assert connected - - user = await database_sync_to_async(get_user_model().objects.create)( - username="test", email="test@example.com" - ) + async with connected_communicator(TestConsumer()) as communicator: - response = await communicator.receive_json_from() + user = await database_sync_to_async(get_user_model().objects.create)( + username="test", email="test@example.com" + ) - assert { - "action": "create", - "body": {"pk": user.pk}, - "type": "user.change.observer.wrapper", - } == response + response = await communicator.receive_json_from() - await communicator.disconnect() + assert { + "action": "create", + "body": {"pk": user.pk}, + "type": "user.change.observer.wrapper", + } == response @pytest.mark.django_db(transaction=True) @@ -132,36 +120,30 @@ async def user_change_wrapper_in_transaction( ): await self.send_json(dict(body=message, action=action, type=message_type)) - communicator = WebsocketCommunicator(TestConsumer(), "/testws/") - - connected, _ = await communicator.connect() + async with connected_communicator(TestConsumer()) as communicator: - assert connected + @database_sync_to_async + def create_user_and_wait(): - @database_sync_to_async - def create_user_and_wait(): - - with transaction.atomic(): - user = get_user_model().objects.create( - username="test", email="test@example.com" - ) - assert async_to_sync(communicator.receive_nothing(timeout=0.1)) - user.username = "mike" - user.save() - assert async_to_sync(communicator.receive_nothing(timeout=0.1)) - return user - - user = await create_user_and_wait() + with transaction.atomic(): + user = get_user_model().objects.create( + username="test", email="test@example.com" + ) + assert async_to_sync(communicator.receive_nothing)(timeout=0.1) + user.username = "mike" + user.save() + assert async_to_sync(communicator.receive_nothing)(timeout=0.1) + return user - response = await communicator.receive_json_from() + user = await create_user_and_wait() - assert { - "action": "create", - "body": {"pk": user.pk}, - "type": "user.change.wrapper.in.transaction", - } == response + response = await communicator.receive_json_from() - await communicator.disconnect() + assert { + "action": "create", + "body": {"pk": user.pk}, + "type": "user.change.wrapper.in.transaction", + } == response @pytest.mark.django_db(transaction=True) @@ -189,41 +171,35 @@ async def user_change_observer_delete( ): await self.send_json(dict(body=message, action=action, type=message_type)) - communicator = WebsocketCommunicator(TestConsumerObserverDelete(), "/testws/") + async with connected_communicator(TestConsumerObserverDelete()) as communicator: + await communicator.receive_nothing() - connected, _ = await communicator.connect() + user = await database_sync_to_async(get_user_model())( + username="test", email="test@example.com" + ) + await database_sync_to_async(user.save)() - assert connected - await communicator.receive_nothing() + response = await communicator.receive_json_from() + await communicator.receive_nothing() - user = await database_sync_to_async(get_user_model())( - username="test", email="test@example.com" - ) - await database_sync_to_async(user.save)() + assert { + "action": "create", + "body": {"pk": user.pk}, + "type": "user.change.observer.delete", + } == response + pk = user.pk - response = await communicator.receive_json_from() - await communicator.receive_nothing() + await database_sync_to_async(user.delete)() - assert { - "action": "create", - "body": {"pk": user.pk}, - "type": "user.change.observer.delete", - } == response - pk = user.pk + response = await communicator.receive_json_from() - await database_sync_to_async(user.delete)() + await communicator.receive_nothing() - response = await communicator.receive_json_from() - - await communicator.receive_nothing() - - assert { - "action": "delete", - "body": {"pk": pk}, - "type": "user.change.observer.delete", - } == response - - await communicator.disconnect() + assert { + "action": "delete", + "body": {"pk": pk}, + "type": "user.change.observer.delete", + } == response @pytest.mark.django_db(transaction=True) @@ -251,41 +227,28 @@ async def user_change_many_connections_wrapper( ): await self.send_json(dict(body=message, action=action, type=message_type)) - communicator1 = WebsocketCommunicator(TestConsumer(), "/testws/") - - connected, _ = await communicator1.connect() - - assert connected + async with connected_communicator(TestConsumer()) as communicator2: + async with connected_communicator(TestConsumer()) as communicator1: - communicator2 = WebsocketCommunicator(TestConsumer(), "/testws/") - - connected, _ = await communicator2.connect() - - assert connected - - user = await database_sync_to_async(get_user_model().objects.create)( - username="test", email="test@example.com" - ) - - response = await communicator1.receive_json_from() - - assert { - "action": "create", - "body": {"pk": user.pk}, - "type": "user.change.many.connections.wrapper", - } == response + user = await database_sync_to_async(get_user_model().objects.create)( + username="test", email="test@example.com" + ) - await communicator1.disconnect() + response = await communicator1.receive_json_from() - response = await communicator2.receive_json_from() + assert { + "action": "create", + "body": {"pk": user.pk}, + "type": "user.change.many.connections.wrapper", + } == response - assert { - "action": "create", - "body": {"pk": user.pk}, - "type": "user.change.many.connections.wrapper", - } == response + response = await communicator2.receive_json_from() - await communicator2.disconnect() + assert { + "action": "create", + "body": {"pk": user.pk}, + "type": "user.change.many.connections.wrapper", + } == response @pytest.mark.django_db(transaction=True) @@ -324,41 +287,28 @@ async def user_change_many_consumers_wrapper_2( ): await self.send_json(dict(body=message, action=action, type=message_type)) - communicator1 = WebsocketCommunicator(TestConsumer(), "/testws/") - - connected, _ = await communicator1.connect() - - assert connected - - communicator2 = WebsocketCommunicator(TestConsumer2(), "/testws/") + async with connected_communicator(TestConsumer2()) as communicator2: + async with connected_communicator(TestConsumer()) as communicator1: - connected, _ = await communicator2.connect() - - assert connected - - user = await database_sync_to_async(get_user_model().objects.create)( - username="test", email="test@example.com" - ) - - response = await communicator1.receive_json_from() - - assert { - "action": "create", - "body": {"pk": user.pk}, - "type": "user.change.many.consumers.wrapper.1", - } == response + user = await database_sync_to_async(get_user_model().objects.create)( + username="test", email="test@example.com" + ) - await communicator1.disconnect() + response = await communicator1.receive_json_from() - response = await communicator2.receive_json_from() + assert { + "action": "create", + "body": {"pk": user.pk}, + "type": "user.change.many.consumers.wrapper.1", + } == response - assert { - "action": "create", - "body": {"pk": user.pk}, - "type": "user.change.many.consumers.wrapper.2", - } == response + response = await communicator2.receive_json_from() - await communicator2.disconnect() + assert { + "action": "create", + "body": {"pk": user.pk}, + "type": "user.change.many.consumers.wrapper.2", + } == response @pytest.mark.django_db(transaction=True) @@ -395,25 +345,19 @@ def user_change_custom_groups_wrapper( else: yield "-instance-username-{}".format(instance.username) - communicator = WebsocketCommunicator(TestConsumer(), "/testws/") - - connected, _ = await communicator.connect() + async with connected_communicator(TestConsumer()) as communicator: - assert connected + user = await database_sync_to_async(get_user_model().objects.create)( + username="test", email="test@example.com" + ) - user = await database_sync_to_async(get_user_model().objects.create)( - username="test", email="test@example.com" - ) - - response = await communicator.receive_json_from() + response = await communicator.receive_json_from() - assert { - "action": "create", - "body": {"pk": user.pk}, - "type": "user.change.custom.groups.wrapper", - } == response - - await communicator.disconnect() + assert { + "action": "create", + "body": {"pk": user.pk}, + "type": "user.change.custom.groups.wrapper", + } == response user = await database_sync_to_async(get_user_model().objects.create)( username="test2", email="test@example.com" @@ -452,62 +396,56 @@ async def accept(self, **kwargs): async def users_changes(self, message, action, **kwargs): await self.reply(data=message, action=action) - communicator = WebsocketCommunicator(TestConsumerObserverUsers(), "/testws/") + async with connected_communicator(TestConsumerObserverUsers()) as communicator: - connected, _ = await communicator.connect() + user = await database_sync_to_async(get_user_model().objects.create)( + username="test", email="test@example.com" + ) - assert connected + response = await communicator.receive_json_from() - user = await database_sync_to_async(get_user_model().objects.create)( - username="test", email="test@example.com" - ) - - response = await communicator.receive_json_from() - - assert { - "action": "create", - "response_status": 200, - "request_id": None, - "errors": [], - "data": { - "id": user.pk, - "username": user.username, - }, - } == response - - user.username = "test updated" - await database_sync_to_async(user.save)() + assert { + "action": "create", + "response_status": 200, + "request_id": None, + "errors": [], + "data": { + "id": user.pk, + "username": user.username, + }, + } == response - response = await communicator.receive_json_from() + user.username = "test updated" + await database_sync_to_async(user.save)() - assert { - "action": "update", - "response_status": 200, - "request_id": None, - "errors": [], - "data": { - "id": user.pk, - "username": user.username, - }, - } == response + response = await communicator.receive_json_from() - pk = user.pk - await database_sync_to_async(user.delete)() + assert { + "action": "update", + "response_status": 200, + "request_id": None, + "errors": [], + "data": { + "id": user.pk, + "username": user.username, + }, + } == response - response = await communicator.receive_json_from() + pk = user.pk + await database_sync_to_async(user.delete)() - assert { - "action": "delete", - "response_status": 200, - "request_id": None, - "errors": [], - "data": { - "id": pk, - "username": user.username, - }, - } == response + response = await communicator.receive_json_from() - await communicator.disconnect() + assert { + "action": "delete", + "response_status": 200, + "request_id": None, + "errors": [], + "data": { + "id": pk, + "username": user.username, + }, + } == response @pytest.mark.django_db(transaction=True) @@ -543,25 +481,19 @@ def user_change_custom_groups(self, instance=None, **kwargs): def user_change_custom_groups(self, username=None, **kwargs): yield "-instance-username-{}".format(slugify(username)) - communicator = WebsocketCommunicator(TestConsumerObserverCustomGroups(), "/testws/") - - connected, _ = await communicator.connect() + async with connected_communicator(TestConsumerObserverCustomGroups()) as communicator: - assert connected + user = await database_sync_to_async(get_user_model().objects.create)( + username="test", email="test@example.com" + ) - user = await database_sync_to_async(get_user_model().objects.create)( - username="test", email="test@example.com" - ) + response = await communicator.receive_json_from() - response = await communicator.receive_json_from() - - assert { - "action": "create", - "body": {"pk": user.pk}, - "type": "user.change.custom.groups", - } == response - - await communicator.disconnect() + assert { + "action": "create", + "body": {"pk": user.pk}, + "type": "user.change.custom.groups", + } == response user = await database_sync_to_async(get_user_model().objects.create)( username="test2", email="test@example.com" @@ -620,31 +552,25 @@ def user_change_custom_groups(self, instance=None, **kwargs): def user_change_custom_groups(self, username=None, **kwargs): yield "-instance-username-{}".format(slugify(username)) - communicator = WebsocketCommunicator(TestConsumerObserverCustomGroups(), "/testws/") - - connected, _ = await communicator.connect() - - assert connected + async with connected_communicator(TestConsumerObserverCustomGroups()) as communicator: - await communicator.send_json_to( - { - "action": "subscribe", - "username": "thenewname", - "request_id": 5, - } - ) - - user = await database_sync_to_async(get_user_model().objects.create)( - username="thenewname", email="test@example.com" - ) + await communicator.send_json_to( + { + "action": "subscribe", + "username": "thenewname", + "request_id": 5, + } + ) - response = await communicator.receive_json_from() + user = await database_sync_to_async(get_user_model().objects.create)( + username="thenewname", email="test@example.com" + ) - assert { - "action": "create", - "body": {"pk": user.pk}, - "type": "user.change.custom.groups", - "subscribing_request_ids": [5], - } == response + response = await communicator.receive_json_from() - await communicator.disconnect() + assert { + "action": "create", + "body": {"pk": user.pk}, + "type": "user.change.custom.groups", + "subscribing_request_ids": [5], + } == response diff --git a/tests/test_permission.py b/tests/test_permission.py index 39531e0..826835d 100644 --- a/tests/test_permission.py +++ b/tests/test_permission.py @@ -3,6 +3,8 @@ import pytest from channels.consumer import AsyncConsumer from channels.testing import WebsocketCommunicator + +from tests.communicator import connected_communicator from rest_framework.permissions import BasePermission as DRFBasePermission from djangochannelsrestframework.consumers import AsyncAPIConsumer @@ -37,20 +39,14 @@ async def target(self, *args, **kwargs): return {"response": True}, 200 # Test a normal connection - communicator = WebsocketCommunicator(AConsumer(), "/testws/") - - connected, _ = await communicator.connect() - - assert connected - - assert called == {"can_connect": True} + async with connected_communicator(AConsumer()) as communicator: - await communicator.send_json_to({"action": "target", "request_id": 10}) - response = await communicator.receive_json_from() + assert called == {"can_connect": True} - assert called == {"can_connect": True, "has_permission": True} + await communicator.send_json_to({"action": "target", "request_id": 10}) + response = await communicator.receive_json_from() - await communicator.disconnect() + assert called == {"can_connect": True, "has_permission": True} @pytest.mark.django_db(transaction=True) @@ -88,11 +84,8 @@ class AConsumer(AsyncAPIConsumer): pass # Test a normal connection - communicator = WebsocketCommunicator(AConsumer(), "/testws/") - - connected, _ = await communicator.connect() - assert connected - assert called == {"has_permission": True} + async with connected_communicator(AConsumer()) as communicator: + assert called == {"has_permission": True} @pytest.mark.django_db(transaction=True) @@ -116,11 +109,8 @@ class AConsumer(AsyncAPIConsumer): pass # Test a normal connection - communicator = WebsocketCommunicator(AConsumer(), "/testws/") - - connected, _ = await communicator.connect() - assert connected - assert called == {"has_permission_a": False, "has_permission_b": True} + async with connected_communicator(AConsumer()) as communicator: + assert called == {"has_permission_a": False, "has_permission_b": True} @pytest.mark.django_db(transaction=True) @@ -144,11 +134,8 @@ class AConsumer(AsyncAPIConsumer): pass # Test a normal connection - communicator = WebsocketCommunicator(AConsumer(), "/testws/") - - connected, _ = await communicator.connect() - assert connected - assert called == {"has_permission_a": True, "has_permission_b": True} + async with connected_communicator(AConsumer()) as communicator: + assert called == {"has_permission_a": True, "has_permission_b": True} @pytest.mark.django_db(transaction=True) @@ -167,11 +154,8 @@ class AConsumer(AsyncAPIConsumer): pass # Test a normal connection - communicator = WebsocketCommunicator(AConsumer(), "/testws/") - - connected, _ = await communicator.connect() - assert connected - assert called == {"has_permission": False} + async with connected_communicator(AConsumer()) as communicator: + assert called == {"has_permission": False} @pytest.mark.django_db(transaction=True) @@ -214,14 +198,12 @@ class AConsumer(AsyncAPIConsumer): pass # Test a normal connection - communicator = WebsocketCommunicator(AConsumer(), "/testws/") - - connected, _ = await communicator.connect() - assert connected - assert called == { - "has_permission_a": False, - "has_permission_b": True, - "has_permission_c": True, - "has_permission_d": True, - "has_permission_e": False - } + async with connected_communicator(AConsumer()) as communicator: + + assert called == { + "has_permission_a": False, + "has_permission_b": True, + "has_permission_c": True, + "has_permission_d": True, + "has_permission_e": False + } From 73368fe8d9df39c2f8ba6a0aef033e3d0a006a76 Mon Sep 17 00:00:00 2001 From: Tumblingman <73558578+tumblingman@users.noreply.github.com> Date: Wed, 27 Nov 2024 09:58:27 +0400 Subject: [PATCH 04/13] Updated dependency list. Added delay after subscription to model changes in some tests. --- requirements.txt | 14 +++++++------- setup.py | 11 ++++++----- tests/test_model_with_custom_pk_observer.py | 3 +++ tests/test_observer.py | 2 ++ 4 files changed, 18 insertions(+), 12 deletions(-) diff --git a/requirements.txt b/requirements.txt index 9112a44..c81546b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,8 +1,8 @@ -django==4.2.4 -djangorestframework>=3.14.0 -channels>=4.0.0 -pytest -pytest-asyncio -pytest-django +django>=4.2.16 +djangorestframework>=3.15.2 +channels>=4.1.0 +pytest>=8.3.3 +pytest-django>=4.9.0 +pytest-asyncio>=0.24.0 sphinx -black==20.8b1 +black==24.3.0 diff --git a/setup.py b/setup.py index 4ee40d5..32cce92 100644 --- a/setup.py +++ b/setup.py @@ -12,13 +12,13 @@ license="MIT", packages=find_packages(exclude=["tests"]), include_package_data=True, - install_requires=["Django>=3.2", "channels>=4.0.0", "djangorestframework>=3.14.0"], + install_requires=["Django>=4.2.16", "channels>=4.1.0", "djangorestframework>=3.15.2"], extras_require={ "tests": [ - "channels[daphne]>=4.0.0", - "pytest>=7.0.1", - "pytest-django>=4.5.2", - "pytest-asyncio>=0.18.1", + "channels[daphne]>=4.1.0", + "pytest>=8.3.3", + "pytest-django>=4.9.0", + "pytest-asyncio>=0.24.0", "coverage>=6.3.1", ], }, @@ -33,6 +33,7 @@ "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", "Framework :: Django", "Topic :: Internet :: WWW/HTTP", ], diff --git a/tests/test_model_with_custom_pk_observer.py b/tests/test_model_with_custom_pk_observer.py index 72f4e74..d8102ac 100644 --- a/tests/test_model_with_custom_pk_observer.py +++ b/tests/test_model_with_custom_pk_observer.py @@ -1,3 +1,4 @@ +import asyncio from typing import Dict import pytest @@ -60,6 +61,8 @@ async def subscribe_to_all_changes(self, request_id, **kwargs): {"action": "subscribe_to_all_changes", "request_id": subscription_id} ) + await asyncio.sleep(0.5) + # create an instance created_instance = await database_sync_to_async( TestModelWithCustomPK.objects.create diff --git a/tests/test_observer.py b/tests/test_observer.py index 13ef303..2e552e8 100644 --- a/tests/test_observer.py +++ b/tests/test_observer.py @@ -562,6 +562,8 @@ def user_change_custom_groups(self, username=None, **kwargs): } ) + await asyncio.sleep(0.5) + user = await database_sync_to_async(get_user_model().objects.create)( username="thenewname", email="test@example.com" ) From cbebb66216a0785706f4bcbc544f3c030a4322cf Mon Sep 17 00:00:00 2001 From: Tumblingman <73558578+tumblingman@users.noreply.github.com> Date: Wed, 27 Nov 2024 22:57:41 +0400 Subject: [PATCH 05/13] Refactor websocket connection tests to use AsyncExitStack for improved readability and flexibility --- tests/communicator.py | 17 +++++++- tests/test_model_observer.py | 81 +++++++++++++++++++----------------- tests/test_observer.py | 50 ++++++++++++---------- 3 files changed, 86 insertions(+), 62 deletions(-) diff --git a/tests/communicator.py b/tests/communicator.py index d02b5b9..d59ce2b 100644 --- a/tests/communicator.py +++ b/tests/communicator.py @@ -32,6 +32,12 @@ class Communicator(WebsocketCommunicator): except asyncio.TimeoutError: break """ + _connected = False + + @property + def connected(self): + return self._connected + async def receive_output(self, timeout=1): if self.future.done(): self.future.result() # Ensure exceptions are re-raised if future is complete @@ -43,6 +49,14 @@ async def receive_output(self, timeout=1): self.future.result() raise e # Propagate the timeout exception + async def connect(self, timeout=1): + self._connected, subprotocol = await super().connect(timeout) + return self._connected, subprotocol + + async def disconnect(self, code=1000, timeout=1): + await super().disconnect(code, timeout) + self._connected = False + @asynccontextmanager async def connected_communicator(consumer, path: str = "/testws/") -> Awaitable[Communicator]: @@ -67,4 +81,5 @@ async def connected_communicator(consumer, path: str = "/testws/") -> Awaitable[ assert connected, "Failed to connect to WebSocket" yield communicator finally: - await communicator.disconnect() + if communicator.connected: + await communicator.disconnect() diff --git a/tests/test_model_observer.py b/tests/test_model_observer.py index 86ce7d8..2f04c79 100644 --- a/tests/test_model_observer.py +++ b/tests/test_model_observer.py @@ -1,3 +1,5 @@ +from contextlib import AsyncExitStack + import pytest from channels import DEFAULT_CHANNEL_LAYER from channels.db import database_sync_to_async @@ -216,53 +218,54 @@ async def update_username(self, pk=None, name=None, **kwargs): assert not await database_sync_to_async(get_user_model().objects.all().exists)() # Test a normal connection - async with connected_communicator(TestOtherConsumer()) as communicator1: - async with connected_communicator(TestUserConsumer()) as communicator2: - - u1 = await database_sync_to_async(get_user_model().objects.create)( - username="test1", email="42@example.com" - ) - t1 = await database_sync_to_async(TestModel.objects.create)(name="test2") - - await communicator1.send_json_to( - {"action": "subscribe_instance", "pk": t1.id, "request_id": 4} - ) - - response = await communicator1.receive_json_from() - - assert response == { - "action": "subscribe_instance", - "errors": [], - "response_status": 201, - "request_id": 4, - "data": None, - } + async with AsyncExitStack() as stack: + communicator1 = await stack.enter_async_context(connected_communicator(TestOtherConsumer())) + communicator2 = await stack.enter_async_context(connected_communicator(TestUserConsumer())) - await communicator2.send_json_to( - {"action": "subscribe_instance", "pk": u1.id, "request_id": 4} - ) + u1 = await database_sync_to_async(get_user_model().objects.create)( + username="test1", email="42@example.com" + ) + t1 = await database_sync_to_async(TestModel.objects.create)(name="test2") - response = await communicator2.receive_json_from() + await communicator1.send_json_to( + {"action": "subscribe_instance", "pk": t1.id, "request_id": 4} + ) - assert response == { - "action": "subscribe_instance", - "errors": [], - "response_status": 201, - "request_id": 4, - "data": None, - } + response = await communicator1.receive_json_from() + + assert response == { + "action": "subscribe_instance", + "errors": [], + "response_status": 201, + "request_id": 4, + "data": None, + } + + await communicator2.send_json_to( + {"action": "subscribe_instance", "pk": u1.id, "request_id": 4} + ) + + response = await communicator2.receive_json_from() + + assert response == { + "action": "subscribe_instance", + "errors": [], + "response_status": 201, + "request_id": 4, + "data": None, + } - # update the user + # update the user - u1.username = "no not a value" + u1.username = "no not a value" - await database_sync_to_async(u1.save)() + await database_sync_to_async(u1.save)() - # user is updated - assert await communicator2.receive_json_from() + # user is updated + assert await communicator2.receive_json_from() - # test model is not - assert await communicator1.receive_nothing() + # test model is not + assert await communicator1.receive_nothing() @pytest.mark.django_db(transaction=True) diff --git a/tests/test_observer.py b/tests/test_observer.py index 2e552e8..be9787c 100644 --- a/tests/test_observer.py +++ b/tests/test_observer.py @@ -1,4 +1,5 @@ import asyncio +from contextlib import AsyncExitStack import pytest from asgiref.sync import async_to_sync @@ -227,20 +228,22 @@ async def user_change_many_connections_wrapper( ): await self.send_json(dict(body=message, action=action, type=message_type)) - async with connected_communicator(TestConsumer()) as communicator2: - async with connected_communicator(TestConsumer()) as communicator1: + async with AsyncExitStack() as stack: + communicator1 = await stack.enter_async_context(connected_communicator(TestConsumer())) + communicator2 = await stack.enter_async_context(connected_communicator(TestConsumer())) + user = await database_sync_to_async(get_user_model().objects.create)( + username="test", email="test@example.com" + ) - user = await database_sync_to_async(get_user_model().objects.create)( - username="test", email="test@example.com" - ) + response = await communicator1.receive_json_from() - response = await communicator1.receive_json_from() + assert { + "action": "create", + "body": {"pk": user.pk}, + "type": "user.change.many.connections.wrapper", + } == response - assert { - "action": "create", - "body": {"pk": user.pk}, - "type": "user.change.many.connections.wrapper", - } == response + await communicator1.disconnect() response = await communicator2.receive_json_from() @@ -287,20 +290,23 @@ async def user_change_many_consumers_wrapper_2( ): await self.send_json(dict(body=message, action=action, type=message_type)) - async with connected_communicator(TestConsumer2()) as communicator2: - async with connected_communicator(TestConsumer()) as communicator1: + async with AsyncExitStack() as stack: + communicator1 = await stack.enter_async_context(connected_communicator(TestConsumer())) + communicator2 = await stack.enter_async_context(connected_communicator(TestConsumer2())) - user = await database_sync_to_async(get_user_model().objects.create)( - username="test", email="test@example.com" - ) + user = await database_sync_to_async(get_user_model().objects.create)( + username="test", email="test@example.com" + ) + + response = await communicator1.receive_json_from() - response = await communicator1.receive_json_from() + assert { + "action": "create", + "body": {"pk": user.pk}, + "type": "user.change.many.consumers.wrapper.1", + } == response - assert { - "action": "create", - "body": {"pk": user.pk}, - "type": "user.change.many.consumers.wrapper.1", - } == response + await communicator1.disconnect() response = await communicator2.receive_json_from() From a25166b706e5018cbf86434f38548e2fe861b158 Mon Sep 17 00:00:00 2001 From: Tumblingman <73558578+tumblingman@users.noreply.github.com> Date: Tue, 10 Dec 2024 21:51:36 +0400 Subject: [PATCH 06/13] Fixed unsubscribe logic in the base observer --- .../observer/base_observer.py | 31 ++--- tests/test_observer.py | 129 ++++++++++++++++++ 2 files changed, 138 insertions(+), 22 deletions(-) diff --git a/djangochannelsrestframework/observer/base_observer.py b/djangochannelsrestframework/observer/base_observer.py index 5fc6d02..03ed5bf 100644 --- a/djangochannelsrestframework/observer/base_observer.py +++ b/djangochannelsrestframework/observer/base_observer.py @@ -175,30 +175,17 @@ async def unsubscribe_to_comment_activity(self, request_id, **kwargs): groups = list(self.group_names_for_consumer(*args, consumer=consumer, **kwargs)) for group_name in groups: - # remove group to request mappings - if ( - group_name - in consumer._observer_group_to_request_id[self._stable_observer_id] - ): - # unsubscribe all requests to this group + if group_name in consumer._observer_group_to_request_id[self._stable_observer_id]: if request_id is None: - consumer._observer_group_to_request_id[ - self._stable_observer_id - ].pop(group_name) + consumer._observer_group_to_request_id[self._stable_observer_id].pop(group_name) else: - consumer._observer_group_to_request_id[self._stable_observer_id][ - group_name - ].remove(request_id) - - if ( - len( - consumer._observer_group_to_request_id[self._stable_observer_id][ - group_name - ] - ) - > 0 - ): - await consumer.remove_group(group_name) + consumer._observer_group_to_request_id[self._stable_observer_id][group_name].discard(request_id) + + if not consumer._observer_group_to_request_id[self._stable_observer_id][group_name]: + consumer._observer_group_to_request_id[self._stable_observer_id].pop(group_name) + + if group_name not in consumer._observer_group_to_request_id[self._stable_observer_id]: + await consumer.remove_group(group_name) return groups diff --git a/tests/test_observer.py b/tests/test_observer.py index be9787c..95972af 100644 --- a/tests/test_observer.py +++ b/tests/test_observer.py @@ -582,3 +582,132 @@ def user_change_custom_groups(self, username=None, **kwargs): "type": "user.change.custom.groups", "subscribing_request_ids": [5], } == response + + +@pytest.mark.django_db(transaction=True) +@pytest.mark.asyncio +async def test_observer_unsubscribe_behavior_with_custom_groups(settings): + settings.CHANNEL_LAYERS = { + "default": { + "BACKEND": "channels.layers.InMemoryChannelLayer", + "TEST_CONFIG": { + "expiry": 100500, + }, + }, + } + + layer = channel_layers.make_test_backend(DEFAULT_CHANNEL_LAYER) + + class TestConsumerObserverCustomGroups(AsyncAPIConsumer): + @action() + async def subscribe(self, username, request_id, **kwargs): + await self.user_change_custom_groups.subscribe( + username=username, request_id=request_id + ) + + @action() + async def unsubscribe(self, username, request_id, **kwargs): + await self.user_change_custom_groups.unsubscribe( + username=username, request_id=request_id + ) + + @model_observer(get_user_model()) + async def user_change_custom_groups( + self, + message, + action, + message_type, + observer=None, + subscribing_request_ids=None, + **kwargs + ): + await self.send_json( + dict( + body=message, + action=action, + type=message_type, + subscribing_request_ids=subscribing_request_ids, + ) + ) + + @user_change_custom_groups.groups_for_signal + def user_change_custom_groups(self, instance=None, **kwargs): + yield "-instance-username-{}".format(instance.username) + + @user_change_custom_groups.groups_for_consumer + def user_change_custom_groups(self, username=None, **kwargs): + yield "-instance-username-{}".format(slugify(username)) + + async with connected_communicator(TestConsumerObserverCustomGroups()) as communicator: + + user = await database_sync_to_async(get_user_model().objects.create)( + username="thenewname", email="test@example.com" + ) + + assert await communicator.receive_nothing(timeout=0.5) + + await database_sync_to_async(user.delete)() + + assert await communicator.receive_nothing(timeout=0.5) + + await communicator.send_json_to( + { + "action": "subscribe", + "username": "thenewname", + "request_id": 5, + } + ) + + await asyncio.sleep(0.5) + + user = await database_sync_to_async(get_user_model().objects.create)( + username="thenewname", email="test@example.com" + ) + + response = await communicator.receive_json_from() + + assert { + "action": "create", + "body": {"pk": user.pk}, + "type": "user.change.custom.groups", + "subscribing_request_ids": [5], + } == response + + await communicator.send_json_to( + { + "action": "unsubscribe", + "username": "thenewname", + "request_id": 5, + } + ) + + await communicator.send_json_to( + { + "action": "subscribe", + "username": "thenewname2", + "request_id": 6, + } + ) + + await asyncio.sleep(0.5) + + await database_sync_to_async(user.delete)() + + user = await database_sync_to_async(get_user_model().objects.create)( + username="thenewname", email="test@example.com" + ) + + assert await communicator.receive_nothing(timeout=0.5) + + user = await database_sync_to_async(get_user_model().objects.create)( + username="thenewname2", email="test2@example.com" + ) + + response = await communicator.receive_json_from() + + assert { + "action": "create", + "body": {"pk": user.pk}, + "type": "user.change.custom.groups", + "subscribing_request_ids": [6], + } == response From e328001386d74d552ec13cb7bf1dc9a963785046 Mon Sep 17 00:00:00 2001 From: Tumblingman <73558578+tumblingman@users.noreply.github.com> Date: Thu, 12 Dec 2024 09:53:16 +0400 Subject: [PATCH 07/13] Increase asyncio sleep timeout in test to improve reliability on slower machines --- tests/test_model_with_custom_pk_observer.py | 2 +- tests/test_observer.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/test_model_with_custom_pk_observer.py b/tests/test_model_with_custom_pk_observer.py index d8102ac..f3c3306 100644 --- a/tests/test_model_with_custom_pk_observer.py +++ b/tests/test_model_with_custom_pk_observer.py @@ -61,7 +61,7 @@ async def subscribe_to_all_changes(self, request_id, **kwargs): {"action": "subscribe_to_all_changes", "request_id": subscription_id} ) - await asyncio.sleep(0.5) + await asyncio.sleep(2) # create an instance created_instance = await database_sync_to_async( diff --git a/tests/test_observer.py b/tests/test_observer.py index 95972af..189b31b 100644 --- a/tests/test_observer.py +++ b/tests/test_observer.py @@ -568,7 +568,7 @@ def user_change_custom_groups(self, username=None, **kwargs): } ) - await asyncio.sleep(0.5) + await asyncio.sleep(2) user = await database_sync_to_async(get_user_model().objects.create)( username="thenewname", email="test@example.com" @@ -658,7 +658,7 @@ def user_change_custom_groups(self, username=None, **kwargs): } ) - await asyncio.sleep(0.5) + await asyncio.sleep(2) user = await database_sync_to_async(get_user_model().objects.create)( username="thenewname", email="test@example.com" @@ -689,7 +689,7 @@ def user_change_custom_groups(self, username=None, **kwargs): } ) - await asyncio.sleep(0.5) + await asyncio.sleep(2) await database_sync_to_async(user.delete)() @@ -697,7 +697,7 @@ def user_change_custom_groups(self, username=None, **kwargs): username="thenewname", email="test@example.com" ) - assert await communicator.receive_nothing(timeout=0.5) + assert await communicator.receive_nothing() user = await database_sync_to_async(get_user_model().objects.create)( username="thenewname2", email="test2@example.com" From 8168e7aa85081938ac24d25e1e35bc9b6e5cad64 Mon Sep 17 00:00:00 2001 From: Tumblingman <73558578+tumblingman@users.noreply.github.com> Date: Fri, 13 Dec 2024 21:24:23 +0400 Subject: [PATCH 08/13] Add subscription confirmation responses in test observers Ensure test consumers send JSON responses confirming subscription and unsubscription actions. Update tests to validate the new response payloads for improved accuracy and clarity. Fix timeout error on slow machines by explicitly waiting for subscription and unsubscription confirmation before proceeding with further test execution. --- tests/test_model_with_custom_pk_observer.py | 14 ++++++- tests/test_observer.py | 46 +++++++++++++++++++-- 2 files changed, 56 insertions(+), 4 deletions(-) diff --git a/tests/test_model_with_custom_pk_observer.py b/tests/test_model_with_custom_pk_observer.py index f3c3306..59d7117 100644 --- a/tests/test_model_with_custom_pk_observer.py +++ b/tests/test_model_with_custom_pk_observer.py @@ -1,4 +1,5 @@ import asyncio +from http.client import responses from typing import Dict import pytest @@ -51,6 +52,12 @@ def model_change( @action() async def subscribe_to_all_changes(self, request_id, **kwargs): await self.model_change.subscribe(request_id=request_id) + await self.send_json( + dict( + request_id=request_id, + action="subscribed_to_all_changes", + ) + ) # connect async with connected_communicator(TestConsumer()) as communicator: @@ -61,7 +68,12 @@ async def subscribe_to_all_changes(self, request_id, **kwargs): {"action": "subscribe_to_all_changes", "request_id": subscription_id} ) - await asyncio.sleep(2) + response = await communicator.receive_json_from() + + assert response == { + "action": "subscribed_to_all_changes", + "request_id": subscription_id, + } # create an instance created_instance = await database_sync_to_async( diff --git a/tests/test_observer.py b/tests/test_observer.py index 189b31b..e7769b0 100644 --- a/tests/test_observer.py +++ b/tests/test_observer.py @@ -530,6 +530,12 @@ async def subscribe(self, username, request_id, **kwargs): await self.user_change_custom_groups.subscribe( username=username, request_id=request_id ) + await self.send_json( + dict( + request_id=request_id, + action="subscribed", + ) + ) @model_observer(get_user_model()) async def user_change_custom_groups( @@ -568,7 +574,12 @@ def user_change_custom_groups(self, username=None, **kwargs): } ) - await asyncio.sleep(2) + response = await communicator.receive_json_from() + + assert response == { + "action": "subscribed", + "request_id": 5, + } user = await database_sync_to_async(get_user_model().objects.create)( username="thenewname", email="test@example.com" @@ -604,12 +615,24 @@ async def subscribe(self, username, request_id, **kwargs): await self.user_change_custom_groups.subscribe( username=username, request_id=request_id ) + await self.send_json( + dict( + request_id=request_id, + action="subscribed", + ) + ) @action() async def unsubscribe(self, username, request_id, **kwargs): await self.user_change_custom_groups.unsubscribe( username=username, request_id=request_id ) + await self.send_json( + dict( + request_id=request_id, + action="unsubscribed", + ) + ) @model_observer(get_user_model()) async def user_change_custom_groups( @@ -658,7 +681,12 @@ def user_change_custom_groups(self, username=None, **kwargs): } ) - await asyncio.sleep(2) + response = await communicator.receive_json_from() + + assert response == { + "action": "subscribed", + "request_id": 5, + } user = await database_sync_to_async(get_user_model().objects.create)( username="thenewname", email="test@example.com" @@ -681,6 +709,13 @@ def user_change_custom_groups(self, username=None, **kwargs): } ) + response = await communicator.receive_json_from() + + assert response == { + "action": "unsubscribed", + "request_id": 5, + } + await communicator.send_json_to( { "action": "subscribe", @@ -689,7 +724,12 @@ def user_change_custom_groups(self, username=None, **kwargs): } ) - await asyncio.sleep(2) + response = await communicator.receive_json_from() + + assert response == { + "action": "subscribed", + "request_id": 6, + } await database_sync_to_async(user.delete)() From e39ac3c569487dc6102e8679a38a89b2b1c7e0f8 Mon Sep 17 00:00:00 2001 From: Tumblingman <73558578+tumblingman@users.noreply.github.com> Date: Fri, 13 Dec 2024 22:11:31 +0400 Subject: [PATCH 09/13] Add unique suffixes to group names in tests to prevent cross-test signal interference --- tests/test_observer.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/test_observer.py b/tests/test_observer.py index e7769b0..19b3765 100644 --- a/tests/test_observer.py +++ b/tests/test_observer.py @@ -347,9 +347,9 @@ def user_change_custom_groups_wrapper( self, instance=None, username=None, **kwargs ): if username: - yield "-instance-username-{}".format(slugify(username)) + yield "-instance-username-{}-1".format(slugify(username)) else: - yield "-instance-username-{}".format(instance.username) + yield "-instance-username-{}-1".format(instance.username) async with connected_communicator(TestConsumer()) as communicator: @@ -481,11 +481,11 @@ async def user_change_custom_groups( @user_change_custom_groups.groups_for_signal def user_change_custom_groups(self, instance=None, **kwargs): - yield "-instance-username-{}".format(instance.username) + yield "-instance-username-{}-2".format(instance.username) @user_change_custom_groups.groups_for_consumer def user_change_custom_groups(self, username=None, **kwargs): - yield "-instance-username-{}".format(slugify(username)) + yield "-instance-username-{}-2".format(slugify(username)) async with connected_communicator(TestConsumerObserverCustomGroups()) as communicator: @@ -558,11 +558,11 @@ async def user_change_custom_groups( @user_change_custom_groups.groups_for_signal def user_change_custom_groups(self, instance=None, **kwargs): - yield "-instance-username-{}".format(instance.username) + yield "-instance-username-{}-3".format(instance.username) @user_change_custom_groups.groups_for_consumer def user_change_custom_groups(self, username=None, **kwargs): - yield "-instance-username-{}".format(slugify(username)) + yield "-instance-username-{}-3".format(slugify(username)) async with connected_communicator(TestConsumerObserverCustomGroups()) as communicator: @@ -655,11 +655,11 @@ async def user_change_custom_groups( @user_change_custom_groups.groups_for_signal def user_change_custom_groups(self, instance=None, **kwargs): - yield "-instance-username-{}".format(instance.username) + yield "-instance-username-{}-4".format(instance.username) @user_change_custom_groups.groups_for_consumer def user_change_custom_groups(self, username=None, **kwargs): - yield "-instance-username-{}".format(slugify(username)) + yield "-instance-username-{}-4".format(slugify(username)) async with connected_communicator(TestConsumerObserverCustomGroups()) as communicator: From 78cbf1994214c468ef90a20d414b823299a0f952 Mon Sep 17 00:00:00 2001 From: Tumblingman <73558578+tumblingman@users.noreply.github.com> Date: Fri, 13 Dec 2024 21:14:32 +0400 Subject: [PATCH 10/13] Add m2m signal handling and tests for ModelObserver Integrated support for `m2m_changed` signals in `ModelObserver` to handle many-to-many relationship changes. Updated the `database_event` method accordingly and introduced a new test to validate m2m signal handling with actions such as add, remove, and clear operations. Adjusted test data to include `groups` field for verification. --- .../observer/model_observer.py | 26 ++- tests/test_model_observer.py | 171 +++++++++++++++++- 2 files changed, 191 insertions(+), 6 deletions(-) diff --git a/djangochannelsrestframework/observer/model_observer.py b/djangochannelsrestframework/observer/model_observer.py index 44f98ad..8c5d710 100644 --- a/djangochannelsrestframework/observer/model_observer.py +++ b/djangochannelsrestframework/observer/model_observer.py @@ -10,7 +10,7 @@ from channels.layers import get_channel_layer from django.db import transaction from django.db.models import Model -from django.db.models.signals import post_delete, post_save, post_init +from django.db.models.signals import post_delete, post_save, post_init, m2m_changed from rest_framework.serializers import Serializer from djangochannelsrestframework.observer.base_observer import BaseObserver @@ -68,6 +68,13 @@ def _connect(self): self.post_save_receiver, sender=self.model_cls, dispatch_uid=id(self) ) + for field in self.model_cls._meta.many_to_many: + m2m_changed.connect( + self.m2m_changed_receiver, + sender=field.remote_field.through, + dispatch_uid=f"{id(self)}-{field.name}" + ) + post_delete.connect( self.post_delete_receiver, sender=self.model_cls, dispatch_uid=id(self) ) @@ -99,6 +106,23 @@ def post_save_receiver(self, instance: Model, created: bool, **kwargs): else: self.database_event(instance, Action.UPDATE) + def m2m_changed_receiver(self, action: str, instance: Model, reverse: bool, model: Type[Model], pk_set: Set[Any], **kwargs): + """ + Handle many-to-many changes. + """ + if action not in {"post_add", "post_remove", "post_clear"}: + return + + target_instances = [] + if not reverse: + target_instances.append(instance) + else: + for pk in pk_set: + target_instances.append(model.objects.get(pk=pk)) + + for target_instance in target_instances: + self.database_event(target_instance, Action.UPDATE) + def post_delete_receiver(self, instance: Model, **kwargs): self.database_event(instance, Action.DELETE) diff --git a/tests/test_model_observer.py b/tests/test_model_observer.py index 2f04c79..b663222 100644 --- a/tests/test_model_observer.py +++ b/tests/test_model_observer.py @@ -1,6 +1,8 @@ +import asyncio from contextlib import AsyncExitStack import pytest +from django.contrib.auth.models import Group from channels import DEFAULT_CHANNEL_LAYER from channels.db import database_sync_to_async from channels.layers import channel_layers @@ -21,6 +23,7 @@ class Meta: "id", "username", "email", + "groups", ) @@ -104,7 +107,7 @@ async def update_username(self, pk=None, username=None, **kwargs): "errors": [], "response_status": 200, "request_id": 1, - "data": {"email": "42@example.com", "id": u1.id, "username": "test1"}, + "data": {"email": "42@example.com", "id": u1.id, "username": "test1", "groups": []}, } # lookup up u1 @@ -153,7 +156,7 @@ async def update_username(self, pk=None, username=None, **kwargs): "errors": [], "response_status": 200, "request_id": 4, - "data": {"email": "42@example.com", "id": u1.id, "username": "thenewname"}, + "data": {"email": "42@example.com", "id": u1.id, "username": "thenewname", "groups": []}, } u1_pk = u1.pk @@ -348,7 +351,7 @@ async def update_username(self, pk=None, username=None, **kwargs): "errors": [], "response_status": 200, "request_id": 4, - "data": {"email": "42@example.com", "id": u1.pk, "username": "thenewname"}, + "data": {"email": "42@example.com", "id": u1.pk, "username": "thenewname", "groups": []}, } in [a, b] # unsubscribe @@ -490,7 +493,7 @@ async def update_username(self, pk=None, username=None, **kwargs): "errors": [], "response_status": 200, "request_id": 4, - "data": {"email": "42@example.com", "id": u1.id, "username": "new name"}, + "data": {"email": "42@example.com", "id": u1.id, "username": "new name", "groups": []}, } assert await communicator.receive_nothing() @@ -522,5 +525,163 @@ async def update_username(self, pk=None, username=None, **kwargs): "errors": [], "response_status": 200, "request_id": 5, - "data": {"email": "45@example.com", "id": u2.id, "username": "the new name 2"}, + "data": {"email": "45@example.com", "id": u2.id, "username": "the new name 2", "groups": []}, } + + +@pytest.mark.django_db(transaction=True) +@pytest.mark.asyncio +async def test_m2m_observer(settings): + """ + This tests + """ + + settings.CHANNEL_LAYERS = { + "default": { + "BACKEND": "channels.layers.InMemoryChannelLayer", + "TEST_CONFIG": {"expiry": 100500}, + }, + } + + layer = channel_layers.make_test_backend(DEFAULT_CHANNEL_LAYER) + + class TestConsumerMultipleSubs(ObserverModelInstanceMixin, GenericAsyncAPIConsumer): + + queryset = get_user_model().objects.all() + serializer_class = UserSerializer + + async def accept(self, subprotocol=None): + await super().accept() + + assert not await database_sync_to_async(get_user_model().objects.all().exists)() + + # Test a normal connection + async with connected_communicator(TestConsumerMultipleSubs()) as communicator: + + u1 = await database_sync_to_async(get_user_model().objects.create)( + username="test1", email="42@example.com" + ) + + u2 = await database_sync_to_async(get_user_model().objects.create)( + username="test2", email="45@example.com" + ) + + # Subscribe to instance user 1 + await communicator.send_json_to( + {"action": "subscribe_instance", "pk": u1.id, "request_id": 4} + ) + + response = await communicator.receive_json_from() + + assert response == { + "action": "subscribe_instance", + "errors": [], + "response_status": 201, + "request_id": 4, + "data": None, + } + + g1 = await database_sync_to_async(Group.objects.create)(name="group1") + g2 = await database_sync_to_async(Group.objects.create)(name="group2") + g3 = await database_sync_to_async(Group.objects.create)(name="group3") + g4 = await database_sync_to_async(Group.objects.create)(name="group4") + + await database_sync_to_async(u1.groups.add)(g1, g2) + + response = await communicator.receive_json_from() + + assert response == { + "action": "update", + "errors": [], + "response_status": 200, + "request_id": 4, + "data": { + "email": "42@example.com", + "id": u1.id, + "username": "test1", + "groups": [g1.id, g2.id] + }, + } + + await database_sync_to_async(u2.groups.add)(g4) + + await communicator.receive_nothing() + + await database_sync_to_async(g1.user_set.add)(u2) + + await communicator.receive_nothing() + + await database_sync_to_async(g3.user_set.add)(u1, u2) + + response = await communicator.receive_json_from() + + assert response == { + "action": "update", + "errors": [], + "response_status": 200, + "request_id": 4, + "data": { + "email": "42@example.com", + "id": u1.id, + "username": "test1", + "groups": [g1.id, g2.id, g3.id] + }, + } + + await database_sync_to_async(g1.user_set.remove)(u1) + + response = await communicator.receive_json_from() + + assert response == { + "action": "update", + "errors": [], + "response_status": 200, + "request_id": 4, + "data": { + "email": "42@example.com", + "id": u1.id, + "username": "test1", + "groups": [g2.id, g3.id] + }, + } + + await database_sync_to_async(u1.groups.clear)() + + response = await communicator.receive_json_from() + + assert response == { + "action": "update", + "errors": [], + "response_status": 200, + "request_id": 4, + "data": { + "email": "42@example.com", + "id": u1.id, + "username": "test1", + "groups": [] + }, + } + + await database_sync_to_async(u2.groups.clear)() + + await communicator.receive_nothing() + + await database_sync_to_async(u1.groups.set)([g1, g4]) + + response = await communicator.receive_json_from() + assert response == { + "action": "update", + "errors": [], + "response_status": 200, + "request_id": 4, + "data": { + "email": "42@example.com", + "id": u1.id, + "username": "test1", + "groups": [g1.id, g4.id] + }, + } + + await database_sync_to_async(u2.groups.set)([g1, g4]) + + await communicator.receive_nothing() From c4949c7a4ccb1cec45eee65d9e07f6bc39ce90c8 Mon Sep 17 00:00:00 2001 From: Tumblingman <73558578+tumblingman@users.noreply.github.com> Date: Fri, 13 Dec 2024 21:33:48 +0400 Subject: [PATCH 11/13] Convert dispatch_uid to string in signal connections. Using `str(id(self))` for `dispatch_uid` ensures compatibility with cases where signal identifiers are expected to be strings. This change prevents potential issues in environments that enforce strict type checks. --- djangochannelsrestframework/observer/model_observer.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/djangochannelsrestframework/observer/model_observer.py b/djangochannelsrestframework/observer/model_observer.py index 8c5d710..a4cce5d 100644 --- a/djangochannelsrestframework/observer/model_observer.py +++ b/djangochannelsrestframework/observer/model_observer.py @@ -61,11 +61,11 @@ def _connect(self): # this is used to capture the current state for the model post_init.connect( - self.post_init_receiver, sender=self.model_cls, dispatch_uid=id(self) + self.post_init_receiver, sender=self.model_cls, dispatch_uid=str(id(self)) ) post_save.connect( - self.post_save_receiver, sender=self.model_cls, dispatch_uid=id(self) + self.post_save_receiver, sender=self.model_cls, dispatch_uid=str(id(self)) ) for field in self.model_cls._meta.many_to_many: @@ -76,7 +76,7 @@ def _connect(self): ) post_delete.connect( - self.post_delete_receiver, sender=self.model_cls, dispatch_uid=id(self) + self.post_delete_receiver, sender=self.model_cls, dispatch_uid=str(id(self)) ) def post_init_receiver(self, instance: Model, **kwargs): From ba2f2d24cdb34b4b75ef74ede4238012eba0c019 Mon Sep 17 00:00:00 2001 From: Tumblingman <73558578+tumblingman@users.noreply.github.com> Date: Fri, 13 Dec 2024 21:52:36 +0400 Subject: [PATCH 12/13] Warn about limitations of many-to-many field observation. Added a warning to highlight partial support for many-to-many fields in model observation. This warns about signal issues during object deletion and potential non-deterministic behavior due to Django's use of savepoints. References a long-standing Django bug for further context. --- .../observer/model_observer.py | 22 +++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/djangochannelsrestframework/observer/model_observer.py b/djangochannelsrestframework/observer/model_observer.py index a4cce5d..bd0ff46 100644 --- a/djangochannelsrestframework/observer/model_observer.py +++ b/djangochannelsrestframework/observer/model_observer.py @@ -67,18 +67,30 @@ def _connect(self): post_save.connect( self.post_save_receiver, sender=self.model_cls, dispatch_uid=str(id(self)) ) - + have_m2m = False for field in self.model_cls._meta.many_to_many: m2m_changed.connect( self.m2m_changed_receiver, sender=field.remote_field.through, dispatch_uid=f"{id(self)}-{field.name}" ) + have_m2m = True post_delete.connect( self.post_delete_receiver, sender=self.model_cls, dispatch_uid=str(id(self)) ) + if have_m2m: + warnings.warn( + "Model observation with many-to-many fields is partially supported. " + + "If you delete a related object, the signal will not be sent. " + + "This is a Django bug that is over 10 years old: https://code.djangoproject.com/ticket/17688. " + + "Also, when working with many-to-many fields, Django uses savepoints, " + + "working with which is non-deterministic and can lead to unexpected results, " + + "as we do not support them.", + UnsupportedWarning, + ) + def post_init_receiver(self, instance: Model, **kwargs): if instance.pk is None: @@ -106,11 +118,12 @@ def post_save_receiver(self, instance: Model, created: bool, **kwargs): else: self.database_event(instance, Action.UPDATE) - def m2m_changed_receiver(self, action: str, instance: Model, reverse: bool, model: Type[Model], pk_set: Set[Any], **kwargs): + def m2m_changed_receiver(self, action: str, instance: Model, reverse: bool, model: Type[Model], pk_set: Set[Any], + **kwargs): """ Handle many-to-many changes. """ - if action not in {"post_add", "post_remove", "post_clear"}: + if action not in {"post_add", "post_remove", "post_clear"}: return target_instances = [] @@ -162,7 +175,8 @@ def prepare_messages(self, instance: Model, action: Action, **kwargs): yield from self.generate_messages(instance, old_group_names, new_group_names, action, **kwargs) - def generate_messages(self, instance: Model, old_group_names: Set[str], new_group_names: Set[str], action: Action, **kwargs): + def generate_messages(self, instance: Model, old_group_names: Set[str], new_group_names: Set[str], action: Action, + **kwargs): """ Generates messages for the given group names and action. """ From 79b08976c56de41fc84018b0526d9787bd528424 Mon Sep 17 00:00:00 2001 From: Tumblingman <73558578+tumblingman@users.noreply.github.com> Date: Tue, 24 Dec 2024 23:47:41 +0400 Subject: [PATCH 13/13] Fix and enhance many-to-many change handling in ModelObserver Added checks for `through` attribute and improved handling of `pre_clear` and `reverse` cases in many-to-many field changes. Enhanced the logic to avoid duplicates and ensure correct updates to related instances. Updated tests to validate these changes, ensuring robust many-to-many relationship observation. --- .../observer/model_observer.py | 40 +++++++++----- tests/test_model_observer.py | 52 ++++++++++++++++++- 2 files changed, 79 insertions(+), 13 deletions(-) diff --git a/djangochannelsrestframework/observer/model_observer.py b/djangochannelsrestframework/observer/model_observer.py index bd0ff46..6439a41 100644 --- a/djangochannelsrestframework/observer/model_observer.py +++ b/djangochannelsrestframework/observer/model_observer.py @@ -69,12 +69,13 @@ def _connect(self): ) have_m2m = False for field in self.model_cls._meta.many_to_many: - m2m_changed.connect( - self.m2m_changed_receiver, - sender=field.remote_field.through, - dispatch_uid=f"{id(self)}-{field.name}" - ) - have_m2m = True + if hasattr(field.remote_field, 'through'): + m2m_changed.connect( + self.m2m_changed_receiver, + sender=field.remote_field.through, + dispatch_uid=f"{str(id(self))}-{self.model_cls.__name__}-{field.name}" + ) + have_m2m = True post_delete.connect( self.post_delete_receiver, sender=self.model_cls, dispatch_uid=str(id(self)) @@ -118,21 +119,36 @@ def post_save_receiver(self, instance: Model, created: bool, **kwargs): else: self.database_event(instance, Action.UPDATE) - def m2m_changed_receiver(self, action: str, instance: Model, reverse: bool, model: Type[Model], pk_set: Set[Any], - **kwargs): + def m2m_changed_receiver(self, sender, instance: Model, action: str, reverse: bool, model: Type[Model], + pk_set: Set[Any], **kwargs): """ Handle many-to-many changes. """ - if action not in {"post_add", "post_remove", "post_clear"}: + if action not in {"post_add", "post_remove", "post_clear"} and not reverse: + return + + if action not in {"post_add", "post_remove", "pre_clear"} and reverse: return target_instances = [] if not reverse: target_instances.append(instance) else: - for pk in pk_set: - target_instances.append(model.objects.get(pk=pk)) - + if pk_set: + for pk in pk_set: + target_instances.append(model.objects.get(pk=pk)) + else: # pre_clear case + related_field = next( + (field for field in instance._meta.get_fields() + if field.many_to_many and hasattr(field, 'through') and field.through == sender), + None + ) + if related_field: + related_manager = getattr(instance, related_field.related_name or f"{related_field.name}_set", None) + if related_manager: + target_instances.extend(related_manager.all()) + + target_instances = list(set(target_instances)) # remove duplicates if any for target_instance in target_instances: self.database_event(target_instance, Action.UPDATE) diff --git a/tests/test_model_observer.py b/tests/test_model_observer.py index b663222..41875b3 100644 --- a/tests/test_model_observer.py +++ b/tests/test_model_observer.py @@ -1,4 +1,3 @@ -import asyncio from contextlib import AsyncExitStack import pytest @@ -685,3 +684,54 @@ async def accept(self, subprotocol=None): await database_sync_to_async(u2.groups.set)([g1, g4]) await communicator.receive_nothing() + + await database_sync_to_async(u1.groups.set)([g1, g2, g3, g4]) + + response = await communicator.receive_json_from() + + assert response == { + "action": "update", + "errors": [], + "response_status": 200, + "request_id": 4, + "data": { + "email": "42@example.com", + "id": u1.id, + "username": "test1", + "groups": [g1.id, g2.id, g3.id, g4.id] + }, + } + + await database_sync_to_async(g4.user_set.clear)() + + response = await communicator.receive_json_from() + + assert response == { + "action": "update", + "errors": [], + "response_status": 200, + "request_id": 4, + "data": { + "email": "42@example.com", + "id": u1.id, + "username": "test1", + "groups": [g1.id, g2.id, g3.id] + }, + } + + await database_sync_to_async(g3.user_set.remove)(u1) + + response = await communicator.receive_json_from() + + assert response == { + "action": "update", + "errors": [], + "response_status": 200, + "request_id": 4, + "data": { + "email": "42@example.com", + "id": u1.id, + "username": "test1", + "groups": [g1.id, g2.id] + }, + }