This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

replace omp with thread_pool
Signed-off-by: cqy123456 <[email protected]>
cqy123456 committed Mar 23, 2023
1 parent 3d89752 commit 244aaa7
Showing 15 changed files with 603 additions and 496 deletions.
18 changes: 9 additions & 9 deletions README.md
@@ -28,7 +28,7 @@ Here's a list of verified OS types where Knowhere can successfully build and run
 #### Install Dependencies
 
 ```bash
-$ sudo apt install build-essential libopenblas-dev ninja-build libaio-dev libboost-program-options-dev
+$ sudo apt install build-essential libopenblas-dev libaio-dev libboost-program-options-dev
 ```
 
 #### Build From Source Code
@@ -39,19 +39,19 @@ $ git submodule update --recursive --init
 
 $ mkdir build && cd build
 #DEBUG CPU
-$ cmake .. -DCMAKE_BUILD_TYPE=Debug -DWITH_UT=ON -G Ninja
+$ cmake .. -DCMAKE_BUILD_TYPE=Debug -DWITH_UT=ON
 #RELEASE CPU
-$ cmake .. -DCMAKE_BUILD_TYPE=Release -DWITH_UT=ON -G Ninja
+$ cmake .. -DCMAKE_BUILD_TYPE=Release -DWITH_UT=ON
 #DEBUG GPU
-$ cmake .. -DCMAKE_BUILD_TYPE=Debug -DUSE_CUDA=ON -DWITH_UT=ON -G Ninja
+$ cmake .. -DCMAKE_BUILD_TYPE=Debug -DUSE_CUDA=ON -DWITH_UT=ON
 #COMPILE with new GPUs, define your CMAKE_CUDA_ARCHITECTURES
-$ cmake .. -DCMAKE_BUILD_TYPE=Debug -DUSE_CUDA=ON -DWITH_UT=ON -DCMAKE_CUDA_ARCHITECTURES="86;89" -G Ninja
+$ cmake .. -DCMAKE_BUILD_TYPE=Debug -DUSE_CUDA=ON -DWITH_UT=ON -DCMAKE_CUDA_ARCHITECTURES="86;89"
 #RELEASE GPU
-$ cmake .. -DCMAKE_BUILD_TYPE=Release -DUSE_CUDA=ON -DWITH_UT=ON -G Ninja
+$ cmake .. -DCMAKE_BUILD_TYPE=Release -DUSE_CUDA=ON -DWITH_UT=ON
 #ADD -DWITH_DISKANN=ON TO BUILD DISKANN INDEX
-$ cmake .. -DCMAKE_BUILD_TYPE=Release -DWITH_UT=ON -DWITH_DISKANN=ON -G Ninja
-#verbose compile
-$ninja -v
+$ cmake .. -DCMAKE_BUILD_TYPE=Release -DWITH_UT=ON -DWITH_DISKANN=ON
+#compile
+$ make -j4
 ```

 #### Running Unit Tests
1 change: 1 addition & 0 deletions ci/docker/set_docker_mirror.sh
@@ -56,6 +56,7 @@ set_mirror(){
     restart_docker
     echo "Success."
     exit 0
+
 }
 
 set_mirror
6 changes: 2 additions & 4 deletions include/knowhere/feder/DiskANN.h
@@ -42,16 +42,14 @@ class DiskANNMeta {
     DiskANNMeta() = default;
 
     DiskANNMeta(const std::string& data_path, const int32_t max_degree, const int32_t search_list_size,
-                const float pq_code_budget_gb, const float build_dram_budget_gb, const int32_t num_threads,
-                const int32_t disk_pq_dims, const bool accelerate_build, const int64_t num_elem,
-                const std::vector<int64_t>& entry_points)
+                const float pq_code_budget_gb, const float build_dram_budget_gb, const int32_t disk_pq_dims,
+                const bool accelerate_build, const int64_t num_elem, const std::vector<int64_t>& entry_points)
         : num_elem_(num_elem), entry_points_(entry_points) {
         build_params_.data_path = data_path;
         build_params_.max_degree = max_degree;
         build_params_.search_list_size = search_list_size;
         build_params_.pq_code_budget_gb = pq_code_budget_gb;
         build_params_.build_dram_budget_gb = build_dram_budget_gb;
-        build_params_.num_threads = num_threads;
         build_params_.disk_pq_dims = disk_pq_dims;
         build_params_.accelerate_build = accelerate_build;
     }
55 changes: 27 additions & 28 deletions src/common/config.cc
@@ -14,34 +14,33 @@
 #include "knowhere/log.h"
 namespace knowhere {
 
-static const std::unordered_set<std::string> ext_legal_json_keys = {
-    "metric_type",
-    "dim",
-    "nlist",
-    "nprobe",
-    "level",
-    "index_type",
-    "index_mode",
-    "collection_id",
-    "partition_id",
-    "segment_id",
-    "field_id",
-    "index_build_id",
-    "index_id",
-    "index_version",
-    "pq_code_budget_gb_ratio",
-    "num_build_thread_ratio",
-    "search_cache_budget_gb_ratio",
-    "num_load_thread_ratio",
-    "beamwidth_ratio",
-    "search_list",
-    "num_build_thread",
-    "num_load_thread",
-    "index_files",
-    "gpu_id",
-    "nbits",
-    "m",
-};
+static const std::unordered_set<std::string> ext_legal_json_keys = {"metric_type",
+                                                                    "dim",
+                                                                    "nlist",
+                                                                    "nprobe",
+                                                                    "level",
+                                                                    "index_type",
+                                                                    "index_mode",
+                                                                    "collection_id",
+                                                                    "partition_id",
+                                                                    "segment_id",
+                                                                    "field_id",
+                                                                    "index_build_id",
+                                                                    "index_id",
+                                                                    "index_version",
+                                                                    "pq_code_budget_gb_ratio",
+                                                                    "num_build_thread_ratio",
+                                                                    "search_cache_budget_gb_ratio",
+                                                                    "num_load_thread_ratio",
+                                                                    "beamwidth_ratio",
+                                                                    "search_list",
+                                                                    "num_build_thread",
+                                                                    "num_load_thread",
+                                                                    "index_files",
+                                                                    "gpu_id",
+                                                                    "nbits",
+                                                                    "m",
+                                                                    "num_threads"};
 
 Status
 Config::FormatAndCheck(const Config& cfg, Json& json) {
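
A note on the config.cc hunk above: "num_threads" is appended to `ext_legal_json_keys` even though the `DiskANNConfig` field is deleted later in this commit, so key validation and field binding stay decoupled: configs from older clients that still carry "num_threads" keep passing `Config::FormatAndCheck`, and the value is simply never bound to anything. A minimal sketch of that validate-then-ignore behavior (`CheckKeys` is a hypothetical stand-in, not the real `FormatAndCheck`):

```cpp
#include <cstdio>
#include <string>
#include <unordered_set>
#include <vector>

// Legacy keys such as "num_threads" stay legal so old configs validate,
// even though nothing binds them to a config field anymore.
static const std::unordered_set<std::string> legal_keys = {"metric_type", "dim", "search_list", "num_threads"};

static bool
CheckKeys(const std::vector<std::string>& keys) {
    for (const auto& k : keys) {
        if (legal_keys.count(k) == 0) {
            std::printf("invalid param in json: %s\n", k.c_str());
            return false;
        }
    }
    return true;
}

int
main() {
    std::printf("%d\n", CheckKeys({"metric_type", "num_threads"}));  // 1: legacy key accepted
    std::printf("%d\n", CheckKeys({"metric_type", "n_threads"}));    // 0: unknown key rejected
    return 0;
}
```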
60 changes: 41 additions & 19 deletions src/index/diskann/diskann.cc
@@ -151,7 +151,6 @@ class DiskANNIndexNode : public IndexNode {
     std::string index_prefix_;
     mutable std::mutex preparation_lock_;
     std::atomic_bool is_prepared_;
-    int32_t num_threads_;
     std::shared_ptr<FileManager> file_manager_;
     std::unique_ptr<diskann::PQFlashIndex<T>> pq_flash_index_;
     std::atomic_int64_t dim_;
@@ -275,7 +274,6 @@ DiskANNIndexNode<T>::Add(const DataSet& dataset, const Config& cfg) {
                                           static_cast<unsigned>(build_conf.search_list_size),
                                           static_cast<double>(build_conf.pq_code_budget_gb),
                                           static_cast<double>(build_conf.build_dram_budget_gb),
-                                          static_cast<uint32_t>(build_conf.num_threads),
                                           static_cast<uint32_t>(build_conf.disk_pq_dims),
                                           false,
                                           build_conf.accelerate_build};
@@ -350,8 +348,8 @@ DiskANNIndexNode<T>::Prepare(const Config& cfg) {
 
     pq_flash_index_ = std::make_unique<diskann::PQFlashIndex<T>>(reader, diskann_metric);
 
-    auto load_expect = TryDiskANNCall<int>(
-        [&]() -> int { return pq_flash_index_->load(prep_conf.num_threads, index_prefix_.c_str()); });
+    auto load_expect =
+        TryDiskANNCall<int>([&]() -> int { return pq_flash_index_->load(pool_->size(), index_prefix_.c_str()); });
 
     if (!load_expect.has_value() || load_expect.value() != 0) {
         LOG_KNOWHERE_ERROR_ << "Failed to load DiskANN.";
@@ -392,7 +390,7 @@ DiskANNIndexNode<T>::Prepare(const Config& cfg) {
     } else {
         auto gen_cache_expect = TryDiskANNCall<bool>([&]() -> bool {
             pq_flash_index_->generate_cache_list_from_sample_queries(warmup_query_file, 15, 6, num_nodes_to_cache,
-                                                                     prep_conf.num_threads, node_list);
+                                                                     node_list);
             return true;
         });
 
@@ -412,10 +410,6 @@
         }
     }
 
-    // set thread number
-    omp_set_num_threads(prep_conf.num_threads);
-    num_threads_ = prep_conf.num_threads;
-
     // warmup
     if (prep_conf.warm_up) {
         LOG_KNOWHERE_INFO_ << "Warming up.";
@@ -436,15 +430,22 @@
         std::vector<float> warmup_result_dists(warmup_num, 0);
 
         bool all_searches_are_good = true;
-#pragma omp parallel for schedule(dynamic, 1)
+
+        std::vector<std::future<void>> futures;
+        futures.reserve(warmup_num);
         for (_s64 i = 0; i < (int64_t)warmup_num; ++i) {
-            auto search_expect = TryDiskANNCall<bool>([&]() -> bool {
-                pq_flash_index_->cached_beam_search(warmup + (i * warmup_aligned_dim), 1, warmup_L,
-                                                    warmup_result_ids_64.data() + (i * 1),
-                                                    warmup_result_dists.data() + (i * 1), 4);
+            futures.push_back(pool_->push([&, index = i]() {
+                pq_flash_index_->cached_beam_search(warmup + (index * warmup_aligned_dim), 1, warmup_L,
+                                                    warmup_result_ids_64.data() + (index * 1),
+                                                    warmup_result_dists.data() + (index * 1), 4);
+            }));
+        }
+        for (auto& future : futures) {
+            auto one_search_res = TryDiskANNCall<bool>([&]() {
+                future.get();
                 return true;
             });
-            if (!search_expect.has_value()) {
+            if (!one_search_res.has_value()) {
                 all_searches_are_good = false;
             }
         }
@@ -502,6 +503,7 @@ DiskANNIndexNode<T>::Search(const DataSet& dataset, const Config& cfg, const Bit
     auto p_id = new int64_t[k * nq];
     auto p_dist = new float[k * nq];
 
+    bool all_searches_are_good = true;
     std::vector<std::future<void>> futures;
     futures.reserve(nq);
     for (int64_t row = 0; row < nq; ++row) {
@@ -511,7 +513,17 @@
         }));
     }
     for (auto& future : futures) {
-        future.get();
+        auto one_search_res = TryDiskANNCall<bool>([&]() {
+            future.get();
+            return true;
+        });
+        if (!one_search_res.has_value()) {
+            all_searches_are_good = false;
+        }
     }
 
+    if (!all_searches_are_good) {
+        return unexpected(Status::diskann_inner_error);
+    }
 
     auto res = GenResultDataSet(nq, k, p_id, p_dist);
@@ -567,6 +579,7 @@ DiskANNIndexNode<T>::RangeSearch(const DataSet& dataset, const Config& cfg, cons
 
     std::vector<std::future<void>> futures;
     futures.reserve(nq);
+    bool all_searches_are_good = true;
     for (int64_t row = 0; row < nq; ++row) {
         futures.push_back(pool_->push([&, index = row]() {
             std::vector<int64_t> indices;
@@ -581,8 +594,18 @@
         }));
     }
     for (auto& future : futures) {
-        future.get();
+        auto one_search_res = TryDiskANNCall<bool>([&]() {
+            future.get();
+            return true;
+        });
+        if (!one_search_res.has_value()) {
+            all_searches_are_good = false;
+        }
     }
+    if (!all_searches_are_good) {
+        return unexpected(Status::diskann_inner_error);
+    }
+
     GetRangeSearchResult(result_dist_array, result_id_array, is_ip, nq, radius, search_conf.range_filter, p_dist, p_id,
                          p_lims);
     return GenResultDataSet(nq, p_id, p_dist, p_lims);
@@ -598,8 +621,7 @@
     auto diskann_conf = static_cast<const DiskANNConfig&>(cfg);
     feder::diskann::DiskANNMeta meta(diskann_conf.data_path, diskann_conf.max_degree, diskann_conf.search_list_size,
                                      diskann_conf.pq_code_budget_gb, diskann_conf.build_dram_budget_gb,
-                                     diskann_conf.num_threads, diskann_conf.disk_pq_dims, diskann_conf.accelerate_build,
-                                     Count(), entry_points);
+                                     diskann_conf.disk_pq_dims, diskann_conf.accelerate_build, Count(), entry_points);
     std::unordered_set<int64_t> id_set(entry_points.begin(), entry_points.end());
 
     Json json_meta, json_id_set;
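
The recurring pattern in the diskann.cc hunks above: every `#pragma omp parallel for` turns into a loop that pushes one task per query into a shared pool, followed by a join loop over the returned futures. Below is a self-contained sketch of that shape; `SimplePool` is written for illustration only, and the only things assumed about Knowhere's real `ThreadPool` are what the diff itself shows, a `push()` that returns a `std::future` and a `size()` accessor:

```cpp
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Fixed-size pool: push() wraps the callable in a packaged_task so the
// caller gets a future, mirroring how the diff uses pool_->push(...).
class SimplePool {
 public:
    explicit SimplePool(size_t n) {
        for (size_t i = 0; i < n; ++i) {
            workers_.emplace_back([this] {
                for (;;) {
                    std::packaged_task<void()> task;
                    {
                        std::unique_lock<std::mutex> lk(mu_);
                        cv_.wait(lk, [this] { return stop_ || !tasks_.empty(); });
                        if (stop_ && tasks_.empty()) {
                            return;
                        }
                        task = std::move(tasks_.front());
                        tasks_.pop();
                    }
                    task();  // an exception thrown here is stored in the task's future
                }
            });
        }
    }

    ~SimplePool() {
        {
            std::lock_guard<std::mutex> lk(mu_);
            stop_ = true;
        }
        cv_.notify_all();
        for (auto& w : workers_) {
            w.join();
        }
    }

    template <typename F>
    std::future<void>
    push(F&& f) {
        std::packaged_task<void()> task(std::forward<F>(f));
        auto fut = task.get_future();
        {
            std::lock_guard<std::mutex> lk(mu_);
            tasks_.push(std::move(task));
        }
        cv_.notify_one();
        return fut;
    }

    size_t
    size() const {
        return workers_.size();
    }

 private:
    std::vector<std::thread> workers_;
    std::queue<std::packaged_task<void()>> tasks_;
    std::mutex mu_;
    std::condition_variable cv_;
    bool stop_ = false;
};

int
main() {
    SimplePool pool(4);
    const int64_t nq = 8;
    // Old shape: #pragma omp parallel for over nq queries.
    // New shape: one pooled task per query, then a join loop.
    std::vector<std::future<void>> futures;
    futures.reserve(nq);
    for (int64_t row = 0; row < nq; ++row) {
        futures.push_back(pool.push([row] {
            // stand-in for pq_flash_index_->cached_beam_search(query row, ...)
            std::printf("searched query %lld\n", static_cast<long long>(row));
        }));
    }
    for (auto& future : futures) {
        future.get();  // rethrows any worker exception in the joining thread
    }
    return 0;
}
```

One design consequence visible in the diff: with OpenMP, thread count was a per-call property (`omp_set_num_threads`), so it had to live in the config; with a shared pool it becomes a process-wide property, which is why `num_threads_` and every `prep_conf.num_threads` use are deleted.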
14 changes: 0 additions & 14 deletions src/index/diskann/diskann_config.h
@@ -46,13 +46,6 @@ class DiskANNConfig : public BaseConfig {
     // This is the flag to enable fast build, in which we will not build vamana graph by full 2 round. This can
     // accelerate index build ~30% with an ~1% recall regression.
     CFG_BOOL accelerate_build;
-    // The number of threads used for preparing and searching. When 'num_threads' uses as build parameter, the indexing
-    // time improves almost linearly with the number of threads (subject to the cores available on the machine and DRAM
-    // bandwidth). When 'num_threads' uses as prepare parameter, Threads run in parallel and one thread handles one
-    // query at a time. More threads will result in higher aggregate query throughput, but will also use more IOs/second
-    // across the system, which may lead to higher per-query latency. So find the balance depending on the maximum
-    // number of IOPs supported by the SSD.
-    CFG_INT num_threads;
     // While serving the index, the entire graph is stored on SSD. For faster search performance, you can cache a few
     // frequently accessed nodes in memory.
     CFG_FLOAT search_cache_budget_gb;
@@ -102,13 +95,6 @@
         .description("limit on the memory allowed for building the index in GB.")
         .set_range(0, std::numeric_limits<CFG_FLOAT>::max())
         .for_train();
-    KNOWHERE_CONFIG_DECLARE_FIELD(num_threads)
-        .description("number of threads used by the index build/search process.")
-        .set_default(8)
-        .set_range(1, 256)
-        .for_train()
-        .for_search()
-        .for_range_search();
     KNOWHERE_CONFIG_DECLARE_FIELD(disk_pq_dims)
         .description("the dimension of compressed vectors stored on the ssd, use 0 to store uncompressed data.")
         .set_default(0)
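
The tuning advice in the deleted `num_threads` comment still applies after this commit; it just moves from a per-index knob to the sizing of the shared pool: more worker threads raise aggregate query throughput but also aggregate IOs/second against the SSD, which can push up per-query latency. A hedged back-of-envelope sketch of that trade-off (all three constants are illustrative assumptions, not measured values):

```cpp
#include <algorithm>
#include <cstdio>
#include <thread>

int
main() {
    const unsigned cores = std::max(1u, std::thread::hardware_concurrency());
    const unsigned ssd_iops = 200000;    // assumption: rated 4K random-read IOPS
    const unsigned ios_per_query = 100;  // assumption: beam-search reads per query
    const unsigned qps_per_thread = 50;  // assumption: per-thread query rate
    // Threads the SSD can feed before searches start queuing on IO.
    const unsigned io_bound = ssd_iops / (ios_per_query * qps_per_thread);
    const unsigned pool_size = std::min(cores, std::max(1u, io_bound));
    std::printf("pool size: %u (cores %u, io bound %u)\n", pool_size, cores, io_bound);
    return 0;
}
```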
4 changes: 2 additions & 2 deletions thirdparty/DiskANN/include/diskann/aux_utils.h
@@ -4,6 +4,7 @@
 #pragma once
 #include <algorithm>
 #include <fcntl.h>
+#include <future>
 #include <cassert>
 #include <cstdlib>
 #include <cstring>
@@ -31,6 +32,7 @@ typedef int FileHandle;
 
 #include "utils.h"
 #include "windows_customizations.h"
+#include "knowhere/comp/thread_pool.h"
 
 namespace diskann {
   const size_t MAX_PQ_TRAINING_SET_SIZE = 256000;
@@ -117,8 +119,6 @@
     double pq_code_size_gb = 0.0;
     // M (memory limit while indexing)
     double index_mem_gb = 0.0;
-    // T (number of threads for indexing)
-    uint32_t num_threads = 0;
     // B' (PQ dim for disk index: optional parameter for very
     // large dimensional data)
     uint32_t disk_pq_dims = 0;
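
Why aux_utils.h now pulls in `<future>`: once searches run on pool threads, a failure can no longer propagate up the caller's stack; the exception travels inside the `std::future` and is rethrown by `get()`. That is what lets the diskann.cc hunks wrap `future.get()` in `TryDiskANNCall` and turn a failed task into `Status::diskann_inner_error` instead of crashing a worker. A minimal demonstration, with `std::async` as a stand-in for the pool:

```cpp
#include <cstdio>
#include <future>
#include <stdexcept>
#include <vector>

int
main() {
    std::vector<std::future<void>> futures;
    for (int i = 0; i < 3; ++i) {
        futures.push_back(std::async(std::launch::async, [i] {
            if (i == 1) {
                throw std::runtime_error("simulated search failure");
            }
        }));
    }
    bool all_searches_are_good = true;
    for (auto& future : futures) {
        try {
            future.get();  // rethrows the exception stored by the worker
        } catch (const std::exception& e) {
            std::printf("caught: %s\n", e.what());
            all_searches_are_good = false;
        }
    }
    std::printf("all searches good: %d\n", static_cast<int>(all_searches_are_good));
    return 0;
}
```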
