Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(profiling): associate spans with task samples #11493

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ddtrace/internal/datadog/profiling/stack_v2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ def link_span(span: typing.Optional[typing.Union[context.Context, ddspan.Span]])
span_id = span.span_id
local_root_span_id = span._local_root.span_id
local_root_span_type = span._local_root.span_type
print(
"link_span called with span_id: %s, local_root_span_id: %s, local_root_span_type: %s"
% (span_id, local_root_span_id, local_root_span_type)
)
_stack_v2.link_span(span_id, local_root_span_id, local_root_span_type) # type: ignore # noqa: F405

except Exception as e:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,14 @@ StackRenderer::render_thread_begin(PyThreadState* tstate,

const std::optional<Span> active_span = ThreadSpanLinks::get_instance().get_active_span_from_thread_id(thread_id);
if (active_span) {
std::cout << "ACTIVE SPAN FOUND thread_id: " << thread_id << " span_id: " << active_span->span_id
<< " local_root_span_id: " << active_span->local_root_span_id
<< " span_type: " << active_span->span_type << std::endl;
ddup_push_span_id(sample, active_span->span_id);
ddup_push_local_root_span_id(sample, active_span->local_root_span_id);
ddup_push_trace_type(sample, std::string_view(active_span->span_type));
} else {
std::cout << "thread_id: " << thread_id << " no active span" << std::endl;
}
}

Expand Down Expand Up @@ -89,6 +94,22 @@ StackRenderer::render_task_begin(std::string_view name)
ddup_push_walltime(sample, thread_state.wall_time_ns, 1);
ddup_push_cputime(sample, thread_state.cpu_time_ns, 1); // initialized to 0, so possibly a no-op
ddup_push_monotonic_ns(sample, thread_state.now_time_ns);

std::cout << "render_task_begin " << name << " thread_state.id " << thread_state.id << std::endl;
// We also want to make sure the tid -> span_id mapping is present in the sample for the task
const std::optional<Span> active_span =
ThreadSpanLinks::get_instance().get_active_span_from_thread_id(thread_state.id);
if (active_span) {
std::cout << "render_task_begin ACTIVE SPAN FOUND thread_id: " << thread_state.id
<< " span_id: " << active_span->span_id
<< " local_root_span_id: " << active_span->local_root_span_id
<< " span_type: " << active_span->span_type << std::endl;
ddup_push_span_id(sample, active_span->span_id);
ddup_push_local_root_span_id(sample, active_span->local_root_span_id);
ddup_push_trace_type(sample, std::string_view(active_span->span_type));
} else {
std::cout << "render_task_begin thread_id: " << thread_state.id << " no active span" << std::endl;
}
}

ddup_push_task_name(sample, name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ _stack_v2_link_span(PyObject* self, PyObject* args, PyObject* kwargs)
span_type = empty_string.c_str();
}

std::cout << "link_span thread_id: " << thread_id << " span_id: " << span_id
<< " local_root_span_id: " << local_root_span_id << " span_type: " << span_type << std::endl;

ThreadSpanLinks::get_instance().link_span(thread_id, span_id, local_root_span_id, std::string(span_type));

Py_RETURN_NONE;
Expand Down
2 changes: 2 additions & 0 deletions ddtrace/profiling/_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ def _(asyncio):

init_stack_v2 = config.stack.v2_enabled and stack_v2.is_available

print("asyncio.init_stack_v2", init_stack_v2)

