From e96f6b5ffcbeddd5f5c067d901b06fe3fc8317e6 Mon Sep 17 00:00:00 2001 From: Benjamin Brock Date: Wed, 28 Feb 2024 13:47:45 -0800 Subject: [PATCH 1/4] Update to avoid using contexts --- include/dr/shp/algorithms/exclusive_scan.hpp | 9 ++++---- include/dr/shp/algorithms/inclusive_scan.hpp | 3 +-- include/dr/shp/algorithms/sort.hpp | 24 ++++++++------------ include/dr/shp/distributed_vector.hpp | 5 ++-- 4 files changed, 18 insertions(+), 23 deletions(-) diff --git a/include/dr/shp/algorithms/exclusive_scan.hpp b/include/dr/shp/algorithms/exclusive_scan.hpp index 5e724499d0..8c1c008e4e 100644 --- a/include/dr/shp/algorithms/exclusive_scan.hpp +++ b/include/dr/shp/algorithms/exclusive_scan.hpp @@ -37,8 +37,8 @@ void exclusive_scan_impl_(ExecutionPolicy &&policy, R &&r, O &&o, U init, if constexpr (std::is_same_v, device_policy>) { - U *d_inits = sycl::malloc_device(rng::size(zipped_segments), - shp::devices()[0], shp::context()); + U *d_inits = + sycl::malloc_device(rng::size(zipped_segments), __detail::queue(0)); std::vector events; @@ -63,12 +63,11 @@ void exclusive_scan_impl_(ExecutionPolicy &&policy, R &&r, O &&o, U init, shp::copy(d_inits, d_inits + inits.size(), inits.data() + 1); - sycl::free(d_inits, shp::context()); + sycl::free(d_inits, __detail::queue(0)); inits[0] = init; - auto root = dr::shp::devices()[0]; - dr::shp::device_allocator allocator(dr::shp::context(), root); + dr::shp::device_allocator allocator(__detail::queue(0)); dr::shp::vector> partial_sums( std::size_t(zipped_segments.size()), allocator); diff --git a/include/dr/shp/algorithms/inclusive_scan.hpp b/include/dr/shp/algorithms/inclusive_scan.hpp index a6cdc828cb..9f2426925d 100644 --- a/include/dr/shp/algorithms/inclusive_scan.hpp +++ b/include/dr/shp/algorithms/inclusive_scan.hpp @@ -42,8 +42,7 @@ void inclusive_scan_impl_(ExecutionPolicy &&policy, R &&r, O &&o, std::vector events; - auto root = dr::shp::devices()[0]; - dr::shp::device_allocator allocator(dr::shp::context(), root); + dr::shp::device_allocator allocator(__detail::queue(0)); dr::shp::vector> partial_sums( std::size_t(zipped_segments.size()), allocator); diff --git a/include/dr/shp/algorithms/sort.hpp b/include/dr/shp/algorithms/sort.hpp index 7fb0b516e5..59c342c294 100644 --- a/include/dr/shp/algorithms/sort.hpp +++ b/include/dr/shp/algorithms/sort.hpp @@ -81,8 +81,8 @@ void sort(R &&r, Compare comp = Compare()) { // Each segment has `n_splitters` medians, // so `n_segments * n_splitters` medians total. - T *medians = sycl::malloc_device(n_segments * n_splitters, - shp::devices()[0], shp::context()); + T *medians = + sycl::malloc_device(n_segments * n_splitters, shp::__detail::queue(0)); std::size_t segment_id = 0; for (auto &&segment : segments) { @@ -150,13 +150,11 @@ void sort(R &&r, Compare comp = Compare()) { auto &&local_segment = dr::shp::__detail::local(segment); - std::size_t *splitter_i = sycl::malloc_shared( - n_splitters, q.get_device(), shp::context()); + std::size_t *splitter_i = sycl::malloc_shared(n_splitters, q); splitter_indices.push_back(splitter_i); // Local copy `medians_l` necessary due to [GSD-3893] - T *medians_l = - sycl::malloc_device(n_splitters, q.get_device(), shp::context()); + T *medians_l = sycl::malloc_device(n_splitters, q); q.memcpy(medians_l, medians, sizeof(T) * n_splitters).wait(); @@ -164,7 +162,7 @@ void sort(R &&r, Compare comp = Compare()) { rng::end(local_segment), medians_l, medians_l + n_splitters, splitter_i, comp); - sycl::free(medians_l, shp::context()); + sycl::free(medians_l, q); auto p_first = rng::begin(local_segment); auto p_last = p_first; @@ -269,15 +267,13 @@ void sort(R &&r, Compare comp = Compare()) { // Free temporary memory. - for (auto &&sorted_seg : sorted_segments) { - sycl::free(sorted_seg, shp::context()); + for (std::size_t i = 0; i < sorted_segegments.size(); i++) { + auto &&q = dr::shp::__detail::queue(dr::ranges::rank(segments[i])); + sycl::free(sorted_segments[i], q); + sycl::free(splitter_indices[i], q); } - for (auto &&splitter_i : splitter_indices) { - sycl::free(splitter_i, shp::context()); - } - - sycl::free(medians, shp::context()); + sycl::free(medians, shp::__detail::queue(0)); } template > diff --git a/include/dr/shp/distributed_vector.hpp b/include/dr/shp/distributed_vector.hpp index 823862c21b..85fa4d81be 100644 --- a/include/dr/shp/distributed_vector.hpp +++ b/include/dr/shp/distributed_vector.hpp @@ -145,8 +145,9 @@ struct distributed_vector { std::size_t rank = 0; for (auto &&device : dr::shp::devices()) { - segments_.emplace_back(segment_type( - segment_size_, Allocator(dr::shp::context(), device), rank++)); + segments_.emplace_back( + segment_type(segment_size_, Allocator(__detail::queue(rank)), rank)); + rank++; } } From 797681a0f758f259ebfd004fbb6eb43d0a5b1861 Mon Sep 17 00:00:00 2001 From: Benjamin Brock Date: Wed, 28 Feb 2024 13:52:54 -0800 Subject: [PATCH 2/4] Fix typo --- include/dr/shp/algorithms/sort.hpp | 2 +- include/dr/shp/init.hpp | 13 +++++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/include/dr/shp/algorithms/sort.hpp b/include/dr/shp/algorithms/sort.hpp index 59c342c294..df02e12fcc 100644 --- a/include/dr/shp/algorithms/sort.hpp +++ b/include/dr/shp/algorithms/sort.hpp @@ -267,7 +267,7 @@ void sort(R &&r, Compare comp = Compare()) { // Free temporary memory. - for (std::size_t i = 0; i < sorted_segegments.size(); i++) { + for (std::size_t i = 0; i < sorted_segments.size(); i++) { auto &&q = dr::shp::__detail::queue(dr::ranges::rank(segments[i])); sycl::free(sorted_segments[i], q); sycl::free(splitter_indices[i], q); diff --git a/include/dr/shp/init.hpp b/include/dr/shp/init.hpp index e6f99238b0..03c8a2e009 100644 --- a/include/dr/shp/init.hpp +++ b/include/dr/shp/init.hpp @@ -46,12 +46,12 @@ inline std::size_t nprocs() { return __detail::ngpus(); } inline device_policy par_unseq; template -inline void init(R &&devices) +inline void init(sycl::context context, R &&devices) requires( std::is_same_v>>) { __detail::devices_.assign(rng::begin(devices), rng::end(devices)); - __detail::global_context_ = new sycl::context(__detail::devices_); + __detail::global_context_ = new sycl::context(context); __detail::ngpus_ = rng::size(__detail::devices_); for (auto &&device : __detail::devices_) { @@ -64,6 +64,15 @@ inline void init(R &&devices) par_unseq = device_policy(__detail::devices_); } +template +inline void init(R &&devices) + requires( + std::is_same_v>>) +{ + sycl::context context(devices); + init(context, devices); +} + template <__detail::sycl_device_selector Selector> inline void init(Selector &&selector) { auto devices = get_numa_devices(selector); From 88d3209ae521f857a2813173f9596f49ba21f19b Mon Sep 17 00:00:00 2001 From: Benjamin Brock Date: Wed, 28 Feb 2024 14:02:21 -0800 Subject: [PATCH 3/4] Fix for compiler warning --- include/dr/shp/distributed_vector.hpp | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/include/dr/shp/distributed_vector.hpp b/include/dr/shp/distributed_vector.hpp index 85fa4d81be..3870b7fdf8 100644 --- a/include/dr/shp/distributed_vector.hpp +++ b/include/dr/shp/distributed_vector.hpp @@ -143,11 +143,9 @@ struct distributed_vector { (count + dr::shp::devices().size() - 1) / dr::shp::devices().size(); capacity_ = segment_size_ * dr::shp::devices().size(); - std::size_t rank = 0; - for (auto &&device : dr::shp::devices()) { + for (std::size_t rank = 0; rank < dr::shp::devices().size(); rank++) { segments_.emplace_back( segment_type(segment_size_, Allocator(__detail::queue(rank)), rank)); - rank++; } } From 0f49682e78ef29b547c3dbf3205e31b6126dc23b Mon Sep 17 00:00:00 2001 From: Benjamin Brock Date: Thu, 29 Feb 2024 12:25:12 -0800 Subject: [PATCH 4/4] In the middle of debugging --- the remaining issue is caused by `combine_events` --- examples/shp/vector_example.cpp | 47 +++++++++++++++- include/dr/shp/algorithms/copy.hpp | 24 +++++++- include/dr/shp/algorithms/fill.hpp | 4 ++ include/dr/shp/detail.hpp | 35 ++++++++++-- include/dr/shp/distributed_vector.hpp | 5 ++ include/dr/shp/init.hpp | 79 ++++++++++++++++++++++++++- include/dr/shp/vector.hpp | 3 + 7 files changed, 185 insertions(+), 12 deletions(-) diff --git a/examples/shp/vector_example.cpp b/examples/shp/vector_example.cpp index a7eefdf2aa..b1f7fac9b9 100644 --- a/examples/shp/vector_example.cpp +++ b/examples/shp/vector_example.cpp @@ -3,19 +3,27 @@ // SPDX-License-Identifier: BSD-3-Clause #include +#include int main(int argc, char **argv) { printf("Creating NUMA devices...\n"); - auto devices = dr::shp::get_numa_devices(sycl::default_selector_v); - dr::shp::init(devices); + dr::shp::init(sycl::default_selector_v); - for (auto &device : devices) { + for (auto &device : dr::shp::devices()) { std::cout << " Device: " << device.get_info() << "\n"; } + fmt::print("First check...\n"); + dr::shp::check_queues(); + + fmt::print("Initializing distributed vector...\n"); dr::shp::distributed_vector> v(100); + fmt::print("Second check...\n"); + dr::shp::check_queues(); + + fmt::print("For each...\n"); dr::shp::for_each(dr::shp::par_unseq, dr::shp::enumerate(v), [](auto &&tuple) { auto &&[idx, value] = tuple; @@ -25,6 +33,10 @@ int main(int argc, char **argv) { dr::shp::for_each(dr::shp::par_unseq, v, [](auto &&value) { value = value + 2; }); + fmt::print("Third check...\n"); + dr::shp::check_queues(); + + fmt::print("Reduce...\n"); std::size_t sum = dr::shp::reduce(dr::shp::par_unseq, v, int(0), std::plus{}); dr::shp::print_range(v); @@ -34,10 +46,19 @@ int main(int argc, char **argv) { std::vector local_vec(v.size()); std::iota(local_vec.begin(), local_vec.end(), 0); + fmt::print("Fourth check...\n"); + dr::shp::check_queues(); + dr::shp::print_range(local_vec, "local vec"); + fmt::print("Fourth Two check...\n"); + dr::shp::check_queues(); + dr::shp::copy(local_vec.begin(), local_vec.end(), v.begin()); + fmt::print("Fourth Three check...\n"); + dr::shp::check_queues(); + dr::shp::print_range(v, "vec after copy"); dr::shp::for_each(dr::shp::par_unseq, v, @@ -45,6 +66,9 @@ int main(int argc, char **argv) { dr::shp::print_range(v, "vec after update"); + fmt::print("Fourth One check...\n"); + dr::shp::check_queues(); + dr::shp::copy(v.begin(), v.end(), local_vec.begin()); dr::shp::print_range(local_vec, "local vec after copy"); @@ -55,5 +79,22 @@ int main(int argc, char **argv) { v.resize(50); dr::shp::print_range(v, "resized to 50"); + fmt::print("Fifth check...\n"); + dr::shp::check_queues(); + + fmt::print("Getting ready to finalize...\n"); + fflush(stdout); + + fmt::print("Check queues...\n"); + dr::shp::check_queues(); + + fmt::print("Finalizing...\n"); + fflush(stdout); + + dr::shp::finalize(); + + + fmt::print("Exiting...\n"); + fflush(stdout); return 0; } diff --git a/include/dr/shp/algorithms/copy.hpp b/include/dr/shp/algorithms/copy.hpp index b4750fa2f5..960258355e 100644 --- a/include/dr/shp/algorithms/copy.hpp +++ b/include/dr/shp/algorithms/copy.hpp @@ -111,6 +111,8 @@ sycl::event copy_async(InputIt first, InputIt last, OutputIt d_first) { std::vector events; + fmt::print("copy_async...\n"); + while (first != last) { auto &&segment = *segment_iter; auto size = rng::distance(segment); @@ -120,14 +122,23 @@ sycl::event copy_async(InputIt first, InputIt last, OutputIt d_first) { auto local_last = first; rng::advance(local_last, n_to_copy); + fmt::print("copying...\n"); events.emplace_back( dr::shp::copy_async(first, local_last, rng::begin(segment))); + fmt::print("check queues...\n"); + dr::shp::check_queues(); + + fmt::print("continue...\n"); ++segment_iter; rng::advance(first, n_to_copy); } - return dr::shp::__detail::combine_events(events); + for (auto&& event : events) { + event.wait(); + } + return events[0]; + // return dr::shp::__detail::combine_events(events); } auto copy(rng::contiguous_range auto r, dr::distributed_iterator auto d_first) { @@ -142,7 +153,11 @@ template requires __detail::is_syclmemcopyable, std::iter_value_t> OutputIt copy(InputIt first, InputIt last, OutputIt d_first) { + fmt::print("Async copy...\n"); copy_async(first, last, d_first).wait(); + fmt::print("Checking queues...\n"); + dr::shp::check_queues(); + fmt::print("Returning...\n"); return d_first + (last - first); } @@ -166,7 +181,12 @@ sycl::event copy_async(InputIt first, InputIt last, OutputIt d_first) { rng::advance(d_first, size); } - return dr::shp::__detail::combine_events(events); + for (auto&& event : events) { + event.wait(); + } + return events[0]; + + // return dr::shp::__detail::combine_events(events); } template diff --git a/include/dr/shp/algorithms/fill.hpp b/include/dr/shp/algorithms/fill.hpp index a9527c1f1e..17aa881ad0 100644 --- a/include/dr/shp/algorithms/fill.hpp +++ b/include/dr/shp/algorithms/fill.hpp @@ -39,7 +39,9 @@ template requires(std::indirectly_writable, U>) sycl::event fill_async(device_ptr first, device_ptr last, const U &value) { + fmt::print("Fill async...\n"); auto &&q = __detail::get_queue_for_pointer(first); + fmt::print("Got queue...\n"); auto *arr = first.get_raw_pointer(); // not using q.fill because of CMPLRLLVM-46438 return dr::__detail::parallel_for(q, sycl::range<>(last - first), @@ -49,7 +51,9 @@ sycl::event fill_async(device_ptr first, device_ptr last, template requires(std::indirectly_writable, U>) void fill(device_ptr first, device_ptr last, const U &value) { + fmt::print("Fill...\n"); fill_async(first, last, value).wait(); + fmt::print("Fill.\n"); } template diff --git a/include/dr/shp/detail.hpp b/include/dr/shp/detail.hpp index 523be31507..5a134707a0 100644 --- a/include/dr/shp/detail.hpp +++ b/include/dr/shp/detail.hpp @@ -11,6 +11,8 @@ #include #include +#include + namespace dr::shp { namespace __detail { @@ -23,26 +25,48 @@ concept is_syclmemcopyable = std::is_same_v, Dest> && template sycl::usm::alloc get_pointer_type(Iter iter) { - return sycl::get_pointer_type(std::to_address(iter), shp::context()); + for (auto&& device : shp::devices()) { + try { + return sycl::get_pointer_type(std::to_address(iter), __detail::queue(device).get_context()); + } catch(...) {} + } + assert(false); } template sycl::usm::alloc get_pointer_type(shp::device_ptr ptr) { - return sycl::get_pointer_type(ptr.get_raw_pointer(), shp::context()); + for (auto&& device : shp::devices()) { + try { + return sycl::get_pointer_type(ptr.get_raw_pointer(), __detail::queue(device).get_context()); + } catch(...) {} + } + assert(false); } template sycl::device get_pointer_device(Iter iter) { - return sycl::get_pointer_device(std::to_address(iter), shp::context()); + for (auto&& device : shp::devices()) { + try { + return sycl::get_pointer_device(std::to_address(iter), __detail::queue(device).get_context()); + } catch(...) {} + } + assert(false); } template sycl::device get_pointer_device(shp::device_ptr ptr) { - return sycl::get_pointer_device(ptr.get_raw_pointer(), shp::context()); + for (auto&& device : shp::devices()) { + try { + return sycl::get_pointer_device(ptr.get_raw_pointer(), __detail::queue(device).get_context()); + } catch(...) {} + } + assert(false); } template sycl::queue &get_queue_for_pointer(InputIt iter) { if (get_pointer_type(iter) == sycl::usm::alloc::device) { + fmt::print("Get pointer device...\n"); auto device = get_pointer_device(iter); + fmt::print("Got device...\n"); return __detail::queue(device); } else { return default_queue(); @@ -73,7 +97,8 @@ inline sycl::event combine_events(sycl::queue &q, } inline sycl::event combine_events(const std::vector &events) { - auto &&q = __detail::queue(0); + sycl::queue q(sycl::cpu_selector_v); + // auto &&q = __detail::queue(0); return combine_events(q, events); } diff --git a/include/dr/shp/distributed_vector.hpp b/include/dr/shp/distributed_vector.hpp index 3870b7fdf8..0dfc263523 100644 --- a/include/dr/shp/distributed_vector.hpp +++ b/include/dr/shp/distributed_vector.hpp @@ -14,6 +14,8 @@ #include #include +#include + namespace dr::shp { template class distributed_vector_accessor { @@ -143,10 +145,13 @@ struct distributed_vector { (count + dr::shp::devices().size() - 1) / dr::shp::devices().size(); capacity_ = segment_size_ * dr::shp::devices().size(); + fmt::print("Allocating segments...\n"); for (std::size_t rank = 0; rank < dr::shp::devices().size(); rank++) { + fmt::print("Segment {}...\n", rank); segments_.emplace_back( segment_type(segment_size_, Allocator(__detail::queue(rank)), rank)); } + fmt::print("Returning...\n"); } distributed_vector(std::size_t count, const T &value) diff --git a/include/dr/shp/init.hpp b/include/dr/shp/init.hpp index 03c8a2e009..49d85ad92c 100644 --- a/include/dr/shp/init.hpp +++ b/include/dr/shp/init.hpp @@ -15,6 +15,8 @@ #include #include +#include + namespace dr::shp { namespace __detail { @@ -73,21 +75,94 @@ inline void init(R &&devices) init(context, devices); } +void exception_handler(sycl::exception_list exceptions) { + for (const std::exception_ptr& e : exceptions) { + try { + std::rethrow_exception(e); + } catch(const sycl::exception& e) { + if (e.code().value() != 34) { + throw; + } + /* + std::string exception_string(e.what()); + if (exception_string.find("PI_ERROR_INVALID_CONTEXT") == std::string::npos) { + } + */ + } + } +} + +template <__detail::sycl_device_selector Selector> +inline void init_cuda(Selector&& selector) { + std::vector devices; + + for (auto&& platform : sycl::platform::get_platforms()) { + std::cout << "Platform: " << platform.get_info() << std::endl; + + if (platform.get_backend() == sycl::backend::ext_oneapi_cuda) { + for (auto&& device : platform.get_devices()) { + std::cout << " Device: " << device.get_info() + << std::endl; + devices.push_back(device); + } + } + } + + __detail::devices_.assign(rng::begin(devices), rng::end(devices)); + __detail::global_context_ = new sycl::context(devices[0]); + __detail::ngpus_ = rng::size(__detail::devices_); + + for (auto &&device : __detail::devices_) { + // sycl::queue q(device, exception_handler); + sycl::queue q(device); + __detail::queues_.push_back(q); + + __detail::dpl_policies_.emplace_back(__detail::queues_.back()); + } +} + template <__detail::sycl_device_selector Selector> inline void init(Selector &&selector) { - auto devices = get_numa_devices(selector); - init(devices); + sycl::platform p(selector); + + if (p.get_backend() == sycl::backend::ext_oneapi_cuda) { + init_cuda(selector); + } else { + auto devices = get_numa_devices(selector); + init(devices); + } } inline void init() { init(sycl::default_selector_v); } inline void finalize() { + fmt::print("Destroying policies...\n"); __detail::dpl_policies_.clear(); + fmt::print("Queues...\n"); __detail::queues_.clear(); + fmt::print("Devices...\n"); __detail::devices_.clear(); + fmt::print("Context...\n"); delete __detail::global_context_; } +inline void check_queues() { + for (auto&& queue : __detail::queues_) { + queue.wait_and_throw(); + } + + /* + for (auto&& policy : __detail::dpl_policies_) { + fmt::print("Looking at policy...\n"); + try { + policy.queue().wait_and_throw(); + } catch (...) { + fmt::print("Caught exception\n"); + } + } + */ +} + namespace __detail { inline sycl::queue &queue(std::size_t rank) { return queues_[rank]; } diff --git a/include/dr/shp/vector.hpp b/include/dr/shp/vector.hpp index 7860e1f22e..db717f3626 100644 --- a/include/dr/shp/vector.hpp +++ b/include/dr/shp/vector.hpp @@ -5,6 +5,7 @@ #pragma once #include +#include namespace dr::shp { @@ -41,7 +42,9 @@ template > class vector { : allocator_(alloc) { change_capacity_impl_(count); using namespace std; + fmt::print("Calling fill...\n"); fill(data(), data() + size(), T{}); + fmt::print("Returning from fill...\n"); } template