From bd9bb367b79d961c4cd75d8992df625cb709c1ca Mon Sep 17 00:00:00 2001 From: Howard Huang Date: Thu, 30 Mar 2023 11:33:14 -0700 Subject: [PATCH] Allow ports to be reused in gloo (#97677) Summary: Pull Request resolved: https://github.com/pytorch/pytorch/pull/97677 X-link: https://github.com/facebookincubator/gloo/pull/353 ProcessGroupGloo and gloo seem to be opening and closing sockets without allowing the port to be reused. We see this issue pop up in larger training jobs "Address already in use" and we assume it to be because all the ephemeral ports are exhausted. This diff allows ports to be reused, we see a reduced number of ports being in `TIME_WAIT` state. context: https://fb.workplace.com/groups/319878845696681/permalink/5988899781205532/ another issue: https://fb.workplace.com/groups/319878845696681/permalink/958768178474408/ Test Plan: Add a gloo test to create 4 groups of size 64 using multithreaded PG + gloo. In total 256 ranks. Differential Revision: D44029927 fbshipit-source-id: 9c31c38485333602c33e12c12813bea33ccb9438 --- test/distributed/test_multi_threaded_pg.py | 29 +++++++++++++++++++ .../distributed/c10d/ProcessGroupGloo.cpp | 18 ++++++++++++ 2 files changed, 47 insertions(+) diff --git a/test/distributed/test_multi_threaded_pg.py b/test/distributed/test_multi_threaded_pg.py index 36334b0d0ffc95..3f73c64b6b01c8 100644 --- a/test/distributed/test_multi_threaded_pg.py +++ b/test/distributed/test_multi_threaded_pg.py @@ -220,5 +220,34 @@ def test_gather(self): for i in range(self.world_size): self.assertEqual(gather_list[i], torch.ones(3, 3) * i) +class TestLargeWorld(MultiThreadedTestCase): + @property + def world_size(self): + return 64 + + def setUp(self): + super().setUp() + self._spawn_threads() + + def test_gloo_init(self): + groups = [] + num_ports_used = 0 + num_groups = 4 + # create multiple gloo groups with 64 ranks + for i in range(num_groups): + group = dist.new_group(backend="gloo") + groups.append(group) + + # tear down gloo groups + for i in range(num_groups): + dist.destroy_process_group(groups[i]) + groups.clear() + self.assertEqual(len(groups), 0) + + # create multiple gloo groups with 64 ranks + for i in range(num_groups): + group = dist.new_group(backend="gloo") + groups.append(group) + if __name__ == "__main__": run_tests() diff --git a/torch/csrc/distributed/c10d/ProcessGroupGloo.cpp b/torch/csrc/distributed/c10d/ProcessGroupGloo.cpp index 9ebb35afccc5bd..a6caf6167d1f30 100644 --- a/torch/csrc/distributed/c10d/ProcessGroupGloo.cpp +++ b/torch/csrc/distributed/c10d/ProcessGroupGloo.cpp @@ -638,6 +638,24 @@ bool doesHostnameResolveToUsableAddress(const std::string& hostname) { struct addrinfo* rp = nullptr; for (rp = result; rp != nullptr; rp = rp->ai_next) { auto fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); + + // Set SO_REUSEADDR to signal that reuse of the listening port is OK. + int on = 1; + rv = setsockopt( + fd, + SOL_SOCKET, + SO_REUSEADDR, + reinterpret_cast(&on), + sizeof(on)); + if (rv == -1) { +#ifdef _WIN32 + closesocket(fd); +#else + close(fd); +#endif + logAndThrow("setsockopt: ", strerror(errno)); + } + if (fd == -1) { continue; }