@partial(wrap, sys.modules["asyncio.events"].BaseDefaultEventLoopPolicy.set_event_loop)
def _(f, args, kwargs):
loop = get_argument_value(args, kwargs, 1, "loop")
Expand Down
2 changes: 2 additions & 0 deletions ddtrace/profiling/collector/threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,14 @@ def _set_original(

# Also patch threading.Thread so echion can track thread lifetimes
def init_stack_v2():
print("in threading.init_stack_v2")
if config.stack.v2_enabled and stack_v2.is_available:
_thread_set_native_id = Thread._set_native_id
_thread_bootstrap_inner = Thread._bootstrap_inner

def thread_set_native_id(self, *args, **kswargs):
_thread_set_native_id(self, *args, **kswargs)
print("stack_v2.register_thread", self.ident, self.native_id, self.name)
stack_v2.register_thread(self.ident, self.native_id, self.name)

def thread_bootstrap_inner(self, *args, **kwargs):
Expand Down
8 changes: 6 additions & 2 deletions ddtrace/profiling/profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,10 @@ def __init__(
version: Optional[str] = None,
tracer: Any = ddtrace.tracer,
api_key: Optional[str] = None,
_stack_v2_enabled: Optional[bool] = None,
agentless: bool = profiling_config.agentless,
_memory_collector_enabled: bool = profiling_config.memory.enabled,
_stack_collector_enabled: bool = profiling_config.stack.enabled,
_stack_v2_enabled: bool = profiling_config.stack.v2_enabled,
_lock_collector_enabled: bool = profiling_config.lock.enabled,
enable_code_provenance: bool = profiling_config.code_provenance,
endpoint_collection_enabled: bool = profiling_config.endpoint_collection,
Expand All @@ -141,7 +141,10 @@ def __init__(
self.agentless: bool = agentless
self._memory_collector_enabled: bool = _memory_collector_enabled
self._stack_collector_enabled: bool = _stack_collector_enabled
self._stack_v2_enabled: bool = _stack_v2_enabled
self._stack_v2_enabled: bool = (
_stack_v2_enabled if _stack_v2_enabled is not None else profiling_config.stack.v2_enabled
)
print(profiling_config.stack.v2_enabled, _stack_v2_enabled, self._stack_v2_enabled)
self._lock_collector_enabled: bool = _lock_collector_enabled
self.enable_code_provenance: bool = enable_code_provenance
self.endpoint_collection_enabled: bool = endpoint_collection_enabled
Expand Down Expand Up @@ -313,6 +316,7 @@ def __post_init__(self):
r,
tracer=self.tracer,
endpoint_collection_enabled=self.endpoint_collection_enabled,
_stack_collector_v2_enabled=self._stack_v2_enabled,
)
)
LOG.debug("Profiling collector (stack) initialized")
Expand Down
2 changes: 1 addition & 1 deletion tests/profiling/collector/pprof_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@


UINT64_MAX = (1 << 64) - 1
DEBUG_TEST = False
DEBUG_TEST = True


# Clamp the value to the range [0, UINT64_MAX] as done in clamp_to_uint64_unsigned
Expand Down
8 changes: 8 additions & 0 deletions tests/profiling_v2/collector/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import pytest

import ddtrace


@pytest.fixture
def tracer():
return ddtrace.Tracer()
166 changes: 93 additions & 73 deletions tests/profiling_v2/collector/test_stack_asyncio.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,35 @@
import asyncio
import glob
import os
import sys
import time

import pytest

from ddtrace.internal.datadog.profiling import stack_v2
from ddtrace.profiling import _asyncio
from ddtrace.profiling import profiler
from ddtrace.settings.profiling import config
from tests.profiling.collector import _asyncio_compat
from tests.profiling.collector import pprof_utils


@pytest.mark.skipif(sys.version_info < (3, 8), reason="stack v2 is available only on 3.8+ as echion does")
def test_asyncio(monkeypatch):
pprof_output_prefix = "/tmp/test_asyncio"
monkeypatch.setattr(config.stack, "v2_enabled", True)
monkeypatch.setattr(config, "output_pprof", pprof_output_prefix)
@pytest.mark.subprocess(
env=dict(
DD_PROFILING_OUTPUT_PPROF="/tmp/test_stack_asyncio",
DD_PROFILING_STACK_V2_ENABLED="true",
),
out=None,
)
def test_asyncio():
import asyncio
import os
import sys
import time
import uuid

import pytest

from ddtrace import ext
from ddtrace import tracer
from ddtrace.internal.datadog.profiling import stack_v2
from ddtrace.profiling import _asyncio
from ddtrace.profiling import profiler
from tests.profiling.collector import _asyncio_compat
from tests.profiling.collector import pprof_utils

if sys.version_info[:2] == (3, 7):
pytest.skip("stack_v2 is not supported on Python 3.7")

assert stack_v2.is_available, stack_v2.failure_msg

Expand All @@ -36,16 +47,24 @@ async def hello():
await stuff()
return (t1, t2)

p = profiler.Profiler()
resource = str(uuid.uuid4())
span_type = ext.SpanTypes.WEB

p = profiler.Profiler(tracer=tracer)
assert p._profiler._stack_v2_enabled
p.start()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
if _asyncio_compat.PY38_AND_LATER:
maintask = loop.create_task(hello(), name="main")
else:
maintask = loop.create_task(hello())

t1, t2 = loop.run_until_complete(maintask)
with tracer.trace("test_asyncio", resource=resource, span_type=span_type) as span:
span_id = span.span_id
local_root_span_id = span._local_root.span_id

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
if _asyncio_compat.PY38_AND_LATER:
maintask = loop.create_task(hello(), name="main")
else:
maintask = loop.create_task(hello())

t1, t2 = loop.run_until_complete(maintask)
p.stop()

t1_name = _asyncio._task_get_name(t1)
Expand All @@ -54,61 +73,62 @@ async def hello():
assert t1_name == "sleep 1"
assert t2_name == "sleep 2"

output_filename = pprof_output_prefix + "." + str(os.getpid())
output_filename = os.environ["DD_PROFILING_OUTPUT_PPROF"] + "." + str(os.getpid())

profile = pprof_utils.parse_profile(output_filename)

samples_with_span_id = pprof_utils.get_samples_with_label_key(profile, "span id")
assert len(samples_with_span_id) > 0
# print(samples_with_span_id)

# get samples with task_name
samples = pprof_utils.get_samples_with_label_key(profile, "task name")
# The next fails if stack_v2 is not properly configured with asyncio task
# tracking via ddtrace.profiling._asyncio
assert len(samples) > 0

# We'd like to check whether there exist samples with
# 1. task name label "main"
# - function name label "hello"
# - and line number is between
# 2. task name label t1_name or t2_name
# - function name label "stuff"
# And they all have thread name "MainThread"

checked_main = False
checked_t1 = False
checked_t2 = False

for sample in samples:
task_name_label = pprof_utils.get_label_with_key(profile.string_table, sample, "task name")
task_name = profile.string_table[task_name_label.str]

thread_name_label = pprof_utils.get_label_with_key(profile.string_table, sample, "thread name")
thread_name = profile.string_table[thread_name_label.str]

location_id = sample.location_id[0]
location = pprof_utils.get_location_with_id(profile, location_id)
line = location.line[0]
function = pprof_utils.get_function_with_id(profile, line.function_id)
function_name = profile.string_table[function.name]

if task_name == "main":
assert thread_name == "MainThread"
assert function_name == "hello"
checked_main = True
elif task_name == t1_name or task_name == t2_name:
assert thread_name == "MainThread"
assert function_name == "stuff"
if task_name == t1_name:
checked_t1 = True
if task_name == t2_name:
checked_t2 = True

assert checked_main
assert checked_t1
assert checked_t2

# cleanup output file
for f in glob.glob(pprof_output_prefix + ".*"):
try:
os.remove(f)
except Exception as e:
print("Error removing file: {}".format(e))
pass
# print(samples)

pprof_utils.assert_profile_has_sample(
profile,
samples,
expected_sample=pprof_utils.StackEvent(
thread_name="MainThread",
task_name="hello",
span_id=span_id,
local_root_span_id=local_root_span_id,
locations=[
pprof_utils.StackLocation(
function_name="hello", filename="test_stack_asyncio.py", line_no=hello.__code__.co_firstlineno + 3
)
],
),
)

# pprof_utils.assert_profile_has_sample(
# profile,
# samples,
# expected_sample=pprof_utils.StackEvent(
# thread_name="MainThread",
# task_name=t1_name,
# locations=[
# pprof_utils.StackLocation(
# function_name="stuff", filename="test_stack_asyncio.py", line_no=stuff.__code__.co_firstlineno + 3
# ),
# ],
# ),
# )

# pprof_utils.assert_profile_has_sample(
# profile,
# samples,
# expected_sample=pprof_utils.StackEvent(
# thread_name="MainThread",
# task_name=t2_name,
# locations=[
# pprof_utils.StackLocation(
# function_name="stuff", filename="test_stack_asyncio.py", line_no=stuff.__code__.co_firstlineno + 3
# ),
# ],
# ),
# )
Loading