Skip to content

Commit

Permalink
chore(di): trigger probes
Browse files Browse the repository at this point in the history
We implement trigger probes. These allows triggering the capture of
debug information along a trace, ensuring all the relevant probes are
also triggered.
  • Loading branch information
P403n1x87 committed Oct 16, 2024
1 parent 4cbc4ed commit 965b6e6
Show file tree
Hide file tree
Showing 15 changed files with 344 additions and 63 deletions.
15 changes: 14 additions & 1 deletion ddtrace/_trace/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,17 @@ class Context(object):
boundaries.
"""

__slots__ = ["trace_id", "span_id", "_lock", "_meta", "_metrics", "_span_links", "_baggage", "_is_remote"]
__slots__ = [
"trace_id",
"span_id",
"_lock",
"_meta",
"_metrics",
"_span_links",
"_baggage",
"_is_remote",
"__weakref__",
]

def __init__(
self,
Expand Down Expand Up @@ -266,4 +276,7 @@ def __repr__(self) -> str:
self._is_remote,
)

def __hash__(self) -> int:
return hash((self.trace_id, self.span_id))

__str__ = __repr__
3 changes: 3 additions & 0 deletions ddtrace/contrib/trace_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from ddtrace.internal.compat import ensure_text
from ddtrace.internal.compat import ip_is_global
from ddtrace.internal.compat import parse
from ddtrace.internal.core.event_hub import dispatch
from ddtrace.internal.logger import get_logger
from ddtrace.internal.utils.cache import cached
from ddtrace.internal.utils.http import normalize_header_name
Expand Down Expand Up @@ -578,6 +579,8 @@ def activate_distributed_headers(tracer, int_config=None, request_headers=None,
# have a context with the same trace id active
tracer.context_provider.activate(context)

dispatch("distributed_context.activated", (context,))


def _flatten(
obj, # type: Any
Expand Down
11 changes: 10 additions & 1 deletion ddtrace/debugging/_debugger.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,17 @@
from ddtrace.debugging._probe.model import MetricFunctionProbe
from ddtrace.debugging._probe.model import MetricLineProbe
from ddtrace.debugging._probe.model import Probe
from ddtrace.debugging._probe.model import ProbeEvaluateTimingForMethod
from ddtrace.debugging._probe.model import SpanDecorationFunctionProbe
from ddtrace.debugging._probe.model import SpanDecorationLineProbe
from ddtrace.debugging._probe.model import SpanFunctionProbe
from ddtrace.debugging._probe.model import TriggerFunctionProbe
from ddtrace.debugging._probe.registry import ProbeRegistry
from ddtrace.debugging._probe.remoteconfig import ProbePollerEvent
from ddtrace.debugging._probe.remoteconfig import ProbePollerEventType
from ddtrace.debugging._probe.remoteconfig import ProbeRCAdapter
from ddtrace.debugging._probe.status import ProbeStatusLogger
from ddtrace.debugging._session import Session
from ddtrace.debugging._signal.collector import SignalCollector
from ddtrace.debugging._signal.collector import SignalContext
from ddtrace.debugging._signal.metric_sample import MetricSample
Expand Down Expand Up @@ -221,6 +224,10 @@ def _open_contexts(self) -> None:
frame=frame,
thread=thread,
)
elif isinstance(probe, TriggerFunctionProbe) and probe.evaluate_at is ProbeEvaluateTimingForMethod.ENTER:
Session(probe.session_id, probe.level).link_to_trace(trace_context)
# This probe does not emit any signals
continue
else:
log.error("Unsupported probe type: %s", type(probe))
continue
Expand Down Expand Up @@ -391,7 +398,9 @@ def _dd_debugger_hook(self, probe: Probe) -> None:
meter=self._probe_meter,
)
elif isinstance(probe, LogLineProbe):
if probe.take_snapshot:
session_id = probe.tags.get("sessionId")
session = Session.lookup(session_id) if session_id is not None else None
if session is None and probe.take_snapshot:
# TODO: Global limit evaluated before probe conditions
if self._global_rate_limiter.limit() is RateLimitExceeded:
return
Expand Down
22 changes: 22 additions & 0 deletions ddtrace/debugging/_live.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import typing as t

from ddtrace.debugging._session import Session
from ddtrace.internal import core


def handle_distributed_context(context: t.Any) -> None:
debug_tag = context._meta.get("_dd.p.debug")
if debug_tag is None:
return

for session in debug_tag.split(","):
ident, _, level = session.partition(":")
Session(ident=ident, level=int(level or 0)).link_to_trace(context)


def enable() -> None:
core.on("distributed_context.activated", handle_distributed_context, "live_debugger")


def disable() -> None:
core.reset_listeners("distributed_context.activated", handle_distributed_context)
112 changes: 66 additions & 46 deletions ddtrace/debugging/_origin/span.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,27 @@
from dataclasses import dataclass
from functools import partial
from itertools import count
from pathlib import Path
import sys

# from threading import current_thread
from threading import current_thread
from types import FrameType
from types import FunctionType
import typing as t
import uuid

import ddtrace

# from ddtrace import config
from ddtrace._trace.processor import SpanProcessor

# from ddtrace.debugging._debugger import Debugger
from ddtrace.debugging._probe.model import DEFAULT_CAPTURE_LIMITS
from ddtrace.debugging._probe.model import LiteralTemplateSegment
from ddtrace.debugging._probe.model import LogFunctionProbe
from ddtrace.debugging._probe.model import LogLineProbe
from ddtrace.debugging._probe.model import ProbeEvaluateTimingForMethod
from ddtrace.debugging._session import Session
from ddtrace.debugging._signal.collector import SignalCollector
from ddtrace.debugging._signal.collector import SignalContext

# from ddtrace.debugging._signal.snapshot import Snapshot
from ddtrace.debugging._signal.snapshot import Snapshot
from ddtrace.debugging._uploader import LogsIntakeUploaderV1
from ddtrace.debugging._uploader import UploaderProduct
from ddtrace.ext import EXIT_SPAN_TYPES
from ddtrace.internal import compat
from ddtrace.internal import core
Expand All @@ -40,13 +39,13 @@ def frame_stack(frame: FrameType) -> t.Iterator[FrameType]:
_frame = _frame.f_back


def wrap_entrypoint(f: t.Callable) -> None:
def wrap_entrypoint(collector: SignalCollector, f: t.Callable) -> None:
if not _isinstance(f, FunctionType):
return

_f = t.cast(FunctionType, f)
if not EntrySpanWrappingContext.is_wrapped(_f):
EntrySpanWrappingContext(_f).wrap()
EntrySpanWrappingContext(collector, _f).wrap()


@dataclass
Expand Down Expand Up @@ -120,9 +119,11 @@ class EntrySpanLocation:


class EntrySpanWrappingContext(WrappingContext):
def __init__(self, f):
def __init__(self, collector: SignalCollector, f: FunctionType) -> None:
super().__init__(f)

self.collector = collector

filename = str(Path(f.__code__.co_filename).resolve())
name = f.__qualname__
module = f.__module__
Expand Down Expand Up @@ -152,25 +153,26 @@ def __enter__(self):
s.set_tag_str("_dd.code_origin.frames.0.type", location.module)
s.set_tag_str("_dd.code_origin.frames.0.method", location.name)

# TODO[gab]: This will be enabled as part of the live debugger/distributed debugging
# if ld_config.enabled:
# # Create a snapshot
# snapshot = Snapshot(
# probe=location.probe,
# frame=self.__frame__,
# thread=current_thread(),
# trace_context=root,
# )
# Check if we have any level 2 debugging sessions running for the
# current trace
if any(s.level >= 2 for s in Session.from_trace()):
# Create a snapshot
snapshot = Snapshot(
probe=location.probe,
frame=self.__frame__,
thread=current_thread(),
trace_context=root,
)

# # Capture on entry
# context = Debugger.get_collector().attach(snapshot)
# Capture on entry
context = self.collector.attach(snapshot)

# # Correlate the snapshot with the span
# root.set_tag_str("_dd.code_origin.frames.0.snapshot_id", snapshot.uuid)
# span.set_tag_str("_dd.code_origin.frames.0.snapshot_id", snapshot.uuid)
# Correlate the snapshot with the span
root.set_tag_str("_dd.code_origin.frames.0.snapshot_id", snapshot.uuid)
span.set_tag_str("_dd.code_origin.frames.0.snapshot_id", snapshot.uuid)

# self.set("context", context)
# self.set("start_time", compat.monotonic_ns())
self.set("context", context)
self.set("start_time", compat.monotonic_ns())

return self

Expand All @@ -194,7 +196,10 @@ def __exit__(self, exc_type, exc_value, traceback):

@dataclass
class SpanCodeOriginProcessor(SpanProcessor):
__uploader__ = LogsIntakeUploaderV1

_instance: t.Optional["SpanCodeOriginProcessor"] = None
_handler: t.Optional[t.Callable] = None

def on_span_start(self, span: Span) -> None:
if span.span_type not in EXIT_SPAN_TYPES:
Expand All @@ -219,24 +224,25 @@ def on_span_start(self, span: Span) -> None:
# DEV: Without a function object we cannot infer the function
# and any potential class name.

# TODO[gab]: This will be enabled as part of the live debugger/distributed debugging
# if ld_config.enabled:
# # Create a snapshot
# snapshot = Snapshot(
# probe=ExitSpanProbe.from_frame(frame),
# frame=frame,
# thread=current_thread(),
# trace_context=span,
# )
# Check if we have any level 2 debugging sessions running for
# the current trace
if any(s.level >= 2 for s in Session.from_trace()):
# Create a snapshot
snapshot = Snapshot(
probe=ExitSpanProbe.from_frame(frame),
frame=frame,
thread=current_thread(),
trace_context=span,
)

# # Capture on entry
# snapshot.line()
# Capture on entry
snapshot.line()

# # Collect
# Debugger.get_collector().push(snapshot)
# Collect
self.__uploader__.get_collector().push(snapshot)

# # Correlate the snapshot with the span
# span.set_tag_str(f"_dd.code_origin.frames.{n}.snapshot_id", snapshot.uuid)
# Correlate the snapshot with the span
span.set_tag_str(f"_dd.code_origin.frames.{n}.snapshot_id", snapshot.uuid)

def on_span_finish(self, span: Span) -> None:
pass
Expand All @@ -246,17 +252,31 @@ def enable(cls):
if cls._instance is not None:
return

core.on("service_entrypoint.patch", wrap_entrypoint)

instance = cls._instance = cls()

# Register code origin for span with the snapshot uploader
cls.__uploader__.register(UploaderProduct.CODE_ORIGIN_SPAN)

# Register the processor for exit spans
instance.register()

# Register the entrypoint wrapping for entry spans
cls._handler = handler = partial(wrap_entrypoint, cls.__uploader__.get_collector())
core.on("service_entrypoint.patch", handler)

@classmethod
def disable(cls):
if cls._instance is None:
return

# Unregister the entrypoint wrapping for entry spans
core.reset_listeners("service_entrypoint.patch", cls._handler)
cls._handler = None

# Unregister the processor for exit spans
cls._instance.unregister()
cls._instance = None

core.reset_listeners("service_entrypoint.patch", wrap_entrypoint)
# Unregister code origin for span with the snapshot uploader
cls.__uploader__.unregister(UploaderProduct.CODE_ORIGIN_SPAN)

cls._instance = None
24 changes: 22 additions & 2 deletions ddtrace/debugging/_probe/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from typing import Union

from ddtrace.debugging._expressions import DDExpression
from ddtrace.debugging._session import SessionId
from ddtrace.internal.compat import maybe_stringify
from ddtrace.internal.logger import get_logger
from ddtrace.internal.module import _resolve
Expand Down Expand Up @@ -292,12 +293,31 @@ class SpanDecorationFunctionProbe(Probe, FunctionLocationMixin, SpanDecorationMi
pass


LineProbe = Union[LogLineProbe, MetricLineProbe, SpanDecorationLineProbe]
FunctionProbe = Union[LogFunctionProbe, MetricFunctionProbe, SpanFunctionProbe, SpanDecorationFunctionProbe]
@dataclass
class SessionMixin:
session_id: SessionId
level: int


@dataclass
class TriggerLineProbe(Probe, LineLocationMixin, SessionMixin):
pass


@dataclass
class TriggerFunctionProbe(Probe, FunctionLocationMixin, SessionMixin):
pass


LineProbe = Union[LogLineProbe, MetricLineProbe, SpanDecorationLineProbe, TriggerLineProbe]
FunctionProbe = Union[
LogFunctionProbe, MetricFunctionProbe, SpanFunctionProbe, SpanDecorationFunctionProbe, TriggerFunctionProbe
]


class ProbeType(str, Enum):
LOG_PROBE = "LOG_PROBE"
METRIC_PROBE = "METRIC_PROBE"
SPAN_PROBE = "SPAN_PROBE"
SPAN_DECORATION_PROBE = "SPAN_DECORATION_PROBE"
TRIGGER_PROBE = "TRIGGER_PROBE"
Loading

0 comments on commit 965b6e6

Please sign in to comment.