From ccad13a8102e394b3caa7c77f3ffed098a8ceafa Mon Sep 17 00:00:00 2001
From: Alan Tai <alan.tai@reddit.com>
Date: Wed, 25 Jan 2023 15:37:16 -0800
Subject: [PATCH 1/7] Added flush queue signal handler within event publisher.

---
 baseplate/sidecars/__init__.py        |  6 ++++
 baseplate/sidecars/event_publisher.py | 48 ++++++++++++++++++++-------
 2 files changed, 42 insertions(+), 12 deletions(-)

diff --git a/baseplate/sidecars/__init__.py b/baseplate/sidecars/__init__.py
index 0ff05fc4f..979b9bb55 100644
--- a/baseplate/sidecars/__init__.py
+++ b/baseplate/sidecars/__init__.py
@@ -63,6 +63,12 @@ def age(self) -> float:
         if not self.batch_start:
             return 0
         return time.time() - self.batch_start
+    
+    @property
+    def is_ready(self) -> bool:
+        if self.age >= self.max_age:
+            return True
+        return False
 
     def add(self, item: Optional[bytes]) -> None:
         if self.age >= self.max_age:
diff --git a/baseplate/sidecars/event_publisher.py b/baseplate/sidecars/event_publisher.py
index 9c3dcb2eb..5bc751002 100644
--- a/baseplate/sidecars/event_publisher.py
+++ b/baseplate/sidecars/event_publisher.py
@@ -5,7 +5,10 @@
 import hashlib
 import hmac
 import logging
+import signal
+import sys
 
+from types import FrameType
 from typing import Any
 from typing import List
 from typing import Optional
@@ -163,6 +166,14 @@ def publish(self, payload: SerializedBatch) -> None:
 
 SERIALIZER_BY_VERSION = {"2": V2Batch, "2j": V2JBatch}
 
+def publish_serialized_batch(publisher: BatchPublisher, batcher: TimeLimitedBatch) -> None:
+    """Serializes batch, publishes it using the publisher, and then resets the batch for more messages."""
+    serialized_batch = batcher.serialize()
+    try:
+        publisher.publish(serialized_batch)
+    except Exception:
+        logger.exception("Events publishing failed.")
+    batcher.reset()
 
 def publish_events() -> None:
     arg_parser = argparse.ArgumentParser()
@@ -213,7 +224,30 @@ def publish_events() -> None:
     serializer = SERIALIZER_BY_VERSION[cfg.collector.version]()
     batcher = TimeLimitedBatch(serializer, MAX_BATCH_AGE)
     publisher = BatchPublisher(metrics_client, cfg)
+    
 
+    def flush_queue_signal_handler(_signo: int, _frame: FrameType) -> None:
+        """Signal handler for flushing messages from the queue and publishing them."""
+        message: Optional[bytes]
+        logger.info("Shutdown signal received. Flushing events...")
+        
+        while True:
+            try:
+                message = event_queue.get(timeout=0.2)
+            except TimedOutError:
+                if len(batcher.serialize()) > 0:
+                    publish_serialized_batch(publisher, batcher)
+                break
+            
+            if batcher.is_ready:
+                publish_serialized_batch(publisher, batcher)
+            batcher.add(message)
+        sys.exit(0)
+
+    for sig in (signal.SIGINT, signal.SIGTERM):
+        signal.signal(sig, flush_queue_signal_handler)
+        signal.siginterrupt(sig, False)
+    
     while True:
         message: Optional[bytes]
 
@@ -222,18 +256,8 @@ def publish_events() -> None:
         except TimedOutError:
             message = None
 
-        try:
-            batcher.add(message)
-            continue
-        except BatchFull:
-            pass
-
-        serialized = batcher.serialize()
-        try:
-            publisher.publish(serialized)
-        except Exception:
-            logger.exception("Events publishing failed.")
-        batcher.reset()
+        if batcher.is_ready:
+            publish_serialized_batch(publisher, batcher)
         batcher.add(message)
 
 

From 9c7ec10ded9c4368e0a5d7bf7aba58a73a2cc79e Mon Sep 17 00:00:00 2001
From: Alan Tai <alan.tai@reddit.com>
Date: Wed, 25 Jan 2023 15:45:45 -0800
Subject: [PATCH 2/7] Linting and formatting.

---
 baseplate/sidecars/__init__.py        |  2 +-
 baseplate/sidecars/event_publisher.py | 17 +++++++++--------
 2 files changed, 10 insertions(+), 9 deletions(-)

