Skip to content

Commit

Permalink
[#25406] DocDB: Random test for vector index
Browse files Browse the repository at this point in the history
Summary:
Random test with big amount of rows and dimensions for vector index
Jira: DB-14638

Test Plan: PgVectorIndexTest.Random/*

Reviewers: aleksandr.ponomarenko, arybochkin

Reviewed By: aleksandr.ponomarenko

Subscribers: ybase, yql

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D40845
  • Loading branch information
spolitov committed Dec 24, 2024
1 parent ac52997 commit a886b8d
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 44 deletions.
1 change: 0 additions & 1 deletion src/yb/docdb/usearch_vector_index-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ TEST_F(UsearchVectorIndexTest, CreateAndQuery) {
for (size_t thread_index = 0; thread_index < kNumIndexingThreads; ++thread_index) {
indexing_thread_holder.AddThreadFunctor(
[&num_vectors_inserted, &index, &latch, &uniform_distrib]() {
std::random_device rd;
size_t vector_id;
while ((vector_id = num_vectors_inserted.fetch_add(1)) < kNumVectors) {
auto vec = RandomFloatVector(kDimensions, uniform_distrib);
Expand Down
18 changes: 17 additions & 1 deletion src/yb/docdb/vector_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,21 @@
#include "yb/vector_index/usearch_wrapper.h"
#include "yb/vector_index/vector_lsm.h"

DEFINE_RUNTIME_uint64(vector_index_initial_chunk_size, 1024,
DEFINE_RUNTIME_uint64(vector_index_initial_chunk_size, 100000,
"Number of vector in initial vector index chunk");

DEFINE_RUNTIME_PREVIEW_uint32(vector_index_ef, 128,
"The \"expansion\" parameter for search");

DEFINE_RUNTIME_PREVIEW_uint32(vector_index_ef_construction, 256,
"The \"expansion\" parameter during graph construction");

DEFINE_RUNTIME_PREVIEW_uint32(vector_index_num_neighbors_per_vertex, 32,
"Number of neighbors per graph node");

DEFINE_RUNTIME_PREVIEW_uint32(vector_index_num_neighbors_per_vertex_base, 128,
"Number of neighbors per graph node in base level graph");

namespace yb::docdb {

const std::string kVectorIndexDirPrefix = "vi-";
Expand All @@ -48,6 +60,10 @@ auto VectorLSMFactory(size_t dimensions) {
return [dimensions] {
vector_index::HNSWOptions hnsw_options = {
.dimensions = dimensions,
.num_neighbors_per_vertex = FLAGS_vector_index_num_neighbors_per_vertex,
.num_neighbors_per_vertex_base = FLAGS_vector_index_num_neighbors_per_vertex_base,
.ef_construction = FLAGS_vector_index_ef_construction,
.ef = FLAGS_vector_index_ef,
};
return FactoryImpl::Create(hnsw_options);
};
Expand Down
8 changes: 6 additions & 2 deletions src/yb/util/random_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,14 @@ typename Collection::const_reference RandomElement(const Collection& collection,
std::string RandomHumanReadableString(size_t len, std::mt19937_64* rng = nullptr);

template<typename Distribution>
std::vector<float> RandomFloatVector(size_t dimensions, Distribution& dis) {
std::vector<float> RandomFloatVector(
size_t dimensions, Distribution& dis, std::mt19937_64* rng = nullptr) {
if (!rng) {
rng = &ThreadLocalRandom();
}
std::vector<float> vec(dimensions);
for (auto& v : vec) {
v = dis(ThreadLocalRandom());
v = dis(*rng);
}
return vec;
}
Expand Down
161 changes: 121 additions & 40 deletions src/yb/yql/pgwrapper/pg_vector_index-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
// under the License.
//

#include <queue>

#include "yb/client/snapshot_test_util.h"

#include "yb/consensus/consensus.h"
Expand All @@ -28,6 +30,8 @@
#include "yb/util/backoff_waiter.h"
#include "yb/util/test_thread_holder.h"

#include "yb/vector_index/usearch_include_wrapper_internal.h"

#include "yb/yql/pgwrapper/pg_mini_test_base.h"

DECLARE_bool(TEST_skip_process_apply);
Expand All @@ -38,6 +42,12 @@ DECLARE_uint32(vector_index_concurrent_writes);

namespace yb::pgwrapper {

using FloatVector = std::vector<float>;

const unum::usearch::byte_t* VectorToBytePtr(const FloatVector& vector) {
return pointer_cast<const unum::usearch::byte_t*>(vector.data());
}

class PgVectorIndexTest : public PgMiniTestBase, public testing::WithParamInterface<bool> {
protected:
void SetUp() override {
Expand All @@ -57,43 +67,45 @@ class PgVectorIndexTest : public PgMiniTestBase, public testing::WithParamInterf
return IsColocated() ? ConnectToDB("colocated_db") : PgMiniTestBase::Connect();
}

Result<PGConn> MakeIndex(int num_tablets = 0) {
Result<PGConn> MakeIndex(size_t dimensions = 3) {
auto colocated = IsColocated();
auto conn = VERIFY_RESULT(PgMiniTestBase::Connect());
std::string create_suffix;
if (colocated) {
create_suffix = " WITH (COLOCATED = 1)";
RETURN_NOT_OK(conn.ExecuteFormat("CREATE DATABASE colocated_db COLOCATION = true"));
conn = VERIFY_RESULT(Connect());
} else if (num_tablets) {
create_suffix = Format(" SPLIT INTO $0 TABLETS", num_tablets);
}
RETURN_NOT_OK(conn.Execute("CREATE EXTENSION vector"));
RETURN_NOT_OK(conn.Execute(
"CREATE TABLE test (id bigserial PRIMARY KEY, embedding vector(3))" + create_suffix));
RETURN_NOT_OK(conn.ExecuteFormat(
"CREATE TABLE test (id bigserial PRIMARY KEY, embedding vector($0))$1",
dimensions, create_suffix));

RETURN_NOT_OK(conn.Execute("CREATE INDEX ON test USING ybhnsw (embedding vector_l2_ops)"));

return conn;
}

Status WaitForLoadBalance(int num_tablet_servers) {
return WaitFor(
[&]() -> Result<bool> { return client_->IsLoadBalanced(num_tablet_servers); },
60s * kTimeMultiplier,
Format("Wait for load balancer to balance to $0 tservers.", num_tablet_servers));
}

Result<PGConn> MakeIndexAndFill(int num_rows, int num_tablets = 0);
Status InsertRows(PGConn& conn, int start_row, int end_row);
Result<PGConn> MakeIndexAndFill(size_t num_rows);
Result<PGConn> MakeIndexAndFillRandom(size_t num_rows, size_t dimensions);
Status InsertRows(PGConn& conn, size_t start_row, size_t end_row);
Status InsertRandomRows(PGConn& conn, size_t num_rows, size_t dimensions);

void VerifyRead(PGConn& conn, int limit, bool add_filter);
void VerifyRead(PGConn& conn, size_t limit, bool add_filter);
void VerifyRows(
PGConn& conn, bool add_filter, const std::vector<std::string>& expected, int limit = -1);
PGConn& conn, bool add_filter, const std::vector<std::string>& expected, int64_t limit = -1);

void TestSimple();
void TestManyRows(bool add_filter);
void TestRestart(tablet::FlushFlags flush_flags);

FloatVector RandomVector(size_t dimensions) {
return RandomFloatVector(dimensions, distribution_, &rng_);
}

std::vector<FloatVector> vectors_;
std::uniform_real_distribution<> distribution_;
std::mt19937_64 rng_{42};
};

void PgVectorIndexTest::TestSimple() {
Expand Down Expand Up @@ -167,53 +179,70 @@ std::string ExpectedRow(int64_t id) {
return BuildRow(id, VectorAsString(id));
}

Status PgVectorIndexTest::InsertRows(PGConn& conn, int start_row, int end_row) {
Status PgVectorIndexTest::InsertRows(PGConn& conn, size_t start_row, size_t end_row) {
RETURN_NOT_OK(conn.StartTransaction(IsolationLevel::SNAPSHOT_ISOLATION));
for (int i = start_row; i <= end_row; ++i) {
for (auto i = start_row; i <= end_row; ++i) {
RETURN_NOT_OK(conn.ExecuteFormat(
"INSERT INTO test VALUES ($0, '$1')", i, VectorAsString(i)));
}
return conn.CommitTransaction();
}

Result<PGConn> PgVectorIndexTest::MakeIndexAndFill(int num_rows, int num_tablets) {
auto conn = VERIFY_RESULT(MakeIndex(num_tablets));
Status PgVectorIndexTest::InsertRandomRows(PGConn& conn, size_t num_rows, size_t dimensions) {
RETURN_NOT_OK(conn.StartTransaction(IsolationLevel::SNAPSHOT_ISOLATION));
for (size_t i = 0; i != num_rows; ++i) {
auto vector = RandomVector(dimensions);
RETURN_NOT_OK(conn.ExecuteFormat(
"INSERT INTO test VALUES ($0, '$1')", vectors_.size(), AsString(vector)));
vectors_.push_back(std::move(vector));
}
return conn.CommitTransaction();
}

Result<PGConn> PgVectorIndexTest::MakeIndexAndFill(size_t num_rows) {
auto conn = VERIFY_RESULT(MakeIndex());
RETURN_NOT_OK(InsertRows(conn, 1, num_rows));
return conn;
}

Result<PGConn> PgVectorIndexTest::MakeIndexAndFillRandom(size_t num_rows, size_t dimensions) {
auto conn = VERIFY_RESULT(MakeIndex(dimensions));
RETURN_NOT_OK(InsertRandomRows(conn, num_rows, dimensions));
return conn;
}

void PgVectorIndexTest::VerifyRows(
PGConn& conn, bool add_filter, const std::vector<std::string>& expected, int limit) {
PGConn& conn, bool add_filter, const std::vector<std::string>& expected, int64_t limit) {
auto result = ASSERT_RESULT((conn.FetchRows<RowAsString>(Format(
"SELECT * FROM test $0 ORDER BY embedding <-> '[0.0, 0.0, 0.0]' LIMIT $1",
add_filter ? "WHERE id + 3 <= 5" : "",
limit == -1 ? expected.size() : make_unsigned(limit)))));
limit < 0 ? expected.size() : make_unsigned(limit)))));
EXPECT_EQ(result.size(), expected.size());
for (size_t i = 0; i != std::min(result.size(), expected.size()); ++i) {
SCOPED_TRACE(Format("Row $0", i));
EXPECT_EQ(result[i], expected[i]);
}
}

void PgVectorIndexTest::VerifyRead(PGConn& conn, int limit, bool add_filter) {
void PgVectorIndexTest::VerifyRead(PGConn& conn, size_t limit, bool add_filter) {
std::vector<std::string> expected;
for (int i = 1; i <= limit; ++i) {
for (size_t i = 1; i <= limit; ++i) {
expected.push_back(ExpectedRow(i));
}
VerifyRows(conn, add_filter, expected);
}

void PgVectorIndexTest::TestManyRows(bool add_filter) {
constexpr int kNumRows = RegularBuildVsSanitizers(2000, 64);
const int query_limit = add_filter ? 1 : 5;
constexpr size_t kNumRows = RegularBuildVsSanitizers(2000, 64);
const size_t query_limit = add_filter ? 1 : 5;

auto conn = ASSERT_RESULT(MakeIndexAndFill(kNumRows));
ASSERT_NO_FATALS(VerifyRead(conn, query_limit, add_filter));
}

TEST_P(PgVectorIndexTest, Split) {
constexpr int kNumRows = RegularBuildVsSanitizers(500, 64);
constexpr int kQueryLimit = 5;
constexpr size_t kNumRows = RegularBuildVsSanitizers(500, 64);
constexpr size_t kQueryLimit = 5;

auto conn = ASSERT_RESULT(MakeIndexAndFill(kNumRows));
ASSERT_OK(cluster_->FlushTablets());
Expand All @@ -236,17 +265,17 @@ TEST_P(PgVectorIndexTest, ManyReads) {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_vector_index_concurrent_reads) = 1;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_vector_index_concurrent_writes) = 1;

constexpr int kNumRows = 64;
constexpr int kNumReads = 16;
constexpr size_t kNumRows = 64;
constexpr size_t kNumReads = 16;

auto conn = ASSERT_RESULT(MakeIndexAndFill(kNumRows));

TestThreadHolder threads;
for (int i = 1; i <= kNumReads; ++i) {
for (size_t i = 1; i <= kNumReads; ++i) {
threads.AddThreadFunctor([this, &stop_flag = threads.stop_flag()] {
auto conn = ASSERT_RESULT(Connect());
while (!stop_flag.load()) {
auto id = RandomUniformInt(1, kNumRows);
auto id = RandomUniformInt<size_t>(1, kNumRows);
auto vector = VectorAsString(id);
auto rows = ASSERT_RESULT(conn.FetchAllAsString(Format(
"SELECT * FROM test ORDER BY embedding <-> '$0' LIMIT 1", vector)));
Expand All @@ -259,8 +288,8 @@ TEST_P(PgVectorIndexTest, ManyReads) {
}

void PgVectorIndexTest::TestRestart(tablet::FlushFlags flush_flags) {
constexpr int kNumRows = 64;
constexpr int kQueryLimit = 5;
constexpr size_t kNumRows = 64;
constexpr size_t kQueryLimit = 5;

auto conn = ASSERT_RESULT(MakeIndexAndFill(kNumRows));
ASSERT_NO_FATALS(VerifyRead(conn, kQueryLimit, false));
Expand All @@ -284,7 +313,7 @@ TEST_P(PgVectorIndexTest, BootstrapFlushedIntentsDB) {
}

TEST_P(PgVectorIndexTest, DeleteAndUpdate) {
constexpr int kNumRows = 64;
constexpr size_t kNumRows = 64;
const std::string kDistantVector = "[100, 500, 9000]";
const std::string kCloseVector = "[0.125, 0.25, 0.375]";

Expand All @@ -309,12 +338,12 @@ TEST_P(PgVectorIndexTest, DeleteAndUpdate) {
}

TEST_P(PgVectorIndexTest, RemoteBootstrap) {
constexpr int kNumRows = 64;
constexpr int kQueryLimit = 5;
constexpr size_t kNumRows = 64;
constexpr size_t kQueryLimit = 5;

auto* mts = cluster_->mini_tablet_server(2);
mts->Shutdown();
auto conn = ASSERT_RESULT(MakeIndexAndFill(kNumRows, 3));
auto conn = ASSERT_RESULT(MakeIndexAndFill(kNumRows));
const auto table_id = ASSERT_RESULT(GetTableIDFromTableName("test"));
ASSERT_OK(cluster_->FlushTablets());
for (const auto& peer : ListTableActiveTabletPeers(cluster_.get(), table_id)) {
Expand Down Expand Up @@ -355,8 +384,8 @@ TEST_P(PgVectorIndexTest, RemoteBootstrap) {
}

TEST_P(PgVectorIndexTest, SnapshotSchedule) {
constexpr int kNumRows = 128;
constexpr int kQueryLimit = 5;
constexpr size_t kNumRows = 128;
constexpr size_t kQueryLimit = 5;

client::SnapshotTestUtil snapshot_util;
snapshot_util.SetProxy(&client_->proxy_cache());
Expand All @@ -383,6 +412,58 @@ TEST_P(PgVectorIndexTest, SnapshotSchedule) {
ASSERT_NO_FATALS(VerifyRead(conn, kQueryLimit, false));
}

TEST_P(PgVectorIndexTest, Random) {
constexpr size_t kLimit = 10;
constexpr size_t kDimensions = 64;
constexpr size_t kNumRows = RegularBuildVsDebugVsSanitizers(10000, 1000, 100);
constexpr int kNumIterations = RegularBuildVsDebugVsSanitizers(100, 20, 10);

unum::usearch::metric_punned_t metric(
kDimensions, unum::usearch::metric_kind_t::l2sq_k, unum::usearch::scalar_kind_t::f32_k);

auto conn = ASSERT_RESULT(MakeIndexAndFillRandom(kNumRows, kDimensions));
size_t sum_missing = 0;
std::vector<size_t> counts;
for (int i = 0; i != kNumIterations; ++i) {
auto query_vector = RandomVector(kDimensions);
auto rows = ASSERT_RESULT(conn.FetchRows<int64_t>(Format(
"SELECT id FROM test ORDER BY embedding <-> '$0' LIMIT $1", query_vector, kLimit)));
std::vector<int64_t> expected(vectors_.size());
std::generate(expected.begin(), expected.end(), [n{0LL}]() mutable { return n++; });
std::sort(
expected.begin(), expected.end(),
[&metric, &query_vector, &vectors = vectors_](size_t li, size_t ri) {
const auto& lhs = vectors[li];
const auto& rhs = vectors[ri];
return metric(VectorToBytePtr(query_vector), VectorToBytePtr(lhs)) <
metric(VectorToBytePtr(query_vector), VectorToBytePtr(rhs));
});
size_t ep = 0;
for (int64_t id : rows) {
while (ep < expected.size() && id != expected[ep]) {
++ep;
}
ASSERT_LT(ep, expected.size());
ASSERT_EQ(id, expected[ep]);
++ep;
}
size_t missing = ep - kLimit;
if (missing > counts.size()) {
LOG(INFO)
<< "New max: " << missing << ", fetched: " << AsString(rows) << ", expected: "
<< AsString(boost::make_iterator_range(
expected.begin(), expected.begin() + kLimit + missing));
}
counts.resize(std::max(counts.size(), missing + 1));
++counts[missing];
sum_missing += missing;
}
LOG(INFO)
<< "Counts: " << AsString(counts)
<< ", recall: " << 1.0 - sum_missing * 1.0 / (kLimit * kNumIterations);
ASSERT_LE(sum_missing * 50, kLimit * kNumIterations);
}

std::string ColocatedToString(const testing::TestParamInfo<bool>& param_info) {
return param_info.param ? "Colocated" : "Distributed";
}
Expand Down

0 comments on commit a886b8d

Please sign in to comment.