diff --git a/cpp/src/community/leiden_impl.cuh b/cpp/src/community/leiden_impl.cuh
index c1b8fe1d7d3..166eb334301 100644
--- a/cpp/src/community/leiden_impl.cuh
+++ b/cpp/src/community/leiden_impl.cuh
@@ -638,7 +638,8 @@ void flatten_leiden_dendrogram(raft::handle_t const& handle,
                                Dendrogram<vertex_t> const& dendrogram,
                                vertex_t* clustering)
 {
-  rmm::device_uvector<vertex_t> vertex_ids_v(graph_view.number_of_vertices(), handle.get_stream());
+  rmm::device_uvector<vertex_t> vertex_ids_v(graph_view.local_vertex_partition_range_size(),
+                                             handle.get_stream());
 
   detail::sequence_fill(handle.get_stream(),
                         vertex_ids_v.begin(),
diff --git a/cpp/src/community/louvain_impl.cuh b/cpp/src/community/louvain_impl.cuh
index 2c524e9dfb8..a4b4b4a7bcd 100644
--- a/cpp/src/community/louvain_impl.cuh
+++ b/cpp/src/community/louvain_impl.cuh
@@ -292,7 +292,8 @@ void flatten_dendrogram(raft::handle_t const& handle,
                         Dendrogram<vertex_t> const& dendrogram,
                         vertex_t* clustering)
 {
-  rmm::device_uvector<vertex_t> vertex_ids_v(graph_view.number_of_vertices(), handle.get_stream());
+  rmm::device_uvector<vertex_t> vertex_ids_v(graph_view.local_vertex_partition_range_size(),
+                                             handle.get_stream());
 
   detail::sequence_fill(handle.get_stream(),
                         vertex_ids_v.begin(),
diff --git a/cpp/src/prims/per_v_transform_reduce_dst_key_aggregated_outgoing_e.cuh b/cpp/src/prims/per_v_transform_reduce_dst_key_aggregated_outgoing_e.cuh
index 7be30b0a5f0..5a5e9332094 100644
--- a/cpp/src/prims/per_v_transform_reduce_dst_key_aggregated_outgoing_e.cuh
+++ b/cpp/src/prims/per_v_transform_reduce_dst_key_aggregated_outgoing_e.cuh
@@ -343,6 +343,7 @@ void per_v_transform_reduce_dst_key_aggregated_outgoing_e(
 
   rmm::device_uvector<vertex_t> majors(0, handle.get_stream());
   auto e_op_result_buffer = allocate_dataframe_buffer<T>(0, handle.get_stream());
+  std::vector<size_t> rx_offsets{};
   for (size_t i = 0; i < graph_view.number_of_local_edge_partitions(); ++i) {
     auto edge_partition =
       edge_partition_device_view_t<vertex_t, edge_t, GraphViewType::is_multi_gpu>(
@@ -1041,6 +1042,10 @@ void per_v_transform_reduce_dst_key_aggregated_outgoing_e(
 
       // FIXME: additional optimization is possible if reduce_op is a pure function (and reduce_op
      // can be mapped to ncclRedOp_t).
+      // FIXME: Memory footprint can grow proportionally to minor_comm_size in the worst case. If
+      // reduce_op can be mapped to ncclRedOp_t, we can use ncclReduce to solve this problem. If
+      // reduce_op cannot be mapped to ncclRedOp_t, we need to implement our own multi-GPU reduce
+      // function.
 
       auto rx_sizes = host_scalar_gather(minor_comm, tmp_majors.size(), i, handle.get_stream());
       std::vector<size_t> rx_displs{};
@@ -1073,58 +1078,41 @@ void per_v_transform_reduce_dst_key_aggregated_outgoing_e(
       if (static_cast<size_t>(minor_comm_rank) == i) {
         majors = std::move(rx_majors);
         e_op_result_buffer = std::move(rx_tmp_e_op_result_buffer);
+        rx_offsets = std::vector<size_t>(rx_sizes.size() + 1);
+        rx_offsets[0] = 0;
+        std::inclusive_scan(rx_sizes.begin(), rx_sizes.end(), rx_offsets.begin() + 1);
       }
     } else {
       majors = std::move(tmp_majors);
       e_op_result_buffer = std::move(tmp_e_op_result_buffer);
+      rx_offsets = {0, majors.size()};
     }
   }
 
-  if constexpr (GraphViewType::is_multi_gpu) {
-    thrust::sort_by_key(handle.get_thrust_policy(),
-                        majors.begin(),
-                        majors.end(),
-                        get_dataframe_buffer_begin(e_op_result_buffer));
-    auto num_uniques = thrust::count_if(handle.get_thrust_policy(),
-                                        thrust::make_counting_iterator(size_t{0}),
-                                        thrust::make_counting_iterator(majors.size()),
-                                        detail::is_first_in_run_t<vertex_t>{majors.data()});
-    rmm::device_uvector<vertex_t> unique_majors(num_uniques, handle.get_stream());
-    auto reduced_e_op_result_buffer =
-      allocate_dataframe_buffer<T>(unique_majors.size(), handle.get_stream());
-    thrust::reduce_by_key(handle.get_thrust_policy(),
-                          majors.begin(),
-                          majors.end(),
-                          get_dataframe_buffer_begin(e_op_result_buffer),
-                          unique_majors.begin(),
-                          get_dataframe_buffer_begin(reduced_e_op_result_buffer),
-                          thrust::equal_to<vertex_t>{},
-                          reduce_op);
-    majors = std::move(unique_majors);
-    e_op_result_buffer = std::move(reduced_e_op_result_buffer);
-  }
-
   // 2. update final results
 
   thrust::fill(handle.get_thrust_policy(),
                vertex_value_output_first,
               vertex_value_output_first + graph_view.local_vertex_partition_range_size(),
-               T{});
-
-  thrust::scatter(handle.get_thrust_policy(),
-                  get_dataframe_buffer_begin(e_op_result_buffer),
-                  get_dataframe_buffer_end(e_op_result_buffer),
-                  thrust::make_transform_iterator(
-                    majors.begin(),
-                    detail::vertex_local_offset_t<vertex_t, GraphViewType::is_multi_gpu>{
-                      graph_view.local_vertex_partition_view()}),
-                  vertex_value_output_first);
-
-  thrust::transform(handle.get_thrust_policy(),
-                    vertex_value_output_first,
-                    vertex_value_output_first + graph_view.local_vertex_partition_range_size(),
-                    vertex_value_output_first,
-                    detail::reduce_with_init_t<ReduceOp, T>{reduce_op, init});
+               init);
+
+  auto pair_first =
+    thrust::make_zip_iterator(majors.begin(), get_dataframe_buffer_begin(e_op_result_buffer));
+  for (size_t i = 0; i < rx_offsets.size() - 1; ++i) {
+    thrust::for_each(
+      handle.get_thrust_policy(),
+      pair_first + rx_offsets[i],
+      pair_first + rx_offsets[i + 1],
+      [vertex_value_output_first,
+       reduce_op,
+       major_first = graph_view.local_vertex_partition_range_first()] __device__(auto pair) {
+        auto major = thrust::get<0>(pair);
+        auto major_offset = major - major_first;
+        auto e_op_result = thrust::get<1>(pair);
+        *(vertex_value_output_first + major_offset) =
+          reduce_op(*(vertex_value_output_first + major_offset), e_op_result);
+      });
+  }
 }
 
 }  // namespace cugraph