Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make CollectTrace for profiling by iteration async #966

Closed
wants to merge 12 commits into from
71 changes: 54 additions & 17 deletions libkineto/src/CuptiActivityProfiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,12 @@ std::ostream& operator<<(
return oss;
}

// Waits for an outstanding collectTrace worker thread, if one exists and is
// still joinable, before the profiler object is torn down.
CuptiActivityProfiler::~CuptiActivityProfiler() {
  if (!collectTraceThread_) {
    // No worker thread was ever created (or it was already released).
    return;
  }
  auto& worker = *collectTraceThread_;
  if (worker.joinable()) {
    worker.join();
  }
}

void CuptiActivityProfiler::transferCpuTrace(
std::unique_ptr<libkineto::CpuTraceBuffer> cpuTrace) {
std::lock_guard<std::recursive_mutex> guard(mutex_);
Expand Down Expand Up @@ -1120,6 +1126,33 @@ void CuptiActivityProfiler::configure(
currentRunloopState_ = RunloopState::Warmup;
}

// Stops the current trace collection: notifies the registered client (if
// any), records whether CUPTI stopped collection early, and then stops the
// trace while holding mutex_.
//
// collection_done: only controls whether the "Reached profile end time"
//                  message is logged.
// now:             timestamp forwarded unchanged to stopTraceInternal().
//
// NOTE(review): performRunLoopStep appears to run this either directly or on
// collectTraceThread_ for iteration-based profiling - confirm against the
// full function.
void CuptiActivityProfiler::collectTrace(
bool collection_done,
const std::chrono::time_point<std::chrono::system_clock>& now) {
// Stop the registered client, if one is present.
if (libkineto::api().client()) {
libkineto::api().client()->stop();
}

#if defined(HAS_CUPTI) || defined(HAS_ROCTRACER)
// Record in ecs_ that collection was stopped early and log the configured
// GPU buffer size (converted from bytes to MB) to help diagnose it.
if (cupti_.stopCollection) {
ecs_.cupti_stopped_early = cupti_.stopCollection;
LOG(ERROR)
<< "State: CollectTrace stopped by CUPTI. (Buffer size configured is "
<< config_->activitiesMaxGpuBufferSize() / 1024 / 1024 << "MB)";
}
#endif // HAS_CUPTI || HAS_ROCTRACER
// The client stop and the early-stop check above run before mutex_ is taken;
// only the steps below execute under the lock.
std::lock_guard<std::recursive_mutex> guard(mutex_);
stopTraceInternal(now);
VLOG_IF(0, collection_done) << "Reached profile end time";
UST_LOGGER_MARK_COMPLETED(kCollectionStage);
}

// Waits for the asynchronous collectTrace worker thread (if one was started)
// to finish, then releases the thread object.
//
// Safe to call when no worker exists; in that case it does nothing.
void CuptiActivityProfiler::ensureCollectTraceDone() {
  if (!collectTraceThread_) {
    return;
  }
  if (collectTraceThread_->joinable()) {
    collectTraceThread_->join();
  }
  // Release the handle even if the thread was not joinable. A new worker is
  // only spawned while collectTraceThread_ is null, so leaving a stale,
  // non-joinable thread object behind would block every later async
  // collection. Destroying a non-joinable std::thread is well-defined.
  collectTraceThread_.reset();
}
void CuptiActivityProfiler::toggleCollectionDynamic(const bool enable) {
#ifdef HAS_CUPTI
if (enable) {
Expand Down Expand Up @@ -1266,26 +1299,26 @@ const time_point<system_clock> CuptiActivityProfiler::performRunLoopStep(
) {
// Update runloop state first to prevent further updates to shared state
LOG(INFO) << "Tracing complete.";
VLOG_IF(1, currentIter > 0)
VLOG_IF(1, currentIter >= 0)
<< "This state change was invoked by application's step() call";

if (libkineto::api().client()) {
libkineto::api().client()->stop();
}

#if defined(HAS_CUPTI) || defined(HAS_ROCTRACER)
if (cupti_.stopCollection) {
ecs_.cupti_stopped_early = cupti_.stopCollection;
LOG(ERROR)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason why we removed this LOG? Doesn't appear in the new function

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those lines were added to the main branch after this PR was opened. I'll rebase onto the main branch and update this PR.
image

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh interesting, that diff was supposed to be only formatting hmm. I'll add it myself, thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

main...staugust:kineto:main2_new , I have reorganized the code on a new branch based on Kineto's main branch, hoping this will help.

<< "State: CollectTrace stopped by CUPTI. (Buffer size configured is "
<< config_->activitiesMaxGpuBufferSize() / 1024 / 1024 << "MB)";
// currentIter >= 0 means this is an iteration-based collection, which is
// triggered from the PyTorch main thread. Run collectTrace on a separate
// thread so that the PyTorch main thread is not blocked.
if (currentIter >= 0) {
// if collectTraceThread_ is already running, there's no need to
// execute collectTrace twice.
if (!collectTraceThread_) {
std::lock_guard<std::recursive_mutex> guard(mutex_);
collectTraceThread_ = std::make_unique<std::thread>(
&CuptiActivityProfiler::collectTrace,
this,
collection_done,
now);
}
break;
}
#endif // HAS_CUPTI || HAS_ROCTRACER

std::lock_guard<std::recursive_mutex> guard(mutex_);
stopTraceInternal(now);
VLOG_IF(0, collection_done) << "Reached profile end time";
UST_LOGGER_MARK_COMPLETED(kCollectionStage);
collectTrace(collection_done, now);
} else if (derivedConfig_->isProfilingByIteration()) {
// nothing to do here
} else if (
Expand All @@ -1305,6 +1338,10 @@ const time_point<system_clock> CuptiActivityProfiler::performRunLoopStep(
if (currentIter >= 0) {
return new_wakeup_time;
}

// Before processing, we should wait for collectTrace thread to be done.
ensureCollectTraceDone();

// FIXME: Probably want to allow interruption here
// for quickly handling trace request via synchronous API
std::lock_guard<std::recursive_mutex> guard(mutex_);
Expand Down
14 changes: 13 additions & 1 deletion libkineto/src/CuptiActivityProfiler.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class CuptiActivityProfiler {
CuptiActivityProfiler(RoctracerActivityApi& rai, bool cpuOnly);
CuptiActivityProfiler(const CuptiActivityProfiler&) = delete;
CuptiActivityProfiler& operator=(const CuptiActivityProfiler&) = delete;

~CuptiActivityProfiler();
bool isActive() const {
return currentRunloopState_ != RunloopState::WaitForRequest;
}
Expand Down Expand Up @@ -170,6 +170,13 @@ class CuptiActivityProfiler {
stopTraceInternal(now);
}

// Collect CPU and GPU traces
void collectTrace(
bool collectionDone,
const std::chrono::time_point<std::chrono::system_clock>& now);

// Ensure collectTrace is done
void ensureCollectTraceDone();
// Process CPU and GPU traces
void processTrace(ActivityLogger& logger) {
std::lock_guard<std::recursive_mutex> guard(mutex_);
Expand Down Expand Up @@ -483,6 +490,11 @@ class CuptiActivityProfiler {
// Mutex to protect non-atomic access to below state
std::recursive_mutex mutex_;

// Thread that collects both CPU and GPU traces so that the torch main thread
// is not blocked when profiling by iterations is enabled. See issue #953 for
// details.
std::unique_ptr<std::thread> collectTraceThread_{nullptr};

// Runloop phase
std::atomic<RunloopState> currentRunloopState_{RunloopState::WaitForRequest};

Expand Down
2 changes: 1 addition & 1 deletion libkineto/test/CuptiActivityProfilerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -443,8 +443,8 @@ TEST(CuptiActivityProfiler, AsyncTraceUsingIter) {
EXPECT_TRUE(profiler.isActive());

auto nextnext = next + milliseconds(1000);

profiler.performRunLoopStep(nextnext, nextnext);
profiler.ensureCollectTraceDone();
profiler.performRunLoopStep(nextnext, nextnext);

// Assert that tracing has completed
Expand Down
Loading