[PGNCCL] Watchdog prints call-time traceback when reporting timeout (pytorch#139659)

### Motivation
Today, the watchdog only reports that it found a collective timeout:
```
[rank1]:[E1104 14:02:18.767594328 ProcessGroupNCCL.cpp:688] [Rank 1] Watchdog caught collective operation timeout: WorkNCCL(SeqNum=1, OpType=ALLREDUCE, NumelIn=200, NumelOut=200, Timeout(ms)=5000) ran for 5096 milliseconds before timing out.
```
While this is helpful, it is hard to associate the error with the user's program or library stack.

### This PR
This PR gives the watchdog the ability to report the call-time stack of the timed-out collective, making it easier to trace the error back to the program's behavior.

The call-time stack is recorded by Flight Recorder with minimal overhead (for details, please read this [doc](https://dev-discuss.pytorch.org/t/fast-combined-c-python-torchscript-inductor-tracebacks/1158) written by @zdevito). In `ProcessGroupNCCL`, we only track and report the Python part, which fits most PyTorch users.

### Demo
[stack_demo.py](https://gist.github.com/kwen2501/6758e18d305d67fc6f3f926217825c09).

```
TORCH_NCCL_TRACE_BUFFER_SIZE=100 torchrun --nproc-per-node 2 stack_demo.py
```
Setting `TORCH_NCCL_TRACE_BUFFER_SIZE` to a non-zero value turns on the Flight Recorder.
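
Below is a minimal sketch of what such a demo might look like (an illustrative reconstruction, not the contents of the linked gist): `foo()` routes rank 0 into `bar()`, which calls `all_reduce`, and rank 1 into `baz()`, which calls `all_gather_into_tensor`, so the two ranks issue mismatched collectives and the watchdog times out.

```
# Illustrative sketch (not the actual gist): mismatched collectives
# between rank 0 and rank 1 make the NCCL watchdog time out.
from datetime import timedelta

import torch
import torch.distributed as dist


def bar(t):
    # Rank 0 ends up here; rank 1 never joins this all_reduce.
    dist.all_reduce(t)


def baz(t, out):
    # Rank 1 ends up here; rank 0 never joins this all_gather.
    dist.all_gather_into_tensor(out, t)


def foo(rank, t, out):
    if rank == 0:
        bar(t)
    else:
        baz(t, out)


def main():
    # Short timeout so the watchdog fires quickly (matches the 5000 ms in the log above).
    dist.init_process_group("nccl", timeout=timedelta(seconds=5))
    rank = dist.get_rank()
    torch.cuda.set_device(rank)
    t = torch.ones(200, device="cuda")
    out = torch.empty(200 * dist.get_world_size(), device="cuda")
    foo(rank, t, out)


if __name__ == "__main__":
    main()
```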

Output:
```
[rank0]:[E1104 14:19:27.591610653 ProcessGroupNCCL.cpp:695] Stack trace of the timedout collective operation:
#0 all_reduce from /data/users/kw2501/pytorch/torch/distributed/distributed_c10d.py:2696
#1 wrapper from /data/users/kw2501/pytorch/torch/distributed/c10d_logger.py:83
#2 bar from /data/users/kw2501/sync_async/repro.py:15
#3 foo from /data/users/kw2501/sync_async/repro.py:24
#4 main from /data/users/kw2501/sync_async/repro.py:34
#5 <module> from /data/users/kw2501/sync_async/repro.py:40

[rank1]:[E1104 14:19:27.771430164 ProcessGroupNCCL.cpp:695] Stack trace of the timedout collective operation:
#0 all_gather_into_tensor from /data/users/kw2501/pytorch/torch/distributed/distributed_c10d.py:3630
#1 wrapper from /data/users/kw2501/pytorch/torch/distributed/c10d_logger.py:83
#2 baz from /data/users/kw2501/sync_async/repro.py:20
#3 foo from /data/users/kw2501/sync_async/repro.py:26
#4 main from /data/users/kw2501/sync_async/repro.py:34
#5 <module> from /data/users/kw2501/sync_async/repro.py:40
```

From the log above, we can tell that `bar()` and `baz()` are the places where the two ranks diverge.

Pull Request resolved: pytorch#139659
Approved by: https://github.com/wconstab, https://github.com/fduwjj
kwen2501 authored and pytorchmergebot committed Nov 5, 2024
1 parent ee42a99 commit 5f2ed50
Showing 3 changed files with 79 additions and 0 deletions.
44 changes: 44 additions & 0 deletions torch/csrc/distributed/c10d/NCCLUtils.cpp
@@ -376,6 +376,33 @@ void DebugInfoWriter::registerWriter(std::unique_ptr<DebugInfoWriter> writer) {
writer_ = std::move(writer);
}

// Returns the traceback of current entry, in string form.
// Note: `getTraceback` invokes `torch::symbolize`, which may need to acquire
// the GIL. If you don't want to block the current thread or take the risk of a
// GIL deadlock, you can use an asynchronous calling mechanism like std::async.
std::string NCCLTraceBuffer::Entry::getTraceback() {
torch::CapturedTraceback* traceback = traceback_.get();
torch::SymbolizedTracebacks s_tbs = torch::symbolize({traceback});
// We use 0 because we only have one traceback here.
const auto& s_tb = s_tbs.tracebacks.at(0);
std::stringstream oss;
for (auto idx : c10::irange(s_tb.size())) {
auto frame_id = s_tb[idx];
const auto& frame = s_tbs.all_frames.at(frame_id);
oss << "#" << idx << " " << frame.funcname << " from " << frame.filename
<< ":" << frame.lineno << '\n';
}
/* The resulting format looks like:
#0 all_reduce from pytorch/torch/distributed/distributed_c10d.py:2696
#1 wrapper from pytorch/torch/distributed/c10d_logger.py:83
#2 bar from /home/user/repro.py:15
#3 foo from /home/user/repro.py:24
#4 main from /home/user/repro.py:34
#5 <module> from /home/user/repro.py:40
*/
return oss.str();
}

std::optional<size_t> NCCLTraceBuffer::record(
size_t pg_id,
const std::tuple<std::string, std::string>& pg_name,
@@ -495,6 +522,23 @@ std::vector<NCCLTraceBuffer::Entry> NCCLTraceBuffer::dump_entries() {
return result;
}

// Returns the entry with the given id, if it exists. Otherwise, returns
// std::nullopt.
std::optional<NCCLTraceBuffer::Entry> NCCLTraceBuffer::getEntry(
std::optional<size_t> id) {
if (!enabled_ || !id) {
return std::nullopt;
}

std::unique_lock<std::mutex> guard(mutex_);
Entry entry = entries_.at(*id % max_entries_);
if (entry.id_ == *id) {
return entry;
} else {
return std::nullopt;
}
}

void NCCLTraceBuffer::retire_id(
std::optional<size_t> id,
bool compute_duration) {
7 changes: 7 additions & 0 deletions torch/csrc/distributed/c10d/NCCLUtils.hpp
@@ -663,6 +663,9 @@ struct NCCLTraceBuffer {
c10::SmallVector<int64_t, 8> sizes_; // flattened from inputs, outputs
bool retired_ = false; // is this work entry no longer in the workMetaList_?
// a retired but not completed event has timed out

// Returns the traceback of current entry, in string form.
std::string getTraceback();
};

bool enabled_ = false;
@@ -699,6 +702,10 @@ struct NCCLTraceBuffer {

std::vector<Entry> dump_entries();

// Returns the entry with the given id, if it exists. Otherwise, returns
// std::nullopt.
std::optional<Entry> getEntry(std::optional<size_t> id);

/*
Mark an Event as completed and free its events.
This is called by the watchdog thread, and is asynchronous from the
28 changes: 28 additions & 0 deletions torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp
@@ -690,6 +690,34 @@ bool ProcessGroupNCCL::WorkNCCL::checkTimeout(
" milliseconds before timing out.");

LOG(ERROR) << exceptionMsg;

// Get the stack trace of the work at call time
// First step we get the corresponding record entry from FR, based on work's
// trace_id_
std::optional<NCCLTraceBuffer::Entry> entry =
NCCLTraceBuffer::get()->getEntry(trace_id_);
if (entry.has_value()) {
auto entryVal = entry.value();
// Get stack trace from FR entry, in string format
// Note: `getTraceback` call below invokes `torch::symbolize`, which may
// need to acquire the GIL. In order for watchdog to be block-free, we make
// the call with std::async.
auto future = std::async(
std::launch::async, [&entryVal]() { return entryVal.getTraceback(); });
// Wait for the future to complete or timeout
auto status = future.wait_for(std::chrono::seconds(8));
if (status == std::future_status::ready) {
std::string tracebackStr = future.get();
LOG(ERROR) << "Stack trace of the timedout collective operation: \n"
<< tracebackStr;
} // else, symbolizer probably timed out, we skip logging the stack trace.
} else {
LOG(ERROR)
<< "Stack trace of the timedout collective not found, "
<< "potentially because FlightRecorder is disabled. "
<< "You can enable it by setting TORCH_NCCL_TRACE_BUFFER_SIZE to a non-zero value.";
}

std::exception_ptr exception_ptr =
std::make_exception_ptr(C10_BUILD_ERROR(DistBackendError, exceptionMsg));
setException(exception_ptr);
