// Patch overview (unified diff touching three files under libkineto/).
//
// src/CuptiActivityProfiler.cpp
//   * Adds ~CuptiActivityProfiler(): joins collectTraceThread_ when the
//     pointer is set and the thread is joinable.
//   * Adds collectTrace(collection_done, now): stops the libkineto client if
//     one is registered; when HAS_CUPTI or HAS_ROCTRACER is defined and
//     cupti_.stopCollection is set, copies that flag into
//     ecs_.cupti_stopped_early and logs an error with the configured GPU
//     buffer size in MB; then locks mutex_ and calls stopTraceInternal(now).
//     This is the same code the third hunk removes from performRunLoopStep().
//   * Adds ensureCollectTraceDone(): joins collectTraceThread_ if it is set
//     and joinable, then resets the pointer to nullptr.
//   * performRunLoopStep(): when currentIter >= 0, collectTrace() is handed to
//     collectTraceThread_ (created only if the pointer is currently null, with
//     mutex_ held during creation) and the branch executes `break`; otherwise
//     collectTrace() is called inline. Further down, ensureCollectTraceDone()
//     is called before mutex_ is locked for the processing step.
//   * The VLOG_IF condition for the "invoked by application's step() call"
//     message changes from `currentIter > 0` to `currentIter >= 0`.
//
// src/CuptiActivityProfiler.h
//   * Declares the destructor, collectTrace(), ensureCollectTraceDone() and
//     the collectTraceThread_ member (initialised to nullptr).
//
// test/CuptiActivityProfilerTest.cpp
//   * AsyncTraceUsingIter now calls profiler.ensureCollectTraceDone() between
//     its two performRunLoopStep(nextnext, nextnext) calls.
//
// NOTE(review): this text looks like it was flattened during extraction:
//   line breaks and indentation are collapsed, and template argument lists
//   appear to be missing (e.g. after std::unique_ptr, std::lock_guard,
//   std::make_unique, std::atomic, time_point). Presumably the upstream patch
//   has them; restore from the original commit before trying to apply this.
// NOTE(review): `if (!collectTraceThread_)` is evaluated before mutex_ is
//   locked, and the destructor / ensureCollectTraceDone() read and reset
//   collectTraceThread_ without taking mutex_. Confirm that all of these run
//   on a single thread, or move the checks under the lock.
// NOTE(review): collectTraceThread_ is presumably a std::thread held through
//   std::unique_ptr, in which case `now` is copied into the new thread rather
//   than referenced. Confirm against the real declaration.
diff --git a/libkineto/src/CuptiActivityProfiler.cpp b/libkineto/src/CuptiActivityProfiler.cpp index 8e68ac6a2..0f742d4ee 100644 --- a/libkineto/src/CuptiActivityProfiler.cpp +++ b/libkineto/src/CuptiActivityProfiler.cpp @@ -208,6 +208,12 @@ std::ostream& operator<<( return oss; } +CuptiActivityProfiler::~CuptiActivityProfiler() { + if (collectTraceThread_ && collectTraceThread_->joinable()) { + collectTraceThread_->join(); + } +} + void CuptiActivityProfiler::transferCpuTrace( std::unique_ptr cpuTrace) { std::lock_guard guard(mutex_); @@ -1120,6 +1126,33 @@ void CuptiActivityProfiler::configure( currentRunloopState_ = RunloopState::Warmup; } +void CuptiActivityProfiler::collectTrace( + bool collection_done, + const std::chrono::time_point& now) { + if (libkineto::api().client()) { + libkineto::api().client()->stop(); + } + +#if defined(HAS_CUPTI) || defined(HAS_ROCTRACER) + if (cupti_.stopCollection) { + ecs_.cupti_stopped_early = cupti_.stopCollection; + LOG(ERROR) + << "State: CollectTrace stopped by CUPTI. 
(Buffer size configured is " + << config_->activitiesMaxGpuBufferSize() / 1024 / 1024 << "MB)"; + } +#endif // HAS_CUPTI || HAS_ROCTRACER + std::lock_guard guard(mutex_); + stopTraceInternal(now); + VLOG_IF(0, collection_done) << "Reached profile end time"; + UST_LOGGER_MARK_COMPLETED(kCollectionStage); +} + +void CuptiActivityProfiler::ensureCollectTraceDone() { + if (collectTraceThread_ && collectTraceThread_->joinable()) { + collectTraceThread_->join(); + collectTraceThread_.reset(nullptr); + } +} void CuptiActivityProfiler::toggleCollectionDynamic(const bool enable) { #ifdef HAS_CUPTI if (enable) { @@ -1266,26 +1299,26 @@ const time_point CuptiActivityProfiler::performRunLoopStep( ) { // Update runloop state first to prevent further updates to shared state LOG(INFO) << "Tracing complete."; - VLOG_IF(1, currentIter > 0) + VLOG_IF(1, currentIter >= 0) << "This state change was invoked by application's step() call"; - if (libkineto::api().client()) { - libkineto::api().client()->stop(); - } - -#if defined(HAS_CUPTI) || defined(HAS_ROCTRACER) - if (cupti_.stopCollection) { - ecs_.cupti_stopped_early = cupti_.stopCollection; - LOG(ERROR) - << "State: CollectTrace stopped by CUPTI. (Buffer size configured is " - << config_->activitiesMaxGpuBufferSize() / 1024 / 1024 << "MB)"; + // currentIter >= 0 means this is an iteration-based collection, + // triggered by pytorch main thread, it should be executed in another + // thread in case pytorch main thread is blocked + if (currentIter >= 0) { + // if collectTraceThread_ is already running, there's no need to + // execute collectTrace twice. 
+ if (!collectTraceThread_) { + std::lock_guard guard(mutex_); + collectTraceThread_ = std::make_unique( + &CuptiActivityProfiler::collectTrace, + this, + collection_done, + now); + } + break; } -#endif // HAS_CUPTI || HAS_ROCTRACER - - std::lock_guard guard(mutex_); - stopTraceInternal(now); - VLOG_IF(0, collection_done) << "Reached profile end time"; - UST_LOGGER_MARK_COMPLETED(kCollectionStage); + collectTrace(collection_done, now); } else if (derivedConfig_->isProfilingByIteration()) { // nothing to do here } else if ( @@ -1305,6 +1338,10 @@ const time_point CuptiActivityProfiler::performRunLoopStep( if (currentIter >= 0) { return new_wakeup_time; } + + // Before processing, we should wait for collectTrace thread to be done. + ensureCollectTraceDone(); + // FIXME: Probably want to allow interruption here // for quickly handling trace request via synchronous API std::lock_guard guard(mutex_); diff --git a/libkineto/src/CuptiActivityProfiler.h b/libkineto/src/CuptiActivityProfiler.h index 01ca5cb2e..d47bba33b 100644 --- a/libkineto/src/CuptiActivityProfiler.h +++ b/libkineto/src/CuptiActivityProfiler.h @@ -121,7 +121,7 @@ class CuptiActivityProfiler { CuptiActivityProfiler(RoctracerActivityApi& rai, bool cpuOnly); CuptiActivityProfiler(const CuptiActivityProfiler&) = delete; CuptiActivityProfiler& operator=(const CuptiActivityProfiler&) = delete; - + ~CuptiActivityProfiler(); bool isActive() const { return currentRunloopState_ != RunloopState::WaitForRequest; } @@ -170,6 +170,13 @@ class CuptiActivityProfiler { stopTraceInternal(now); } + // Collect CPU and GPU traces + void collectTrace( + bool collectionDone, + const std::chrono::time_point& now); + + // Ensure collectTrace is done + void ensureCollectTraceDone(); // Process CPU and GPU traces void processTrace(ActivityLogger& logger) { std::lock_guard guard(mutex_); @@ -483,6 +490,11 @@ class CuptiActivityProfiler { // Mutex to protect non-atomic access to below state std::recursive_mutex mutex_; + // Add a 
thread to collect both cpu and gpu traces in case torch main thread + // is blocked when profiling by iterations is enabled. Issue #953 shows + // details. + std::unique_ptr collectTraceThread_{nullptr}; + // Runloop phase std::atomic currentRunloopState_{RunloopState::WaitForRequest}; diff --git a/libkineto/test/CuptiActivityProfilerTest.cpp b/libkineto/test/CuptiActivityProfilerTest.cpp index eef9588d4..713a8b83b 100644 --- a/libkineto/test/CuptiActivityProfilerTest.cpp +++ b/libkineto/test/CuptiActivityProfilerTest.cpp @@ -443,8 +443,8 @@ TEST(CuptiActivityProfiler, AsyncTraceUsingIter) { EXPECT_TRUE(profiler.isActive()); auto nextnext = next + milliseconds(1000); - profiler.performRunLoopStep(nextnext, nextnext); + profiler.ensureCollectTraceDone(); profiler.performRunLoopStep(nextnext, nextnext); // Assert that tracing has completed