Skip to content

Commit

Permalink
fix(dlq): Make InvalidMessage pickleable
Browse files Browse the repository at this point in the history
We saw a crash in the ingest spans consumer because InvalidMessage is unpickleable.
Make it pickleable to avoid the crash.
  • Loading branch information
nikhars committed Sep 13, 2023
1 parent 55e6f79 commit bc5a60e
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 0 deletions.
3 changes: 3 additions & 0 deletions arroyo/dlq.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ def __eq__(self, other: Any) -> bool:
and self.needs_commit == other.needs_commit
)

def __reduce__(self) -> Tuple[Any, Tuple[Any, ...]]:
return self.__class__, (self.partition, self.offset, self.needs_commit)


@dataclass(frozen=True)
class DlqLimit:
Expand Down
8 changes: 8 additions & 0 deletions tests/test_dlq.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import pickle
from datetime import datetime
from typing import Generator
from unittest.mock import ANY
Expand Down Expand Up @@ -109,3 +110,10 @@ def test_dlq_policy_wrapper() -> None:
)
wrapper.produce(message)
wrapper.flush({partition: 11})


def test_invalid_message_pickleable() -> None:
exc = InvalidMessage(Partition(Topic("test_topic"), 0), 2)
pickled_exc = pickle.dumps(exc)
unpickled_exc = pickle.loads(pickled_exc)
assert exc == unpickled_exc

0 comments on commit bc5a60e

Please sign in to comment.