Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Remove omp process in diskann index #759

Merged
merged 1 commit into from
Mar 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ci/docker/set_docker_mirror.sh
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ set_mirror(){
restart_docker
echo "Success."
exit 0

}

set_mirror
6 changes: 2 additions & 4 deletions include/knowhere/feder/DiskANN.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,14 @@ class DiskANNMeta {
DiskANNMeta() = default;

DiskANNMeta(const std::string& data_path, const int32_t max_degree, const int32_t search_list_size,
const float pq_code_budget_gb, const float build_dram_budget_gb, const int32_t num_threads,
const int32_t disk_pq_dims, const bool accelerate_build, const int64_t num_elem,
const std::vector<int64_t>& entry_points)
const float pq_code_budget_gb, const float build_dram_budget_gb, const int32_t disk_pq_dims,
const bool accelerate_build, const int64_t num_elem, const std::vector<int64_t>& entry_points)
: num_elem_(num_elem), entry_points_(entry_points) {
build_params_.data_path = data_path;
build_params_.max_degree = max_degree;
build_params_.search_list_size = search_list_size;
build_params_.pq_code_budget_gb = pq_code_budget_gb;
build_params_.build_dram_budget_gb = build_dram_budget_gb;
build_params_.num_threads = num_threads;
build_params_.disk_pq_dims = disk_pq_dims;
build_params_.accelerate_build = accelerate_build;
}
Expand Down
55 changes: 27 additions & 28 deletions src/common/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,34 +14,33 @@
#include "knowhere/log.h"
namespace knowhere {

static const std::unordered_set<std::string> ext_legal_json_keys = {
"metric_type",
"dim",
"nlist",
"nprobe",
"level",
"index_type",
"index_mode",
"collection_id",
"partition_id",
"segment_id",
"field_id",
"index_build_id",
"index_id",
"index_version",
"pq_code_budget_gb_ratio",
"num_build_thread_ratio",
"search_cache_budget_gb_ratio",
"num_load_thread_ratio",
"beamwidth_ratio",
"search_list",
"num_build_thread",
"num_load_thread",
"index_files",
"gpu_id",
"nbits",
"m",
};
static const std::unordered_set<std::string> ext_legal_json_keys = {"metric_type",
"dim",
"nlist",
"nprobe",
"level",
"index_type",
"index_mode",
"collection_id",
"partition_id",
"segment_id",
"field_id",
"index_build_id",
"index_id",
"index_version",
"pq_code_budget_gb_ratio",
"num_build_thread_ratio",
"search_cache_budget_gb_ratio",
"num_load_thread_ratio",
"beamwidth_ratio",
"search_list",
"num_build_thread",
"num_load_thread",
"index_files",
"gpu_id",
"nbits",
"m",
"num_threads"};

Status
Config::FormatAndCheck(const Config& cfg, Json& json) {
Expand Down
60 changes: 41 additions & 19 deletions src/index/diskann/diskann.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ class DiskANNIndexNode : public IndexNode {
std::string index_prefix_;
mutable std::mutex preparation_lock_;
std::atomic_bool is_prepared_;
int32_t num_threads_;
std::shared_ptr<FileManager> file_manager_;
std::unique_ptr<diskann::PQFlashIndex<T>> pq_flash_index_;
std::atomic_int64_t dim_;
Expand Down Expand Up @@ -275,7 +274,6 @@ DiskANNIndexNode<T>::Add(const DataSet& dataset, const Config& cfg) {
static_cast<unsigned>(build_conf.search_list_size),
static_cast<double>(build_conf.pq_code_budget_gb),
static_cast<double>(build_conf.build_dram_budget_gb),
static_cast<uint32_t>(build_conf.num_threads),
static_cast<uint32_t>(build_conf.disk_pq_dims),
false,
build_conf.accelerate_build};
Expand Down Expand Up @@ -350,8 +348,8 @@ DiskANNIndexNode<T>::Prepare(const Config& cfg) {

pq_flash_index_ = std::make_unique<diskann::PQFlashIndex<T>>(reader, diskann_metric);

auto load_expect = TryDiskANNCall<int>(
[&]() -> int { return pq_flash_index_->load(prep_conf.num_threads, index_prefix_.c_str()); });
auto load_expect =
TryDiskANNCall<int>([&]() -> int { return pq_flash_index_->load(pool_->size(), index_prefix_.c_str()); });

if (!load_expect.has_value() || load_expect.value() != 0) {
LOG_KNOWHERE_ERROR_ << "Failed to load DiskANN.";
Expand Down Expand Up @@ -392,7 +390,7 @@ DiskANNIndexNode<T>::Prepare(const Config& cfg) {
} else {
auto gen_cache_expect = TryDiskANNCall<bool>([&]() -> bool {
pq_flash_index_->generate_cache_list_from_sample_queries(warmup_query_file, 15, 6, num_nodes_to_cache,
prep_conf.num_threads, node_list);
node_list);
return true;
});

Expand All @@ -412,10 +410,6 @@ DiskANNIndexNode<T>::Prepare(const Config& cfg) {
}
}

// set thread number
omp_set_num_threads(prep_conf.num_threads);
num_threads_ = prep_conf.num_threads;

// warmup
if (prep_conf.warm_up) {
LOG_KNOWHERE_INFO_ << "Warming up.";
Expand All @@ -436,15 +430,22 @@ DiskANNIndexNode<T>::Prepare(const Config& cfg) {
std::vector<float> warmup_result_dists(warmup_num, 0);

bool all_searches_are_good = true;
#pragma omp parallel for schedule(dynamic, 1)

std::vector<std::future<void>> futures;
futures.reserve(warmup_num);
for (_s64 i = 0; i < (int64_t)warmup_num; ++i) {
auto search_expect = TryDiskANNCall<bool>([&]() -> bool {
pq_flash_index_->cached_beam_search(warmup + (i * warmup_aligned_dim), 1, warmup_L,
warmup_result_ids_64.data() + (i * 1),
warmup_result_dists.data() + (i * 1), 4);
futures.push_back(pool_->push([&, index = i]() {
pq_flash_index_->cached_beam_search(warmup + (index * warmup_aligned_dim), 1, warmup_L,
warmup_result_ids_64.data() + (index * 1),
warmup_result_dists.data() + (index * 1), 4);
}));
}
for (auto& future : futures) {
auto one_search_res = TryDiskANNCall<bool>([&]() {
future.get();
return true;
});
if (!search_expect.has_value()) {
if (!one_search_res.has_value()) {
all_searches_are_good = false;
}
}
Expand Down Expand Up @@ -502,6 +503,7 @@ DiskANNIndexNode<T>::Search(const DataSet& dataset, const Config& cfg, const Bit
auto p_id = new int64_t[k * nq];
auto p_dist = new float[k * nq];

bool all_searches_are_good = true;
std::vector<std::future<void>> futures;
futures.reserve(nq);
for (int64_t row = 0; row < nq; ++row) {
Expand All @@ -511,7 +513,17 @@ DiskANNIndexNode<T>::Search(const DataSet& dataset, const Config& cfg, const Bit
}));
}
for (auto& future : futures) {
future.get();
auto one_search_res = TryDiskANNCall<bool>([&]() {
future.get();
return true;
});
if (!one_search_res.has_value()) {
all_searches_are_good = false;
}
}

if (!all_searches_are_good) {
return unexpected(Status::diskann_inner_error);
}

auto res = GenResultDataSet(nq, k, p_id, p_dist);
Expand Down Expand Up @@ -567,6 +579,7 @@ DiskANNIndexNode<T>::RangeSearch(const DataSet& dataset, const Config& cfg, cons

std::vector<std::future<void>> futures;
futures.reserve(nq);
bool all_searches_are_good = true;
for (int64_t row = 0; row < nq; ++row) {
futures.push_back(pool_->push([&, index = row]() {
std::vector<int64_t> indices;
Expand All @@ -581,8 +594,18 @@ DiskANNIndexNode<T>::RangeSearch(const DataSet& dataset, const Config& cfg, cons
}));
}
for (auto& future : futures) {
future.get();
auto one_search_res = TryDiskANNCall<bool>([&]() {
future.get();
return true;
});
if (!one_search_res.has_value()) {
all_searches_are_good = false;
}
}
if (!all_searches_are_good) {
return unexpected(Status::diskann_inner_error);
}

GetRangeSearchResult(result_dist_array, result_id_array, is_ip, nq, radius, search_conf.range_filter, p_dist, p_id,
p_lims);
return GenResultDataSet(nq, p_id, p_dist, p_lims);
Expand All @@ -598,8 +621,7 @@ DiskANNIndexNode<T>::GetIndexMeta(const Config& cfg) const {
auto diskann_conf = static_cast<const DiskANNConfig&>(cfg);
feder::diskann::DiskANNMeta meta(diskann_conf.data_path, diskann_conf.max_degree, diskann_conf.search_list_size,
diskann_conf.pq_code_budget_gb, diskann_conf.build_dram_budget_gb,
diskann_conf.num_threads, diskann_conf.disk_pq_dims, diskann_conf.accelerate_build,
Count(), entry_points);
diskann_conf.disk_pq_dims, diskann_conf.accelerate_build, Count(), entry_points);
std::unordered_set<int64_t> id_set(entry_points.begin(), entry_points.end());

Json json_meta, json_id_set;
Expand Down
14 changes: 0 additions & 14 deletions src/index/diskann/diskann_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,6 @@ class DiskANNConfig : public BaseConfig {
// This is the flag to enable fast build, in which we will not build vamana graph by full 2 round. This can
// accelerate index build ~30% with an ~1% recall regression.
CFG_BOOL accelerate_build;
// The number of threads used for preparing and searching. When 'num_threads' uses as build parameter, the indexing
// time improves almost linearly with the number of threads (subject to the cores available on the machine and DRAM
// bandwidth). When 'num_threads' uses as prepare parameter, Threads run in parallel and one thread handles one
// query at a time. More threads will result in higher aggregate query throughput, but will also use more IOs/second
// across the system, which may lead to higher per-query latency. So find the balance depending on the maximum
// number of IOPs supported by the SSD.
CFG_INT num_threads;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will existing code crash because num_threads becomes an invalid key?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I put this key into ext_legal_json_keys, so it is still legal.

// While serving the index, the entire graph is stored on SSD. For faster search performance, you can cache a few
// frequently accessed nodes in memory.
CFG_FLOAT search_cache_budget_gb;
Expand Down Expand Up @@ -102,13 +95,6 @@ class DiskANNConfig : public BaseConfig {
.description("limit on the memory allowed for building the index in GB.")
.set_range(0, std::numeric_limits<CFG_FLOAT>::max())
.for_train();
KNOWHERE_CONFIG_DECLARE_FIELD(num_threads)
.description("number of threads used by the index build/search process.")
.set_default(8)
.set_range(1, 256)
.for_train()
.for_search()
.for_range_search();
KNOWHERE_CONFIG_DECLARE_FIELD(disk_pq_dims)
.description("the dimension of compressed vectors stored on the ssd, use 0 to store uncompressed data.")
.set_default(0)
Expand Down
4 changes: 2 additions & 2 deletions thirdparty/DiskANN/include/diskann/aux_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#pragma once
#include <algorithm>
#include <fcntl.h>
#include <future>
#include <cassert>
#include <cstdlib>
#include <cstring>
Expand Down Expand Up @@ -31,6 +32,7 @@ typedef int FileHandle;

#include "utils.h"
#include "windows_customizations.h"
#include "knowhere/comp/thread_pool.h"

namespace diskann {
const size_t MAX_PQ_TRAINING_SET_SIZE = 256000;
Expand Down Expand Up @@ -117,8 +119,6 @@ namespace diskann {
double pq_code_size_gb = 0.0;
// M (memory limit while indexing)
double index_mem_gb = 0.0;
// T (number of threads for indexing)
uint32_t num_threads = 0;
// B' (PQ dim for disk index: optional parameter for very
// large dimensional data)
uint32_t disk_pq_dims = 0;
Expand Down
Loading