Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/mch rebased otel #25

Merged
merged 5 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions polytope_server/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

from ..common import collection, queue, request_store
from ..common.request import Status

from ..common.observability.otel import restore_trace_context, create_new_span_consumer, create_new_span_producer, update_trace_context

class Broker:
def __init__(self, config):
Expand Down Expand Up @@ -75,8 +75,11 @@ def check_requests(self):

if self.check_limits(active_requests, wr):
assert wr.status == Status.WAITING
active_requests.add(wr)
self.enqueue(wr)
# Restore the trace context for this request
extracted_ctx = restore_trace_context(wr)
with create_new_span_consumer("Enqueue request", request_id=wr.id, parent_context=extracted_ctx):
active_requests.add(wr)
self.enqueue(wr)

if self.queue.count() >= self.max_queue_size:
logging.info("Queue is full")
Expand Down Expand Up @@ -133,11 +136,14 @@ def enqueue(self, request):
logging.info("Queuing request", extra={"request_id": request.id})

try:
# Must update request_store before queue, worker checks request status immediately
request.set_status(Status.QUEUED)
self.request_store.update_request(request)
msg = queue.Message(request.serialize())
self.queue.enqueue(msg)
with create_new_span_producer("Updating request", request_id=request.id):
# Must update request_store before queue, worker checks request status immediately
request.set_status(Status.QUEUED)
# Updating context for trace ctx propagation with the new span as parent
update_trace_context(request)
self.request_store.update_request(request)
msg = queue.Message(request.serialize())
self.queue.enqueue(msg)
except Exception as e:
# If we fail to call this, the request will be stuck (POLY-21)
logging.info(
Expand Down
120 changes: 120 additions & 0 deletions polytope_server/common/observability/otel.py
MiquelZuehlke marked this conversation as resolved.
Show resolved Hide resolved
MiquelZuehlke marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import logging
from contextlib import contextmanager
from typing import Optional, Generator

from opentelemetry import trace
from opentelemetry.context import Context
from opentelemetry.propagate import inject, extract
from opentelemetry.trace import SpanKind, Status, StatusCode

def add_or_update_trace_context(request, update: bool = False) -> None:
    """
    Store the current OpenTelemetry trace context on the request.

    The active context is written by ``inject`` into a fresh carrier dict,
    which is saved under ``request.otel_trace['carrier']`` (replacing any
    carrier stored earlier). The request id is also set as the
    ``polytope.request.id`` attribute on the currently active span.

    Args:
        request: The request object to update. Must expose ``id``.
        update: Only selects the wording of the debug log message
            ("Updated" instead of "Added"); the stored data is the same.
    """
    carrier = {}
    inject(carrier)

    # The attribute may be missing entirely or may hold None; in both cases
    # start from a fresh dict instead of failing on the item assignment.
    otel_trace = getattr(request, "otel_trace", None)
    if otel_trace is None:
        otel_trace = {}
        request.otel_trace = otel_trace

    otel_trace["carrier"] = carrier

    action = "Updated" if update else "Added"
    logging.debug("[OTEL] %s trace context with carrier: %s", action, carrier)

    # Tag whichever span is current with the request id.
    current_span = trace.get_current_span()
    current_span.set_attribute("polytope.request.id", request.id)

def add_trace_context(request) -> None:
    """
    Attach the current trace context to ``request`` (first-time variant).

    Thin wrapper around ``add_or_update_trace_context`` with ``update=False``.
    """
    is_update = False
    add_or_update_trace_context(request, update=is_update)

def update_trace_context(request) -> None:
    """
    Overwrite the trace context stored on ``request`` with the current one.

    Thin wrapper around ``add_or_update_trace_context`` with ``update=True``.
    """
    is_update = True
    add_or_update_trace_context(request, update=is_update)

def restore_trace_context(request) -> Optional[Context]:
    """
    Rebuild a trace context from the carrier stored on the request.

    Also sets the ``polytope.request.id`` attribute on the span that is
    current at call time (not on anything inside the returned context).

    Args:
        request: The request object; the carrier is read from
            ``request.otel_trace['carrier']``. Must expose ``id``.

    Returns:
        The context produced by ``extract`` for the stored carrier, or None
        when the request has no ``otel_trace`` value or no carrier in it.
    """
    # getattr + truthiness covers a missing attribute, None and an empty dict
    # without raising on the membership test.
    otel_trace = getattr(request, "otel_trace", None)
    if not otel_trace or "carrier" not in otel_trace:
        logging.debug("[OTEL] No trace context found to restore.")
        return None

    carrier = otel_trace["carrier"]
    logging.debug("[OTEL] Restoring context from carrier: %s", carrier)
    extracted_context = extract(carrier)

    current_span = trace.get_current_span()
    current_span.set_attribute("polytope.request.id", request.id)

    return extracted_context

@contextmanager
def create_new_span(
    span_name: str,
    request_id: Optional[str] = None,
    parent_context: Optional[Context] = None,
    kind: SpanKind = SpanKind.SERVER,
) -> Generator[trace.Span, None, None]:
    """
    Start a span, make it the current span, and yield it.

    Intended for use in a ``with`` statement; the span stays current for the
    duration of the ``with`` body.

    Args:
        span_name: Name of the span.
        request_id: Optional request ID; when truthy it is set as the
            ``polytope.request.id`` span attribute, otherwise the span gets
            no initial attributes.
        parent_context: Optional context (e.g. the value returned by
            ``restore_trace_context``) forwarded as ``context=`` to the
            tracer to select the parent of the new span.
        kind: The kind of span to create (default: SERVER).

    Yields:
        The created span.
    """
    tracer = trace.get_tracer(__name__)
    attributes = {"polytope.request.id": request_id} if request_id else {}

    with tracer.start_as_current_span(
        span_name,
        context=parent_context,
        kind=kind,
        attributes=attributes,
    ) as span:
        logging.debug(f"[OTEL] Created new span: {span_name}, parent: {parent_context}")
        yield span

@contextmanager
def create_new_span_internal(
    span_name: str,
    request_id: Optional[str] = None,
    parent_context: Optional[Context] = None,
) -> Generator[trace.Span, None, None]:
    """
    Create an internal span (``SpanKind.INTERNAL``) and yield it.

    Args:
        span_name: Name of the span.
        request_id: Optional request ID forwarded to ``create_new_span``.
        parent_context: Optional parent context forwarded to
            ``create_new_span``.

    Yields:
        The created span.
    """
    # create_new_span is decorated with @contextmanager, so calling it returns
    # a context manager object, not a generator: it has to be entered with
    # ``with``; ``yield from`` on it raises TypeError.
    with create_new_span(span_name, request_id, parent_context, kind=SpanKind.INTERNAL) as span:
        yield span

# Forcing span kind Server because of AWS representation
@contextmanager
def create_new_span_producer(
    span_name: str,
    request_id: Optional[str] = None,
    parent_context: Optional[Context] = None,
) -> Generator[trace.Span, None, None]:
    """
    Create a span for a producer-side operation and yield it.

    The span is created with ``SpanKind.SERVER`` rather than a producer kind.

    Args:
        span_name: Name of the span.
        request_id: Optional request ID forwarded to ``create_new_span``.
        parent_context: Optional parent context forwarded to
            ``create_new_span``.

    Yields:
        The created span.
    """
    # create_new_span returns a context manager (it is @contextmanager
    # decorated), so enter it with ``with``; ``yield from`` would raise
    # TypeError because the object is not iterable.
    with create_new_span(span_name, request_id, parent_context, kind=SpanKind.SERVER) as span:
        yield span

# Forcing span kind Server because of AWS representation
@contextmanager
def create_new_span_consumer(
    span_name: str,
    request_id: Optional[str] = None,
    parent_context: Optional[Context] = None,
) -> Generator[trace.Span, None, None]:
    """
    Create a span for a consumer-side operation and yield it.

    The span is created with ``SpanKind.SERVER`` rather than a consumer kind.

    Args:
        span_name: Name of the span.
        request_id: Optional request ID forwarded to ``create_new_span``.
        parent_context: Optional parent context forwarded to
            ``create_new_span``.

    Yields:
        The created span.
    """
    # create_new_span returns a context manager (it is @contextmanager
    # decorated), so enter it with ``with``; ``yield from`` would raise
    # TypeError because the object is not iterable.
    with create_new_span(span_name, request_id, parent_context, kind=SpanKind.SERVER) as span:
        yield span

@contextmanager
def create_new_span_server(
    span_name: str,
    request_id: Optional[str] = None,
    parent_context: Optional[Context] = None,
) -> Generator[trace.Span, None, None]:
    """
    Create a server span (``SpanKind.SERVER``) and yield it.

    Args:
        span_name: Name of the span.
        request_id: Optional request ID forwarded to ``create_new_span``.
        parent_context: Optional parent context forwarded to
            ``create_new_span``.

    Yields:
        The created span.
    """
    # create_new_span returns a context manager (it is @contextmanager
    # decorated), so enter it with ``with``; ``yield from`` would raise
    # TypeError because the object is not iterable.
    with create_new_span(span_name, request_id, parent_context, kind=SpanKind.SERVER) as span:
        yield span

def set_span_error(span: trace.Span, exception: Exception) -> None:
    """
    Flag ``span`` as failed and log the cause.

    The span status is set to ``StatusCode.ERROR`` with the exception's string
    form as its description, and an error-level log line is emitted.

    Args:
        span: The span whose status is set.
        exception: The exception describing the failure.
    """
    description = str(exception)
    error_status = Status(StatusCode.ERROR, description)
    span.set_status(error_status)
    logging.error(f"[OTEL] Span error set with exception: {exception}")
6 changes: 5 additions & 1 deletion polytope_server/common/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import uuid

from .user import User

from .observability.otel import add_trace_context

class Status(enum.Enum):
WAITING = "waiting"
Expand Down Expand Up @@ -57,6 +57,7 @@ class Request:
"user_request",
"content_length",
"content_type",
"otel_trace",
]

def __init__(self, from_dict=None, **kwargs):
Expand All @@ -75,6 +76,9 @@ def __init__(self, from_dict=None, **kwargs):
self.content_length = None
self.content_type = "application/octet-stream"

# Adding context for OpenTelemetry in asynchronous processing
add_trace_context(self)

if from_dict:
self.deserialize(from_dict)

Expand Down
Loading