Skip to content

Commit d70d05d

Browse files
kx79wqRUrlus
authored and committed
ENH: new function zip_sp_matmul_topn that can zip matrices zip_j A.dot(B_j)
Function will return a zipped matrix Z in CSR format, zip_j C_j, where Z = [sorted top n results > lower_bound for each row of C_j], where C_j = A.dot(B_j) and where B has been split row-wise into sub-matrices B_j. Function only allows for sorted variant of sp_matzip function; unsorted variant (sorted based on insertion order) cannot be (made) equal to unsorted function on full matrices. zip_sp_matmul_topn by default sorts by value. And added python function to zip split matrices. Plus added two unit tests to test functionality. NB Skip unit test test_stack_zip_sp_matmul_topn for python 3.8 due to bug in scipy vstack function, it does not support all data types.
1 parent 976f6ec commit d70d05d

File tree

12 files changed

+518
-6
lines changed

12 files changed

+518
-6
lines changed

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ set(SDTN_SRC_FILES
4242
${SDTN_SRC_PREF}/extension.cpp
4343
${SDTN_SRC_PREF}/sp_matmul_bindings.cpp
4444
${SDTN_SRC_PREF}/sp_matmul_topn_bindings.cpp
45+
${SDTN_SRC_PREF}/zip_sp_matmul_topn_bindings.cpp
4546
)
4647

4748
include(FindDependencies)

README.md

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,47 @@ pip install sparse_dot_topn --no-binary sparse_dot_topn
8080
Building from source can enable architecture specific optimisations and is recommended for those that have a C++ compiler installed.
8181
See INSTALLATION.md for details.
8282

83+
## Distributing the top-n multiplication of two large O(10M+) sparse matrices over a cluster
84+
85+
The top-n multiplication of two large O(10M+) sparse matrices can be broken down into smaller chunks.
86+
For example, one may want to split sparse matrices into matrices with just 1M rows, and do the
87+
(top-n) multiplication of all those matrix pairs.
88+
Reasons to do this are to reduce the memory footprint of each pair, and to employ available distributed computing power.
89+
90+
The pairs can be distributed and calculated over a cluster (eg. we use a spark cluster).
91+
The resulting matrix-products are then zipped and stacked in order to reproduce the full matrix product.
92+
93+
Here's an example how to do this, where we are matching 1000 rows in sparse matrix A against 600 rows in sparse matrix B,
94+
and both A and B are split into chunks.
95+
96+
```python
97+
import numpy as np
98+
import scipy.sparse as sparse
99+
from sparse_dot_topn import sp_matmul_topn, zip_sp_matmul_topn
100+
101+
# 1a. Example matching 1000 rows in sparse matrix A against 600 rows in sparse matrix B.
102+
rng = np.random.default_rng(42)
A = sparse.random(1000, 2000, density=0.1, format="csr", dtype=np.float32, random_state=rng)
103+
B = sparse.random(600, 2000, density=0.1, format="csr", dtype=np.float32, random_state=rng)
104+
105+
# 1b. Reference full matrix product with top-n
106+
C_ref = sp_matmul_topn(A, B.T, top_n=10, threshold=0.01, sort=True)
107+
108+
# 2a. Split the sparse matrices. Here A is split into five parts, and B into three parts.
109+
As = [A[i*200:(i+1)*200] for i in range(5)]
110+
Bs = [B[:100], B[100:300], B[300:]]
111+
112+
# 2b. Perform the top-n multiplication of all sub-matrix pairs, here in a double loop.
113+
# E.g. all sub-matrix pairs could be distributed over a cluster and multiplied there.
114+
Cs = [[sp_matmul_topn(Aj, Bi.T, top_n=10, threshold=0.01, sort=True) for Bi in Bs] for Aj in As]
115+
116+
# 2c. top-n zipping of the C-matrices, done over the index of the B sub-matrices.
117+
Czip = [zip_sp_matmul_topn(top_n=10, C_mats=Cis) for Cis in Cs]
118+
119+
# 2d. stacking over zipped C-matrices, done over the index of the A sub-matrices
120+
# The resulting matrix C equals C_ref.
121+
C = sparse.vstack(Czip, dtype=np.float32)
122+
```
123+
83124
## Migrating to v1.
84125

