
Commit

Cut peak memory footprint in per_v_transform_reduce_dst_key_aggregated_outgoing_e (#4484)

Cut peak memory usage.

For an ultimate solution, we need to implement our own function to emulate ncclReduce with a custom reduction operator.

Authors:
  - Seunghwa Kang (https://github.com/seunghwak)
  - Naim (https://github.com/naimnv)

Approvers:
  - Naim (https://github.com/naimnv)
  - Chuck Hastings (https://github.com/ChuckHastings)
  - Joseph Nke (https://github.com/jnke2016)

URL: #4484
seunghwak authored Jun 28, 2024
1 parent fd90b98 commit 3ca5d78
Showing 3 changed files with 32 additions and 42 deletions.
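
The commit message points at the longer-term fix: emulating ncclReduce with a custom reduction operator. Purely for illustration (none of this is in the commit), such an emulation could stream each peer's buffer to the root one rank at a time and fold it in with thrust::transform, so the root never holds more than one extra count-sized buffer regardless of the number of ranks. The helper name reduce_with_custom_op, the byte-wise ncclChar transport, and the sequential peer loop are assumptions of the sketch; only ncclSend/ncclRecv and the Thrust calls are real APIs.

// Hedged sketch: a linear, root-centric reduce with an arbitrary device-side
// reduction operator, built from NCCL point-to-point calls (NCCL >= 2.7).
// Peak extra memory at the root is one count-sized staging buffer, independent
// of the number of ranks.
#include <cuda_runtime.h>
#include <nccl.h>
#include <thrust/device_vector.h>
#include <thrust/system/cuda/execution_policy.h>
#include <thrust/transform.h>

template <typename T, typename ReduceOp>
void reduce_with_custom_op(T* data,  // in: this rank's values; out (root only): reduced values
                           size_t count,
                           int root,
                           int rank,
                           int num_ranks,
                           ReduceOp reduce_op,  // commutative, __device__-callable binary op
                           ncclComm_t comm,
                           cudaStream_t stream)
{
  if (rank != root) {
    // non-root ranks just ship their contribution to the root, as raw bytes
    ncclSend(data, count * sizeof(T), ncclChar, root, comm, stream);
    cudaStreamSynchronize(stream);
    return;
  }
  thrust::device_vector<T> staging(count);  // single staging buffer, reused for every peer
  for (int peer = 0; peer < num_ranks; ++peer) {
    if (peer == root) { continue; }
    ncclRecv(thrust::raw_pointer_cast(staging.data()), count * sizeof(T), ncclChar, peer, comm, stream);
    // fold the received buffer into the running result with the custom operator
    thrust::transform(thrust::cuda::par.on(stream),
                      data,
                      data + count,
                      staging.begin(),
                      data,
                      reduce_op);
  }
  cudaStreamSynchronize(stream);
}

A production version would presumably use a tree or ring schedule rather than this linear loop, but the bounded staging memory is the relevant point here.
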
3 changes: 2 additions & 1 deletion cpp/src/community/leiden_impl.cuh
@@ -638,7 +638,8 @@ void flatten_leiden_dendrogram(raft::handle_t const& handle,
Dendrogram<vertex_t> const& dendrogram,
vertex_t* clustering)
{
- rmm::device_uvector<vertex_t> vertex_ids_v(graph_view.number_of_vertices(), handle.get_stream());
+ rmm::device_uvector<vertex_t> vertex_ids_v(graph_view.local_vertex_partition_range_size(),
+                                            handle.get_stream());

detail::sequence_fill(handle.get_stream(),
vertex_ids_v.begin(),
3 changes: 2 additions & 1 deletion cpp/src/community/louvain_impl.cuh
@@ -292,7 +292,8 @@ void flatten_dendrogram(raft::handle_t const& handle,
Dendrogram<vertex_t> const& dendrogram,
vertex_t* clustering)
{
- rmm::device_uvector<vertex_t> vertex_ids_v(graph_view.number_of_vertices(), handle.get_stream());
+ rmm::device_uvector<vertex_t> vertex_ids_v(graph_view.local_vertex_partition_range_size(),
+                                            handle.get_stream());

detail::sequence_fill(handle.get_stream(),
vertex_ids_v.begin(),
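
Both hunks above make the same change: vertex_ids_v is sized by the local vertex partition rather than the total vertex count, since flattening the dendrogram only relabels the vertices owned by this GPU. A minimal sketch of the resulting pattern (illustrative only: make_local_vertex_ids is a hypothetical helper, and thrust::sequence stands in for detail::sequence_fill, assuming the sequence starts at the partition's first vertex ID):

// Illustrative sketch only: allocate and fill vertex IDs for the local
// partition [local_range_first, local_range_first + local_range_size).
#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>
#include <thrust/sequence.h>
#include <thrust/system/cuda/execution_policy.h>

template <typename vertex_t>
rmm::device_uvector<vertex_t> make_local_vertex_ids(vertex_t local_range_first,
                                                    size_t local_range_size,
                                                    rmm::cuda_stream_view stream)
{
  // sized by the local partition (previously: number_of_vertices())
  rmm::device_uvector<vertex_t> vertex_ids(local_range_size, stream);
  // vertex_ids = {local_range_first, local_range_first + 1, ...}
  thrust::sequence(thrust::cuda::par.on(stream.value()),
                   vertex_ids.begin(),
                   vertex_ids.end(),
                   local_range_first);
  return vertex_ids;
}
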
cpp/src/prims/per_v_transform_reduce_dst_key_aggregated_outgoing_e.cuh
@@ -343,6 +343,7 @@ void per_v_transform_reduce_dst_key_aggregated_outgoing_e(

rmm::device_uvector<vertex_t> majors(0, handle.get_stream());
auto e_op_result_buffer = allocate_dataframe_buffer<T>(0, handle.get_stream());
+ std::vector<size_t> rx_offsets{};
for (size_t i = 0; i < graph_view.number_of_local_edge_partitions(); ++i) {
auto edge_partition =
edge_partition_device_view_t<vertex_t, edge_t, GraphViewType::is_multi_gpu>(
@@ -1041,6 +1042,10 @@ void per_v_transform_reduce_dst_key_aggregated_outgoing_e(

// FIXME: additional optimization is possible if reduce_op is a pure function (and reduce_op
// can be mapped to ncclRedOp_t).
+ // FIXME: Memory footprint can grow proportional to minor_comm_size in the worst case. If
+ // reduce_op can be mapped to ncclRedOp_t, we can use ncclReduce to solve this problem. If
+ // reduce_op cannot be mapped to ncclRedOp_t, we need to implement our own multi-GPU reduce
+ // function.

auto rx_sizes = host_scalar_gather(minor_comm, tmp_majors.size(), i, handle.get_stream());
std::vector<size_t> rx_displs{};
@@ -1073,58 +1078,41 @@
if (static_cast<size_t>(minor_comm_rank) == i) {
majors = std::move(rx_majors);
e_op_result_buffer = std::move(rx_tmp_e_op_result_buffer);
+ rx_offsets = std::vector<size_t>(rx_sizes.size() + 1);
+ rx_offsets[0] = 0;
+ std::inclusive_scan(rx_sizes.begin(), rx_sizes.end(), rx_offsets.begin() + 1);
}
} else {
majors = std::move(tmp_majors);
e_op_result_buffer = std::move(tmp_e_op_result_buffer);
+ rx_offsets = {0, majors.size()};
}
}

- if constexpr (GraphViewType::is_multi_gpu) {
- thrust::sort_by_key(handle.get_thrust_policy(),
- majors.begin(),
- majors.end(),
- get_dataframe_buffer_begin(e_op_result_buffer));
- auto num_uniques = thrust::count_if(handle.get_thrust_policy(),
- thrust::make_counting_iterator(size_t{0}),
- thrust::make_counting_iterator(majors.size()),
- detail::is_first_in_run_t<vertex_t const*>{majors.data()});
- rmm::device_uvector<vertex_t> unique_majors(num_uniques, handle.get_stream());
- auto reduced_e_op_result_buffer =
- allocate_dataframe_buffer<T>(unique_majors.size(), handle.get_stream());
- thrust::reduce_by_key(handle.get_thrust_policy(),
- majors.begin(),
- majors.end(),
- get_dataframe_buffer_begin(e_op_result_buffer),
- unique_majors.begin(),
- get_dataframe_buffer_begin(reduced_e_op_result_buffer),
- thrust::equal_to<vertex_t>{},
- reduce_op);
- majors = std::move(unique_majors);
- e_op_result_buffer = std::move(reduced_e_op_result_buffer);
- }

// 2. update final results

thrust::fill(handle.get_thrust_policy(),
vertex_value_output_first,
vertex_value_output_first + graph_view.local_vertex_partition_range_size(),
- T{});
-
- thrust::scatter(handle.get_thrust_policy(),
- get_dataframe_buffer_begin(e_op_result_buffer),
- get_dataframe_buffer_end(e_op_result_buffer),
- thrust::make_transform_iterator(
- majors.begin(),
- detail::vertex_local_offset_t<vertex_t, GraphViewType::is_multi_gpu>{
- graph_view.local_vertex_partition_view()}),
- vertex_value_output_first);
-
- thrust::transform(handle.get_thrust_policy(),
- vertex_value_output_first,
- vertex_value_output_first + graph_view.local_vertex_partition_range_size(),
- vertex_value_output_first,
- detail::reduce_with_init_t<ReduceOp, T>{reduce_op, init});
+ init);
+
+ auto pair_first =
+ thrust::make_zip_iterator(majors.begin(), get_dataframe_buffer_begin(e_op_result_buffer));
+ for (size_t i = 0; i < rx_offsets.size() - 1; ++i) {
+ thrust::for_each(
+ handle.get_thrust_policy(),
+ pair_first + rx_offsets[i],
+ pair_first + rx_offsets[i + 1],
+ [vertex_value_output_first,
+ reduce_op,
+ major_first = graph_view.local_vertex_partition_range_first()] __device__(auto pair) {
+ auto major = thrust::get<0>(pair);
+ auto major_offset = major - major_first;
+ auto e_op_result = thrust::get<1>(pair);
+ *(vertex_value_output_first + major_offset) =
+ reduce_op(*(vertex_value_output_first + major_offset), e_op_result);
+ });
+ }
}

} // namespace cugraph
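
The new update loop replaces the removed sort_by_key/reduce_by_key pass: received pairs stay grouped as per-rank segments delimited by rx_offsets, the output is filled with init, and each segment is folded into the output in its own thrust::for_each pass. The non-atomic read-modify-write in the device lambda is safe provided a vertex appears at most once within a segment; a vertex contributed by several ranks is combined across the sequential per-segment passes. A self-contained sketch of the pattern, with hypothetical names (segmented_fold_into_output is not a cuGraph function):

// Hedged sketch of the segmented fold used in the prim above: one segment per
// contributing rank, vertices assumed unique within a segment, output assumed
// pre-filled with the init value.
#include <thrust/device_vector.h>
#include <thrust/for_each.h>
#include <thrust/iterator/zip_iterator.h>
#include <thrust/system/cuda/execution_policy.h>
#include <thrust/tuple.h>
#include <vector>

template <typename vertex_t, typename value_t, typename ReduceOp>
void segmented_fold_into_output(thrust::device_vector<vertex_t> const& keys,   // majors
                                thrust::device_vector<value_t> const& values,  // e_op results
                                std::vector<size_t> const& segment_offsets,    // size = #segments + 1
                                vertex_t local_range_first,
                                value_t* output,  // pre-filled with init, one slot per local vertex
                                ReduceOp reduce_op,
                                cudaStream_t stream)
{
  auto pair_first = thrust::make_zip_iterator(keys.begin(), values.begin());
  for (size_t i = 0; i + 1 < segment_offsets.size(); ++i) {
    // one pass per segment: no two threads in a pass touch the same vertex
    thrust::for_each(thrust::cuda::par.on(stream),
                     pair_first + segment_offsets[i],
                     pair_first + segment_offsets[i + 1],
                     [output, reduce_op, local_range_first] __device__(auto pair) {
                       auto offset    = thrust::get<0>(pair) - local_range_first;
                       output[offset] = reduce_op(output[offset], thrust::get<1>(pair));
                     });
  }
}

Relative to the removed path, this avoids allocating the unique-key and reduced-value buffers (and the sort's temporary storage) on top of the gathered pairs, which is the peak-memory reduction this file is after.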
