Skip to content

Commit

Permalink
Add new middleware trace/logging (#13)
Browse files Browse the repository at this point in the history
Signed-off-by: Eric Broda <[email protected]>
Co-authored-by: Eric Broda <[email protected]>
  • Loading branch information
ericbroda and ericbroda authored May 17, 2024
1 parent 84ac617 commit 5227272
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 12 deletions.
47 changes: 47 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@

# Temporary directories
tmp/
rawdata/

# Other stuff
*.iml

# Temporary devops repo
.devops

Expand All @@ -16,18 +24,37 @@ node_modules
credentials.env
config.toml


# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class



.idea/*
*.iml


# C extensions
*.so

# Distribution / packaging

dist/
build/
*.egg-info/
*.egg

# Compiled Python files
*.pyc

# IDE specific files
.idea/
.vscode/

# Environment and virtualenv files

.Python
.pdm-python
.pdm-build
Expand Down Expand Up @@ -136,13 +163,33 @@ celerybeat.pid
*.sage.py

# Environments

.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/


# macOS specific files
.DS_Store

# Jupyter Notebook
.ipynb_checkpoints/

# Testing
htmlcov/
.tox/

# Pytest
.pytest_cache/

# Coverage
.coverage
.coverage.*

examples/itr_ui/
itr_env/

Expand Down
97 changes: 97 additions & 0 deletions src/middleware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import base64
import logging

from fastapi import Depends, Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse, PlainTextResponse, StreamingResponse

import state

STATE_TRACEID = "state-traceid"

def safe_decode(data):
try:
return data.decode('utf-8')
except UnicodeDecodeError:
return base64.b64encode(data).decode('utf-8')

class LoggingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
logger = logging.getLogger(__name__)

body = {}
if request.method not in ["GET", "HEAD", "OPTIONS"]:
try:
body = await request.json()
except Exception:
try:
body = await request.body()
body = safe_decode(body)
except Exception as e:
body = f"Failed to read body: {str(e)}"

# Get a trace identifier to correlate requests and responses
trace_id = state.gstate(STATE_TRACEID)
if not trace_id:
trace_id = 0
state.gstate(STATE_TRACEID, trace_id)
trace_id = state.gstate(STATE_TRACEID)

request_info = {
"url": str(request.url),
"method": request.method,
"headers": dict(request.headers),
"parameters": dict(request.query_params),
"body": body
}
logger.info(f"TRACE-{trace_id}-REQ: {request_info}")

response = await call_next(request)

response_body = ""
if isinstance(response, StreamingResponse):
original_body_iterator = response.body_iterator
logging_response = LoggingStreamingResponse(original_body_iterator, status_code=response.status_code, headers=dict(response.headers))
response_body = logging_response.body
else:
response_body = response.body.decode("utf-8") if hasattr(response, 'body') else str(response)


response_info = {
"status_code": response.status_code,
"body": response_body
}
logger.info(f"TRACE-{trace_id}-RSP: {response_info}")

state.gstate(STATE_TRACEID, trace_id + 1)

return response

import asyncio

from starlette.types import Message, Send


class LoggingStreamingResponse(StreamingResponse):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.body_chunks = []

async def stream_response(self, send: Send):
async for chunk in self.body_iterator:
self.body_chunks.append(chunk)
await send({
"type": "http.response.body",
"body": chunk,
"more_body": True
})

await send({
"type": "http.response.body",
"body": b"",
"more_body": False
})

@property
def body(self):
return b"".join(self.body_chunks).decode("utf-8")
42 changes: 30 additions & 12 deletions src/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

# Project imports
import utilities
from middleware import LoggingMiddleware

# Set up logging
LOGGING_FORMAT = \
Expand All @@ -27,7 +28,8 @@
logger = logging.getLogger(__name__)

# Constants
ENDPOINT_PREFIX = "/api/monitor"
ENDPOINT_MONITOR = "/monitor"
ENDPOINT_PREFIX = "/api" + ENDPOINT_MONITOR
DEFAULT_HOST = "0.0.0.0"
DEFAULT_PORT = 8000
DEFAULT_CONFIGURATION="./config/config.yaml"
Expand All @@ -37,13 +39,15 @@

SERVICE_REGISTRAR="osc-dm-registrar-srv"
SERVICE_SEARCH="osc-dm-search-srv"
SERVICE_PROXY="osc-dm-proxy-srv"

#####
# STARTUP
#####

# Set up server
app = FastAPI()
app.add_middleware(LoggingMiddleware)

@app.on_event("startup")
async def startup_event():
Expand All @@ -63,10 +67,19 @@ async def startup_event():
#####


@app.get(ENDPOINT_PREFIX + "/status")
async def status_get():
@app.get(ENDPOINT_PREFIX + "/health")
async def health_get():
"""
Get observed status information
Return health information
"""
statistics = state.gstate(STATE_STATISTICS)
return statistics


@app.get(ENDPOINT_PREFIX + "/metrics")
async def health_get():
"""
Return health information
"""
statistics = state.gstate(STATE_STATISTICS)
return statistics
Expand All @@ -88,9 +101,10 @@ async def _task(param1, param2):
configuration = state.gstate(STATE_CONFIGURATION)
host = configuration["proxy"]["host"]
port = configuration["proxy"]["port"]
service = "/api/registration/products"
service = "/api/registrar/products"
method = "GET"

# Get all known products from registrar
response = None
try:
logger.info("Get products")
Expand All @@ -99,27 +113,30 @@ async def _task(param1, param2):
except Exception as e:
logger.error(f"Error getting product information, exception:{e}")

# Get addresses for all products and add to
if response:
products = response
for product in products:
logger.info(f"Using product:{product}")
address = product["address"]
uuid = product["uuid"]
endpoint = f"/api/discovery/uuid/{uuid}"
statistics[address] = { "endpoint": endpoint, "status": "UNKNOWN"}
endpoint = f"/api/dataproducts/uuid/{uuid}"
statistics[address] = { "endpoint": endpoint, "health": "UNKNOWN"}

logger.info(f"Using statistics:{statistics}")
# Get info (health and metrics) from each service and data product
logger.info(f"Current statistics:{statistics}")
for name in statistics:
try:
logger.info(f"Getting status name:{name} info:{statistics[name]}")
service = statistics[name]["endpoint"]
method = "GET"
response = await utilities.httprequest(host, port, service, method)
statistics[name]["status"] = "UP"
statistics[name]["health"] = "OK"
logger.info(f"Getting status name:{name} SUCCESS (UP)")
except Exception as e:
logger.error(f"Error getting status name:{name} info:{statistics[name]}, exception:{e}")
statistics[name]["status"] = "DOWN"
statistics[name]["health"] = "NOT-OK"
logger.info(f"Full statistics:{statistics}")


async def _repeat_every(interval_sec, func, *args):
Expand Down Expand Up @@ -162,8 +179,9 @@ async def _repeat_every(interval_sec, func, *args):
state.gstate(STATE_CONFIGURATION, configuration)

statistics = {
SERVICE_REGISTRAR: {"endpoint": "/api/registration/status", "status": "UNKNOWN" },
SERVICE_SEARCH: {"endpoint": "/api/search/status", "status": "UNKNOWN" }
SERVICE_PROXY: {"endpoint": "/api/proxy/health", "health": "UNKNOWN" },
SERVICE_REGISTRAR: {"endpoint": "/api/registrar/health", "health": "UNKNOWN" },
SERVICE_SEARCH: {"endpoint": "/api/search/health", "health": "UNKNOWN" }
}
state.gstate(STATE_STATISTICS, statistics)

Expand Down

0 comments on commit 5227272

Please sign in to comment.