For channels 2, see here
Django-channels-jsonrpc aims to enable JSON-RPC functionality on top of the excellent Django Channels project, and especially its WebSocket functionality. It aims to be:
- Fully integrated with Channels
- A full implementation of the JSON-RPC 1 and 2 protocols
- Usable over both WebSocket and HTTP transports
- Easy to integrate
The only Django-channels-jsonrpc dependency is the Django channels project
Install the package with pip:
$ pip install django-channels-jsonrpc
See the complete example here, and in particular consumer.py.
It is intended to be used as a WebSocket consumer. See the Channels documentation: the usage is the same, only simpler.
Import the JsonRpcConsumer class and create the consumer:
from channels_jsonrpc import JsonRpcConsumer

class MyJsonRpcConsumer(JsonRpcConsumer):

    def connect(self, message, **kwargs):
        """
        Perform things on WebSocket connection start
        """
        self.message.reply_channel.send({"accept": True})
        print("connect")
        # Do stuff if needed

    def disconnect(self, message, **kwargs):
        """
        Perform things on WebSocket connection close
        """
        print("disconnect")
        # Do stuff if needed
JsonRpcConsumer derives from Channels' WebSocketConsumer; you can read about all its features here: https://channels.readthedocs.io/en/stable/generics.html#websockets
The last step is to create the RPC method hooks. This is done with the decorator:
@MyJsonRpcConsumer.rpc_method()
Like this:
@MyJsonRpcConsumer.rpc_method()
def ping():
    return "pong"
MyJsonRpcConsumer.rpc_method() accepts a string as a parameter to 'rename' the function:
@MyJsonRpcConsumer.rpc_method("mymodule.rpc.ping")
def ping():
    return "pong"
The function is now callable with "method":"mymodule.rpc.ping" in the RPC call:
{"id":1, "jsonrpc":"2.0","method":"mymodule.rpc.ping","params":{}}
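To illustrate how the name passed to the decorator maps onto the "method" field of a request, here is a minimal stdlib-only sketch. The `registry` dict, the standalone `rpc_method` function and the `dispatch` helper are hypothetical stand-ins written for this example; they are not the library's implementation.

```python
import json

# Hypothetical registry mapping JSON-RPC method names to Python callables.
registry = {}


def rpc_method(name=None):
    """Register a function under `name`, or under its own name if omitted."""
    def decorator(func):
        registry[name or func.__name__] = func
        return func
    return decorator


@rpc_method("mymodule.rpc.ping")
def ping():
    return "pong"


def dispatch(raw_request):
    """Look up the callable named by the "method" field and call it."""
    payload = json.loads(raw_request)
    func = registry[payload["method"]]
    params = payload.get("params", {})
    # By-name params arrive as an object, by-position params as an array.
    if isinstance(params, dict):
        return func(**params)
    return func(*params)


raw = '{"id":1, "jsonrpc":"2.0","method":"mymodule.rpc.ping","params":{}}'
reply = dispatch(raw)
```

In this sketch the function is reachable only under the alternate name, which mirrors the 'rename' behaviour described above.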
RPC methods can obviously accept parameters. They also return "results" or "errors":
@MyJsonRpcConsumer.rpc_method("mymodule.rpc.ping")
def ping(fake_an_error):
    if fake_an_error:
        # Will return an error to the client
        #  --> {"id":1, "jsonrpc":"2.0","method":"mymodule.rpc.ping","params":{"fake_an_error": true}}
        #  <-- {"id": 1, "jsonrpc": "2.0", "error": {"message": "fake_error", "code": -32000, "data": ["fake_error"]}}
        raise Exception("fake_error")
    else:
        # Will return a result to the client
        #  --> {"id":1, "jsonrpc":"2.0","method":"mymodule.rpc.ping","params":{"fake_an_error": false}}
        #  <-- {"id": 1, "jsonrpc": "2.0", "result": "pong"}
        return "pong"
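The mapping from a Python return value or exception to a JSON-RPC "result" or "error" member can be sketched as follows. This is a stdlib-only illustration under stated assumptions: `call_and_wrap` is a hypothetical helper, and the error layout simply copies the example response shown above rather than the library's actual code.

```python
def call_and_wrap(func, request_id, params):
    """Call func and wrap the outcome in a JSON-RPC 2.0 response dict."""
    response = {"id": request_id, "jsonrpc": "2.0"}
    try:
        response["result"] = func(**params)
    except Exception as exc:
        # -32000 is the error code shown in the example response above.
        response["error"] = {
            "code": -32000,
            "message": str(exc),
            "data": list(exc.args),
        }
    return response


def ping(fake_an_error):
    if fake_an_error:
        raise Exception("fake_error")
    return "pong"


ok = call_and_wrap(ping, 1, {"fake_an_error": False})
failed = call_and_wrap(ping, 1, {"fake_an_error": True})
```

A response carries either "result" or "error", never both, which is why the sketch only sets one of the two keys.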
The original channel message, which can contain sessions (if activated with http_user) and other important info, can be easily accessed by accepting **kwargs in the function and reading the parameter named original_message:
@MyJsonRpcConsumerTest.rpc_method()
def json_rpc_method(param1, **kwargs):
    original_message = kwargs["original_message"]
    # Do something with original_message
Example:
class MyJsonRpcConsumerTest(JsonRpcConsumer):
    # Set to True to automatically port users from HTTP cookies
    # (you don't need channel_session_user, this implies it)
    # https://channels.readthedocs.io/en/stable/generics.html#websockets
    http_user = True
    # ...

@MyJsonRpcConsumerTest.rpc_method()
def ping(**kwargs):
    original_message = kwargs["original_message"]
    original_message.channel_session["test"] = True
    return "pong"
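One way a dispatcher could hand the original message only to functions that declare **kwargs is sketched below. Whether the library inspects signatures like this is an assumption; `accepts_kwargs` and `call_rpc` are hypothetical helpers, and a plain dict stands in for the Channels message object.

```python
import inspect


def accepts_kwargs(func):
    """Return True if func declares a **kwargs catch-all parameter."""
    return any(
        p.kind is inspect.Parameter.VAR_KEYWORD
        for p in inspect.signature(func).parameters.values()
    )


def call_rpc(func, params, original_message):
    """Call func, adding original_message only when func can receive it."""
    if accepts_kwargs(func):
        return func(**params, original_message=original_message)
    return func(**params)


def ping():
    return "pong"


def whoami(**kwargs):
    # A plain dict stands in for the Channels message object here.
    return kwargs["original_message"]["user"]


fake_message = {"user": "alice"}
plain = call_rpc(ping, {}, fake_message)
with_message = call_rpc(whoami, {}, fake_message)
```

With this approach, functions that do not care about the message keep a clean signature, and those that do simply add **kwargs.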
Inbound notifications are the ones sent from the client to the server.
They are handled the same way as RPC methods, except that you register them with rpc_notification() instead of rpc_method().
Functions registered with rpc_notification() can also retrieve the original_message object.
# Will be triggered when receiving this
#  --> {"jsonrpc":"2.0","method":"notification.alt_name","params":["val_param1", "val_param2"]}
@MyJsonRpcWebsocketConsumerTest.rpc_notification("notification.alt_name")
def notification1(param1, param2, **kwargs):
    original_message = kwargs["original_message"]
    # Do something with notification
    # ...
    # Notification shouldn't return anything.
    return
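What distinguishes a notification from a regular call on the wire is the missing "id" member: under JSON-RPC 2.0 a request without an "id" is a notification and gets no response. A minimal sketch of that check, where `is_notification` is a hypothetical helper name:

```python
import json


def is_notification(payload):
    """A JSON-RPC 2.0 notification is a request without an "id" member."""
    return "method" in payload and "id" not in payload


request = json.loads(
    '{"id":1, "jsonrpc":"2.0","method":"mymodule.rpc.ping","params":{}}'
)
notification = json.loads(
    '{"jsonrpc":"2.0","method":"notification.alt_name",'
    '"params":["val_param1", "val_param2"]}'
)
```

Here `is_notification(request)` is False and `is_notification(notification)` is True, which is why the notification handler above has nothing to return.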
The server might want to send notifications to one or more of its clients. For that, JsonRpcWebsocketConsumer provides two static methods:
- JsonRpcWebsocketConsumer.notify_group(group_name, method, params)
Using Channels groups, you can notify a whole group with this method:
@MyJsonRpcWebsocketConsumerTest.rpc_method()
def send_to_group(group_name):
    MyJsonRpcWebsocketConsumerTest.notify_group(group_name, "notification.notif", {"payload": 1234})
    return True
Calling this RPC method sends the notification to every member of the group group_name.
- JsonRpcWebsocketConsumer.notify_channel(reply_channel, method, params)
This will notify only one channel/client.
@MyJsonRpcWebsocketConsumerTest.rpc_method()
def send_to_reply_channel(**kwargs):
    original_message = kwargs["original_message"]
    MyJsonRpcWebsocketConsumerTest.notify_channel(original_message.reply_channel,
                                                  "notification.ownnotif",
                                                  {"payload": 12})
    return True
The reply_channel can be found in the original_message object.
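The difference between notifying a group and notifying a single channel can be sketched without Channels at all. Everything here is a hypothetical stand-in: plain lists play the role of reply channels, the `groups` dict plays the role of Channels groups, and the frame layout follows the JSON-RPC 2.0 notification shape rather than a confirmed dump of what the library sends.

```python
import json
from collections import defaultdict

# Hypothetical in-memory stand-in for Channels groups: name -> list of outboxes.
groups = defaultdict(list)


def build_notification(method, params):
    """Serialize a JSON-RPC 2.0 notification (no "id", so no reply expected)."""
    return json.dumps({"jsonrpc": "2.0", "method": method, "params": params})


def notify_channel(outbox, method, params):
    """Deliver one notification to a single client."""
    outbox.append(build_notification(method, params))


def notify_group(group_name, method, params):
    """Deliver the same notification to every client in the group."""
    for outbox in groups[group_name]:
        notify_channel(outbox, method, params)


alice, bob = [], []
groups["room"].extend([alice, bob])
notify_group("room", "notification.notif", {"payload": 1234})
notify_channel(alice, "notification.ownnotif", {"payload": 12})
```

After these calls, both clients hold the group notification and only `alice` holds the second one.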
To restrict RPC methods or notifications to a specific transport (HTTP or WebSocket), use the websocket and http parameters accepted by both decorators, rpc_method() and rpc_notification(). Both parameters default to True.
You can use them like this:
@MyJsonRpcWebsocketConsumerTest.rpc_notification("notification.alt_name", websocket=True, http=False)
def notification1(param1, param2, **kwargs):
    original_message = kwargs["original_message"]
    # This notification will only be used when using websocket transport
    return
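The effect of the two flags can be illustrated with a small stdlib-only sketch. The standalone `rpc_notification` function, the `registry` dict and the `is_allowed` helper are hypothetical and only mimic the decorator signature described above; how the library stores and enforces the flags internally is not shown in this document.

```python
# Hypothetical registry: method name -> (callable, set of allowed transports).
registry = {}


def rpc_notification(name, websocket=True, http=True):
    """Register func under name with the transports allowed to call it."""
    def decorator(func):
        allowed = {
            transport
            for transport, enabled in (("websocket", websocket), ("http", http))
            if enabled
        }
        registry[name] = (func, allowed)
        return func
    return decorator


@rpc_notification("notification.alt_name", websocket=True, http=False)
def notification1(param1, param2):
    return None


def is_allowed(name, transport):
    """Return True if the registered method may be called over transport."""
    _func, allowed = registry[name]
    return transport in allowed
```

A dispatcher built this way would reject the call before invoking the function when `is_allowed` returns False for the incoming transport.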
from django.core.serializers.json import DjangoJSONEncoder

class DjangoJsonRpcConsumer(JsonRpcConsumer):
    # Use Django's encoder so that dates, decimals and UUIDs can be serialized
    json_encoder_class = DjangoJSONEncoder
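A custom encoder matters when an RPC result contains values that the standard json module cannot serialize on its own. The sketch below shows the idea with a stdlib-only encoder; `SketchEncoder` is a hypothetical stand-in for illustration, not DjangoJSONEncoder itself, and its output formats are not guaranteed to match Django's.

```python
import datetime
import decimal
import json


class SketchEncoder(json.JSONEncoder):
    """Hypothetical encoder: serializes dates and decimals as strings."""

    def default(self, o):
        if isinstance(o, (datetime.datetime, datetime.date)):
            return o.isoformat()
        if isinstance(o, decimal.Decimal):
            return str(o)
        # Fall back to the base class, which raises TypeError for unknown types.
        return super().default(o)


result = {"when": datetime.date(2017, 1, 31), "price": decimal.Decimal("9.90")}
encoded = json.dumps(
    {"id": 1, "jsonrpc": "2.0", "result": result}, cls=SketchEncoder
)
```

Without the `cls` argument, `json.dumps` would raise a TypeError on the date value.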
The JsonRpcConsumer class can be tested the same way Channels Consumers are tested. See here
You just need to remember to set your JsonRpcConsumer class to TEST_MODE in the test:
import json

from channels.tests import ChannelTestCase, HttpClient
from .consumer import MyJsonRpcConsumer

MyJsonRpcConsumer.TEST_MODE = True


def request(method, params, request_id=1):
    # Assumed helper (not defined in the original snippet):
    # builds the JSON-RPC 2.0 request text sent to the consumer.
    return json.dumps({"id": request_id, "jsonrpc": "2.0", "method": method, "params": params})


class TestsJsonConsumer(ChannelTestCase):

    def assertResult(self, method, params, result, error=False):
        client = HttpClient()
        client.send_and_consume('websocket.receive', text=request(method, params))
        key = "result" if not error else "error"
        message = client.receive()
        if message is None or key not in message:
            raise KeyError("'%s' key not in message: %s" % (key, message))
        self.assertEqual(message[key], result)

    def assertError(self, method, params, result):
        self.assertResult(method, params, result, True)

    def test_assert_result(self):
        self.assertResult("ping", {}, "pong")
MIT
Have fun with Websockets!
Free Software, Hell Yeah!