From b89df52278e0f7fbdac4e640fe0d7aa6a29240ee Mon Sep 17 00:00:00 2001
From: Eric Broda
Date: Mon, 20 May 2024 12:49:56 -0400
Subject: [PATCH] Add health and metrics endpoints

---
 bin/dockerize.sh  |   6 +--
 requirements.txt  |  35 +++++++++++++
 src/middleware.py | 108 +++++++++++++++++++++++++++++++--------
 src/server.py     | 125 +++++++++++++++++++++++++++++++++-------------
 src/utilities.py  |  11 ++--
 5 files changed, 221 insertions(+), 64 deletions(-)
 create mode 100644 requirements.txt

diff --git a/bin/dockerize.sh b/bin/dockerize.sh
index 05bc952..c0ecf0a 100755
--- a/bin/dockerize.sh
+++ b/bin/dockerize.sh
@@ -75,9 +75,9 @@ echo " "
 prepare() {
     echo "Preparing..."
-    cp "$PROJECT_DIR/requirements.txt" "$WORKING_DIR"
-    cp "$PROJECT_DIR/docker/Dockerfile" "$WORKING_DIR"
-    cp "$PROJECT_DIR/src/*.py" "$WORKING_DIR"
+    cp $PROJECT_DIR/requirements.txt "$WORKING_DIR"
+    cp $PROJECT_DIR/docker/Dockerfile "$WORKING_DIR"
+    cp $PROJECT_DIR/src/*.py "$WORKING_DIR"
 }

 cleanup() {
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..120e0d9
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,35 @@
+annotated-types==0.6.0
+anyio==4.3.0
+certifi==2024.2.2
+click==8.1.7
+dnspython==2.6.1
+email_validator==2.1.1
+fastapi==0.111.0
+fastapi-cli==0.0.3
+h11==0.14.0
+httpcore==1.0.5
+httptools==0.6.1
+httpx==0.27.0
+idna==3.7
+Jinja2==3.1.4
+markdown-it-py==3.0.0
+MarkupSafe==2.1.5
+mdurl==0.1.2
+orjson==3.10.3
+pydantic==2.7.1
+pydantic_core==2.18.2
+Pygments==2.18.0
+python-dotenv==1.0.1
+python-multipart==0.0.9
+PyYAML==6.0.1
+rich==13.7.1
+shellingham==1.5.4
+sniffio==1.3.1
+starlette==0.37.2
+typer==0.12.3
+typing_extensions==4.11.0
+ujson==5.9.0
+uvicorn==0.29.0
+uvloop==0.19.0
+watchfiles==0.21.0
+websockets==12.0
diff --git a/src/middleware.py b/src/middleware.py
index 4305086..b3ff854 100644
--- a/src/middleware.py
+++ b/src/middleware.py
@@ -1,21 +1,33 @@
-import base64
 import logging
-
-from fastapi import Depends, Request, Response
+from fastapi import Request
+import base64
 from starlette.middleware.base import BaseHTTPMiddleware
-from starlette.responses import JSONResponse, PlainTextResponse, StreamingResponse
+from starlette.responses import StreamingResponse
+from starlette.types import Send
+from starlette.datastructures import MutableHeaders
+import uuid

 import state

 STATE_TRACEID = "state-traceid"
+STATE_METRICS = "state-metrics"

-def safe_decode(data):
-    try:
-        return data.decode('utf-8')
-    except UnicodeDecodeError:
-        return base64.b64encode(data).decode('utf-8')
+HEADER_USERNAME = "OSC-DM-Username"
+HEADER_CORRELATION_ID = "OSC-DM-Correlation-ID"
+USERNAME_UNKNOWN = "unknown"

 class LoggingMiddleware(BaseHTTPMiddleware):
+    """
+    FastAPI middleware is used to add processing to
+    each request. This is used to perform several capabilities:
+    - add "TRACE" identifiers to requests and responses that
+      link requests to their responses
+    - track basic statistics about service usage for
+      URLs and username (HEADER_USERNAME in header)
+    - create correlation id that can allow messages
+      to be tracked end-to-end (assuming each communication
+      participant propagates key headers)
+    """

     async def dispatch(self, request: Request, call_next):
         logger = logging.getLogger(__name__)
@@ -26,53 +38,101 @@ async def dispatch(self, request: Request, call_next):
         except Exception:
             try:
                 body = await request.body()
-                body = safe_decode(body)
+                body = _safe_decode(body)
             except Exception as e:
                 body = f"Failed to read body: {str(e)}"

-        # Get a trace identifier to correlate requests and responses
+        # Get the correlation id, and add it if it does not exist
+        correlation_id = request.headers.get(HEADER_CORRELATION_ID)
+        if correlation_id is None:
+            logger.warning(f"Missing header:{HEADER_CORRELATION_ID} url:{str(request.url)} headers:{request.headers} ")
+            correlation_id = str(uuid.uuid4())
+            headers = MutableHeaders(request._headers)
+            headers[HEADER_CORRELATION_ID] = correlation_id
+            request._headers = headers
+            logger.warning(f"Added header:{HEADER_CORRELATION_ID}:{correlation_id} url:{str(request.url)} headers:{request.headers} ")
+
+        # Get the username, and add it if it does not exist
+        username = request.headers.get(HEADER_USERNAME)
+        if username is None:
+            logger.warning(f"Missing header:{HEADER_USERNAME} url:{str(request.url)} headers:{request.headers} ")
+            username = USERNAME_UNKNOWN
+            headers = MutableHeaders(request._headers)
+            headers[HEADER_USERNAME] = username
+            request._headers = headers
+            logger.warning(f"Added header:{HEADER_USERNAME}:{username} url:{str(request.url)} headers:{request.headers} ")
+
+        # Get a trace identifier to track request and response logs
         trace_id = state.gstate(STATE_TRACEID)
         if not trace_id:
             trace_id = 0
             state.gstate(STATE_TRACEID, trace_id)
         trace_id = state.gstate(STATE_TRACEID)

+        url = str(request.url)
         request_info = {
-            "url": str(request.url),
+            "url": url,
             "method": request.method,
             "headers": dict(request.headers),
             "parameters": dict(request.query_params),
             "body": body
         }
-        logger.info(f"TRACE-{trace_id}-REQ: {request_info}")
+        logger.info(f"TRACE-{trace_id}:{correlation_id}-REQ:{request_info}")

         response = await call_next(request)
-
+        status_code = response.status_code
+
+        # Add the correlation id and username to the response
+        response.headers[HEADER_CORRELATION_ID] = correlation_id
+        response.headers[HEADER_USERNAME] = username
+
+        # Handle username counts
+        metrics = state.gstate(STATE_METRICS)
+        if not metrics:
+            metrics = {}
+            state.gstate(STATE_METRICS, metrics)
+        metrics = state.gstate(STATE_METRICS)
+
+        # Update counts for username
+        username = request.headers.get(HEADER_USERNAME)
+        if not username:
+            username = USERNAME_UNKNOWN
+        if username not in metrics:
+            metrics[username] = {}
+        if url not in metrics[username]:
+            metrics[username][url] = {}
+        if status_code not in metrics[username][url]:
+            metrics[username][url][status_code] = 0
+        metrics[username][url][status_code] += 1
+        logger.info(f"Using metrics:{metrics}")
+
+        # Log response
         response_body = ""
         if isinstance(response, StreamingResponse):
             original_body_iterator = response.body_iterator
-            logging_response = LoggingStreamingResponse(original_body_iterator, status_code=response.status_code, headers=dict(response.headers))
+            logging_response = _LoggingStreamingResponse(original_body_iterator, status_code=response.status_code, headers=dict(response.headers))
             response_body = logging_response.body
         else:
             response_body = response.body.decode("utf-8") if hasattr(response, 'body') else str(response)
-
         response_info = {
             "status_code": response.status_code,
+            "headers": response.headers,
             "body": response_body
         }
-        logger.info(f"TRACE-{trace_id}-RSP: {response_info}")
+        logger.info(f"TRACE-{trace_id}:{correlation_id}-RSP:{response_info}")

         state.gstate(STATE_TRACEID, trace_id + 1)
         return response

-import asyncio
-
-from starlette.types import Message, Send
+    @staticmethod
+    def get_metrics():
+        usernames = state.gstate(STATE_METRICS)
+        return usernames

-class LoggingStreamingResponse(StreamingResponse):
+class _LoggingStreamingResponse(StreamingResponse):
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
         self.body_chunks = []
@@ -95,3 +155,9 @@ async def stream_response(self, send: Send):
     @property
     def body(self):
         return b"".join(self.body_chunks).decode("utf-8")
+
+def _safe_decode(data):
+    try:
+        return data.decode('utf-8')
+    except UnicodeDecodeError:
+        return base64.b64encode(data).decode('utf-8')
diff --git a/src/server.py b/src/server.py
index 79433a6..bccc153 100644
--- a/src/server.py
+++ b/src/server.py
@@ -32,14 +32,20 @@ ENDPOINT_PREFIX = "/api" + ENDPOINT_MONITOR
 DEFAULT_HOST = "0.0.0.0"
 DEFAULT_PORT = 8000
-DEFAULT_CONFIGURATION="./config/config.yaml"
+DEFAULT_CONFIGURATION = "./config/config.yaml"

-STATE_CONFIGURATION="state-configuration"
-STATE_STATISTICS="state-statistics"
+STATE_CONFIGURATION = "state-configuration"
+STATE_METRICS = "state-statistics"
+STATE_HEALTH = "state-health"
+STATE_ADDRESSES = "state-addresses"

-SERVICE_REGISTRAR="osc-dm-registrar-srv"
-SERVICE_SEARCH="osc-dm-search-srv"
-SERVICE_PROXY="osc-dm-proxy-srv"
+SERVICE_REGISTRAR = "osc-dm-registrar-srv"
+SERVICE_SEARCH = "osc-dm-search-srv"
+SERVICE_PROXY = "osc-dm-proxy-srv"
+
+HEADER_USERNAME = "OSC-DM-Username"
+HEADER_CORRELATION_ID = "OSC-DM-Correlation-ID"
+USERNAME = "osc-dm-search-srv"

 #####
 # STARTUP
@@ -58,7 +64,12 @@ async def startup_event():
     logger.info("Running startup event")
     param1 = "fake param 1"
     param2 = "fake param 2"
+
+    logger.info("Running task immediately...")
     await _task(param1, param2)  # Immediate invocation at startup
+
+    interval = configuration["monitor"]["interval_seconds"]
+    logger.info(f"Running task interval (seconds):{interval}")
     asyncio.create_task(_repeat_every(configuration["monitor"]["interval_seconds"], _task, param1, param2)) # Periodic invocation
@@ -72,17 +83,17 @@ async def health_get():
     """
     Return health information
     """
-    statistics = state.gstate(STATE_STATISTICS)
+    statistics = state.gstate(STATE_HEALTH)
     return statistics


 @app.get(ENDPOINT_PREFIX + "/metrics")
-async def health_get():
+async def metrics_get():
     """
-    Return health information
+    Return metrics information
     """
-    statistics = state.gstate(STATE_STATISTICS)
-    return statistics
+    metrics = state.gstate(STATE_METRICS)
+    return metrics


 #####
@@ -96,7 +107,9 @@ async def _task(param1, param2):
     """
     logger.info(f"Executing task param1:{param1} param2:{param2}")

-    statistics = state.gstate(STATE_STATISTICS)
+    health = state.gstate(STATE_HEALTH)
+    metrics = state.gstate(STATE_METRICS)
+    addresses = state.gstate(STATE_ADDRESSES)
     configuration = state.gstate(STATE_CONFIGURATION)
     host = configuration["proxy"]["host"]
@@ -108,12 +121,17 @@
     response = None
     try:
         logger.info("Get products")
-        response = await utilities.httprequest(host, port, service, method)
+        import uuid
+        headers = {
+            HEADER_USERNAME: USERNAME,
+            HEADER_CORRELATION_ID: str(uuid.uuid4())
+        }
+        response = await utilities.httprequest(host, port, service, method, headers=headers)
         logger.info("Get products SUCCESS")
     except Exception as e:
         logger.error(f"Error getting product information, exception:{e}")

-    # Get addresses for all products and add to
+    # Get address for all health checks and add to addresses to check
     if response:
         products = response
         for product in products:
@@ -121,22 +139,46 @@
             address = product["address"]
             uuid = product["uuid"]
             endpoint = f"/api/dataproducts/uuid/{uuid}"
-            statistics[address] = { "endpoint": endpoint, "health": "UNKNOWN"}
+            addresses[address] = endpoint

-    # Get info (health and metrics) from each service and data product
-    logger.info(f"Current statistics:{statistics}")
-    for name in statistics:
+    # Get info (health) from each service and data product
+    logger.info(f"Current health:{health}")
+    for address in addresses:
         try:
-            logger.info(f"Getting status name:{name} info:{statistics[name]}")
-            service = statistics[name]["endpoint"]
+            logger.info(f"Getting health address:{address}")
+            service = addresses[address] + "/health"
             method = "GET"
-            response = await utilities.httprequest(host, port, service, method)
-            statistics[name]["health"] = "OK"
-            logger.info(f"Getting status name:{name} SUCCESS (UP)")
+            import uuid
+            headers = {
+                HEADER_USERNAME: USERNAME,
+                HEADER_CORRELATION_ID: str(uuid.uuid4())
+            }
+            response = await utilities.httprequest(host, port, service, method, headers=headers)
+            health[address] = "OK"
+            logger.info(f"Successful getting health address:{address} response:{response}")
         except Exception as e:
-            logger.error(f"Error getting status name:{name} info:{statistics[name]}, exception:{e}")
-            statistics[name]["health"] = "NOT-OK"
-    logger.info(f"Full statistics:{statistics}")
+            logger.error(f"Error getting health address:{address}, exception:{e}")
+            health[address] = "NOT-OK"
+    logger.info(f"Full health:{health}")
+
+    # Get info (metrics) from each service and data product
+    logger.info(f"Current metrics:{metrics}")
+    for address in addresses:
+        try:
+            logger.info(f"Getting metrics address:{address}")
+            service = addresses[address] + "/metrics"
+            method = "GET"
+            headers = {
+                HEADER_USERNAME: USERNAME,
+                HEADER_CORRELATION_ID: str(uuid.uuid4())
+            }
+            response = await utilities.httprequest(host, port, service, method, headers=headers)
+            metrics[address] = response
+            logger.info(f"Successful getting metrics address:{address} response:{response}")
+        except Exception as e:
+            logger.error(f"Error getting metrics address:{address}, exception:{e}")
+            metrics[address] = "NOT-AVAILABLE"
+    logger.info(f"Full metrics:{metrics}")


 async def _repeat_every(interval_sec, func, *args):
@@ -149,6 +191,8 @@ async def _repeat_every(interval_sec, func, *args):
         await asyncio.sleep(interval_sec)

+
+
 #####
 # MAINLINE
 #####
@@ -178,19 +222,32 @@ async def _repeat_every(interval_sec, func, *args):
     logger.info(f"Configuration:{configuration}")
     state.gstate(STATE_CONFIGURATION, configuration)

-    statistics = {
-        SERVICE_PROXY: {"endpoint": "/api/proxy/health", "health": "UNKNOWN" },
-        SERVICE_REGISTRAR: {"endpoint": "/api/registrar/health", "health": "UNKNOWN" },
-        SERVICE_SEARCH: {"endpoint": "/api/search/health", "health": "UNKNOWN" }
+    addresses = {
+        SERVICE_PROXY: "/api/proxy",
+        SERVICE_REGISTRAR: "/api/registrar",
+        SERVICE_SEARCH: "/api/search",
     }
-    state.gstate(STATE_STATISTICS, statistics)
+    state.gstate(STATE_ADDRESSES, addresses)
+
+    health = {
+        SERVICE_PROXY: "UNKNOWN",
+        SERVICE_REGISTRAR: "UNKNOWN",
+        SERVICE_SEARCH: "UNKNOWN",
+    }
+    state.gstate(STATE_HEALTH, health)
+
+    metrics = {
+        SERVICE_PROXY: {},
+        SERVICE_REGISTRAR: {},
+        SERVICE_SEARCH: {},
+    }
+    state.gstate(STATE_METRICS, metrics)

     # Start the server
     try:
-        logger.info(f"STARTING service on host:{args.host} port:{args.port}")
+        logger.info(f"Starting service on host:{args.host} port:{args.port}")
         uvicorn.run(app, host=args.host, port=args.port)
     except Exception as e:
-        logger.info(f"STOPPING service, exception:{e}")
+        logger.info(f"Stopping service, exception:{e}")
     finally:
-        logger.info(f"TERMINATING service on host:{args.host} port:{args.port}")
+        logger.info(f"Terminating service")
diff --git a/src/utilities.py b/src/utilities.py
index 04d5e49..41d649d 100644
--- a/src/utilities.py
+++ b/src/utilities.py
@@ -6,10 +6,9 @@
 #
 # Created: 2024-04-15 by eric.broda@brodagroupsoftware.com

-import logging
-from typing import Any, Dict, List, Optional
-
+from typing import List, Optional, Dict, Any
 import httpx
+import logging

 from bgsexception import BgsException, BgsNotFoundException
@@ -33,7 +32,7 @@ async def httprequest(host: str, port: int, service: str, method: str,
     Returns:
     - requests.Response: The response object
     """
-    logger.info(f"Issue request, method:{method} data:{data} obj:{obj} files:{files}")
+    logger.info(f"Issue request, method:{method} headers:{headers} data:{data} obj:{obj} files:{files}")

     url = f"http://{host}:{port}{service}"
     method = method.upper()
@@ -95,7 +94,7 @@ def shttprequest(host: str, port: int, service: str, method: str,
     Returns:
     - requests.Response: The response object
     """
-    logger.info(f"Issue request, method:{method} data:{data} obj:{obj} files:{files}")
+    logger.info(f"Issue request, method:{method} headers:{headers} data:{data} obj:{obj} files:{files}")

     url = f"http://{host}:{port}{service}"
     method = method.upper()
@@ -134,4 +133,4 @@ def shttprequest(host: str, port: int, service: str, method: str,
     except Exception as e:
         msg = f"Unexpected error for {url}: {e}"
         logger.error(msg)
-        raise BgsException(msg)
+        raise BgsException(msg)
\ No newline at end of file
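
Reviewer sketch (not part of the patch): the snippet below shows one way to call the new health and metrics endpoints with the headers the middleware expects. The header names (OSC-DM-Username, OSC-DM-Correlation-ID) come from this patch; the base URL and the "/monitor" path segment are assumptions, since ENDPOINT_MONITOR and the deployment host/port are not defined in this diff.

import uuid

import httpx

# Assumed values; adjust to the actual deployment. ENDPOINT_MONITOR is not
# shown in this patch, so "/monitor" is only an illustrative guess.
BASE_URL = "http://localhost:8000/api/monitor"

# Header names are defined in the patch (HEADER_USERNAME, HEADER_CORRELATION_ID);
# the username value here is an example. If the headers are omitted, the
# middleware generates them and logs a "Missing header" warning.
HEADERS = {
    "OSC-DM-Username": "example-user",
    "OSC-DM-Correlation-ID": str(uuid.uuid4()),
}

for path in ("/health", "/metrics"):
    response = httpx.get(BASE_URL + path, headers=HEADERS)
    print(path, response.status_code, response.json())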