Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[DO NOT MERGE] Experiment to leverage CUDASTF #4833

Draft
wants to merge 14 commits into
base: branch-25.02
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
316 changes: 179 additions & 137 deletions cpp/CMakeLists.txt

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions cpp/cmake/thirdparty/cccl_override.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"packages": {
"cccl": {
"version": "2.8.0",
"git_url": "https://github.com/NVIDIA/cccl.git",
"git_tag": "main"
}
}
}
9 changes: 8 additions & 1 deletion cpp/cmake/thirdparty/get_cccl.cmake
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# =============================================================================
# Copyright (c) 2020-2023, NVIDIA CORPORATION.
# Copyright (c) 2020-2025, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
Expand All @@ -15,6 +15,13 @@
# This function finds CCCL and sets any additional necessary environment variables.
function(find_and_configure_cccl)
  include(${rapids-cmake-dir}/cpm/cccl.cmake)
  include(${rapids-cmake-dir}/cpm/package_override.cmake)

  # Override the default CCCL pin so the build picks up CUDASTF (see
  # cccl_override.json next to this file).
  # TODO(review): consider guarding this override behind a CUGRAPH_USE_STF
  # option instead of applying it unconditionally.
  rapids_cpm_package_override("${CMAKE_CURRENT_FUNCTION_LIST_DIR}/cccl_override.json")

  # Enable the cudax (unstable) namespace install so CUDASTF headers
  # (<cuda/experimental/stf.cuh>) are available to consumers.
  set(CCCL_ENABLE_UNSTABLE ON)

  rapids_cpm_cccl(BUILD_EXPORT_SET cugraph-exports INSTALL_EXPORT_SET cugraph-exports)
endfunction()

