From f839d1f03267654f897c05605dc8a9e89ebb6b8a Mon Sep 17 00:00:00 2001 From: "Felix T.J. Dietrich" Date: Thu, 22 Aug 2024 23:39:21 +0200 Subject: [PATCH] Add subject sanitization and retry to publish on error (#66) --- server/webhook-ingest/app/logger.py | 2 ++ server/webhook-ingest/app/main.py | 9 ++++++--- server/webhook-ingest/app/nats_client.py | 20 +++++++++++++++++++- server/webhook-ingest/compose.yaml | 4 ++-- 4 files changed, 29 insertions(+), 6 deletions(-) diff --git a/server/webhook-ingest/app/logger.py b/server/webhook-ingest/app/logger.py index 19dde470..028a283d 100644 --- a/server/webhook-ingest/app/logger.py +++ b/server/webhook-ingest/app/logger.py @@ -28,3 +28,5 @@ def filter(self, record: logging.LogRecord) -> bool: uvicorn_logger = logging.getLogger("uvicorn.access") uvicorn_logger.addFilter(EndpointFilter(path="/health")) + +uvicorn_error = logging.getLogger("uvicorn.error") \ No newline at end of file diff --git a/server/webhook-ingest/app/main.py b/server/webhook-ingest/app/main.py index 68874657..9a34e3dd 100644 --- a/server/webhook-ingest/app/main.py +++ b/server/webhook-ingest/app/main.py @@ -59,11 +59,14 @@ async def github_webhook( repo = payload["repository"]["name"] elif "organization" in payload: org = payload["organization"]["login"] - - subject = f"github.{org}.{repo}.{event_type}" + + org_sanitized = org.replace('.', '~') + repo_sanitized = repo.replace('.', '~') + + subject = f"github.{org_sanitized}.{repo_sanitized}.{event_type}" # Publish the payload to NATS JetStream - await nats_client.js.publish(subject, body) + await nats_client.publish_with_retry(subject, body) return { "status": "ok" } diff --git a/server/webhook-ingest/app/nats_client.py b/server/webhook-ingest/app/nats_client.py index d7dabec1..a4326aeb 100644 --- a/server/webhook-ingest/app/nats_client.py +++ b/server/webhook-ingest/app/nats_client.py @@ -1,8 +1,12 @@ +import asyncio from nats.aio.client import Client as NATS from app.config import settings -from app.logger import logger +from app.logger import logger, uvicorn_error class NATSClient: + MAX_RETRIES = 10 + RETRY_BACKOFF_FACTOR = 2 + def __init__(self): self.nc = NATS() @@ -36,6 +40,20 @@ async def closed_cb(): async def publish(self, subject: str, message: bytes): ack = await self.js.publish(subject, message) logger.info(f"Published message to {subject}: {ack}") + return ack + + async def publish_with_retry(self, subject: str, message: bytes): + for attempt in range(self.MAX_RETRIES): + try: + ack = await self.publish(subject, message) + return ack # Successfully published, return the ack + except Exception as e: + uvicorn_error.error(f"NATS request failed: {e}, retrying in {wait_time} seconds... (Attempt {attempt + 1}/{self.MAX_RETRIES})") + wait_time = self.RETRY_BACKOFF_FACTOR ** attempt + await asyncio.sleep(wait_time) + + uvicorn_error.error(f"Failed to publish to {subject} after {self.MAX_RETRIES} attempts") + raise Exception(f"Failed to publish to {subject} after {self.MAX_RETRIES} attempts") async def close(self): await self.nc.close() diff --git a/server/webhook-ingest/compose.yaml b/server/webhook-ingest/compose.yaml index c60eaade..adf11d22 100644 --- a/server/webhook-ingest/compose.yaml +++ b/server/webhook-ingest/compose.yaml @@ -18,7 +18,7 @@ services: interval: 30s timeout: 10s retries: 5 - start_period: 10s + start_period: 3s logging: driver: "json-file" options: @@ -46,7 +46,7 @@ services: interval: 30s timeout: 10s retries: 5 - start_period: 10s + start_period: 3s logging: driver: "json-file" options: