Skip to content

Commit

Permalink
Merge pull request #25 from MeteoSwiss-APN/feature/mch-rebased-otel
Browse files Browse the repository at this point in the history
Feature/mch rebased otel
  • Loading branch information
MiquelZuehlke authored Dec 2, 2024
2 parents a01a3bf + 681e0c4 commit 2f15090
Show file tree
Hide file tree
Showing 6 changed files with 370 additions and 105 deletions.
22 changes: 14 additions & 8 deletions polytope_server/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

from ..common import collection, queue, request_store
from ..common.request import Status

from ..common.observability.otel import restore_trace_context, create_new_span_consumer, create_new_span_producer, update_trace_context

class Broker:
def __init__(self, config):
Expand Down Expand Up @@ -75,8 +75,11 @@ def check_requests(self):

if self.check_limits(active_requests, wr):
assert wr.status == Status.WAITING
active_requests.add(wr)
self.enqueue(wr)
# Restore the trace context for this request
extracted_ctx = restore_trace_context(wr)
with create_new_span_consumer("Enqueue request", request_id=wr.id, parent_context=extracted_ctx):
active_requests.add(wr)
self.enqueue(wr)

if self.queue.count() >= self.max_queue_size:
logging.info("Queue is full")
Expand Down Expand Up @@ -133,11 +136,14 @@ def enqueue(self, request):
logging.info("Queuing request", extra={"request_id": request.id})

try:
# Must update request_store before queue, worker checks request status immediately
request.set_status(Status.QUEUED)
self.request_store.update_request(request)
msg = queue.Message(request.serialize())
self.queue.enqueue(msg)
with create_new_span_producer("Updating request", request_id=request.id):
# Must update request_store before queue, worker checks request status immediately
request.set_status(Status.QUEUED)
# Updating context for trace ctx propagation with the new span as parent
update_trace_context(request)
self.request_store.update_request(request)
msg = queue.Message(request.serialize())
self.queue.enqueue(msg)
except Exception as e:
# If we fail to call this, the request will be stuck (POLY-21)
logging.info(
Expand Down
120 changes: 120 additions & 0 deletions polytope_server/common/observability/otel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
from opentelemetry import trace
from opentelemetry.propagate import inject, extract
from opentelemetry.trace import SpanKind, Status, StatusCode
from contextlib import contextmanager
import logging
from typing import Optional, Generator

def add_or_update_trace_context(request, update: bool = False) -> None:
    """
    Store the current OpenTelemetry trace context on the request.

    The active context is serialized into a fresh carrier dict via ``inject``
    and saved under ``request.otel_trace['carrier']``, replacing any carrier
    that was stored before. The request id is also recorded on the current
    span as the ``polytope.request.id`` attribute.

    Args:
        request: The request object to update. It must expose ``id``; its
            ``otel_trace`` attribute is created when missing or ``None``.
        update: Only selects the wording of the debug log line
            ("Updated" instead of "Added"); the stored data is the same
            either way (default: False).
    """
    carrier = {}
    inject(carrier)

    # Treat a missing attribute and an explicit None the same way: a bare
    # hasattr() check would let the item assignment below fail with a
    # TypeError when otel_trace exists but holds None.
    if getattr(request, "otel_trace", None) is None:
        request.otel_trace = {}

    request.otel_trace['carrier'] = carrier

    action = "Updated" if update else "Added"
    logging.debug("[OTEL] %s trace context with carrier: %s", action, carrier)

    # Tag the span that is current at call time with the request id.
    current_span = trace.get_current_span()
    current_span.set_attribute("polytope.request.id", request.id)

def add_trace_context(request) -> None:
    """
    Record the current trace context on ``request`` as a first-time add.

    Thin wrapper over ``add_or_update_trace_context`` with ``update=False``,
    which makes the debug log line read "Added".
    """
    add_or_update_trace_context(request=request, update=False)

def update_trace_context(request) -> None:
    """
    Overwrite the trace context stored on ``request`` with the current one.

    Thin wrapper over ``add_or_update_trace_context`` with ``update=True``,
    which makes the debug log line read "Updated".
    """
    add_or_update_trace_context(request=request, update=True)

def restore_trace_context(request) -> Optional[trace.Span]:
    """
    Rebuild the trace context that was previously stored on the request.

    Reads the carrier saved under ``request.otel_trace['carrier']`` and passes
    it to ``extract``. When a carrier is found, the request id is also set on
    the current span as the ``polytope.request.id`` attribute.

    Args:
        request: The request object that may carry an ``otel_trace`` mapping.

    Returns:
        The value produced by ``extract`` for the stored carrier, or None when
        the request has no ``otel_trace``, it is ``None``/empty, or it holds
        no ``'carrier'`` entry.

    NOTE(review): the return annotation says ``Span``, but the value comes
    from ``opentelemetry.propagate.extract`` and is handed to callers as a
    parent context -- presumably it should be annotated as a ``Context``;
    confirm against the OpenTelemetry API before changing the signature.
    """
    # getattr + truthiness covers three cases at once: attribute missing,
    # attribute set to None (a membership test on None raises TypeError),
    # and an empty mapping.
    otel_trace = getattr(request, "otel_trace", None)
    if not otel_trace or 'carrier' not in otel_trace:
        logging.debug("[OTEL] No trace context found to restore.")
        return None

    carrier = otel_trace['carrier']
    logging.debug(f"[OTEL] Restoring context from carrier: {carrier}")
    extracted_context = extract(carrier)

    # Tag the span that is current at call time with the request id.
    current_span = trace.get_current_span()
    current_span.set_attribute("polytope.request.id", request.id)

    return extracted_context

@contextmanager
def create_new_span(
    span_name: str,
    request_id: Optional[str] = None,
    parent_context: Optional[trace.SpanContext] = None,
    kind: SpanKind = SpanKind.SERVER,
) -> Generator[trace.Span, None, None]:
    """
    Open a span, make it the current one, and yield it to the caller.

    Args:
        span_name: Name given to the new span.
        request_id: When truthy, stored on the span as the
            ``polytope.request.id`` attribute; otherwise no attributes are set.
        parent_context: Forwarded as the ``context`` argument of
            ``start_as_current_span``.
        kind: Span kind for the new span (default: SERVER).

    Yields:
        The span created by the tracer; it is closed when the ``with`` block
        that entered this context manager exits.
    """
    span_attributes = {}
    if request_id:
        span_attributes["polytope.request.id"] = request_id

    otel_tracer = trace.get_tracer(__name__)
    span_cm = otel_tracer.start_as_current_span(
        span_name,
        context=parent_context,
        kind=kind,
        attributes=span_attributes,
    )
    with span_cm as active_span:
        logging.debug(f"[OTEL] Created new span: {span_name}, parent: {parent_context}")
        yield active_span

@contextmanager
def create_new_span_internal(
    span_name: str,
    request_id: Optional[str] = None,
    parent_context: Optional[trace.SpanContext] = None,
) -> Generator[trace.Span, None, None]:
    """
    Create a span of kind ``SpanKind.INTERNAL``.

    Args:
        span_name: Name given to the new span.
        request_id: Optional request ID forwarded to ``create_new_span``.
        parent_context: Optional parent context forwarded to ``create_new_span``.

    Yields:
        The span produced by ``create_new_span``.
    """
    # create_new_span is wrapped by @contextmanager, so calling it returns a
    # context manager object, not a generator. It has to be entered with
    # `with`; delegating to it with `yield from` raises TypeError because
    # that object is not iterable.
    with create_new_span(span_name, request_id, parent_context, kind=SpanKind.INTERNAL) as span:
        yield span

# Forcing span kind Server because of AWS representation
@contextmanager
def create_new_span_producer(
    span_name: str,
    request_id: Optional[str] = None,
    parent_context: Optional[trace.SpanContext] = None,
) -> Generator[trace.Span, None, None]:
    """
    Create a span for the producing side of an operation.

    The span is deliberately created with ``SpanKind.SERVER`` rather than a
    producer kind (see the comment above the decorator).

    Args:
        span_name: Name given to the new span.
        request_id: Optional request ID forwarded to ``create_new_span``.
        parent_context: Optional parent context forwarded to ``create_new_span``.

    Yields:
        The span produced by ``create_new_span``.
    """
    # create_new_span is wrapped by @contextmanager, so calling it returns a
    # context manager object, not a generator. It has to be entered with
    # `with`; delegating to it with `yield from` raises TypeError because
    # that object is not iterable.
    with create_new_span(span_name, request_id, parent_context, kind=SpanKind.SERVER) as span:
        yield span

# Forcing span kind Server because of AWS representation
@contextmanager
def create_new_span_consumer(
    span_name: str,
    request_id: Optional[str] = None,
    parent_context: Optional[trace.SpanContext] = None,
) -> Generator[trace.Span, None, None]:
    """
    Create a span for the consuming side of an operation.

    The span is deliberately created with ``SpanKind.SERVER`` rather than a
    consumer kind (see the comment above the decorator).

    Args:
        span_name: Name given to the new span.
        request_id: Optional request ID forwarded to ``create_new_span``.
        parent_context: Optional parent context forwarded to ``create_new_span``.

    Yields:
        The span produced by ``create_new_span``.
    """
    # create_new_span is wrapped by @contextmanager, so calling it returns a
    # context manager object, not a generator. It has to be entered with
    # `with`; delegating to it with `yield from` raises TypeError because
    # that object is not iterable.
    with create_new_span(span_name, request_id, parent_context, kind=SpanKind.SERVER) as span:
        yield span

@contextmanager
def create_new_span_server(
    span_name: str,
    request_id: Optional[str] = None,
    parent_context: Optional[trace.SpanContext] = None,
) -> Generator[trace.Span, None, None]:
    """
    Create a span of kind ``SpanKind.SERVER``.

    Args:
        span_name: Name given to the new span.
        request_id: Optional request ID forwarded to ``create_new_span``.
        parent_context: Optional parent context forwarded to ``create_new_span``.

    Yields:
        The span produced by ``create_new_span``.
    """
    # create_new_span is wrapped by @contextmanager, so calling it returns a
    # context manager object, not a generator. It has to be entered with
    # `with`; delegating to it with `yield from` raises TypeError because
    # that object is not iterable.
    with create_new_span(span_name, request_id, parent_context, kind=SpanKind.SERVER) as span:
        yield span

def set_span_error(span: trace.Span, exception: Exception) -> None:
    """
    Flag ``span`` as failed and log the failure.

    Args:
        span: The span whose status is set to ``StatusCode.ERROR``.
        exception: The exception whose string form becomes the status
            description and is included in the error log line.
    """
    error_status = Status(StatusCode.ERROR, str(exception))
    span.set_status(error_status)
    logging.error(f"[OTEL] Span error set with exception: {exception}")
6 changes: 5 additions & 1 deletion polytope_server/common/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import uuid

from .user import User

from .observability.otel import add_trace_context

class Status(enum.Enum):
WAITING = "waiting"
Expand Down Expand Up @@ -57,6 +57,7 @@ class Request:
"user_request",
"content_length",
"content_type",
"otel_trace",
]

def __init__(self, from_dict=None, **kwargs):
Expand All @@ -75,6 +76,9 @@ def __init__(self, from_dict=None, **kwargs):
self.content_length = None
self.content_type = "application/octet-stream"

# Adding context for OpenTelemetry in asynchronous processing
add_trace_context(self)

if from_dict:
self.deserialize(from_dict)

Expand Down
Loading

0 comments on commit 2f15090

Please sign in to comment.