From d34e3d6522f1f3d8e9fbea6581b7ce37de7e1005 Mon Sep 17 00:00:00 2001
From: Seunghwa Kang <45857425+seunghwak@users.noreply.github.com>
Date: Mon, 20 Nov 2023 12:33:35 -0800
Subject: [PATCH] Address FIXMEs (#3988)

This PR addresses FIXMEs (and reduces the number of outstanding FIXMEs).

Authors:
  - Seunghwa Kang (https://github.com/seunghwak)
  - Naim (https://github.com/naimnv)
  - Ralph Liu (https://github.com/nv-rliu)

Approvers:
  - Naim (https://github.com/naimnv)
  - Joseph Nke (https://github.com/jnke2016)
  - Chuck Hastings (https://github.com/ChuckHastings)

URL: https://github.com/rapidsai/cugraph/pull/3988
---
 cpp/include/cugraph/algorithms.hpp            | 45 ---------
 cpp/include/cugraph/utilities/device_comm.hpp |  8 +-
 .../cugraph/utilities/host_scalar_comm.hpp    | 98 ++++++++++++++-----
 .../cugraph/utilities/shuffle_comm.cuh        |  5 -
 cpp/src/centrality/katz_centrality_impl.cuh   |  2 -
 .../weakly_connected_components_impl.cuh      | 40 ++------
 6 files changed, 83 insertions(+), 115 deletions(-)

diff --git a/cpp/include/cugraph/algorithms.hpp b/cpp/include/cugraph/algorithms.hpp
index 78846bc5766..8501eedce5c 100644
--- a/cpp/include/cugraph/algorithms.hpp
+++ b/cpp/include/cugraph/algorithms.hpp
@@ -464,51 +464,6 @@ k_truss_subgraph(raft::handle_t const& handle,
                  size_t number_of_vertices,
                  int k);
 
-// FIXME: Internally distances is of int (signed 32-bit) data type, but current
-// template uses data from VT, ET, WT from the legacy::GraphCSR View even if weights
-// are not considered
-/**
- * @Synopsis   Performs a breadth first search traversal of a graph starting from a vertex.
- *
- * @throws     cugraph::logic_error with a custom message when an error occurs.
- *
- * @tparam VT               Type of vertex identifiers. Supported value : int (signed,
- * 32-bit)
- * @tparam ET               Type of edge identifiers. Supported value : int (signed,
- * 32-bit)
- * @tparam WT               Type of edge weights. Supported values : int (signed, 32-bit)
- *
- * @param[in] handle        Library handle (RAFT). If a communicator is set in the handle,
- * the multi GPU version will be selected.
- * @param[in] graph         cuGraph graph descriptor, should contain the connectivity
- * information as a CSR
- *
- * @param[out] distances    If set to a valid pointer, this is populated by distance of
- * every vertex in the graph from the starting vertex
- *
- * @param[out] predecessors If set to a valid pointer, this is populated by bfs traversal
- * predecessor of every vertex
- *
- * @param[out] sp_counters  If set to a valid pointer, this is populated by bfs traversal
- * shortest_path counter of every vertex
- *
- * @param[in] start_vertex  The starting vertex for breadth first search traversal
- *
- * @param[in] directed      Treat the input graph as directed
- *
- * @param[in] mg_batch      If set to true use SG BFS path when comms are initialized.
- *
- */
-template <typename VT, typename ET, typename WT>
-void bfs(raft::handle_t const& handle,
-         legacy::GraphCSRView<VT, ET, WT> const& graph,
-         VT* distances,
-         VT* predecessors,
-         double* sp_counters,
-         const VT start_vertex,
-         bool directed = true,
-         bool mg_batch = false);
-
 /**
  * @brief Compute Hungarian algorithm on a weighted bipartite graph
  *
diff --git a/cpp/include/cugraph/utilities/device_comm.hpp b/cpp/include/cugraph/utilities/device_comm.hpp
index 7087724921a..990074e781b 100644
--- a/cpp/include/cugraph/utilities/device_comm.hpp
+++ b/cpp/include/cugraph/utilities/device_comm.hpp
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2020-2022, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2023, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -806,9 +806,6 @@ device_sendrecv(raft::comms::comms_t const& comm,
   size_t constexpr tuple_size =
     thrust::tuple_size<typename thrust::iterator_traits<InputIterator>::value_type>::value;
 
-  // FIXME: NCCL 2.7 supports only one ncclSend and one ncclRecv for a source rank and destination
-  // rank inside ncclGroupStart/ncclGroupEnd, so we cannot place this inside
-  // ncclGroupStart/ncclGroupEnd, this restriction will be lifted in NCCL 2.8
   detail::device_sendrecv_tuple_iterator_element_impl<
@@ ... @@ device_multicast_sendrecv(raft::comms::comms_t const& comm,
   size_t constexpr tuple_size =
     thrust::tuple_size<typename thrust::iterator_traits<InputIterator>::value_type>::value;
 
-  // FIXME: NCCL 2.7 supports only one ncclSend and one ncclRecv for a source rank and destination
-  // rank inside ncclGroupStart/ncclGroupEnd, so we cannot place this inside
-  // ncclGroupStart/ncclGroupEnd, this restriction will be lifted in NCCL 2.8
   detail::device_multicast_sendrecv_tuple_iterator_element_impl<
diff --git a/cpp/include/cugraph/utilities/host_scalar_comm.hpp b/cpp/include/cugraph/utilities/host_scalar_comm.hpp
--- a/cpp/include/cugraph/utilities/host_scalar_comm.hpp
+++ b/cpp/include/cugraph/utilities/host_scalar_comm.hpp
@@ ... @@ template <typename T>
 std::enable_if_t<std::is_arithmetic<T>::value, std::vector<T>> host_scalar_allgather(
   raft::comms::comms_t const& comm, T input, cudaStream_t stream)
 {
-  std::vector<size_t> rx_counts(comm.get_size(), size_t{1});
-  std::vector<size_t> displacements(rx_counts.size(), size_t{0});
-  std::iota(displacements.begin(), displacements.end(), size_t{0});
-  rmm::device_uvector<T> d_outputs(rx_counts.size(), stream);
+  rmm::device_uvector<T> d_outputs(comm.get_size(), stream);
   raft::update_device(d_outputs.data() + comm.get_rank(), &input, 1, stream);
-  // FIXME: better use allgather
-  comm.allgatherv(d_outputs.data() + comm.get_rank(),
-                  d_outputs.data(),
-                  rx_counts.data(),
-                  displacements.data(),
-                  stream);
-  std::vector<T> h_outputs(rx_counts.size());
-  raft::update_host(h_outputs.data(), d_outputs.data(), rx_counts.size(), stream);
+  comm.allgather(d_outputs.data() + comm.get_rank(), d_outputs.data(), size_t{1}, stream);
+  std::vector<T> h_outputs(d_outputs.size());
+  raft::update_host(h_outputs.data(), d_outputs.data(), d_outputs.size(), stream);
   auto status = comm.sync_stream(stream);
   CUGRAPH_EXPECTS(status == raft::comms::status_t::SUCCESS, "sync_stream() failure.");
   return h_outputs;
@@ -277,11 +269,6 @@ std::enable_if_t<cugraph::is_thrust_tuple_of_arithmetic<T>::value, std::vector<T
   size_t constexpr tuple_size = thrust::tuple_size<T>::value;
 
-  std::vector<size_t> rx_counts(comm.get_size(), tuple_size);
-  std::vector<size_t> displacements(rx_counts.size(), size_t{0});
-  for (size_t i = 0; i < displacements.size(); ++i) {
-    displacements[i] = i * tuple_size;
-  }
   std::vector<int64_t> h_tuple_scalar_elements(tuple_size);
   rmm::device_uvector<int64_t> d_allgathered_tuple_scalar_elements(comm.get_size() * tuple_size,
                                                                    stream);
@@ -292,12 +279,10 @@ host_scalar_allgather(raft::comms::comms_t const& comm, T input, cudaStream_t st
                       h_tuple_scalar_elements.data(),
                       tuple_size,
                       stream);
-  // FIXME: better use allgather
-  comm.allgatherv(d_allgathered_tuple_scalar_elements.data() + comm.get_rank() * tuple_size,
-                  d_allgathered_tuple_scalar_elements.data(),
-                  rx_counts.data(),
-                  displacements.data(),
-                  stream);
+  comm.allgather(d_allgathered_tuple_scalar_elements.data() + comm.get_rank() * tuple_size,
+                 d_allgathered_tuple_scalar_elements.data(),
+                 tuple_size,
+                 stream);
   std::vector<int64_t> h_allgathered_tuple_scalar_elements(comm.get_size() * tuple_size);
   raft::update_host(h_allgathered_tuple_scalar_elements.data(),
                     d_allgathered_tuple_scalar_elements.data(),
@@ -318,6 +303,71 @@ host_scalar_allgather(raft::comms::comms_t const& comm, T input, cudaStream_t st
   return ret;
 }
 
+template <typename T>
+std::enable_if_t<std::is_arithmetic<T>::value, T> host_scalar_scatter(
+  raft::comms::comms_t const& comm,
+  std::vector<T> const& inputs,  // relevant only in root
+  int root,
+  cudaStream_t stream)
+{
+  CUGRAPH_EXPECTS(
+    ((comm.get_rank() == root) && (inputs.size() == static_cast<size_t>(comm.get_size()))) ||
+      ((comm.get_rank() != root) && (inputs.size() == 0)),
+    "inputs.size() should match with comm.get_size() in root and should be 0 otherwise.");
+  rmm::device_uvector<T> d_outputs(comm.get_size(), stream);
+  if (comm.get_rank() == root) {
+    raft::update_device(d_outputs.data(), inputs.data(), inputs.size(), stream);
+  }
+  comm.bcast(d_outputs.data(), d_outputs.size(), root, stream);
+  T h_output{};
+  raft::update_host(&h_output, d_outputs.data() + comm.get_rank(), 1, stream);
+  auto status = comm.sync_stream(stream);
+  CUGRAPH_EXPECTS(status == raft::comms::status_t::SUCCESS, "sync_stream() failure.");
+  return h_output;
+}
+
+template <typename T>
+std::enable_if_t<cugraph::is_thrust_tuple_of_arithmetic<T>::value, T> host_scalar_scatter(
+  raft::comms::comms_t const& comm,
+  std::vector<T> const& inputs,  // relevant only in root
+  int root,
+  cudaStream_t stream)
+{
+  CUGRAPH_EXPECTS(
+    ((comm.get_rank() == root) && (inputs.size() == static_cast<size_t>(comm.get_size()))) ||
+      ((comm.get_rank() != root) && (inputs.size() == 0)),
+    "inputs.size() should match with comm.get_size() in root and should be 0 otherwise.");
+  size_t constexpr tuple_size = thrust::tuple_size<T>::value;
+  rmm::device_uvector<int64_t> d_scatter_tuple_scalar_elements(comm.get_size() * tuple_size,
+                                                               stream);
+  if (comm.get_rank() == root) {
+    for (int i = 0; i < comm.get_size(); ++i) {
+      std::vector<int64_t> h_tuple_scalar_elements(tuple_size);
+      detail::update_vector_of_tuple_scalar_elements_from_tuple_impl<T, size_t{0}, tuple_size>()
+        .update(h_tuple_scalar_elements, inputs[i]);
+      raft::update_device(d_scatter_tuple_scalar_elements.data() + i * tuple_size,
+                          h_tuple_scalar_elements.data(),
+                          tuple_size,
+                          stream);
+    }
+  }
+  comm.bcast(
+    d_scatter_tuple_scalar_elements.data(), d_scatter_tuple_scalar_elements.size(), root, stream);
+  std::vector<int64_t> h_tuple_scalar_elements(tuple_size);
+  raft::update_host(h_tuple_scalar_elements.data(),
+                    d_scatter_tuple_scalar_elements.data() + comm.get_rank() * tuple_size,
+                    tuple_size,
+                    stream);
+  auto status = comm.sync_stream(stream);
+  CUGRAPH_EXPECTS(status == raft::comms::status_t::SUCCESS, "sync_stream() failure.");
+
+  T ret{};
+  detail::update_tuple_from_vector_of_tuple_scalar_elements_impl<T, size_t{0}, tuple_size>().update(
+    ret, h_tuple_scalar_elements);
+
+  return ret;
+}
+
 // Return value is valid only in root (return value may better be std::optional in C++17 or later)
 template <typename T>
 std::enable_if_t<std::is_arithmetic<T>::value, std::vector<T>> host_scalar_gather(
diff --git a/cpp/include/cugraph/utilities/shuffle_comm.cuh b/cpp/include/cugraph/utilities/shuffle_comm.cuh
index 6a260144324..ab6a54cc1c0 100644
--- a/cpp/include/cugraph/utilities/shuffle_comm.cuh
+++ b/cpp/include/cugraph/utilities/shuffle_comm.cuh
@@ -80,7 +80,6 @@ compute_tx_rx_counts_offsets_ranks(raft::comms::comms_t const& comm,
 
   rmm::device_uvector<size_t> d_rx_value_counts(comm_size, stream_view);
 
-  // FIXME: this needs to be replaced with AlltoAll once NCCL 2.8 is released.
   std::vector<size_t> tx_counts(comm_size, size_t{1});
   std::vector<size_t> tx_offsets(comm_size);
   std::iota(tx_offsets.begin(), tx_offsets.end(), size_t{0});
@@ -835,7 +834,6 @@ auto shuffle_values(raft::comms::comms_t const& comm,
     allocate_dataframe_buffer<typename thrust::iterator_traits<TxValueIterator>::value_type>(
      rx_offsets.size() > 0 ? rx_offsets.back() + rx_counts.back() : size_t{0}, stream_view);
 
-  // FIXME: this needs to be replaced with AlltoAll once NCCL 2.8 is released
   // (if num_tx_dst_ranks == num_rx_src_ranks == comm_size).
   device_multicast_sendrecv(comm,
                             tx_value_first,
@@ -889,7 +887,6 @@ auto groupby_gpu_id_and_shuffle_values(raft::comms::comms_t const& comm,
     allocate_dataframe_buffer<typename thrust::iterator_traits<ValueIterator>::value_type>(
      rx_offsets.size() > 0 ? rx_offsets.back() + rx_counts.back() : size_t{0}, stream_view);
 
-  // FIXME: this needs to be replaced with AlltoAll once NCCL 2.8 is released
   // (if num_tx_dst_ranks == num_rx_src_ranks == comm_size).
   device_multicast_sendrecv(comm,
                             tx_value_first,
@@ -946,7 +943,6 @@ auto groupby_gpu_id_and_shuffle_kv_pairs(raft::comms::comms_t const& comm,
     allocate_dataframe_buffer<typename thrust::iterator_traits<ValueIterator>::value_type>(
      rx_keys.size(), stream_view);
 
-  // FIXME: this needs to be replaced with AlltoAll once NCCL 2.8 is released
   // (if num_tx_dst_ranks == num_rx_src_ranks == comm_size).
   device_multicast_sendrecv(comm,
                             tx_key_first,
@@ -959,7 +955,6 @@ auto groupby_gpu_id_and_shuffle_kv_pairs(raft::comms::comms_t const& comm,
                             rx_src_ranks,
                             stream_view);
 
-  // FIXME: this needs to be replaced with AlltoAll once NCCL 2.8 is released
   // (if num_tx_dst_ranks == num_rx_src_ranks == comm_size).
   device_multicast_sendrecv(comm,
                             tx_value_first,
diff --git a/cpp/src/centrality/katz_centrality_impl.cuh b/cpp/src/centrality/katz_centrality_impl.cuh
index 202d00a5771..ac31043d862 100644
--- a/cpp/src/centrality/katz_centrality_impl.cuh
+++ b/cpp/src/centrality/katz_centrality_impl.cuh
@@ -74,8 +74,6 @@ void katz_centrality(
   CUGRAPH_EXPECTS(epsilon >= 0.0, "Invalid input argument: epsilon should be non-negative.");
 
   if (do_expensive_check) {
-    // FIXME: should I check for betas?
-
     if (has_initial_guess) {
       auto num_negative_values =
         count_if_v(handle, pull_graph_view, katz_centralities, [] __device__(auto, auto val) {
diff --git a/cpp/src/components/weakly_connected_components_impl.cuh b/cpp/src/components/weakly_connected_components_impl.cuh
index 615a50ded54..b7b6e139cfa 100644
--- a/cpp/src/components/weakly_connected_components_impl.cuh
+++ b/cpp/src/components/weakly_connected_components_impl.cuh
@@ -236,18 +236,16 @@ struct v_op_t {
     auto tag      = thrust::get<1>(tagged_v);
     auto v_offset =
       vertex_partition.local_vertex_partition_offset_from_vertex_nocheck(thrust::get<0>(tagged_v));
-    // FIXME: better switch to atomic_ref after
-    // https://github.com/nvidia/libcudacxx/milestone/2
-    auto old =
-      atomicCAS(level_components + v_offset, invalid_component_id<vertex_t>::value, tag);
-    if (old != invalid_component_id<vertex_t>::value && old != tag) {  // conflict
+    cuda::atomic_ref<vertex_t, cuda::thread_scope_device> v_component(*(level_components + v_offset));
+    auto old = invalid_component_id<vertex_t>::value;
+    bool success = v_component.compare_exchange_strong(old, tag, cuda::std::memory_order_relaxed);
+    if (!success && (old != tag)) {  // conflict
       return thrust::make_tuple(thrust::optional<size_t>{bucket_idx_conflict},
                                 thrust::optional<std::byte>{std::byte{0}} /* dummy */);
     } else {
-      auto update = (old == invalid_component_id<vertex_t>::value);
       return thrust::make_tuple(
-        update ? thrust::optional<size_t>{bucket_idx_next} : thrust::nullopt,
-        update ? thrust::optional<std::byte>{std::byte{0}} /* dummy */ : thrust::nullopt);
+        success ? thrust::optional<size_t>{bucket_idx_next} : thrust::nullopt,
+        success ? thrust::optional<std::byte>{std::byte{0}} /* dummy */ : thrust::nullopt);
     }
   }
@@ -457,33 +455,11 @@ void weakly_connected_components_impl(raft::handle_t const& handle,
                     std::numeric_limits<vertex_t>::max());
       }
 
-      // FIXME: we need to add host_scalar_scatter
-#if 1
-      rmm::device_uvector<vertex_t> d_counts(comm_size, handle.get_stream());
-      raft::update_device(d_counts.data(),
-                          init_max_new_root_counts.data(),
-                          init_max_new_root_counts.size(),
-                          handle.get_stream());
-      device_bcast(
-        comm, d_counts.data(), d_counts.data(), d_counts.size(), int{0}, handle.get_stream());
-      raft::update_host(
-        &init_max_new_roots, d_counts.data() + comm_rank, size_t{1}, handle.get_stream());
-#else
       init_max_new_roots =
-        host_scalar_scatter(comm, init_max_new_root_counts.data(), int{0}, handle.get_stream());
-#endif
+        host_scalar_scatter(comm, init_max_new_root_counts, int{0}, handle.get_stream());
     } else {
-      // FIXME: we need to add host_scalar_scatter
-#if 1
-      rmm::device_uvector<vertex_t> d_counts(comm_size, handle.get_stream());
-      device_bcast(
-        comm, d_counts.data(), d_counts.data(), d_counts.size(), int{0}, handle.get_stream());
-      raft::update_host(
-        &init_max_new_roots, d_counts.data() + comm_rank, size_t{1}, handle.get_stream());
-#else
      init_max_new_roots =
-        host_scalar_scatter(comm, init_max_new_root_counts.data(), int{0}, handle.get_stream());
-#endif
+        host_scalar_scatter(comm, std::vector<vertex_t>{}, int{0}, handle.get_stream());
     }
 
     handle.sync_stream();
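
---

Usage note (not part of the patch): the host scalar communication utilities
above share one calling pattern. Below is a minimal sketch, assuming an
initialized multi-GPU raft::handle_t named `handle`; `local_count` and
`budgets` are illustrative names. The allgatherv -> allgather simplification
in the patch relies on every rank contributing exactly one element, so rank
i's value implicitly lands at offset i of the receive buffer.

    // Sketch: gather one scalar from every rank, then scatter one scalar
    // to every rank from a root, using the utilities shown in the patch.
    #include <cugraph/utilities/host_scalar_comm.hpp>

    #include <raft/core/handle.hpp>

    #include <cstddef>
    #include <vector>

    void example(raft::handle_t const& handle, size_t local_count)
    {
      auto& comm = handle.get_comms();

      // After the call, all_counts[i] holds rank i's local_count on every rank.
      auto all_counts =
        cugraph::host_scalar_allgather(comm, local_count, handle.get_stream());

      // host_scalar_scatter expects one input per rank on the root and an
      // empty vector elsewhere (the CUGRAPH_EXPECTS precondition above).
      int const root = 0;
      std::vector<size_t> budgets{};
      if (comm.get_rank() == root) { budgets.assign(comm.get_size(), size_t{1}); }
      auto my_budget =
        cugraph::host_scalar_scatter(comm, budgets, root, handle.get_stream());
    }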
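Usage note (not part of the patch): the tuple overload of host_scalar_scatter
stages each thrust::tuple element through an int64_t device buffer before the
bcast, as the patch shows. A sketch with an illustrative tuple type:

    // Sketch: scatter a (count, offset) pair to each rank; any thrust::tuple
    // of arithmetic types is accepted by the tuple overload.
    #include <cugraph/utilities/host_scalar_comm.hpp>

    #include <raft/core/handle.hpp>

    #include <thrust/tuple.h>

    #include <cstdint>
    #include <vector>

    void example(raft::handle_t const& handle)
    {
      auto& comm     = handle.get_comms();
      int const root = 0;

      // Only the root fills in one tuple per rank; other ranks pass an
      // empty vector, mirroring the arithmetic overload's precondition.
      std::vector<thrust::tuple<int32_t, int64_t>> inputs{};
      if (comm.get_rank() == root) {
        inputs.assign(comm.get_size(), thrust::make_tuple(int32_t{0}, int64_t{0}));
      }
      auto my_pair =
        cugraph::host_scalar_scatter(comm, inputs, root, handle.get_stream());
    }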
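Implementation note (not part of the patch): the weakly_connected_components
change above swaps raw atomicCAS for libcu++'s cuda::atomic_ref, which the
removed FIXME was waiting on. A standalone sketch of that claim-a-component
pattern with illustrative names; the real operator() above additionally
routes conflicting threads to a separate bucket:

    // Sketch: each thread tries to install its tag into a per-vertex slot.
    // On failure, compare_exchange_strong loads the observed value into
    // `old`, so callers can tell "already claimed by us" from a conflict.
    #include <cuda/atomic>

    #include <cstdint>

    using vertex_t = int32_t;

    __device__ bool try_claim(vertex_t* level_components,
                              size_t v_offset,
                              vertex_t tag,
                              vertex_t invalid_component_id)
    {
      cuda::atomic_ref<vertex_t, cuda::thread_scope_device> v_component(
        level_components[v_offset]);
      auto old     = invalid_component_id;
      bool success =
        v_component.compare_exchange_strong(old, tag, cuda::std::memory_order_relaxed);
      return success || (old == tag);  // claimed now, or already claimed by us
    }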