Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/mch rebased otel #25

Merged
merged 5 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions polytope_server/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

from ..common import collection, queue, request_store
from ..common.request import Status

from ..common.observability.otel import restore_trace_context, create_new_span_consumer, create_new_span_producer, update_trace_context

class Broker:
def __init__(self, config):
Expand Down Expand Up @@ -75,8 +75,11 @@ def check_requests(self):

if self.check_limits(active_requests, wr):
assert wr.status == Status.WAITING
active_requests.add(wr)
self.enqueue(wr)
# Restore the trace context for this request
extracted_ctx = restore_trace_context(wr)
with create_new_span_consumer("Enqueue request", request_id=wr.id, parent_context=extracted_ctx):
active_requests.add(wr)
self.enqueue(wr)

if self.queue.count() >= self.max_queue_size:
logging.info("Queue is full")
Expand Down Expand Up @@ -133,11 +136,14 @@ def enqueue(self, request):
logging.info("Queuing request", extra={"request_id": request.id})

try:
# Must update request_store before queue, worker checks request status immediately
request.set_status(Status.QUEUED)
self.request_store.update_request(request)
msg = queue.Message(request.serialize())
self.queue.enqueue(msg)
with create_new_span_producer("Updating request", request_id=request.id):
# Must update request_store before queue, worker checks request status immediately
request.set_status(Status.QUEUED)
# Updating context for trace ctx propagation with the new span as parent
update_trace_context(request)
self.request_store.update_request(request)
msg = queue.Message(request.serialize())
self.queue.enqueue(msg)
except Exception as e:
# If we fail to call this, the request will be stuck (POLY-21)
logging.info(
Expand Down
86 changes: 86 additions & 0 deletions polytope_server/common/observability/otel.py
MiquelZuehlke marked this conversation as resolved.
Show resolved Hide resolved
MiquelZuehlke marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
from opentelemetry import trace
from opentelemetry.propagate import inject, extract
from contextlib import contextmanager
import logging

def add_trace_context(request):
carrier = {}

# Inject the context into the carrier
inject(carrier)

# Adding trace_id and span_id to the request data for logging
request.otel_trace_ctx = {
'carrier': carrier # Store the injected carrier
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lines 7-15 are also repeated in update_trace_context. Extract these into a shared helper.


current_span = trace.get_current_span()
current_span.set_attribute("polytope.request.id", request.id)
Copy link

@samweyermann samweyermann Nov 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we set this attribute in add_ and restore_? It seems like this should be done only in create_new_span as otherwise it seems that it will be unreliably set.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are passing the context between services; therefore, each service starts a new span where we restore the context (so we keep track of the propagation), but the attributes are missing and need to be set again.


logging.debug(f"[OTEL] Request created with carrier: {request.otel_trace_ctx['carrier']}")

def update_trace_context(request):
    """Replace the trace carrier stored on *request* with a freshly injected one.

    A new, empty dict is handed to ``inject`` and then stored on
    ``request.otel_trace_ctx`` under the ``'carrier'`` key, overwriting
    whatever was stored there before.
    """
    propagation_carrier = {}
    inject(propagation_carrier)

    # The carrier is wrapped in a dict so readers look it up via the 'carrier' key.
    request.otel_trace_ctx = {'carrier': propagation_carrier}

    logging.debug(f"[OTEL] Request created with carrier: {request.otel_trace_ctx['carrier']}")


def restore_trace_context(request):
    """Rebuild the trace context that was stored on *request*.

    Returns the context produced by ``extract`` from the carrier saved in
    ``request.otel_trace_ctx``. Returns ``None`` when the request carries no
    usable trace context: the attribute is missing, is not a dict, or has no
    ``'carrier'`` entry.

    Side effect: when a carrier is found, the currently active span is tagged
    with ``polytope.request.id``.
    """
    stored_ctx = getattr(request, 'otel_trace_ctx', None)

    # The attribute can exist yet hold None or lack a carrier; treat that the
    # same as "no context" instead of failing on the subscript below.
    carrier = stored_ctx.get('carrier') if isinstance(stored_ctx, dict) else None
    if carrier is None:
        return None

    logging.debug(f"[OTEL] Restoring context from carrier: {carrier}")

    # Extract the context from the stored request's carrier
    extracted_context = extract(carrier=carrier)

    current_span = trace.get_current_span()
    current_span.set_attribute("polytope.request.id", request.id)

    return extracted_context

@contextmanager
def create_new_span_internal(span_name, request_id=None, parent_context=None, kind=trace.SpanKind.INTERNAL):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If keeping this as four methods, make it clear that _internal refers to the span and is not a way of marking this as an internal method (or perhaps someone more experienced with Python would understand this immediately?)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a great question. From my perspective, when discussing OpenTelemetry and spans, it should be clear that we're referring to the type of the span, as that's its primary characteristic.

Given the context of OpenTelemetry, I believe it would be intuitive for someone familiar with the framework to understand that _internal refers to the span type rather than indicating a private or internal method.

with create_new_span(span_name, request_id, parent_context, kind) as span:
span.set_attribute("role", "internal")
yield span

# Forcing span kind Server because of AWS representation
@contextmanager
def create_new_span_producer(span_name, request_id=None, parent_context=None, kind=trace.SpanKind.SERVER):
with create_new_span(span_name, request_id, parent_context, kind) as span:
span.set_attribute("role", "producer")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason to do this as four separate methods rather than passing the role as an argument? The only one that is odd is create_new_span_internal since there is only one valid mapping from kind to role.

In the case of passing the role, it could be passed as an enum value similarly to kind so that we limit what the options are to these three methods

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you are right. During testing, I frequently had to switch between Consumer, Producer, and Server span kinds. To simplify this and avoid modifying each service individually, I created dedicated functions for each type. This way, it's clear that, for example, a Producer span is being forced to a Server kind due to AWS requirements. Following that logic, it felt inconsistent to leave Internal spans handled differently, so I aligned it with the same approach.

yield span

# Forcing span kind Server because of AWS representation
@contextmanager
def create_new_span_consumer(span_name, request_id=None, parent_context=None, kind=trace.SpanKind.SERVER):
    """Open a span through ``create_new_span`` and tag it with role "consumer".

    ``kind`` defaults to ``trace.SpanKind.SERVER``. The span is yielded to the
    caller for the duration of the ``with`` block.
    """
    span_cm = create_new_span(span_name, request_id, parent_context, kind)
    with span_cm as consumer_span:
        consumer_span.set_attribute("role", "consumer")
        yield consumer_span

@contextmanager
def create_new_span_server(span_name, request_id=None, parent_context=None, kind=trace.SpanKind.SERVER):
    """Open a span through ``create_new_span`` and tag it with role "server".

    The span is yielded to the caller for the duration of the ``with`` block.
    """
    with create_new_span(
        span_name,
        request_id=request_id,
        parent_context=parent_context,
        kind=kind,
    ) as server_span:
        server_span.set_attribute("role", "server")
        yield server_span

@contextmanager
def create_new_span(span_name, request_id=None, parent_context=None, kind=trace.SpanKind.SERVER):
    """Start a span named *span_name*, make it the current span, and yield it.

    Args:
        span_name: Name given to the new span.
        request_id: Optional request id, recorded as the
            ``polytope.request.id`` attribute when provided.
        parent_context: Passed to the tracer as ``context``.
        kind: Span kind; defaults to ``trace.SpanKind.SERVER``.
    """
    tracer = trace.get_tracer(__name__)

    # None is not a valid OpenTelemetry attribute value, so leave the attribute
    # out when no request id was supplied instead of passing an invalid entry.
    attributes = {} if request_id is None else {"polytope.request.id": request_id}

    with tracer.start_as_current_span(span_name, context=parent_context, kind=kind, attributes=attributes) as span:
        logging.debug(f"[OTEL] Creating new span: {span_name} parent: {parent_context}")
        yield span

def set_span_error(span, exception):
    """Mark *span* as failed, using ``str(exception)`` as the status description."""
    error_status = trace.Status(trace.StatusCode.ERROR, str(exception))
    span.set_status(error_status)
6 changes: 5 additions & 1 deletion polytope_server/common/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import uuid

from .user import User

from .observability.otel import add_trace_context

class Status(enum.Enum):
WAITING = "waiting"
Expand Down Expand Up @@ -57,6 +57,7 @@ class Request:
"user_request",
"content_length",
"content_type",
"otel_trace_ctx",

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible we will want other otel fields in the future? If so, I recommend calling this otel_trace and storing the context as a field within to avoid polluting the request with many otel_trace_* slots or the expensive refactor in the future to change this slot.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a valid point. Although the goal is to propagate only the context, keeping the implementation minimally intrusive in the codebase, I agree that it makes sense to have a more general name for this field.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. Based on the docs I think just the context is reasonable.

]

def __init__(self, from_dict=None, **kwargs):
Expand All @@ -75,6 +76,9 @@ def __init__(self, from_dict=None, **kwargs):
self.content_length = None
self.content_type = "application/octet-stream"

# Adding context for OpenTelemetry in asynchronous processing
add_trace_context(self)

if from_dict:
self.deserialize(from_dict)

Expand Down
Loading