Skip to content

Commit 6b2dd85

Browse files
committed
feat: send index definition for every thread
1 parent 484fefc commit 6b2dd85

File tree

5 files changed

+116
-74
lines changed

5 files changed

+116
-74
lines changed

src/server/detail/save_stages_controller.cc

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,8 @@ void SaveStagesController::SaveDfsSingle(EngineShard* shard, const std::string&
297297
auto& [snapshot, filename] = snapshots_[shard ? shard->shard_id() : shard_set->size()];
298298

299299
SaveMode mode = shard == nullptr ? SaveMode::SUMMARY : SaveMode::SINGLE_SHARD;
300-
auto glob_data = shard == nullptr ? RdbSaver::GetGlobalData(service_) : RdbSaver::GlobalData{};
300+
bool is_summary = (shard == nullptr);
301+
auto glob_data = RdbSaver::GetGlobalData(service_, is_summary);
301302

302303
if (auto err = snapshot->Start(mode, filename, glob_data, snapshot_id); err) {
303304
shared_err_ = err;
@@ -319,7 +320,9 @@ void SaveStagesController::SaveRdb() {
319320
if (!snapshot_storage_->IsCloud())
320321
filename += ".tmp";
321322

322-
if (auto err = snapshot->Start(SaveMode::RDB, filename, RdbSaver::GetGlobalData(service_), "");
323+
// RDB is a summary file (contains all global data)
324+
if (auto err =
325+
snapshot->Start(SaveMode::RDB, filename, RdbSaver::GetGlobalData(service_, true), "");
323326
err) {
324327
snapshot.reset();
325328
return;

src/server/dflycmd.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -699,9 +699,11 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, ExecutionState* exec_st,
699699
error_code ec;
700700
RdbSaver* saver = flow->saver.get();
701701
if (saver->Mode() == SaveMode::SUMMARY || saver->Mode() == SaveMode::SINGLE_SHARD_WITH_SUMMARY) {
702-
ec = saver->SaveHeader(saver->GetGlobalData(&sf_->service()));
702+
// Full sync summary - include all global data
703+
ec = saver->SaveHeader(saver->GetGlobalData(&sf_->service(), true));
703704
} else {
704-
ec = saver->SaveHeader({});
705+
// Per-shard - include only search index restore commands
706+
ec = saver->SaveHeader(saver->GetGlobalData(&sf_->service(), false));
705707
}
706708
if (ec) {
707709
exec_st->ReportError(ec);

src/server/rdb_load.cc

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2885,7 +2885,7 @@ void RdbLoader::LoadScriptFromAux(string&& body) {
28852885
namespace {
28862886

28872887
void LoadSearchCommandFromAux(Service* service, string&& def, string_view command_name,
2888-
string_view error_context) {
2888+
string_view error_context, bool ignore_exists_error = false) {
28892889
facade::CapturingReplyBuilder crb;
28902890

28912891
ConnectionContext cntx{nullptr, acl::UserCredentials{}};
@@ -2936,6 +2936,12 @@ void LoadSearchCommandFromAux(Service* service, string&& def, string_view comman
29362936

29372937
auto response = crb.Take();
29382938
if (auto err = facade::CapturingReplyBuilder::TryExtractError(response); err) {
2939+
// Ignore "Index already exists" errors when loading from per-shard DFS files
2940+
// since multiple shards may try to create the same index
2941+
if (ignore_exists_error && absl::StrContains(err->first, "already exists")) {
2942+
VLOG(1) << "Ignoring duplicate " << error_context << ": " << err->first;
2943+
return;
2944+
}
29392945
LOG(ERROR) << "Bad " << error_context << ": " << def << " " << err->first;
29402946
}
29412947
}
@@ -2954,7 +2960,7 @@ std::vector<std::string> RdbLoader::TakePendingSynonymCommands() {
29542960
void RdbLoader::LoadSearchIndexDefFromAux(string&& def) {
29552961
// Check if this is new JSON format (starts with '{') or old format ("index_name cmd")
29562962
if (!def.empty() && def[0] == '{') {
2957-
// New JSON format with HNSW metadata
2963+
// New JSON format with HNSW metadata (from summary file)
29582964
try {
29592965
auto json_opt = JsonFromString(def);
29602966
if (!json_opt) {
@@ -2969,13 +2975,17 @@ void RdbLoader::LoadSearchIndexDefFromAux(string&& def) {
29692975
// Currently we just restore the index definition, HNSW graph will be rebuilt
29702976

29712977
string full_cmd = absl::StrCat(index_name, " ", cmd);
2972-
LoadSearchCommandFromAux(service_, std::move(full_cmd), "FT.CREATE", "index definition");
2978+
// Ignore "already exists" errors since index may have been created by another shard file
2979+
LoadSearchCommandFromAux(service_, std::move(full_cmd), "FT.CREATE", "index definition",
2980+
/*ignore_exists_error=*/true);
29732981
} catch (const std::exception& e) {
29742982
LOG(ERROR) << "Failed to parse search index JSON: " << e.what() << " def: " << def;
29752983
}
29762984
} else {
2977-
// Old format: "index_name cmd" - for backwards compatibility
2978-
LoadSearchCommandFromAux(service_, std::move(def), "FT.CREATE", "index definition");
2985+
// Simple format: "index_name cmd" - from per-shard DFS files or old format
2986+
// Ignore "already exists" errors since multiple shards may try to create the same index
2987+
LoadSearchCommandFromAux(service_, std::move(def), "FT.CREATE", "index definition",
2988+
/*ignore_exists_error=*/true);
29792989
}
29802990
}
29812991

src/server/rdb_save.cc

Lines changed: 88 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1409,78 +1409,104 @@ error_code RdbSaver::Impl::FlushSerializer() {
14091409
return ec;
14101410
}
14111411

1412-
RdbSaver::GlobalData RdbSaver::GetGlobalData(const Service* service) {
1413-
StringVec script_bodies, search_indices, search_synonyms;
1414-
1415-
{
1416-
auto scripts = service->script_mgr()->GetAll();
1417-
script_bodies.reserve(scripts.size());
1418-
for (auto& [sha, data] : scripts)
1419-
script_bodies.push_back(std::move(data.body));
1420-
}
1412+
namespace {
14211413

1422-
atomic<size_t> table_mem{0};
1423-
shard_set->RunBriefInParallel([&](EngineShard* shard) {
1414+
// Collect search index definitions. If is_summary is true, collects JSON with HNSW metadata
1415+
// and synonyms (for summary file). Otherwise collects simple restore commands (for per-shard).
1416+
void CollectSearchIndices([[maybe_unused]] StringVec* search_indices,
1417+
[[maybe_unused]] StringVec* search_synonyms,
1418+
[[maybe_unused]] bool is_summary) {
14241419
#ifdef WITH_SEARCH
1425-
if (shard->shard_id() == 0) {
1426-
auto* indices = shard->search_indices();
1427-
for (const auto& index_name : indices->GetIndexNames()) {
1428-
auto* index = indices->GetIndex(index_name);
1429-
auto index_info = index->GetInfo();
1430-
1431-
// Collect HNSW metadata for vector field (first one found)
1432-
for (const auto& [fident, finfo] : index_info.base_index.schema.fields) {
1433-
if (finfo.type == search::SchemaField::VECTOR &&
1434-
!(finfo.flags & search::SchemaField::NOINDEX)) {
1435-
if (auto hnsw_index =
1436-
GlobalHnswIndexRegistry::Instance().Get(index_name, finfo.short_name);
1437-
hnsw_index) {
1438-
index_info.hnsw_metadata = hnsw_index->GetMetadata();
1439-
break; // Only store first HNSW index metadata
1440-
}
1441-
}
1442-
}
1420+
EngineShard* shard = EngineShard::tlocal();
1421+
if (!shard)
1422+
return;
14431423

1444-
// Save index definition as JSON with HNSW metadata
1445-
TmpJson index_json;
1446-
index_json["name"] = index_name;
1447-
index_json["cmd"] = index_info.BuildRestoreCommand();
1448-
1449-
if (index_info.hnsw_metadata.has_value()) {
1450-
const auto& meta = index_info.hnsw_metadata.value();
1451-
TmpJson hnsw_meta;
1452-
hnsw_meta["max_elements"] = meta.max_elements;
1453-
hnsw_meta["cur_element_count"] = meta.cur_element_count;
1454-
hnsw_meta["maxlevel"] = meta.maxlevel;
1455-
hnsw_meta["enterpoint_node"] = meta.enterpoint_node;
1456-
hnsw_meta["M"] = meta.M;
1457-
hnsw_meta["maxM"] = meta.maxM;
1458-
hnsw_meta["maxM0"] = meta.maxM0;
1459-
hnsw_meta["ef_construction"] = meta.ef_construction;
1460-
hnsw_meta["mult"] = meta.mult;
1461-
index_json["hnsw_metadata"] = std::move(hnsw_meta);
1462-
}
1424+
auto* indices = shard->search_indices();
1425+
for (const auto& index_name : indices->GetIndexNames()) {
1426+
auto* index = indices->GetIndex(index_name);
1427+
auto index_info = index->GetInfo();
1428+
1429+
if (!is_summary) {
1430+
std::string restore_cmd = absl::StrCat(index_name, " ", index_info.BuildRestoreCommand());
1431+
search_indices->emplace_back(std::move(restore_cmd));
1432+
continue;
1433+
}
14631434

1464-
search_indices.emplace_back(index_json.to_string());
1465-
1466-
// Save synonym groups to separate vector
1467-
const auto& synonym_groups = index->GetSynonyms().GetGroups();
1468-
for (const auto& [group_id, terms] : synonym_groups) {
1469-
if (!terms.empty()) {
1470-
// Format: "index_name group_id term1 term2 term3"
1471-
std::string syn_cmd =
1472-
absl::StrCat(index_name, " ", group_id, " ", absl::StrJoin(terms, " "));
1473-
search_synonyms.emplace_back(std::move(syn_cmd));
1474-
}
1435+
// Collect HNSW metadata for vector field (first one found)
1436+
for (const auto& [fident, finfo] : index_info.base_index.schema.fields) {
1437+
if (finfo.type == search::SchemaField::VECTOR &&
1438+
!(finfo.flags & search::SchemaField::NOINDEX)) {
1439+
if (auto hnsw_index = GlobalHnswIndexRegistry::Instance().Get(index_name, finfo.short_name);
1440+
hnsw_index) {
1441+
index_info.hnsw_metadata = hnsw_index->GetMetadata();
1442+
break;
14751443
}
14761444
}
14771445
}
1446+
1447+
// Save index definition as JSON with HNSW metadata
1448+
TmpJson index_json;
1449+
index_json["name"] = index_name;
1450+
index_json["cmd"] = index_info.BuildRestoreCommand();
1451+
1452+
if (index_info.hnsw_metadata.has_value()) {
1453+
const auto& meta = index_info.hnsw_metadata.value();
1454+
TmpJson hnsw_meta;
1455+
hnsw_meta["max_elements"] = meta.max_elements;
1456+
hnsw_meta["cur_element_count"] = meta.cur_element_count;
1457+
hnsw_meta["maxlevel"] = meta.maxlevel;
1458+
hnsw_meta["enterpoint_node"] = meta.enterpoint_node;
1459+
hnsw_meta["M"] = meta.M;
1460+
hnsw_meta["maxM"] = meta.maxM;
1461+
hnsw_meta["maxM0"] = meta.maxM0;
1462+
hnsw_meta["ef_construction"] = meta.ef_construction;
1463+
hnsw_meta["mult"] = meta.mult;
1464+
index_json["hnsw_metadata"] = std::move(hnsw_meta);
1465+
}
1466+
1467+
search_indices->emplace_back(index_json.to_string());
1468+
1469+
// Save synonym groups
1470+
const auto& synonym_groups = index->GetSynonyms().GetGroups();
1471+
for (const auto& [group_id, terms] : synonym_groups) {
1472+
if (!terms.empty()) {
1473+
std::string syn_cmd =
1474+
absl::StrCat(index_name, " ", group_id, " ", absl::StrJoin(terms, " "));
1475+
search_synonyms->emplace_back(std::move(syn_cmd));
1476+
}
1477+
}
1478+
}
14781479
#endif
1480+
}
1481+
1482+
} // namespace
1483+
1484+
RdbSaver::GlobalData RdbSaver::GetGlobalData(const Service* service, bool is_summary) {
1485+
StringVec script_bodies, search_indices, search_synonyms;
1486+
size_t table_mem_result = 0;
1487+
1488+
// Collect search indices from the current shard (both modes); per-shard files return only these
1489+
CollectSearchIndices(&search_indices, &search_synonyms, is_summary);
1490+
if (!is_summary) {
1491+
return RdbSaver::GlobalData{std::move(script_bodies), std::move(search_indices),
1492+
std::move(search_synonyms), table_mem_result};
1493+
}
1494+
1495+
// For summary file: collect all global data
1496+
auto scripts = service->script_mgr()->GetAll();
1497+
script_bodies.reserve(scripts.size());
1498+
for (auto& [sha, data] : scripts)
1499+
script_bodies.push_back(std::move(data.body));
1500+
1501+
// Collect search indices as JSON from shard 0
1502+
shard_set->Await(0, [&] { CollectSearchIndices(&search_indices, &search_synonyms, is_summary); });
1503+
1504+
atomic<size_t> table_mem{0};
1505+
shard_set->RunBriefInParallel([&](EngineShard* shard) {
14791506
auto& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard->shard_id());
14801507
size_t shard_table_mem = 0;
14811508
for (size_t db_id = 0; db_id < db_slice.db_array_size(); ++db_id) {
14821509
auto* db_table = db_slice.GetDBTable(db_id);
1483-
14841510
if (db_table) {
14851511
shard_table_mem += db_table->table_memory();
14861512
}
@@ -1625,12 +1651,11 @@ error_code RdbSaver::SaveAux(const GlobalData& glob_state) {
16251651
if (!glob_state.search_indices.empty())
16261652
LOG(WARNING) << "Dragonfly search index data is incompatible with the RDB format";
16271653
} else {
1628-
// Search index definitions are not tied to shards and are saved in the summary file
1629-
DCHECK(save_mode_ != SaveMode::SINGLE_SHARD || glob_state.search_indices.empty());
1654+
// Search index definitions (JSON for summary, simple restore cmd for per-shard)
16301655
for (const string& s : glob_state.search_indices)
16311656
RETURN_ON_ERR(impl_->SaveAuxFieldStrStr("search-index", s));
16321657

1633-
// Save synonyms in separate aux fields
1658+
// Save synonyms only in summary file
16341659
DCHECK(save_mode_ != SaveMode::SINGLE_SHARD || glob_state.search_synonyms.empty());
16351660
for (const string& s : glob_state.search_synonyms)
16361661
RETURN_ON_ERR(impl_->SaveAuxFieldStrStr("search-synonyms", s));

src/server/rdb_save.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,8 +133,10 @@ class RdbSaver {
133133

134134
SnapshotStats GetCurrentSnapshotProgress() const;
135135

136-
// Fetch global data to be serialized in summary part of a snapshot / full sync.
137-
static GlobalData GetGlobalData(const Service* service);
136+
// Fetch global data to be serialized in snapshot.
137+
// is_summary: true for summary file (full data with JSON search indices),
138+
// false for per-shard files (only simple search index restore commands)
139+
static GlobalData GetGlobalData(const Service* service, bool is_summary);
138140

139141
// Returns time in nanos of start of the last pending write interaction.
140142
// Returns -1 if no write operations are currently pending.

0 commit comments

Comments
 (0)