Skip to content

Commit

Permalink
Properly manage JNI resources in JVMTI wallclock sampler
Browse files Browse the repository at this point in the history
  • Loading branch information
jbachorik committed Jan 14, 2025
1 parent fe6aba3 commit 027351d
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 57 deletions.
83 changes: 58 additions & 25 deletions ddprof-lib/src/main/cpp/wallClock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,49 +161,83 @@ void WallClockJVMTI::timerLoop() {
// Attach to JVM as the first step
VM::attachThread("Datadog Profiler Wallclock Sampler");
auto collectThreads = [&](std::vector<ThreadEntry>& threads) {
jvmtiEnv* jvmti = VM::jvmti();
if (jvmti == nullptr) {
return;
}
JNIEnv* jni = VM::jni();

jint threads_count = 0;
jthread* threads_ptr = nullptr;
jvmti->GetAllThreads(&threads_count, &threads_ptr);
jvmtiEnv* jvmti = VM::jvmti();
if (jvmti == nullptr) {
return;
}
JNIEnv* jni = VM::jni();

bool do_filter = Profiler::instance()->threadFilter()->enabled();
int self = OS::threadId();
jint threads_count = 0;
jthread* threads_ptr = nullptr;
jvmti->GetAllThreads(&threads_count, &threads_ptr);

for (int i = 0; i < threads_count; i++) {
jthread thread = threads_ptr[i];
if (thread != nullptr) {
VMThread* nThread = VMThread::fromJavaThread(jni, thread);
if (nThread == nullptr) {
continue;
}
int tid = nThread->osThreadId();
if (tid != self && (!do_filter || Profiler::instance()->threadFilter()->accept(tid))) {
threads.push_back({nThread, thread});
}
bool do_filter = Profiler::instance()->threadFilter()->enabled();
int self = OS::threadId();
for (int i = 0; i < threads_count; i++) {
jthread thread = threads_ptr[i];
if (thread != nullptr) {
VMThread* nThread = VMThread::fromJavaThread(jni, thread);
if (nThread == nullptr) {
jni->DeleteLocalRef(thread);
continue;
}
jint thread_state;
if (jvmti->GetThreadState(thread, &thread_state) == JVMTI_ERROR_NONE &&
(thread_state & JVMTI_THREAD_STATE_TERMINATED) == 0) {
int tid = VMThread::nativeThreadId(jni, thread);
if (tid != -1 && tid != self && (!do_filter || Profiler::instance()->threadFilter()->accept(tid))) {
threads.push_back({nThread, jni->NewWeakGlobalRef(thread)});
}
}
jni->DeleteLocalRef(thread);
}
jvmti->Deallocate((unsigned char*)threads_ptr);
}
jvmti->Deallocate((unsigned char*)threads_ptr);
};

auto sampleThreads = [&](ThreadEntry& thread_entry, int& num_failures, int& threads_already_exited, int& permission_denied) {
static jint max_stack_depth = (jint)Profiler::instance()->max_stack_depth();
static jvmtiFrameInfo* frame_buffer = new jvmtiFrameInfo[max_stack_depth];
static jvmtiEnv* jvmti = VM::jvmti();
static JNIEnv* jni = VM::jni();

int num_frames = 0;
jvmtiError err = jvmti->GetStackTrace(thread_entry.java, 0, max_stack_depth, frame_buffer, &num_frames);
if (thread_entry.java_ref == nullptr) {
num_failures++;
return false;
}

jobject thread = jni->NewLocalRef(thread_entry.java_ref);
if (thread == nullptr) {
num_failures++;
jni->DeleteWeakGlobalRef(thread_entry.java_ref);
return false;
}

jint thread_state;
jvmtiError state_err = jvmti->GetThreadState(thread, &thread_state);
if (state_err != JVMTI_ERROR_NONE || (thread_state & JVMTI_THREAD_STATE_TERMINATED) != 0) {
num_failures++;
jni->DeleteWeakGlobalRef(thread_entry.java_ref);
jni->DeleteLocalRef(thread);
return false;
}
jvmtiError err = jvmti->GetStackTrace(thread, 0, max_stack_depth, frame_buffer, &num_frames);
// clean up the reference(s) to the Java thread
jni->DeleteWeakGlobalRef(thread_entry.java_ref);
jni->DeleteLocalRef(thread);

if (err != JVMTI_ERROR_NONE) {
num_failures++;
if (err == JVMTI_ERROR_THREAD_NOT_ALIVE) {
threads_already_exited++;
}
return false;
}
if (num_frames == 0) {
// some JVMTI attached threads are Java-like but have no stack; we can just ignore them
return true;
}
ExecutionEvent event;
VMThread* vm_thread = thread_entry.native;
int raw_thread_state = vm_thread->state();
Expand Down Expand Up @@ -267,6 +301,5 @@ void WallClockASGCT::timerLoop() {
}
return true;
};

timerLoopCommon<int>(collectThreads, sampleThreads, _reservoir_size, _interval);
}
69 changes: 37 additions & 32 deletions ddprof-lib/src/main/cpp/wallClock.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ class BaseWallClock : public Engine {
// Profiler::recordSample().
int _reservoir_size;

pthread_t _thread;
virtual void timerLoop() = 0;
virtual void initialize(Arguments& args) {};
pthread_t _thread;
virtual void timerLoop() = 0;
virtual void initialize(Arguments& args) {};

static void *threadEntry(void *wall_clock) {
((BaseWallClock *)wall_clock)->timerLoop();
Expand Down Expand Up @@ -76,38 +76,43 @@ class BaseWallClock : public Engine {

while (_running.load(std::memory_order_relaxed)) {
collectThreads(threads);
int size = threads.size();
if (threads.size() > 0) {
int num_failures = 0;
int threads_already_exited = 0;
int permission_denied = 0;
std::vector<ThreadType> sample = reservoir.sample(threads);
for (ThreadType thread : sample) {
if (!sampleThreads(thread, num_failures, threads_already_exited, permission_denied)) {
continue;
}
}

int num_failures = 0;
int threads_already_exited = 0;
int permission_denied = 0;
std::vector<ThreadType> sample = reservoir.sample(threads);
for (ThreadType thread : sample) {
if (!sampleThreads(thread, num_failures, threads_already_exited, permission_denied)) {
continue;
epoch.updateNumSamplableThreads(threads.size());
epoch.updateNumFailedSamples(num_failures);
epoch.updateNumSuccessfulSamples(sample.size() - num_failures);
epoch.updateNumExitedThreads(threads_already_exited);
epoch.updateNumPermissionDenied(permission_denied);
u64 endTime = TSC::ticks();
u64 duration = TSC::ticks_to_millis(endTime - startTime);
if (epoch.hasChanged() || duration >= 1000) {
epoch.endEpoch(duration);
Profiler::instance()->recordWallClockEpoch(self, &epoch);
epoch.newEpoch(endTime);
startTime = endTime;
} else {
epoch.clean();
}
}

epoch.updateNumSamplableThreads(threads.size());
epoch.updateNumFailedSamples(num_failures);
epoch.updateNumSuccessfulSamples(sample.size() - num_failures);
epoch.updateNumExitedThreads(threads_already_exited);
epoch.updateNumPermissionDenied(permission_denied);
u64 endTime = TSC::ticks();
u64 duration = TSC::ticks_to_millis(endTime - startTime);
if (epoch.hasChanged() || duration >= 1000) {
epoch.endEpoch(duration);
Profiler::instance()->recordWallClockEpoch(self, &epoch);
epoch.newEpoch(endTime);
startTime = endTime;
} else {
epoch.clean();
threads.clear();
}

threads.clear();
// Get a random sleep duration
// clamp the random interval to <1,2N-1>
// the probability of clamping is extremely small, close to zero
OS::sleep(std::min(std::max((long int)1, static_cast<long int>(distribution(generator))), ((_interval * 2) - 1)));
// restrict the random interval to <N/2,2N>; out-of-range draws are re-sampled
long int delay = _interval;
do {
delay = static_cast<long int>(distribution(generator));
} while (delay < interval / 2 || delay > 2 * interval);
OS::sleep(delay);
}
}

Expand Down Expand Up @@ -159,8 +164,8 @@ class WallClockJVMTI : public BaseWallClock {
void timerLoop() override;
public:
struct ThreadEntry {
VMThread* native;
jthread java;
VMThread* native;
jobject java_ref;
};
WallClockJVMTI() : BaseWallClock() {}
const char* name() override {
Expand Down

0 comments on commit 027351d

Please sign in to comment.