Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
replace omp with threads pool in diskann (#759)
Browse files Browse the repository at this point in the history
Signed-off-by: cqy123456 <[email protected]>
  • Loading branch information
cqy123456 authored Mar 27, 2023
1 parent 7937871 commit e5d7dff
Show file tree
Hide file tree
Showing 14 changed files with 586 additions and 470 deletions.
1 change: 1 addition & 0 deletions ci/docker/set_docker_mirror.sh
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ set_mirror(){
restart_docker
echo "Success."
exit 0

}

set_mirror
6 changes: 2 additions & 4 deletions include/knowhere/feder/DiskANN.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,14 @@ class DiskANNMeta {
DiskANNMeta() = default;

DiskANNMeta(const std::string& data_path, const int32_t max_degree, const int32_t search_list_size,
const float pq_code_budget_gb, const float build_dram_budget_gb, const int32_t num_threads,
const int32_t disk_pq_dims, const bool accelerate_build, const int64_t num_elem,
const std::vector<int64_t>& entry_points)
const float pq_code_budget_gb, const float build_dram_budget_gb, const int32_t disk_pq_dims,
const bool accelerate_build, const int64_t num_elem, const std::vector<int64_t>& entry_points)
: num_elem_(num_elem), entry_points_(entry_points) {
build_params_.data_path = data_path;
build_params_.max_degree = max_degree;
build_params_.search_list_size = search_list_size;
build_params_.pq_code_budget_gb = pq_code_budget_gb;
build_params_.build_dram_budget_gb = build_dram_budget_gb;
build_params_.num_threads = num_threads;
build_params_.disk_pq_dims = disk_pq_dims;
build_params_.accelerate_build = accelerate_build;
}
Expand Down
55 changes: 27 additions & 28 deletions src/common/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,34 +14,33 @@
#include "knowhere/log.h"
namespace knowhere {

static const std::unordered_set<std::string> ext_legal_json_keys = {
"metric_type",
"dim",
"nlist",
"nprobe",
"level",
"index_type",
"index_mode",
"collection_id",
"partition_id",
"segment_id",
"field_id",
"index_build_id",
"index_id",
"index_version",
"pq_code_budget_gb_ratio",
"num_build_thread_ratio",
"search_cache_budget_gb_ratio",
"num_load_thread_ratio",
"beamwidth_ratio",
"search_list",
"num_build_thread",
"num_load_thread",
"index_files",
"gpu_id",
"nbits",
"m",
};
static const std::unordered_set<std::string> ext_legal_json_keys = {"metric_type",
"dim",
"nlist",
"nprobe",
"level",
"index_type",
"index_mode",
"collection_id",
"partition_id",
"segment_id",
"field_id",
"index_build_id",
"index_id",
"index_version",
"pq_code_budget_gb_ratio",
"num_build_thread_ratio",
"search_cache_budget_gb_ratio",
"num_load_thread_ratio",
"beamwidth_ratio",
"search_list",
"num_build_thread",
"num_load_thread",
"index_files",
"gpu_id",
"nbits",
"m",
"num_threads"};

Status
Config::FormatAndCheck(const Config& cfg, Json& json) {
Expand Down
60 changes: 41 additions & 19 deletions src/index/diskann/diskann.cc
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ class DiskANNIndexNode : public IndexNode {
std::string index_prefix_;
mutable std::mutex preparation_lock_;
std::atomic_bool is_prepared_;
int32_t num_threads_;
std::shared_ptr<FileManager> file_manager_;
std::unique_ptr<diskann::PQFlashIndex<T>> pq_flash_index_;
std::atomic_int64_t dim_;
Expand Down Expand Up @@ -280,7 +279,6 @@ DiskANNIndexNode<T>::Add(const DataSet& dataset, const Config& cfg) {
static_cast<unsigned>(build_conf.search_list_size),
static_cast<double>(build_conf.pq_code_budget_gb),
static_cast<double>(build_conf.build_dram_budget_gb),
static_cast<uint32_t>(build_conf.num_threads),
static_cast<uint32_t>(build_conf.disk_pq_dims),
false,
build_conf.accelerate_build};
Expand Down Expand Up @@ -355,8 +353,8 @@ DiskANNIndexNode<T>::Prepare(const Config& cfg) {

pq_flash_index_ = std::make_unique<diskann::PQFlashIndex<T>>(reader, diskann_metric);

auto load_expect = TryDiskANNCall<int>(
[&]() -> int { return pq_flash_index_->load(prep_conf.num_threads, index_prefix_.c_str()); });
auto load_expect =
TryDiskANNCall<int>([&]() -> int { return pq_flash_index_->load(pool_->size(), index_prefix_.c_str()); });

if (!load_expect.has_value() || load_expect.value() != 0) {
LOG_KNOWHERE_ERROR_ << "Failed to load DiskANN.";
Expand Down Expand Up @@ -397,7 +395,7 @@ DiskANNIndexNode<T>::Prepare(const Config& cfg) {
} else {
auto gen_cache_expect = TryDiskANNCall<bool>([&]() -> bool {
pq_flash_index_->generate_cache_list_from_sample_queries(warmup_query_file, 15, 6, num_nodes_to_cache,
prep_conf.num_threads, node_list);
node_list);
return true;
});

Expand All @@ -417,10 +415,6 @@ DiskANNIndexNode<T>::Prepare(const Config& cfg) {
}
}

// set thread number
omp_set_num_threads(prep_conf.num_threads);
num_threads_ = prep_conf.num_threads;

// warmup
if (prep_conf.warm_up) {
LOG_KNOWHERE_INFO_ << "Warming up.";
Expand All @@ -441,15 +435,22 @@ DiskANNIndexNode<T>::Prepare(const Config& cfg) {
std::vector<float> warmup_result_dists(warmup_num, 0);

bool all_searches_are_good = true;
#pragma omp parallel for schedule(dynamic, 1)

std::vector<std::future<void>> futures;
futures.reserve(warmup_num);
for (_s64 i = 0; i < (int64_t)warmup_num; ++i) {
auto search_expect = TryDiskANNCall<bool>([&]() -> bool {
pq_flash_index_->cached_beam_search(warmup + (i * warmup_aligned_dim), 1, warmup_L,
warmup_result_ids_64.data() + (i * 1),
warmup_result_dists.data() + (i * 1), 4);
futures.push_back(pool_->push([&, index = i]() {
pq_flash_index_->cached_beam_search(warmup + (index * warmup_aligned_dim), 1, warmup_L,
warmup_result_ids_64.data() + (index * 1),
warmup_result_dists.data() + (index * 1), 4);
}));
}
for (auto& future : futures) {
auto one_search_res = TryDiskANNCall<bool>([&]() {
future.get();
return true;
});
if (!search_expect.has_value()) {
if (!one_search_res.has_value()) {
all_searches_are_good = false;
}
}
Expand Down Expand Up @@ -507,6 +508,7 @@ DiskANNIndexNode<T>::Search(const DataSet& dataset, const Config& cfg, const Bit
auto p_id = new int64_t[k * nq];
auto p_dist = new float[k * nq];

bool all_searches_are_good = true;
std::vector<std::future<void>> futures;
futures.reserve(nq);
for (int64_t row = 0; row < nq; ++row) {
Expand All @@ -516,7 +518,17 @@ DiskANNIndexNode<T>::Search(const DataSet& dataset, const Config& cfg, const Bit
}));
}
for (auto& future : futures) {
future.get();
auto one_search_res = TryDiskANNCall<bool>([&]() {
future.get();
return true;
});
if (!one_search_res.has_value()) {
all_searches_are_good = false;
}
}

if (!all_searches_are_good) {
return unexpected(Status::diskann_inner_error);
}

auto res = GenResultDataSet(nq, k, p_id, p_dist);
Expand Down Expand Up @@ -572,6 +584,7 @@ DiskANNIndexNode<T>::RangeSearch(const DataSet& dataset, const Config& cfg, cons

std::vector<std::future<void>> futures;
futures.reserve(nq);
bool all_searches_are_good = true;
for (int64_t row = 0; row < nq; ++row) {
futures.push_back(pool_->push([&, index = row]() {
std::vector<int64_t> indices;
Expand All @@ -586,8 +599,18 @@ DiskANNIndexNode<T>::RangeSearch(const DataSet& dataset, const Config& cfg, cons
}));
}
for (auto& future : futures) {
future.get();
auto one_search_res = TryDiskANNCall<bool>([&]() {
future.get();
return true;
});
if (!one_search_res.has_value()) {
all_searches_are_good = false;
}
}
if (!all_searches_are_good) {
return unexpected(Status::diskann_inner_error);
}

GetRangeSearchResult(result_dist_array, result_id_array, is_ip, nq, radius, search_conf.range_filter, p_dist, p_id,
p_lims);
return GenResultDataSet(nq, p_id, p_dist, p_lims);
Expand All @@ -603,8 +626,7 @@ DiskANNIndexNode<T>::GetIndexMeta(const Config& cfg) const {
auto diskann_conf = static_cast<const DiskANNConfig&>(cfg);
feder::diskann::DiskANNMeta meta(diskann_conf.data_path, diskann_conf.max_degree, diskann_conf.search_list_size,
diskann_conf.pq_code_budget_gb, diskann_conf.build_dram_budget_gb,
diskann_conf.num_threads, diskann_conf.disk_pq_dims, diskann_conf.accelerate_build,
Count(), entry_points);
diskann_conf.disk_pq_dims, diskann_conf.accelerate_build, Count(), entry_points);
std::unordered_set<int64_t> id_set(entry_points.begin(), entry_points.end());

Json json_meta, json_id_set;
Expand Down
14 changes: 0 additions & 14 deletions src/index/diskann/diskann_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,6 @@ class DiskANNConfig : public BaseConfig {
// This is the flag to enable fast build, in which we will not build vamana graph by full 2 round. This can
// accelerate index build ~30% with an ~1% recall regression.
CFG_BOOL accelerate_build;
// The number of threads used for preparing and searching. When 'num_threads' uses as build parameter, the indexing
// time improves almost linearly with the number of threads (subject to the cores available on the machine and DRAM
// bandwidth). When 'num_threads' uses as prepare parameter, Threads run in parallel and one thread handles one
// query at a time. More threads will result in higher aggregate query throughput, but will also use more IOs/second
// across the system, which may lead to higher per-query latency. So find the balance depending on the maximum
// number of IOPs supported by the SSD.
CFG_INT num_threads;
// While serving the index, the entire graph is stored on SSD. For faster search performance, you can cache a few
// frequently accessed nodes in memory.
CFG_FLOAT search_cache_budget_gb;
Expand Down Expand Up @@ -102,13 +95,6 @@ class DiskANNConfig : public BaseConfig {
.description("limit on the memory allowed for building the index in GB.")
.set_range(0, std::numeric_limits<CFG_FLOAT>::max())
.for_train();
KNOWHERE_CONFIG_DECLARE_FIELD(num_threads)
.description("number of threads used by the index build/search process.")
.set_default(8)
.set_range(1, 256)
.for_train()
.for_search()
.for_range_search();
KNOWHERE_CONFIG_DECLARE_FIELD(disk_pq_dims)
.description("the dimension of compressed vectors stored on the ssd, use 0 to store uncompressed data.")
.set_default(0)
Expand Down
4 changes: 2 additions & 2 deletions thirdparty/DiskANN/include/diskann/aux_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#pragma once
#include <algorithm>
#include <fcntl.h>
#include <future>
#include <cassert>
#include <cstdlib>
#include <cstring>
Expand Down Expand Up @@ -31,6 +32,7 @@ typedef int FileHandle;

#include "utils.h"
#include "windows_customizations.h"
#include "knowhere/comp/thread_pool.h"

namespace diskann {
const size_t MAX_PQ_TRAINING_SET_SIZE = 256000;
Expand Down Expand Up @@ -117,8 +119,6 @@ namespace diskann {
double pq_code_size_gb = 0.0;
// M (memory limit while indexing)
double index_mem_gb = 0.0;
// T (number of threads for indexing)
uint32_t num_threads = 0;
// B' (PQ dim for disk index: optional parameter for very
// large dimensional data)
uint32_t disk_pq_dims = 0;
Expand Down
Loading

0 comments on commit e5d7dff

Please sign in to comment.