diff --git a/README.md b/README.md index df71b66..ed9d9ea 100644 --- a/README.md +++ b/README.md @@ -8,8 +8,19 @@ Check out the [CloudEvents spec](https://github.com/cloudevents/spec/blob/v1.0/s This package has no dependencies beyond the Python standard library with the base install. Optionally depends on the `avro` package for Avro encode/decode functionality. +## Features + +* Implements CloudEvents 1.0 spec. +* JSON and JSON batch encoding/decoding. +* Avro encoding/decoding. +* Simple API. + ## News +### 0.2.3 - (*2020-09-30*) + +* Added support for encoding/decoding batch events in JSON. + ### 0.2.2 - (*2020-09-29*) * First public release. @@ -30,6 +41,8 @@ Install with JSON and Avro codecs: ## Usage: +### Creating Events + Create a CloudEvent with required attributes: ```python @@ -101,6 +114,8 @@ Extension attributes can be accessed using the `attribute` method: assert event.attribute("external1") == "foo/bar" ``` +### Encoding/Decoding Events in JSON + Encode an event in JSON: ```python @@ -111,6 +126,30 @@ encoded_event = Json.encode(event) Note that blank fields won't be encoded. +Encode a batch of events in JSON: + +```python +from spce import CloudEvent, Json + +event_batch = [ + CloudEvent( + type="OximeterMeasured", + source="oximeter/123", + id="1000", + datacontenttype="application/json", + data=r'{"spo2": 99})', + ), + CloudEvent( + type="OximeterMeasured", + source="oximeter/123", + id="1001", + datacontenttype="application/json", + data=b'\x01binarydata\x02', + ), +] +encoded_batch = Json.encode(event_batch) +``` + Decode an event in JSON: ```python @@ -132,6 +171,34 @@ text = """ decoded_event = Json.decode(text) ``` +Decode a batch of events in JSON: + +```python +text = r''' + [ + { + "type":"OximeterMeasured", + "source":"oximeter/123", + "id":"1000", + "specversion":"1.0", + "datacontenttype": "application/json", + "data": "{\"spo2\": 99}" + }, + { + "type":"OximeterMeasured", + "source":"oximeter/123", + "id":"1001", + "specversion":"1.0", + "datacontenttype": "application/json", + "data_base64": "AWJpbmFyeWRhdGEC" + } + ] +''' +decoded_events = Json.decode(text) +``` + +### Encoding/Decoding Events in Avro + Encode an event in Avro: ```python diff --git a/setup.py b/setup.py index d761fef..30e51c3 100644 --- a/setup.py +++ b/setup.py @@ -19,7 +19,7 @@ setup( name='spce', - version='0.2.2', + version='0.2.3', packages=['spce'], url='https://github.com/scaleplandev/spce-python', license='Apache 2.0', diff --git a/spce/json.py b/spce/json.py index 5a151de..b6d47aa 100644 --- a/spce/json.py +++ b/spce/json.py @@ -14,6 +14,7 @@ import json from base64 import b64encode, b64decode +from typing import Union, Iterable from .cloudevents import CloudEvent @@ -25,24 +26,38 @@ class Json: _ENCODER = json.JSONEncoder() @classmethod - def encode(cls, event: CloudEvent): - kvs = [] - encoder = cls._ENCODER - for attr, value in event._attributes.items(): - if value: - kvs.append('"%s":%s' % (attr, encoder.encode(value))) - if event._data: - if event._has_binary_data: - kvs.append('"data_base64":%s' % encoder.encode(b64encode(event._data).decode())) - else: - kvs.append('"data":%s' % encoder.encode(event._data)) - return "{%s}" % ",".join(kvs) + def encode(cls, event: Union[CloudEvent, Iterable[CloudEvent]]) -> str: + if isinstance(event, Iterable): + encoded = [cls.encode(e) for e in event] + return "[%s]" % ",".join(encoded) + elif isinstance(event, CloudEvent): + kvs = [] + encoder = cls._ENCODER + for attr, value in event._attributes.items(): + if value: + kvs.append('"%s":%s' % (attr, encoder.encode(value))) + if event._data: + if event._has_binary_data: + kvs.append('"data_base64":%s' % encoder.encode(b64encode(event._data).decode())) + else: + kvs.append('"data":%s' % encoder.encode(event._data)) + return "{%s}" % ",".join(kvs) + else: + raise TypeError("JSON.encode cannot encode %s" % type(event)) @classmethod - def decode(cls, text: str) -> CloudEvent: + def decode(cls, text: str) -> Union[CloudEvent, Iterable[CloudEvent]]: d = json.loads(text) + if isinstance(d, dict): + return CloudEvent(**cls._normalize_data(d)) + elif isinstance(d, Iterable): + return [CloudEvent(**cls._normalize_data(it)) for it in d] + else: + raise TypeError("JSON.decode cannot decode %s" % type(d)) + + @classmethod + def _normalize_data(cls, d: dict) -> dict: if "data_base64" in d: d["data"] = b64decode(d["data_base64"]) del d["data_base64"] - - return CloudEvent(**d) + return d diff --git a/tests/json_test.py b/tests/json_test.py index 84089e6..fa1cc72 100644 --- a/tests/json_test.py +++ b/tests/json_test.py @@ -120,6 +120,72 @@ def test_encode_extension_attribute(self): ''' self.assertEqual(json.loads(target), json.loads(encoded)) + def test_encode_batch_0_items(self): + self.assertEqual("[]", Json.encode([])) + + def test_encode_batch_1_item(self): + event_batch = [ + CloudEvent( + type="OximeterMeasured", + source="oximeter/123", + id="1000", + datacontenttype="application/json", + data=json.dumps({"spo2": 99}), + ) + ] + encoded_batch = Json.encode(event_batch) + target = r''' + [{ + "type":"OximeterMeasured", + "source":"oximeter/123", + "id":"1000", + "specversion":"1.0", + "datacontenttype": "application/json", + "data": "{\"spo2\": 99}" + }] + ''' + self.assertEqual(json.loads(target), json.loads(encoded_batch)) + + def test_encode_batch_2_items(self): + event_batch = [ + CloudEvent( + type="OximeterMeasured", + source="oximeter/123", + id="1000", + datacontenttype="application/json", + data=json.dumps({"spo2": 99}), + ), + CloudEvent( + type="OximeterMeasured", + source="oximeter/123", + id="1001", + datacontenttype="application/json", + data=b'\x01binarydata\x02', + ), + ] + encoded_batch = Json.encode(event_batch) + target = r''' + [ + { + "type":"OximeterMeasured", + "source":"oximeter/123", + "id":"1000", + "specversion":"1.0", + "datacontenttype": "application/json", + "data": "{\"spo2\": 99}" + }, + { + "type":"OximeterMeasured", + "source":"oximeter/123", + "id":"1001", + "specversion":"1.0", + "datacontenttype": "application/json", + "data_base64": "AWJpbmFyeWRhdGEC" + } + ] + ''' + self.assertEqual(json.loads(target), json.loads(encoded_batch)) + class JsonDecoderTests(unittest.TestCase): @@ -222,3 +288,67 @@ def test_decode_extension_attribute(self): ) event = Json.decode(encoded_event) self.assertEqual(target, event) + + def test_decode_batch_0_items(self): + self.assertEqual([], Json.decode("[]")) + + def test_decode_batch_1_item(self): + encoded_batch = r''' + [{ + "type":"OximeterMeasured", + "source":"oximeter/123", + "id":"1000", + "specversion":"1.0", + "datacontenttype": "application/json", + "data": "{\"spo2\": 99}" + }] + ''' + target = [ + CloudEvent( + type="OximeterMeasured", + source="oximeter/123", + id="1000", + datacontenttype="application/json", + data=json.dumps({"spo2": 99}), + ) + ] + self.assertEqual(target, Json.decode(encoded_batch)) + + def test_decode_batch_2_items(self): + encoded_batch = r''' + [ + { + "type":"OximeterMeasured", + "source":"oximeter/123", + "id":"1000", + "specversion":"1.0", + "datacontenttype": "application/json", + "data": "{\"spo2\": 99}" + }, + { + "type":"OximeterMeasured", + "source":"oximeter/123", + "id":"1001", + "specversion":"1.0", + "datacontenttype": "application/json", + "data_base64": "AWJpbmFyeWRhdGEC" + } + ] + ''' + target = [ + CloudEvent( + type="OximeterMeasured", + source="oximeter/123", + id="1000", + datacontenttype="application/json", + data=json.dumps({"spo2": 99}), + ), + CloudEvent( + type="OximeterMeasured", + source="oximeter/123", + id="1001", + datacontenttype="application/json", + data=b'\x01binarydata\x02', + ), + ] + self.assertEqual(target, Json.decode(encoded_batch))