Skip to content

Commit

Permalink
add support for multiple signals receivers
Browse files Browse the repository at this point in the history
  • Loading branch information
ivellios committed Sep 22, 2023
1 parent 64e51ce commit a5a3aab
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 4 deletions.
22 changes: 21 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,30 @@ def handle_profile_updated(sender, message: ProfileMessage, **kwargs):
...
```

The above task will be executed by celery worker for the `handle_profile_updated` function. This function works as any other celery task, so you can route it to specific queue, set the priority, etc.
The above task will be executed by celery worker for the `handle_profile_updated` task. This function works as any other celery task, so you can route it to specific queue, set the priority, etc.

```python
app.conf.task_routes = {
'handle_profile_updated': {'queue': 'profile-updated-queue'},
}
```

# Options

You can also pass the celery options to the task using the param in the decorator:

```python
@receiver_task(profile_updated_signal, celery_task_options={'queue': 'profile-updated-queue'})
```

The decorator also accepts all other keyword arguments as regular `django.dispatch.receiver` decorator (ie. same as [Signal.connect](https://docs.djangoproject.com/en/4.2/topics/signals/#django.dispatch.Signal.connect). For example you can set the `dispatch_uid` to avoid registering the same receiver multiple times.

```python
@receiver_task(profile_updated_signal, dispatch_uid='profile_updated')
```

As in `@receiver` decorator you can also pass a list of signals that the receiver should be connected to:

```python
@receiver_task([profile_updated_signal, profile_deleted_signal])
```
9 changes: 6 additions & 3 deletions src/celery_signals/receivers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from django.dispatch import receiver
from unified_signals import UnifiedSignal


Expand Down Expand Up @@ -34,7 +33,7 @@ def get_celery_app():


def receiver_task(
signal: UnifiedSignal, # TODO: also accept multiple signals as original Signal does
signal: typing.Union[UnifiedSignal, typing.Iterable[UnifiedSignal]],
celery_task_options: typing.Optional[typing.Dict] = None,
**options,
):
Expand All @@ -60,7 +59,11 @@ def producer(
message_data = json.dumps(message.__dict__) if message else "{}"
return consumer.delay(message_data, *_args, **_kwargs)

signal.connect(producer, **options)
if isinstance(signal, (list, tuple)):
for s in signal:
s.connect(producer, **options)
else:
signal.connect(producer, **options)

return func

Expand Down
18 changes: 18 additions & 0 deletions tests/test_receivers.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,21 @@ def test_receivers_import_with_celery_app_defined_incorrectly():
get_celery_app()

settings.EVENT_SIGNALS_CELERY_APP = OLD_EVENT_SIGNALS_CELERY_APP


@override_settings(
CELERY_TASK_ALWAYS_EAGER=True, EVENT_SIGNALS_CELERY_APP="tests.testapp.celery.app"
)
def test_handling_multiple_signals():
signal_1 = UnifiedSignal(DataMock)
signal_2 = UnifiedSignal(DataMock)

@receiver_task([signal_1, signal_2], weak=False)
def handle_signal(sender, message, **kwargs):
if sender == "aaa":
assert message.field == 10
if sender == "bbb":
assert message.field == 5

signal_1.send("aaa", DataMock(field=10))
signal_2.send("bbb", DataMock(field=5))

0 comments on commit a5a3aab

Please sign in to comment.