Skip to content

Commit

Permalink
Add subject sanitization and retry to publish on error (#66)
Browse files Browse the repository at this point in the history
  • Loading branch information
FelixTJDietrich authored Aug 22, 2024
1 parent cbd89be commit f839d1f
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 6 deletions.
2 changes: 2 additions & 0 deletions server/webhook-ingest/app/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,5 @@ def filter(self, record: logging.LogRecord) -> bool:

uvicorn_logger = logging.getLogger("uvicorn.access")
uvicorn_logger.addFilter(EndpointFilter(path="/health"))

uvicorn_error = logging.getLogger("uvicorn.error")
9 changes: 6 additions & 3 deletions server/webhook-ingest/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,14 @@ async def github_webhook(
repo = payload["repository"]["name"]
elif "organization" in payload:
org = payload["organization"]["login"]

subject = f"github.{org}.{repo}.{event_type}"

org_sanitized = org.replace('.', '~')
repo_sanitized = repo.replace('.', '~')

subject = f"github.{org_sanitized}.{repo_sanitized}.{event_type}"

# Publish the payload to NATS JetStream
await nats_client.js.publish(subject, body)
await nats_client.publish_with_retry(subject, body)

return { "status": "ok" }

Expand Down
20 changes: 19 additions & 1 deletion server/webhook-ingest/app/nats_client.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import asyncio
from nats.aio.client import Client as NATS
from app.config import settings
from app.logger import logger
from app.logger import logger, uvicorn_error

class NATSClient:
MAX_RETRIES = 10
RETRY_BACKOFF_FACTOR = 2

def __init__(self):
self.nc = NATS()

Expand Down Expand Up @@ -36,6 +40,20 @@ async def closed_cb():
async def publish(self, subject: str, message: bytes):
ack = await self.js.publish(subject, message)
logger.info(f"Published message to {subject}: {ack}")
return ack

async def publish_with_retry(self, subject: str, message: bytes):
for attempt in range(self.MAX_RETRIES):
try:
ack = await self.publish(subject, message)
return ack # Successfully published, return the ack
except Exception as e:
uvicorn_error.error(f"NATS request failed: {e}, retrying in {wait_time} seconds... (Attempt {attempt + 1}/{self.MAX_RETRIES})")
wait_time = self.RETRY_BACKOFF_FACTOR ** attempt
await asyncio.sleep(wait_time)

uvicorn_error.error(f"Failed to publish to {subject} after {self.MAX_RETRIES} attempts")
raise Exception(f"Failed to publish to {subject} after {self.MAX_RETRIES} attempts")

async def close(self):
await self.nc.close()
Expand Down
4 changes: 2 additions & 2 deletions server/webhook-ingest/compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ services:
interval: 30s
timeout: 10s
retries: 5
start_period: 10s
start_period: 3s
logging:
driver: "json-file"
options:
Expand Down Expand Up @@ -46,7 +46,7 @@ services:
interval: 30s
timeout: 10s
retries: 5
start_period: 10s
start_period: 3s
logging:
driver: "json-file"
options:
Expand Down

0 comments on commit f839d1f

Please sign in to comment.