Skip to content

Enable build_sorted_mst with data on host memory #997

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: branch-25.08
Choose a base branch
from
Open
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 155 additions & 2 deletions cpp/src/cluster/detail/mst.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#pragma once

#include "../../sparse/neighbors/cross_component_nn.cuh"
#include <cuvs/distance/distance.hpp>
#include <raft/core/resource/cuda_stream.hpp>
#include <raft/sparse/op/sort.cuh>
#include <raft/sparse/solver/mst.cuh>
Expand Down Expand Up @@ -59,7 +60,7 @@ void merge_msts(raft::sparse::solver::Graph_COO<value_idx, value_idx, value_t>&
* @tparam value_idx index type
* @tparam value_t floating-point value type
* @param[in] handle raft handle
* @param[in] X original dense data from which knn grpah was constructed
* @param[in] X original dense data on device memory from which knn graph was constructed
* @param[inout] msf edge list containing the mst result
* @param[in] m number of rows in X
* @param[in] n number of columns in X
Expand Down Expand Up @@ -117,6 +118,149 @@ void connect_knn_graph(
merge_msts<value_idx, value_t>(msf, new_mst, stream);
}

/**
* Connect an unconnected knn graph (one in which mst returns an msf). The
* device buffers underlying the Graph_COO object are modified in-place.
* @tparam value_idx index type
* @tparam value_t floating-point value type
* @param[in] handle raft handle
* @param[in] X original dense data on host memory from which knn graph was constructed
* @param[inout] msf edge list containing the mst result
* @param[in] m number of rows in X
* @param[in] n number of columns in X
* @param[in] n_components number of components in color
* @param[inout] color the color labels array returned from the mst invocation
* @return updated MST edge list
*/
template <typename value_idx, typename value_t>
void connect_knn_graph(
raft::resources const& handle,
const value_t* X,
raft::sparse::solver::Graph_COO<value_idx, value_idx, value_t>& msf,
size_t m,
size_t n,
int n_components,
value_idx* color,
cuvs::distance::DistanceType metric = cuvs::distance::DistanceType::L2SqrtExpanded)
{
auto stream = raft::resource::get_cuda_stream(handle);

// Copy color array from device to host
std::vector<value_idx> h_color(m);
raft::copy(h_color.data(), color, m, stream);
raft::resource::sync_stream(handle, stream);

std::unordered_map<value_idx, value_idx> color_remap;
value_idx new_label = 0;

// Build remapping table so that colors are compact integers (i.e. consecutive colors)
for (size_t i = 0; i < m; ++i) {
if (color_remap.find(h_color[i]) == color_remap.end()) {
color_remap[h_color[i]] = new_label++;
}
}

for (size_t i = 0; i < m; ++i) {
h_color[i] = color_remap[h_color[i]];
}

// make key (color) : value (vector of ids that have that color)
std::unordered_map<value_idx, std::vector<value_idx>> component_map;
for (value_idx i = 0; i < static_cast<value_idx>(m); ++i) {
component_map[h_color[i]].push_back(i);
}

std::vector<std::tuple<value_idx, value_idx, value_t>> selected_edges;

std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis;

auto pairwise_dist = raft::make_device_matrix<value_t, int64_t>(handle, 1, 1);
auto data_u = raft::make_device_matrix<value_t, int64_t>(handle, 1, n);
auto data_v = raft::make_device_matrix<value_t, int64_t>(handle, 1, n);

// connect i-1 component and i component
for (int i = 1; i < n_components; ++i) {
value_idx color_a = i - 1;
value_idx color_b = i;

const auto& nodes_a = component_map[color_a];
const auto& nodes_b = component_map[color_b];

// Randomly pick a data index from each component
dis.param(std::uniform_int_distribution<>::param_type(0, nodes_a.size() - 1));
value_idx u = nodes_a[dis(gen)];

dis.param(std::uniform_int_distribution<>::param_type(0, nodes_b.size() - 1));
value_idx v = nodes_b[dis(gen)];

// calculate the distance between the two data vectors
raft::copy(data_u.data_handle(), X + u * n, n, stream);
raft::copy(data_v.data_handle(), X + v * n, n, stream);
cuvs::distance::pairwise_distance(handle,
raft::make_const_mdspan(data_u.view()),
raft::make_const_mdspan(data_v.view()),
pairwise_dist.view(),
metric);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on your answer to my previous question, if using omp, I would also create new handles with streams picked from a pool for this call to saturate the GPU with concurrency (pairwise distance of 1 row at a time on the GPU will likely be slower than doing this on the CPU)


value_t h_scalar; // Host variable
raft::copy(&h_scalar, pairwise_dist.data_handle(), 1, stream);

selected_edges.emplace_back(u, v, h_scalar);
}

// sort in order of ascending starting vertex so we can call sorted_coo_to_csr
std::sort(selected_edges.begin(),
selected_edges.end(),
[](const std::tuple<value_idx, value_idx, value_t>& a,
const std::tuple<value_idx, value_idx, value_t>& b) {
value_idx au = std::get<0>(a);
value_idx bu = std::get<0>(b);
value_idx av = std::get<1>(a);
value_idx bv = std::get<1>(b);
return (au < bu) || (au == bu && av < bv);
});

// Upload selected edges to device
size_t new_nnz = selected_edges.size();

rmm::device_uvector<value_idx> new_rows(new_nnz, stream);
rmm::device_uvector<value_idx> new_cols(new_nnz, stream);
rmm::device_uvector<value_t> new_vals(new_nnz, stream);

std::vector<value_idx> h_rows(new_nnz), h_cols(new_nnz);
std::vector<value_t> h_vals(new_nnz);
for (size_t i = 0; i < new_nnz; ++i) {
auto [u, v, w] = selected_edges[i];
h_rows[i] = u;
h_cols[i] = v;
h_vals[i] = w;
}

raft::copy(new_rows.data(), h_rows.data(), new_nnz, stream);
raft::copy(new_cols.data(), h_cols.data(), new_nnz, stream);
raft::copy(new_vals.data(), h_vals.data(), new_nnz, stream);

rmm::device_uvector<value_idx> indptr2(m + 1, stream);
raft::sparse::convert::sorted_coo_to_csr(new_rows.data(), new_nnz, indptr2.data(), m + 1, stream);

// On the second call, we hand the MST the original colors
// and the new set of edges and let it restart the optimization process
auto new_mst = raft::sparse::solver::mst<value_idx, value_idx, value_t, double>(handle,
indptr2.data(),
new_cols.data(),
new_vals.data(),
m,
new_nnz,
color,
stream,
false,
false);

merge_msts<value_idx, value_t>(msf, new_mst, stream);
}

/**
* Constructs an MST and sorts the resulting edges in ascending
* order by their weight.
Expand All @@ -130,6 +274,7 @@ void connect_knn_graph(
* @tparam value_idx
* @tparam value_t
* @param[in] handle raft handle
* @param[in] X dataset residing on host or device memory
* @param[in] indptr CSR indptr of connectivities graph
* @param[in] indices CSR indices array of connectivities graph
* @param[in] pw_dists CSR weights array of connectivities graph
Expand Down Expand Up @@ -168,8 +313,16 @@ void build_sorted_mst(
int iters = 1;
int n_components = cuvs::sparse::neighbors::get_n_components(color, m, stream);

cudaPointerAttributes attr;
RAFT_CUDA_TRY(cudaPointerGetAttributes(&attr, X));
bool data_on_device = attr.type == cudaMemoryTypeDevice;

while (n_components > 1 && iters < max_iter) {
connect_knn_graph<value_idx, value_t>(handle, X, mst_coo, m, n, color, reduction_op);
if (data_on_device) {
connect_knn_graph<value_idx, value_t>(handle, X, mst_coo, m, n, color, reduction_op);
} else {
connect_knn_graph<value_idx, value_t>(handle, X, mst_coo, m, n, n_components, color, metric);
}

iters++;

Expand Down
Loading