diff --git a/baseplate/sidecars/__init__.py b/baseplate/sidecars/__init__.py
index 979b9bb55..989208fd0 100644
--- a/baseplate/sidecars/__init__.py
+++ b/baseplate/sidecars/__init__.py
@@ -63,7 +63,7 @@ def age(self) -> float:
         if not self.batch_start:
             return 0
         return time.time() - self.batch_start
-    
+
     @property
     def is_ready(self) -> bool:
         if self.age >= self.max_age:
diff --git a/baseplate/sidecars/event_publisher.py b/baseplate/sidecars/event_publisher.py
index 5bc751002..be304d0c1 100644
--- a/baseplate/sidecars/event_publisher.py
+++ b/baseplate/sidecars/event_publisher.py
@@ -166,7 +166,8 @@ def publish(self, payload: SerializedBatch) -> None:
 
 SERIALIZER_BY_VERSION = {"2": V2Batch, "2j": V2JBatch}
 
-def publish_serialized_batch(publisher: BatchPublisher, batcher: TimeLimitedBatch) -> None:
+
+def serialize_and_publish_batch(publisher: BatchPublisher, batcher: TimeLimitedBatch) -> None:
     """Serializes batch, publishes it using the publisher, and then resets the batch for more messages."""
     serialized_batch = batcher.serialize()
     try:
