Skip to content

Commit

Permalink
Add health and metrics endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
ericbroda committed May 20, 2024
1 parent 5227272 commit b89df52
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 64 deletions.
6 changes: 3 additions & 3 deletions bin/dockerize.sh
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ echo " "

prepare() {
echo "Preparing..."
cp "$PROJECT_DIR/requirements.txt" "$WORKING_DIR"
cp "$PROJECT_DIR/docker/Dockerfile" "$WORKING_DIR"
cp "$PROJECT_DIR/src/*.py" "$WORKING_DIR"
cp $PROJECT_DIR/requirements.txt "$WORKING_DIR"
cp $PROJECT_DIR/docker/Dockerfile "$WORKING_DIR"
cp $PROJECT_DIR/src/*.py "$WORKING_DIR"
}

cleanup() {
Expand Down
35 changes: 35 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
annotated-types==0.6.0
anyio==4.3.0
certifi==2024.2.2
click==8.1.7
dnspython==2.6.1
email_validator==2.1.1
fastapi==0.111.0
fastapi-cli==0.0.3
h11==0.14.0
httpcore==1.0.5
httptools==0.6.1
httpx==0.27.0
idna==3.7
Jinja2==3.1.4
markdown-it-py==3.0.0
MarkupSafe==2.1.5
mdurl==0.1.2
orjson==3.10.3
pydantic==2.7.1
pydantic_core==2.18.2
Pygments==2.18.0
python-dotenv==1.0.1
python-multipart==0.0.9
PyYAML==6.0.1
rich==13.7.1
shellingham==1.5.4
sniffio==1.3.1
starlette==0.37.2
typer==0.12.3
typing_extensions==4.11.0
ujson==5.9.0
uvicorn==0.29.0
uvloop==0.19.0
watchfiles==0.21.0
websockets==12.0
108 changes: 87 additions & 21 deletions src/middleware.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,33 @@
import base64
import logging

from fastapi import Depends, Request, Response
from fastapi import Request
import base64
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse, PlainTextResponse, StreamingResponse
from starlette.responses import StreamingResponse
from starlette.types import Send
from starlette.datastructures import MutableHeaders
import uuid

import state

STATE_TRACEID = "state-traceid"
STATE_METRICS = "state-metrics"

def safe_decode(data):
try:
return data.decode('utf-8')
except UnicodeDecodeError:
return base64.b64encode(data).decode('utf-8')
HEADER_USERNAME = "OSC-DM-Username"
HEADER_CORRELATION_ID = "OSC-DM-Correlation-ID"
USERNAME_UNKNOWN = "unknown"

class LoggingMiddleware(BaseHTTPMiddleware):
"""
FastAPI middleware is used to add processing to
each request. This is used to perform several capabilities:
- add "TRACE" identifiers to request and responses that
link requests to their responses
- track basic statistics about service usage for
URLs and username (HEADER_USERNAME in header)
- create correlation id that can allow messages
to be tracked end-to-end (assuming each communication
participate propagates key headers)
"""
async def dispatch(self, request: Request, call_next):
logger = logging.getLogger(__name__)

Expand All @@ -26,53 +38,101 @@ async def dispatch(self, request: Request, call_next):
except Exception:
try:
body = await request.body()
body = safe_decode(body)
body = _safe_decode(body)
except Exception as e:
body = f"Failed to read body: {str(e)}"

# Get a trace identifier to correlate requests and responses
# Get the correlation id, and add it if it does not exist
correlation_id = request.headers.get(HEADER_CORRELATION_ID)
if correlation_id is None:
logger.warning(f"Missing header:{HEADER_CORRELATION_ID} url:{str(request.url)} headers:{request.headers} ")
correlation_id = str(uuid.uuid4())
headers = MutableHeaders(request._headers)
headers[HEADER_CORRELATION_ID] = correlation_id
request._headers = headers
logger.warning(f"Added header:{HEADER_CORRELATION_ID}:{correlation_id} url:{str(request.url)} headers:{request.headers} ")

# Get the username, and add it if it does not exist
username = request.headers.get(HEADER_USERNAME)
if username is None:
logger.warning(f"Missing header:{HEADER_USERNAME} url:{str(request.url)} headers:{request.headers} ")
username = USERNAME_UNKNOWN
headers = MutableHeaders(request._headers)
headers[HEADER_USERNAME] = username
request._headers = headers
logger.warning(f"Added header:{HEADER_USERNAME}:{username} url:{str(request.url)} headers:{request.headers} ")

# Get a trace identifier to track requests and responses logs
trace_id = state.gstate(STATE_TRACEID)
if not trace_id:
trace_id = 0
state.gstate(STATE_TRACEID, trace_id)
trace_id = state.gstate(STATE_TRACEID)

url = str(request.url)
request_info = {
"url": str(request.url),
"url": url,
"method": request.method,
"headers": dict(request.headers),
"parameters": dict(request.query_params),
"body": body
}
logger.info(f"TRACE-{trace_id}-REQ: {request_info}")
logger.info(f"TRACE-{trace_id}:{correlation_id}-REQ:{request_info}")

response = await call_next(request)

status_code = response.status_code

# Add the correlation id and username to the response
response.headers[HEADER_CORRELATION_ID] = correlation_id
response.headers[HEADER_USERNAME] = username

# Handle username counts
metrics = state.gstate(STATE_METRICS)
if not metrics:
metrics = {}
state.gstate(STATE_METRICS, metrics)
metrics = state.gstate(STATE_METRICS)

# Update counts for username
username = request.headers.get(HEADER_USERNAME)
if not username:
username = USERNAME_UNKNOWN
if username not in metrics:
metrics[username] = {}
if url not in metrics[username]:
metrics[username][url] = {}
if status_code not in metrics[username][url]:
metrics[username][url][status_code] = 0
metrics[username][url][status_code] += 1
logger.info(f"Using metrics:{metrics}")

# Log response
response_body = ""
if isinstance(response, StreamingResponse):
original_body_iterator = response.body_iterator
logging_response = LoggingStreamingResponse(original_body_iterator, status_code=response.status_code, headers=dict(response.headers))
logging_response = _LoggingStreamingResponse(original_body_iterator, status_code=response.status_code, headers=dict(response.headers))
response_body = logging_response.body
else:
response_body = response.body.decode("utf-8") if hasattr(response, 'body') else str(response)


response_info = {
"status_code": response.status_code,
"headers": response.headers,
"body": response_body
}
logger.info(f"TRACE-{trace_id}-RSP: {response_info}")
logger.info(f"TRACE-{trace_id}:{correlation_id}-RSP:{response_info}")

state.gstate(STATE_TRACEID, trace_id + 1)

return response

import asyncio

from starlette.types import Message, Send
@staticmethod
def get_metrics():
usernames = state.gstate(STATE_METRICS)
return usernames


class LoggingStreamingResponse(StreamingResponse):
class _LoggingStreamingResponse(StreamingResponse):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.body_chunks = []
Expand All @@ -95,3 +155,9 @@ async def stream_response(self, send: Send):
@property
def body(self):
return b"".join(self.body_chunks).decode("utf-8")

def _safe_decode(data):
try:
return data.decode('utf-8')
except UnicodeDecodeError:
return base64.b64encode(data).decode('utf-8')
Loading

0 comments on commit b89df52

Please sign in to comment.