From a73399c85b6cf504ca8e6f913e2d340bc0fad45a Mon Sep 17 00:00:00 2001 From: Chaskin Saroff Date: Thu, 29 Sep 2022 19:02:56 -0700 Subject: [PATCH 1/3] Add support for arbitrary deserializers --- rele/subscription.py | 36 ++++++++++++++++++++++++++++++++---- tests/test_subscription.py | 15 +++++++++++++++ 2 files changed, 47 insertions(+), 4 deletions(-) diff --git a/rele/subscription.py b/rele/subscription.py index 078ae78..2e5ca54 100644 --- a/rele/subscription.py +++ b/rele/subscription.py @@ -35,7 +35,14 @@ def callback_func(self, data, **kwargs): """ def __init__( - self, func, topic, prefix="", suffix="", filter_by=None, backend_filter_by=None + self, + func, + topic, + prefix="", + suffix="", + filter_by=None, + backend_filter_by=None, + deserialize=lambda x: json.loads(x.data.decode("utf-8")), ): self._func = func self.topic = topic @@ -43,6 +50,7 @@ def __init__( self._suffix = suffix self._filters = self._init_filters(filter_by) self.backend_filter_by = backend_filter_by + self._deserialize = self._init_deserialize(deserialize) def _init_filters(self, filter_by): if filter_by and not ( @@ -61,6 +69,11 @@ def _init_filters(self, filter_by): return None + def _init_deserialize(self, deserialize): + if not callable(deserialize): + raise ValueError("deserialize must be a callable or None.") + return deserialize + @property def name(self): name_parts = [self._prefix, self.topic, self._suffix] @@ -109,8 +122,8 @@ def __call__(self, message): start_time = time.time() try: - data = json.loads(message.data.decode("utf-8")) - except json.JSONDecodeError as e: + data = self._subscription._deserialize(message) + except Exception as e: message.ack() run_middleware_hook( "post_process_message_failure", @@ -145,7 +158,14 @@ def __call__(self, message): run_middleware_hook("post_process_message") -def sub(topic, prefix=None, suffix=None, filter_by=None, backend_filter_by=None): +def sub( + topic, + prefix=None, + suffix=None, + filter_by=None, + backend_filter_by=None, + deserialize=lambda x: json.loads(x.data.decode("utf-8")), +): """Decorator function that makes declaring a PubSub Subscription simple. The Subscriber returned will automatically create and name @@ -181,6 +201,11 @@ def purpose_2(data, **kwargs): def sub_process_landscape_photos(data, **kwargs): pass + @sub(topic='string-messages', deserialize=lambda x: x.data.decode('utf-8')) + def sub_process_non_json(data, **kwargs): + pass + + :param topic: string The topic that is being subscribed to. :param prefix: string An optional prefix to the subscription name. Useful to namespace your subscription with your project name @@ -190,6 +215,8 @@ def sub_process_landscape_photos(data, **kwargs): :param filter_by: Union[function, list] An optional function or tuple of functions that filters the messages to be processed by the sub regarding their attributes. + :param deserialize: function An optional deserialization function that + replaces the default json deserialization. :return: :class:`~rele.subscription.Subscription` """ @@ -214,6 +241,7 @@ def decorator(func): suffix=suffix, filter_by=filter_by, backend_filter_by=backend_filter_by, + deserialize=deserialize, ) return decorator diff --git a/tests/test_subscription.py b/tests/test_subscription.py index 6b1f800..a2aca59 100644 --- a/tests/test_subscription.py +++ b/tests/test_subscription.py @@ -51,6 +51,15 @@ def sub_process_landscape_gif_photos(data, **kwargs): return f'Received a {kwargs.get("format")} photo of type {kwargs.get("type")}' +def deserialize_handler(message): + return message.data.decode("utf-8") + + +@sub(topic="string-data", deserialize=deserialize_handler) +def sub_string_deserializer(data, **kwargs): + return data + + class TestSubscription: def test_subs_return_subscription_objects(self): assert isinstance(sub_stub, Subscription) @@ -294,6 +303,12 @@ def crashy_sub_stub(data, **kwargs): } assert failed_log.subscription_message == str(message_wrapper) + def test_deserialize_handler(self, message_wrapper_invalid_json): + callback = Callback(sub_string_deserializer) + res = callback(message_wrapper_invalid_json) + + assert res == "foobar" + def test_log_acks_called_message_when_not_json_serializable( self, caplog, message_wrapper_invalid_json, published_at ): From 3f78bbc93454d51165a273b8966cfc5d402aa3de Mon Sep 17 00:00:00 2001 From: Chaskin Saroff Date: Fri, 30 Sep 2022 00:09:38 -0700 Subject: [PATCH 2/3] Change serialize to serializer --- rele/subscription.py | 22 +++++++++++----------- tests/test_subscription.py | 6 +++--- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/rele/subscription.py b/rele/subscription.py index 2e5ca54..ab3e771 100644 --- a/rele/subscription.py +++ b/rele/subscription.py @@ -42,7 +42,7 @@ def __init__( suffix="", filter_by=None, backend_filter_by=None, - deserialize=lambda x: json.loads(x.data.decode("utf-8")), + deserializer=lambda x: json.loads(x.data.decode("utf-8")), ): self._func = func self.topic = topic @@ -50,7 +50,7 @@ def __init__( self._suffix = suffix self._filters = self._init_filters(filter_by) self.backend_filter_by = backend_filter_by - self._deserialize = self._init_deserialize(deserialize) + self._deserializer = self._init_deserializer(deserializer) def _init_filters(self, filter_by): if filter_by and not ( @@ -69,10 +69,10 @@ def _init_filters(self, filter_by): return None - def _init_deserialize(self, deserialize): - if not callable(deserialize): - raise ValueError("deserialize must be a callable or None.") - return deserialize + def _init_deserializer(self, deserializer): + if not callable(deserializer): + raise ValueError("deserializer must be a callable or None.") + return deserializer @property def name(self): @@ -122,7 +122,7 @@ def __call__(self, message): start_time = time.time() try: - data = self._subscription._deserialize(message) + data = self._subscription._deserializer(message) except Exception as e: message.ack() run_middleware_hook( @@ -164,7 +164,7 @@ def sub( suffix=None, filter_by=None, backend_filter_by=None, - deserialize=lambda x: json.loads(x.data.decode("utf-8")), + deserializer=lambda x: json.loads(x.data.decode("utf-8")), ): """Decorator function that makes declaring a PubSub Subscription simple. @@ -201,7 +201,7 @@ def purpose_2(data, **kwargs): def sub_process_landscape_photos(data, **kwargs): pass - @sub(topic='string-messages', deserialize=lambda x: x.data.decode('utf-8')) + @sub(topic='string-messages', deserializer=lambda x: x.data.decode('utf-8')) def sub_process_non_json(data, **kwargs): pass @@ -215,7 +215,7 @@ def sub_process_non_json(data, **kwargs): :param filter_by: Union[function, list] An optional function or tuple of functions that filters the messages to be processed by the sub regarding their attributes. - :param deserialize: function An optional deserialization function that + :param deserializer: function An optional deserialization function that replaces the default json deserialization. :return: :class:`~rele.subscription.Subscription` """ @@ -241,7 +241,7 @@ def decorator(func): suffix=suffix, filter_by=filter_by, backend_filter_by=backend_filter_by, - deserialize=deserialize, + deserializer=deserializer, ) return decorator diff --git a/tests/test_subscription.py b/tests/test_subscription.py index a2aca59..0e723ef 100644 --- a/tests/test_subscription.py +++ b/tests/test_subscription.py @@ -51,11 +51,11 @@ def sub_process_landscape_gif_photos(data, **kwargs): return f'Received a {kwargs.get("format")} photo of type {kwargs.get("type")}' -def deserialize_handler(message): +def deserializer_handler(message): return message.data.decode("utf-8") -@sub(topic="string-data", deserialize=deserialize_handler) +@sub(topic="string-data", deserializer=deserializer_handler) def sub_string_deserializer(data, **kwargs): return data @@ -303,7 +303,7 @@ def crashy_sub_stub(data, **kwargs): } assert failed_log.subscription_message == str(message_wrapper) - def test_deserialize_handler(self, message_wrapper_invalid_json): + def test_deserializer_handler(self, message_wrapper_invalid_json): callback = Callback(sub_string_deserializer) res = callback(message_wrapper_invalid_json) From f62b7c07f138d3a0d9ea57af6f66b5e43906f1db Mon Sep 17 00:00:00 2001 From: Chaskin Saroff Date: Wed, 19 Apr 2023 14:21:31 -0700 Subject: [PATCH 3/3] Convert lambda to regular function --- rele/subscription.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/rele/subscription.py b/rele/subscription.py index ab3e771..ad6e5b4 100644 --- a/rele/subscription.py +++ b/rele/subscription.py @@ -8,6 +8,8 @@ logger = logging.getLogger(__name__) +def json_deserializer(message): + return json.loads(message.data.decode("utf-8")) class Subscription: """The Subscription class @@ -42,7 +44,7 @@ def __init__( suffix="", filter_by=None, backend_filter_by=None, - deserializer=lambda x: json.loads(x.data.decode("utf-8")), + deserializer=json_deserializer, ): self._func = func self.topic = topic @@ -164,7 +166,7 @@ def sub( suffix=None, filter_by=None, backend_filter_by=None, - deserializer=lambda x: json.loads(x.data.decode("utf-8")), + deserializer=json_deserializer, ): """Decorator function that makes declaring a PubSub Subscription simple.