7 changes: 5 additions & 2 deletions src/server/detail/save_stages_controller.cc
@@ -297,7 +297,8 @@ void SaveStagesController::SaveDfsSingle(EngineShard* shard, const std::string&
auto& [snapshot, filename] = snapshots_[shard ? shard->shard_id() : shard_set->size()];

SaveMode mode = shard == nullptr ? SaveMode::SUMMARY : SaveMode::SINGLE_SHARD;
auto glob_data = shard == nullptr ? RdbSaver::GetGlobalData(service_) : RdbSaver::GlobalData{};
bool is_summary = (shard == nullptr);
auto glob_data = RdbSaver::GetGlobalData(service_, is_summary);

if (auto err = snapshot->Start(mode, filename, glob_data, snapshot_id); err) {
shared_err_ = err;
@@ -319,7 +320,9 @@ void SaveStagesController::SaveRdb() {
if (!snapshot_storage_->IsCloud())
filename += ".tmp";

if (auto err = snapshot->Start(SaveMode::RDB, filename, RdbSaver::GetGlobalData(service_), "");
// RDB is a summary file (contains all global data)
if (auto err =
snapshot->Start(SaveMode::RDB, filename, RdbSaver::GetGlobalData(service_, true), "");
err) {
snapshot.reset();
return;
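Both touched call sites in this file now funnel through the same GetGlobalData(service_, is_summary) entry point. The snippet below only illustrates how the flag is derived at these two call sites; it is not code from the PR, and Service/EngineShard are left opaque.

// Illustration only: flag derivation for the two call sites in this file.
struct Service;
struct EngineShard;

bool IsSummaryFlow(const EngineShard* shard, bool saving_rdb) {
  // SaveRdb writes one self-contained file, so it always takes the summary path.
  // SaveDfsSingle writes the summary part when shard == nullptr and a per-shard
  // part otherwise; per-shard parts no longer receive an empty GlobalData.
  return saving_rdb || shard == nullptr;
}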
6 changes: 4 additions & 2 deletions src/server/dflycmd.cc
@@ -699,9 +699,11 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, ExecutionState* exec_st,
error_code ec;
RdbSaver* saver = flow->saver.get();
if (saver->Mode() == SaveMode::SUMMARY || saver->Mode() == SaveMode::SINGLE_SHARD_WITH_SUMMARY) {
ec = saver->SaveHeader(saver->GetGlobalData(&sf_->service()));
// Full sync summary - include all global data
ec = saver->SaveHeader(saver->GetGlobalData(&sf_->service(), true));
} else {
ec = saver->SaveHeader({});
// Per-shard - include only search index restore commands
ec = saver->SaveHeader(saver->GetGlobalData(&sf_->service(), false));
}
if (ec) {
exec_st->ReportError(ec);
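The two branches above differ only in the flag passed to GetGlobalData. A minimal sketch of the same decision, assuming the RdbSaver, SaveMode, and Service declarations from rdb_save.h (error handling and the FlowInfo plumbing are elided):

// Sketch: summary-type flows carry the full global data, while plain per-shard
// flows now carry just the search-index restore commands instead of {}.
std::error_code SaveFullSyncHeader(RdbSaver* saver, const Service* service) {
  bool is_summary = saver->Mode() == SaveMode::SUMMARY ||
                    saver->Mode() == SaveMode::SINGLE_SHARD_WITH_SUMMARY;
  return saver->SaveHeader(RdbSaver::GetGlobalData(service, is_summary));
}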
33 changes: 27 additions & 6 deletions src/server/rdb_load.cc
@@ -2952,31 +2952,52 @@ std::vector<std::string> RdbLoader::TakePendingSynonymCommands() {
}

void RdbLoader::LoadSearchIndexDefFromAux(string&& def) {
string index_name;
string full_cmd;

// Check if this is new JSON format (starts with '{') or old format ("index_name cmd")
if (!def.empty() && def[0] == '{') {
// New JSON format with HNSW metadata
// New JSON format with HNSW metadata (from summary file)
try {
auto json_opt = JsonFromString(def);
if (!json_opt) {
LOG(ERROR) << "Invalid search index JSON: " << def;
return;
}
const auto& json = *json_opt;
string index_name = json["name"].as<string>();
index_name = json["name"].as<string>();
string cmd = json["cmd"].as<string>();

// TODO: restore HNSW metadata from json["hnsw_metadata"] if present
// Currently we just restore the index definition, HNSW graph will be rebuilt

string full_cmd = absl::StrCat(index_name, " ", cmd);
LoadSearchCommandFromAux(service_, std::move(full_cmd), "FT.CREATE", "index definition");
full_cmd = absl::StrCat(index_name, " ", cmd);
} catch (const std::exception& e) {
LOG(ERROR) << "Failed to parse search index JSON: " << e.what() << " def: " << def;
return;
}
} else {
// Old format: "index_name cmd" - for backwards compatibility
LoadSearchCommandFromAux(service_, std::move(def), "FT.CREATE", "index definition");
// Simple format: "index_name cmd" - from per-shard DFS files or old format
// Extract index name (first token before space)
size_t space_pos = def.find(' ');
if (space_pos == string::npos) {
LOG(ERROR) << "Invalid search index definition: " << def;
return;
}
index_name = def.substr(0, space_pos);
full_cmd = std::move(def);
}

// Check if index already exists (may have been created by another shard file)
bool exists = shard_set->Await(
0, [&] { return EngineShard::tlocal()->search_indices()->GetIndex(index_name) != nullptr; });

if (exists) {
VLOG(1) << "Index already exists, skipping: " << index_name;
return;
}

LoadSearchCommandFromAux(service_, std::move(full_cmd), "FT.CREATE", "index definition");
}

void RdbLoader::LoadSearchSynonymsFromAux(string&& def) {
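For reference, the two "search-index" aux payload shapes that LoadSearchIndexDefFromAux now accepts look roughly like the strings below. The index name, schema, and numeric values are made up for illustration; only the envelope (name, cmd, optional hnsw_metadata) follows this PR.

// 1) JSON format, written into the summary file by CollectSearchIndices:
const char* kSummaryIndexDef = R"json({
  "name": "idx1",
  "cmd": "ON HASH PREFIX 1 doc: SCHEMA v VECTOR HNSW 6 TYPE FLOAT32 DIM 4 DISTANCE_METRIC L2",
  "hnsw_metadata": {"max_elements": 1024, "cur_element_count": 100, "maxlevel": 2,
                    "enterpoint_node": 7, "M": 16, "maxM": 16, "maxM0": 32,
                    "ef_construction": 200, "mult": 0.36}
})json";

// 2) Simple "index_name cmd" format, written into per-shard DFS files and by
//    older server versions:
const char* kShardIndexDef =
    "idx1 ON HASH PREFIX 1 doc: SCHEMA v VECTOR HNSW 6 TYPE FLOAT32 DIM 4 DISTANCE_METRIC L2";

Because the same definition can now arrive from both the summary file and several per-shard files, the existence check added above keeps the first definition seen and skips the duplicates.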
143 changes: 84 additions & 59 deletions src/server/rdb_save.cc
@@ -1409,10 +1409,89 @@ error_code RdbSaver::Impl::FlushSerializer() {
return ec;
}

RdbSaver::GlobalData RdbSaver::GetGlobalData(const Service* service) {
namespace {

// Collect search index definitions. If is_summary is true, collects JSON with HNSW metadata
// and synonyms (for the summary file). Otherwise collects simple restore commands (for per-shard files).
void CollectSearchIndices([[maybe_unused]] EngineShard* shard,
[[maybe_unused]] StringVec* search_indices,
[[maybe_unused]] StringVec* search_synonyms,
[[maybe_unused]] bool is_summary) {
#ifdef WITH_SEARCH
auto* indices = shard->search_indices();
for (const auto& index_name : indices->GetIndexNames()) {
auto* index = indices->GetIndex(index_name);
auto index_info = index->GetInfo();

if (!is_summary) {
std::string restore_cmd = absl::StrCat(index_name, " ", index_info.BuildRestoreCommand());
search_indices->emplace_back(std::move(restore_cmd));
continue;
}

// Collect HNSW metadata for vector field (first one found)
for (const auto& [fident, finfo] : index_info.base_index.schema.fields) {
if (finfo.type == search::SchemaField::VECTOR &&
!(finfo.flags & search::SchemaField::NOINDEX)) {
if (auto hnsw_index = GlobalHnswIndexRegistry::Instance().Get(index_name, finfo.short_name);
hnsw_index) {
index_info.hnsw_metadata = hnsw_index->GetMetadata();
break;
}
}
}

// Save index definition as JSON with HNSW metadata
TmpJson index_json;
index_json["name"] = index_name;
index_json["cmd"] = index_info.BuildRestoreCommand();

if (index_info.hnsw_metadata.has_value()) {
const auto& meta = index_info.hnsw_metadata.value();
TmpJson hnsw_meta;
hnsw_meta["max_elements"] = meta.max_elements;
hnsw_meta["cur_element_count"] = meta.cur_element_count;
hnsw_meta["maxlevel"] = meta.maxlevel;
hnsw_meta["enterpoint_node"] = meta.enterpoint_node;
hnsw_meta["M"] = meta.M;
hnsw_meta["maxM"] = meta.maxM;
hnsw_meta["maxM0"] = meta.maxM0;
hnsw_meta["ef_construction"] = meta.ef_construction;
hnsw_meta["mult"] = meta.mult;
index_json["hnsw_metadata"] = std::move(hnsw_meta);
}

search_indices->emplace_back(index_json.to_string());

// Save synonym groups
const auto& synonym_groups = index->GetSynonyms().GetGroups();
for (const auto& [group_id, terms] : synonym_groups) {
if (!terms.empty()) {
std::string syn_cmd =
absl::StrCat(index_name, " ", group_id, " ", absl::StrJoin(terms, " "));
search_synonyms->emplace_back(std::move(syn_cmd));
}
}
}
#endif
}

} // namespace

RdbSaver::GlobalData RdbSaver::GetGlobalData(const Service* service, bool is_summary) {
StringVec script_bodies, search_indices, search_synonyms;
size_t table_mem_result = 0;

if (!is_summary) {
shard_set->RunBriefInParallel([&](EngineShard* shard) {
if (shard->shard_id() == 0)
CollectSearchIndices(shard, &search_indices, &search_synonyms, is_summary);
});
return RdbSaver::GlobalData{std::move(script_bodies), std::move(search_indices),
std::move(search_synonyms), table_mem_result};
}
{
// For summary file: collect all global data
auto scripts = service->script_mgr()->GetAll();
script_bodies.reserve(scripts.size());
for (auto& [sha, data] : scripts)
@@ -1421,66 +1500,13 @@ RdbSaver::GlobalData RdbSaver::GetGlobalData(const Service* service) {

atomic<size_t> table_mem{0};
shard_set->RunBriefInParallel([&](EngineShard* shard) {
#ifdef WITH_SEARCH
if (shard->shard_id() == 0) {
auto* indices = shard->search_indices();
for (const auto& index_name : indices->GetIndexNames()) {
auto* index = indices->GetIndex(index_name);
auto index_info = index->GetInfo();

// Collect HNSW metadata for vector field (first one found)
for (const auto& [fident, finfo] : index_info.base_index.schema.fields) {
if (finfo.type == search::SchemaField::VECTOR &&
!(finfo.flags & search::SchemaField::NOINDEX)) {
if (auto hnsw_index =
GlobalHnswIndexRegistry::Instance().Get(index_name, finfo.short_name);
hnsw_index) {
index_info.hnsw_metadata = hnsw_index->GetMetadata();
break; // Only store first HNSW index metadata
}
}
}

// Save index definition as JSON with HNSW metadata
TmpJson index_json;
index_json["name"] = index_name;
index_json["cmd"] = index_info.BuildRestoreCommand();

if (index_info.hnsw_metadata.has_value()) {
const auto& meta = index_info.hnsw_metadata.value();
TmpJson hnsw_meta;
hnsw_meta["max_elements"] = meta.max_elements;
hnsw_meta["cur_element_count"] = meta.cur_element_count;
hnsw_meta["maxlevel"] = meta.maxlevel;
hnsw_meta["enterpoint_node"] = meta.enterpoint_node;
hnsw_meta["M"] = meta.M;
hnsw_meta["maxM"] = meta.maxM;
hnsw_meta["maxM0"] = meta.maxM0;
hnsw_meta["ef_construction"] = meta.ef_construction;
hnsw_meta["mult"] = meta.mult;
index_json["hnsw_metadata"] = std::move(hnsw_meta);
}
if (shard->shard_id() == 0)
CollectSearchIndices(shard, &search_indices, &search_synonyms, is_summary);

search_indices.emplace_back(index_json.to_string());

// Save synonym groups to separate vector
const auto& synonym_groups = index->GetSynonyms().GetGroups();
for (const auto& [group_id, terms] : synonym_groups) {
if (!terms.empty()) {
// Format: "index_name group_id term1 term2 term3"
std::string syn_cmd =
absl::StrCat(index_name, " ", group_id, " ", absl::StrJoin(terms, " "));
search_synonyms.emplace_back(std::move(syn_cmd));
}
}
}
}
#endif
auto& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard->shard_id());
size_t shard_table_mem = 0;
for (size_t db_id = 0; db_id < db_slice.db_array_size(); ++db_id) {
auto* db_table = db_slice.GetDBTable(db_id);

if (db_table) {
shard_table_mem += db_table->table_memory();
}
@@ -1625,12 +1651,11 @@ error_code RdbSaver::SaveAux(const GlobalData& glob_state) {
if (!glob_state.search_indices.empty())
LOG(WARNING) << "Dragonfly search index data is incompatible with the RDB format";
} else {
// Search index definitions are not tied to shards and are saved in the summary file
DCHECK(save_mode_ != SaveMode::SINGLE_SHARD || glob_state.search_indices.empty());
// Search index definitions (JSON for summary, simple restore cmd for per-shard)
for (const string& s : glob_state.search_indices)
RETURN_ON_ERR(impl_->SaveAuxFieldStrStr("search-index", s));

// Save synonyms in separate aux fields
// Save synonyms only in summary file
DCHECK(save_mode_ != SaveMode::SINGLE_SHARD || glob_state.search_synonyms.empty());
for (const string& s : glob_state.search_synonyms)
RETURN_ON_ERR(impl_->SaveAuxFieldStrStr("search-synonyms", s));
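The non-JSON payloads assembled above are plain space-joined strings. A tiny standalone illustration of the two formats, using a hypothetical index idx1 and synonym group 0; only the StrCat/StrJoin shape mirrors the code above.

#include <string>
#include <vector>

#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"

int main() {
  // Per-shard restore command: "index_name <FT.CREATE args>".
  std::string restore_cmd =
      absl::StrCat("idx1", " ", "ON HASH PREFIX 1 doc: SCHEMA title TEXT");

  // Synonym group payload: "index_name group_id term1 term2 ...".
  std::vector<std::string> terms = {"car", "auto", "vehicle"};
  std::string syn_cmd = absl::StrCat("idx1", " ", "0", " ", absl::StrJoin(terms, " "));

  return 0;
}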
6 changes: 4 additions & 2 deletions src/server/rdb_save.h
@@ -133,8 +133,10 @@ class RdbSaver {

SnapshotStats GetCurrentSnapshotProgress() const;

// Fetch global data to be serialized in summary part of a snapshot / full sync.
static GlobalData GetGlobalData(const Service* service);
// Fetch global data to be serialized in snapshot.
// is_summary: true for summary file (full data with JSON search indices),
// false for per-shard files (only simple search index restore commands)
static GlobalData GetGlobalData(const Service* service, bool is_summary);

// Returns time in nanos of start of the last pending write interaction.
// Returns -1 if no write operations are currently pending.
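A brief usage sketch of the updated signature; the flag semantics below restate the rdb_save.cc changes above, and service stands for whatever Service pointer the caller already holds.

// is_summary == true  (summary DFS part, RDB file, full-sync summary header):
//   script bodies, JSON index definitions with optional HNSW metadata,
//   synonym groups, and the aggregated table memory.
auto summary_data = RdbSaver::GetGlobalData(service, /*is_summary=*/true);

// is_summary == false (per-shard DFS parts, per-shard full-sync headers):
//   only the plain "index_name <args>" restore commands; the rest stays empty.
auto shard_data = RdbSaver::GetGlobalData(service, /*is_summary=*/false);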