@@ -175,6 +176,7 @@ def publish_serialized_batch(publisher: BatchPublisher, batcher: TimeLimitedBatc
         logger.exception("Events publishing failed.")
     batcher.reset()
 
+
 def publish_events() -> None:
     arg_parser = argparse.ArgumentParser()
     arg_parser.add_argument(
@@ -224,30 +226,29 @@ def publish_events() -> None:
     serializer = SERIALIZER_BY_VERSION[cfg.collector.version]()
     batcher = TimeLimitedBatch(serializer, MAX_BATCH_AGE)
     publisher = BatchPublisher(metrics_client, cfg)
-    
 
     def flush_queue_signal_handler(_signo: int, _frame: FrameType) -> None:
         """Signal handler for flushing messages from the queue and publishing them."""
         message: Optional[bytes]
         logger.info("Shutdown signal received. Flushing events...")
-        
+
         while True:
             try:
                 message = event_queue.get(timeout=0.2)
             except TimedOutError:
                 if len(batcher.serialize()) > 0:
-                    publish_serialized_batch(publisher, batcher)
+                    serialize_and_publish_batch(publisher, batcher)
                 break
-            
+
             if batcher.is_ready:
-                publish_serialized_batch(publisher, batcher)
+                serialize_and_publish_batch(publisher, batcher)
             batcher.add(message)
         sys.exit(0)
 
     for sig in (signal.SIGINT, signal.SIGTERM):
         signal.signal(sig, flush_queue_signal_handler)
         signal.siginterrupt(sig, False)
-    
+
     while True:
         message: Optional[bytes]
 
@@ -257,7 +258,7 @@ def flush_queue_signal_handler(_signo: int, _frame: FrameType) -> None:
             message = None
 
         if batcher.is_ready:
-            publish_serialized_batch(publisher, batcher)
+            serialize_and_publish_batch(publisher, batcher)
         batcher.add(message)
 
 

From 0c71eba680d4cf6ef69af22899b6b8fb17115943 Mon Sep 17 00:00:00 2001
From: Alan Tai <alan.tai@reddit.com>
Date: Thu, 26 Jan 2023 11:50:35 -0800
Subject: [PATCH 3/7] Updating is_ready to simply return the result of the
 condition

---
 baseplate/sidecars/__init__.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/baseplate/sidecars/__init__.py b/baseplate/sidecars/__init__.py
index 989208fd0..d3f790383 100644
--- a/baseplate/sidecars/__init__.py
+++ b/baseplate/sidecars/__init__.py
@@ -66,9 +66,7 @@ def age(self) -> float:
 
     @property
     def is_ready(self) -> bool:
-        if self.age >= self.max_age:
-            return True
-        return False
+        return self.age >= self.max_age
 
     def add(self, item: Optional[bytes]) -> None:
         if self.age >= self.max_age:

From 6b3828c45b2a62c15497c76f955b3d84145b47df Mon Sep 17 00:00:00 2001
From: Alan Tai <alan.tai@reddit.com>
Date: Tue, 16 May 2023 16:14:31 -0700
Subject: [PATCH 4/7] Updating to use try add batcher msg instead of checking
 whether the property is true.

---
 baseplate/sidecars/event_publisher.py | 20 ++++++++++++++------
 1 file changed, 14 insertions(+), 6 deletions(-)

diff --git a/baseplate/sidecars/event_publisher.py b/baseplate/sidecars/event_publisher.py
index be304d0c1..f30710981 100644
--- a/baseplate/sidecars/event_publisher.py
+++ b/baseplate/sidecars/event_publisher.py
@@ -240,9 +240,13 @@ def flush_queue_signal_handler(_signo: int, _frame: FrameType) -> None:
                     serialize_and_publish_batch(publisher, batcher)
                 break
 
-            if batcher.is_ready:
-                serialize_and_publish_batch(publisher, batcher)
-            batcher.add(message)
+            try:
+                batcher.add(message)
+                continue
+            except BatchFull:
+                pass
+            
+            serialize_and_publish_batch(publisher, batcher)
         sys.exit(0)
 
     for sig in (signal.SIGINT, signal.SIGTERM):
@@ -257,9 +261,13 @@ def flush_queue_signal_handler(_signo: int, _frame: FrameType) -> None:
         except TimedOutError:
             message = None
 
-        if batcher.is_ready:
-            serialize_and_publish_batch(publisher, batcher)
-        batcher.add(message)
+        try:
+            batcher.add(message)
+            continue
+        except BatchFull:
+            pass
+
+        serialize_and_publish_batch(publisher, batcher)
 
 
 if __name__ == "__main__":

From 017b538124558155e99b6c0fb61bcba0c324c7a3 Mon Sep 17 00:00:00 2001
From: Alan Tai <alan.tai@reddit.com>
Date: Tue, 16 May 2023 16:15:14 -0700
Subject: [PATCH 5/7] Removing property for is_full since it is not useful
 anymore.

---
 baseplate/sidecars/__init__.py | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/baseplate/sidecars/__init__.py b/baseplate/sidecars/__init__.py
index d3f790383..0ff05fc4f 100644
--- a/baseplate/sidecars/__init__.py
+++ b/baseplate/sidecars/__init__.py
@@ -64,10 +64,6 @@ def age(self) -> float:
             return 0
         return time.time() - self.batch_start
 
-    @property
-    def is_ready(self) -> bool:
-        return self.age >= self.max_age
-
     def add(self, item: Optional[bytes]) -> None:
         if self.age >= self.max_age:
             raise BatchFull

From 75b13c2cb2b32d18fe245bebcd2d616f39d65b42 Mon Sep 17 00:00:00 2001
From: Alan Tai <alan.tai@reddit.com>
Date: Tue, 16 May 2023 17:14:21 -0700
Subject: [PATCH 6/7] Adding missing batcher.add(message) after publishing
 batch, so we don't drop the message that failed to publish.

---
 baseplate/sidecars/event_publisher.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/baseplate/sidecars/event_publisher.py b/baseplate/sidecars/event_publisher.py
index f30710981..22b31a099 100644
--- a/baseplate/sidecars/event_publisher.py
+++ b/baseplate/sidecars/event_publisher.py
@@ -247,6 +247,7 @@ def flush_queue_signal_handler(_signo: int, _frame: FrameType) -> None:
                 pass
             
             serialize_and_publish_batch(publisher, batcher)
+            batcher.add(message)
         sys.exit(0)
 
     for sig in (signal.SIGINT, signal.SIGTERM):
@@ -268,6 +269,7 @@ def flush_queue_signal_handler(_signo: int, _frame: FrameType) -> None:
             pass
 
         serialize_and_publish_batch(publisher, batcher)
+        batcher.add(message)
 
 
 if __name__ == "__main__":

From fec5c07ff901c4ba1665106a6ab9370da1b7f6c6 Mon Sep 17 00:00:00 2001
From: Alan Tai <alan.tai@reddit.com>
Date: Tue, 16 May 2023 17:29:49 -0700
Subject: [PATCH 7/7] Lint.

---
 baseplate/sidecars/event_publisher.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/baseplate/sidecars/event_publisher.py b/baseplate/sidecars/event_publisher.py
index 22b31a099..a98974bd1 100644
--- a/baseplate/sidecars/event_publisher.py
+++ b/baseplate/sidecars/event_publisher.py
@@ -245,7 +245,7 @@ def flush_queue_signal_handler(_signo: int, _frame: FrameType) -> None:
                 continue
             except BatchFull:
                 pass
-            
+
             serialize_and_publish_batch(publisher, batcher)
             batcher.add(message)
         sys.exit(0)