Expand Down
35 changes: 25 additions & 10 deletions cpp/include/cugraph/utilities/shuffle_comm.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -145,43 +145,58 @@ compute_tx_rx_counts_offsets_ranks(raft::comms::comms_t const& comm,

// Unary predicate: true when a key's group id is strictly below `pivot`.
// Used to split keys into the groups below / at-or-above the pivot during
// shuffle partitioning.
// NOTE(review): the rendered diff duplicated the members (old brace-initialized
// public fields plus the new private ones); keeping only the constructor-based
// form to avoid member redefinition.
template <typename key_type, typename KeyToGroupIdOp>
struct key_group_id_less_t {
  key_group_id_less_t(KeyToGroupIdOp op, int pivot_)
    : key_to_group_id_op(::std::move(op)), pivot(pivot_)
  {
  }
  __device__ bool operator()(key_type k) const { return key_to_group_id_op(k) < pivot; }

 private:
  KeyToGroupIdOp key_to_group_id_op;  // maps a key to its group id (presumably a comm rank — confirm)
  int pivot;                          // exclusive upper bound on the group id
};

// Unary predicate: true when a value's group id is strictly below `pivot`.
// Counterpart of key_group_id_less_t for value-only shuffles.
// NOTE(review): duplicate member declarations from the merged diff removed;
// only the constructor-initialized private members are kept.
template <typename value_type, typename ValueToGroupIdOp>
struct value_group_id_less_t {
  value_group_id_less_t(ValueToGroupIdOp op, int pivot_)
    : value_to_group_id_op(::std::move(op)), pivot(pivot_)
  {
  }
  __device__ bool operator()(value_type v) const { return value_to_group_id_op(v) < pivot; }

 private:
  ValueToGroupIdOp value_to_group_id_op;  // maps a value to its group id
  int pivot;                              // exclusive upper bound on the group id
};

// Unary predicate over (key, value) tuples: true when the KEY's group id is
// strictly below `pivot` (the value does not participate in the decision).
// NOTE(review): duplicate member declarations from the merged diff removed.
template <typename key_type, typename value_type, typename KeyToGroupIdOp>
struct kv_pair_group_id_less_t {
  kv_pair_group_id_less_t(KeyToGroupIdOp op, int pivot_)
    : key_to_group_id_op(::std::move(op)), pivot(pivot_)
  {
  }
  __device__ bool operator()(thrust::tuple<key_type, value_type> t) const
  {
    return key_to_group_id_op(thrust::get<0>(t)) < pivot;
  }

 private:
  KeyToGroupIdOp key_to_group_id_op;  // maps the tuple's key to its group id
  int pivot;                          // exclusive upper bound on the group id
};

// Unary predicate: true when a value's group id is at or above `pivot`.
// Logical complement of value_group_id_less_t for the same pivot.
// NOTE(review): duplicate member declarations from the merged diff removed.
template <typename value_type, typename ValueToGroupIdOp>
struct value_group_id_greater_equal_t {
  value_group_id_greater_equal_t(ValueToGroupIdOp op, int pivot_)
    : value_to_group_id_op(::std::move(op)), pivot(pivot_)
  {
  }
  __device__ bool operator()(value_type v) const { return value_to_group_id_op(v) >= pivot; }

 private:
  ValueToGroupIdOp value_to_group_id_op;  // maps a value to its group id
  int pivot;                              // inclusive lower bound on the group id
};

// Unary predicate over (key, value) tuples: true when the KEY's group id is at
// or above `pivot`. Logical complement of kv_pair_group_id_less_t.
// NOTE(review): duplicate member declarations from the merged diff removed.
template <typename key_type, typename value_type, typename KeyToGroupIdOp>
struct kv_pair_group_id_greater_equal_t {
  kv_pair_group_id_greater_equal_t(KeyToGroupIdOp op, int pivot_)
    : key_to_group_id_op(::std::move(op)), pivot(pivot_)
  {
  }
  __device__ bool operator()(thrust::tuple<key_type, value_type> t) const
  {
    return key_to_group_id_op(thrust::get<0>(t)) >= pivot;
  }

 private:
  KeyToGroupIdOp key_to_group_id_op;  // maps the tuple's key to its group id
  int pivot;                          // inclusive lower bound on the group id
};

template <typename ValueIterator, typename ValueToGroupIdOp>
Expand Down
10 changes: 10 additions & 0 deletions cpp/src/prims/detail/extract_transform_v_frontier_e.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@
#include <utility>
#include <vector>

#include <cuda/experimental/stf.cuh>
#include <raft/core/resource/custom_resource.hpp>

using namespace cuda::experimental::stf;

namespace cugraph {

namespace detail {
Expand Down Expand Up @@ -702,6 +707,9 @@ extract_transform_v_frontier_e(raft::handle_t const& handle,
constexpr bool try_bitmap = GraphViewType::is_multi_gpu && std::is_same_v<key_t, vertex_t> &&
KeyBucketType::is_sorted_unique;

async_resources_handle& cudastf_handle = *raft::resource::get_custom_resource<async_resources_handle>(handle);
stream_ctx cudastf_ctx(handle.get_stream(), cudastf_handle);

if (do_expensive_check) {
auto frontier_vertex_first =
thrust_tuple_get_or_identity<decltype(frontier.begin()), 0>(frontier.begin());
Expand Down Expand Up @@ -1597,6 +1605,8 @@ extract_transform_v_frontier_e(raft::handle_t const& handle,
if (loop_stream_pool_indices) { handle.sync_stream_pool(*loop_stream_pool_indices); }
}

cudastf_ctx.finalize();

return std::make_tuple(std::move(key_buffer), std::move(value_buffer));
}

Expand Down
89 changes: 64 additions & 25 deletions cpp/src/prims/detail/per_v_transform_reduce_e.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@
#include <type_traits>
#include <utility>

#include <cuda/experimental/stf.cuh>
#include <raft/core/resource/custom_resource.hpp>

using namespace cuda::experimental::stf;

namespace cugraph {

namespace detail {
Expand Down Expand Up @@ -1151,6 +1156,15 @@ void per_v_transform_reduce_e_edge_partition(
std::optional<raft::host_span<size_t const>> key_segment_offsets,
std::optional<raft::host_span<size_t const>> const& edge_partition_stream_pool_indices)
{
async_resources_handle& cudastf_handle = *raft::resource::get_custom_resource<async_resources_handle>(handle);
stream_ctx cudastf_ctx(handle.get_stream(), cudastf_handle);

logical_data<void_interface> output_tokens[4];
for (size_t i = 0; i < 4; i++)
{
output_tokens[i] = cudastf_ctx.logical_token();
}

constexpr bool use_input_key = !std::is_same_v<OptionalKeyIterator, void*>;

using vertex_t = typename GraphViewType::vertex_type;
Expand All @@ -1174,10 +1188,13 @@ void per_v_transform_reduce_e_edge_partition(

if constexpr (update_major && !use_input_key) { // this is necessary as we don't visit
// every vertex in the hypersparse segment
thrust::fill(rmm::exec_policy_nosync(exec_stream),
output_buffer + (*key_segment_offsets)[3],
output_buffer + (*key_segment_offsets)[4],
major_init);
// TODO task write output_token[3]
cudastf_ctx.task(output_tokens[3].write())->*[=](cudaStream_t stream) {
thrust::fill(rmm::exec_policy_nosync(stream),
output_buffer + (*key_segment_offsets)[3],
output_buffer + (*key_segment_offsets)[4],
major_init);
};
}

auto segment_size = use_input_key
Expand All @@ -1187,8 +1204,9 @@ void per_v_transform_reduce_e_edge_partition(
raft::grid_1d_thread_t update_grid(segment_size,
detail::per_v_transform_reduce_e_kernel_block_size,
handle.get_device_properties().maxGridSize[0]);
size_t token_idx = 0;
auto segment_output_buffer = output_buffer;
if constexpr (update_major) { segment_output_buffer += (*key_segment_offsets)[3]; }
if constexpr (update_major) { segment_output_buffer += (*key_segment_offsets)[3]; token_idx +=3; }
auto segment_key_first = edge_partition_key_first;
auto segment_key_last = edge_partition_key_last;
if constexpr (use_input_key) {
Expand All @@ -1199,20 +1217,22 @@ void per_v_transform_reduce_e_edge_partition(
assert(segment_key_first == nullptr);
assert(segment_key_last == nullptr);
}
detail::per_v_transform_reduce_e_hypersparse<update_major, GraphViewType>
<<<update_grid.num_blocks, update_grid.block_size, 0, exec_stream>>>(
edge_partition,
segment_key_first,
segment_key_last,
edge_partition_src_value_input,
edge_partition_dst_value_input,
edge_partition_e_value_input,
edge_partition_e_mask,
segment_output_buffer,
e_op,
major_init,
reduce_op,
pred_op);
cudastf_ctx.task(output_tokens[token_idx].rw())->*[=](cudaStream_t stream) {
detail::per_v_transform_reduce_e_hypersparse<update_major, GraphViewType>
<<<update_grid.num_blocks, update_grid.block_size, 0, stream>>>(
edge_partition,
segment_key_first,
segment_key_last,
edge_partition_src_value_input,
edge_partition_dst_value_input,
edge_partition_e_value_input,
edge_partition_e_mask,
segment_output_buffer,
e_op,
major_init,
reduce_op,
pred_op);
};
}
}
if ((*key_segment_offsets)[3] - (*key_segment_offsets)[2]) {
Expand All @@ -1223,8 +1243,9 @@ void per_v_transform_reduce_e_edge_partition(
raft::grid_1d_thread_t update_grid((*key_segment_offsets)[3] - (*key_segment_offsets)[2],
detail::per_v_transform_reduce_e_kernel_block_size,
handle.get_device_properties().maxGridSize[0]);
size_t token_idx = 0;
auto segment_output_buffer = output_buffer;
if constexpr (update_major) { segment_output_buffer += (*key_segment_offsets)[2]; }
if constexpr (update_major) { segment_output_buffer += (*key_segment_offsets)[2]; token_idx += 2; }
std::optional<segment_key_iterator_t>
segment_key_first{}; // std::optional as thrust::transform_iterator's default constructor
// is a deleted function, segment_key_first should always have a value
Expand All @@ -1234,8 +1255,10 @@ void per_v_transform_reduce_e_edge_partition(
segment_key_first = thrust::make_counting_iterator(edge_partition.major_range_first());
}
*segment_key_first += (*key_segment_offsets)[2];

cudastf_ctx.task(output_tokens[token_idx].rw())->*[=](cudaStream_t stream) {
detail::per_v_transform_reduce_e_low_degree<update_major, GraphViewType>
<<<update_grid.num_blocks, update_grid.block_size, 0, exec_stream>>>(
<<<update_grid.num_blocks, update_grid.block_size, 0, stream>>>(
edge_partition,
*segment_key_first,
*segment_key_first + ((*key_segment_offsets)[3] - (*key_segment_offsets)[2]),
Expand All @@ -1248,6 +1271,7 @@ void per_v_transform_reduce_e_edge_partition(
major_init,
reduce_op,
pred_op);
};
}
if ((*key_segment_offsets)[2] - (*key_segment_offsets)[1] > 0) {
auto exec_stream = edge_partition_stream_pool_indices
Expand All @@ -1257,8 +1281,9 @@ void per_v_transform_reduce_e_edge_partition(
raft::grid_1d_warp_t update_grid((*key_segment_offsets)[2] - (*key_segment_offsets)[1],
detail::per_v_transform_reduce_e_kernel_block_size,
handle.get_device_properties().maxGridSize[0]);
size_t token_idx = 0;
auto segment_output_buffer = output_buffer;
if constexpr (update_major) { segment_output_buffer += (*key_segment_offsets)[1]; }
if constexpr (update_major) { segment_output_buffer += (*key_segment_offsets)[1]; token_idx += 1;}
std::optional<segment_key_iterator_t>
segment_key_first{}; // std::optional as thrust::transform_iterator's default constructor
// is a deleted function, segment_key_first should always have a value
Expand All @@ -1268,8 +1293,10 @@ void per_v_transform_reduce_e_edge_partition(
segment_key_first = thrust::make_counting_iterator(edge_partition.major_range_first());
}
*segment_key_first += (*key_segment_offsets)[1];

cudastf_ctx.task(output_tokens[token_idx].rw())->*[=](cudaStream_t stream) {
detail::per_v_transform_reduce_e_mid_degree<update_major, GraphViewType>
<<<update_grid.num_blocks, update_grid.block_size, 0, exec_stream>>>(
<<<update_grid.num_blocks, update_grid.block_size, 0, stream>>>(
edge_partition,
*segment_key_first,
*segment_key_first + ((*key_segment_offsets)[2] - (*key_segment_offsets)[1]),
Expand All @@ -1283,6 +1310,7 @@ void per_v_transform_reduce_e_edge_partition(
major_identity_element,
reduce_op,
pred_op);
};
}
if ((*key_segment_offsets)[1] > 0) {
auto exec_stream = edge_partition_stream_pool_indices
Expand All @@ -1303,8 +1331,9 @@ void per_v_transform_reduce_e_edge_partition(
} else {
segment_key_first = thrust::make_counting_iterator(edge_partition.major_range_first());
}
cudastf_ctx.task(output_tokens[0].rw())->*[=](cudaStream_t stream) {
detail::per_v_transform_reduce_e_high_degree<update_major, GraphViewType>
<<<update_grid.num_blocks, update_grid.block_size, 0, exec_stream>>>(
<<<update_grid.num_blocks, update_grid.block_size, 0, stream>>>(
edge_partition,
*segment_key_first,
*segment_key_first + (*key_segment_offsets)[1],
Expand All @@ -1318,6 +1347,7 @@ void per_v_transform_reduce_e_edge_partition(
major_identity_element,
reduce_op,
pred_op);
};
}
} else {
auto exec_stream = edge_partition_stream_pool_indices
Expand Down Expand Up @@ -1361,6 +1391,8 @@ void per_v_transform_reduce_e_edge_partition(
pred_op);
}
}

cudastf_ctx.finalize();
}

template <bool incoming, // iterate over incoming edges (incoming == true) or outgoing edges
Expand Down Expand Up @@ -1610,7 +1642,7 @@ void per_v_transform_reduce_e(raft::handle_t const& handle,
edge_partition.major_range_first(),
handle.get_stream());
assert((*key_segment_offsets).back() == *((*key_segment_offsets).rbegin() + 1));
assert(sorted_uniue_nzd_key_last == sorted_unique_key_first + (*key_segment_offsets).back());
assert(sorted_unique_nzd_key_last == sorted_unique_key_first + (*key_segment_offsets).back());
}
} else {
tmp_vertex_value_output_first = vertex_value_output_first;
Expand Down Expand Up @@ -3093,6 +3125,9 @@ void per_v_transform_reduce_e(raft::handle_t const& handle,
}
if (loop_stream_pool_indices) { handle.sync_stream_pool(*loop_stream_pool_indices); }

// TODO BEGIN
//stream_ctx stf_ctx(handle.get_stream());

for (size_t j = 0; j < loop_count; ++j) {
if (process_local_edges[j]) {
auto partition_idx = i + j;
Expand Down Expand Up @@ -3265,6 +3300,10 @@ void per_v_transform_reduce_e(raft::handle_t const& handle,
}
}
}

//stf_ctx.finalize();

// TODO END
if (stream_pool_indices) { handle.sync_stream_pool(*stream_pool_indices); }

if constexpr (GraphViewType::is_multi_gpu && update_major) {
Expand Down
Loading
Loading