85126
**sparse\_dot\_topn** v1 is a significant change from `v0.*` with a new bindings and API.
@@ -147,4 +188,5 @@ You can read about it in a [blog](https://medium.com/@ingwbaa/https-medium-com-i
147188
* [Stephane Collet](https://github.com/stephanecollot)
148189
* [Particular Miner](https://github.com/ParticularMiner) (no ING affiliation)
149190
* [Ralph Urlus](https://github.com/RUrlus)
191+
* [Max Baak](https://github.com/mbaak)
150192

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,8 @@ select = [
135135
"RSE",
136136
# flynt
137137
"FLY",
138-
"CPY001"
139138
]
139+
140140
ignore = [
141141
"E501", # line length
142142
"PLR0913", # too many arguments

src/sparse_dot_topn/__init__.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,16 @@
22
import importlib.metadata
33

44
__version__ = importlib.metadata.version("sparse_dot_topn")
5-
from sparse_dot_topn.api import awesome_cossim_topn, sp_matmul, sp_matmul_topn
5+
from sparse_dot_topn.api import awesome_cossim_topn, sp_matmul, sp_matmul_topn, zip_sp_matmul_topn
66
from sparse_dot_topn.lib import _sparse_dot_topn_core as _core
77
from sparse_dot_topn.lib._sparse_dot_topn_core import _has_openmp_support
88

9-
__all__ = ["awesome_cossim_topn", "sp_matmul", "sp_matmul_topn", "_core", "__version__", "_has_openmp_support"]
9+
__all__ = [
10+
"awesome_cossim_topn",
11+
"sp_matmul",
12+
"sp_matmul_topn",
13+
"zip_sp_matmul_topn",
14+
"_core",
15+
"__version__",
16+
"_has_openmp_support",
17+
]

src/sparse_dot_topn/api.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,3 +282,60 @@ def sp_matmul_topn(
282282
msg = "sparse_dot_topn: extension was compiled without parallelisation (OpenMP) support, ignoring ``n_threads``"
283283
warnings.warn(msg, stacklevel=1)
284284
return csr_matrix(func(**kwargs), shape=(A_nrows, B_ncols))
285+
286+
287+
def zip_sp_matmul_topn(top_n: int, C_mats: list[csr_matrix]) -> csr_matrix:
    """Compute zip-matrix C = zip_i C_i = zip_i A * B_i = A * B whilst only storing the `top_n` elements.

    Combine the sub-matrices together and keep only the `top_n` elements per row.

    Pre-calling this function, matrix B has been split row-wise into chunks B_i, and C_i = A * B_i have been calculated.
    This function computes C = zip_i C_i, which is equivalent to A * B when only keeping the `top_n` elements.
    It allows very large matrices to be split and multiplied with a limited memory footprint.

    Args:
        top_n: the number of results to retain; should be smaller or equal to top_n used to obtain C_mats.
        C_mats: a list with each C_i sub-matrix, with format csr_matrix.

    Returns:
        C: zipped result matrix

    Raises:
        TypeError: when not all elements of `C_mats` is a csr_matrix or trivially convertible
        ValueError: when `C_mats` is empty, or when not all elements of `C_mats` have the same number of rows
    """
    # Guard against an empty list; without this the function would fail later
    # with a confusing NameError on `nrows`.
    if not C_mats:
        msg = "`C_mats` must contain at least one matrix."
        raise ValueError(msg)

    _nrows = []
    ncols = []
    data = []
    indptr = []
    indices = []
    for C in C_mats:
        # check correct type of each C; COO/CSC are trivially convertible to CSR
        if isinstance(C, (coo_matrix, csc_matrix)):
            C = C.tocsr(False)
        elif not isinstance(C, csr_matrix):
            msg = f"type of `C` must be one of `csr_matrix`, `csc_matrix` or `coo_matrix`, got `{type(C)}`"
            raise TypeError(msg)

        nrows, c_nc = C.shape
        _nrows.append(nrows)
        ncols.append(c_nc)
        data.append(C.data)
        indptr.append(C.indptr)
        indices.append(C.indices)

    # all sub-matrices are chunks of the same A * B product, so they must share the row count
    if not np.all(np.diff(_nrows) == 0):
        msg = "Each `C` in `C_mats` should have the same number of rows."
        raise ValueError(msg)

    return csr_matrix(
        _core.zip_sp_matmul_topn(
            top_n=top_n,
            # upper bound on the zipped nnz: at most top_n entries per row
            Z_max_nnz=nrows * top_n,
            nrows=nrows,
            B_ncols=np.asarray(ncols, int),
            data=data,
            indptr=indptr,
            indices=indices,
        )
    )

src/sparse_dot_topn_core/include/sparse_dot_topn/maxheap.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ class MaxHeap {
9696
}
9797

9898
/**
99-
* \brief Sort the heap accoring to the insertion order.
99+
* \brief Sort the heap according to the insertion order.
100100
*
101101
* \details Note that calling `insertion_sort` invalidates the heap.
102102
* Calls should be followed by a call to `reset`.
@@ -106,7 +106,7 @@ class MaxHeap {
106106
}
107107

108108
/**
109-
* \brief Sort the heap accoring to values.
109+
* \brief Sort the heap according to values.
110110
*
111111
* \details Note that calling `value_sort` invalidates the heap.
112112
* Calls should be followed by a call to `reset`.

src/sparse_dot_topn_core/include/sparse_dot_topn/sp_matmul_topn.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
#include <algorithm>
2121
#include <cstring>
22+
#include <limits>
2223
#include <memory>
2324
#include <numeric>
2425
#include <tuple>
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/* Copyright (c) 2023 ING Analytics Wholesale Banking
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
#pragma once
18+
19+
#include <limits>
20+
#include <vector>
21+
22+
#include <sparse_dot_topn/common.hpp>
23+
#include <sparse_dot_topn/maxheap.hpp>
24+
25+
namespace sdtn::core {
26+
27+
/**
28+
* \brief Zip and compute Z = zip_j C_j = zip_j A.dot(B_j) keeping only the
29+
* top-n of the zipped results.
30+
*
31+
* \details This function will return a zipped matrix Z in CSR format, zip_j
32+
* C_j, where Z = [sorted top n results > lower_bound for each row of C_j],
33+
* where C_j = A.dot(B_j) and where B has been split row-wise into sub-matrices
34+
* B_j. Note that `C_j` must be `CSR` format where the nonzero elements of the
35+
* `i`th row are located in ``data[indptr[i]:indptr[i+1]]``. The column indices
36+
* for row `i` are stored in
37+
* ``indices[indptr[i]:indptr[i+1]]``.
38+
*
39+
* \tparam eT element type of the matrices
40+
* \tparam idxT integer type of the index arrays, must be at least 32 bit int
41+
 * \param[in] top_n the top n values to store
 * \param[in] nrows the number of rows in A
 * \param[in] B_ncols the number of columns in each B_j sub-matrix
 * \param[in] C_data vector with the nonzero elements of each C_j sub-matrix
 * \param[in] C_indptrs vector of arrays containing the row index pointers of
 * each C_j sub-matrix
 * \param[in] C_indices vector of arrays containing the column indices of
 * each C_j sub-matrix
 * \param[out] Z_data the nonzero elements of zipped Z matrix
 * \param[out] Z_indptr array containing the row indices for zipped `Z_data`
 * \param[out] Z_indices array containing the zipped column indices
53+
*/
54+
template <typename eT, typename idxT, iffInt<idxT> = true>
55+
inline void zip_sp_matmul_topn(
56+
const idxT top_n,
57+
const idxT nrows,
58+
const idxT* B_ncols,
59+
const std::vector<const eT*>& C_data,
60+
const std::vector<const idxT*>& C_indptrs,
61+
const std::vector<const idxT*>& C_indices,
62+
eT* __restrict Z_data,
63+
idxT* __restrict Z_indptr,
64+
idxT* __restrict Z_indices
65+
) {
66+
idxT nnz = 0;
67+
Z_indptr[0] = 0;
68+
eT* Z_data_head = Z_data;
69+
idxT* Z_indices_head = Z_indices;
70+
const int n_mat = C_data.size();
71+
72+
// threshold is already consistent between matrices, so accept every line.
73+
auto max_heap = MaxHeap<eT, idxT>(top_n, std::numeric_limits<eT>::min());
74+
75+
// offset the index when concatenating the C sub-matrices (split by row)
76+
std::vector<idxT> offset(n_mat, idxT(0));
77+
for (int i = 0; i < n_mat - 1; ++i) {
78+
for (int j = i; j < n_mat - 1; ++j) {
79+
offset[j + 1] += B_ncols[i];
80+
}
81+
}
82+
83+
// concatenate the results of each row, apply top_n and add those results to
84+
// the C matrix
85+
for (idxT i = 0; i < nrows; ++i) {
86+
eT min = max_heap.reset();
87+
88+
// keep topn of stacked lines for each row insert in reverse order,
89+
// similar to the reverse linked list in sp_matmul_topn
90+
for (int j = n_mat - 1; j >= 0; --j) {
91+
const idxT* C_indptr_j = C_indptrs[j];
92+
const idxT* C_indices_j = C_indices[j];
93+
for (idxT k = C_indptr_j[i]; k < C_indptr_j[i + 1]; ++k) {
94+
eT val = (C_data[j])[k];
95+
if (val > min) {
96+
min = max_heap.push_pop(offset[j] + C_indices_j[k], val);
97+
}
98+
}
99+
}
100+
101+
// sort the heap s.t. the first value is the largest
102+
max_heap.value_sort();
103+
104+
// fill the zipped sparse matrix Z
105+
int n_set = max_heap.get_n_set();
106+
for (int ii = 0; ii < n_set; ++ii) {
107+
*Z_indices_head = max_heap.heap[ii].idx;
108+
*Z_data_head = max_heap.heap[ii].val;
109+
Z_indices_head++;
110+
Z_data_head++;
111+
}
112+
nnz += n_set;
113+
Z_indptr[i + 1] = nnz;
114+
}
115+
}
116+
117+
} // namespace sdtn::core
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/* Copyright (c) 2023 ING Analytics Wholesale Banking
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
#pragma once
18+
19+
#include <nanobind/nanobind.h>
20+
#include <nanobind/ndarray.h>
21+
#include <nanobind/stl/vector.h>
22+
23+
#include <memory>
24+
#include <numeric>
25+
#include <vector>
26+
27+
#include <sparse_dot_topn/common.hpp>
28+
#include <sparse_dot_topn/maxheap.hpp>
29+
#include <sparse_dot_topn/zip_sp_matmul_topn.hpp>
30+
31+
namespace sdtn {
32+
namespace nb = nanobind;
33+
34+
namespace api {
35+
36+
template <typename eT, typename idxT, core::iffInt<idxT> = true>
37+
inline nb::tuple zip_sp_matmul_topn(
38+
const int top_n,
39+
const idxT Z_max_nnz,
40+
const idxT nrows,
41+
const nb_vec<idxT>& B_ncols,
42+
const std::vector<nb_vec<eT>>& data,
43+
const std::vector<nb_vec<idxT>>& indptr,
44+
const std::vector<nb_vec<idxT>>& indices
45+
) {
46+
const int n_mats = B_ncols.size();
47+
std::vector<const eT*> data_ptrs;
48+
data_ptrs.reserve(n_mats);
49+
std::vector<const idxT*> indptr_ptrs;
50+
indptr_ptrs.reserve(n_mats);
51+
std::vector<const idxT*> indices_ptrs;
52+
indices_ptrs.reserve(n_mats);
53+
54+
for (int i = 0; i < n_mats; ++i) {
55+
data_ptrs.push_back(data[i].data());
56+
indptr_ptrs.push_back(indptr[i].data());
57+
indices_ptrs.push_back(indices[i].data());
58+
}
59+
60+
auto Z_indptr = std::unique_ptr<idxT[]>(new idxT[nrows + 1]);
61+
auto Z_indices = std::unique_ptr<idxT>(new idxT[Z_max_nnz]);
62+
auto Z_data = std::unique_ptr<eT>(new eT[Z_max_nnz]);
63+
64+
core::zip_sp_matmul_topn<eT, idxT>(
65+
top_n,
66+
nrows,
67+
B_ncols.data(),
68+
data_ptrs,
69+
indptr_ptrs,
70+
indices_ptrs,
71+
Z_data.get(),
72+
Z_indptr.get(),
73+
Z_indices.get()
74+
);
75+
76+
return nb::make_tuple(
77+
to_nbvec<eT>(Z_data.release(), Z_max_nnz),
78+
to_nbvec<idxT>(Z_indices.release(), Z_max_nnz),
79+
to_nbvec<idxT>(Z_indptr.release(), nrows + 1)
80+
);
81+
}
82+
} // namespace api
83+
84+
namespace bindings {
85+
void bind_zip_sp_matmul_topn(nb::module_& m);
86+
}
87+
88+
} // namespace sdtn

0 commit comments

Comments
 (0)