Skip to content

Commit

Permalink
chore(di): trigger probes
Browse files Browse the repository at this point in the history
We implement trigger probes. These allows triggering the capture of
debug information along a trace, ensuring all the relevant probes are
also triggered.
  • Loading branch information
P403n1x87 committed Oct 5, 2024
1 parent e0dd63b commit 766843f
Show file tree
Hide file tree
Showing 10 changed files with 228 additions and 48 deletions.
3 changes: 3 additions & 0 deletions ddtrace/contrib/trace_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from ddtrace.internal.compat import ensure_text
from ddtrace.internal.compat import ip_is_global
from ddtrace.internal.compat import parse
from ddtrace.internal.core.event_hub import dispatch
from ddtrace.internal.logger import get_logger
from ddtrace.internal.utils.cache import cached
from ddtrace.internal.utils.http import normalize_header_name
Expand Down Expand Up @@ -574,6 +575,8 @@ def activate_distributed_headers(tracer, int_config=None, request_headers=None,
# have a context with the same trace id active
tracer.context_provider.activate(context)

dispatch("distributed_context.activated", (context,))


def _flatten(
obj, # type: Any
Expand Down
22 changes: 22 additions & 0 deletions ddtrace/debugging/_live.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import typing as t

from ddtrace.debugging._session import Session
from ddtrace.internal import core


def handle_distributed_context(context: t.Any) -> None:
debug_tag = context._meta.get("_dd.p.debug")
if debug_tag is None:
return

ident, _, level = debug_tag.partition(":")

Session(ident=ident, level=int(level)).link_to_trace(context)


def enable() -> None:
core.on("distributed_context.activated", handle_distributed_context, "live_debugger")


def disable() -> None:
core.reset_listeners("distributed_context.activated", handle_distributed_context)
109 changes: 63 additions & 46 deletions ddtrace/debugging/_origin/span.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,27 @@
from dataclasses import dataclass
from functools import partial
from itertools import count
from pathlib import Path
import sys

# from threading import current_thread
from threading import current_thread
from types import FrameType
from types import FunctionType
import typing as t
import uuid

import ddtrace

# from ddtrace import config
from ddtrace._trace.processor import SpanProcessor

# from ddtrace.debugging._debugger import Debugger
from ddtrace.debugging._probe.model import DEFAULT_CAPTURE_LIMITS
from ddtrace.debugging._probe.model import LiteralTemplateSegment
from ddtrace.debugging._probe.model import LogFunctionProbe
from ddtrace.debugging._probe.model import LogLineProbe
from ddtrace.debugging._probe.model import ProbeEvaluateTimingForMethod
from ddtrace.debugging._session import Session
from ddtrace.debugging._signal.collector import SignalCollector
from ddtrace.debugging._signal.collector import SignalContext

# from ddtrace.debugging._signal.snapshot import Snapshot
from ddtrace.debugging._signal.snapshot import Snapshot
from ddtrace.debugging._uploader import LogsIntakeUploaderV1
from ddtrace.debugging._uploader import UploaderProduct
from ddtrace.ext import EXIT_SPAN_TYPES
from ddtrace.internal import compat
from ddtrace.internal import core
Expand All @@ -40,13 +39,13 @@ def frame_stack(frame: FrameType) -> t.Iterator[FrameType]:
_frame = _frame.f_back


def wrap_entrypoint(f: t.Callable) -> None:
def wrap_entrypoint(collector: SignalCollector, f: t.Callable) -> None:
if not _isinstance(f, FunctionType):
return

_f = t.cast(FunctionType, f)
if not EntrySpanWrappingContext.is_wrapped(_f):
EntrySpanWrappingContext(_f).wrap()
EntrySpanWrappingContext(collector, _f).wrap()


@dataclass
Expand Down Expand Up @@ -120,9 +119,11 @@ class EntrySpanLocation:


class EntrySpanWrappingContext(WrappingContext):
def __init__(self, f):
def __init__(self, collector: SignalCollector, f: FunctionType) -> None:
super().__init__(f)

self.collector = collector

filename = str(Path(f.__code__.co_filename).resolve())
name = f.__qualname__
module = f.__module__
Expand Down Expand Up @@ -152,25 +153,26 @@ def __enter__(self):
s.set_tag_str("_dd.code_origin.frames.0.type", location.module)
s.set_tag_str("_dd.code_origin.frames.0.method", location.name)

# TODO[gab]: This will be enabled as part of the live debugger/distributed debugging
# if ld_config.enabled:
# # Create a snapshot
# snapshot = Snapshot(
# probe=location.probe,
# frame=self.__frame__,
# thread=current_thread(),
# trace_context=root,
# )
# Check if we have a debugging session running for the current trace
session = Session.get_for_trace()
if session is not None and session.level >= 2:
# Create a snapshot
snapshot = Snapshot(
probe=location.probe,
frame=self.__frame__,
thread=current_thread(),
trace_context=root,
)

# # Capture on entry
# context = Debugger.get_collector().attach(snapshot)
# Capture on entry
context = self.collector.attach(snapshot)

# # Correlate the snapshot with the span
# root.set_tag_str("_dd.code_origin.frames.0.snapshot_id", snapshot.uuid)
# span.set_tag_str("_dd.code_origin.frames.0.snapshot_id", snapshot.uuid)
# Correlate the snapshot with the span
root.set_tag_str("_dd.code_origin.frames.0.snapshot_id", snapshot.uuid)
span.set_tag_str("_dd.code_origin.frames.0.snapshot_id", snapshot.uuid)

# self.set("context", context)
# self.set("start_time", compat.monotonic_ns())
self.set("context", context)
self.set("start_time", compat.monotonic_ns())

return self

Expand All @@ -194,6 +196,8 @@ def __exit__(self, exc_type, exc_value, traceback):

@dataclass
class SpanCodeOriginProcessor(SpanProcessor):
__uploader__ = LogsIntakeUploaderV1

_instance: t.Optional["SpanCodeOriginProcessor"] = None

def on_span_start(self, span: Span) -> None:
Expand All @@ -219,24 +223,25 @@ def on_span_start(self, span: Span) -> None:
# DEV: Without a function object we cannot infer the function
# and any potential class name.

# TODO[gab]: This will be enabled as part of the live debugger/distributed debugging
# if ld_config.enabled:
# # Create a snapshot
# snapshot = Snapshot(
# probe=ExitSpanProbe.from_frame(frame),
# frame=frame,
# thread=current_thread(),
# trace_context=span,
# )
# Check if we have a debugging session running for the current trace
session = Session.get_for_trace()
if session is not None and session.level >= 2:
# Create a snapshot
snapshot = Snapshot(
probe=ExitSpanProbe.from_frame(frame),
frame=frame,
thread=current_thread(),
trace_context=span,
)

# # Capture on entry
# snapshot.line()
# Capture on entry
snapshot.line()

# # Collect
# Debugger.get_collector().push(snapshot)
# Collect
self.__uploader__.get_collector().push(snapshot)

# # Correlate the snapshot with the span
# span.set_tag_str(f"_dd.code_origin.frames.{n}.snapshot_id", snapshot.uuid)
# Correlate the snapshot with the span
span.set_tag_str(f"_dd.code_origin.frames.{n}.snapshot_id", snapshot.uuid)

def on_span_finish(self, span: Span) -> None:
pass
Expand All @@ -246,17 +251,29 @@ def enable(cls):
if cls._instance is not None:
return

core.on("service_entrypoint.patch", wrap_entrypoint)

instance = cls._instance = cls()

# Register code origin for span with the snapshot uploader
cls.__uploader__.register(UploaderProduct.CODE_ORIGIN_SPAN)

# Register the processor for exit spans
instance.register()

# Register the entrypoint wrapping for entry spans
core.on("service_entrypoint.patch", partial(wrap_entrypoint, cls.__uploader__.get_collector()))

@classmethod
def disable(cls):
if cls._instance is None:
return

# Unregister the entrypoint wrapping for entry spans
core.reset_listeners("service_entrypoint.patch", wrap_entrypoint)

# Unregister the processor for exit spans
cls._instance.unregister()
cls._instance = None

core.reset_listeners("service_entrypoint.patch", wrap_entrypoint)
# Unregister code origin for span with the snapshot uploader
cls.__uploader__.unregister(UploaderProduct.CODE_ORIGIN_SPAN)

cls._instance = None
31 changes: 31 additions & 0 deletions ddtrace/debugging/_products/live_debugger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from ddtrace.settings.live_debugging import config


# TODO[gab]: Uncomment when the product is ready
# requires = ["tracer"]


def post_preload():
pass


def start() -> None:
if config.enabled:
from ddtrace.debugging._live import enable

enable()


def restart() -> None:
pass


def stop(join: bool = False):
if config.enabled:
from ddtrace.debugging._live import disable

disable()


def at_exit(join: bool = False):
stop(join=join)
34 changes: 34 additions & 0 deletions ddtrace/debugging/_session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from dataclasses import dataclass
import typing as t
from weakref import WeakKeyDictionary as wkdict

from ddtrace import tracer


@dataclass
class Session:
ident: str
level: int

def link_to_trace(self, trace_context: t.Optional[t.Any] = None):
SessionManager.link_session_to_trace(self, trace_context)

@classmethod
def get_for_trace(cls) -> t.Optional["Session"]:
return SessionManager.get_session_for_trace()


class SessionManager:
_session_trace_map: wkdict = wkdict() # Trace context to Session mapping

@classmethod
def link_session_to_trace(cls, session, trace_context: t.Optional[t.Any] = None) -> None:
cls._session_trace_map[trace_context or tracer.current_root_span()] = session

@classmethod
def get_session_for_trace(cls) -> t.Optional[Session]:
root = tracer.current_root_span()
if root is None:
return None

return cls._session_trace_map.get(root.context or root)
1 change: 1 addition & 0 deletions ddtrace/debugging/_uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class UploaderProduct(str, Enum):

DEBUGGER = "dynamic_instrumentation"
EXCEPTION_REPLAY = "exception_replay"
CODE_ORIGIN_SPAN = "code_origin.span"


class LogsIntakeUploaderV1(ForksafeAwakeablePeriodicService):
Expand Down
16 changes: 16 additions & 0 deletions ddtrace/settings/live_debugging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from envier import En


class LiveDebuggerConfig(En):
__prefix__ = "dd.live_debugging"

enabled = En.v(
bool,
"enabled",
default=False,
help_type="Boolean",
help="Enable the live debugger.",
)


config = LiveDebuggerConfig()
6 changes: 6 additions & 0 deletions docs/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -671,3 +671,9 @@ Exception Replay
----------------

.. ddtrace-envier-configuration:: ddtrace.settings.exception_replay:ExceptionReplayConfig


Live Debugging
--------------

.. ddtrace-envier-configuration:: ddtrace.settings.live_debugging:LiveDebuggerConfig
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ ddtrace = "ddtrace.contrib.pytest.plugin"
"ddtrace.pytest_benchmark" = "ddtrace.contrib.pytest_benchmark.plugin"

[project.entry-points.'ddtrace.products']
"live-debugger" = "ddtrace.debugging._products.live_debugger"
"remote-configuration" = "ddtrace.internal.remoteconfig.product"

[project.urls]
Expand Down
Loading

0 comments on commit 766843f

Please sign in to comment.