From bc5a60e6db454cd51561ca1ea299908e5a893ab2 Mon Sep 17 00:00:00 2001 From: Nikhar Saxena Date: Tue, 12 Sep 2023 23:50:10 -0700 Subject: [PATCH] fix(dlq): Make InvalidMessage pickleable We saw a crash in the ingest spans consumer because InvalidMessage is unpickleable. Make it pickleable to avoid the crash. --- arroyo/dlq.py | 3 +++ tests/test_dlq.py | 8 ++++++++ 2 files changed, 11 insertions(+) diff --git a/arroyo/dlq.py b/arroyo/dlq.py index 7519934d..d8dcd4ec 100644 --- a/arroyo/dlq.py +++ b/arroyo/dlq.py @@ -65,6 +65,9 @@ def __eq__(self, other: Any) -> bool: and self.needs_commit == other.needs_commit ) + def __reduce__(self) -> Tuple[Any, Tuple[Any, ...]]: + return self.__class__, (self.partition, self.offset, self.needs_commit) + @dataclass(frozen=True) class DlqLimit: diff --git a/tests/test_dlq.py b/tests/test_dlq.py index 6d6f933b..1b024521 100644 --- a/tests/test_dlq.py +++ b/tests/test_dlq.py @@ -1,3 +1,4 @@ +import pickle from datetime import datetime from typing import Generator from unittest.mock import ANY @@ -109,3 +110,10 @@ def test_dlq_policy_wrapper() -> None: ) wrapper.produce(message) wrapper.flush({partition: 11}) + + +def test_invalid_message_pickleable() -> None: + exc = InvalidMessage(Partition(Topic("test_topic"), 0), 2) + pickled_exc = pickle.dumps(exc) + unpickled_exc = pickle.loads(pickled_exc) + assert exc == unpickled_exc