Skip to content

Commit 59ec788

Browse files
committed
feat: send index definition for every thread
1 parent 484fefc commit 59ec788

File tree

5 files changed

+115
-70
lines changed

5 files changed

+115
-70
lines changed

src/server/detail/save_stages_controller.cc

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,8 @@ void SaveStagesController::SaveDfsSingle(EngineShard* shard, const std::string&
297297
auto& [snapshot, filename] = snapshots_[shard ? shard->shard_id() : shard_set->size()];
298298

299299
SaveMode mode = shard == nullptr ? SaveMode::SUMMARY : SaveMode::SINGLE_SHARD;
300-
auto glob_data = shard == nullptr ? RdbSaver::GetGlobalData(service_) : RdbSaver::GlobalData{};
300+
bool is_summary = (shard == nullptr);
301+
auto glob_data = RdbSaver::GetGlobalData(service_, is_summary);
301302

302303
if (auto err = snapshot->Start(mode, filename, glob_data, snapshot_id); err) {
303304
shared_err_ = err;
@@ -319,7 +320,9 @@ void SaveStagesController::SaveRdb() {
319320
if (!snapshot_storage_->IsCloud())
320321
filename += ".tmp";
321322

322-
if (auto err = snapshot->Start(SaveMode::RDB, filename, RdbSaver::GetGlobalData(service_), "");
323+
// RDB is a summary file (contains all global data)
324+
if (auto err =
325+
snapshot->Start(SaveMode::RDB, filename, RdbSaver::GetGlobalData(service_, true), "");
323326
err) {
324327
snapshot.reset();
325328
return;

src/server/dflycmd.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -699,9 +699,11 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, ExecutionState* exec_st,
699699
error_code ec;
700700
RdbSaver* saver = flow->saver.get();
701701
if (saver->Mode() == SaveMode::SUMMARY || saver->Mode() == SaveMode::SINGLE_SHARD_WITH_SUMMARY) {
702-
ec = saver->SaveHeader(saver->GetGlobalData(&sf_->service()));
702+
// Full sync summary - include all global data
703+
ec = saver->SaveHeader(saver->GetGlobalData(&sf_->service(), true));
703704
} else {
704-
ec = saver->SaveHeader({});
705+
// Per-shard - include only search index restore commands
706+
ec = saver->SaveHeader(saver->GetGlobalData(&sf_->service(), false));
705707
}
706708
if (ec) {
707709
exec_st->ReportError(ec);

src/server/rdb_load.cc

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2885,7 +2885,7 @@ void RdbLoader::LoadScriptFromAux(string&& body) {
28852885
namespace {
28862886

28872887
void LoadSearchCommandFromAux(Service* service, string&& def, string_view command_name,
2888-
string_view error_context) {
2888+
string_view error_context, bool ignore_exists_error = false) {
28892889
facade::CapturingReplyBuilder crb;
28902890

28912891
ConnectionContext cntx{nullptr, acl::UserCredentials{}};
@@ -2936,6 +2936,12 @@ void LoadSearchCommandFromAux(Service* service, string&& def, string_view comman
29362936

29372937
auto response = crb.Take();
29382938
if (auto err = facade::CapturingReplyBuilder::TryExtractError(response); err) {
2939+
// Ignore "Index already exists" errors when loading from per-shard DFS files
2940+
// since multiple shards may try to create the same index
2941+
if (ignore_exists_error && absl::StrContains(err->first, "already exists")) {
2942+
VLOG(1) << "Ignoring duplicate " << error_context << ": " << err->first;
2943+
return;
2944+
}
29392945
LOG(ERROR) << "Bad " << error_context << ": " << def << " " << err->first;
29402946
}
29412947
}
@@ -2954,7 +2960,7 @@ std::vector<std::string> RdbLoader::TakePendingSynonymCommands() {
29542960
void RdbLoader::LoadSearchIndexDefFromAux(string&& def) {
29552961
// Check if this is new JSON format (starts with '{') or old format ("index_name cmd")
29562962
if (!def.empty() && def[0] == '{') {
2957-
// New JSON format with HNSW metadata
2963+
// New JSON format with HNSW metadata (from summary file)
29582964
try {
29592965
auto json_opt = JsonFromString(def);
29602966
if (!json_opt) {
@@ -2969,13 +2975,17 @@ void RdbLoader::LoadSearchIndexDefFromAux(string&& def) {
29692975
// Currently we just restore the index definition, HNSW graph will be rebuilt
29702976

29712977
string full_cmd = absl::StrCat(index_name, " ", cmd);
2972-
LoadSearchCommandFromAux(service_, std::move(full_cmd), "FT.CREATE", "index definition");
2978+
// Ignore "already exists" errors since index may have been created by another shard file
2979+
LoadSearchCommandFromAux(service_, std::move(full_cmd), "FT.CREATE", "index definition",
2980+
/*ignore_exists_error=*/true);
29732981
} catch (const std::exception& e) {
29742982
LOG(ERROR) << "Failed to parse search index JSON: " << e.what() << " def: " << def;
29752983
}
29762984
} else {
2977-
// Old format: "index_name cmd" - for backwards compatibility
2978-
LoadSearchCommandFromAux(service_, std::move(def), "FT.CREATE", "index definition");
2985+
// Simple format: "index_name cmd" - from per-shard DFS files or old format
2986+
// Ignore "already exists" errors since multiple shards may try to create the same index
2987+
LoadSearchCommandFromAux(service_, std::move(def), "FT.CREATE", "index definition",
2988+
/*ignore_exists_error=*/true);
29792989
}
29802990
}
29812991

src/server/rdb_save.cc

Lines changed: 87 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1409,78 +1409,107 @@ error_code RdbSaver::Impl::FlushSerializer() {
14091409
return ec;
14101410
}
14111411

1412-
RdbSaver::GlobalData RdbSaver::GetGlobalData(const Service* service) {
1412+
namespace {
1413+
1414+
// Collect search index definitions. If as_json is true, collects JSON with HNSW metadata
1415+
// and synonyms (for summary file). Otherwise collects simple restore commands (for per-shard).
1416+
void CollectSearchIndices([[maybe_unused]] EngineShard* shard,
1417+
[[maybe_unused]] StringVec* search_indices,
1418+
[[maybe_unused]] StringVec* search_synonyms,
1419+
[[maybe_unused]] bool is_summary) {
1420+
#ifdef WITH_SEARCH
1421+
auto* indices = shard->search_indices();
1422+
for (const auto& index_name : indices->GetIndexNames()) {
1423+
auto* index = indices->GetIndex(index_name);
1424+
auto index_info = index->GetInfo();
1425+
1426+
if (!is_summary) {
1427+
std::string restore_cmd = absl::StrCat(index_name, " ", index_info.BuildRestoreCommand());
1428+
search_indices->emplace_back(std::move(restore_cmd));
1429+
continue;
1430+
}
1431+
1432+
// Collect HNSW metadata for vector field (first one found)
1433+
for (const auto& [fident, finfo] : index_info.base_index.schema.fields) {
1434+
if (finfo.type == search::SchemaField::VECTOR &&
1435+
!(finfo.flags & search::SchemaField::NOINDEX)) {
1436+
if (auto hnsw_index = GlobalHnswIndexRegistry::Instance().Get(index_name, finfo.short_name);
1437+
hnsw_index) {
1438+
index_info.hnsw_metadata = hnsw_index->GetMetadata();
1439+
break;
1440+
}
1441+
}
1442+
}
1443+
1444+
// Save index definition as JSON with HNSW metadata
1445+
TmpJson index_json;
1446+
index_json["name"] = index_name;
1447+
index_json["cmd"] = index_info.BuildRestoreCommand();
1448+
1449+
if (index_info.hnsw_metadata.has_value()) {
1450+
const auto& meta = index_info.hnsw_metadata.value();
1451+
TmpJson hnsw_meta;
1452+
hnsw_meta["max_elements"] = meta.max_elements;
1453+
hnsw_meta["cur_element_count"] = meta.cur_element_count;
1454+
hnsw_meta["maxlevel"] = meta.maxlevel;
1455+
hnsw_meta["enterpoint_node"] = meta.enterpoint_node;
1456+
hnsw_meta["M"] = meta.M;
1457+
hnsw_meta["maxM"] = meta.maxM;
1458+
hnsw_meta["maxM0"] = meta.maxM0;
1459+
hnsw_meta["ef_construction"] = meta.ef_construction;
1460+
hnsw_meta["mult"] = meta.mult;
1461+
index_json["hnsw_metadata"] = std::move(hnsw_meta);
1462+
}
1463+
1464+
search_indices->emplace_back(index_json.to_string());
1465+
1466+
// Save synonym groups
1467+
const auto& synonym_groups = index->GetSynonyms().GetGroups();
1468+
for (const auto& [group_id, terms] : synonym_groups) {
1469+
if (!terms.empty()) {
1470+
std::string syn_cmd =
1471+
absl::StrCat(index_name, " ", group_id, " ", absl::StrJoin(terms, " "));
1472+
search_synonyms->emplace_back(std::move(syn_cmd));
1473+
}
1474+
}
1475+
}
1476+
#endif
1477+
}
1478+
1479+
} // namespace
1480+
1481+
RdbSaver::GlobalData RdbSaver::GetGlobalData(const Service* service, bool is_summary) {
14131482
StringVec script_bodies, search_indices, search_synonyms;
1483+
size_t table_mem_result = 0;
14141484

14151485
{
1486+
// For summary file: collect all global data
14161487
auto scripts = service->script_mgr()->GetAll();
14171488
script_bodies.reserve(scripts.size());
14181489
for (auto& [sha, data] : scripts)
14191490
script_bodies.push_back(std::move(data.body));
14201491
}
14211492

1493+
if (!is_summary) {
1494+
shard_set->RunBriefInParallel([&](EngineShard* shard) {
1495+
if (shard->shard_id() == 0)
1496+
CollectSearchIndices(shard, &search_indices, &search_synonyms, is_summary);
1497+
});
1498+
return RdbSaver::GlobalData{std::move(script_bodies), std::move(search_indices),
1499+
std::move(search_synonyms), table_mem_result};
1500+
}
1501+
14221502
atomic<size_t> table_mem{0};
14231503
shard_set->RunBriefInParallel([&](EngineShard* shard) {
1424-
#ifdef WITH_SEARCH
1425-
if (shard->shard_id() == 0) {
1426-
auto* indices = shard->search_indices();
1427-
for (const auto& index_name : indices->GetIndexNames()) {
1428-
auto* index = indices->GetIndex(index_name);
1429-
auto index_info = index->GetInfo();
1430-
1431-
// Collect HNSW metadata for vector field (first one found)
1432-
for (const auto& [fident, finfo] : index_info.base_index.schema.fields) {
1433-
if (finfo.type == search::SchemaField::VECTOR &&
1434-
!(finfo.flags & search::SchemaField::NOINDEX)) {
1435-
if (auto hnsw_index =
1436-
GlobalHnswIndexRegistry::Instance().Get(index_name, finfo.short_name);
1437-
hnsw_index) {
1438-
index_info.hnsw_metadata = hnsw_index->GetMetadata();
1439-
break; // Only store first HNSW index metadata
1440-
}
1441-
}
1442-
}
1443-
1444-
// Save index definition as JSON with HNSW metadata
1445-
TmpJson index_json;
1446-
index_json["name"] = index_name;
1447-
index_json["cmd"] = index_info.BuildRestoreCommand();
1448-
1449-
if (index_info.hnsw_metadata.has_value()) {
1450-
const auto& meta = index_info.hnsw_metadata.value();
1451-
TmpJson hnsw_meta;
1452-
hnsw_meta["max_elements"] = meta.max_elements;
1453-
hnsw_meta["cur_element_count"] = meta.cur_element_count;
1454-
hnsw_meta["maxlevel"] = meta.maxlevel;
1455-
hnsw_meta["enterpoint_node"] = meta.enterpoint_node;
1456-
hnsw_meta["M"] = meta.M;
1457-
hnsw_meta["maxM"] = meta.maxM;
1458-
hnsw_meta["maxM0"] = meta.maxM0;
1459-
hnsw_meta["ef_construction"] = meta.ef_construction;
1460-
hnsw_meta["mult"] = meta.mult;
1461-
index_json["hnsw_metadata"] = std::move(hnsw_meta);
1462-
}
1504+
shard_set->RunBriefInParallel([&](EngineShard* shard) {
1505+
if (shard->shard_id() == 0)
1506+
CollectSearchIndices(shard, &search_indices, &search_synonyms, is_summary);
1507+
});
14631508

1464-
search_indices.emplace_back(index_json.to_string());
1465-
1466-
// Save synonym groups to separate vector
1467-
const auto& synonym_groups = index->GetSynonyms().GetGroups();
1468-
for (const auto& [group_id, terms] : synonym_groups) {
1469-
if (!terms.empty()) {
1470-
// Format: "index_name group_id term1 term2 term3"
1471-
std::string syn_cmd =
1472-
absl::StrCat(index_name, " ", group_id, " ", absl::StrJoin(terms, " "));
1473-
search_synonyms.emplace_back(std::move(syn_cmd));
1474-
}
1475-
}
1476-
}
1477-
}
1478-
#endif
14791509
auto& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard->shard_id());
14801510
size_t shard_table_mem = 0;
14811511
for (size_t db_id = 0; db_id < db_slice.db_array_size(); ++db_id) {
14821512
auto* db_table = db_slice.GetDBTable(db_id);
1483-
14841513
if (db_table) {
14851514
shard_table_mem += db_table->table_memory();
14861515
}
@@ -1625,12 +1654,11 @@ error_code RdbSaver::SaveAux(const GlobalData& glob_state) {
16251654
if (!glob_state.search_indices.empty())
16261655
LOG(WARNING) << "Dragonfly search index data is incompatible with the RDB format";
16271656
} else {
1628-
// Search index definitions are not tied to shards and are saved in the summary file
1629-
DCHECK(save_mode_ != SaveMode::SINGLE_SHARD || glob_state.search_indices.empty());
1657+
// Search index definitions (JSON for summary, simple restore cmd for per-shard)
16301658
for (const string& s : glob_state.search_indices)
16311659
RETURN_ON_ERR(impl_->SaveAuxFieldStrStr("search-index", s));
16321660

1633-
// Save synonyms in separate aux fields
1661+
// Save synonyms only in summary file
16341662
DCHECK(save_mode_ != SaveMode::SINGLE_SHARD || glob_state.search_synonyms.empty());
16351663
for (const string& s : glob_state.search_synonyms)
16361664
RETURN_ON_ERR(impl_->SaveAuxFieldStrStr("search-synonyms", s));

src/server/rdb_save.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,8 +133,10 @@ class RdbSaver {
133133

134134
SnapshotStats GetCurrentSnapshotProgress() const;
135135

136-
// Fetch global data to be serialized in summary part of a snapshot / full sync.
137-
static GlobalData GetGlobalData(const Service* service);
136+
// Fetch global data to be serialized in snapshot.
137+
// is_summary: true for summary file (full data with JSON search indices),
138+
// false for per-shard files (only simple search index restore commands)
139+
static GlobalData GetGlobalData(const Service* service, bool is_summary);
138140

139141
// Returns time in nanos of start of the last pending write interaction.
140142
// Returns -1 if no write operations are currently pending.

0 commit comments

Comments
 (0)