From ccad13a8102e394b3caa7c77f3ffed098a8ceafa Mon Sep 17 00:00:00 2001 From: Alan Tai <alan.tai@reddit.com> Date: Wed, 25 Jan 2023 15:37:16 -0800 Subject: [PATCH 1/7] Added flush queue signal handler within event publisher. --- baseplate/sidecars/__init__.py | 6 ++++ baseplate/sidecars/event_publisher.py | 48 ++++++++++++++++++++------- 2 files changed, 42 insertions(+), 12 deletions(-) diff --git a/baseplate/sidecars/__init__.py b/baseplate/sidecars/__init__.py index 0ff05fc4f..979b9bb55 100644 --- a/baseplate/sidecars/__init__.py +++ b/baseplate/sidecars/__init__.py @@ -63,6 +63,12 @@ def age(self) -> float: if not self.batch_start: return 0 return time.time() - self.batch_start + + @property + def is_ready(self) -> bool: + if self.age >= self.max_age: + return True + return False def add(self, item: Optional[bytes]) -> None: if self.age >= self.max_age: diff --git a/baseplate/sidecars/event_publisher.py b/baseplate/sidecars/event_publisher.py index 9c3dcb2eb..5bc751002 100644 --- a/baseplate/sidecars/event_publisher.py +++ b/baseplate/sidecars/event_publisher.py @@ -5,7 +5,10 @@ import hashlib import hmac import logging +import signal +import sys +from types import FrameType from typing import Any from typing import List from typing import Optional @@ -163,6 +166,14 @@ def publish(self, payload: SerializedBatch) -> None: SERIALIZER_BY_VERSION = {"2": V2Batch, "2j": V2JBatch} +def publish_serialized_batch(publisher: BatchPublisher, batcher: TimeLimitedBatch) -> None: + """Serializes batch, publishes it using the publisher, and then resets the batch for more messages.""" + serialized_batch = batcher.serialize() + try: + publisher.publish(serialized_batch) + except Exception: + logger.exception("Events publishing failed.") + batcher.reset() def publish_events() -> None: arg_parser = argparse.ArgumentParser() @@ -213,7 +224,30 @@ def publish_events() -> None: serializer = SERIALIZER_BY_VERSION[cfg.collector.version]() batcher = TimeLimitedBatch(serializer, MAX_BATCH_AGE) publisher = BatchPublisher(metrics_client, cfg) + + def flush_queue_signal_handler(_signo: int, _frame: FrameType) -> None: + """Signal handler for flushing messages from the queue and publishing them.""" + message: Optional[bytes] + logger.info("Shutdown signal received. Flushing events...") + + while True: + try: + message = event_queue.get(timeout=0.2) + except TimedOutError: + if len(batcher.serialize()) > 0: + publish_serialized_batch(publisher, batcher) + break + + if batcher.is_ready: + publish_serialized_batch(publisher, batcher) + batcher.add(message) + sys.exit(0) + + for sig in (signal.SIGINT, signal.SIGTERM): + signal.signal(sig, flush_queue_signal_handler) + signal.siginterrupt(sig, False) + while True: message: Optional[bytes] @@ -222,18 +256,8 @@ def publish_events() -> None: except TimedOutError: message = None - try: - batcher.add(message) - continue - except BatchFull: - pass - - serialized = batcher.serialize() - try: - publisher.publish(serialized) - except Exception: - logger.exception("Events publishing failed.") - batcher.reset() + if batcher.is_ready: + publish_serialized_batch(publisher, batcher) batcher.add(message) From 9c7ec10ded9c4368e0a5d7bf7aba58a73a2cc79e Mon Sep 17 00:00:00 2001 From: Alan Tai <alan.tai@reddit.com> Date: Wed, 25 Jan 2023 15:45:45 -0800 Subject: [PATCH 2/7] Linting and formatting. --- baseplate/sidecars/__init__.py | 2 +- baseplate/sidecars/event_publisher.py | 17 +++++++++-------- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/baseplate/sidecars/__init__.py b/baseplate/sidecars/__init__.py index 979b9bb55..989208fd0 100644 --- a/baseplate/sidecars/__init__.py +++ b/baseplate/sidecars/__init__.py @@ -63,7 +63,7 @@ def age(self) -> float: if not self.batch_start: return 0 return time.time() - self.batch_start - + @property def is_ready(self) -> bool: if self.age >= self.max_age: diff --git a/baseplate/sidecars/event_publisher.py b/baseplate/sidecars/event_publisher.py index 5bc751002..be304d0c1 100644 --- a/baseplate/sidecars/event_publisher.py +++ b/baseplate/sidecars/event_publisher.py @@ -166,7 +166,8 @@ def publish(self, payload: SerializedBatch) -> None: SERIALIZER_BY_VERSION = {"2": V2Batch, "2j": V2JBatch} -def publish_serialized_batch(publisher: BatchPublisher, batcher: TimeLimitedBatch) -> None: + +def serialize_and_publish_batch(publisher: BatchPublisher, batcher: TimeLimitedBatch) -> None: """Serializes batch, publishes it using the publisher, and then resets the batch for more messages.""" serialized_batch = batcher.serialize() try: @@ -175,6 +176,7 @@ def publish_serialized_batch(publisher: BatchPublisher, batcher: TimeLimitedBatc logger.exception("Events publishing failed.") batcher.reset() + def publish_events() -> None: arg_parser = argparse.ArgumentParser() arg_parser.add_argument( @@ -224,30 +226,29 @@ def publish_events() -> None: serializer = SERIALIZER_BY_VERSION[cfg.collector.version]() batcher = TimeLimitedBatch(serializer, MAX_BATCH_AGE) publisher = BatchPublisher(metrics_client, cfg) - def flush_queue_signal_handler(_signo: int, _frame: FrameType) -> None: """Signal handler for flushing messages from the queue and publishing them.""" message: Optional[bytes] logger.info("Shutdown signal received. Flushing events...") - + while True: try: message = event_queue.get(timeout=0.2) except TimedOutError: if len(batcher.serialize()) > 0: - publish_serialized_batch(publisher, batcher) + serialize_and_publish_batch(publisher, batcher) break - + if batcher.is_ready: - publish_serialized_batch(publisher, batcher) + serialize_and_publish_batch(publisher, batcher) batcher.add(message) sys.exit(0) for sig in (signal.SIGINT, signal.SIGTERM): signal.signal(sig, flush_queue_signal_handler) signal.siginterrupt(sig, False) - + while True: message: Optional[bytes] @@ -257,7 +258,7 @@ def flush_queue_signal_handler(_signo: int, _frame: FrameType) -> None: message = None if batcher.is_ready: - publish_serialized_batch(publisher, batcher) + serialize_and_publish_batch(publisher, batcher) batcher.add(message) From 0c71eba680d4cf6ef69af22899b6b8fb17115943 Mon Sep 17 00:00:00 2001 From: Alan Tai <alan.tai@reddit.com> Date: Thu, 26 Jan 2023 11:50:35 -0800 Subject: [PATCH 3/7] Updating is_ready to simply return the result of the condition --- baseplate/sidecars/__init__.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/baseplate/sidecars/__init__.py b/baseplate/sidecars/__init__.py index 989208fd0..d3f790383 100644 --- a/baseplate/sidecars/__init__.py +++ b/baseplate/sidecars/__init__.py @@ -66,9 +66,7 @@ def age(self) -> float: @property def is_ready(self) -> bool: - if self.age >= self.max_age: - return True - return False + return self.age >= self.max_age def add(self, item: Optional[bytes]) -> None: if self.age >= self.max_age: From 6b3828c45b2a62c15497c76f955b3d84145b47df Mon Sep 17 00:00:00 2001 From: Alan Tai <alan.tai@reddit.com> Date: Tue, 16 May 2023 16:14:31 -0700 Subject: [PATCH 4/7] Updating to use try add batcher msg instead of checking whether the property is true. --- baseplate/sidecars/event_publisher.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/baseplate/sidecars/event_publisher.py b/baseplate/sidecars/event_publisher.py index be304d0c1..f30710981 100644 --- a/baseplate/sidecars/event_publisher.py +++ b/baseplate/sidecars/event_publisher.py @@ -240,9 +240,13 @@ def flush_queue_signal_handler(_signo: int, _frame: FrameType) -> None: serialize_and_publish_batch(publisher, batcher) break - if batcher.is_ready: - serialize_and_publish_batch(publisher, batcher) - batcher.add(message) + try: + batcher.add(message) + continue + except BatchFull: + pass + + serialize_and_publish_batch(publisher, batcher) sys.exit(0) for sig in (signal.SIGINT, signal.SIGTERM): @@ -257,9 +261,13 @@ def flush_queue_signal_handler(_signo: int, _frame: FrameType) -> None: except TimedOutError: message = None - if batcher.is_ready: - serialize_and_publish_batch(publisher, batcher) - batcher.add(message) + try: + batcher.add(message) + continue + except BatchFull: + pass + + serialize_and_publish_batch(publisher, batcher) if __name__ == "__main__": From 017b538124558155e99b6c0fb61bcba0c324c7a3 Mon Sep 17 00:00:00 2001 From: Alan Tai <alan.tai@reddit.com> Date: Tue, 16 May 2023 16:15:14 -0700 Subject: [PATCH 5/7] Removing property for is_full since it is not useful anymore. --- baseplate/sidecars/__init__.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/baseplate/sidecars/__init__.py b/baseplate/sidecars/__init__.py index d3f790383..0ff05fc4f 100644 --- a/baseplate/sidecars/__init__.py +++ b/baseplate/sidecars/__init__.py @@ -64,10 +64,6 @@ def age(self) -> float: return 0 return time.time() - self.batch_start - @property - def is_ready(self) -> bool: - return self.age >= self.max_age - def add(self, item: Optional[bytes]) -> None: if self.age >= self.max_age: raise BatchFull From 75b13c2cb2b32d18fe245bebcd2d616f39d65b42 Mon Sep 17 00:00:00 2001 From: Alan Tai <alan.tai@reddit.com> Date: Tue, 16 May 2023 17:14:21 -0700 Subject: [PATCH 6/7] Adding missing batcher.add(message) after publishing batch, so we don't drop the message that failed to publish. --- baseplate/sidecars/event_publisher.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/baseplate/sidecars/event_publisher.py b/baseplate/sidecars/event_publisher.py index f30710981..22b31a099 100644 --- a/baseplate/sidecars/event_publisher.py +++ b/baseplate/sidecars/event_publisher.py @@ -247,6 +247,7 @@ def flush_queue_signal_handler(_signo: int, _frame: FrameType) -> None: pass serialize_and_publish_batch(publisher, batcher) + batcher.add(message) sys.exit(0) for sig in (signal.SIGINT, signal.SIGTERM): @@ -268,6 +269,7 @@ def flush_queue_signal_handler(_signo: int, _frame: FrameType) -> None: pass serialize_and_publish_batch(publisher, batcher) + batcher.add(message) if __name__ == "__main__": From fec5c07ff901c4ba1665106a6ab9370da1b7f6c6 Mon Sep 17 00:00:00 2001 From: Alan Tai <alan.tai@reddit.com> Date: Tue, 16 May 2023 17:29:49 -0700 Subject: [PATCH 7/7] Lint. --- baseplate/sidecars/event_publisher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/baseplate/sidecars/event_publisher.py b/baseplate/sidecars/event_publisher.py index 22b31a099..a98974bd1 100644 --- a/baseplate/sidecars/event_publisher.py +++ b/baseplate/sidecars/event_publisher.py @@ -245,7 +245,7 @@ def flush_queue_signal_handler(_signo: int, _frame: FrameType) -> None: continue except BatchFull: pass - + serialize_and_publish_batch(publisher, batcher) batcher.add(message) sys.exit(0)