diff --git a/cpp/include/cugraph/sampling_functions.hpp b/cpp/include/cugraph/sampling_functions.hpp index 35c51c1ea6d..0ccf49ddfb6 100644 --- a/cpp/include/cugraph/sampling_functions.hpp +++ b/cpp/include/cugraph/sampling_functions.hpp @@ -1152,7 +1152,10 @@ lookup_endpoints_from_edge_ids_and_types( * @param dst_biases Optional bias for randomly selecting destination vertices. If std::nullopt * vertices will be selected uniformly. In multi-GPU environment the biases should be partitioned * based on the vertex partitions. - * @param num_samples Number of negative samples to generate + * @param num_samples Number of negative samples to generate. In SG mode this is the total + * number of samples to generate. In MG mode, each GPU specifies the number of samples desired + * on that GPU. The total number of samples in MG mode is the sum of these per-GPU values; the + * resulting samples will be randomly distributed across the ranks. * @param remove_duplicates If true, remove duplicate samples * @param remove_existing_edges If true, remove samples that are actually edges in the graph * @param exact_number_of_samples If true, repeat generation until we get the exact number of diff --git a/cpp/src/detail/permute_range.cuh b/cpp/src/detail/permute_range.cuh index c7cd57c2048..a9d1b27f52d 100644 --- a/cpp/src/detail/permute_range.cuh +++ b/cpp/src/detail/permute_range.cuh @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024, NVIDIA CORPORATION. + * Copyright (c) 2024-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -58,7 +58,7 @@ rmm::device_uvector permute_range(raft::handle_t const& handle, sub_range_sizes.begin(), sub_range_sizes.end(), sub_range_sizes.begin(), global_start); CUGRAPH_EXPECTS( sub_range_sizes[comm_rank] == local_range_start, - "Invalid input arguments: a rage must have contiguous and non-overlapping values"); + "Invalid input arguments: a range must have contiguous and non-overlapping values"); } rmm::device_uvector permuted_integers(local_range_size, handle.get_stream()); diff --git a/cpp/src/detail/permute_range_v32.cu b/cpp/src/detail/permute_range_v32.cu index 6a7bc059901..91d23487f03 100644 --- a/cpp/src/detail/permute_range_v32.cu +++ b/cpp/src/detail/permute_range_v32.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024, NVIDIA CORPORATION. + * Copyright (c) 2024-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,23 +16,6 @@ #include "detail/permute_range.cuh" -#include -#include -#include -#include -#include - -#include -#include -#include - -#include -#include - -#include -#include -#include - namespace cugraph { namespace detail { diff --git a/cpp/src/detail/permute_range_v64.cu b/cpp/src/detail/permute_range_v64.cu index ad7daf16419..a6dbc9a72ae 100644 --- a/cpp/src/detail/permute_range_v64.cu +++ b/cpp/src/detail/permute_range_v64.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024, NVIDIA CORPORATION. + * Copyright (c) 2024-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License.
@@ -16,23 +16,6 @@ #include "detail/permute_range.cuh" -#include -#include -#include -#include -#include - -#include -#include -#include - -#include -#include - -#include -#include -#include - namespace cugraph { namespace detail { diff --git a/cpp/src/sampling/negative_sampling_impl.cuh b/cpp/src/sampling/negative_sampling_impl.cuh index 541eda67860..9aedc5dfc35 100644 --- a/cpp/src/sampling/negative_sampling_impl.cuh +++ b/cpp/src/sampling/negative_sampling_impl.cuh @@ -16,8 +16,11 @@ #pragma once +#include "cugraph/detail/collect_comm_wrapper.hpp" +#include "cugraph/utilities/device_comm.hpp" #include "prims/reduce_v.cuh" #include "prims/update_edge_src_dst_property.cuh" +#include "thrust/iterator/zip_iterator.h" #include "utilities/collect_comm.cuh" #include @@ -26,6 +29,10 @@ #include #include +#include +#include +#include + #include #include @@ -37,6 +44,8 @@ #include #include +#include + namespace cugraph { namespace detail { @@ -265,11 +274,19 @@ std::tuple, rmm::device_uvector> negativ bool exact_number_of_samples, bool do_expensive_check) { - rmm::device_uvector src(0, handle.get_stream()); - rmm::device_uvector dst(0, handle.get_stream()); + rmm::device_uvector srcs(0, handle.get_stream()); + rmm::device_uvector dsts(0, handle.get_stream()); // Optimistically assume we can do this in one pass - size_t samples_in_this_batch = num_samples; + size_t total_samples{num_samples}; + std::vector samples_per_gpu; + + if constexpr (multi_gpu) { + samples_per_gpu = host_scalar_allgather(handle.get_comms(), num_samples, handle.get_stream()); + total_samples = std::reduce(samples_per_gpu.begin(), samples_per_gpu.end()); + } + + size_t samples_in_this_batch = total_samples; // Normalize the biases and (for MG) determine how the biases are // distributed across the GPUs. 
@@ -298,16 +315,16 @@ std::tuple, rmm::device_uvector> negativ : 0); } - auto batch_src = create_local_samples( + auto batch_srcs = create_local_samples( handle, rng_state, graph_view, normalized_src_biases, gpu_src_biases, samples_in_this_batch); - auto batch_dst = create_local_samples( + auto batch_dsts = create_local_samples( handle, rng_state, graph_view, normalized_dst_biases, gpu_dst_biases, samples_in_this_batch); if constexpr (multi_gpu) { auto vertex_partition_range_lasts = graph_view.vertex_partition_range_lasts(); - std::tie(batch_src, - batch_dst, + std::tie(batch_srcs, + batch_dsts, std::ignore, std::ignore, std::ignore, @@ -320,8 +337,8 @@ std::tuple, rmm::device_uvector> negativ int32_t, int32_t>( handle, - std::move(batch_src), - std::move(batch_dst), + std::move(batch_srcs), + std::move(batch_dsts), std::nullopt, std::nullopt, std::nullopt, @@ -333,42 +350,43 @@ std::tuple, rmm::device_uvector> negativ if (remove_existing_edges) { auto has_edge_flags = graph_view.has_edge(handle, - raft::device_span{batch_src.data(), batch_src.size()}, - raft::device_span{batch_dst.data(), batch_dst.size()}, + raft::device_span{batch_srcs.data(), batch_srcs.size()}, + raft::device_span{batch_dsts.data(), batch_dsts.size()}, do_expensive_check); - auto begin_iter = thrust::make_zip_iterator(batch_src.begin(), batch_dst.begin()); + auto begin_iter = thrust::make_zip_iterator(batch_srcs.begin(), batch_dsts.begin()); auto new_end = thrust::remove_if(handle.get_thrust_policy(), begin_iter, - begin_iter + batch_src.size(), + begin_iter + batch_srcs.size(), has_edge_flags.begin(), thrust::identity()); - batch_src.resize(thrust::distance(begin_iter, new_end), handle.get_stream()); - batch_dst.resize(thrust::distance(begin_iter, new_end), handle.get_stream()); + batch_srcs.resize(thrust::distance(begin_iter, new_end), handle.get_stream()); + batch_dsts.resize(thrust::distance(begin_iter, new_end), handle.get_stream()); } if (remove_duplicates) { thrust::sort(handle.get_thrust_policy(), - thrust::make_zip_iterator(batch_src.begin(), batch_dst.begin()), - thrust::make_zip_iterator(batch_src.end(), batch_dst.end())); + thrust::make_zip_iterator(batch_srcs.begin(), batch_dsts.begin()), + thrust::make_zip_iterator(batch_srcs.end(), batch_dsts.end())); - auto new_end = thrust::unique(handle.get_thrust_policy(), - thrust::make_zip_iterator(batch_src.begin(), batch_dst.begin()), - thrust::make_zip_iterator(batch_src.end(), batch_dst.end())); + auto new_end = + thrust::unique(handle.get_thrust_policy(), + thrust::make_zip_iterator(batch_srcs.begin(), batch_dsts.begin()), + thrust::make_zip_iterator(batch_srcs.end(), batch_dsts.end())); - size_t new_size = - thrust::distance(thrust::make_zip_iterator(batch_src.begin(), batch_dst.begin()), new_end); + size_t new_size = thrust::distance( + thrust::make_zip_iterator(batch_srcs.begin(), batch_dsts.begin()), new_end); - if (src.size() > 0) { - rmm::device_uvector new_src(src.size() + new_size, handle.get_stream()); - rmm::device_uvector new_dst(dst.size() + new_size, handle.get_stream()); + if (srcs.size() > 0) { + rmm::device_uvector new_src(srcs.size() + new_size, handle.get_stream()); + rmm::device_uvector new_dst(dsts.size() + new_size, handle.get_stream()); thrust::merge(handle.get_thrust_policy(), - thrust::make_zip_iterator(batch_src.begin(), batch_dst.begin()), + thrust::make_zip_iterator(batch_srcs.begin(), batch_dsts.begin()), new_end, - thrust::make_zip_iterator(src.begin(), dst.begin()), - thrust::make_zip_iterator(src.end(), dst.end()), + 
thrust::make_zip_iterator(srcs.begin(), dsts.begin()), + thrust::make_zip_iterator(srcs.end(), dsts.end()), thrust::make_zip_iterator(new_src.begin(), new_dst.begin())); new_end = thrust::unique(handle.get_thrust_policy(), @@ -378,32 +396,32 @@ std::tuple, rmm::device_uvector> negativ new_size = thrust::distance(thrust::make_zip_iterator(new_src.begin(), new_dst.begin()), new_end); - src = std::move(new_src); - dst = std::move(new_dst); + srcs = std::move(new_src); + dsts = std::move(new_dst); } else { - src = std::move(batch_src); - dst = std::move(batch_dst); + srcs = std::move(batch_srcs); + dsts = std::move(batch_dsts); } - src.resize(new_size, handle.get_stream()); - dst.resize(new_size, handle.get_stream()); - } else if (src.size() > 0) { - size_t current_end = src.size(); + srcs.resize(new_size, handle.get_stream()); + dsts.resize(new_size, handle.get_stream()); + } else if (srcs.size() > 0) { + size_t current_end = srcs.size(); - src.resize(src.size() + batch_src.size(), handle.get_stream()); - dst.resize(dst.size() + batch_dst.size(), handle.get_stream()); + srcs.resize(srcs.size() + batch_srcs.size(), handle.get_stream()); + dsts.resize(dsts.size() + batch_dsts.size(), handle.get_stream()); thrust::copy(handle.get_thrust_policy(), - thrust::make_zip_iterator(batch_src.begin(), batch_dst.begin()), - thrust::make_zip_iterator(batch_src.end(), batch_dst.end()), - thrust::make_zip_iterator(src.begin(), dst.begin()) + current_end); + thrust::make_zip_iterator(batch_srcs.begin(), batch_dsts.begin()), + thrust::make_zip_iterator(batch_srcs.end(), batch_dsts.end()), + thrust::make_zip_iterator(srcs.begin(), dsts.begin()) + current_end); } else { - src = std::move(batch_src); - dst = std::move(batch_dst); + srcs = std::move(batch_srcs); + dsts = std::move(batch_dsts); } if (exact_number_of_samples) { - size_t current_sample_size = src.size(); + size_t current_sample_size = srcs.size(); if constexpr (multi_gpu) { current_sample_size = cugraph::host_scalar_allreduce( handle.get_comms(), current_sample_size, raft::comms::op_t::SUM, handle.get_stream()); @@ -412,16 +430,142 @@ std::tuple, rmm::device_uvector> negativ // FIXME: We could oversample and discard the unnecessary samples // to reduce the number of iterations in the outer loop, but it seems like // exact_number_of_samples is an edge case not worth optimizing for at this time. - samples_in_this_batch = num_samples - current_sample_size; + samples_in_this_batch = total_samples - current_sample_size; } else { samples_in_this_batch = 0; } } - src.shrink_to_fit(handle.get_stream()); - dst.shrink_to_fit(handle.get_stream()); + srcs.shrink_to_fit(handle.get_stream()); + dsts.shrink_to_fit(handle.get_stream()); + + if constexpr (multi_gpu) { + auto const& comm = handle.get_comms(); + auto const comm_size = comm.get_size(); + auto const comm_rank = comm.get_rank(); + + // Randomly shuffle the samples so that each gpu gets their + // desired number of samples + + if (!exact_number_of_samples) { + // If we didn't force generating the exact number of samples, + // we might have fewer samples than requested. We need to + // accommodate this situation. For now we'll just + // uniformly(-ish) reduce the requested size. 
+ size_t total_extracted = host_scalar_allreduce( + handle.get_comms(), srcs.size(), raft::comms::op_t::SUM, handle.get_stream()); + size_t reduction = total_samples - total_extracted; + + while (reduction > 0) { + size_t est_reduction_per_gpu = (reduction + comm_size - 1) / comm_size; + for (size_t i = 0; i < samples_per_gpu.size(); ++i) { + if (samples_per_gpu[i] > est_reduction_per_gpu) { + samples_per_gpu[i] -= est_reduction_per_gpu; + reduction -= est_reduction_per_gpu; + } else { + reduction -= samples_per_gpu[i]; + samples_per_gpu[i] = 0; + } + + if (reduction < est_reduction_per_gpu) est_reduction_per_gpu = reduction; + } + } + num_samples = samples_per_gpu[comm_rank]; + } + + // Mimic the logic of permute_range... + // + // 1) Randomly assign each entry to a GPU + // 2) Count how many are assigned to each GPU + // 3) Allgatherv (allgather?) to give each GPU a count for how many entries are destined for + // that GPU 4) Identify extras/deficits for each GPU, arbitrarily adjust counts to make correct + // 5) Shuffle accordingly + // + rmm::device_uvector gpu_assignment(srcs.size(), handle.get_stream()); + + cugraph::detail::uniform_random_fill(handle.get_stream(), + gpu_assignment.data(), + gpu_assignment.size(), + int{0}, + int{comm_size}, + rng_state); + + thrust::sort_by_key(handle.get_thrust_policy(), + gpu_assignment.begin(), + gpu_assignment.end(), + thrust::make_zip_iterator(srcs.begin(), dsts.begin())); + + rmm::device_uvector d_send_counts(comm_size, handle.get_stream()); + thrust::tabulate( + handle.get_thrust_policy(), + d_send_counts.begin(), + d_send_counts.end(), + [gpu_assignment_span = raft::device_span{ + gpu_assignment.data(), gpu_assignment.size()}] __device__(size_t i) { + auto begin = thrust::lower_bound( + thrust::seq, gpu_assignment_span.begin(), gpu_assignment_span.end(), static_cast(i)); + auto end = + thrust::upper_bound(thrust::seq, begin, gpu_assignment_span.end(), static_cast(i)); + return thrust::distance(begin, end); + }); + + std::vector tx_value_counts(comm_size, 0); + raft::update_host( + tx_value_counts.data(), d_send_counts.data(), d_send_counts.size(), handle.get_stream()); + + std::forward_as_tuple(std::tie(srcs, dsts), std::ignore) = + cugraph::shuffle_values(handle.get_comms(), + thrust::make_zip_iterator(srcs.begin(), dsts.begin()), + tx_value_counts, + handle.get_stream()); + + rmm::device_uvector fractional_random_numbers(srcs.size(), handle.get_stream()); + + cugraph::detail::uniform_random_fill(handle.get_stream(), + fractional_random_numbers.data(), + fractional_random_numbers.size(), + float{0.0}, + float{1.0}, + rng_state); + thrust::sort_by_key(handle.get_thrust_policy(), + fractional_random_numbers.begin(), + fractional_random_numbers.end(), + thrust::make_zip_iterator(srcs.begin(), dsts.begin())); + + size_t nr_extras{0}; + size_t nr_deficits{0}; + if (srcs.size() > num_samples) { + nr_extras = srcs.size() - static_cast(num_samples); + } else { + nr_deficits = static_cast(num_samples) - srcs.size(); + } + + auto extra_srcs = cugraph::detail::device_allgatherv( + handle, comm, raft::device_span(srcs.data() + num_samples, nr_extras)); + // nr_extras > 0 ? nr_extras : 0)); + auto extra_dsts = cugraph::detail::device_allgatherv( + handle, comm, raft::device_span(dsts.data() + num_samples, nr_extras)); + // nr_extras > 0 ? 
nr_extras : 0)); + + srcs.resize(num_samples, handle.get_stream()); + dsts.resize(num_samples, handle.get_stream()); + auto deficits = + cugraph::host_scalar_allgather(handle.get_comms(), nr_deficits, handle.get_stream()); + + std::exclusive_scan(deficits.begin(), deficits.end(), deficits.begin(), vertex_t{0}); + + raft::copy(srcs.data() + num_samples - nr_deficits, + extra_srcs.begin() + deficits[comm_rank], + nr_deficits, + handle.get_stream()); + + raft::copy(dsts.data() + num_samples - nr_deficits, + extra_dsts.begin() + deficits[comm_rank], + nr_deficits, + handle.get_stream()); + } - return std::make_tuple(std::move(src), std::move(dst)); + return std::make_tuple(std::move(srcs), std::move(dsts)); } } // namespace cugraph diff --git a/cpp/tests/sampling/mg_negative_sampling.cpp b/cpp/tests/sampling/mg_negative_sampling.cpp index 7c64bb7fbbb..eb9f4fbb394 100644 --- a/cpp/tests/sampling/mg_negative_sampling.cpp +++ b/cpp/tests/sampling/mg_negative_sampling.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024, NVIDIA CORPORATION. + * Copyright (c) 2024-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,6 +14,7 @@ * limitations under the License. */ +#include "cugraph/utilities/host_scalar_comm.hpp" #include "utilities/base_fixture.hpp" #include "utilities/conversion_utilities.hpp" #include "utilities/property_generator_utilities.hpp" @@ -85,8 +86,9 @@ class Tests_MGNegative_Sampling : public ::testing::TestWithParamview()); } - size_t num_samples = - graph_view.compute_number_of_edges(*handle_) * negative_sampling_usecase.sample_multiplier; + size_t num_samples = graph_view.compute_number_of_edges(*handle_) * + negative_sampling_usecase.sample_multiplier / + handle_->get_comms().get_size(); rmm::device_uvector src_bias_v(0, handle_->get_stream()); rmm::device_uvector dst_bias_v(0, handle_->get_stream()); @@ -150,26 +152,8 @@ class Tests_MGNegative_Sampling : public ::testing::TestWithParam{src_out.data(), src_out.size()}, raft::device_span{dst_out.data(), dst_out.size()}); - // TODO: Move this to validation_utilities... 
- auto h_vertex_partition_range_lasts = graph_view.vertex_partition_range_lasts(); - rmm::device_uvector d_vertex_partition_range_lasts( - h_vertex_partition_range_lasts.size(), handle_->get_stream()); - raft::update_device(d_vertex_partition_range_lasts.data(), - h_vertex_partition_range_lasts.data(), - h_vertex_partition_range_lasts.size(), - handle_->get_stream()); - - size_t error_count = cugraph::test::count_edges_on_wrong_int_gpu( - *handle_, - raft::device_span{src_out.data(), src_out.size()}, - raft::device_span{dst_out.data(), dst_out.size()}, - raft::device_span{d_vertex_partition_range_lasts.data(), - d_vertex_partition_range_lasts.size()}); - - ASSERT_EQ(error_count, 0) << "generate edges out of range > 0"; - if ((negative_sampling_usecase.remove_duplicates) && (src_out.size() > 0)) { - error_count = cugraph::test::count_duplicate_vertex_pairs_sorted( + size_t error_count = cugraph::test::count_duplicate_vertex_pairs_sorted( *handle_, raft::device_span{src_out.data(), src_out.size()}, raft::device_span{dst_out.data(), dst_out.size()}); @@ -184,7 +168,7 @@ class Tests_MGNegative_Sampling : public ::testing::TestWithParam( *handle_, graph_view, std::nullopt, std::nullopt, std::nullopt, std::nullopt); - error_count = cugraph::test::count_intersection( + size_t error_count = cugraph::test::count_intersection( *handle_, raft::device_span{graph_src.data(), graph_src.size()}, raft::device_span{graph_dst.data(), graph_dst.size()}, @@ -202,7 +186,9 @@ class Tests_MGNegative_Sampling : public ::testing::TestWithParamget_comms(), src_out.size(), raft::comms::op_t::SUM, handle_->get_stream()); - ASSERT_EQ(sz, num_samples) << "Expected exact number of samples"; + size_t aggregate_sample_count = cugraph::host_scalar_allreduce( + handle_->get_comms(), num_samples, raft::comms::op_t::SUM, handle_->get_stream()); + ASSERT_EQ(sz, aggregate_sample_count) << "Expected exact number of samples"; } // TBD: How do we determine if we have properly reflected the biases?
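Usage note (illustrative sketch, not part of the patch): with the new semantics, num_samples is a per-rank request in MG mode; the aggregate generated is the sum of the per-rank requests, and the results are shuffled so each rank receives roughly its requested share. The sketch below mirrors the updated MG test (dividing the desired aggregate by the communicator size). The helper name, includes, and explicit template-argument order <vertex_t, edge_t, weight_t, store_transposed, multi_gpu> are assumptions for illustration; check the actual declaration in sampling_functions.hpp before copying.

// Hypothetical MG caller: each rank requests only its local share of the
// aggregate number of negative samples.
#include <cugraph/graph_view.hpp>
#include <cugraph/sampling_functions.hpp>

#include <raft/core/handle.hpp>
#include <raft/random/rng_state.hpp>

#include <rmm/device_uvector.hpp>

#include <optional>
#include <tuple>

template <typename vertex_t, typename edge_t, typename weight_t>
std::tuple<rmm::device_uvector<vertex_t>, rmm::device_uvector<vertex_t>>
sample_negative_edges_mg(raft::handle_t const& handle,
                         raft::random::RngState& rng_state,
                         cugraph::graph_view_t<vertex_t, edge_t, false, true> const& graph_view,
                         size_t sample_multiplier)
{
  // Desired aggregate count across all ranks, as in the MG test.
  size_t aggregate_samples =
    static_cast<size_t>(graph_view.compute_number_of_edges(handle)) * sample_multiplier;

  // Per-rank request; the total generated is the sum over ranks, redistributed
  // randomly so each rank ends up with (roughly) its request.
  size_t num_samples = aggregate_samples / handle.get_comms().get_size();

  return cugraph::negative_sampling<vertex_t, edge_t, weight_t, false, true>(
    handle,
    rng_state,
    graph_view,
    std::nullopt,  // src_biases: uniform source selection
    std::nullopt,  // dst_biases: uniform destination selection
    num_samples,   // local (per-GPU) request
    true,          // remove_duplicates
    true,          // remove_existing_edges
    false,         // exact_number_of_samples
    false);        // do_expensive_check
}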